model_networkx
v1.0.0
Graph-based anomaly detection using NetworkX. Constructs graphs from entity relationships and uses PageRank centrality to identify anomalous nodes with high connectivity or influence.
$ openuba install model_networkx
Publisher: OpenUBA
Runtime: networkx
License: Apache-2.0
Tags: graph-analysis, pagerank, centrality, network, relationship, networkx
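For intuition, here is a tiny self-contained sketch (separate from the packaged code below) of the signal this model keys on: nodes that accumulate many relationships earn disproportionately high PageRank.

```python
import networkx as nx

# Toy graph: one "hub" entity touches many users; the rest are sparse.
G = nx.Graph()
G.add_edges_from(("hub", f"user_{i}") for i in range(8))
G.add_edges_from([("user_0", "user_1"), ("user_1", "user_2")])

scores = nx.pagerank(G)
# The hub's score dwarfs everyone else's; that is the "high connectivity
# or influence" signal surfaced as an anomaly.
print(max(scores, key=scores.get))  # -> hub
```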
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
| max_degree | integer | 50 | Node degree threshold above which a node is considered anomalous |
| community_detection | boolean | true | Whether to run community detection |
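Note that the packaged MODEL.py below does not yet consume these two parameters (it only reads optional `source_column`/`target_column` hyperparameters). Under that assumption, a minimal sketch of how a revision might apply them with stock NetworkX APIs:

```python
import networkx as nx
from networkx.algorithms import community

def apply_parameters(G: nx.Graph, max_degree: int = 50,
                     community_detection: bool = True):
    # max_degree: flag nodes whose degree exceeds the configured threshold.
    flagged = [n for n, d in G.degree() if d > max_degree]

    # community_detection: optionally partition the graph so flagged nodes
    # can be inspected in the context of their cluster.
    communities = None
    if community_detection:
        communities = list(community.greedy_modularity_communities(G))

    return flagged, communities
```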
model.yaml
```yaml
name: model_networkx
version: 1.0.0
runtime: networkx
description: Graph Anomaly Detection
parameters:
  max_degree:
    type: integer
    default: 50
    description: Node degree threshold above which a node is considered anomalous
  community_detection:
    type: boolean
    default: true
    description: Whether to run community detection
```
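A short sketch, assuming PyYAML, of how a host runtime might load this manifest and flatten the declared defaults into the hyperparameters dict that MODEL.py-style code reads; the actual OpenUBA loader may differ:

```python
import yaml

with open("model.yaml") as fh:
    manifest = yaml.safe_load(fh)

# Collapse the parameter schema into a flat {name: default} dict.
defaults = {name: spec.get("default")
            for name, spec in manifest.get("parameters", {}).items()}
assert defaults == {"max_degree": 50, "community_detection": True}
```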
MODEL.py
```python
import random
from typing import Dict, Any

import pandas as pd
import networkx as nx


class Model:
    def __init__(self):
        self.graph = None

    def train(self, ctx) -> Dict[str, Any]:
        """
        Build the graph from data (training = graph construction).
        """
        ctx.logger.info("Starting NetworkX graph construction...")

        G = nx.Graph()

        if isinstance(ctx.df, dict):
            # SourceGroup multi-table support: pick the first available source for now.
            if not ctx.df:
                ctx.logger.warning("Received empty dictionary input")
                ctx.df = None
            else:
                first_key = next(iter(ctx.df))
                ctx.logger.info(f"Received dictionary input. Using source: {first_key}")
                ctx.df = ctx.df[first_key]

        if ctx.df is None or ctx.df.empty:
            ctx.logger.warning("No data, generating dummy graph")
            # Generate 50 random edges between 20 synthetic users.
            nodes = [f"user_{i}" for i in range(20)]
            for _ in range(50):
                u, v = random.sample(nodes, 2)
                G.add_edge(u, v)
        else:
            # Default edge-list columns; overridable via hyperparameters.
            source_col = "source"
            target_col = "target"

            # Hyperparameters may arrive as an attribute or as a dict entry.
            params = {}
            if hasattr(ctx, 'hyperparameters'):
                params = ctx.hyperparameters or {}
            elif isinstance(ctx, dict) and 'hyperparameters' in ctx:
                params = ctx['hyperparameters'] or {}

            if params.get('source_column'):
                source_col = params['source_column']
            if params.get('target_column'):
                target_col = params['target_column']

            ctx.logger.info(f"Using columns - Source: {source_col}, Target: {target_col}")

            cols = ctx.df.columns
            # Validate that the configured columns exist; otherwise fall back to position.
            if source_col not in cols or target_col not in cols:
                ctx.logger.warning(
                    f"Specified columns ({source_col}, {target_col}) not found in data: "
                    f"{list(cols)}. Falling back to position."
                )
                if len(cols) >= 2:
                    source_col = cols[0]
                    target_col = cols[1]
                else:
                    ctx.logger.warning("Not enough columns for graph, using dummy")
                    G.add_node("dummy_node")
                    self.graph = G
                    return {
                        "status": "warning",
                        "message": "Not enough columns",
                        "nodes": 1,
                        "edges": 0
                    }

            edges = list(zip(ctx.df[source_col], ctx.df[target_col]))
            G.add_edges_from(edges)

        self.graph = G
        ctx.logger.info(f"Graph constructed. Nodes: {G.number_of_nodes()}, Edges: {G.number_of_edges()}")

        return {
            "status": "success",
            "model_type": "NetworkX PageRank",
            "nodes": G.number_of_nodes(),
            "edges": G.number_of_edges()
        }

    def infer(self, ctx) -> pd.DataFrame:
        """
        Inference: calculate PageRank centrality and map it to risk scores.
        """
        ctx.logger.info("Starting NetworkX inference...")

        if self.graph is None:
            self.train(ctx)

        try:
            pagerank = nx.pagerank(self.graph)
        except Exception as e:
            ctx.logger.warning(f"PageRank failed: {e}, returning empty")
            return pd.DataFrame()

        # High PageRank marks a "key player" node; strong centrality is the anomaly signal.
        # PageRank scores sum to 1 across the graph, so multiplying by N expresses each
        # score relative to the uniform baseline 1/N (a relative score of 1.0 is average).
        N = self.graph.number_of_nodes()
        results = []
        for node, score in pagerank.items():
            relative_score = score * N

            # Heuristic: 2.5x the uniform baseline crosses the 50-point threshold.
            risk = min(100.0, relative_score * 20)

            results.append({
                "entity_id": str(node),
                "risk_score": float(risk),
                "anomaly_type": "high_centrality" if risk > 50 else "normal",
                "details": {"pagerank": float(score)}
            })

        return pd.DataFrame(results)

    def execute(self, data=None):
        # Back-compat shim for the v1 interface: wrap raw records in a
        # minimal context object and return plain dicts.
        class PrintLogger:
            def info(self, msg): print(msg)
            def warning(self, msg): print(msg)

        class MockCtx:
            def __init__(self, d):
                # `d if d else ...` would raise on a DataFrame (ambiguous
                # truth value), so compare against None explicitly.
                self.df = d if d is not None else pd.DataFrame()
                self.logger = PrintLogger()

        df = pd.DataFrame(data) if data is not None else None
        return self.infer(MockCtx(df)).to_dict('records')
```
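Finally, a quick usage sketch via the v1 `execute()` shim. The records and entity names are hypothetical; the columns match the model's `source`/`target` defaults:

```python
from MODEL import Model

# Hypothetical login events: one service account touches many hosts.
records = (
    [{"source": "svc_backup", "target": f"host_{i}"} for i in range(10)]
    + [{"source": "alice", "target": "host_0"},
       {"source": "bob", "target": "host_1"}]
)

model = Model()
for row in model.execute(records):
    if row["anomaly_type"] == "high_centrality":
        print(row["entity_id"], round(row["risk_score"], 1))
# svc_backup dominates PageRank in this graph, so it alone is flagged.
```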