Anomaly Detection: Complete Professional Guide

Introduction

Anomaly detection (also known as outlier detection) is a critical discipline in machine learning and data science, essential for identifying patterns, observations, or events that deviate significantly from expected normal behavior. In today’s industrial context, where data volumes are exploding and systems are becoming increasingly complex, the ability to automatically detect anomalies has become a major competitive advantage.

Strategic Importance

Anomaly detection systems play a crucial role in:

  • Cybersecurity: Intrusion detection, DDoS attacks, malicious behavior (average cost of a data breach: $4.35M according to IBM 2022)
  • Finance: Fraud prevention, suspicious transaction detection (annual fraud losses: $40B+ globally)
  • Industry: Predictive maintenance, failure detection before critical breakdown (30-50% reduction in downtime)
  • Healthcare: Medical anomaly detection, patient monitoring, rare disease identification
  • IoT and Smart Cities: Sensor monitoring, dysfunction detection in urban infrastructure
  • E-commerce: Fraudulent behavior detection, bots, payment system abuse

Contemporary Challenges

The field faces several major challenges:

  1. Data volume: Real-time processing of millions of events per second
  2. Dimensionality: Handling high-dimensional data (curse of dimensionality)
  3. Class imbalance: Anomalies often <1% of data
  4. Concept drift: Evolution of normal and abnormal patterns over time
  5. Explainability: Need to understand why an observation is considered anomalous
  6. False positives: Operational cost of false alerts

Theoretical Foundations

Formal Definition

An anomaly is an observation whose probability under the model of normal behavior falls below a defined threshold. In other words, it is a data point that is very unlikely to have been generated by the normal data distribution.

An anomaly can also be defined in terms of distance: it’s an observation whose distance from the center of the normal distribution exceeds a critical threshold. The further a point is from the center, the more likely it is to be an anomaly.

Learning Framework

Anomaly detection can be formulated according to three paradigms:

1. Supervised Learning

  • Labeled data with normal and abnormal examples
  • Imbalanced binary classification problem
  • Requires significant set of anomaly examples
  • Advantages: High performance, clear metrics
  • Disadvantages: Annotation cost, bias toward known anomalies

2. Semi-Supervised Learning (One-Class Learning)

  • Training on normal data only
  • Learns a decision boundary around the normal class
  • The model returns: 1 if observation is normal, -1 otherwise
  • Advantages: No need for anomalies in training
  • Disadvantages: May miss certain types of anomalies

3. Unsupervised Learning

  • No annotation required
  • Based on distribution or density assumptions
  • Identifies low-density points as anomalies
  • Advantages: Applicable without labels, discovers unknown anomalies
  • Disadvantages: Variable performance, requires expert validation
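
To make the three paradigms concrete, here is a minimal sketch (an illustration, not part of the original text) of how each one maps onto a common scikit-learn estimator; the synthetic data, estimator choices and parameter values are assumptions for demonstration only:

# Minimal sketch of the three paradigms with scikit-learn (illustrative only;
# the data and parameters below are assumptions, not recommendations).
import numpy as np
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.svm import OneClassSVM

rng = np.random.default_rng(0)
X_normal = rng.normal(0, 1, (500, 2))            # normal observations
X_anom = rng.uniform(4, 6, (10, 2))              # a few obvious anomalies
X_all = np.vstack([X_normal, X_anom])
y_all = np.hstack([np.zeros(500), np.ones(10)])  # 0 = normal, 1 = anomaly

# 1. Supervised: imbalanced binary classification, requires labeled anomalies
clf = RandomForestClassifier(class_weight='balanced', random_state=0).fit(X_all, y_all)

# 2. Semi-supervised (one-class): learn a boundary around normal data only
ocsvm = OneClassSVM(nu=0.05, kernel='rbf').fit(X_normal)
print((ocsvm.predict(X_anom) == -1).sum(), "of 10 anomalies flagged by One-Class SVM")

# 3. Unsupervised: flag low-density points without any labels
iso = IsolationForest(contamination=0.02, random_state=0)
labels = iso.fit_predict(X_all)                  # -1 = anomaly, 1 = normal
print((labels == -1).sum(), "points flagged by Isolation Forest")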

Fundamental Assumptions

Anomaly detection algorithms typically rely on one or more of these assumptions:

  1. Density hypothesis: Anomalies appear in regions of low probability density
  2. Distance hypothesis: Anomalies are far from their nearest neighbors
  3. Separability hypothesis: Anomalies can be separated from normal data by a decision boundary
  4. Reconstruction hypothesis: Anomalies are difficult to reconstruct by a model trained on normal data

Types of Anomalies

Detailed Classification

1. Point Anomalies

Individual data points that are abnormal compared to the rest of the dataset.

Characteristics:

  • Isolated deviant observations
  • Simplest to detect
  • Represent ~70% of use cases

Industrial examples:

  • Bank transaction of €50,000 when average is €100
  • Abnormal temperature spike in a datacenter (80°C vs 22°C nominal)
  • API response time of 30s vs 200ms typical
  • Unusual electricity consumption in a building

Suitable methods: Z-Score, IQR, Isolation Forest, LOF

2. Contextual Anomalies

Observations that are normal in one context but abnormal in another. Context can be temporal, spatial, or multivariate.

Characteristics:

  • Depend on observation context
  • Require considering contextual features
  • Common in time series

Industrial examples:

  • Temperature of 30°C normal in summer but abnormal in winter
  • High network traffic during day (normal) vs night (suspect)
  • Spending €500 abroad (travel context: normal, usual context: suspect)
  • CPU usage at 90% during nightly batch (normal) vs daytime (abnormal)

Suitable methods: LSTM, Prophet, Conditional Anomaly Detection, Seasonal Decomposition
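
As a quick illustration of the seasonal-decomposition route listed above (a sketch of ours, not from the original text), the snippet below removes a daily cycle with statsmodels and flags points whose residual is unusually large; the synthetic series, the period of 24 and the threshold of 4 are illustrative assumptions:

# Hedged sketch: contextual anomaly detection via seasonal decomposition.
# A value that is normal in absolute terms can still be anomalous for its
# position in the cycle, so we score the residual, not the raw value.
import numpy as np
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose

rng = np.random.default_rng(0)
hours = pd.date_range("2024-01-01", periods=24 * 14, freq="H")   # two weeks, hourly
daily_cycle = 10 * np.sin(2 * np.pi * np.arange(len(hours)) / 24)
series = pd.Series(50 + daily_cycle + rng.normal(0, 1, len(hours)), index=hours)
series.iloc[100] += 8          # contextual anomaly: plausible value, wrong hour

decomposition = seasonal_decompose(series, model="additive", period=24)
residual = decomposition.resid.dropna()

# Flag residuals more than 4 robust standard deviations from the median
mad = np.median(np.abs(residual - residual.median()))
robust_z = 0.6745 * (residual - residual.median()) / mad
anomalies = residual[np.abs(robust_z) > 4]
print(anomalies.index.tolist())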

3. Collective Anomalies

A collection of data points that, taken collectively, are abnormal even if individually they may seem normal.

Characteristics:

  • Require analysis of sequences or groups
  • More complex to identify
  • Often related to business processes

Industrial examples:

  • Coordinated purchase sequence indicating a fraud ring
  • Unusual network activity pattern suggesting an APT cyberattack
  • Series of micro-transactions building fraud
  • Progressive performance degradation (drift) before system failure
  • Coordinated bot behavior on a platform

Suitable methods: Sequential Pattern Mining, RNN/LSTM, HMM, Graph-based methods
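
To illustrate the "individually normal, collectively abnormal" idea with the simplest possible trick (a sketch of ours, not from the original text): aggregate events over a window and score the aggregate. Each login attempt below is ordinary on its own; only the per-minute count exposes the coordinated burst. The data and the 3.5 threshold are assumptions:

# Hedged sketch: a collective anomaly as an abnormal *rate* of ordinary events.
import numpy as np

rng = np.random.default_rng(0)
# Normal traffic: ~2 attempts per minute over 8 hours
counts = rng.poisson(lam=2, size=8 * 60)
# Coordinated burst (e.g. credential stuffing) during 5 minutes
counts[300:305] += 40

median = np.median(counts)
mad = np.median(np.abs(counts - median))
robust_z = 0.6745 * (counts - median) / (mad if mad > 0 else 1.0)
burst_minutes = np.where(robust_z > 3.5)[0]
print("Minutes with collective anomaly:", burst_minutes)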

4. Additional Anomalies (Advanced Classification)

Conditional Anomalies

  • Violation of business rules or logical constraints
  • Example: Negative age, future date in history

Relative Anomalies

  • Deviations from a reference group
  • Example: Store performance vs regional average

Drift Anomalies

  • Gradual changes in distribution
  • Example: Progressive degradation of sensor quality
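
A minimal sketch of catching such gradual drift (our illustration, not from the original text): compare a sliding window of recent values against a fixed reference window with a two-sample Kolmogorov-Smirnov test and alert when the distributions diverge. The synthetic stream, window sizes and p-value threshold are assumptions:

# Hedged sketch: drift detection with a two-sample Kolmogorov-Smirnov test.
# A slow upward drift never produces a single outlier, but the distribution
# of recent values eventually differs from the reference window.
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
stable = rng.normal(0, 1, 1000)                               # healthy sensor readings
drifting = rng.normal(0, 1, 1000) + np.linspace(0, 2, 1000)   # slow degradation
stream = np.concatenate([stable, drifting])

reference = stream[:500]                                      # assumed-healthy reference
window = 200
for start in range(500, len(stream) - window, window):
    current = stream[start:start + window]
    stat, p_value = ks_2samp(reference, current)
    if p_value < 0.01:
        print(f"Drift suspected from index {start} (KS={stat:.2f}, p={p_value:.1e})")
        break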

Approaches and Algorithms

1. Statistical Methods

Statistical methods are based on probabilistic models and hypothesis testing. They assume normal data follows a known distribution (often Gaussian).

Gaussian Distribution and Z-Score Test

Theory: The Z-score measures how many standard deviations an observation is from the mean. It’s calculated by taking the difference between the observed value and the mean, divided by the standard deviation.

Formula: Z-score = (value - mean) / standard deviation

An observation is considered abnormal if its absolute Z-score exceeds a threshold (typically 3, corresponding to 99.7% of data in a normal distribution).

Production-Ready Implementation:

import numpy as np
from scipy import stats
from typing import Tuple, Optional
import logging

class ZScoreDetector:
    """
    Z-score based anomaly detector with robust handling
    """
    def __init__(self, threshold: float = 3.0, use_modified: bool = False):
        """
        Args:
            threshold: Detection threshold (number of standard deviations)
            use_modified: Use modified Z-score (MAD) for robustness
        """
        self.threshold = threshold
        self.use_modified = use_modified
        self.mu_ = None
        self.sigma_ = None
        self.mad_ = None
        self.median_ = None
        self.logger = logging.getLogger(__name__)
        
    def fit(self, data: np.ndarray) -> 'ZScoreDetector':
        """
        Compute statistics on training data
        """
        if len(data) == 0:
            raise ValueError("Training data is empty")
            
        self.mu_ = np.mean(data)
        self.sigma_ = np.std(data)
        
        if self.use_modified:
            # MAD (Median Absolute Deviation) - more robust to outliers
            self.median_ = np.median(data)
            self.mad_ = np.median(np.abs(data - self.median_))
            
        self.logger.info(f"Model trained: μ={self.mu_:.2f}, σ={self.sigma_:.2f}")
        return self
    
    def predict(self, data: np.ndarray) -> np.ndarray:
        """
        Predict anomalies (-1) and normal (1)
        """
        scores = self.decision_function(data)
        return np.where(np.abs(scores) > self.threshold, -1, 1)
    
    def decision_function(self, data: np.ndarray) -> np.ndarray:
        """
        Compute anomaly scores (Z-scores)
        """
        if self.mu_ is None:
            raise ValueError("Model must be trained before prediction")
            
        if self.use_modified and self.mad_ is not None:
            # Modified Z-score (more robust); reuse the median/MAD estimated in fit()
            return 0.6745 * (data - self.median_) / self.mad_
        else:
            # Classical Z-score
            return (data - self.mu_) / self.sigma_
    
    def get_anomaly_scores(self, data: np.ndarray) -> np.ndarray:
        """
        Return raw scores for analysis
        """
        return np.abs(self.decision_function(data))

# Usage example
np.random.seed(42)
normal_data = np.random.normal(100, 15, 1000)
test_data = np.concatenate([normal_data, [200, 250, -50]])

detector = ZScoreDetector(threshold=3.0, use_modified=True)
detector.fit(normal_data)
predictions = detector.predict(test_data)
scores = detector.get_anomaly_scores(test_data)

print(f"Detected anomalies: {(predictions == -1).sum()}")
print(f"Indices: {np.where(predictions == -1)[0]}")

IQR Method (Interquartile Range)

Theory: The IQR (Interquartile Range) is robust to outliers and doesn’t assume a particular distribution. The IQR is the difference between the 3rd quartile (Q3) and the 1st quartile (Q1).

Calculation: IQR = Q3 - Q1

Detection bounds are:

  • Lower bound: Q1 - 1.5 × IQR
  • Upper bound: Q3 + 1.5 × IQR

Any observation outside these bounds is considered an anomaly.

Advanced Implementation:

class IQRDetector:
    """
    IQR-based detector with configurable options
    """
    def __init__(self, multiplier: float = 1.5, percentiles: Tuple[float, float] = (25, 75)):
        """
        Args:
            multiplier: IQR multiplier (1.5 standard, 3.0 for extreme outliers)
            percentiles: Percentiles for Q1 and Q3
        """
        self.multiplier = multiplier
        self.percentiles = percentiles
        self.q1_ = None
        self.q3_ = None
        self.iqr_ = None
        self.lower_bound_ = None
        self.upper_bound_ = None
        
    def fit(self, data: np.ndarray) -> 'IQRDetector':
        """
        Compute quartiles and bounds
        """
        self.q1_ = np.percentile(data, self.percentiles[0])
        self.q3_ = np.percentile(data, self.percentiles[1])
        self.iqr_ = self.q3_ - self.q1_
        
        self.lower_bound_ = self.q1_ - self.multiplier * self.iqr_
        self.upper_bound_ = self.q3_ + self.multiplier * self.iqr_
        
        return self
    
    def predict(self, data: np.ndarray) -> np.ndarray:
        """
        Predict anomalies
        """
        is_anomaly = (data < self.lower_bound_) | (data > self.upper_bound_)
        return np.where(is_anomaly, -1, 1)
    
    def decision_function(self, data: np.ndarray) -> np.ndarray:
        """
        Normalized distance to bounds
        """
        scores = np.zeros_like(data, dtype=float)
        
        # Distance below lower bound
        below = data < self.lower_bound_
        scores[below] = (self.lower_bound_ - data[below]) / self.iqr_
        
        # Distance above upper bound
        above = data > self.upper_bound_
        scores[above] = (data[above] - self.upper_bound_) / self.iqr_
        
        return scores
    
    def get_bounds(self) -> Tuple[float, float]:
        """
        Return detection bounds
        """
        return self.lower_bound_, self.upper_bound_

# Usage
detector = IQRDetector(multiplier=1.5)
detector.fit(normal_data)
predictions = detector.predict(test_data)
print(f"Bounds: {detector.get_bounds()}")

Advanced Statistical Tests

Grubbs Test (Single Outlier)

from scipy.stats import t

def grubbs_test(data: np.ndarray, alpha: float = 0.05) -> Tuple[bool, float, float]:
    """
    Grubbs test to detect a single outlier
    
    H0: No outlier
    H1: At least one outlier
    
    Returns:
        (is_outlier, test_statistic, critical_value)
    """
    n = len(data)
    mean = np.mean(data)
    std = np.std(data, ddof=1)
    
    # Compute G statistic
    abs_deviations = np.abs(data - mean)
    max_idx = np.argmax(abs_deviations)
    G = abs_deviations[max_idx] / std
    
    # Critical value
    t_dist = t.ppf(1 - alpha / (2 * n), n - 2)
    critical_value = ((n - 1) * np.sqrt(t_dist**2)) / np.sqrt(n * (n - 2 + t_dist**2))
    
    return G > critical_value, G, critical_value

# Test
is_outlier, stat, crit = grubbs_test(test_data[-10:])
print(f"Outlier detected: {is_outlier}, G={stat:.3f}, Critical={crit:.3f}")

Dixon Test

def dixon_test(data: np.ndarray, alpha: float = 0.05) -> bool:
    """
    Dixon test for small samples (n < 30)
    """
    n = len(data)
    if n < 3 or n > 30:
        raise ValueError("Dixon test applicable for 3 <= n <= 30")
    
    sorted_data = np.sort(data)
    
    # Compute Q ratio
    if n <= 7:
        Q = (sorted_data[1] - sorted_data[0]) / (sorted_data[-1] - sorted_data[0])
    else:
        Q = (sorted_data[1] - sorted_data[0]) / (sorted_data[-2] - sorted_data[0])
    
    # Critical values (simplified for alpha=0.05)
    critical_values = {3: 0.941, 4: 0.765, 5: 0.642, 6: 0.560, 7: 0.507}
    critical = critical_values.get(n, 0.5)
    
    return Q > critical

2. Machine Learning Methods

Isolation Forest - In-Depth Analysis

Theoretical Principle:

Isolation Forest is based on the principle that anomalies are “easier to isolate” than normal points. The algorithm builds isolation trees by recursively separating data with random splits.

Anomaly score:

The anomaly score is calculated from the path length needed to isolate an observation in the trees. The shorter the path, the easier the observation is to isolate, so the more likely it is to be an anomaly.

The score is normalized to a value between 0 and 1 using the expected path length as reference: score(x, n) = 2^(-E(h(x)) / c(n)), where E(h(x)) is the average path length of x across the trees and c(n) = 2 × H(n-1) - 2(n-1)/n (with H(i) ≈ ln(i) + 0.5772) is the expected path length of an unsuccessful search in a binary tree built on n points.

Interpretation:

  • Score close to 1: strong anomaly
  • Score ~0.5: normal point
  • Score <0.5: very normal point (cluster center)

Production Implementation with Optimizations:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import numpy as np
from typing import Dict, Tuple
import joblib

class ProductionIsolationForest:
    """
    Isolation Forest optimized for production environment
    """
    def __init__(
        self,
        contamination: float = 0.1,
        n_estimators: int = 100,
        max_samples: int = 256,
        max_features: float = 1.0,
        n_jobs: int = -1,
        random_state: int = 42
    ):
        """
        Args:
            contamination: Expected proportion of anomalies
            n_estimators: Number of trees (more = stable but slow)
            max_samples: Samples per tree (256 optimal per paper)
            max_features: Proportion of features to consider
            n_jobs: Number of CPUs (-1 = all)
        """
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            max_samples=max_samples,
            max_features=max_features,
            n_jobs=n_jobs,
            random_state=random_state,
            warm_start=False,
            bootstrap=False
        )
        self.feature_names_ = None
        self.threshold_ = None
        
    def fit(self, X: np.ndarray, feature_names: list = None) -> 'ProductionIsolationForest':
        """
        Train the model
        """
        # Normalization
        X_scaled = self.scaler.fit_transform(X)
        
        # Training
        self.model.fit(X_scaled)
        
        # Compute decision threshold
        scores = self.model.score_samples(X_scaled)
        self.threshold_ = np.percentile(scores, self.contamination * 100)
        
        self.feature_names_ = feature_names
        
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict anomalies (-1) and normal (1)
        """
        X_scaled = self.scaler.transform(X)
        return self.model.predict(X_scaled)
    
    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        Anomaly scores (more negative = more abnormal)
        """
        X_scaled = self.scaler.transform(X)
        return self.model.score_samples(X_scaled)
    
    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """
        Probability of being an anomaly [0, 1]
        """
        scores = self.decision_function(X)
        # Normalization between 0 and 1
        min_score = scores.min()
        max_score = scores.max()
        proba = 1 - (scores - min_score) / (max_score - min_score)
        return proba
    
    def explain_anomaly(self, X: np.ndarray, idx: int) -> Dict:
        """
        Explain why an observation is anomalous
        """
        if X.ndim == 1:
            X = X.reshape(1, -1)
            
        X_scaled = self.scaler.transform(X)
        score = self.model.score_samples(X_scaled)[idx]
        prediction = self.model.predict(X_scaled)[idx]
        
        # Feature contribution (rough approximation: score change when a
        # feature is replaced by a neutral value)
        contributions = {}
        if self.feature_names_:
            for i, fname in enumerate(self.feature_names_):
                X_modified = X[idx].copy()
                X_modified[i] = 0  # Neutral value
                modified_score = self.model.score_samples(
                    self.scaler.transform(X_modified.reshape(1, -1))
                )[0]
                contributions[fname] = abs(score - modified_score)
        
        return {
            'anomaly_score': score,
            'is_anomaly': prediction == -1,
            'threshold': self.threshold_,
            'feature_contributions': contributions
        }
    
    def save(self, filepath: str):
        """
        Save the model
        """
        joblib.dump({
            'model': self.model,
            'scaler': self.scaler,
            'contamination': self.contamination,
            'threshold': self.threshold_,
            'feature_names': self.feature_names_
        }, filepath)
    
    @classmethod
    def load(cls, filepath: str) -> 'ProductionIsolationForest':
        """
        Load a saved model
        """
        data = joblib.load(filepath)
        instance = cls(contamination=data['contamination'])
        instance.model = data['model']
        instance.scaler = data['scaler']
        instance.threshold_ = data['threshold']
        instance.feature_names_ = data['feature_names']
        return instance

# Advanced usage example
np.random.seed(42)

# Generate realistic data
n_samples = 10000
n_features = 10

# Normal data (multivariate distribution)
mean = np.zeros(n_features)
cov = np.eye(n_features)
X_normal = np.random.multivariate_normal(mean, cov, n_samples)

# Anomalies (different types)
n_anomalies = 500
X_anomalies = np.vstack([
    np.random.uniform(-5, 5, (n_anomalies // 2, n_features)),  # Uniform
    np.random.normal(4, 1, (n_anomalies // 2, n_features))     # Distant cluster
])

X = np.vstack([X_normal, X_anomalies])
y_true = np.hstack([np.zeros(n_samples), np.ones(n_anomalies)])

# Training
feature_names = [f'feature_{i}' for i in range(n_features)]
detector = ProductionIsolationForest(
    contamination=0.05,
    n_estimators=200,
    max_samples=256
)
detector.fit(X_normal, feature_names=feature_names)

# Prediction
predictions = detector.predict(X)
probas = detector.predict_proba(X)
scores = detector.decision_function(X)

# Explain an anomaly
anomaly_idx = np.where(predictions == -1)[0][0]
explanation = detector.explain_anomaly(X, anomaly_idx)
print(f"Explanation for anomaly #{anomaly_idx}:")
print(f"  Score: {explanation['anomaly_score']:.4f}")
print(f"  Threshold: {explanation['threshold']:.4f}")
print(f"  Contributions: {explanation['feature_contributions']}")

# Save
detector.save('isolation_forest_model.joblib')

One-Class SVM - Deep Dive

Theory:

One-Class SVM seeks the tightest boundary that contains most of the normal data: a minimal-volume hypersphere in the SVDD formulation, or equivalently a maximum-margin hyperplane separating the data from the origin in feature space.

Objective function:

The algorithm minimizes a function that balances two objectives:

  1. Minimize the size of the hypersphere (to have the tightest boundary possible)
  2. Minimize violations (normal points outside the boundary)

The nu parameter (ν ∈ (0, 1]) is an upper bound on the fraction of training points allowed to fall outside the boundary and a lower bound on the fraction of support vectors. The rho parameter (ρ) is the offset of the separating hyperplane from the origin; the resulting decision function is f(x) = sign(w·Φ(x) - ρ).

Optimized Implementation:

from sklearn.svm import OneClassSVM
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer
import numpy as np

class OptimizedOneClassSVM:
    """
    One-Class SVM with hyperparameter optimization
    """
    def __init__(self, kernel: str = 'rbf', nu: float = 0.1, gamma: str = 'scale'):
        self.kernel = kernel
        self.nu = nu
        self.gamma = gamma
        self.scaler = StandardScaler()
        self.model = None
        self.best_params_ = None
        
    def fit(self, X: np.ndarray, optimize: bool = False) -> 'OptimizedOneClassSVM':
        """
        Train the model with optimization option
        """
        X_scaled = self.scaler.fit_transform(X)
        
        if optimize:
            self._optimize_hyperparameters(X_scaled)
        else:
            self.model = OneClassSVM(
                kernel=self.kernel,
                nu=self.nu,
                gamma=self.gamma
            )
            self.model.fit(X_scaled)
            
        return self
    
    def _optimize_hyperparameters(self, X: np.ndarray):
        """
        Optimize hyperparameters via cross-validation
        """
        param_grid = {
            'kernel': ['rbf', 'poly', 'sigmoid'],
            'nu': [0.01, 0.05, 0.1, 0.2],
            'gamma': ['scale', 'auto', 0.001, 0.01, 0.1]
        }
        
        # Custom scorer: prefer tight boundaries (few support vectors).
        # Passed directly to GridSearchCV as an (estimator, X, y) callable;
        # make_scorer would expect a (y_true, y_pred) metric instead.
        def volume_scorer(estimator, X, y=None):
            n_support = len(estimator.support_vectors_)
            return -n_support / len(X)  # Minimize number of support vectors
        
        grid_search = GridSearchCV(
            OneClassSVM(),
            param_grid,
            scoring=volume_scorer,
            cv=5,
            n_jobs=-1,
            verbose=1
        )
        
        grid_search.fit(X)
        self.model = grid_search.best_estimator_
        self.best_params_ = grid_search.best_params_
        
        print(f"Best parameters: {self.best_params_}")
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        X_scaled = self.scaler.transform(X)
        return self.model.predict(X_scaled)
    
    def decision_function(self, X: np.ndarray) -> np.ndarray:
        X_scaled = self.scaler.transform(X)
        return self.model.decision_function(X_scaled)
    
    def get_support_vectors(self) -> np.ndarray:
        """
        Return support vectors (decision boundary)
        """
        return self.scaler.inverse_transform(self.model.support_vectors_)

# Usage with optimization
svm_detector = OptimizedOneClassSVM()
svm_detector.fit(X_normal, optimize=True)
predictions = svm_detector.predict(X)

Local Outlier Factor (LOF) - Advanced Version

Theory:

LOF measures the local density of a point relative to its neighbors.

Calculation in three steps:

  1. Reachability distance (reach-dist): The distance from A to B, floored by the k-distance of B: reach-dist_k(A, B) = max(k-distance(B), d(A, B)).

  2. Local reachability density (lrd): Inverse of the average reachability distance from a point to its k nearest neighbors: lrd_k(A) = 1 / (average of reach-dist_k(A, B) over B in N_k(A)). A higher value means the point lies in a dense area.

  3. LOF (Local Outlier Factor): Ratio between the average local density of the neighbors and the local density of the point: LOF_k(A) = average of lrd_k(B) / lrd_k(A) over B in N_k(A). It indicates how isolated a point is relative to its neighborhood.

Interpretation:

  • LOF ≈ 1: similar density to neighbors (normal)
  • LOF > 1: lower density (anomaly)
  • LOF ≫ 1: strong anomaly

Production Implementation:

from sklearn.neighbors import LocalOutlierFactor
from sklearn.model_selection import ParameterGrid

class AdaptiveLOF:
    """
    LOF with dynamic adaptation of number of neighbors
    """
    def __init__(
        self,
        n_neighbors: int = 20,
        contamination: float = 0.1,
        novelty: bool = True,
        metric: str = 'minkowski'
    ):
        self.n_neighbors = n_neighbors
        self.contamination = contamination
        self.novelty = novelty
        self.metric = metric
        self.scaler = StandardScaler()
        self.model = None
        
    def fit(self, X: np.ndarray) -> 'AdaptiveLOF':
        """
        Train LOF model
        """
        X_scaled = self.scaler.fit_transform(X)
        
        self.model = LocalOutlierFactor(
            n_neighbors=self.n_neighbors,
            contamination=self.contamination,
            novelty=self.novelty,
            metric=self.metric,
            n_jobs=-1
        )
        
        if self.novelty:
            self.model.fit(X_scaled)
        else:
            # Non-novelty mode: fit_predict in one go
            self.model.fit_predict(X_scaled)
            
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict anomalies (requires novelty=True)
        """
        if not self.novelty:
            raise ValueError("predict() requires novelty=True")
            
        X_scaled = self.scaler.transform(X)
        return self.model.predict(X_scaled)
    
    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        LOF scores (more negative = more abnormal)
        """
        if not self.novelty:
            raise ValueError("decision_function() requires novelty=True")
            
        X_scaled = self.scaler.transform(X)
        return self.model.decision_function(X_scaled)
    
    def get_lof_scores(self, X: np.ndarray) -> np.ndarray:
        """
        Return raw LOF scores (>1 = anomaly)
        """
        scores = -self.decision_function(X)
        return scores
    
    @staticmethod
    def select_optimal_k(X: np.ndarray, k_range: range = range(5, 51, 5)) -> int:
        """
        Select optimal number of neighbors
        """
        from sklearn.metrics import silhouette_score
        
        silhouette_scores = []
        
        for k in k_range:
            lof = LocalOutlierFactor(n_neighbors=k, novelty=False)
            labels = lof.fit_predict(X)
            
            # If all points are classified the same, silhouette is undefined: skip
            if len(np.unique(labels)) == 1:
                continue
                
            score = silhouette_score(X, labels)
            silhouette_scores.append((k, score))
        
        # Fall back to the smallest k if no k produced two classes
        if not silhouette_scores:
            return k_range[0]
        
        best_k = max(silhouette_scores, key=lambda x: x[1])[0]
        return best_k

# Example with automatic k selection
optimal_k = AdaptiveLOF.select_optimal_k(X_normal)
print(f"Optimal number of neighbors: {optimal_k}")

lof_detector = AdaptiveLOF(n_neighbors=optimal_k, novelty=True)
lof_detector.fit(X_normal)
predictions = lof_detector.predict(X)
lof_scores = lof_detector.get_lof_scores(X)

print(f"Detected anomalies: {(predictions == -1).sum()}")
print(f"LOF scores (anomalies): min={lof_scores[predictions == -1].min():.2f}, "
      f"max={lof_scores[predictions == -1].max():.2f}")

3. Deep Learning Methods

Autoencoders - Architecture and Theory

Fundamental Principle:

An autoencoder learns a compressed representation (encoding) of normal data. Anomalies, not being part of the training distribution, will be poorly reconstructed.

Architecture:

The autoencoder consists of two parts:

  • The encoder transforms input data (x) into a compressed representation (z) in the latent space
  • The decoder reconstructs the original data (x̂) from this compressed representation

Loss function:

The loss function measures reconstruction error by calculating the average squared difference between original and reconstructed data. This error is called Mean Squared Error (MSE).

Detection:

An observation is abnormal if reconstruction error (difference between original and reconstruction) exceeds a defined threshold. Anomalies are hard to reconstruct because they weren’t present in training data.

Production-Ready Implementation:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model, callbacks
import numpy as np
from typing import Tuple, Optional
import matplotlib.pyplot as plt

class AnomalyAutoencoder:
    """
    Sophisticated autoencoder for anomaly detection
    """
    def __init__(
        self,
        input_dim: int,
        encoding_dims: list = [64, 32, 16],
        activation: str = 'relu',
        dropout_rate: float = 0.2,
        l2_reg: float = 1e-5
    ):
        """
        Args:
            input_dim: Input feature dimension
            encoding_dims: List of dimensions for encoding layers
            activation: Activation function
            dropout_rate: Dropout rate for regularization
            l2_reg: L2 regularization coefficient
        """
        self.input_dim = input_dim
        self.encoding_dims = encoding_dims
        self.activation = activation
        self.dropout_rate = dropout_rate
        self.l2_reg = l2_reg
        
        self.autoencoder = None
        self.encoder = None
        self.decoder = None
        self.threshold_ = None
        self.history_ = None
        
        self._build_model()
    
    def _build_model(self):
        """
        Build autoencoder architecture
        """
        # Regularization
        regularizer = keras.regularizers.l2(self.l2_reg)
        
        # ===== ENCODER =====
        input_layer = keras.Input(shape=(self.input_dim,), name='input')
        x = input_layer
        
        # Progressive encoding layers
        for i, dim in enumerate(self.encoding_dims):
            x = layers.Dense(
                dim,
                activation=self.activation,
                kernel_regularizer=regularizer,
                name=f'encoder_{i}'
            )(x)
            x = layers.BatchNormalization(name=f'bn_encoder_{i}')(x)
            x = layers.Dropout(self.dropout_rate, name=f'dropout_encoder_{i}')(x)
        
        # Bottleneck (latent space)
        latent_dim = self.encoding_dims[-1]
        latent = layers.Dense(
            latent_dim,
            activation=self.activation,
            name='latent_space'
        )(x)
        
        # ===== DECODER =====
        x = latent
        
        # Decoding layers (mirror of encoder)
        for i, dim in enumerate(reversed(self.encoding_dims[:-1])):
            x = layers.Dense(
                dim,
                activation=self.activation,
                kernel_regularizer=regularizer,
                name=f'decoder_{i}'
            )(x)
            x = layers.BatchNormalization(name=f'bn_decoder_{i}')(x)
            x = layers.Dropout(self.dropout_rate, name=f'dropout_decoder_{i}')(x)
        
        # Output layer (reconstruction)
        output = layers.Dense(
            self.input_dim,
            activation='linear',  # Or 'sigmoid' if data normalized [0,1]
            name='output'
        )(x)
        
        # Models
        self.autoencoder = Model(input_layer, output, name='autoencoder')
        self.encoder = Model(input_layer, latent, name='encoder')
        
        # Separate decoder: reapply every layer that comes after the latent space
        decoder_input = keras.Input(shape=(latent_dim,))
        decoder_layers = decoder_input
        latent_idx = [l.name for l in self.autoencoder.layers].index('latent_space')
        for layer in self.autoencoder.layers[latent_idx + 1:]:
            decoder_layers = layer(decoder_layers)
        self.decoder = Model(decoder_input, decoder_layers, name='decoder')
    
    def compile(
        self,
        optimizer: str = 'adam',
        learning_rate: float = 0.001,
        loss: str = 'mse'
    ):
        """
        Compile the model
        """
        opt = keras.optimizers.Adam(learning_rate=learning_rate)
        self.autoencoder.compile(optimizer=opt, loss=loss)
    
    def fit(
        self,
        X_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        epochs: int = 100,
        batch_size: int = 32,
        verbose: int = 1
    ) -> 'AnomalyAutoencoder':
        """
        Train autoencoder on normal data
        """
        # Callbacks
        early_stop = callbacks.EarlyStopping(
            monitor='val_loss' if X_val is not None else 'loss',
            patience=10,
            restore_best_weights=True,
            verbose=verbose
        )
        
        reduce_lr = callbacks.ReduceLROnPlateau(
            monitor='val_loss' if X_val is not None else 'loss',
            factor=0.5,
            patience=5,
            min_lr=1e-7,
            verbose=verbose
        )
        
        # Training
        validation_data = (X_val, X_val) if X_val is not None else None
        
        self.history_ = self.autoencoder.fit(
            X_train, X_train,
            validation_data=validation_data,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[early_stop, reduce_lr],
            verbose=verbose
        )
        
        # Compute detection threshold
        reconstructions = self.autoencoder.predict(X_train, verbose=0)
        mse = np.mean(np.power(X_train - reconstructions, 2), axis=1)
        self.threshold_ = np.percentile(mse, 95)  # 95th percentile
        
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict anomalies (-1) and normal (1)
        """
        reconstruction_errors = self.reconstruction_error(X)
        return np.where(reconstruction_errors > self.threshold_, -1, 1)
    
    def reconstruction_error(self, X: np.ndarray) -> np.ndarray:
        """
        Compute reconstruction error (MSE)
        """
        reconstructions = self.autoencoder.predict(X, verbose=0)
        mse = np.mean(np.power(X - reconstructions, 2), axis=1)
        return mse
    
    def encode(self, X: np.ndarray) -> np.ndarray:
        """
        Encode data into latent space
        """
        return self.encoder.predict(X, verbose=0)
    
    def decode(self, latent: np.ndarray) -> np.ndarray:
        """
        Decode from latent space
        """
        return self.decoder.predict(latent, verbose=0)
    
    def plot_training_history(self, errors: Optional[np.ndarray] = None):
        """
        Visualize training history and, if per-sample reconstruction
        errors are provided, their distribution against the threshold
        """
        plt.figure(figsize=(12, 4))
        
        plt.subplot(1, 2, 1)
        plt.plot(self.history_.history['loss'], label='Training Loss')
        if 'val_loss' in self.history_.history:
            plt.plot(self.history_.history['val_loss'], label='Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.title('Training History')
        plt.grid(True, alpha=0.3)
        
        if errors is not None:
            plt.subplot(1, 2, 2)
            plt.hist(errors, bins=50, alpha=0.7)
            plt.axvline(self.threshold_, color='r', linestyle='--',
                        label=f'Threshold: {self.threshold_:.4f}')
            plt.xlabel('Reconstruction Error')
            plt.ylabel('Frequency')
            plt.legend()
            plt.title('Error Distribution')
            plt.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
    
    def save(self, filepath: str):
        """
        Save complete model
        """
        self.autoencoder.save(f'{filepath}_autoencoder.h5')
        np.save(f'{filepath}_threshold.npy', self.threshold_)
    
    @classmethod
    def load(cls, filepath: str, input_dim: int) -> 'AnomalyAutoencoder':
        """
        Load saved model
        """
        instance = cls(input_dim=input_dim)
        instance.autoencoder = keras.models.load_model(f'{filepath}_autoencoder.h5')
        instance.threshold_ = np.load(f'{filepath}_threshold.npy')
        return instance

# Advanced usage example
np.random.seed(42)
tf.random.set_seed(42)

# Training data (normal)
n_train = 5000
n_features = 20
X_train_normal = np.random.randn(n_train, n_features)

# Test data (normal + anomalies)
n_test = 1000
n_anomalies = 100
X_test_normal = np.random.randn(n_test - n_anomalies, n_features)
X_test_anomalies = np.random.uniform(-5, 5, (n_anomalies, n_features))
X_test = np.vstack([X_test_normal, X_test_anomalies])
y_test = np.hstack([np.zeros(n_test - n_anomalies), np.ones(n_anomalies)])

# Validation split
from sklearn.model_selection import train_test_split
X_train, X_val = train_test_split(X_train_normal, test_size=0.2, random_state=42)

# Create and train
ae = AnomalyAutoencoder(
    input_dim=n_features,
    encoding_dims=[64, 32, 16],
    dropout_rate=0.2
)
ae.compile(learning_rate=0.001)

print("Training autoencoder...")
ae.fit(X_train, X_val, epochs=100, batch_size=64, verbose=1)

# Prediction
predictions = ae.predict(X_test)
reconstruction_errors = ae.reconstruction_error(X_test)

# Evaluation
from sklearn.metrics import classification_report, confusion_matrix
print("\nPerformance:")
print(classification_report(y_test, (predictions == -1).astype(int),
                           target_names=['Normal', 'Anomaly']))

# Visualization (training curves + reconstruction-error distribution)
ae.plot_training_history(errors=reconstruction_errors)

# Visualization of reconstruction errors
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.scatter(range(len(reconstruction_errors)), reconstruction_errors,
           c=['blue' if p == 1 else 'red' for p in predictions], alpha=0.5)
plt.axhline(ae.threshold_, color='black', linestyle='--', label='Threshold')
plt.xlabel('Sample Index')
plt.ylabel('Reconstruction Error')
plt.title('Reconstruction Errors')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.hist(reconstruction_errors[y_test == 0], bins=50, alpha=0.7, label='Normal')
plt.hist(reconstruction_errors[y_test == 1], bins=50, alpha=0.7, label='Anomaly')
plt.axvline(ae.threshold_, color='black', linestyle='--', label='Threshold')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.legend()
plt.title('Error Distribution by Class')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

Variational Autoencoder (VAE)

Theory:

VAEs (Variational Autoencoders) model the latent space as a probability distribution rather than a deterministic vector: instead of mapping each input to a single latent point, the encoder produces the parameters (mean and variance) of a distribution over latent codes.

VAE Loss Function:

The loss function combines two components:

  1. Reconstruction error: Measures the difference between input and output (like a classical autoencoder)
  2. KL divergence (Kullback-Leibler): Measures how much the learned latent distribution deviates from a standard Gaussian (mean 0, variance 1); for a diagonal Gaussian encoder it has the closed form -0.5 × Σ(1 + log(σ²) - μ² - σ²), which is exactly the kl_loss term in the code below

The beta parameter controls the relative importance of these two components. Higher beta forces the latent space to be closer to a standard Gaussian distribution, potentially at the cost of reconstruction quality.

Implementation:

class VariationalAutoencoder(keras.Model):
    """
    VAE for anomaly detection
    """
    def __init__(self, input_dim: int, latent_dim: int = 10, beta: float = 1.0):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.beta = beta
        
        # Encoder
        self.encoder_network = keras.Sequential([
            layers.Dense(128, activation='relu'),
            layers.Dense(64, activation='relu')
        ])
        
        # Latent distribution parameters
        self.mean_layer = layers.Dense(latent_dim, name='z_mean')
        self.log_var_layer = layers.Dense(latent_dim, name='z_log_var')
        
        # Decoder
        self.decoder_network = keras.Sequential([
            layers.Dense(64, activation='relu'),
            layers.Dense(128, activation='relu'),
            layers.Dense(input_dim, activation='linear')
        ])
        
        self.threshold_ = None
    
    def encode(self, x):
        """
        Encode x into latent distribution
        """
        h = self.encoder_network(x)
        z_mean = self.mean_layer(h)
        z_log_var = self.log_var_layer(h)
        return z_mean, z_log_var
    
    def reparameterize(self, z_mean, z_log_var):
        """
        Reparameterization trick for backprop
        """
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
    
    def decode(self, z):
        """
        Decode from latent space
        """
        return self.decoder_network(z)
    
    def call(self, x, training=None):
        """
        Complete forward pass
        """
        z_mean, z_log_var = self.encode(x)
        z = self.reparameterize(z_mean, z_log_var)
        reconstruction = self.decode(z)
        
        # Compute KL loss
        kl_loss = -0.5 * tf.reduce_mean(
            1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
        )
        
        # Add KL loss to metrics
        self.add_loss(self.beta * kl_loss)
        self.add_metric(kl_loss, name='kl_loss')
        
        return reconstruction
    
    def anomaly_score(self, x):
        """
        Compute anomaly score
        """
        reconstruction = self(x, training=False)
        # Reconstruction error
        mse = tf.reduce_mean(tf.square(x - reconstruction), axis=1)
        
        # Combined score with latent probability
        z_mean, z_log_var = self.encode(x)
        log_prob = -0.5 * tf.reduce_sum(
            tf.square(z_mean) + tf.exp(z_log_var), axis=1
        )
        
        # Final score
        return mse - log_prob
    
    def fit_with_threshold(self, X_train, epochs=100, batch_size=32):
        """
        Train and compute threshold
        """
        self.compile(optimizer='adam', loss='mse')
        self.fit(X_train, X_train, epochs=epochs, batch_size=batch_size, verbose=1)
        
        # Compute threshold
        scores = self.anomaly_score(X_train).numpy()
        self.threshold_ = np.percentile(scores, 95)
        
        return self
    
    def predict_anomalies(self, X):
        """
        Predict anomalies
        """
        scores = self.anomaly_score(X).numpy()
        return np.where(scores > self.threshold_, -1, 1)

# Example
vae = VariationalAutoencoder(input_dim=n_features, latent_dim=10, beta=1.0)
vae.fit_with_threshold(X_train, epochs=50)
vae_predictions = vae.predict_anomalies(X_test)
print(f"VAE - Detected anomalies: {(vae_predictions == -1).sum()}")

LSTM for Time Series - Advanced Version

Architecture for Temporal Anomaly Detection:

LSTMs are ideal for time series as they capture long-term dependencies and sequential patterns.

Production Implementation:

from tensorflow.keras.layers import LSTM, Dense, RepeatVector, TimeDistributed, Dropout
from tensorflow.keras.models import Sequential
import pandas as pd

class LSTMAnomalyDetector:
    """
    Anomaly detector for time series with LSTM
    """
    def __init__(
        self,
        timesteps: int,
        n_features: int,
        lstm_units: list = [128, 64],
        dropout_rate: float = 0.2
    ):
        """
        Args:
            timesteps: Number of time steps in each sequence
            n_features: Number of features per time step
            lstm_units: List of LSTM units for each layer
            dropout_rate: Dropout rate
        """
        self.timesteps = timesteps
        self.n_features = n_features
        self.lstm_units = lstm_units
        self.dropout_rate = dropout_rate
        
        self.model = None
        self.threshold_ = None
        self.scaler = StandardScaler()
        
        self._build_model()
    
    def _build_model(self):
        """
        Build LSTM autoencoder
        """
        self.model = Sequential(name='LSTM_Autoencoder')
        
        # ===== ENCODER =====
        # First LSTM layer
        self.model.add(LSTM(
            self.lstm_units[0],
            activation='tanh',
            input_shape=(self.timesteps, self.n_features),
            return_sequences=True if len(self.lstm_units) > 1 else False,
            name='encoder_lstm_1'
        ))
        self.model.add(Dropout(self.dropout_rate))
        
        # Additional LSTM layers
        for i, units in enumerate(self.lstm_units[1:], start=2):
            return_seq = i < len(self.lstm_units)
            self.model.add(LSTM(
                units,
                activation='tanh',
                return_sequences=return_seq,
                name=f'encoder_lstm_{i}'
            ))
            self.model.add(Dropout(self.dropout_rate))
        
        # ===== DECODER =====
        # Repeat latent vector
        self.model.add(RepeatVector(self.timesteps, name='repeat_vector'))
        
        # LSTM decoding layers
        for i, units in enumerate(reversed(self.lstm_units), start=1):
            self.model.add(LSTM(
                units,
                activation='tanh',
                return_sequences=True,
                name=f'decoder_lstm_{i}'
            ))
            self.model.add(Dropout(self.dropout_rate))
        
        # Output layer
        self.model.add(TimeDistributed(
            Dense(self.n_features, activation='linear'),
            name='output'
        ))
    
    def compile(self, learning_rate: float = 0.001, loss: str = 'mse'):
        """
        Compile the model
        """
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer, loss=loss)
    
    def create_sequences(
        self,
        data: np.ndarray,
        window_size: int = None
    ) -> np.ndarray:
        """
        Create sliding sequences for training
        """
        if window_size is None:
            window_size = self.timesteps
            
        sequences = []
        for i in range(len(data) - window_size + 1):
            sequences.append(data[i:i + window_size])
        
        return np.array(sequences)
    
    def fit(
        self,
        X_train: np.ndarray,
        epochs: int = 100,
        batch_size: int = 32,
        validation_split: float = 0.2,
        verbose: int = 1
    ) -> 'LSTMAnomalyDetector':
        """
        Train the model
        """
        # Normalization
        original_shape = X_train.shape
        X_flat = X_train.reshape(-1, self.n_features)
        X_scaled = self.scaler.fit_transform(X_flat)
        X_train_scaled = X_scaled.reshape(original_shape)
        
        # Callbacks
        early_stop = callbacks.EarlyStopping(
            monitor='val_loss',
            patience=15,
            restore_best_weights=True
        )
        
        reduce_lr = callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=7,
            min_lr=1e-7
        )
        
        # Training
        self.model.fit(
            X_train_scaled, X_train_scaled,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            callbacks=[early_stop, reduce_lr],
            verbose=verbose
        )
        
        # Compute threshold
        reconstructions = self.model.predict(X_train_scaled, verbose=0)
        mse = np.mean(np.power(X_train_scaled - reconstructions, 2), axis=(1, 2))
        self.threshold_ = np.percentile(mse, 95)
        
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict anomalies
        """
        reconstruction_errors = self.reconstruction_error(X)
        return np.where(reconstruction_errors > self.threshold_, -1, 1)
    
    def reconstruction_error(self, X: np.ndarray) -> np.ndarray:
        """
        Compute reconstruction error
        """
        # Normalization
        original_shape = X.shape
        X_flat = X.reshape(-1, self.n_features)
        X_scaled = self.scaler.transform(X_flat)
        X_scaled = X_scaled.reshape(original_shape)
        
        # Reconstruction
        reconstructions = self.model.predict(X_scaled, verbose=0)
        
        # MSE per sequence
        mse = np.mean(np.power(X_scaled - reconstructions, 2), axis=(1, 2))
        
        return mse
    
    def plot_reconstruction(self, X: np.ndarray, idx: int = 0):
        """
        Visualize reconstruction of a sequence
        """
        X_scaled = self.scaler.transform(
            X[idx].reshape(-1, self.n_features)
        ).reshape(self.timesteps, self.n_features)
        
        reconstruction = self.model.predict(
            X_scaled.reshape(1, self.timesteps, self.n_features),
            verbose=0
        )[0]
        
        fig, axes = plt.subplots(self.n_features, 1, figsize=(12, 3*self.n_features))
        if self.n_features == 1:
            axes = [axes]
        
        for i, ax in enumerate(axes):
            ax.plot(X_scaled[:, i], label='Original', marker='o')
            ax.plot(reconstruction[:, i], label='Reconstruction', marker='x')
            ax.set_title(f'Feature {i}')
            ax.legend()
            ax.grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()

# Example with time series data
np.random.seed(42)

# Generate synthetic time series
def generate_timeseries(n_sequences=1000, timesteps=50, n_features=3):
    """
    Generate time series with sinusoidal patterns
    """
    X = []
    for _ in range(n_sequences):
        # Sinusoidal pattern with noise
        t = np.linspace(0, 4*np.pi, timesteps)
        series = []
        for f in range(n_features):
            freq = np.random.uniform(0.5, 2.0)
            phase = np.random.uniform(0, 2*np.pi)
            amplitude = np.random.uniform(0.5, 2.0)
            noise = np.random.normal(0, 0.1, timesteps)
            
            signal = amplitude * np.sin(freq * t + phase) + noise
            series.append(signal)
        
        X.append(np.array(series).T)
    
    return np.array(X)

# Training data (normal)
X_train_ts = generate_timeseries(n_sequences=800, timesteps=50, n_features=3)

# Test data (normal + anomalies)
X_test_normal = generate_timeseries(n_sequences=150, timesteps=50, n_features=3)

# Anomalies: series with different pattern
X_test_anomalies = np.random.uniform(-3, 3, (50, 50, 3))

X_test_ts = np.vstack([X_test_normal, X_test_anomalies])
y_test_ts = np.hstack([np.zeros(150), np.ones(50)])

# Training
lstm_detector = LSTMAnomalyDetector(
    timesteps=50,
    n_features=3,
    lstm_units=[128, 64],
    dropout_rate=0.2
)
lstm_detector.compile(learning_rate=0.001)

print("Training LSTM...")
lstm_detector.fit(X_train_ts, epochs=50, batch_size=32, verbose=1)

# Prediction
predictions_ts = lstm_detector.predict(X_test_ts)
errors_ts = lstm_detector.reconstruction_error(X_test_ts)

# Evaluation
print("\nLSTM Performance:")
print(classification_report(y_test_ts, (predictions_ts == -1).astype(int),
                           target_names=['Normal', 'Anomaly']))

# Visualization of a reconstruction
lstm_detector.plot_reconstruction(X_test_ts, idx=0)

GAN for Anomaly Detection

GAN-based Anomaly Detection Architecture:

The generator and discriminator are trained on normal data only; at detection time, the discriminator's output is reused as the scoring function, so samples it considers unlikely to be real receive a high anomaly score (see anomaly_score below).

class AnomalyGAN:
    """
    Generative Adversarial Network for anomaly detection
    """
    def __init__(self, input_dim: int, latent_dim: int = 100):
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        
        self.generator = self._build_generator()
        self.discriminator = self._build_discriminator()
        self.gan = self._build_gan()
    
    def _build_generator(self):
        """
        Generator: latent -> data
        """
        model = Sequential([
            layers.Dense(128, activation='relu', input_dim=self.latent_dim),
            layers.BatchNormalization(),
            layers.Dense(256, activation='relu'),
            layers.BatchNormalization(),
            layers.Dense(self.input_dim, activation='tanh')
        ], name='generator')
        return model
    
    def _build_discriminator(self):
        """
        Discriminator: data -> probability
        """
        model = Sequential([
            layers.Dense(256, activation='relu', input_dim=self.input_dim),
            layers.Dropout(0.3),
            layers.Dense(128, activation='relu'),
            layers.Dropout(0.3),
            layers.Dense(1, activation='sigmoid')
        ], name='discriminator')
        
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        return model
    
    def _build_gan(self):
        """
        Complete GAN
        """
        self.discriminator.trainable = False
        
        model = Sequential([
            self.generator,
            self.discriminator
        ], name='gan')
        
        model.compile(optimizer='adam', loss='binary_crossentropy')
        return model
    
    def train(self, X_train: np.ndarray, epochs: int = 100, batch_size: int = 128):
        """
        Train the GAN
        """
        for epoch in range(epochs):
            # Select real batch
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            real_data = X_train[idx]
            
            # Generate fake data
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            fake_data = self.generator.predict(noise, verbose=0)
            
            # Train discriminator
            d_loss_real = self.discriminator.train_on_batch(
                real_data, np.ones((batch_size, 1))
            )
            d_loss_fake = self.discriminator.train_on_batch(
                fake_data, np.zeros((batch_size, 1))
            )
            
            # Train generator
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            g_loss = self.gan.train_on_batch(noise, np.ones((batch_size, 1)))
            
            if epoch % 10 == 0:
                # Average discriminator loss over the real and fake batches
                d_loss = 0.5 * (d_loss_real[0] + d_loss_fake[0])
                print(f"Epoch {epoch}: D_loss={d_loss:.4f}, G_loss={g_loss:.4f}")
    
    def anomaly_score(self, X: np.ndarray) -> np.ndarray:
        """
        Compute anomaly score based on discriminator
        """
        # Discriminator score (probability of being real)
        scores = self.discriminator.predict(X, verbose=0)
        # Inversion: low score = likely anomaly
        return 1 - scores.flatten()
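
A minimal usage sketch for the AnomalyGAN class above, on synthetic tabular data (the variable names and dimensions are illustrative; since the generator ends with a tanh activation, inputs should be scaled to roughly [-1, 1] in practice):

# Illustrative usage of AnomalyGAN on synthetic data (names and sizes are arbitrary)
np.random.seed(42)
X_normal_gan = np.random.uniform(-1, 1, (2000, 10)).astype(np.float32)

gan_detector = AnomalyGAN(input_dim=10, latent_dim=32)
gan_detector.train(X_normal_gan, epochs=200, batch_size=128)

# Higher score = more likely anomalous; flag the top 5% with a percentile threshold
X_eval = np.vstack([
    np.random.uniform(-1, 1, (200, 10)),
    np.random.uniform(-4, 4, (20, 10))
]).astype(np.float32)
scores_gan = gan_detector.anomaly_score(X_eval)
flagged = scores_gan > np.percentile(scores_gan, 95)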

Advanced Evaluation Metrics

Metrics for Imbalanced Data

In anomaly detection problems, classes are heavily imbalanced (anomalies typically <5%). Accuracy is therefore a misleading metric.

1. Precision, Recall, F1-Score

Precision = True Positives / (True Positives + False Positives)

  • Proportion of correctly identified anomalies among all anomaly predictions

Recall = True Positives / (True Positives + False Negatives)

  • Proportion of actual anomalies that were detected

F1-Score = 2 × (Precision × Recall) / (Precision + Recall)

  • Harmonic mean of precision and recall

Business Interpretation:

  • Precision: Among triggered alerts, what proportion is legitimate? (Cost of false positives)
  • Recall: Among true anomalies, what proportion is detected? (Cost of false negatives)
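
As a quick numerical check of these formulas (the counts below are made up for illustration), scikit-learn gives the same values:

# Illustrative check: 20 true anomalies out of 1000 samples, 30 points flagged
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score

y_true = np.array([1] * 20 + [0] * 980)                       # 20 real anomalies
y_pred = np.array([1] * 15 + [0] * 5 + [1] * 15 + [0] * 965)  # 15 TP, 5 FN, 15 FP

print(precision_score(y_true, y_pred))  # 15 / (15 + 15) = 0.50
print(recall_score(y_true, y_pred))     # 15 / (15 + 5)  = 0.75
print(f1_score(y_true, y_pred))         # 2 * 0.50 * 0.75 / (0.50 + 0.75) = 0.60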

2. ROC-AUC and PR-AUC

ROC-AUC measures how well the anomaly scores rank anomalies above normal points across all possible thresholds. PR-AUC (average precision) focuses on the positive (anomaly) class; under heavy imbalance it is usually the more informative of the two, because ROC-AUC can look deceptively good when true negatives dominate.

from sklearn.metrics import (
    roc_auc_score, average_precision_score, 
    precision_recall_curve, roc_curve
)
import matplotlib.pyplot as plt

class AnomalyMetricsEvaluator:
    """
    Complete metrics evaluator for anomaly detection
    """
    def __init__(self, y_true: np.ndarray, y_scores: np.ndarray, threshold: float = None):
        """
        Args:
            y_true: True labels (0=normal, 1=anomaly)
            y_scores: Anomaly scores (higher = more abnormal)
            threshold: Decision threshold (auto-calculated if None)
        """
        self.y_true = y_true
        self.y_scores = y_scores
        self.threshold = threshold if threshold is not None else self._find_optimal_threshold()
        self.y_pred = (y_scores >= self.threshold).astype(int)
    
    def _find_optimal_threshold(self) -> float:
        """
        Find optimal threshold via F1-Score
        """
        precision, recall, thresholds = precision_recall_curve(self.y_true, self.y_scores)
        f1_scores = 2 * (precision * recall) / (precision + recall + 1e-10)
        optimal_idx = np.argmax(f1_scores)
        return thresholds[optimal_idx] if optimal_idx < len(thresholds) else thresholds[-1]
    
    def compute_metrics(self) -> dict:
        """
        Compute all important metrics
        """
        from sklearn.metrics import (
            precision_score, recall_score, f1_score, 
            accuracy_score, matthews_corrcoef, cohen_kappa_score
        )
        
        metrics = {
            'accuracy': accuracy_score(self.y_true, self.y_pred),
            'precision': precision_score(self.y_true, self.y_pred, zero_division=0),
            'recall': recall_score(self.y_true, self.y_pred, zero_division=0),
            'f1_score': f1_score(self.y_true, self.y_pred, zero_division=0),
            'roc_auc': roc_auc_score(self.y_true, self.y_scores),
            'pr_auc': average_precision_score(self.y_true, self.y_scores),
            'mcc': matthews_corrcoef(self.y_true, self.y_pred),
            'cohen_kappa': cohen_kappa_score(self.y_true, self.y_pred),
            'threshold': self.threshold
        }
        
        return metrics
    
    def compute_confusion_matrix_costs(
        self, 
        cost_fp: float = 1.0, 
        cost_fn: float = 10.0
    ) -> dict:
        """
        Compute business cost of detection
        
        Args:
            cost_fp: Cost of a false positive (useless alert)
            cost_fn: Cost of a false negative (missed anomaly)
        """
        from sklearn.metrics import confusion_matrix
        
        tn, fp, fn, tp = confusion_matrix(self.y_true, self.y_pred).ravel()
        
        total_cost = cost_fp * fp + cost_fn * fn
        avg_cost_per_sample = total_cost / len(self.y_true)
        
        return {
            'true_negatives': int(tn),
            'false_positives': int(fp),
            'false_negatives': int(fn),
            'true_positives': int(tp),
            'total_cost': total_cost,
            'avg_cost_per_sample': avg_cost_per_sample,
            'cost_fp': cost_fp,
            'cost_fn': cost_fn
        }
    
    def plot_roc_curve(self, ax=None):
        """
        Plot ROC curve
        """
        if ax is None:
            fig, ax = plt.subplots(figsize=(8, 6))
        
        fpr, tpr, _ = roc_curve(self.y_true, self.y_scores)
        roc_auc = roc_auc_score(self.y_true, self.y_scores)
        
        ax.plot(fpr, tpr, label=f'ROC (AUC = {roc_auc:.3f})', linewidth=2)
        ax.plot([0, 1], [0, 1], 'k--', label='Random')
        ax.set_xlabel('False Positive Rate')
        ax.set_ylabel('True Positive Rate')
        ax.set_title('ROC Curve')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    def plot_pr_curve(self, ax=None):
        """
        Plot Precision-Recall curve
        """
        if ax is None:
            fig, ax = plt.subplots(figsize=(8, 6))
        
        precision, recall, _ = precision_recall_curve(self.y_true, self.y_scores)
        pr_auc = average_precision_score(self.y_true, self.y_scores)
        
        ax.plot(recall, precision, label=f'PR (AUC = {pr_auc:.3f})', linewidth=2)
        baseline = np.sum(self.y_true) / len(self.y_true)
        ax.axhline(baseline, color='k', linestyle='--', label=f'Baseline ({baseline:.3f})')
        ax.set_xlabel('Recall')
        ax.set_ylabel('Precision')
        ax.set_title('Precision-Recall Curve')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    def plot_score_distribution(self, ax=None):
        """
        Visualize score distribution
        """
        if ax is None:
            fig, ax = plt.subplots(figsize=(10, 6))
        
        normal_scores = self.y_scores[self.y_true == 0]
        anomaly_scores = self.y_scores[self.y_true == 1]
        
        ax.hist(normal_scores, bins=50, alpha=0.7, label='Normal', density=True)
        ax.hist(anomaly_scores, bins=50, alpha=0.7, label='Anomaly', density=True)
        ax.axvline(self.threshold, color='red', linestyle='--', 
                  linewidth=2, label=f'Threshold ({self.threshold:.3f})')
        ax.set_xlabel('Anomaly Score')
        ax.set_ylabel('Density')
        ax.set_title('Anomaly Score Distribution')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    def plot_threshold_impact(self):
        """
        Show threshold impact on metrics
        """
        thresholds = np.linspace(self.y_scores.min(), self.y_scores.max(), 100)
        precisions, recalls, f1_scores = [], [], []
        
        from sklearn.metrics import precision_score, recall_score, f1_score
        
        for thresh in thresholds:
            y_pred_temp = (self.y_scores >= thresh).astype(int)
            precisions.append(precision_score(self.y_true, y_pred_temp, zero_division=0))
            recalls.append(recall_score(self.y_true, y_pred_temp, zero_division=0))
            f1_scores.append(f1_score(self.y_true, y_pred_temp, zero_division=0))
        
        fig, ax = plt.subplots(figsize=(12, 6))
        ax.plot(thresholds, precisions, label='Precision', linewidth=2)
        ax.plot(thresholds, recalls, label='Recall', linewidth=2)
        ax.plot(thresholds, f1_scores, label='F1-Score', linewidth=2)
        ax.axvline(self.threshold, color='red', linestyle='--', 
                  label=f'Selected Threshold ({self.threshold:.3f})')
        ax.set_xlabel('Threshold')
        ax.set_ylabel('Score')
        ax.set_title('Threshold Impact on Metrics')
        ax.legend()
        ax.grid(True, alpha=0.3)
        plt.tight_layout()
    
    def generate_full_report(self, save_path: str = None):
        """
        Generate complete report with all visualizations
        """
        fig = plt.figure(figsize=(16, 12))
        
        # ROC Curve
        ax1 = plt.subplot(2, 3, 1)
        self.plot_roc_curve(ax1)
        
        # PR Curve
        ax2 = plt.subplot(2, 3, 2)
        self.plot_pr_curve(ax2)
        
        # Score Distribution
        ax3 = plt.subplot(2, 3, 3)
        self.plot_score_distribution(ax3)
        
        # Confusion Matrix
        from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
        ax4 = plt.subplot(2, 3, 4)
        cm = confusion_matrix(self.y_true, self.y_pred)
        ConfusionMatrixDisplay(cm, display_labels=['Normal', 'Anomaly']).plot(ax=ax4)
        ax4.set_title('Confusion Matrix')
        
        # Threshold Impact
        ax5 = plt.subplot(2, 3, (5, 6))
        thresholds = np.linspace(self.y_scores.min(), self.y_scores.max(), 100)
        precisions, recalls, f1_scores = [], [], []
        
        from sklearn.metrics import precision_score, recall_score, f1_score
        
        for thresh in thresholds:
            y_pred_temp = (self.y_scores >= thresh).astype(int)
            precisions.append(precision_score(self.y_true, y_pred_temp, zero_division=0))
            recalls.append(recall_score(self.y_true, y_pred_temp, zero_division=0))
            f1_scores.append(f1_score(self.y_true, y_pred_temp, zero_division=0))
        
        ax5.plot(thresholds, precisions, label='Precision', linewidth=2)
        ax5.plot(thresholds, recalls, label='Recall', linewidth=2)
        ax5.plot(thresholds, f1_scores, label='F1-Score', linewidth=2)
        ax5.axvline(self.threshold, color='red', linestyle='--', 
                    label=f'Selected Threshold ({self.threshold:.3f})')
        ax5.set_xlabel('Threshold')
        ax5.set_ylabel('Score')
        ax5.set_title('Threshold Impact')
        ax5.legend()
        ax5.grid(True, alpha=0.3)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        
        plt.show()
        
        # Text metrics
        metrics = self.compute_metrics()
        costs = self.compute_confusion_matrix_costs()
        
        print("\n" + "="*60)
        print("ANOMALY DETECTION EVALUATION REPORT")
        print("="*60)
        print(f"\n📊 PERFORMANCE METRICS:")
        print(f"  • Accuracy      : {metrics['accuracy']:.4f}")
        print(f"  • Precision     : {metrics['precision']:.4f}")
        print(f"  • Recall        : {metrics['recall']:.4f}")
        print(f"  • F1-Score      : {metrics['f1_score']:.4f}")
        print(f"  • ROC-AUC       : {metrics['roc_auc']:.4f}")
        print(f"  • PR-AUC        : {metrics['pr_auc']:.4f}")
        print(f"  • MCC           : {metrics['mcc']:.4f}")
        print(f"  • Cohen's Kappa : {metrics['cohen_kappa']:.4f}")
        
        print(f"\n🎯 CONFUSION MATRIX:")
        print(f"  • True Positives  : {costs['true_positives']}")
        print(f"  • True Negatives  : {costs['true_negatives']}")
        print(f"  • False Positives : {costs['false_positives']}")
        print(f"  • False Negatives : {costs['false_negatives']}")
        
        print(f"\n💰 COST ANALYSIS:")
        print(f"  • Total Cost        : {costs['total_cost']:.2f}")
        print(f"  • Avg Cost/Sample   : {costs['avg_cost_per_sample']:.4f}")
        
        print(f"\n⚙️  CONFIGURATION:")
        print(f"  • Optimal Threshold : {self.threshold:.4f}")
        print(f"  • Total Samples     : {len(self.y_true)}")
        print(f"  • Anomaly Rate      : {(self.y_true.sum() / len(self.y_true)):.2%}")
        print("="*60 + "\n")

# Complete usage example
evaluator = AnomalyMetricsEvaluator(
    y_true=y_test,
    y_scores=probas  # Or scores from detector
)

# Complete report
metrics = evaluator.compute_metrics()
costs = evaluator.compute_confusion_matrix_costs(cost_fp=1.0, cost_fn=20.0)
evaluator.generate_full_report(save_path='anomaly_detection_report.png')

Visualization of Anomalies

import matplotlib.pyplot as plt
import seaborn as sns

def plot_anomalies(X, y_pred, title="Anomaly Detection"):
    """
    Visualize detected anomalies
    """
    plt.figure(figsize=(12, 6))
    
    # Normal points
    normal_mask = y_pred == 1
    plt.scatter(X[normal_mask, 0], X[normal_mask, 1], 
                c='blue', label='Normal', alpha=0.5)
    
    # Anomalies
    anomaly_mask = y_pred == -1
    plt.scatter(X[anomaly_mask, 0], X[anomaly_mask, 1], 
                c='red', label='Anomaly', alpha=0.8, marker='x', s=100)
    
    plt.title(title)
    plt.xlabel('Feature 1')
    plt.ylabel('Feature 2')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# Usage example
clf = IsolationForest(contamination=0.1, random_state=42)
y_pred = clf.fit_predict(X_train)
plot_anomalies(X_train, y_pred)

Use Cases

1. Banking Fraud Detection

from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest

def detect_fraud(transactions):
    """
    Detect fraudulent transactions
    """
    # Normalization
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(transactions)
    
    # Detection
    clf = IsolationForest(contamination=0.01, random_state=42)
    predictions = clf.fit_predict(X_scaled)
    
    # -1 for anomaly, 1 for normal
    fraud_mask = predictions == -1
    return fraud_mask
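
A hypothetical call on a transactions DataFrame (the column names and values below are purely illustrative):

# Hypothetical usage of detect_fraud: numeric transaction features only
import pandas as pd

transactions = pd.DataFrame({
    'amount':              [12.5, 80.0, 9.99, 15000.0, 42.0, 23.1],
    'hour_of_day':         [14, 9, 20, 3, 11, 16],
    'merchant_risk_score': [0.1, 0.2, 0.1, 0.9, 0.3, 0.2],
})

fraud_mask = detect_fraud(transactions)
print(transactions[fraud_mask])  # transactions flagged as potentially fraudulent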

2. Network Intrusion Detection

from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

def detect_network_intrusion(network_data):
    """
    Detect network intrusions
    """
    # Select important features
    features = ['packet_size', 'connection_duration', 'bytes_transferred']
    X = network_data[features]
    
    # Scale features: RBF kernels are sensitive to feature magnitudes
    X_scaled = StandardScaler().fit_transform(X)
    
    # One-Class SVM learns a boundary around normal traffic
    svm = OneClassSVM(kernel='rbf', gamma='scale', nu=0.05)
    predictions = svm.fit_predict(X_scaled)
    
    return predictions == -1

3. Industrial Equipment Monitoring

def monitor_equipment(sensor_data, window_size=50):
    """
    Monitor equipment status via sensors
    
    Expects sensor_data shaped (n_windows, window_size, n_sensors).
    """
    # LSTM autoencoder for time series (builder from the LSTM autoencoder section)
    autoencoder = build_lstm_autoencoder(timesteps=window_size, 
                                         n_features=sensor_data.shape[2])
    
    # Train on normal data
    autoencoder.fit(sensor_data, sensor_data, epochs=100, verbose=0)
    
    # Anomaly detection: reconstruction error per window
    reconstructions = autoencoder.predict(sensor_data)
    mse = np.mean(np.power(sensor_data - reconstructions, 2), axis=(1, 2))
    
    threshold = np.percentile(mse, 99)
    return mse > threshold

Best Practices

1. Data Preprocessing

  • Normalization: Standardize features so that large-scale features do not dominate distance- or density-based scores
  • Missing values: Impute or remove them before scoring
  • Feature engineering: Create relevant features (ratios, rolling statistics, time-based aggregates); a short preprocessing sketch follows this list
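
A minimal preprocessing sketch with scikit-learn (the imputation strategy and the toy data are illustrative, not a recommendation for every dataset):

# Minimal preprocessing sketch: median imputation followed by standardization
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

preprocessing = Pipeline([
    ('impute', SimpleImputer(strategy='median')),  # fill missing values
    ('scale', StandardScaler()),                   # zero mean, unit variance
])

X_raw = np.array([[1.0, 200.0], [np.nan, 180.0], [1.2, 9000.0]])
X_ready = preprocessing.fit_transform(X_raw)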

2. Model Selection

  • Tabular data: Isolation Forest, One-Class SVM, LOF
  • Time series: LSTM Autoencoder, Prophet
  • Images: CNN Autoencoder, VAE
  • Unsupervised data: Clustering (DBSCAN, K-Means)

3. Threshold Definition

def find_optimal_threshold(scores, percentile=95):
    """
    Set the decision threshold at a given percentile of the anomaly scores
    (a simple heuristic: with percentile=95, the top 5% of scores are flagged)
    """
    threshold = np.percentile(scores, percentile)
    return threshold
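
A usage sketch on Isolation Forest scores (the sign flip is needed because score_samples returns higher values for normal points):

# Usage sketch: percentile-based threshold on Isolation Forest anomaly scores
import numpy as np
from sklearn.ensemble import IsolationForest

X = np.random.randn(1000, 4)
iso = IsolationForest(random_state=42).fit(X)

scores = -iso.score_samples(X)                    # higher = more anomalous
threshold = find_optimal_threshold(scores, percentile=95)
is_anomaly = scores > threshold                   # roughly the top 5% of scores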

4. Handling Imbalance

Anomalies are often rare (1-5% of data). Use:

  • Contamination: Parameter to specify expected anomaly rate
  • Adapted metrics: Prefer F1-Score, PR-AUC, and ROC-AUC over accuracy

Challenges and Limitations

1. False Positives

Too many alerts can overwhelm operators. Optimize detection threshold.

2. Adaptation to New Patterns

Normal behavior drifts over time (concept drift), so models must be retrained regularly, for example on a sliding window of recent data, as sketched below.
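
A sliding-window retraining sketch (the window size, refit period, and contamination value are arbitrary placeholders):

# Sliding-window retraining sketch for concept drift (all sizes are arbitrary)
import numpy as np
from sklearn.ensemble import IsolationForest

def rolling_detector(stream, window=5000, refit_every=1000, warmup=500):
    """Periodically refit the model on the most recent window of observations."""
    stream = np.asarray(stream)
    model = IsolationForest(contamination=0.01, random_state=42).fit(stream[:warmup])
    flags = [False] * warmup
    for i in range(warmup, len(stream)):
        if (i - warmup) % refit_every == 0:
            recent = stream[max(0, i - window):i]
            model = IsolationForest(contamination=0.01, random_state=42).fit(recent)
        flags.append(model.predict(stream[i].reshape(1, -1))[0] == -1)
    return np.array(flags)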

3. Interpretability

Deep learning and ensemble models are often hard to interpret. Model-agnostic techniques such as SHAP can attribute an anomaly score to individual features, as sketched below.
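
A sketch of score explanation with SHAP's model-agnostic KernelExplainer, which only needs a scoring function and a background sample (requires pip install shap; exact API details may vary slightly between shap versions):

# Explaining Isolation Forest anomaly scores with SHAP (model-agnostic sketch)
import numpy as np
import shap
from sklearn.ensemble import IsolationForest

X_train = np.random.randn(500, 4)
clf = IsolationForest(contamination=0.05, random_state=42).fit(X_train)

score_fn = lambda X: -clf.score_samples(X)         # higher = more anomalous
background = shap.sample(X_train, 100)             # background data for the explainer
explainer = shap.KernelExplainer(score_fn, background)

x_suspicious = np.array([[4.0, -3.5, 0.1, 0.2]])
shap_values = explainer.shap_values(x_suspicious)  # per-feature contribution to the score
print(shap_values)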

4. Imbalanced Data

Anomalies are rare by nature. Use data augmentation techniques or robust models.

Complete Project: Transaction Anomaly Detection

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt

class AnomalyDetector:
    def __init__(self, contamination=0.1):
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.model = None
        
    def preprocess(self, data):
        """Data preprocessing"""
        # Normalization
        data_scaled = self.scaler.fit_transform(data)
        return data_scaled
    
    def train(self, X_train):
        """Train the model"""
        X_scaled = self.preprocess(X_train)
        self.model = IsolationForest(
            contamination=self.contamination,
            random_state=42,
            n_estimators=100
        )
        self.model.fit(X_scaled)
        
    def predict(self, X):
        """Predict anomalies"""
        X_scaled = self.scaler.transform(X)
        predictions = self.model.predict(X_scaled)
        return predictions
    
    def get_anomaly_scores(self, X):
        """Get anomaly scores (higher = more anomalous)"""
        X_scaled = self.scaler.transform(X)
        # score_samples returns higher values for normal points, so flip the sign
        scores = -self.model.score_samples(X_scaled)
        return scores
    
    def evaluate(self, X_test, y_test):
        """Evaluate the model"""
        predictions = self.predict(X_test)
        # Convert: -1 -> 1 (anomaly), 1 -> 0 (normal)
        y_pred = (predictions == -1).astype(int)
        
        print(classification_report(y_test, y_pred, 
                                    target_names=['Normal', 'Anomaly']))

# Usage example
np.random.seed(42)
n_samples = 1000
n_features = 5

# Normal data
X_normal = np.random.randn(n_samples, n_features)

# Anomalies
n_anomalies = 50
X_anomalies = np.random.uniform(low=-5, high=5, size=(n_anomalies, n_features))

# Combine
X = np.vstack([X_normal, X_anomalies])
y = np.hstack([np.zeros(n_samples), np.ones(n_anomalies)])

# Split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)

# Detector
detector = AnomalyDetector(contamination=0.1)
detector.train(X_train)

# Evaluation
detector.evaluate(X_test, y_test)
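
The detector's scores can also be fed into the AnomalyMetricsEvaluator defined earlier to get threshold-independent metrics (a short sketch, reusing the objects created above):

# Reuse the evaluator from the metrics section on the detector's scores
scores_test = detector.get_anomaly_scores(X_test)   # higher = more anomalous
evaluator = AnomalyMetricsEvaluator(y_true=y_test, y_scores=scores_test)
print(evaluator.compute_metrics())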

Additional Resources

Python Libraries

  • scikit-learn: Classical algorithms (Isolation Forest, One-Class SVM, LOF)
  • PyOD: Library dedicated to anomaly detection
  • TensorFlow/Keras: Deep learning (autoencoders, LSTM)
  • Prophet: Time series forecasting; anomalies can be flagged from forecast residuals

Installation

pip install scikit-learn pyod tensorflow pandas numpy matplotlib seaborn

Public Datasets

  • Credit Card Fraud: Kaggle
  • KDD Cup 99: Network intrusion detection
  • UNSW-NB15: Intrusion detection
  • NAB (Numenta Anomaly Benchmark): Time series

Conclusion

Anomaly detection is a crucial field of machine learning with numerous practical applications. The choice of method depends on data type, business context, and performance constraints.

Key takeaways:

  • Understand business context to define what an anomaly is
  • Choose algorithm adapted to data type
  • Optimize detection threshold to balance precision and recall
  • Regularly validate and monitor performance
  • Maintain sufficient interpretability for operational action

Anomaly detection continues to evolve with emerging deep learning techniques and hybrid methods, offering increasingly powerful solutions adapted to specific needs of each domain.

This post is licensed under CC BY 4.0 by the author.