Back to Models

model_networkx

v1.0.0
NetworkX

Graph-based anomaly detection using NetworkX. Constructs graphs from entity relationships and uses PageRank centrality to identify anomalous nodes with high connectivity or influence.

$ openuba install model_networkx
OpenUBA
networkx
License: Apache-2.0
graph-analysis, pagerank, centrality, network, relationship, networkx

Parameters

Name | Type | Default | Description
max_degree | integer | 50 | Threshold for node degree anomaly definition
community_detection | boolean | true | Whether to run community detection
model.yaml
name: model_networkx
version: 1.0.0
runtime: networkx
description: Graph Anomaly Detection
parameters:
  max_degree:
    type: integer
    default: 50
    description: Threshold for node degree anomaly definition
  community_detection:
    type: boolean
    default: true
    description: Whether to run community detection
MODEL.py
import pandas as pd
import networkx as nx
from typing import Dict, Any
class Model:
    """
    Graph-based anomaly detection model.

    ``train`` constructs a NetworkX graph from edge-list style input data
    (or a random dummy graph when no data is supplied); ``infer`` scores
    each node by PageRank centrality and maps the score onto a 0-100 risk
    value, flagging highly central nodes as anomalous.
    """

    def __init__(self):
        # Built by train(); infer() lazily triggers construction when None.
        self.graph = None

    def train(self, ctx) -> Dict[str, Any]:
        """
        Build graph from data (Training = Graph Construction).

        Args:
            ctx: Execution context. Expected to expose ``df`` (a DataFrame,
                or a dict of DataFrames for multi-source input), ``logger``,
                and optionally ``hyperparameters`` with ``source_column`` /
                ``target_column`` overrides.

        Returns:
            Summary dict with ``status`` and graph node/edge counts.
        """
        ctx.logger.info("Starting NetworkX Graph construction...")

        G = nx.Graph()

        if isinstance(ctx.df, dict):
            # SourceGroup multi-table support: pick first available source for now
            if not ctx.df:
                ctx.logger.warning("Received empty dictionary input")
                ctx.df = None
            else:
                first_key = next(iter(ctx.df))
                ctx.logger.info(f"Received dictionary input. Using source: {first_key}")
                ctx.df = ctx.df[first_key]

        if ctx.df is None or ctx.df.empty:
            ctx.logger.warning("No data, generating dummy graph")
            # Generate random edges between 20 nodes
            import random
            nodes = [f"user_{i}" for i in range(20)]
            for _ in range(50):
                u, v = random.sample(nodes, 2)
                G.add_edge(u, v)
        else:
            # Default edge-list column names; overridable via hyperparameters.
            source_col = "source"
            target_col = "target"

            # Check for hyperparameters (safely access if attribute or dict)
            params = {}
            if hasattr(ctx, 'hyperparameters'):
                params = ctx.hyperparameters or {}
            elif isinstance(ctx, dict) and 'hyperparameters' in ctx:
                params = ctx['hyperparameters'] or {}

            if params.get('source_column'):
                source_col = params['source_column']
            if params.get('target_column'):
                target_col = params['target_column']

            ctx.logger.info(f"Using columns - Source: {source_col}, Target: {target_col}")

            cols = ctx.df.columns
            # Validate columns exist; fall back to the first two positional
            # columns if the configured names are missing.
            if source_col not in cols or target_col not in cols:
                ctx.logger.warning(f"Specified columns ({source_col}, {target_col}) not found in data: {cols}. Falling back to position.")
                if len(cols) >= 2:
                    source_col = cols[0]
                    target_col = cols[1]
                else:
                    ctx.logger.warning("Not enough columns for graph, using dummy")
                    G.add_node("dummy_node")
                    self.graph = G
                    return {
                        "status": "warning",
                        "message": "Not enough columns",
                        "nodes": 1,
                        "edges": 0
                    }

            edges = list(zip(ctx.df[source_col], ctx.df[target_col]))
            G.add_edges_from(edges)

        self.graph = G
        ctx.logger.info(f"Graph constructed. Nodes: {G.number_of_nodes()}, Edges: {G.number_of_edges()}")

        return {
            "status": "success",
            "model_type": "NetworkX PageRank",
            "nodes": G.number_of_nodes(),
            "edges": G.number_of_edges()
        }

    def infer(self, ctx) -> pd.DataFrame:
        """
        Inference: Calculate PageRank centrality.

        Returns a DataFrame with one row per node: ``entity_id``,
        ``risk_score`` (0-100), ``anomaly_type`` and raw ``details``.
        Returns an empty DataFrame if PageRank fails.
        """
        ctx.logger.info("Starting NetworkX inference...")

        if self.graph is None:
            self.train(ctx)

        # Calculate PageRank
        try:
            pagerank = nx.pagerank(self.graph)
        except Exception as e:
            ctx.logger.warning(f"PageRank failed: {e}, returning empty")
            return pd.DataFrame()

        # High PageRank = "Key Player" (anomaly type); low pagerank would be
        # an "Isolate". Node count is loop-invariant, so hoist it.
        N = self.graph.number_of_nodes()

        results = []
        for node, score in pagerank.items():
            # PageRank sums to 1 across all nodes; multiplying by N gives a
            # score relative to the uniform distribution (1.0 == average).
            relative_score = score * N

            # Simple heuristic: heavily central nodes are "risky" or "important"
            risk = min(100.0, relative_score * 20)

            results.append({
                "entity_id": str(node),
                "risk_score": float(risk),
                "anomaly_type": "high_centrality" if risk > 50 else "normal",
                "details": {"pagerank": float(score)}
            })

        return pd.DataFrame(results)

    def execute(self, data=None):
        """v1 compatibility shim: wrap raw records in a minimal context and run infer()."""
        class MockCtx:
            def __init__(self, d):
                # BUG FIX: the original `d if d else pd.DataFrame()` raised
                # ValueError for any non-empty DataFrame (ambiguous truth
                # value); test identity against None instead.
                self.df = d if d is not None else pd.DataFrame()
                self.logger = type('obj', (object,), {'info': print, 'warning': print})
        return self.infer(MockCtx(pd.DataFrame(data) if data else None)).to_dict('records')
130