Back to Models
model_sklearn
v1.0.0
scikit-learn
Isolation Forest anomaly detection using scikit-learn. Identifies statistical outliers using tree-based ensemble methods. Returns risk scores on a 0-100 scale with anomaly classifications.
$ openuba install model_sklearn OpenUBA
sklearn
License: Apache-2.0
Tags: isolation-forest, anomaly-detection, statistical, unsupervised, sklearn
Parameters
| Name | Type | Default | Description |
|---|---|---|---|
| contamination | float | 0.1 | The proportion of outliers in the data set |
| random_state | integer | 42 | Random state for reproducibility |
model.yaml
1name: model_sklearn
2version: 1.0.0
3runtime: sklearn
4description: Isolation Forest Anomaly Detection
5parameters:
6 contamination:
7 type: float
8 default: 0.1
9 description: The amount of contamination of the data set, i.e. the proportion of outliers in the data set.
10 random_state:
11 type: integer
12 default: 42
13 description: Random state for reproducibility
14MODEL.py
1
2import pandas as pd
3import numpy as np
4from sklearn.ensemble import IsolationForest
5from typing import Dict, Any
6
class Model:
    """Isolation Forest anomaly detector that reports 0-100 risk scores.

    The numeric columns of ``ctx.df`` are used as features. ``train`` fits
    the forest; ``infer`` scores every row and returns one record per row;
    ``execute`` is a shim for the v1 interface that wraps raw data in a
    minimal context and delegates to ``infer``.
    """

    def __init__(self, contamination: float = 0.1, random_state: int = 42):
        """Create an unfitted forest.

        Args:
            contamination: Expected proportion of outliers in the data set.
            random_state: Seed used for reproducibility.

        The defaults match the values declared in ``model.yaml``.
        """
        self.model = IsolationForest(
            contamination=contamination, random_state=random_state
        )
        self.is_trained = False

    @staticmethod
    def _numeric_matrix(ctx, stage: str) -> np.ndarray:
        """Return the numeric columns of ``ctx.df`` as a 2-D array.

        Args:
            ctx: Object exposing the input frame as ``ctx.df``.
            stage: ``"training"`` or ``"inference"``; only used to word the
                error messages.

        Raises:
            ValueError: If no data was provided or it has no numeric columns.
        """
        frame = ctx.df
        if frame is None or (hasattr(frame, 'empty') and frame.empty):
            raise ValueError(
                f"No {stage} data provided. Specify a data source "
                "(elasticsearch, spark, or local_csv)."
            )

        X = frame.select_dtypes(include=[np.number]).values
        if X.shape[0] == 0 or X.shape[1] == 0:
            raise ValueError(
                f"{stage.capitalize()} data has no numeric columns "
                f"(shape={frame.shape}, numeric_shape={X.shape})"
            )
        return X

    @staticmethod
    def _risk_score(pred: int, score: float) -> float:
        """Map a prediction and its decision score to a 0-100 risk value.

        Simple heuristic: anomalies (``pred == -1``) start at 50 and grow
        with the magnitude of the score, capped at 100; normal rows get
        ``(1 - score) * 20``, floored at 0.
        """
        if pred == -1:
            return float(min(100.0, abs(score) * 100 + 50))
        return float(max(0.0, (1 - score) * 20))

    def train(self, ctx) -> Dict[str, Any]:
        """Train the isolation forest on the numeric columns of ``ctx.df``.

        Returns:
            A summary dict with ``status``, ``model_type``, ``n_samples``
            and ``n_features``.

        Raises:
            ValueError: If no data was provided or it has no numeric columns.
        """
        ctx.logger.info("Starting Sklearn Isolation Forest training...")

        X = self._numeric_matrix(ctx, "training")

        self.model.fit(X)
        self.is_trained = True

        ctx.logger.info("Training completed.")
        return {
            "status": "success",
            "model_type": "IsolationForest",
            "n_samples": len(X),
            "n_features": X.shape[1]
        }

    def infer(self, ctx) -> pd.DataFrame:
        """Score every row of ``ctx.df`` with the forest.

        If the forest has not been fitted yet it is fitted on the inference
        data itself (demo fallback for runners that do not restore weights).

        Returns:
            A DataFrame with one row per input row and the columns
            ``entity_id``, ``risk_score``, ``anomaly_type`` and ``details``.

        Raises:
            ValueError: If no data was provided or it has no numeric columns.
        """
        ctx.logger.info("Starting inference...")

        if not self.is_trained:
            ctx.logger.warning("Model not explicitly trained, fitting on inference data for demo")

        X = self._numeric_matrix(ctx, "inference")

        # Prefer an explicit identifier column; otherwise synthesize ids
        # from the row position.
        if "entity_id" in ctx.df.columns:
            ids = ctx.df["entity_id"].values
        elif "user_id" in ctx.df.columns:
            ids = ctx.df["user_id"].values
        else:
            ids = [f"entity_{i}" for i in range(len(X))]

        # Fit if needed (for demo purposes if weights loading isn't fully
        # implemented in runner). Checking the fitted attribute rather than
        # is_trained also covers estimators restored from outside train().
        if not hasattr(self.model, "estimators_"):
            ctx.logger.info(f"fitting IsolationForest on {X.shape[0]} samples, {X.shape[1]} features...")
            self.model.fit(X)

        ctx.logger.info(f"running predictions on {X.shape[0]} samples...")
        predictions = self.model.predict(X)
        ctx.logger.info(f"computing anomaly scores...")
        scores = self.model.decision_function(X)

        ctx.logger.info(f"building risk scores for {len(predictions)} results...")
        # predict(): -1 is anomaly, 1 is normal.
        # decision_function(): lower is more anomalous.
        results = [
            {
                "entity_id": str(entity),
                "risk_score": self._risk_score(pred, score),
                "anomaly_type": "statistical_outlier" if pred == -1 else "normal",
                "details": {"raw_score": float(score)}
            }
            for entity, pred, score in zip(ids, predictions, scores)
        ]

        return pd.DataFrame(results)

    def execute(self, data=None):
        """Shim for the v1 interface: run ``infer`` on raw ``data``.

        Args:
            data: Anything ``pd.DataFrame`` accepts (e.g. a list of record
                dicts or an existing DataFrame), or ``None``.

        Returns:
            The inference results as a list of record dicts.

        Raises:
            ValueError: If ``data`` is ``None``/empty or has no numeric
                columns (raised by ``infer``).
        """
        class _PrintLogger:
            # Minimal logger that writes straight to stdout.
            info = staticmethod(print)
            warning = staticmethod(print)

        class _ShimCtx:
            def __init__(self, frame):
                self.df = frame
                self.logger = _PrintLogger

        # Compare against None explicitly: the truth value of a DataFrame is
        # ambiguous and raises ValueError, which previously made every
        # non-empty payload fail before inference started.
        frame = pd.DataFrame(data) if data is not None else pd.DataFrame()
        return self.infer(_ShimCtx(frame)).to_dict('records')
99