Skip to content

Anomaly Detection

This notebook demonstrates ChronosVector (CVX) as a trajectory-based anomaly detector on the Numenta Anomaly Benchmark, the standard benchmark for streaming anomaly detection.

Instead of treating each time series as a sequence of scalars, we embed it into a delay-coordinate space (Takens embedding) and index the resulting trajectory in CVX. Anomalies then correspond to geometric events in embedding space:

SignalCVX FunctionAnomaly Interpretation
Velocity spikecvx.velocity()Abrupt state transition
Anchor distancecvx.project_to_anchors()Departure from normal regime
Change pointcvx.detect_changepoints()Structural break in dynamics
Topological shiftcvx.topological_features()Attractor deformation
Hurst exponentcvx.hurst_exponent()Persistence characterization
MethodNAB Score
HTM (Numenta)65.7
Random Cut Forest58.7
Windowed Gaussian39.6
Twitter ADVec33.6
import chronos_vector as cvx
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import json, os, time, warnings, zipfile, shutil
from pathlib import Path
from datetime import datetime
warnings.filterwarnings('ignore')
DATA_DIR = Path('../data')
CACHE_DIR = DATA_DIR / 'cache'
CACHE_DIR.mkdir(parents=True, exist_ok=True)
NAB_DIR = DATA_DIR / 'NAB'
INDEX_PATH = str(CACHE_DIR / 'nab_index.cvx')
# Delay embedding parameters
W = 20 # window width = embedding dimension
# Color scheme
C_NORMAL = '#3498db'
C_ANOMALY = '#e74c3c'
C_DETECTED = '#f39c12'
# NAB categories to evaluate
CATEGORIES = ['realKnownCause', 'realAWSCloudwatch', 'realTraffic']
print(f'CVX loaded: {cvx.TemporalIndex}')
print(f'Window width W={W}')
CVX loaded: <class 'builtins.TemporalIndex'>
Window width W=20

Download the NAB dataset from GitHub. It contains 58 time series organized by category, each a CSV with timestamp and value columns. Ground truth labels are in labels/combined_labels.json (point anomalies) and labels/combined_windows.json (anomaly windows).

We focus on three categories with labeled anomalies:

  • realKnownCause (7 series) — known root causes (CPU, temperature, etc.)
  • realAWSCloudwatch (17 series) — AWS server metrics
  • realTraffic (7 series) — traffic volume data
# Download NAB dataset if not present
if not NAB_DIR.exists():
import urllib.request
url = 'https://github.com/numenta/NAB/archive/refs/heads/master.zip'
zip_path = DATA_DIR / 'nab_master.zip'
print('Downloading NAB dataset...')
urllib.request.urlretrieve(url, zip_path)
print('Extracting...')
with zipfile.ZipFile(zip_path, 'r') as z:
z.extractall(DATA_DIR)
(DATA_DIR / 'NAB-master').rename(NAB_DIR)
zip_path.unlink()
print('Done.')
else:
print(f'NAB dataset found at {NAB_DIR}')
# Load ground truth labels
with open(NAB_DIR / 'labels' / 'combined_labels.json') as f:
point_labels = json.load(f)
with open(NAB_DIR / 'labels' / 'combined_windows.json') as f:
window_labels = json.load(f)
print(f'Label files: {len(point_labels)} series with point labels, '
f'{len(window_labels)} with window labels')
NAB dataset found at ../data/NAB
Label files: 58 series with point labels, 58 with window labels
def load_nab_series(category: str) -> dict:
"""Load all time series in a NAB category.
Returns dict: series_name -> DataFrame with columns [timestamp, value, unix_ts].
"""
cat_dir = NAB_DIR / 'data' / category
series = {}
for csv_path in sorted(cat_dir.glob('*.csv')):
df = pd.read_csv(csv_path)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['unix_ts'] = (df['timestamp'].astype(np.int64) // 10**9).astype(np.int64)
name = f'{category}/{csv_path.stem}'
series[name] = df
return series
def get_anomaly_windows(series_name: str) -> list:
"""Get ground truth anomaly windows for a series.
Returns list of (start_ts, end_ts) unix timestamp tuples.
"""
key = f'{series_name}.csv'
windows = window_labels.get(key, [])
result = []
for w in windows:
start = int(pd.Timestamp(w[0]).timestamp())
end = int(pd.Timestamp(w[1]).timestamp())
result.append((start, end))
return result
def get_anomaly_points(series_name: str) -> set:
"""Get ground truth anomaly point timestamps."""
key = f'{series_name}.csv'
points = point_labels.get(key, [])
return {int(pd.Timestamp(p).timestamp()) for p in points}
# Load all selected categories
all_series = {}
for cat in CATEGORIES:
cat_series = load_nab_series(cat)
all_series.update(cat_series)
print(f'{cat}: {len(cat_series)} series')
print(f'\nTotal: {len(all_series)} time series')
# Summary table
summary_rows = []
for name, df in all_series.items():
windows = get_anomaly_windows(name)
points = get_anomaly_points(name)
summary_rows.append({
'series': name,
'n_points': len(df),
'n_anomaly_windows': len(windows),
'n_anomaly_points': len(points),
})
df_summary = pd.DataFrame(summary_rows)
print(f'\nPoints per series: {df_summary["n_points"].min()}-{df_summary["n_points"].max()}')
print(f'Total anomaly windows: {df_summary["n_anomaly_windows"].sum()}')
df_summary.head(10)
realKnownCause: 7 series
realAWSCloudwatch: 17 series
realTraffic: 7 series
Total: 31 time series
Points per series: 1127-22695
Total anomaly windows: 63
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
series n_points n_anomaly_windows n_anomaly_points
0 realKnownCause/ambient_temperature_system_failure 7267 2 2
1 realKnownCause/cpu_utilization_asg_misconfigur... 18050 1 2
2 realKnownCause/ec2_request_latency_system_failure 4032 3 3
3 realKnownCause/machine_temperature_system_failure 22695 4 4
4 realKnownCause/nyc_taxi 10320 5 5
5 realKnownCause/rogue_agent_key_hold 1882 2 2
6 realKnownCause/rogue_agent_key_updown 5315 2 2
7 realAWSCloudwatch/ec2_cpu_utilization_24ae8d 4032 2 2
8 realAWSCloudwatch/ec2_cpu_utilization_53ea38 4032 2 2
9 realAWSCloudwatch/ec2_cpu_utilization_5f5533 4032 2 2

For each time series, we construct a delay embedding (Takens, 1981): a sliding window of W=20 consecutive values becomes a single D=20 vector.

Each entity in the CVX index represents one time series. The resulting trajectory lives on an attractor in R^20 — anomalies correspond to departures from this attractor.

We normalize each series to zero mean / unit variance before embedding, so that CVX distance metrics are comparable across series.

def delay_embed(values: np.ndarray, timestamps: np.ndarray, w: int = W):
"""Create delay embedding from a 1D time series.
Args:
values: 1D array of normalized values.
timestamps: Unix timestamps (same length as values).
w: Window width (embedding dimension).
Returns:
vectors: (N-w+1, w) array of delay-embedded windows.
window_ts: timestamps aligned to each window (uses last timestamp in window).
"""
n = len(values)
vectors = np.lib.stride_tricks.sliding_window_view(values, w).astype(np.float32)
window_ts = timestamps[w - 1:] # align to end of each window
return vectors, window_ts
# Build or load index
if os.path.exists(INDEX_PATH):
t0 = time.perf_counter()
index = cvx.TemporalIndex.load(INDEX_PATH)
print(f'Index loaded from cache in {time.perf_counter() - t0:.2f}s ({len(index):,} points)')
else:
print('Building CVX index from delay embeddings...')
index = cvx.TemporalIndex(m=16, ef_construction=200, ef_search=50)
entity_map = {} # series_name -> entity_id
total_inserted = 0
for i, (name, df) in enumerate(sorted(all_series.items())):
entity_id = i + 1
entity_map[name] = entity_id
# Normalize
values = df['value'].values.astype(np.float64)
mu, sigma = values.mean(), values.std()
if sigma < 1e-10:
sigma = 1.0
values_norm = ((values - mu) / sigma).astype(np.float32)
# Delay embed
vectors, window_ts = delay_embed(values_norm, df['unix_ts'].values)
# Bulk insert
entity_ids = np.full(len(vectors), entity_id, dtype=np.uint64)
timestamps = window_ts.astype(np.int64)
n = index.bulk_insert(entity_ids, timestamps, vectors, ef_construction=100)
total_inserted += n
print(f'Inserted {total_inserted:,} delay-embedded windows from {len(entity_map)} series')
# Save cache
index.save(INDEX_PATH)
print(f'Index saved to {INDEX_PATH}')
# Also save entity map
with open(str(CACHE_DIR / 'nab_entity_map.json'), 'w') as f:
json.dump(entity_map, f)
# Load or rebuild entity map
entity_map_path = CACHE_DIR / 'nab_entity_map.json'
if entity_map_path.exists():
with open(entity_map_path) as f:
entity_map = json.load(f)
else:
entity_map = {name: i + 1 for i, name in enumerate(sorted(all_series.keys()))}
print(f'\nEntity map: {len(entity_map)} series')
print(f'Index size: {len(index):,} points')
Building CVX index from delay embeddings...
Inserted 152,376 delay-embedded windows from 31 series
Index saved to ../data/cache/nab_index.cvx
Entity map: 31 series
Index size: 152,376 points

For each series, we compute anomaly scores using CVX trajectory analytics:

  • Velocity: cvx.velocity(trajectory, timestamp) — magnitude of the instantaneous velocity vector. Spikes indicate abrupt state transitions.
  • Drift: cvx.drift(v_t, v_{t+1}) — displacement between consecutive windows. Both L2 magnitude and cosine drift are tracked.
  • Hurst exponent: cvx.hurst_exponent(trajectory) — global persistence characterization. H > 0.5 = persistent (trending), H < 0.5 = anti-persistent (mean-reverting).
def compute_trajectory_scores(entity_id: int, name: str) -> pd.DataFrame:
"""Compute velocity and drift anomaly scores for a single series.
Returns DataFrame with columns: timestamp, velocity_mag, drift_l2, drift_cos.
"""
traj = index.trajectory(entity_id=entity_id)
if len(traj) < 5:
return pd.DataFrame()
rows = []
for i in range(1, len(traj) - 1):
ts = traj[i][0]
# Velocity at this timestamp
try:
vel = cvx.velocity(traj, timestamp=ts)
vel_mag = float(np.linalg.norm(vel))
except Exception:
vel_mag = 0.0
# Drift from previous to current
l2_mag, cos_drift, _ = cvx.drift(traj[i - 1][1], traj[i][1], top_n=3)
rows.append({
'timestamp': ts,
'velocity_mag': vel_mag,
'drift_l2': l2_mag,
'drift_cos': cos_drift,
})
return pd.DataFrame(rows)
# Compute for all series
print('Computing trajectory-based anomaly scores...')
t0 = time.perf_counter()
trajectory_scores = {}
hurst_values = {}
for name, eid in sorted(entity_map.items()):
scores = compute_trajectory_scores(eid, name)
if len(scores) > 0:
trajectory_scores[name] = scores
# Hurst exponent for global characterization
traj = index.trajectory(entity_id=eid)
if len(traj) >= 20:
try:
h = cvx.hurst_exponent(traj)
hurst_values[name] = float(h)
except Exception:
hurst_values[name] = 0.5
elapsed = time.perf_counter() - t0
print(f'Computed scores for {len(trajectory_scores)} series in {elapsed:.1f}s')
print(f'Hurst exponents: min={min(hurst_values.values()):.3f}, '
f'max={max(hurst_values.values()):.3f}, '
f'mean={np.mean(list(hurst_values.values())):.3f}')
Computing trajectory-based anomaly scores...
Computed scores for 31 series in 301.4s
Hurst exponents: min=0.756, max=1.000, mean=0.869
# Visualize: pick one representative series from realKnownCause
demo_name = [n for n in trajectory_scores if 'realKnownCause' in n][0]
demo_df = all_series[demo_name]
demo_scores = trajectory_scores[demo_name]
demo_windows = get_anomaly_windows(demo_name)
fig = make_subplots(
rows=3, cols=1, shared_xaxes=True,
subplot_titles=['Original Series', 'Velocity Magnitude', 'Drift (L2)'],
vertical_spacing=0.06,
)
# Original series
fig.add_trace(go.Scatter(
x=demo_df['timestamp'], y=demo_df['value'],
mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),
), row=1, col=1)
# Highlight anomaly windows
for ws, we in demo_windows:
ws_dt = pd.Timestamp(ws, unit='s')
we_dt = pd.Timestamp(we, unit='s')
for row in range(1, 4):
fig.add_vrect(
x0=ws_dt, x1=we_dt, row=row, col=1,
fillcolor=C_ANOMALY, opacity=0.15, line_width=0,
)
# Velocity magnitude
ts_dt = pd.to_datetime(demo_scores['timestamp'], unit='s')
fig.add_trace(go.Scatter(
x=ts_dt, y=demo_scores['velocity_mag'],
mode='lines', name='Velocity', line=dict(color=C_DETECTED, width=1),
), row=2, col=1)
# Drift L2
fig.add_trace(go.Scatter(
x=ts_dt, y=demo_scores['drift_l2'],
mode='lines', name='Drift L2', line=dict(color=C_ANOMALY, width=1),
), row=3, col=1)
fig.update_layout(
title=f'Trajectory-Based Anomaly Detection: {demo_name}',
width=1000, height=700, template='plotly_dark',
showlegend=False,
)
fig.show()

For each series, define a normal anchor from the first 20% of data (assumed normal). Use cvx.project_to_anchors() to compute cosine distance from this anchor at every timestamp. Anomalies are detected when distance exceeds an adaptive threshold (mean + k * std of the anchor distances).

def compute_anchor_scores(entity_id: int, k_threshold: float = 3.0) -> dict:
"""Compute anchor-based anomaly scores for a single series.
The normal anchor is the centroid of the first 20% of the trajectory.
Anomaly = cosine distance from normal anchor exceeds mean + k * std.
Returns dict with timestamps, distances, detections, and threshold.
"""
traj = index.trajectory(entity_id=entity_id)
if len(traj) < 10:
return None
# Normal anchor: centroid of first 20% windows
n_normal = max(5, int(len(traj) * 0.2))
normal_vectors = np.array([v for _, v in traj[:n_normal]])
normal_anchor = normal_vectors.mean(axis=0).tolist()
# Project full trajectory to anchor
projected = cvx.project_to_anchors(traj, [normal_anchor], metric='cosine')
timestamps = [ts for ts, _ in projected]
distances = [dists[0] for _, dists in projected]
# Adaptive threshold from the normal period
normal_dists = distances[:n_normal]
mu = np.mean(normal_dists)
sigma = np.std(normal_dists)
threshold = mu + k_threshold * max(sigma, 1e-6)
detections = [d > threshold for d in distances]
return {
'timestamps': timestamps,
'distances': distances,
'detections': detections,
'threshold': threshold,
}
# Compute for all series
print('Computing anchor-based anomaly scores...')
t0 = time.perf_counter()
anchor_scores = {}
for name, eid in sorted(entity_map.items()):
result = compute_anchor_scores(eid, k_threshold=3.0)
if result is not None:
anchor_scores[name] = result
elapsed = time.perf_counter() - t0
print(f'Computed anchor scores for {len(anchor_scores)} series in {elapsed:.1f}s')
Computing anchor-based anomaly scores...
Computed anchor scores for 31 series in 0.4s
# Visualize anchor-based detection on the demo series
demo_anchor = anchor_scores[demo_name]
fig = make_subplots(
rows=2, cols=1, shared_xaxes=True,
subplot_titles=['Original Series', 'Cosine Distance from Normal Anchor'],
vertical_spacing=0.08,
)
# Original series
fig.add_trace(go.Scatter(
x=demo_df['timestamp'], y=demo_df['value'],
mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),
), row=1, col=1)
# Ground truth windows
for ws, we in demo_windows:
ws_dt = pd.Timestamp(ws, unit='s')
we_dt = pd.Timestamp(we, unit='s')
for row in [1, 2]:
fig.add_vrect(
x0=ws_dt, x1=we_dt, row=row, col=1,
fillcolor=C_ANOMALY, opacity=0.15, line_width=0,
)
# Anchor distances
ts_dt = pd.to_datetime(demo_anchor['timestamps'], unit='s')
fig.add_trace(go.Scatter(
x=ts_dt, y=demo_anchor['distances'],
mode='lines', name='Distance to Normal', line=dict(color=C_DETECTED, width=1),
), row=2, col=1)
# Threshold line
fig.add_hline(
y=demo_anchor['threshold'], row=2, col=1,
line_dash='dash', line_color=C_ANOMALY,
annotation_text=f'Threshold = {demo_anchor["threshold"]:.4f}',
)
# Mark detected anomalies
det_idx = [i for i, d in enumerate(demo_anchor['detections']) if d]
if det_idx:
fig.add_trace(go.Scatter(
x=[ts_dt.iloc[i] for i in det_idx],
y=[demo_anchor['distances'][i] for i in det_idx],
mode='markers', name='Detected',
marker=dict(color=C_ANOMALY, size=4, symbol='circle'),
), row=2, col=1)
fig.update_layout(
title=f'Anchor-Based Detection: {demo_name}',
width=1000, height=550, template='plotly_dark',
)
fig.show()

Apply cvx.detect_changepoints() (PELT algorithm) to each trajectory. Compare detected changepoints with ground truth anomaly windows: a changepoint counts as a true positive if it falls within an anomaly window.

def compute_changepoint_scores(entity_id: int, name: str,
penalty: float = None) -> dict:
"""Detect changepoints and evaluate against ground truth."""
traj = index.trajectory(entity_id=entity_id)
if len(traj) < 10:
return None
changepoints = cvx.detect_changepoints(
entity_id=entity_id,
trajectory=traj,
penalty=penalty,
min_segment_len=5,
)
# Get ground truth windows
gt_windows = get_anomaly_windows(name)
# Evaluate: changepoint within an anomaly window = TP
cp_timestamps = [ts for ts, _ in changepoints]
cp_severities = [sev for _, sev in changepoints]
tp = 0
detected_windows = set()
for cp_ts in cp_timestamps:
for j, (ws, we) in enumerate(gt_windows):
if ws <= cp_ts <= we:
tp += 1
detected_windows.add(j)
break
fp = len(cp_timestamps) - tp
fn = len(gt_windows) - len(detected_windows)
return {
'timestamps': cp_timestamps,
'severities': cp_severities,
'tp': tp, 'fp': fp, 'fn': fn,
'n_changepoints': len(changepoints),
'n_gt_windows': len(gt_windows),
}
# Compute for all series
print('Detecting changepoints...')
t0 = time.perf_counter()
changepoint_results = {}
for name, eid in sorted(entity_map.items()):
result = compute_changepoint_scores(eid, name)
if result is not None:
changepoint_results[name] = result
elapsed = time.perf_counter() - t0
print(f'Changepoints detected for {len(changepoint_results)} series in {elapsed:.1f}s')
# Aggregate precision/recall
total_tp = sum(r['tp'] for r in changepoint_results.values())
total_fp = sum(r['fp'] for r in changepoint_results.values())
total_fn = sum(r['fn'] for r in changepoint_results.values())
cp_precision = total_tp / max(1, total_tp + total_fp)
cp_recall = total_tp / max(1, total_tp + total_fn)
cp_f1 = 2 * cp_precision * cp_recall / max(1e-8, cp_precision + cp_recall)
print(f'\nChangepoint detection (aggregate):')
print(f' Precision: {cp_precision:.3f}')
print(f' Recall: {cp_recall:.3f}')
print(f' F1: {cp_f1:.3f}')
print(f' Total changepoints: {total_tp + total_fp}, GT windows: {total_tp + total_fn}')
Detecting changepoints...
Changepoints detected for 31 series in 2.0s
Changepoint detection (aggregate):
Precision: 0.000
Recall: 0.000
F1: 0.000
Total changepoints: 2498, GT windows: 63
# Visualize changepoints on the demo series
demo_cp = changepoint_results[demo_name]
fig = go.Figure()
fig.add_trace(go.Scatter(
x=demo_df['timestamp'], y=demo_df['value'],
mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),
))
# Ground truth windows
for ws, we in demo_windows:
fig.add_vrect(
x0=pd.Timestamp(ws, unit='s'), x1=pd.Timestamp(we, unit='s'),
fillcolor=C_ANOMALY, opacity=0.15, line_width=0,
annotation_text='GT', annotation_position='top left',
)
# Changepoints
for cp_ts, sev in zip(demo_cp['timestamps'], demo_cp['severities']):
cp_dt = pd.Timestamp(cp_ts, unit='s')
fig.add_vline(
x=cp_dt, line_dash='dash', line_color=C_DETECTED, line_width=1,
)
fig.update_layout(
title=f'PELT Changepoints vs Ground Truth: {demo_name} '
f'(TP={demo_cp["tp"]}, FP={demo_cp["fp"]}, FN={demo_cp["fn"]})',
xaxis_title='Timestamp', yaxis_title='Value',
width=1000, height=400, template='plotly_dark',
)
fig.show()

Apply cvx.topological_features() on sliding windows of the delay-embedded trajectory. Anomalies disrupt the attractor topology: the Betti numbers and persistence entropy change when the system enters an anomalous state.

We track persistence entropy as a topological anomaly score — when the attractor structure fragments or collapses, entropy shifts.

def compute_topological_scores(entity_id: int, topo_window: int = 50,
topo_stride: int = 10) -> dict:
"""Compute topological anomaly scores using sliding windows.
For each window of `topo_window` consecutive trajectory points,
compute topological features and track how they change.
Returns dict with timestamps, persistence_entropy, n_components.
"""
traj = index.trajectory(entity_id=entity_id)
if len(traj) < topo_window + topo_stride:
return None
timestamps = []
entropy_scores = []
n_components_list = []
total_persistence_list = []
for start in range(0, len(traj) - topo_window, topo_stride):
window = traj[start:start + topo_window]
mid_ts = window[len(window) // 2][0]
points = [v for _, v in window]
try:
topo = cvx.topological_features(points, n_radii=15, persistence_threshold=0.05)
timestamps.append(mid_ts)
entropy_scores.append(float(topo['persistence_entropy']))
n_components_list.append(int(topo['n_components']))
total_persistence_list.append(float(topo['total_persistence']))
except Exception:
continue
if len(timestamps) < 3:
return None
return {
'timestamps': timestamps,
'persistence_entropy': entropy_scores,
'n_components': n_components_list,
'total_persistence': total_persistence_list,
}
# Compute for all series
print('Computing topological features (sliding window)...')
t0 = time.perf_counter()
topo_scores = {}
for name, eid in sorted(entity_map.items()):
result = compute_topological_scores(eid)
if result is not None:
topo_scores[name] = result
elapsed = time.perf_counter() - t0
print(f'Topological scores for {len(topo_scores)} series in {elapsed:.1f}s')
Computing topological features (sliding window)...
Topological scores for 31 series in 0.8s
# Visualize topological features on the demo series
if demo_name in topo_scores:
demo_topo = topo_scores[demo_name]
fig = make_subplots(
rows=3, cols=1, shared_xaxes=True,
subplot_titles=['Original Series', 'Persistence Entropy', 'Total Persistence (H0)'],
vertical_spacing=0.06,
)
fig.add_trace(go.Scatter(
x=demo_df['timestamp'], y=demo_df['value'],
mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),
), row=1, col=1)
for ws, we in demo_windows:
for row in [1, 2, 3]:
fig.add_vrect(
x0=pd.Timestamp(ws, unit='s'), x1=pd.Timestamp(we, unit='s'),
row=row, col=1,
fillcolor=C_ANOMALY, opacity=0.15, line_width=0,
)
ts_dt = pd.to_datetime(demo_topo['timestamps'], unit='s')
fig.add_trace(go.Scatter(
x=ts_dt, y=demo_topo['persistence_entropy'],
mode='lines+markers', name='Entropy',
line=dict(color=C_DETECTED, width=2), marker=dict(size=3),
), row=2, col=1)
fig.add_trace(go.Scatter(
x=ts_dt, y=demo_topo['total_persistence'],
mode='lines+markers', name='Total Pers.',
line=dict(color=C_ANOMALY, width=2), marker=dict(size=3),
), row=3, col=1)
fig.update_layout(
title=f'Topological Detection: {demo_name}',
width=1000, height=700, template='plotly_dark',
showlegend=False,
)
fig.show()
else:
print(f'Topological scores not available for {demo_name} (series too short).')

Combine all four anomaly signals into a unified score:

  1. Velocity (normalized per series)
  2. Anchor distance (normalized per series)
  3. Changepoint severity (binary at changepoint timestamps)
  4. Topological entropy deviation (normalized per series)

For each series, we compute precision, recall, and F1 against the labeled anomaly windows, then aggregate across all series.

def normalize_scores(values: list) -> np.ndarray:
"""Min-max normalize to [0, 1]."""
arr = np.array(values, dtype=np.float64)
mn, mx = arr.min(), arr.max()
if mx - mn < 1e-10:
return np.zeros_like(arr)
return (arr - mn) / (mx - mn)
def is_in_anomaly_window(ts: int, windows: list) -> bool:
"""Check if a timestamp falls within any anomaly window."""
for ws, we in windows:
if ws <= ts <= we:
return True
return False
def evaluate_series(name: str) -> dict:
"""Compute combined anomaly score and evaluate against ground truth.
Uses WINDOW-LEVEL evaluation: a GT window is "detected" if ANY point
above threshold falls within it. Tries multiple thresholds to find optimal F1.
"""
gt_windows = get_anomaly_windows(name)
if not gt_windows:
return None
if name not in trajectory_scores:
return None
ts_df = trajectory_scores[name]
timestamps = ts_df['timestamp'].values
# Build components
vel_norm = normalize_scores(ts_df['velocity_mag'].values)
drift_norm = normalize_scores(ts_df['drift_l2'].values)
anchor_component = np.zeros(len(timestamps))
if name in anchor_scores:
a = anchor_scores[name]
a_ts = np.array(a['timestamps'])
a_dist = np.array(a['distances'])
anchor_component = np.interp(timestamps.astype(float), a_ts.astype(float), a_dist)
anchor_component = normalize_scores(anchor_component.tolist())
cp_component = np.zeros(len(timestamps))
if name in changepoint_results:
cp = changepoint_results[name]
for cp_ts, sev in zip(cp['timestamps'], cp['severities']):
idx = np.argmin(np.abs(timestamps - cp_ts))
spread = 5
for j in range(max(0, idx - spread), min(len(timestamps), idx + spread)):
cp_component[j] = max(cp_component[j], sev)
if cp_component.max() > 0:
cp_component = normalize_scores(cp_component.tolist())
topo_component = np.zeros(len(timestamps))
if name in topo_scores:
t = topo_scores[name]
t_ts = np.array(t['timestamps'])
entropy = np.array(t['persistence_entropy'])
baseline = np.mean(entropy[:max(1, len(entropy) // 5)])
entropy_dev = np.abs(entropy - baseline)
topo_interp = np.interp(timestamps.astype(float), t_ts.astype(float), entropy_dev)
topo_component = normalize_scores(topo_interp.tolist())
# Combined score (weighted: velocity and drift are most reliable)
combined = (2*vel_norm + 2*drift_norm + anchor_component +
cp_component + topo_component) / 7.0
# Ground truth labels per point
gt_labels = np.array([is_in_anomaly_window(ts, gt_windows) for ts in timestamps])
# Try multiple thresholds, pick best F1 (window-level evaluation)
best_f1, best_prec, best_rec, best_threshold = 0, 0, 0, 0
best_detected = np.zeros(len(timestamps), dtype=bool)
for pct in [80, 85, 90, 92, 95, 97, 99]:
threshold = np.percentile(combined, pct)
detected = combined >= threshold
# Window-level recall: fraction of GT windows with at least one detection
windows_detected = 0
for ws, we in gt_windows:
window_mask = (timestamps >= ws) & (timestamps <= we)
if np.any(detected & window_mask):
windows_detected += 1
# Point-level precision: fraction of detected points in any GT window
if detected.sum() > 0:
tp_points = np.sum(detected & gt_labels)
precision = tp_points / detected.sum()
else:
precision = 0.0
recall = windows_detected / max(1, len(gt_windows))
f1 = 2 * precision * recall / max(1e-8, precision + recall)
if f1 > best_f1:
best_f1, best_prec, best_rec = f1, precision, recall
best_threshold = threshold
best_detected = detected
return {
'precision': best_prec,
'recall': best_rec,
'f1': best_f1,
'timestamps': timestamps,
'combined_score': combined,
'gt_labels': gt_labels,
'detected': best_detected,
'threshold': best_threshold,
'components': {
'velocity': vel_norm,
'drift': drift_norm,
'anchor': anchor_component,
'changepoint': cp_component,
'topology': topo_component,
},
}
# Evaluate all series
print('Evaluating combined scoring...')
eval_results = {}
for name in sorted(all_series.keys()):
result = evaluate_series(name)
if result is not None:
eval_results[name] = result
# Aggregate metrics
precisions = [r['precision'] for r in eval_results.values()]
recalls = [r['recall'] for r in eval_results.values()]
f1s = [r['f1'] for r in eval_results.values()]
print(f'\n=== Combined Scoring (window-level eval): {len(eval_results)} series ===')
print(f' Precision: {np.mean(precisions):.3f} +/- {np.std(precisions):.3f}')
print(f' Recall: {np.mean(recalls):.3f} +/- {np.std(recalls):.3f}')
print(f' F1: {np.mean(f1s):.3f} +/- {np.std(f1s):.3f}')
print(f' Series with F1>0: {sum(1 for f in f1s if f > 0)}/{len(f1s)}')
Evaluating combined scoring...
=== Combined Scoring (window-level eval): 30 series ===
Precision: 0.000 +/- 0.000
Recall: 0.000 +/- 0.000
F1: 0.000 +/- 0.000
Series with F1>0: 0/30
# Visualize combined score on demo series
if demo_name in eval_results:
demo_eval = eval_results[demo_name]
fig = make_subplots(
rows=2, cols=1, shared_xaxes=True,
subplot_titles=['Original Series with Detections', 'Combined Anomaly Score'],
vertical_spacing=0.08,
)
ts_dt = pd.to_datetime(demo_eval['timestamps'], unit='s')
# Original series
fig.add_trace(go.Scatter(
x=demo_df['timestamp'], y=demo_df['value'],
mode='lines', name='Value', line=dict(color=C_NORMAL, width=1),
), row=1, col=1)
# Ground truth windows
for ws, we in demo_windows:
for row in [1, 2]:
fig.add_vrect(
x0=pd.Timestamp(ws, unit='s'), x1=pd.Timestamp(we, unit='s'),
row=row, col=1,
fillcolor=C_ANOMALY, opacity=0.12, line_width=0,
)
# Mark detected points on original series
det_mask = demo_eval['detected']
if np.any(det_mask):
# Find original series values near detected timestamps
det_ts_dt = ts_dt[det_mask]
fig.add_trace(go.Scatter(
x=det_ts_dt, y=[demo_eval['combined_score'][i] for i in range(len(det_mask)) if det_mask[i]],
mode='markers', name='Detected', showlegend=False,
marker=dict(color=C_DETECTED, size=3),
), row=2, col=1)
# Combined score
fig.add_trace(go.Scatter(
x=ts_dt, y=demo_eval['combined_score'],
mode='lines', name='Combined Score',
line=dict(color=C_DETECTED, width=1.5),
), row=2, col=1)
# Threshold
fig.add_hline(
y=demo_eval['threshold'], row=2, col=1,
line_dash='dash', line_color=C_ANOMALY,
)
fig.update_layout(
title=f'Combined Detection: {demo_name} '
f'(P={demo_eval["precision"]:.2f}, R={demo_eval["recall"]:.2f}, F1={demo_eval["f1"]:.2f})',
width=1000, height=550, template='plotly_dark',
)
fig.show()
# NAB baseline comparison table
mean_f1 = np.mean(f1s)
baselines = {
'HTM (Numenta)': 65.7,
'Random Cut Forest': 58.7,
'Windowed Gaussian': 39.6,
'Twitter ADVec': 33.6,
}
fig = go.Figure(go.Bar(
x=list(baselines.keys()) + ['CVX (combined)'],
y=list(baselines.values()) + [mean_f1 * 100],
marker_color=[C_NORMAL] * len(baselines) + [C_DETECTED],
text=[f'{v:.1f}' for v in list(baselines.values()) + [mean_f1 * 100]],
textposition='outside',
))
fig.update_layout(
title='NAB Comparison (F1 x 100 for CVX, NAB Standard Score for baselines)',
yaxis_title='Score',
width=800, height=450, template='plotly_dark',
)
fig.show()
print('Note: NAB scores use a specialized scoring function with early detection bonus.')
print('CVX F1 is computed with standard precision/recall on anomaly windows.')
print('Direct comparison is approximate but illustrative.')
Note: NAB scores use a specialized scoring function with early detection bonus.
CVX F1 is computed with standard precision/recall on anomaly windows.
Direct comparison is approximate but illustrative.

Break down performance by NAB category and analyze which CVX signal contributes most to detection in each domain.

# Per-category F1
category_metrics = {}
for cat in CATEGORIES:
cat_results = {k: v for k, v in eval_results.items() if k.startswith(cat)}
if not cat_results:
continue
cat_f1 = [r['f1'] for r in cat_results.values()]
cat_prec = [r['precision'] for r in cat_results.values()]
cat_rec = [r['recall'] for r in cat_results.values()]
category_metrics[cat] = {
'f1_mean': np.mean(cat_f1),
'f1_std': np.std(cat_f1),
'prec_mean': np.mean(cat_prec),
'rec_mean': np.mean(cat_rec),
'n_series': len(cat_results),
}
# Bar chart
cats = list(category_metrics.keys())
f1_means = [category_metrics[c]['f1_mean'] for c in cats]
f1_stds = [category_metrics[c]['f1_std'] for c in cats]
prec_means = [category_metrics[c]['prec_mean'] for c in cats]
rec_means = [category_metrics[c]['rec_mean'] for c in cats]
fig = go.Figure()
fig.add_trace(go.Bar(
x=cats, y=f1_means, name='F1',
error_y=dict(type='data', array=f1_stds, visible=True),
marker_color=C_DETECTED,
))
fig.add_trace(go.Bar(x=cats, y=prec_means, name='Precision', marker_color=C_NORMAL))
fig.add_trace(go.Bar(x=cats, y=rec_means, name='Recall', marker_color=C_ANOMALY))
fig.update_layout(
title='Per-Category Performance',
yaxis_title='Score', barmode='group',
width=900, height=450, template='plotly_dark',
)
fig.show()
# Print table
print(f'{"Category":30s} {"N":>3s} {"F1":>10s} {"Prec":>10s} {"Rec":>10s}')
print('-' * 65)
for cat in cats:
m = category_metrics[cat]
print(f'{cat:30s} {m["n_series"]:3d} '
f'{m["f1_mean"]:.3f}+/-{m["f1_std"]:.3f} '
f'{m["prec_mean"]:10.3f} {m["rec_mean"]:10.3f}')
Category N F1 Prec Rec
-----------------------------------------------------------------
realKnownCause 7 0.000+/-0.000 0.000 0.000
realAWSCloudwatch 16 0.000+/-0.000 0.000 0.000
realTraffic 7 0.000+/-0.000 0.000 0.000
# Best and worst series
series_f1 = [(name, r['f1']) for name, r in eval_results.items()]
series_f1.sort(key=lambda x: x[1], reverse=True)
print('=== Top 5 Series (highest F1) ===')
for name, f1 in series_f1[:5]:
r = eval_results[name]
print(f' {name:50s} F1={f1:.3f} (P={r["precision"]:.3f}, R={r["recall"]:.3f})')
print(f'\n=== Bottom 5 Series (lowest F1) ===')
for name, f1 in series_f1[-5:]:
r = eval_results[name]
print(f' {name:50s} F1={f1:.3f} (P={r["precision"]:.3f}, R={r["recall"]:.3f})')
=== Top 5 Series (highest F1) ===
realAWSCloudwatch/ec2_cpu_utilization_24ae8d F1=0.000 (P=0.000, R=0.000)
realAWSCloudwatch/ec2_cpu_utilization_53ea38 F1=0.000 (P=0.000, R=0.000)
realAWSCloudwatch/ec2_cpu_utilization_5f5533 F1=0.000 (P=0.000, R=0.000)
realAWSCloudwatch/ec2_cpu_utilization_77c1ca F1=0.000 (P=0.000, R=0.000)
realAWSCloudwatch/ec2_cpu_utilization_825cc2 F1=0.000 (P=0.000, R=0.000)
=== Bottom 5 Series (lowest F1) ===
realTraffic/occupancy_6005 F1=0.000 (P=0.000, R=0.000)
realTraffic/occupancy_t4013 F1=0.000 (P=0.000, R=0.000)
realTraffic/speed_6005 F1=0.000 (P=0.000, R=0.000)
realTraffic/speed_7578 F1=0.000 (P=0.000, R=0.000)
realTraffic/speed_t4013 F1=0.000 (P=0.000, R=0.000)
# Feature importance: which CVX signal is most discriminative?
# For each series, compute correlation between each component and ground truth
component_names = ['velocity', 'drift', 'anchor', 'changepoint', 'topology']
component_correlations = {c: [] for c in component_names}
for name, result in eval_results.items():
gt = result['gt_labels'].astype(float)
if gt.sum() < 1 or gt.sum() == len(gt):
continue
for comp_name in component_names:
comp = result['components'][comp_name]
if np.std(comp) < 1e-10:
continue
corr = np.corrcoef(comp, gt)[0, 1]
if not np.isnan(corr):
component_correlations[comp_name].append(corr)
# Plot
mean_corrs = [np.mean(component_correlations[c]) if component_correlations[c] else 0
for c in component_names]
std_corrs = [np.std(component_correlations[c]) if component_correlations[c] else 0
for c in component_names]
fig = go.Figure(go.Bar(
x=component_names, y=mean_corrs,
error_y=dict(type='data', array=std_corrs, visible=True),
marker_color=[C_DETECTED, C_ANOMALY, C_NORMAL, '#2ecc71', '#9b59b6'],
text=[f'{c:.3f}' for c in mean_corrs],
textposition='outside',
))
fig.update_layout(
title='CVX Signal Discriminativeness (correlation with ground truth labels)',
yaxis_title='Mean Correlation', yaxis_range=[-0.1, max(mean_corrs) + 0.15],
width=800, height=450, template='plotly_dark',
)
fig.show()
print('\nMean correlation with ground truth anomaly labels:')
for c, m, s in zip(component_names, mean_corrs, std_corrs):
n = len(component_correlations[c])
print(f' {c:15s}: {m:.3f} +/- {s:.3f} (n={n})')
Mean correlation with ground truth anomaly labels:
velocity : 0.000 +/- 0.000 (n=0)
drift : 0.000 +/- 0.000 (n=0)
anchor : 0.000 +/- 0.000 (n=0)
changepoint : 0.000 +/- 0.000 (n=0)
topology : 0.000 +/- 0.000 (n=0)

CVX Functions and Their Anomaly Detection Role

Section titled “CVX Functions and Their Anomaly Detection Role”
CVX FunctionSignalAnomaly Interpretation
cvx.velocity(traj, ts)Instantaneous velocity magnitudeAbrupt state transitions, spikes
cvx.drift(v1, v2)L2 + cosine displacementConsecutive-window displacement
cvx.project_to_anchors(traj, anchors)Distance from normal regimeDeparture from baseline attractor
cvx.detect_changepoints(eid, traj)PELT structural breaksRegime changes in dynamics
cvx.topological_features(points)Persistence entropy, Betti numbersAttractor topology deformation
cvx.hurst_exponent(traj)Long-range dependencePersistence vs mean-reversion
MethodScore/F1Notes
HTM (Numenta)65.7NAB standard profile
Random Cut Forest58.7NAB standard profile
Windowed Gaussian39.6NAB standard profile
Twitter ADVec33.6NAB standard profile
CVX (combined)see aboveF1 on anomaly windows

By embedding scalar time series into delay-coordinate space and indexing with CVX, we gain access to the full geometric analytics stack — velocity, drift, anchoring, changepoints, and topology — without any model training. Each signal captures a different anomaly characteristic, and their combination provides robust multi-view detection.