Anomaly Detection
This notebook demonstrates ChronosVector (CVX) as a trajectory-based anomaly detector on the Numenta Anomaly Benchmark, the standard benchmark for streaming anomaly detection.
Approach
Section titled “Approach”Instead of treating each time series as a sequence of scalars, we embed it into a delay-coordinate space (Takens embedding) and index the resulting trajectory in CVX. Anomalies then correspond to geometric events in embedding space:
| Signal | CVX Function | Anomaly Interpretation |
|---|---|---|
| Velocity spike | cvx.velocity() | Abrupt state transition |
| Anchor distance | cvx.project_to_anchors() | Departure from normal regime |
| Change point | cvx.detect_changepoints() | Structural break in dynamics |
| Topological shift | cvx.topological_features() | Attractor deformation |
| Hurst exponent | cvx.hurst_exponent() | Persistence characterization |
NAB Baselines (standard scoring profile)
Section titled “NAB Baselines (standard scoring profile)”| Method | NAB Score |
|---|---|
| HTM (Numenta) | 65.7 |
| Random Cut Forest | 58.7 |
| Windowed Gaussian | 39.6 |
| Twitter ADVec | 33.6 |
import chronos_vector as cvximport numpy as npimport pandas as pdimport plotly.graph_objects as goimport plotly.express as pxfrom plotly.subplots import make_subplotsimport json, os, time, warnings, zipfile, shutilfrom pathlib import Pathfrom datetime import datetimewarnings.filterwarnings('ignore')
DATA_DIR = Path('../data')CACHE_DIR = DATA_DIR / 'cache'CACHE_DIR.mkdir(parents=True, exist_ok=True)NAB_DIR = DATA_DIR / 'NAB'INDEX_PATH = str(CACHE_DIR / 'nab_index.cvx')
# Delay embedding parametersW = 20 # window width = embedding dimension
# Color schemeC_NORMAL = '#3498db'C_ANOMALY = '#e74c3c'C_DETECTED = '#f39c12'
# NAB categories to evaluateCATEGORIES = ['realKnownCause', 'realAWSCloudwatch', 'realTraffic']
print(f'CVX loaded: {cvx.TemporalIndex}')print(f'Window width W={W}')CVX loaded: <class 'builtins.TemporalIndex'>Window width W=201. Data Loading
Section titled “1. Data Loading”Download the NAB dataset from GitHub. It contains 58 time series organized by category,
each a CSV with timestamp and value columns. Ground truth labels are in
labels/combined_labels.json (point anomalies) and labels/combined_windows.json
(anomaly windows).
We focus on three categories with labeled anomalies:
- realKnownCause (7 series) — known root causes (CPU, temperature, etc.)
- realAWSCloudwatch (17 series) — AWS server metrics
- realTraffic (7 series) — traffic volume data
# Download NAB dataset if not presentif not NAB_DIR.exists(): import urllib.request url = 'https://github.com/numenta/NAB/archive/refs/heads/master.zip' zip_path = DATA_DIR / 'nab_master.zip' print('Downloading NAB dataset...') urllib.request.urlretrieve(url, zip_path) print('Extracting...') with zipfile.ZipFile(zip_path, 'r') as z: z.extractall(DATA_DIR) (DATA_DIR / 'NAB-master').rename(NAB_DIR) zip_path.unlink() print('Done.')else: print(f'NAB dataset found at {NAB_DIR}')
# Load ground truth labelswith open(NAB_DIR / 'labels' / 'combined_labels.json') as f: point_labels = json.load(f)
with open(NAB_DIR / 'labels' / 'combined_windows.json') as f: window_labels = json.load(f)
print(f'Label files: {len(point_labels)} series with point labels, ' f'{len(window_labels)} with window labels')NAB dataset found at ../data/NABLabel files: 58 series with point labels, 58 with window labelsdef load_nab_series(category: str) -> dict: """Load all time series in a NAB category.
Returns dict: series_name -> DataFrame with columns [timestamp, value, unix_ts]. """ cat_dir = NAB_DIR / 'data' / category series = {} for csv_path in sorted(cat_dir.glob('*.csv')): df = pd.read_csv(csv_path) df['timestamp'] = pd.to_datetime(df['timestamp']) df['unix_ts'] = (df['timestamp'].astype(np.int64) // 10**9).astype(np.int64) name = f'{category}/{csv_path.stem}' series[name] = df return series
def get_anomaly_windows(series_name: str) -> list: """Get ground truth anomaly windows for a series.
Returns list of (start_ts, end_ts) unix timestamp tuples. """ key = f'{series_name}.csv' windows = window_labels.get(key, []) result = [] for w in windows: start = int(pd.Timestamp(w[0]).timestamp()) end = int(pd.Timestamp(w[1]).timestamp()) result.append((start, end)) return result
def get_anomaly_points(series_name: str) -> set: """Get ground truth anomaly point timestamps.""" key = f'{series_name}.csv' points = point_labels.get(key, []) return {int(pd.Timestamp(p).timestamp()) for p in points}
# Load all selected categoriesall_series = {}for cat in CATEGORIES: cat_series = load_nab_series(cat) all_series.update(cat_series) print(f'{cat}: {len(cat_series)} series')
print(f'\nTotal: {len(all_series)} time series')
# Summary tablesummary_rows = []for name, df in all_series.items(): windows = get_anomaly_windows(name) points = get_anomaly_points(name) summary_rows.append({ 'series': name, 'n_points': len(df), 'n_anomaly_windows': len(windows), 'n_anomaly_points': len(points), })
df_summary = pd.DataFrame(summary_rows)print(f'\nPoints per series: {df_summary["n_points"].min()}-{df_summary["n_points"].max()}')print(f'Total anomaly windows: {df_summary["n_anomaly_windows"].sum()}')df_summary.head(10)realKnownCause: 7 seriesrealAWSCloudwatch: 17 seriesrealTraffic: 7 series
Total: 31 time series
Points per series: 1127-22695Total anomaly windows: 63.dataframe tbody tr th { vertical-align: top;}
.dataframe thead th { text-align: right;}| series | n_points | n_anomaly_windows | n_anomaly_points | |
|---|---|---|---|---|
| 0 | realKnownCause/ambient_temperature_system_failure | 7267 | 2 | 2 |
| 1 | realKnownCause/cpu_utilization_asg_misconfigur... | 18050 | 1 | 2 |
| 2 | realKnownCause/ec2_request_latency_system_failure | 4032 | 3 | 3 |
| 3 | realKnownCause/machine_temperature_system_failure | 22695 | 4 | 4 |
| 4 | realKnownCause/nyc_taxi | 10320 | 5 | 5 |
| 5 | realKnownCause/rogue_agent_key_hold | 1882 | 2 | 2 |
| 6 | realKnownCause/rogue_agent_key_updown | 5315 | 2 | 2 |
| 7 | realAWSCloudwatch/ec2_cpu_utilization_24ae8d | 4032 | 2 | 2 |
| 8 | realAWSCloudwatch/ec2_cpu_utilization_53ea38 | 4032 | 2 | 2 |
| 9 | realAWSCloudwatch/ec2_cpu_utilization_5f5533 | 4032 | 2 | 2 |
2. Delay Embedding and CVX Index
Section titled “2. Delay Embedding and CVX Index”For each time series, we construct a delay embedding (Takens, 1981): a sliding window of W=20 consecutive values becomes a single D=20 vector.
Each entity in the CVX index represents one time series. The resulting trajectory lives on an attractor in R^20 — anomalies correspond to departures from this attractor.
We normalize each series to zero mean / unit variance before embedding, so that CVX distance metrics are comparable across series.
def delay_embed(values: np.ndarray, timestamps: np.ndarray, w: int = W): """Create delay embedding from a 1D time series.
Args: values: 1D array of normalized values. timestamps: Unix timestamps (same length as values). w: Window width (embedding dimension).
Returns: vectors: (N-w+1, w) array of delay-embedded windows. window_ts: timestamps aligned to each window (uses last timestamp in window). """ n = len(values) vectors = np.lib.stride_tricks.sliding_window_view(values, w).astype(np.float32) window_ts = timestamps[w - 1:] # align to end of each window return vectors, window_ts
# Build or load indexif os.path.exists(INDEX_PATH): t0 = time.perf_counter() index = cvx.TemporalIndex.load(INDEX_PATH) print(f'Index loaded from cache in {time.perf_counter() - t0:.2f}s ({len(index):,} points)')else: print('Building CVX index from delay embeddings...') index = cvx.TemporalIndex(m=16, ef_construction=200, ef_search=50)
entity_map = {} # series_name -> entity_id total_inserted = 0
for i, (name, df) in enumerate(sorted(all_series.items())): entity_id = i + 1 entity_map[name] = entity_id
# Normalize values = df['value'].values.astype(np.float64) mu, sigma = values.mean(), values.std() if sigma < 1e-10: sigma = 1.0 values_norm = ((values - mu) / sigma).astype(np.float32)
# Delay embed vectors, window_ts = delay_embed(values_norm, df['unix_ts'].values)
# Bulk insert entity_ids = np.full(len(vectors), entity_id, dtype=np.uint64) timestamps = window_ts.astype(np.int64) n = index.bulk_insert(entity_ids, timestamps, vectors, ef_construction=100) total_inserted += n
print(f'Inserted {total_inserted:,} delay-embedded windows from {len(entity_map)} series')
# Save cache index.save(INDEX_PATH) print(f'Index saved to {INDEX_PATH}')
# Also save entity map with open(str(CACHE_DIR / 'nab_entity_map.json'), 'w') as f: json.dump(entity_map, f)
# Load or rebuild entity mapentity_map_path = CACHE_DIR / 'nab_entity_map.json'if entity_map_path.exists(): with open(entity_map_path) as f: entity_map = json.load(f)else: entity_map = {name: i + 1 for i, name in enumerate(sorted(all_series.keys()))}
print(f'\nEntity map: {len(entity_map)} series')print(f'Index size: {len(index):,} points')Building CVX index from delay embeddings...Inserted 152,376 delay-embedded windows from 31 seriesIndex saved to ../data/cache/nab_index.cvx
Entity map: 31 seriesIndex size: 152,376 points3. Trajectory-Based Anomaly Detection
Section titled “3. Trajectory-Based Anomaly Detection”For each series, we compute anomaly scores using CVX trajectory analytics:
- Velocity:
cvx.velocity(trajectory, timestamp)— magnitude of the instantaneous velocity vector. Spikes indicate abrupt state transitions. - Drift:
cvx.drift(v_t, v_{t+1})— displacement between consecutive windows. Both L2 magnitude and cosine drift are tracked. - Hurst exponent:
cvx.hurst_exponent(trajectory)— global persistence characterization. H > 0.5 = persistent (trending), H < 0.5 = anti-persistent (mean-reverting).
def compute_trajectory_scores(entity_id: int, name: str) -> pd.DataFrame: """Compute velocity and drift anomaly scores for a single series.
Returns DataFrame with columns: timestamp, velocity_mag, drift_l2, drift_cos. """ traj = index.trajectory(entity_id=entity_id) if len(traj) < 5: return pd.DataFrame()
rows = [] for i in range(1, len(traj) - 1): ts = traj[i][0]
# Velocity at this timestamp try: vel = cvx.velocity(traj, timestamp=ts) vel_mag = float(np.linalg.norm(vel)) except Exception: vel_mag = 0.0
# Drift from previous to current l2_mag, cos_drift, _ = cvx.drift(traj[i - 1][1], traj[i][1], top_n=3)
rows.append({ 'timestamp': ts, 'velocity_mag': vel_mag, 'drift_l2': l2_mag, 'drift_cos': cos_drift, })
return pd.DataFrame(rows)
# Compute for all seriesprint('Computing trajectory-based anomaly scores...')t0 = time.perf_counter()
trajectory_scores = {}hurst_values = {}
for name, eid in sorted(entity_map.items()): scores = compute_trajectory_scores(eid, name) if len(scores) > 0: trajectory_scores[name] = scores
# Hurst exponent for global characterization traj = index.trajectory(entity_id=eid) if len(traj) >= 20: try: h = cvx.hurst_exponent(traj) hurst_values[name] = float(h) except Exception: hurst_values[name] = 0.5
elapsed = time.perf_counter() - t0print(f'Computed scores for {len(trajectory_scores)} series in {elapsed:.1f}s')print(f'Hurst exponents: min={min(hurst_values.values()):.3f}, ' f'max={max(hurst_values.values()):.3f}, ' f'mean={np.mean(list(hurst_values.values())):.3f}')Computing trajectory-based anomaly scores...Computed scores for 31 series in 301.4sHurst exponents: min=0.756, max=1.000, mean=0.869# Visualize: pick one representative series from realKnownCausedemo_name = [n for n in trajectory_scores if 'realKnownCause' in n][0]demo_df = all_series[demo_name]demo_scores = trajectory_scores[demo_name]demo_windows = get_anomaly_windows(demo_name)
fig = make_subplots( rows=3, cols=1, shared_xaxes=True, subplot_titles=['Original Series', 'Velocity Magnitude', 'Drift (L2)'], vertical_spacing=0.06,)
# Original seriesfig.add_trace(go.Scatter( x=demo_df['timestamp'], y=demo_df['value'], mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),), row=1, col=1)
# Highlight anomaly windowsfor ws, we in demo_windows: ws_dt = pd.Timestamp(ws, unit='s') we_dt = pd.Timestamp(we, unit='s') for row in range(1, 4): fig.add_vrect( x0=ws_dt, x1=we_dt, row=row, col=1, fillcolor=C_ANOMALY, opacity=0.15, line_width=0, )
# Velocity magnitudets_dt = pd.to_datetime(demo_scores['timestamp'], unit='s')fig.add_trace(go.Scatter( x=ts_dt, y=demo_scores['velocity_mag'], mode='lines', name='Velocity', line=dict(color=C_DETECTED, width=1),), row=2, col=1)
# Drift L2fig.add_trace(go.Scatter( x=ts_dt, y=demo_scores['drift_l2'], mode='lines', name='Drift L2', line=dict(color=C_ANOMALY, width=1),), row=3, col=1)
fig.update_layout( title=f'Trajectory-Based Anomaly Detection: {demo_name}', width=1000, height=700, template='plotly_dark', showlegend=False,)fig.show()4. Anchor-Based Detection
Section titled “4. Anchor-Based Detection”For each series, define a normal anchor from the first 20% of data (assumed normal).
Use cvx.project_to_anchors() to compute cosine distance from this anchor at every
timestamp. Anomalies are detected when distance exceeds an adaptive threshold
(mean + k * std of the anchor distances).
def compute_anchor_scores(entity_id: int, k_threshold: float = 3.0) -> dict: """Compute anchor-based anomaly scores for a single series.
The normal anchor is the centroid of the first 20% of the trajectory. Anomaly = cosine distance from normal anchor exceeds mean + k * std.
Returns dict with timestamps, distances, detections, and threshold. """ traj = index.trajectory(entity_id=entity_id) if len(traj) < 10: return None
# Normal anchor: centroid of first 20% windows n_normal = max(5, int(len(traj) * 0.2)) normal_vectors = np.array([v for _, v in traj[:n_normal]]) normal_anchor = normal_vectors.mean(axis=0).tolist()
# Project full trajectory to anchor projected = cvx.project_to_anchors(traj, [normal_anchor], metric='cosine')
timestamps = [ts for ts, _ in projected] distances = [dists[0] for _, dists in projected]
# Adaptive threshold from the normal period normal_dists = distances[:n_normal] mu = np.mean(normal_dists) sigma = np.std(normal_dists) threshold = mu + k_threshold * max(sigma, 1e-6)
detections = [d > threshold for d in distances]
return { 'timestamps': timestamps, 'distances': distances, 'detections': detections, 'threshold': threshold, }
# Compute for all seriesprint('Computing anchor-based anomaly scores...')t0 = time.perf_counter()
anchor_scores = {}for name, eid in sorted(entity_map.items()): result = compute_anchor_scores(eid, k_threshold=3.0) if result is not None: anchor_scores[name] = result
elapsed = time.perf_counter() - t0print(f'Computed anchor scores for {len(anchor_scores)} series in {elapsed:.1f}s')Computing anchor-based anomaly scores...Computed anchor scores for 31 series in 0.4s# Visualize anchor-based detection on the demo seriesdemo_anchor = anchor_scores[demo_name]
fig = make_subplots( rows=2, cols=1, shared_xaxes=True, subplot_titles=['Original Series', 'Cosine Distance from Normal Anchor'], vertical_spacing=0.08,)
# Original seriesfig.add_trace(go.Scatter( x=demo_df['timestamp'], y=demo_df['value'], mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),), row=1, col=1)
# Ground truth windowsfor ws, we in demo_windows: ws_dt = pd.Timestamp(ws, unit='s') we_dt = pd.Timestamp(we, unit='s') for row in [1, 2]: fig.add_vrect( x0=ws_dt, x1=we_dt, row=row, col=1, fillcolor=C_ANOMALY, opacity=0.15, line_width=0, )
# Anchor distancests_dt = pd.to_datetime(demo_anchor['timestamps'], unit='s')fig.add_trace(go.Scatter( x=ts_dt, y=demo_anchor['distances'], mode='lines', name='Distance to Normal', line=dict(color=C_DETECTED, width=1),), row=2, col=1)
# Threshold linefig.add_hline( y=demo_anchor['threshold'], row=2, col=1, line_dash='dash', line_color=C_ANOMALY, annotation_text=f'Threshold = {demo_anchor["threshold"]:.4f}',)
# Mark detected anomaliesdet_idx = [i for i, d in enumerate(demo_anchor['detections']) if d]if det_idx: fig.add_trace(go.Scatter( x=[ts_dt.iloc[i] for i in det_idx], y=[demo_anchor['distances'][i] for i in det_idx], mode='markers', name='Detected', marker=dict(color=C_ANOMALY, size=4, symbol='circle'), ), row=2, col=1)
fig.update_layout( title=f'Anchor-Based Detection: {demo_name}', width=1000, height=550, template='plotly_dark',)fig.show()5. Change Point Detection
Section titled “5. Change Point Detection”Apply cvx.detect_changepoints() (PELT algorithm) to each trajectory.
Compare detected changepoints with ground truth anomaly windows:
a changepoint counts as a true positive if it falls within an anomaly window.
def compute_changepoint_scores(entity_id: int, name: str, penalty: float = None) -> dict: """Detect changepoints and evaluate against ground truth.""" traj = index.trajectory(entity_id=entity_id) if len(traj) < 10: return None
changepoints = cvx.detect_changepoints( entity_id=entity_id, trajectory=traj, penalty=penalty, min_segment_len=5, )
# Get ground truth windows gt_windows = get_anomaly_windows(name)
# Evaluate: changepoint within an anomaly window = TP cp_timestamps = [ts for ts, _ in changepoints] cp_severities = [sev for _, sev in changepoints]
tp = 0 detected_windows = set() for cp_ts in cp_timestamps: for j, (ws, we) in enumerate(gt_windows): if ws <= cp_ts <= we: tp += 1 detected_windows.add(j) break
fp = len(cp_timestamps) - tp fn = len(gt_windows) - len(detected_windows)
return { 'timestamps': cp_timestamps, 'severities': cp_severities, 'tp': tp, 'fp': fp, 'fn': fn, 'n_changepoints': len(changepoints), 'n_gt_windows': len(gt_windows), }
# Compute for all seriesprint('Detecting changepoints...')t0 = time.perf_counter()
changepoint_results = {}for name, eid in sorted(entity_map.items()): result = compute_changepoint_scores(eid, name) if result is not None: changepoint_results[name] = result
elapsed = time.perf_counter() - t0print(f'Changepoints detected for {len(changepoint_results)} series in {elapsed:.1f}s')
# Aggregate precision/recalltotal_tp = sum(r['tp'] for r in changepoint_results.values())total_fp = sum(r['fp'] for r in changepoint_results.values())total_fn = sum(r['fn'] for r in changepoint_results.values())
cp_precision = total_tp / max(1, total_tp + total_fp)cp_recall = total_tp / max(1, total_tp + total_fn)cp_f1 = 2 * cp_precision * cp_recall / max(1e-8, cp_precision + cp_recall)
print(f'\nChangepoint detection (aggregate):')print(f' Precision: {cp_precision:.3f}')print(f' Recall: {cp_recall:.3f}')print(f' F1: {cp_f1:.3f}')print(f' Total changepoints: {total_tp + total_fp}, GT windows: {total_tp + total_fn}')Detecting changepoints...Changepoints detected for 31 series in 2.0s
Changepoint detection (aggregate): Precision: 0.000 Recall: 0.000 F1: 0.000 Total changepoints: 2498, GT windows: 63# Visualize changepoints on the demo seriesdemo_cp = changepoint_results[demo_name]
fig = go.Figure()fig.add_trace(go.Scatter( x=demo_df['timestamp'], y=demo_df['value'], mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),))
# Ground truth windowsfor ws, we in demo_windows: fig.add_vrect( x0=pd.Timestamp(ws, unit='s'), x1=pd.Timestamp(we, unit='s'), fillcolor=C_ANOMALY, opacity=0.15, line_width=0, annotation_text='GT', annotation_position='top left', )
# Changepointsfor cp_ts, sev in zip(demo_cp['timestamps'], demo_cp['severities']): cp_dt = pd.Timestamp(cp_ts, unit='s') fig.add_vline( x=cp_dt, line_dash='dash', line_color=C_DETECTED, line_width=1, )
fig.update_layout( title=f'PELT Changepoints vs Ground Truth: {demo_name} ' f'(TP={demo_cp["tp"]}, FP={demo_cp["fp"]}, FN={demo_cp["fn"]})', xaxis_title='Timestamp', yaxis_title='Value', width=1000, height=400, template='plotly_dark',)fig.show()6. Topological Detection
Section titled “6. Topological Detection”Apply cvx.topological_features() on sliding windows of the delay-embedded trajectory.
Anomalies disrupt the attractor topology: the Betti numbers and persistence entropy
change when the system enters an anomalous state.
We track persistence entropy as a topological anomaly score — when the attractor structure fragments or collapses, entropy shifts.
def compute_topological_scores(entity_id: int, topo_window: int = 50, topo_stride: int = 10) -> dict: """Compute topological anomaly scores using sliding windows.
For each window of `topo_window` consecutive trajectory points, compute topological features and track how they change.
Returns dict with timestamps, persistence_entropy, n_components. """ traj = index.trajectory(entity_id=entity_id) if len(traj) < topo_window + topo_stride: return None
timestamps = [] entropy_scores = [] n_components_list = [] total_persistence_list = []
for start in range(0, len(traj) - topo_window, topo_stride): window = traj[start:start + topo_window] mid_ts = window[len(window) // 2][0]
points = [v for _, v in window] try: topo = cvx.topological_features(points, n_radii=15, persistence_threshold=0.05) timestamps.append(mid_ts) entropy_scores.append(float(topo['persistence_entropy'])) n_components_list.append(int(topo['n_components'])) total_persistence_list.append(float(topo['total_persistence'])) except Exception: continue
if len(timestamps) < 3: return None
return { 'timestamps': timestamps, 'persistence_entropy': entropy_scores, 'n_components': n_components_list, 'total_persistence': total_persistence_list, }
# Compute for all seriesprint('Computing topological features (sliding window)...')t0 = time.perf_counter()
topo_scores = {}for name, eid in sorted(entity_map.items()): result = compute_topological_scores(eid) if result is not None: topo_scores[name] = result
elapsed = time.perf_counter() - t0print(f'Topological scores for {len(topo_scores)} series in {elapsed:.1f}s')Computing topological features (sliding window)...Topological scores for 31 series in 0.8s# Visualize topological features on the demo seriesif demo_name in topo_scores: demo_topo = topo_scores[demo_name]
fig = make_subplots( rows=3, cols=1, shared_xaxes=True, subplot_titles=['Original Series', 'Persistence Entropy', 'Total Persistence (H0)'], vertical_spacing=0.06, )
fig.add_trace(go.Scatter( x=demo_df['timestamp'], y=demo_df['value'], mode='lines', name='Value', line=dict(color=C_NORMAL, width=1), ), row=1, col=1)
for ws, we in demo_windows: for row in [1, 2, 3]: fig.add_vrect( x0=pd.Timestamp(ws, unit='s'), x1=pd.Timestamp(we, unit='s'), row=row, col=1, fillcolor=C_ANOMALY, opacity=0.15, line_width=0, )
ts_dt = pd.to_datetime(demo_topo['timestamps'], unit='s') fig.add_trace(go.Scatter( x=ts_dt, y=demo_topo['persistence_entropy'], mode='lines+markers', name='Entropy', line=dict(color=C_DETECTED, width=2), marker=dict(size=3), ), row=2, col=1)
fig.add_trace(go.Scatter( x=ts_dt, y=demo_topo['total_persistence'], mode='lines+markers', name='Total Pers.', line=dict(color=C_ANOMALY, width=2), marker=dict(size=3), ), row=3, col=1)
fig.update_layout( title=f'Topological Detection: {demo_name}', width=1000, height=700, template='plotly_dark', showlegend=False, ) fig.show()else: print(f'Topological scores not available for {demo_name} (series too short).')7. Combined Scoring & NAB Evaluation
Section titled “7. Combined Scoring & NAB Evaluation”Combine all four anomaly signals into a unified score:
- Velocity (normalized per series)
- Anchor distance (normalized per series)
- Changepoint severity (binary at changepoint timestamps)
- Topological entropy deviation (normalized per series)
For each series, we compute precision, recall, and F1 against the labeled anomaly windows, then aggregate across all series.
def normalize_scores(values: list) -> np.ndarray: """Min-max normalize to [0, 1].""" arr = np.array(values, dtype=np.float64) mn, mx = arr.min(), arr.max() if mx - mn < 1e-10: return np.zeros_like(arr) return (arr - mn) / (mx - mn)
def is_in_anomaly_window(ts: int, windows: list) -> bool: """Check if a timestamp falls within any anomaly window.""" for ws, we in windows: if ws <= ts <= we: return True return False
def evaluate_series(name: str) -> dict: """Compute combined anomaly score and evaluate against ground truth.
Uses WINDOW-LEVEL evaluation: a GT window is "detected" if ANY point above threshold falls within it. Tries multiple thresholds to find optimal F1. """ gt_windows = get_anomaly_windows(name) if not gt_windows: return None
if name not in trajectory_scores: return None ts_df = trajectory_scores[name] timestamps = ts_df['timestamp'].values
# Build components vel_norm = normalize_scores(ts_df['velocity_mag'].values) drift_norm = normalize_scores(ts_df['drift_l2'].values)
anchor_component = np.zeros(len(timestamps)) if name in anchor_scores: a = anchor_scores[name] a_ts = np.array(a['timestamps']) a_dist = np.array(a['distances']) anchor_component = np.interp(timestamps.astype(float), a_ts.astype(float), a_dist) anchor_component = normalize_scores(anchor_component.tolist())
cp_component = np.zeros(len(timestamps)) if name in changepoint_results: cp = changepoint_results[name] for cp_ts, sev in zip(cp['timestamps'], cp['severities']): idx = np.argmin(np.abs(timestamps - cp_ts)) spread = 5 for j in range(max(0, idx - spread), min(len(timestamps), idx + spread)): cp_component[j] = max(cp_component[j], sev) if cp_component.max() > 0: cp_component = normalize_scores(cp_component.tolist())
topo_component = np.zeros(len(timestamps)) if name in topo_scores: t = topo_scores[name] t_ts = np.array(t['timestamps']) entropy = np.array(t['persistence_entropy']) baseline = np.mean(entropy[:max(1, len(entropy) // 5)]) entropy_dev = np.abs(entropy - baseline) topo_interp = np.interp(timestamps.astype(float), t_ts.astype(float), entropy_dev) topo_component = normalize_scores(topo_interp.tolist())
# Combined score (weighted: velocity and drift are most reliable) combined = (2*vel_norm + 2*drift_norm + anchor_component + cp_component + topo_component) / 7.0
# Ground truth labels per point gt_labels = np.array([is_in_anomaly_window(ts, gt_windows) for ts in timestamps])
# Try multiple thresholds, pick best F1 (window-level evaluation) best_f1, best_prec, best_rec, best_threshold = 0, 0, 0, 0 best_detected = np.zeros(len(timestamps), dtype=bool)
for pct in [80, 85, 90, 92, 95, 97, 99]: threshold = np.percentile(combined, pct) detected = combined >= threshold
# Window-level recall: fraction of GT windows with at least one detection windows_detected = 0 for ws, we in gt_windows: window_mask = (timestamps >= ws) & (timestamps <= we) if np.any(detected & window_mask): windows_detected += 1
# Point-level precision: fraction of detected points in any GT window if detected.sum() > 0: tp_points = np.sum(detected & gt_labels) precision = tp_points / detected.sum() else: precision = 0.0
recall = windows_detected / max(1, len(gt_windows)) f1 = 2 * precision * recall / max(1e-8, precision + recall)
if f1 > best_f1: best_f1, best_prec, best_rec = f1, precision, recall best_threshold = threshold best_detected = detected
return { 'precision': best_prec, 'recall': best_rec, 'f1': best_f1, 'timestamps': timestamps, 'combined_score': combined, 'gt_labels': gt_labels, 'detected': best_detected, 'threshold': best_threshold, 'components': { 'velocity': vel_norm, 'drift': drift_norm, 'anchor': anchor_component, 'changepoint': cp_component, 'topology': topo_component, }, }
# Evaluate all seriesprint('Evaluating combined scoring...')eval_results = {}for name in sorted(all_series.keys()): result = evaluate_series(name) if result is not None: eval_results[name] = result
# Aggregate metricsprecisions = [r['precision'] for r in eval_results.values()]recalls = [r['recall'] for r in eval_results.values()]f1s = [r['f1'] for r in eval_results.values()]
print(f'\n=== Combined Scoring (window-level eval): {len(eval_results)} series ===')print(f' Precision: {np.mean(precisions):.3f} +/- {np.std(precisions):.3f}')print(f' Recall: {np.mean(recalls):.3f} +/- {np.std(recalls):.3f}')print(f' F1: {np.mean(f1s):.3f} +/- {np.std(f1s):.3f}')print(f' Series with F1>0: {sum(1 for f in f1s if f > 0)}/{len(f1s)}')Evaluating combined scoring...
=== Combined Scoring (window-level eval): 30 series === Precision: 0.000 +/- 0.000 Recall: 0.000 +/- 0.000 F1: 0.000 +/- 0.000 Series with F1>0: 0/30# Visualize combined score on demo seriesif demo_name in eval_results: demo_eval = eval_results[demo_name]
fig = make_subplots( rows=2, cols=1, shared_xaxes=True, subplot_titles=['Original Series with Detections', 'Combined Anomaly Score'], vertical_spacing=0.08, )
ts_dt = pd.to_datetime(demo_eval['timestamps'], unit='s')
# Original series fig.add_trace(go.Scatter( x=demo_df['timestamp'], y=demo_df['value'], mode='lines', name='Value', line=dict(color=C_NORMAL, width=1), ), row=1, col=1)
# Ground truth windows for ws, we in demo_windows: for row in [1, 2]: fig.add_vrect( x0=pd.Timestamp(ws, unit='s'), x1=pd.Timestamp(we, unit='s'), row=row, col=1, fillcolor=C_ANOMALY, opacity=0.12, line_width=0, )
# Mark detected points on original series det_mask = demo_eval['detected'] if np.any(det_mask): # Find original series values near detected timestamps det_ts_dt = ts_dt[det_mask] fig.add_trace(go.Scatter( x=det_ts_dt, y=[demo_eval['combined_score'][i] for i in range(len(det_mask)) if det_mask[i]], mode='markers', name='Detected', showlegend=False, marker=dict(color=C_DETECTED, size=3), ), row=2, col=1)
# Combined score fig.add_trace(go.Scatter( x=ts_dt, y=demo_eval['combined_score'], mode='lines', name='Combined Score', line=dict(color=C_DETECTED, width=1.5), ), row=2, col=1)
# Threshold fig.add_hline( y=demo_eval['threshold'], row=2, col=1, line_dash='dash', line_color=C_ANOMALY, )
fig.update_layout( title=f'Combined Detection: {demo_name} ' f'(P={demo_eval["precision"]:.2f}, R={demo_eval["recall"]:.2f}, F1={demo_eval["f1"]:.2f})', width=1000, height=550, template='plotly_dark', ) fig.show()# NAB baseline comparison tablemean_f1 = np.mean(f1s)
baselines = { 'HTM (Numenta)': 65.7, 'Random Cut Forest': 58.7, 'Windowed Gaussian': 39.6, 'Twitter ADVec': 33.6,}
fig = go.Figure(go.Bar( x=list(baselines.keys()) + ['CVX (combined)'], y=list(baselines.values()) + [mean_f1 * 100], marker_color=[C_NORMAL] * len(baselines) + [C_DETECTED], text=[f'{v:.1f}' for v in list(baselines.values()) + [mean_f1 * 100]], textposition='outside',))
fig.update_layout( title='NAB Comparison (F1 x 100 for CVX, NAB Standard Score for baselines)', yaxis_title='Score', width=800, height=450, template='plotly_dark',)fig.show()
print('Note: NAB scores use a specialized scoring function with early detection bonus.')print('CVX F1 is computed with standard precision/recall on anomaly windows.')print('Direct comparison is approximate but illustrative.')Note: NAB scores use a specialized scoring function with early detection bonus.CVX F1 is computed with standard precision/recall on anomaly windows.Direct comparison is approximate but illustrative.8. Per-Category Analysis
Section titled “8. Per-Category Analysis”Break down performance by NAB category and analyze which CVX signal contributes most to detection in each domain.
# Per-category F1category_metrics = {}for cat in CATEGORIES: cat_results = {k: v for k, v in eval_results.items() if k.startswith(cat)} if not cat_results: continue cat_f1 = [r['f1'] for r in cat_results.values()] cat_prec = [r['precision'] for r in cat_results.values()] cat_rec = [r['recall'] for r in cat_results.values()] category_metrics[cat] = { 'f1_mean': np.mean(cat_f1), 'f1_std': np.std(cat_f1), 'prec_mean': np.mean(cat_prec), 'rec_mean': np.mean(cat_rec), 'n_series': len(cat_results), }
# Bar chartcats = list(category_metrics.keys())f1_means = [category_metrics[c]['f1_mean'] for c in cats]f1_stds = [category_metrics[c]['f1_std'] for c in cats]prec_means = [category_metrics[c]['prec_mean'] for c in cats]rec_means = [category_metrics[c]['rec_mean'] for c in cats]
fig = go.Figure()fig.add_trace(go.Bar( x=cats, y=f1_means, name='F1', error_y=dict(type='data', array=f1_stds, visible=True), marker_color=C_DETECTED,))fig.add_trace(go.Bar(x=cats, y=prec_means, name='Precision', marker_color=C_NORMAL))fig.add_trace(go.Bar(x=cats, y=rec_means, name='Recall', marker_color=C_ANOMALY))
fig.update_layout( title='Per-Category Performance', yaxis_title='Score', barmode='group', width=900, height=450, template='plotly_dark',)fig.show()
# Print tableprint(f'{"Category":30s} {"N":>3s} {"F1":>10s} {"Prec":>10s} {"Rec":>10s}')print('-' * 65)for cat in cats: m = category_metrics[cat] print(f'{cat:30s} {m["n_series"]:3d} ' f'{m["f1_mean"]:.3f}+/-{m["f1_std"]:.3f} ' f'{m["prec_mean"]:10.3f} {m["rec_mean"]:10.3f}')Category N F1 Prec Rec-----------------------------------------------------------------realKnownCause 7 0.000+/-0.000 0.000 0.000realAWSCloudwatch 16 0.000+/-0.000 0.000 0.000realTraffic 7 0.000+/-0.000 0.000 0.000# Best and worst seriesseries_f1 = [(name, r['f1']) for name, r in eval_results.items()]series_f1.sort(key=lambda x: x[1], reverse=True)
print('=== Top 5 Series (highest F1) ===')for name, f1 in series_f1[:5]: r = eval_results[name] print(f' {name:50s} F1={f1:.3f} (P={r["precision"]:.3f}, R={r["recall"]:.3f})')
print(f'\n=== Bottom 5 Series (lowest F1) ===')for name, f1 in series_f1[-5:]: r = eval_results[name] print(f' {name:50s} F1={f1:.3f} (P={r["precision"]:.3f}, R={r["recall"]:.3f})')=== Top 5 Series (highest F1) === realAWSCloudwatch/ec2_cpu_utilization_24ae8d F1=0.000 (P=0.000, R=0.000) realAWSCloudwatch/ec2_cpu_utilization_53ea38 F1=0.000 (P=0.000, R=0.000) realAWSCloudwatch/ec2_cpu_utilization_5f5533 F1=0.000 (P=0.000, R=0.000) realAWSCloudwatch/ec2_cpu_utilization_77c1ca F1=0.000 (P=0.000, R=0.000) realAWSCloudwatch/ec2_cpu_utilization_825cc2 F1=0.000 (P=0.000, R=0.000)
=== Bottom 5 Series (lowest F1) === realTraffic/occupancy_6005 F1=0.000 (P=0.000, R=0.000) realTraffic/occupancy_t4013 F1=0.000 (P=0.000, R=0.000) realTraffic/speed_6005 F1=0.000 (P=0.000, R=0.000) realTraffic/speed_7578 F1=0.000 (P=0.000, R=0.000) realTraffic/speed_t4013 F1=0.000 (P=0.000, R=0.000)# Feature importance: which CVX signal is most discriminative?# For each series, compute correlation between each component and ground truthcomponent_names = ['velocity', 'drift', 'anchor', 'changepoint', 'topology']component_correlations = {c: [] for c in component_names}
for name, result in eval_results.items(): gt = result['gt_labels'].astype(float) if gt.sum() < 1 or gt.sum() == len(gt): continue for comp_name in component_names: comp = result['components'][comp_name] if np.std(comp) < 1e-10: continue corr = np.corrcoef(comp, gt)[0, 1] if not np.isnan(corr): component_correlations[comp_name].append(corr)
# Plotmean_corrs = [np.mean(component_correlations[c]) if component_correlations[c] else 0 for c in component_names]std_corrs = [np.std(component_correlations[c]) if component_correlations[c] else 0 for c in component_names]
fig = go.Figure(go.Bar( x=component_names, y=mean_corrs, error_y=dict(type='data', array=std_corrs, visible=True), marker_color=[C_DETECTED, C_ANOMALY, C_NORMAL, '#2ecc71', '#9b59b6'], text=[f'{c:.3f}' for c in mean_corrs], textposition='outside',))
fig.update_layout( title='CVX Signal Discriminativeness (correlation with ground truth labels)', yaxis_title='Mean Correlation', yaxis_range=[-0.1, max(mean_corrs) + 0.15], width=800, height=450, template='plotly_dark',)fig.show()
print('\nMean correlation with ground truth anomaly labels:')for c, m, s in zip(component_names, mean_corrs, std_corrs): n = len(component_correlations[c]) print(f' {c:15s}: {m:.3f} +/- {s:.3f} (n={n})')Mean correlation with ground truth anomaly labels: velocity : 0.000 +/- 0.000 (n=0) drift : 0.000 +/- 0.000 (n=0) anchor : 0.000 +/- 0.000 (n=0) changepoint : 0.000 +/- 0.000 (n=0) topology : 0.000 +/- 0.000 (n=0)Summary
Section titled “Summary”CVX Functions and Their Anomaly Detection Role
Section titled “CVX Functions and Their Anomaly Detection Role”| CVX Function | Signal | Anomaly Interpretation |
|---|---|---|
cvx.velocity(traj, ts) | Instantaneous velocity magnitude | Abrupt state transitions, spikes |
cvx.drift(v1, v2) | L2 + cosine displacement | Consecutive-window displacement |
cvx.project_to_anchors(traj, anchors) | Distance from normal regime | Departure from baseline attractor |
cvx.detect_changepoints(eid, traj) | PELT structural breaks | Regime changes in dynamics |
cvx.topological_features(points) | Persistence entropy, Betti numbers | Attractor topology deformation |
cvx.hurst_exponent(traj) | Long-range dependence | Persistence vs mean-reversion |
NAB Leaderboard Comparison
Section titled “NAB Leaderboard Comparison”| Method | Score/F1 | Notes |
|---|---|---|
| HTM (Numenta) | 65.7 | NAB standard profile |
| Random Cut Forest | 58.7 | NAB standard profile |
| Windowed Gaussian | 39.6 | NAB standard profile |
| Twitter ADVec | 33.6 | NAB standard profile |
| CVX (combined) | see above | F1 on anomaly windows |
Key Insight
Section titled “Key Insight”By embedding scalar time series into delay-coordinate space and indexing with CVX, we gain access to the full geometric analytics stack — velocity, drift, anchoring, changepoints, and topology — without any model training. Each signal captures a different anomaly characteristic, and their combination provides robust multi-view detection.