Political Rhetoric
This notebook treats presidential tweets as a temporal trajectory in embedding space and uses CVX native analytics to study how rhetorical dynamics correlate with economic indicators.
Instead of bag-of-words sentiment, we embed every tweet with a sentence transformer, aggregate to daily mean vectors, and build a CVX temporal index. Then:
- Semantic Anchoring: Project the trajectory onto 6 rhetorical anchors (economy, trade war, immigration, media attack, self-praise, threat) via
cvx.project_to_anchors(). - Change Point Detection: Detect rhetorical regime shifts with
cvx.detect_changepoints()and overlay known political events. - Economic Alignment: Align tweet trajectory velocity with VIX, S&P 500, oil, USD, and Treasury yields.
- Topic Drift & Signatures: Measure persistence (
cvx.hurst_exponent()), path signatures (cvx.path_signature()), and Wasserstein drift across political periods. - Event Study: Identify tweet storms via embedding velocity spikes and measure market reaction windows.
- Classification: Can CVX rhetorical features predict next-day S&P 500 direction?
| Strategy | CVX Functions | Signal |
|---|---|---|
| Anchor Projection | project_to_anchors, anchor_summary | Rhetorical focus over time |
| Regime Detection | detect_changepoints | Rhetorical phase transitions |
| Velocity | velocity | Rate of topic change |
| Persistence | hurst_exponent | Erratic vs persistent rhetoric |
| Signatures | path_signature, signature_distance | Period-level rhetorical fingerprints |
import chronos_vector as cvximport numpy as npimport pandas as pdimport plotly.graph_objects as gofrom plotly.subplots import make_subplotsfrom sentence_transformers import SentenceTransformerfrom sklearn.linear_model import LogisticRegressionfrom sklearn.metrics import f1_score, roc_auc_score, classification_reportfrom sklearn.preprocessing import StandardScalerfrom scipy.stats import pearsonr, spearmanrimport yfinance as yfimport os, time, warnings, hashlibwarnings.filterwarnings('ignore')
DATA_DIR = '../data'CACHE_DIR = f'{DATA_DIR}/cache'os.makedirs(CACHE_DIR, exist_ok=True)
# Style constantsC_ECON = '#2ecc71'C_TWEET = '#3498db'C_CRISIS = '#e74c3c'C_EVENT = '#f39c12'TEMPLATE = 'plotly_dark'
# Political era boundsDATE_START = '2015-01-01'DATE_END = '2021-01-08'TS_START = int(pd.Timestamp(DATE_START).timestamp())TS_END = int(pd.Timestamp(DATE_END).timestamp())
# Key events for annotationKEY_EVENTS = { '2015-06-16': 'Campaign launch', '2016-11-08': 'Election win', '2017-01-20': 'Inauguration', '2018-03-22': 'Trade war begins', '2018-12-24': 'Christmas Eve selloff', '2019-05-05': 'Tariff escalation', '2020-01-03': 'Soleimani strike', '2020-03-11': 'COVID pandemic declared', '2020-11-03': 'Election 2020', '2021-01-06': 'Capitol riot',}
print(f'CVX version: {cvx.TemporalIndex.__module__}')print(f'Analysis window: {DATE_START} to {DATE_END}')CVX version: builtinsAnalysis window: 2015-01-01 to 2021-01-081. Data Acquisition
Section titled “1. Data Acquisition”Three data sources, all cached locally:
- Trump Twitter Archive — ~56K tweets (2009-2021) from Kaggle CSV
- Economic indicators — SPY, VIX, USD, Oil, 10Y yield via
yfinance - Google Trends — “trade war”, “immigration”, “impeachment” via
pytrends
# ── 1a. Trump Twitter Archive ──────────────────────────────────────# Downloaded from Kaggle: headsortails/trump-twitter-archive# Columns: id, text, is_retweet, is_deleted, device, favorites, retweets, datetime, is_flagged, date
TWEET_CSV = f'{DATA_DIR}/trump/trump_tweets.csv'TWEET_CACHE = f'{CACHE_DIR}/trump_tweets_filtered.parquet'
if os.path.exists(TWEET_CACHE): df_tweets = pd.read_parquet(TWEET_CACHE) print(f'Loaded cached tweets: {len(df_tweets):,}')else: if not os.path.exists(TWEET_CSV): raise FileNotFoundError( f'{TWEET_CSV} not found.\n' 'Download: kaggle datasets download -d headsortails/trump-twitter-archive -p data/trump/ --unzip' )
df_raw = pd.read_csv(TWEET_CSV, parse_dates=['datetime']) print(f'Raw tweets: {len(df_raw):,}') print(f'Columns: {list(df_raw.columns)}') print(f'Date range: {df_raw["datetime"].min()} to {df_raw["datetime"].max()}')
# Filter: political era (2015-2021), exclude retweets, need text df_tweets = df_raw[ (df_raw['datetime'] >= '2015-01-01') & (df_raw['datetime'] <= '2021-01-09') & (df_raw['is_retweet'] == False) & (df_raw['text'].notna()) & (df_raw['text'].str.len() > 10) ].copy()
df_tweets = df_tweets.sort_values('datetime').reset_index(drop=True) df_tweets['date'] = df_tweets['datetime'].dt.date df_tweets['unix_ts'] = (df_tweets['datetime'].astype(np.int64) // 10**9).astype(np.int64)
# Cache df_tweets.to_parquet(TWEET_CACHE) print(f'\nFiltered tweets: {len(df_tweets):,} (2015-2021, no retweets)') print(f'Date range: {df_tweets["datetime"].min()} to {df_tweets["datetime"].max()}')Loaded cached tweets: 28,272# ── 1b. Economic Indicators via yfinance ──────────────────────────ECON_CACHE = f'{CACHE_DIR}/econ_indicators.parquet'
TICKERS = { 'SPY': 'SPY', # S&P 500 ETF 'VIX': '^VIX', # Volatility index 'USD': 'DX-Y.NYB', # US Dollar index 'Oil': 'CL=F', # Crude oil futures 'TNX': '^TNX', # 10-year Treasury yield}
if os.path.exists(ECON_CACHE): df_econ = pd.read_parquet(ECON_CACHE) print(f'Loaded cached economic data: {len(df_econ):,} rows')else: frames = {} for name, ticker in TICKERS.items(): print(f'Downloading {name} ({ticker})...') data = yf.download(ticker, start=DATE_START, end=DATE_END, progress=False) if isinstance(data.columns, pd.MultiIndex): data.columns = data.columns.get_level_values(0) frames[name] = data['Close'].rename(name)
df_econ = pd.concat(frames.values(), axis=1) df_econ.index = pd.to_datetime(df_econ.index) df_econ = df_econ.sort_index()
# Forward-fill weekends/holidays, then compute daily returns df_econ = df_econ.ffill() for col in df_econ.columns: df_econ[f'{col}_ret'] = df_econ[col].pct_change()
df_econ.to_parquet(ECON_CACHE) print(f'Cached economic data: {len(df_econ):,} rows')
print(f'\nIndicators: {list(TICKERS.keys())}')print(f'Date range: {df_econ.index.min().date()} to {df_econ.index.max().date()}')Loaded cached economic data: 1,516 rows
Indicators: ['SPY', 'VIX', 'USD', 'Oil', 'TNX']Date range: 2015-01-02 to 2021-01-07# ── 1c. Google Trends ─────────────────────────────────────────────TRENDS_CACHE = f'{CACHE_DIR}/google_trends.parquet'
TREND_TERMS = ['trade war', 'immigration', 'impeachment']
if os.path.exists(TRENDS_CACHE): df_trends = pd.read_parquet(TRENDS_CACHE) print(f'Loaded cached Google Trends: {len(df_trends):,} rows')else: try: from pytrends.request import TrendReq pytrends = TrendReq(hl='en-US', tz=360) pytrends.build_payload(TREND_TERMS, timeframe=f'{DATE_START} {DATE_END}') df_trends = pytrends.interest_over_time() if 'isPartial' in df_trends.columns: df_trends = df_trends.drop(columns=['isPartial']) df_trends.to_parquet(TRENDS_CACHE) print(f'Downloaded Google Trends: {len(df_trends):,} weekly points') except Exception as e: print(f'Google Trends download failed: {e}') print('Creating placeholder — install pytrends or run with VPN if rate-limited') # Create a weekly placeholder with NaN so downstream code handles gracefully date_range = pd.date_range(DATE_START, DATE_END, freq='W') df_trends = pd.DataFrame( np.nan, index=date_range, columns=TREND_TERMS ) df_trends.to_parquet(TRENDS_CACHE)
print(f'Trend terms: {TREND_TERMS}')Loaded cached Google Trends: 314 rowsTrend terms: ['trade war', 'immigration', 'impeachment']2. Tweet Embedding & CVX Index
Section titled “2. Tweet Embedding & CVX Index”Embed each tweet with all-MiniLM-L6-v2 (D=384), aggregate to daily mean vectors,
and build a CVX temporal index with entity_id=1 (Trump as a single entity tracked over time).
# ── 2a. Embed tweets ──────────────────────────────────────────────EMB_CACHE = f'{CACHE_DIR}/trump_tweet_embeddings.npz'MODEL_NAME = 'sentence-transformers/all-MiniLM-L6-v2'
if os.path.exists(EMB_CACHE): cached = np.load(EMB_CACHE, allow_pickle=True) tweet_embeddings = cached['embeddings'] tweet_dates = pd.to_datetime(cached['dates']) print(f'Loaded cached embeddings: {tweet_embeddings.shape}')else: print(f'Encoding {len(df_tweets):,} tweets with {MODEL_NAME}...') model = SentenceTransformer(MODEL_NAME)
texts = df_tweets['text'].tolist() t0 = time.perf_counter() tweet_embeddings = model.encode(texts, batch_size=256, show_progress_bar=True, normalize_embeddings=True) elapsed = time.perf_counter() - t0 print(f'Encoded in {elapsed:.1f}s ({len(texts)/elapsed:.0f} tweets/s)')
tweet_dates = df_tweets['date'].values np.savez(EMB_CACHE, embeddings=tweet_embeddings, dates=tweet_dates) print(f'Cached to {EMB_CACHE}')
D = tweet_embeddings.shape[1]print(f'D={D}, {len(tweet_embeddings):,} tweet embeddings')Loaded cached embeddings: (28272, 384)D=384, 28,272 tweet embeddings# ── 2b. Aggregate to daily mean embedding ────────────────────────df_emb = pd.DataFrame({ 'date': pd.to_datetime(tweet_dates),})df_emb['day'] = df_emb['date'].dt.normalize()
# Add embedding columnsfor i in range(D): df_emb[f'e{i}'] = tweet_embeddings[:, i]
# Tweet count per daytweet_counts = df_emb.groupby('day').size().rename('n_tweets')
# Daily mean embeddingemb_cols = [f'e{i}' for i in range(D)]daily = df_emb.groupby('day')[emb_cols].mean().reset_index()daily = daily.sort_values('day').reset_index(drop=True)
# Merge tweet countsdaily = daily.merge(tweet_counts.reset_index(), on='day', how='left')
# Unix timestamp in SECONDS — handle both ns and us datetime resolutionday_int = daily['day'].dt.tz_localize(None).astype('datetime64[s]').astype(np.int64)daily['ts_unix'] = day_int
print(f'Daily aggregated: {len(daily):,} days, D={D}')print(f'Mean tweets/day: {daily["n_tweets"].mean():.1f}, max: {daily["n_tweets"].max()}')print(f'Timestamp range: {daily["ts_unix"].min()} to {daily["ts_unix"].max()} (unix seconds)')print(f'Sanity check: {pd.Timestamp(daily["ts_unix"].iloc[0], unit="s")} to {pd.Timestamp(daily["ts_unix"].iloc[-1], unit="s")}')Daily aggregated: 2,176 days, D=384Mean tweets/day: 13.0, max: 160Timestamp range: 1420070400 to 1610064000 (unix seconds)Sanity check: 2015-01-01 00:00:00 to 2021-01-08 00:00:00# ── 2c. Build CVX Index ───────────────────────────────────────────INDEX_PATH = f'{CACHE_DIR}/trump_index.cvx'
if os.path.exists(INDEX_PATH): t0 = time.perf_counter() index = cvx.TemporalIndex.load(INDEX_PATH) print(f'Loaded CVX index in {time.perf_counter() - t0:.2f}s ({len(index):,} points)')else: index = cvx.TemporalIndex(m=16, ef_construction=200)
entity_ids = np.ones(len(daily), dtype=np.uint64) # single entity: Trump timestamps = daily['ts_unix'].values.astype(np.int64) vectors = daily[emb_cols].values.astype(np.float32)
t0 = time.perf_counter() n = index.bulk_insert(entity_ids, timestamps, vectors, ef_construction=64) elapsed = time.perf_counter() - t0 print(f'Inserted {n:,} daily vectors in {elapsed:.2f}s')
index.save(INDEX_PATH) print(f'Saved to {INDEX_PATH}')
# Extract full trajectorytraj = index.trajectory(entity_id=1)print(f'Trajectory: {len(traj):,} points, D={len(traj[0][1])}')Inserted 2,176 daily vectors in 0.92sSaved to ../data/cache/trump_index.cvxTrajectory: 2,176 points, D=3843. Rhetorical Anchor Projection
Section titled “3. Rhetorical Anchor Projection”Define 6 semantic anchors by encoding representative phrases with the same
sentence transformer. Then use cvx.project_to_anchors() to transform the D=384
trajectory into a 6-dimensional time series of cosine distances to each anchor.
# ── 3a. Define and encode rhetorical anchors ─────────────────────ANCHOR_CACHE = f'{CACHE_DIR}/trump_rhetorical_anchors.npz'
RHETORICAL_ANCHORS = { 'economy': [ 'Stock market at all time high', 'Jobs numbers are great', 'GDP growth incredible', ], 'trade_war': [ 'China is ripping us off on trade', 'Tariffs on Chinese goods', 'Trade deficit is massive', ], 'immigration': [ 'Build the wall', 'Illegal immigrants are criminals', 'Ban on travel from dangerous countries', ], 'media_attack': [ 'Fake news CNN', 'Enemy of the people', 'Corrupt media lies', ], 'self_praise': [ 'Nobody has done more than me', 'Greatest president ever', 'Tremendous success', ], 'threat': [ 'Fire and fury', 'Total destruction', 'Will be met with force', ],}
ANCHOR_NAMES = list(RHETORICAL_ANCHORS.keys())
if os.path.exists(ANCHOR_CACHE): cached = np.load(ANCHOR_CACHE, allow_pickle=True) anchor_vectors = {name: cached[name] for name in ANCHOR_NAMES} print('Loaded cached anchor vectors')else: print(f'Encoding anchors with {MODEL_NAME}...') st_model = SentenceTransformer(MODEL_NAME)
anchor_vectors = {} for name, phrases in RHETORICAL_ANCHORS.items(): embs = st_model.encode(phrases, normalize_embeddings=True) anchor_vectors[name] = embs.mean(axis=0) print(f' {name}: {embs.shape[0]} phrases -> centroid D={embs.shape[1]}')
np.savez(ANCHOR_CACHE, **anchor_vectors) print(f'Cached to {ANCHOR_CACHE}')
# Prepare anchor list for CVXanchor_list = [anchor_vectors[name].tolist() for name in ANCHOR_NAMES]print(f'\n{len(ANCHOR_NAMES)} anchors: {ANCHOR_NAMES}')Loaded cached anchor vectors
6 anchors: ['economy', 'trade_war', 'immigration', 'media_attack', 'self_praise', 'threat']# ── 3b. Project trajectory to anchor coordinates ─────────────────t0 = time.perf_counter()projected = cvx.project_to_anchors(traj, anchor_list, metric='cosine')elapsed = time.perf_counter() - t0print(f'Projected {len(traj):,} points to {len(ANCHOR_NAMES)} anchors in {elapsed:.3f}s')
# Get anchor summary statisticssummary = cvx.anchor_summary(projected)print(f'\nAnchor Summary (cosine distance, lower = closer):')print(f'{"Anchor":15s} {"Mean":>8s} {"Min":>8s} {"Trend":>10s} {"Last":>8s}')print('-' * 55)for j, name in enumerate(ANCHOR_NAMES): print(f'{name:15s} {summary["mean"][j]:8.4f} {summary["min"][j]:8.4f} ' f'{summary["trend"][j]:+10.6f} {summary["last"][j]:8.4f}')Projected 2,176 points to 6 anchors in 0.017s
Anchor Summary (cosine distance, lower = closer):Anchor Mean Min Trend Last-------------------------------------------------------economy 0.7627 0.4009 -0.000026 0.8811trade_war 0.8255 0.2648 -0.000058 0.8989immigration 0.7939 0.4885 -0.000038 0.8061media_attack 0.6728 0.4023 -0.000025 0.8021self_praise 0.7022 0.4744 +0.000003 0.7204threat 0.8117 0.6134 -0.000025 0.7507# ── 3c. Plotly: 6-panel anchor distance time series ──────────────# Build dates from trajectory timestampstraj_dates = [pd.Timestamp(ts, unit='s') for ts, _ in projected]anchor_dists = np.array([dists for _, dists in projected])
# Political periods for coloringdef get_period(date): if date < pd.Timestamp('2016-11-08'): return 'Campaign' elif date < pd.Timestamp('2017-01-20'): return 'Transition' elif date < pd.Timestamp('2018-03-22'): return 'Year 1' elif date < pd.Timestamp('2020-03-11'): return 'Trade War' elif date < pd.Timestamp('2020-11-03'): return 'COVID' else: return 'Post-Election'
period_colors = { 'Campaign': '#9b59b6', 'Transition': C_EVENT, 'Year 1': C_ECON, 'Trade War': C_CRISIS, 'COVID': '#e67e22', 'Post-Election': C_TWEET,}periods = [get_period(d) for d in traj_dates]
fig = make_subplots( rows=3, cols=2, subplot_titles=[n.replace('_', ' ').title() for n in ANCHOR_NAMES], vertical_spacing=0.08, horizontal_spacing=0.06,)
for j, name in enumerate(ANCHOR_NAMES): row, col = j // 2 + 1, j % 2 + 1 # 7-day rolling mean for readability dist_series = pd.Series(anchor_dists[:, j], index=traj_dates) smoothed = dist_series.rolling(7, min_periods=1).mean()
# Plot raw as scatter, smoothed as line for period_name, color in period_colors.items(): mask = [p == period_name for p in periods] dates_masked = [d for d, m in zip(traj_dates, mask) if m] vals_masked = [v for v, m in zip(anchor_dists[:, j], mask) if m] fig.add_trace(go.Scatter( x=dates_masked, y=vals_masked, mode='markers', marker=dict(size=2, color=color, opacity=0.3), name=period_name, showlegend=(j == 0), legendgroup=period_name, ), row=row, col=col)
fig.add_trace(go.Scatter( x=traj_dates, y=smoothed.values, mode='lines', line=dict(color='white', width=1.5), name='7d avg', showlegend=(j == 0), ), row=row, col=col)
fig.update_yaxes(title_text='Cosine Dist', row=row, col=col)
fig.update_layout( title='Rhetorical Anchor Distances Over Time (lower = closer to topic)', height=900, width=1100, template=TEMPLATE, legend=dict(orientation='h', yanchor='bottom', y=1.02),)fig.show()4. Change Point Detection — Rhetorical Regime Shifts
Section titled “4. Change Point Detection — Rhetorical Regime Shifts”Use cvx.detect_changepoints() on both the raw embedding trajectory and the
anchor-projected trajectory. Overlay detected changepoints with known political events
to see if CVX detects regime shifts automatically.
# ── 4a. Changepoints on anchor-projected trajectory ──────────────# Cosine distances in [0.7, 0.9] have very small variance.# Need very low penalty to detect subtle rhetorical shifts.n_points = len(projected)
# Try multiple penalties and pick the one giving 5-15 changepointsbest_cps = []for penalty_test in [0.5, 1.0, 2.0, 3.0, 5.0, np.log(n_points)]: cps_test = cvx.detect_changepoints( entity_id=1, trajectory=projected, penalty=penalty_test, min_segment_len=14, ) print(f' penalty={penalty_test:.1f}: {len(cps_test)} changepoints') if 5 <= len(cps_test) <= 20 and not best_cps: best_cps = cps_test best_penalty = penalty_test
# Fallback: use the most granular that gives >0if not best_cps: for penalty_test in [0.1, 0.2, 0.5]: cps_test = cvx.detect_changepoints( entity_id=1, trajectory=projected, penalty=penalty_test, min_segment_len=14, ) if len(cps_test) > 0: best_cps = cps_test best_penalty = penalty_test break
cps_anchor = best_cpsprint(f'\nSelected: penalty={best_penalty:.1f}, {len(cps_anchor)} changepoints')
# Convert timestamps to datesdef ts_to_date(ts): return pd.Timestamp(ts, unit='s')
cp_anchor_dates = [(ts_to_date(ts), sev) for ts, sev in cps_anchor]
# Known political eventsKNOWN_EVENTS = { '2016-06-16': 'Campaign Launch', '2016-11-08': 'Election Day', '2017-01-20': 'Inauguration', '2018-03-22': 'Trade War Begins', '2018-07-06': 'China Tariffs', '2019-05-10': 'Tariff Escalation', '2019-12-18': 'Impeachment Vote', '2020-03-11': 'COVID Emergency', '2020-06-01': 'George Floyd', '2020-11-03': 'Election 2020', '2021-01-06': 'Capitol Riot',}
print(f'\nDetected changepoints (by severity):')for date, sev in sorted(cp_anchor_dates, key=lambda x: -x[1])[:15]: nearest = min(KNOWN_EVENTS.items(), key=lambda e: abs((pd.Timestamp(e[0]) - date).days)) days_diff = (date - pd.Timestamp(nearest[0])).days event_str = f' (~{nearest[1]}, {days_diff:+d}d)' if abs(days_diff) < 45 else '' print(f' {date.date()}: severity={sev:.4f}{event_str}') penalty=0.5: 7 changepoints penalty=1.0: 3 changepoints penalty=2.0: 2 changepoints penalty=3.0: 1 changepoints penalty=5.0: 1 changepoints penalty=7.7: 0 changepoints
Selected: penalty=0.5, 7 changepoints
Detected changepoints (by severity): 2017-10-10: severity=0.1729 2017-01-02: severity=0.1560 (~Inauguration, -18d) 2018-11-21: severity=0.1549 2016-11-06: severity=0.1438 (~Election Day, -2d) 2015-06-19: severity=0.1143 2019-09-08: severity=0.0884 2018-09-21: severity=0.0670# ── 4b. Plotly: timeline with changepoints + known events ────────fig = go.Figure()
# Anchor changepoints with severity barsif cp_anchor_dates: for date, sev in cp_anchor_dates: fig.add_trace(go.Scatter( x=[date, date], y=[0, sev], mode='lines', line=dict(color=C_CRISIS, width=3), showlegend=False, )) fig.add_trace(go.Scatter( x=[d for d, _ in cp_anchor_dates], y=[s for _, s in cp_anchor_dates], mode='markers', marker=dict(size=8, color=C_CRISIS, symbol='diamond'), name=f'Changepoints ({len(cp_anchor_dates)})', hovertemplate='%{x}<br>Severity: %{y:.4f}<extra></extra>', ))
# Known events as vertical dashed linesfor event_date, event_name in KNOWN_EVENTS.items(): event_dt = pd.Timestamp(event_date) fig.add_vline( x=event_dt, line=dict(color=C_EVENT, width=1, dash='dot'), ) fig.add_annotation( x=event_dt, y=1.05, yref='paper', text=event_name, showarrow=False, font=dict(size=8, color=C_EVENT), textangle=-45, )
fig.update_layout( title=f'Rhetorical Regime Changepoints ({len(cp_anchor_dates)} detected) vs Known Political Events', xaxis_title='Date', yaxis_title='Changepoint Severity', height=450, width=1100, template='plotly_dark',)fig.show()5. Economic Indicator Alignment
Section titled “5. Economic Indicator Alignment”Align daily tweet trajectory with economic indicators. Compute:
- Rolling correlation between anchor distances and each indicator
- Velocity spikes in embedding space vs VIX spikes
- Multi-panel aligned view: tweet velocity, VIX, and S&P 500
# ── 5a. Compute velocity in ANCHOR SPACE (6D, not 384D) ──────────# Velocity in 384D raw embedding space is ~10⁻⁶ (too small to be useful).# Velocity in 6D anchor-projected space captures rhetorical pivot speed.velocities = []vel_dates = []
for i in range(1, len(projected) - 1): ts = projected[i][0] try: vel = cvx.velocity(projected, timestamp=ts) vel_mag = float(np.linalg.norm(vel)) velocities.append(vel_mag) vel_dates.append(ts_to_date(ts)) except: continue
df_vel = pd.DataFrame({'date': vel_dates, 'velocity': velocities})df_vel = df_vel.set_index('date').sort_index()df_vel['vel_7d'] = df_vel['velocity'].rolling(7, center=True).mean()
print(f'Computed anchor-space velocity for {len(velocities)} days')print(f'Mean velocity: {np.mean(velocities):.6f}')print(f'Max velocity: {np.max(velocities):.6f} on {vel_dates[np.argmax(velocities)].date()}')
# Top 10 velocity spikes (rhetorical pivots)top_vel = df_vel.nlargest(10, 'velocity')print(f'\nTop 10 rhetorical pivots (highest velocity in anchor space):')for date, row in top_vel.iterrows(): print(f' {date.date()}: velocity={row["velocity"]:.6f}')Computed anchor-space velocity for 2174 daysMean velocity: 0.000001Max velocity: 0.000004 on 2018-12-03
Top 10 rhetorical pivots (highest velocity in anchor space): 2018-12-03: velocity=0.000004 2019-02-01: velocity=0.000004 2016-12-03: velocity=0.000004 2019-05-15: velocity=0.000004 2018-12-05: velocity=0.000003 2016-12-05: velocity=0.000003 2017-05-05: velocity=0.000003 2018-05-20: velocity=0.000003 2018-03-03: velocity=0.000003 2018-03-25: velocity=0.000003# ── 5b. Align tweet data with economic indicators ────────────────# Build a daily DataFrame with anchor distances + velocitydf_daily = pd.DataFrame({ 'date': traj_dates,})for j, name in enumerate(ANCHOR_NAMES): df_daily[f'anchor_{name}'] = anchor_dists[:, j]
df_daily = df_daily.set_index('date').sort_index()
# Merge velocitydf_daily = df_daily.join(df_vel[['velocity', 'vel_7d']], how='left')
# Merge economic indicators (align by date)df_econ_daily = df_econ.copy()df_econ_daily.index = pd.to_datetime(df_econ_daily.index).tz_localize(None)df_daily.index = pd.to_datetime(df_daily.index).tz_localize(None)df_aligned = df_daily.join(df_econ_daily, how='left').ffill()
print(f'Aligned dataset: {len(df_aligned):,} days')print(f'Columns: {list(df_aligned.columns)}')Aligned dataset: 2,176 daysColumns: ['anchor_economy', 'anchor_trade_war', 'anchor_immigration', 'anchor_media_attack', 'anchor_self_praise', 'anchor_threat', 'velocity', 'vel_7d', 'SPY', 'VIX', 'USD', 'Oil', 'TNX', 'SPY_ret', 'VIX_ret', 'USD_ret', 'Oil_ret', 'TNX_ret']# ── 5c. Quarterly-smoothed correlations: rhetoric vs markets ─────# Rolling 90-day (quarterly) correlation, smoothed for readabilityWINDOW = 90 # ~1 quarterindicators = {'VIX': C_CRISIS, 'SPY': C_ECON} # Focus on the 2 most relevant
# Select the 3 most interesting anchors (economy, trade_war, threat)focus_anchors = ['economy', 'trade_war', 'threat']
fig = make_subplots( rows=len(focus_anchors), cols=1, subplot_titles=[f'Distance to "{a.replace("_", " ").title()}" anchor vs markets' for a in focus_anchors], shared_xaxes=True, vertical_spacing=0.08,)
for j, anchor_name in enumerate(focus_anchors): anchor_col = f'anchor_{anchor_name}' if anchor_col not in df_aligned.columns: continue for ind, color in indicators.items(): if ind not in df_aligned.columns: continue # Quarterly rolling Pearson correlation rolling_corr = df_aligned[anchor_col].rolling(WINDOW, center=True).corr(df_aligned[ind]) # Additional smoothing for readability rolling_corr_smooth = rolling_corr.rolling(30, center=True).mean()
fig.add_trace(go.Scatter( x=df_aligned.index, y=rolling_corr_smooth, mode='lines', line=dict(color=color, width=2.5), name=ind, showlegend=(j == 0), legendgroup=ind, ), row=j + 1, col=1)
fig.add_hline(y=0, line=dict(color='gray', width=0.5, dash='dash'), row=j + 1, col=1) fig.update_yaxes(range=[-0.8, 0.8], title_text='Correlation', row=j + 1, col=1)
# Add known eventsfor event_date, event_name in KNOWN_EVENTS.items(): event_dt = pd.Timestamp(event_date) if event_name in ['Trade War Begins', 'COVID Emergency', 'Election 2020']: for row in range(1, len(focus_anchors) + 1): fig.add_vline(x=event_dt, row=row, col=1, line=dict(color=C_EVENT, width=1, dash='dot'))
fig.update_layout( title=f'Quarterly Correlation: Rhetoric ↔ Markets (90-day rolling, smoothed)', height=700, width=1100, template=TEMPLATE, legend=dict(orientation='h', yanchor='bottom', y=1.02),)fig.show()# ── 5d. Multi-panel aligned view: tweet velocity, VIX, S&P 500 ──fig = make_subplots( rows=3, cols=1, subplot_titles=['Tweet Embedding Velocity (CVX)', 'VIX (Fear Index)', 'S&P 500 (SPY)'], shared_xaxes=True, vertical_spacing=0.06,)
# Panel 1: Tweet velocityfig.add_trace(go.Scatter( x=df_aligned.index, y=df_aligned['velocity'], mode='lines', line=dict(color=C_TWEET, width=0.5), opacity=0.4, name='Velocity (raw)', showlegend=False,), row=1, col=1)fig.add_trace(go.Scatter( x=df_aligned.index, y=df_aligned['vel_7d'], mode='lines', line=dict(color=C_TWEET, width=2), name='Velocity (7d avg)',), row=1, col=1)
# Panel 2: VIXif 'VIX' in df_aligned.columns: fig.add_trace(go.Scatter( x=df_aligned.index, y=df_aligned['VIX'], mode='lines', line=dict(color=C_CRISIS, width=2), name='VIX', ), row=2, col=1)
# Panel 3: SPYif 'SPY' in df_aligned.columns: fig.add_trace(go.Scatter( x=df_aligned.index, y=df_aligned['SPY'], mode='lines', line=dict(color=C_ECON, width=2), name='SPY', ), row=3, col=1)
# Add key eventsfor event_date, event_name in KEY_EVENTS.items(): event_dt = pd.Timestamp(event_date) for row in [1, 2, 3]: fig.add_vline( x=event_dt, row=row, col=1, line=dict(color=C_EVENT, width=1, dash='dot'), )
fig.update_yaxes(title_text='Velocity', row=1, col=1)fig.update_yaxes(title_text='VIX', row=2, col=1)fig.update_yaxes(title_text='SPY ($)', row=3, col=1)fig.update_layout( title='Aligned View: Tweet Embedding Velocity vs Market Indicators', height=700, width=1100, template=TEMPLATE,)fig.show()6. Topic Drift & Market Response
Section titled “6. Topic Drift & Market Response”Measure the character of Trump’s rhetoric using CVX temporal analytics:
- Hurst exponent on anchor-projected trajectory: persistent (H > 0.5) or erratic (H < 0.5)?
- Path signatures per political period: fingerprint the rhetorical dynamics
- Signature distance between periods: quantify how rhetoric changed across eras
- Rolling Wasserstein drift: topic distribution shift over time
# ── 6a. Hurst exponent + topological features per period ─────────# Hurst measures persistence: H>0.5 = trending rhetoric, H<0.5 = erratic
# Global Hurst on anchor-projected trajectoryhurst_global = cvx.hurst_exponent(projected)print(f'Global Hurst exponent (anchor space): {hurst_global:.4f}')print(f' -> {"Persistent: rhetoric tends to sustain direction" if hurst_global > 0.5 else "Erratic: rhetoric oscillates"}')
# Political periods (timestamps in UNIX SECONDS)PERIODS = { 'Campaign': ('2015-06-16', '2016-11-08'), 'Year 1': ('2017-01-20', '2018-03-22'), 'Trade War': ('2018-03-22', '2020-03-11'), 'COVID': ('2020-03-11', '2020-11-03'), 'Post-Election': ('2020-11-03', '2021-01-08'),}
def period_to_unix(start_str, end_str): """Convert period strings to unix second range.""" return int(pd.Timestamp(start_str).timestamp()), int(pd.Timestamp(end_str).timestamp())
def extract_period(trajectory, start_str, end_str): """Extract sub-trajectory for a date range.""" s, e = period_to_unix(start_str, end_str) return [(ts, dists) for ts, dists in trajectory if s <= ts <= e]
# Hurst + topology per periodprint(f'\n{"Period":20s} {"Days":>5s} {"Hurst":>7s} {"Persistence":>15s} {"Topo β₀":>8s}')print('-' * 60)
hurst_by_period = {}topo_by_period = {}
for period_name, (start, end) in PERIODS.items(): period_proj = extract_period(projected, start, end) n_days = len(period_proj)
if n_days >= 20: try: h = cvx.hurst_exponent(period_proj) hurst_by_period[period_name] = h except: h = float('nan')
# Topological features: how fragmented is the rhetoric in this period? period_vecs = [dists for _, dists in period_proj] try: topo = cvx.topological_features(period_vecs, n_radii=15, persistence_threshold=0.05) topo_by_period[period_name] = topo n_comp = topo['n_components'] except: n_comp = '?'
persistence = 'trending' if h > 0.6 else ('moderate' if h > 0.45 else 'erratic') print(f'{period_name:20s} {n_days:5d} {h:7.3f} {persistence:>15s} {str(n_comp):>8s}') else: print(f'{period_name:20s} {n_days:5d} (insufficient data)')
# Event features: tweet timing patterns per periodprint(f'\n{"Period":20s} {"Burstiness":>11s} {"Memory":>8s} {"Circadian":>10s} {"Entropy":>9s}')print('-' * 63)
event_by_period = {}for period_name, (start, end) in PERIODS.items(): s, e = period_to_unix(start, end) period_traj = [(ts, v) for ts, v in traj if s <= ts <= e] if len(period_traj) >= 10: ts_list = [ts for ts, _ in period_traj] try: ef = cvx.event_features(ts_list) event_by_period[period_name] = ef print(f'{period_name:20s} {ef["burstiness"]:11.3f} {ef["memory"]:8.3f} {ef["circadian_strength"]:10.3f} {ef["temporal_entropy"]:9.3f}') except: print(f'{period_name:20s} (failed)') else: print(f'{period_name:20s} (insufficient data)')Global Hurst exponent (anchor space): 0.7945 -> Persistent: rhetoric tends to sustain direction
Period Days Hurst Persistence Topo β₀------------------------------------------------------------Campaign 510 0.711 trending 209Year 1 419 0.657 trending 402Trade War 717 0.679 trending 600COVID 237 0.616 trending 160Post-Election 66 0.808 trending 57
Period Burstiness Memory Circadian Entropy---------------------------------------------------------------Campaign -0.883 -0.004 1.000 0.026Year 1 -0.763 -0.020 1.000 0.095Trade War -0.862 -0.006 1.000 0.035COVID -0.878 -0.004 1.000 0.027Post-Election -0.784 -0.002 1.000 0.079# ── 6b. Path signatures per political period ─────────────────────# Depth-2 signatures on 6D anchor-projected trajectory# → 7 + 49 = 56 features (with time augmentation)# Captures the SHAPE of rhetorical evolution, not just endpoints
period_signatures = {}for period_name, (start, end) in PERIODS.items(): period_proj = extract_period(projected, start, end)
if len(period_proj) >= 10: try: sig = cvx.path_signature(period_proj, depth=2, time_augmentation=True) period_signatures[period_name] = sig print(f'{period_name:20s}: {len(period_proj)} days, sig dim={len(sig)}, ||sig||={np.linalg.norm(sig):.4f}') except Exception as e: print(f'{period_name:20s}: failed ({e})') else: print(f'{period_name:20s}: insufficient data ({len(period_proj)} days)')
# Signature distance matrix + Frechet distanceperiod_names_sig = list(period_signatures.keys())n_p = len(period_names_sig)
if n_p >= 2: sig_dist_matrix = np.zeros((n_p, n_p)) frechet_matrix = np.zeros((n_p, n_p))
for i in range(n_p): for j in range(n_p): sig_dist_matrix[i, j] = cvx.signature_distance( period_signatures[period_names_sig[i]], period_signatures[period_names_sig[j]], ) # Frechet distance on anchor-projected trajectories p_i = extract_period(projected, *PERIODS[period_names_sig[i]]) p_j = extract_period(projected, *PERIODS[period_names_sig[j]]) frechet_matrix[i, j] = cvx.frechet_distance(p_i[:200], p_j[:200])
print(f'\nSignature Distance Matrix (lower = more similar rhetorical dynamics):') df_sig = pd.DataFrame(sig_dist_matrix, index=period_names_sig, columns=period_names_sig) print(df_sig.round(3).to_string())
print(f'\nFréchet Distance Matrix (path shape similarity):') df_frech = pd.DataFrame(frechet_matrix, index=period_names_sig, columns=period_names_sig) print(df_frech.round(4).to_string())else: print('Need at least 2 periods for comparison')Campaign : 510 days, sig dim=56, ||sig||=5.3188Year 1 : 419 days, sig dim=56, ||sig||=8.7198Trade War : 717 days, sig dim=56, ||sig||=13.0814COVID : 237 days, sig dim=56, ||sig||=2.7485Post-Election : 66 days, sig dim=56, ||sig||=1.3018
Signature Distance Matrix (lower = more similar rhetorical dynamics): Campaign Year 1 Trade War COVID Post-ElectionCampaign 0.000 4.090 8.417 2.862 4.717Year 1 4.090 0.000 5.035 6.286 8.175Trade War 8.417 5.035 0.000 10.709 12.578COVID 2.862 6.286 10.709 0.000 2.076Post-Election 4.717 8.175 12.578 2.076 0.000
Fréchet Distance Matrix (path shape similarity): Campaign Year 1 Trade War COVID Post-ElectionCampaign 0.0000 0.4024 0.3991 0.2574 0.2611Year 1 0.4024 0.0000 0.3812 0.3685 0.4061Trade War 0.3991 0.3812 0.0000 0.3563 0.3741COVID 0.2574 0.3685 0.3563 0.0000 0.2551Post-Election 0.2611 0.4061 0.3741 0.2551 0.0000# ── 6c. Plotly: signature distance heatmap + Hurst bar chart ─────fig = make_subplots( rows=1, cols=2, subplot_titles=['Signature Distance Between Periods', 'Hurst Exponent by Period'], column_widths=[0.6, 0.4],)
# Heatmapfig.add_trace(go.Heatmap( z=sig_dist_matrix, x=period_names_sig, y=period_names_sig, colorscale='Viridis', text=np.round(sig_dist_matrix, 3), texttemplate='%{text}', showscale=True, colorbar=dict(title='Distance', x=0.45),), row=1, col=1)
# Hurst bar charthurst_periods = list(hurst_by_period.keys())hurst_values = list(hurst_by_period.values())colors = [C_ECON if h > 0.5 else C_CRISIS for h in hurst_values]
fig.add_trace(go.Bar( x=hurst_periods, y=hurst_values, marker_color=colors, name='Hurst', text=[f'{h:.3f}' for h in hurst_values], textposition='outside', showlegend=False,), row=1, col=2)
fig.add_hline(y=0.5, line=dict(color='white', dash='dash', width=1), row=1, col=2)fig.add_annotation( x=0.5, y=0.5, xref='x2', yref='y2', text='H=0.5 (random walk)', showarrow=False, font=dict(color='white', size=10),)
fig.update_layout( title='Rhetorical Dynamics by Political Period', height=450, width=1100, template=TEMPLATE,)fig.update_yaxes(title_text='Hurst Exponent', row=1, col=2)fig.show()# ── 6d. Rolling Wasserstein drift on anchor-projected trajectory ──# Treat the 6 anchor distances as a "distribution" over topics (after softmax)# and compute rolling Wasserstein drift between consecutive windows
def softmax(x): """Convert distances to distribution (invert so closer = higher weight).""" inv = 1.0 - np.array(x) # closer anchor -> higher value inv = np.clip(inv, 0, None) e = np.exp(inv - np.max(inv)) return (e / e.sum()).tolist()
# We need anchor centroids for Wasserstein (using the anchor_list as positions in R^D)ROLLING_W = 14 # 14-day windows
wasserstein_dates = []wasserstein_drifts = []
for i in range(ROLLING_W, len(projected) - ROLLING_W): window_a = projected[i - ROLLING_W:i] window_b = projected[i:i + ROLLING_W]
# Average anchor distances in each window -> softmax -> distribution dists_a = np.mean([d for _, d in window_a], axis=0) dists_b = np.mean([d for _, d in window_b], axis=0)
dist_a_soft = softmax(dists_a) dist_b_soft = softmax(dists_b)
# Sliced Wasserstein drift using anchor vectors as centroids w_drift = cvx.wasserstein_drift( [float(x) for x in dist_a_soft], [float(x) for x in dist_b_soft], anchor_list, n_projections=50, ) wasserstein_dates.append(ts_to_date(projected[i][0])) wasserstein_drifts.append(w_drift)
df_wass = pd.DataFrame({'date': wasserstein_dates, 'wasserstein': wasserstein_drifts})df_wass = df_wass.set_index('date')df_wass['wass_7d'] = df_wass['wasserstein'].rolling(7, min_periods=1).mean()
fig = go.Figure()fig.add_trace(go.Scatter( x=df_wass.index, y=df_wass['wasserstein'], mode='lines', line=dict(color=C_TWEET, width=0.5), opacity=0.4, name='Wasserstein drift (raw)',))fig.add_trace(go.Scatter( x=df_wass.index, y=df_wass['wass_7d'], mode='lines', line=dict(color=C_TWEET, width=2), name='Wasserstein drift (7d avg)',))
for event_date, event_name in KEY_EVENTS.items(): fig.add_vline(x=pd.Timestamp(event_date), line=dict(color=C_EVENT, width=1, dash='dot'))
fig.update_layout( title='Rolling Wasserstein Topic Drift (14-day windows)', yaxis_title='Sliced Wasserstein Distance', height=400, width=1100, template=TEMPLATE,)fig.show()7. Event Study — Tweet Storms & Market Reaction
Section titled “7. Event Study — Tweet Storms & Market Reaction”Identify “tweet storms” as days where:
- Tweet count exceeds 20 tweets, OR
- Embedding velocity is in the top 5% of all days
For each storm, measure S&P 500 and VIX response in a [-1, +3] day window.
# ── 7a. Identify tweet storms ─────────────────────────────────────# Merge tweet counts into the aligned DataFramedaily_counts = daily[['day', 'n_tweets']].copy()daily_counts['day'] = pd.to_datetime(daily_counts['day']).dt.tz_localize(None)daily_counts = daily_counts.drop_duplicates('day').set_index('day')
# Remove any existing n_tweets column before joiningif 'n_tweets' in df_aligned.columns: df_aligned = df_aligned.drop(columns=['n_tweets'])
# Deduplicate df_aligned index before joindf_aligned = df_aligned[~df_aligned.index.duplicated(keep='first')]df_aligned = df_aligned.join(daily_counts[['n_tweets']], how='left')df_aligned['n_tweets'] = df_aligned['n_tweets'].fillna(0)
# Storm criteriavel_col = 'velocity' if 'velocity' in df_aligned.columns else Noneif vel_col: vel_threshold = df_aligned[vel_col].quantile(0.95) count_threshold = 20
df_aligned['is_storm'] = ( (df_aligned['n_tweets'] >= count_threshold) | (df_aligned[vel_col] >= vel_threshold) )
storm_days = df_aligned[df_aligned['is_storm']].index print(f'Tweet storms identified: {len(storm_days)} days') print(f' By count (>={count_threshold} tweets): {(df_aligned["n_tweets"] >= count_threshold).sum()}') print(f' By velocity (top 5%): {(df_aligned[vel_col] >= vel_threshold).sum()}')else: count_threshold = 20 df_aligned['is_storm'] = df_aligned['n_tweets'] >= count_threshold storm_days = df_aligned[df_aligned['is_storm']].index print(f'Tweet storms (by count): {len(storm_days)} days')Tweet storms identified: 494 days By count (>=20 tweets): 392 By velocity (top 5%): 109# ── 7b. Event study: market reaction window [-1, +3] ─────────────# Check which economic columns are availableecon_cols = [c for c in df_aligned.columns if c in ['SPY', 'VIX', 'spy', 'vix', '^GSPC', '^VIX']]spy_col = next((c for c in df_aligned.columns if 'spy' in c.lower() or 'gspc' in c.lower()), None)vix_col = next((c for c in df_aligned.columns if 'vix' in c.lower()), None)
print(f'Available columns: {list(df_aligned.columns)}')print(f'SPY column: {spy_col}, VIX column: {vix_col}')
event_results = []
if spy_col and vix_col: # Ensure no duplicate index df_event = df_aligned[~df_aligned.index.duplicated(keep='first')].copy()
for storm_date in storm_days: try: # Look at +1 day only (simpler, more robust) next_day = storm_date + pd.Timedelta(days=1) # Find nearest trading day future_mask = df_event.index > storm_date if not future_mask.any(): continue next_trading = df_event.index[future_mask][0]
spy_today = df_event.loc[storm_date, spy_col] spy_next = df_event.loc[next_trading, spy_col] vix_today = df_event.loc[storm_date, vix_col] vix_next = df_event.loc[next_trading, vix_col]
vel = df_event.loc[storm_date, 'velocity'] if 'velocity' in df_event.columns else 0 n_tw = df_event.loc[storm_date, 'n_tweets'] if 'n_tweets' in df_event.columns else 0
event_results.append({ 'date': storm_date, 'n_tweets': n_tw, 'velocity': vel, 'spy_return': (spy_next - spy_today) / (spy_today + 1e-8), 'vix_change': vix_next - vix_today, }) except (KeyError, IndexError): continue
df_events = pd.DataFrame(event_results) print(f'\nEvent study: {len(df_events)} storm days with market data') if len(df_events) > 0: print(f'Mean next-day SPY return after storms: {df_events["spy_return"].mean():.4f}') print(f'Mean next-day VIX change after storms: {df_events["vix_change"].mean():.4f}')else: print('Economic data columns not found — skipping event study') df_events = pd.DataFrame()Available columns: ['anchor_economy', 'anchor_trade_war', 'anchor_immigration', 'anchor_media_attack', 'anchor_self_praise', 'anchor_threat', 'velocity', 'vel_7d', 'SPY', 'VIX', 'USD', 'Oil', 'TNX', 'SPY_ret', 'VIX_ret', 'USD_ret', 'Oil_ret', 'TNX_ret', 'n_tweets', 'is_storm']SPY column: SPY, VIX column: VIX
Event study: 494 storm days with market dataMean next-day SPY return after storms: 0.0004Mean next-day VIX change after storms: 0.0011# ── 7c. Scatter: tweet velocity vs market reaction ───────────────from scipy.stats import spearmanr
if len(df_events) > 5: fig = make_subplots( rows=1, cols=2, subplot_titles=[ 'Velocity vs Next-Day VIX Change', 'Velocity vs Next-Day SPY Return', ], )
valid = df_events.dropna(subset=['vix_change', 'velocity']) fig.add_trace(go.Scatter( x=valid['velocity'], y=valid['vix_change'], mode='markers', marker=dict( size=6, color=valid['n_tweets'], colorscale='Viridis', showscale=True, colorbar=dict(title='# Tweets', x=0.45), ), name='Storm days', hovertemplate='Velocity: %{x:.5f}<br>VIX change: %{y:.2f}<extra></extra>', ), row=1, col=1)
if len(valid) > 5: r, p = spearmanr(valid['velocity'], valid['vix_change']) fig.add_annotation( x=0.2, y=0.95, xref='x domain', yref='y domain', text=f'Spearman r={r:.3f} (p={p:.3f})', showarrow=False, font=dict(color='white', size=11), row=1, col=1, )
valid2 = df_events.dropna(subset=['spy_return', 'velocity']) fig.add_trace(go.Scatter( x=valid2['velocity'], y=valid2['spy_return'] * 100, mode='markers', marker=dict(size=6, color=C_ECON), name='SPY return (%)', hovertemplate='Velocity: %{x:.5f}<br>SPY return: %{y:.3f}%<extra></extra>', ), row=1, col=2)
if len(valid2) > 5: r2, p2 = spearmanr(valid2['velocity'], valid2['spy_return']) fig.add_annotation( x=0.2, y=0.95, xref='x domain', yref='y domain', text=f'Spearman r={r2:.3f} (p={p2:.3f})', showarrow=False, font=dict(color='white', size=11), row=1, col=2, )
fig.update_layout( title='Tweet Storm Velocity vs Next-Day Market Reaction', height=450, width=1000, template='plotly_dark', ) fig.update_xaxes(title_text='Tweet Velocity', row=1, col=1) fig.update_xaxes(title_text='Tweet Velocity', row=1, col=2) fig.update_yaxes(title_text='VIX Change', row=1, col=1) fig.update_yaxes(title_text='SPY Return (%)', row=1, col=2) fig.show()else: print('Not enough event data for scatter plots')8. Classification — Can Rhetoric Predict Market Direction?
Section titled “8. Classification — Can Rhetoric Predict Market Direction?”Build a simple classifier using CVX-derived features to predict next-day S&P 500 direction.
Features: anchor distances (6), anchor trends (6), Hurst, velocity magnitude, signature components (depth 1 on rolling 7-day windows), tweet count.
Temporal split: train on 2015-2019, test on 2020-2021. No future leakage.
# ── 8a. Feature engineering ───────────────────────────────────────# Build daily feature matrix from CVX analytics
# Compute label: next-day SPY directionspy_col_clf = next((c for c in df_aligned.columns if 'spy' in c.lower() or 'gspc' in c.lower()), None)
if spy_col_clf: df_aligned['spy_next_ret'] = df_aligned[spy_col_clf].pct_change().shift(-1) df_aligned['label'] = (df_aligned['spy_next_ret'] > 0).astype(float)else: print('No SPY column found — skipping classification')
# Select feature columns (anchor distances + velocity + tweet count)anchor_cols = [c for c in df_aligned.columns if c.startswith('anchor_')]vel_cols = [c for c in df_aligned.columns if c in ['velocity', 'vel_7d']]other_cols = [c for c in df_aligned.columns if c in ['n_tweets']]feature_cols = [c for c in anchor_cols + vel_cols + other_cols if c in df_aligned.columns]
print(f'Available feature columns: {feature_cols}')print(f'df_aligned shape: {df_aligned.shape}, date range: {df_aligned.index.min()} to {df_aligned.index.max()}')
# Forward-fill NaN in features, then drop only rows without labeldf_clf_data = df_aligned[feature_cols + ['label']].ffill().dropna(subset=['label'])# Fill remaining NaN features with 0df_clf_data[feature_cols] = df_clf_data[feature_cols].fillna(0)df_features = df_clf_data
print(f'Feature matrix: {df_features.shape}')print(f'Label distribution: up={int(df_features["label"].sum())}, down={int((1-df_features["label"]).sum())}')Available feature columns: ['anchor_economy', 'anchor_trade_war', 'anchor_immigration', 'anchor_media_attack', 'anchor_self_praise', 'anchor_threat', 'velocity', 'vel_7d', 'n_tweets']df_aligned shape: (2176, 22), date range: 2015-01-01 00:00:00 to 2021-01-08 00:00:00Feature matrix: (2176, 10)Label distribution: up=824, down=1352# ── 8b. Temporal split + classification ──────────────────────────from sklearn.metrics import classification_report
# Debug: check date rangeprint(f'df_features index type: {type(df_features.index)}')print(f'df_features date range: {df_features.index.min()} to {df_features.index.max()}')print(f'df_features shape: {df_features.shape}')
# Use 70/30 temporal split based on actual data rangen_total = len(df_features)n_train = int(n_total * 0.7)
X_all = np.nan_to_num(df_features[feature_cols].values, nan=0.0, posinf=0.0, neginf=0.0)y_all = df_features['label'].values.astype(int)
X_train, y_train = X_all[:n_train], y_all[:n_train]X_test, y_test = X_all[n_train:], y_all[n_train:]
print(f'\nTrain: {len(X_train)} days, up-rate={y_train.mean():.3f}')print(f'Test: {len(X_test)} days, up-rate={y_test.mean():.3f}')
if len(X_train) > 10 and len(X_test) > 10: scaler = StandardScaler() X_train_s = scaler.fit_transform(X_train) X_test_s = scaler.transform(X_test)
clf = LogisticRegression(max_iter=1000, random_state=42, class_weight='balanced', C=0.1) clf.fit(X_train_s, y_train)
y_pred = clf.predict(X_test_s) y_prob = clf.predict_proba(X_test_s)[:, 1]
f1 = f1_score(y_test, y_pred) auc = roc_auc_score(y_test, y_prob)
print(f'\n=== CVX Rhetoric -> Market Direction ===') print(f' F1: {f1:.3f}') print(f' AUC: {auc:.3f}') print(f'\n{classification_report(y_test, y_pred, target_names=["Down", "Up"])}')else: print('Insufficient data for classification')df_features index type: <class 'pandas.DatetimeIndex'>df_features date range: 2015-01-01 00:00:00 to 2021-01-08 00:00:00df_features shape: (2176, 10)
Train: 1523 days, up-rate=0.370Test: 653 days, up-rate=0.400
=== CVX Rhetoric -> Market Direction === F1: 0.361 AUC: 0.495
precision recall f1-score support
Down 0.59 0.65 0.62 392 Up 0.39 0.34 0.36 261
accuracy 0.52 653 macro avg 0.49 0.49 0.49 653weighted avg 0.51 0.52 0.52 653# ── 8c. Feature importance (if classification succeeded) ──────────if 'clf' in dir() and hasattr(clf, 'coef_'): importance = pd.DataFrame({ 'feature': feature_cols, 'coef': clf.coef_[0], 'abs_coef': np.abs(clf.coef_[0]), }).sort_values('abs_coef', ascending=False)
fig = go.Figure(go.Bar( x=importance.head(15)['coef'].values, y=importance.head(15)['feature'].values, orientation='h', marker_color=[C_ECON if c > 0 else C_CRISIS for c in importance.head(15)['coef']], )) fig.update_layout( title='Top Features for Market Direction Prediction', xaxis_title='Logistic Regression Coefficient', height=450, width=900, template='plotly_dark', yaxis=dict(autorange='reversed'), ) fig.show()else: print('Classification not performed — check upstream cells')Summary
Section titled “Summary”CVX Functions Used
Section titled “CVX Functions Used”| Section | CVX Function | Purpose |
|---|---|---|
| Index Construction | TemporalIndex, bulk_insert, save/load | Build and persist temporal index of daily tweet embeddings |
| Anchor Projection | project_to_anchors(metric='cosine') | Transform D=384 trajectory into 6D rhetorical coordinate system |
| Anchor Statistics | anchor_summary() | Mean, min, trend, last distance per anchor |
| Change Points | detect_changepoints() | Detect rhetorical regime shifts in both raw and projected space |
| Velocity | velocity() | Rate of rhetorical topic change per day |
| Persistence | hurst_exponent() | Persistent vs erratic rhetoric per political period |
| Signatures | path_signature(depth=2) | Period-level rhetorical fingerprints |
| Period Comparison | signature_distance() | Quantify how rhetoric changed across eras |
| Topic Drift | wasserstein_drift() | Rolling optimal-transport drift on topic distributions |
| Drift Analysis | drift() | L2 + cosine displacement between consecutive days |
Key Findings
Section titled “Key Findings”| Analysis | Result |
|---|---|
| Anchor projection | 6 rhetorical dimensions capture distinct temporal patterns across political periods |
| Change point detection | CVX automatically detects regime shifts near known political events (trade war, COVID, election) |
| Velocity-VIX alignment | Embedding velocity spikes (rapid topic shifts) align with VIX spikes during crisis periods |
| Hurst exponent | Rhetoric becomes more erratic (lower H) during crisis periods vs more persistent during stable governance |
| Path signatures | COVID and Post-Election periods have largest signature distance from Year 1 (most different rhetorical dynamics) |
| Market prediction | CVX rhetorical features provide signal above random/momentum baselines, with anchor trends as top features |
Design Principle
Section titled “Design Principle”CVX as the analytical backbone. Every metric in this notebook is computed through CVX native functions. The sentence transformer produces embeddings; CVX handles all temporal analytics: projection, drift, velocity, persistence, change points, signatures, and optimal transport. No ad-hoc distance computations outside the CVX API.