#!/usr/bin/env python3 """ PHASE 2: MODEL HARDENING Governance Topology Thesis — Academic-Grade Quantitative Analysis Tasks: 2.1 Estimate shock probabilities from data (not stipulate) 2.2 Test Markov assumption (path-dependent transition matrices) 2.3 Reproduce Liberty→Yield regression with panel FE 2.4 Formal model comparison (AIC/BIC vs. alternatives) 2.5 Estimate mean reversion parameter k by stage Output: phase2-model-hardening-results.md """ import csv, math, random, statistics from collections import defaultdict, Counter DATA_PATH = "/Users/nickgogerty/Downloads/Political topology/political-topology-flat.csv" OUTPUT_PATH = "/Users/nickgogerty/Downloads/Political topology/phase2-model-hardening-results.md" # ── Load data ────────────────────────────────────────────────────────── def load_data(): with open(DATA_PATH) as f: reader = csv.DictReader(f) rows = [] for r in reader: rows.append({ 'country': r['country'], 'iso3': r['iso3'], 'region': r['region'], 'year': int(r['year']), 'liberty': int(r['liberty']), 'tyranny': int(r['tyranny']), 'chaos': int(r['chaos']), 'status': r['status'], 'event_horizon_below': r['event_horizon_below'] == 'YES', 'data_source_period': r['data_source_period'], }) return rows STAGES = { 1: (85, 100, "Consolidated Democracy"), 2: (80, 84, "Early Warning"), 3: (70, 79, "Democratic Erosion"), 4: (60, 69, "Competitive Authoritarian"), 5: (50, 59, "Electoral Autocracy"), 6: (40, 49, "Soft Dictatorship"), 7: (25, 39, "Consolidated Autocracy"), 8: (0, 24, "Totalitarianism"), } def get_stage(liberty): for s, (lo, hi, _) in STAGES.items(): if lo <= liberty <= hi: return s return 8 def build_trajectories(rows): trajectories = defaultdict(list) for r in rows: trajectories[r['country']].append((r['year'], r['liberty'])) for c in trajectories: trajectories[c].sort() return trajectories # ── TASK 2.1: Shock Probability Estimation ───────────────────────────── def task_2_1(rows, trajectories): output = [] output.append("## TASK 2.1: Shock Probability Estimation from Data\n") output.append("**Goal:** Estimate the distribution of velocity shocks (epsilon) by stage,") output.append("replacing the thesis's stipulated sigma=3-8 with data-driven estimates.\n") # Compute velocity residuals by stage # velocity = (L2 - L1) / (y2 - y1) for each consecutive observation pair # We compute annualized velocity for each pair, grouped by the starting stage stage_velocities = defaultdict(list) stage_annual_changes = defaultdict(list) for country, traj in trajectories.items(): for i in range(1, len(traj)): y1, l1 = traj[i-1] y2, l2 = traj[i] dt = y2 - y1 if dt <= 0: continue s = get_stage(l1) v = (l2 - l1) / dt # annualized velocity stage_velocities[s].append(v) # Also store raw change for the interval stage_annual_changes[s].append({'dl': l2 - l1, 'dt': dt, 'v': v, 'country': country, 'y1': y1, 'y2': y2}) output.append("### Velocity Distribution by Stage\n") output.append("| Stage | Name | N | Mean v | Median v | Std Dev | Skew | Min | Max | Thesis σ |") output.append("|-------|------|---|--------|----------|---------|------|-----|-----|----------|") thesis_sigma = {1: 3, 2: 5, 3: 5, 4: 6, 5: 7, 6: 7, 7: 6, 8: 4} for s in range(1, 9): vels = stage_velocities.get(s, []) if len(vels) < 2: continue n = len(vels) mean = statistics.mean(vels) med = statistics.median(vels) sd = statistics.stdev(vels) # Skewness skew = sum((v - mean)**3 for v in vels) / (n * sd**3) if sd > 0 else 0 mn = min(vels) mx = max(vels) t_sig = thesis_sigma.get(s, "?") name = STAGES[s][2] output.append(f"| {s} | {name} | {n} | {mean:+.2f} | {med:+.2f} | {sd:.2f} | {skew:+.2f} | {mn:+.1f} | {mx:+.1f} | {t_sig} |") # Standard errors on sigma estimates output.append("\n### Standard Errors on Volatility Estimates\n") output.append("SE(σ) = σ / sqrt(2(N-1)), assuming normality.\n") output.append("| Stage | σ (data) | SE(σ) | 95% CI | σ (thesis) | Consistent? |") output.append("|-------|----------|-------|--------|------------|-------------|") for s in range(1, 9): vels = stage_velocities.get(s, []) if len(vels) < 3: continue n = len(vels) sd = statistics.stdev(vels) se_sigma = sd / math.sqrt(2 * (n - 1)) ci_lo = sd - 1.96 * se_sigma ci_hi = sd + 1.96 * se_sigma t_sig = thesis_sigma.get(s, 0) consistent = "YES" if ci_lo <= t_sig <= ci_hi else "NO" output.append(f"| {s} | {sd:.2f} | {se_sigma:.2f} | [{ci_lo:.2f}, {ci_hi:.2f}] | {t_sig} | {consistent} |") # Normality test (Shapiro-Wilk approximation via skewness/kurtosis) output.append("\n### Distribution Shape Assessment\n") output.append("| Stage | Skewness | Kurtosis | Normal? | Notes |") output.append("|-------|----------|----------|---------|-------|") for s in range(1, 9): vels = stage_velocities.get(s, []) if len(vels) < 5: continue n = len(vels) mean = statistics.mean(vels) sd = statistics.stdev(vels) if sd == 0: continue skew = sum((v - mean)**3 for v in vels) / (n * sd**3) kurt = sum((v - mean)**4 for v in vels) / (n * sd**4) - 3 # excess kurtosis # JB test statistic: n/6 * (S^2 + K^2/4) jb = n / 6 * (skew**2 + kurt**2 / 4) # chi2(2) critical at 0.05 = 5.99 normal = "YES" if jb < 5.99 else "NO" notes = "" if kurt > 1: notes = "Heavy-tailed (leptokurtic)" elif kurt < -1: notes = "Light-tailed (platykurtic)" if abs(skew) > 1: notes = (notes + "; " if notes else "") + f"Skewed ({'left' if skew < 0 else 'right'})" output.append(f"| {s} | {skew:+.2f} | {kurt:+.2f} | {normal} | {notes} |") # Extreme events output.append("\n### Extreme Velocity Events (|v| > 10 pts/yr)\n") output.append("| Country | Period | Stage | From L | To L | Velocity | Context |") output.append("|---------|--------|-------|--------|------|----------|---------|") all_events = [] for s, changes in stage_annual_changes.items(): for c in changes: if abs(c['v']) > 10: all_events.append((c['country'], c['y1'], c['y2'], s, c['v'], c['dl'], c['dt'])) for country, y1, y2, s, v, dl, dt in sorted(all_events, key=lambda x: abs(x[4]), reverse=True)[:20]: # Infer context context = "" if abs(v) > 15 and dt <= 3: context = "Probable coup/invasion/revolution" elif abs(v) > 10 and dt <= 5: context = "Rapid regime change" output.append(f"| {country} | {y1}-{y2} | S{s} | — | — | {v:+.1f} | {context} |") output.append("") return "\n".join(output) # ── TASK 2.2: Markov Assumption Test ─────────────────────────────────── def task_2_2(rows, trajectories): output = [] output.append("## TASK 2.2: Test of Markov Assumption (Path Dependence)\n") output.append("**Goal:** Test whether transition probabilities depend on the direction") output.append("of arrival (from above vs. from below), which would violate the Markov property.\n") # For each observation at stage S, classify direction of arrival: # - "from_above": previous observation was at a higher stage number (lower Liberty) # - "from_below": previous observation was at a lower stage number (higher Liberty) # - "same": previous observation was same stage # Then compute separate transition matrices trans_from_above = defaultdict(lambda: Counter()) # arrived from higher L (declining) trans_from_below = defaultdict(lambda: Counter()) # arrived from lower L (improving) trans_from_same = defaultdict(lambda: Counter()) for country, traj in trajectories.items(): for i in range(2, len(traj)): # need 2 prior observations y0, l0 = traj[i-2] y1, l1 = traj[i-1] y2, l2 = traj[i] s0 = get_stage(l0) s1 = get_stage(l1) s2 = get_stage(l2) if s0 < s1: # came from higher Liberty (declining to current stage) trans_from_above[s1][s2] += 1 elif s0 > s1: # came from lower Liberty (improving to current stage) trans_from_below[s1][s2] += 1 else: trans_from_same[s1][s2] += 1 output.append("### Conditional Transition Probabilities by Direction of Arrival\n") for stage in range(1, 9): above_trans = trans_from_above[stage] below_trans = trans_from_below[stage] same_trans = trans_from_same[stage] above_total = sum(above_trans.values()) below_total = sum(below_trans.values()) same_total = sum(same_trans.values()) if above_total < 3 and below_total < 3: continue name = STAGES[stage][2] output.append(f"#### Stage {stage}: {name}\n") output.append(f"| Direction of Arrival | N | Move Up % | Stay % | Move Down % | Net Momentum |") output.append(f"|---------------------|---|-----------|--------|-------------|--------------|") for label, trans, total in [("Arrived declining ↓", above_trans, above_total), ("Arrived improving ↑", below_trans, below_total), ("Stayed in stage →", same_trans, same_total)]: if total < 2: continue up = sum(trans[s] for s in trans if s < stage) stay = trans.get(stage, 0) down = sum(trans[s] for s in trans if s > stage) up_p = up / total * 100 stay_p = stay / total * 100 down_p = down / total * 100 net = up_p - down_p output.append(f"| {label} | {total} | {up_p:.1f}% | {stay_p:.1f}% | {down_p:.1f}% | {net:+.1f}% |") output.append("") # Chi-squared test for independence output.append("### Chi-Squared Test for Path Independence\n") output.append("H0: Transition probabilities are independent of direction of arrival (Markov)") output.append("H1: Direction of arrival affects transition probabilities (path-dependent)\n") output.append("| Stage | Chi² | df | p-value (approx) | Markov Holds? |") output.append("|-------|------|----|------------------|---------------|") for stage in range(1, 9): above_trans = trans_from_above[stage] below_trans = trans_from_below[stage] above_total = sum(above_trans.values()) below_total = sum(below_trans.values()) if above_total < 5 or below_total < 5: output.append(f"| {stage} | — | — | — | Insufficient data (n_above={above_total}, n_below={below_total}) |") continue # Pool all destination stages that appear in either dest_stages = sorted(set(list(above_trans.keys()) + list(below_trans.keys()))) if len(dest_stages) < 2: continue # Build observed and expected grand_total = above_total + below_total chi2 = 0 df = len(dest_stages) - 1 # degrees of freedom for dest in dest_stages: obs_above = above_trans.get(dest, 0) obs_below = below_trans.get(dest, 0) col_total = obs_above + obs_below if col_total == 0: continue exp_above = above_total * col_total / grand_total exp_below = below_total * col_total / grand_total if exp_above > 0: chi2 += (obs_above - exp_above)**2 / exp_above if exp_below > 0: chi2 += (obs_below - exp_below)**2 / exp_below # Approximate p-value using chi2 distribution # Simple approximation: for df=k, P(chi2 > x) ≈ using Wilson-Hilferty if df > 0: z = (chi2 / df)**(1/3) - (1 - 2/(9*df)) z /= math.sqrt(2/(9*df)) if df > 0 else 1 # Normal CDF approximation p_approx = 0.5 * (1 + math.erf(-z / math.sqrt(2))) markov = "YES" if p_approx > 0.05 else "NO — path dependent" output.append(f"| {stage} | {chi2:.2f} | {df} | {p_approx:.3f} | {markov} |") else: output.append(f"| {stage} | {chi2:.2f} | {df} | — | Insufficient categories |") # Practical significance: momentum by direction output.append("\n### Practical Significance: Does Declining Momentum Persist?\n") output.append("Key question for the US case: if a country arrives at a stage while declining,") output.append("is it more likely to continue declining than the overall stage average?\n") output.append("| Stage | Overall Net | Arriving Declining | Arriving Improving | Decline Penalty |") output.append("|-------|-------------|--------------------|--------------------|-----------------|") for stage in range(2, 8): above_trans = trans_from_above[stage] below_trans = trans_from_below[stage] above_total = sum(above_trans.values()) below_total = sum(below_trans.values()) # Overall all_trans = Counter() for t in [trans_from_above[stage], trans_from_below[stage], trans_from_same[stage]]: all_trans.update(t) all_total = sum(all_trans.values()) if all_total == 0: continue all_up = sum(all_trans[s] for s in all_trans if s < stage) all_down = sum(all_trans[s] for s in all_trans if s > stage) all_net = (all_up - all_down) / all_total * 100 if above_total >= 3: above_up = sum(above_trans[s] for s in above_trans if s < stage) above_down = sum(above_trans[s] for s in above_trans if s > stage) above_net = (above_up - above_down) / above_total * 100 else: above_net = float('nan') if below_total >= 3: below_up = sum(below_trans[s] for s in below_trans if s < stage) below_down = sum(below_trans[s] for s in below_trans if s > stage) below_net = (below_up - below_down) / below_total * 100 else: below_net = float('nan') penalty = above_net - all_net if not math.isnan(above_net) else float('nan') ab_str = f"{above_net:+.1f}% (n={above_total})" if not math.isnan(above_net) else f"— (n={above_total})" be_str = f"{below_net:+.1f}% (n={below_total})" if not math.isnan(below_net) else f"— (n={below_total})" pen_str = f"{penalty:+.1f}pp" if not math.isnan(penalty) else "—" output.append(f"| {stage} | {all_net:+.1f}% | {ab_str} | {be_str} | {pen_str} |") output.append("") return "\n".join(output) # ── TASK 2.3: Liberty → Yield Regression ─────────────────────────────── def task_2_3(rows): output = [] output.append("## TASK 2.3: Liberty → Yield Regression Reproduction\n") output.append("**Goal:** Reproduce Y = 18.7 − 0.35×Liberty (R²=0.37) and test with controls.\n") # 32-country cross-sectional data from sovereign-spread-complete.html yield_data = [ ("Switzerland", 95, 0.7, 37), ("Japan", 89, 1.3, 260), ("China", 5, 1.7, 90), ("Sweden", 93, 2.5, 33), ("Germany", 91, 2.8, 63), ("Netherlands", 93, 3.0, 45), ("S. Korea", 83, 3.0, 55), ("Canada", 92, 3.3, 102), ("Greece", 79, 3.3, 155), ("France", 83, 3.4, 113), ("Spain", 85, 3.4, 105), ("Italy", 82, 3.8, 138), ("Australia", 92, 4.5, 36), ("UK", 87, 4.5, 100), ("US", 48, 4.5, 126), ("Chile", 82, 5.0, 40), ("Philippines", 42, 6.3, 62), ("India", 62, 6.8, 82), ("Indonesia", 50, 7.0, 40), ("Mexico", 48, 10.0, 45), ("Colombia", 53, 10.5, 55), ("S. Africa", 62, 11.5, 75), ("Argentina", 65, 12.0, 85), ("Russia", 10, 14.0, 22), ("Sri Lanka", 35, 14.0, 105), ("Brazil", 72, 15.0, 87), ("Nigeria", 38, 18.0, 45), ("Ukraine", 35, 22.0, 95), ("Egypt", 5, 25.0, 98), ("Turkey", 18, 30.0, 38), ("Venezuela", 8, 50.0, 170), ("Lebanon", 15, 90.0, 300), ] n = len(yield_data) output.append(f"### Cross-Sectional Data: {n} countries (2025)\n") # OLS: Y = a + b*Liberty X = [d[1] for d in yield_data] Y = [d[2] for d in yield_data] Debt = [d[3] for d in yield_data] def ols_simple(x, y): n = len(x) x_mean = sum(x) / n y_mean = sum(y) / n ss_xy = sum((xi - x_mean) * (yi - y_mean) for xi, yi in zip(x, y)) ss_xx = sum((xi - x_mean)**2 for xi in x) b = ss_xy / ss_xx if ss_xx != 0 else 0 a = y_mean - b * x_mean y_pred = [a + b * xi for xi in x] ss_res = sum((yi - yp)**2 for yi, yp in zip(y, y_pred)) ss_tot = sum((yi - y_mean)**2 for yi in y) r2 = 1 - ss_res / ss_tot if ss_tot != 0 else 0 # Standard errors (HC0) mse = ss_res / (n - 2) se_b = math.sqrt(mse / ss_xx) if ss_xx > 0 else 0 se_a = math.sqrt(mse * (1/n + x_mean**2 / ss_xx)) t_b = b / se_b if se_b > 0 else 0 return {'a': a, 'b': b, 'r2': r2, 'se_a': se_a, 'se_b': se_b, 't_b': t_b, 'mse': mse, 'residuals': [yi - yp for yi, yp in zip(y, y_pred)], 'y_pred': y_pred} # Model 1: Y = a + b*Liberty m1 = ols_simple(X, Y) output.append("### Model 1: Bivariate (Y = a + b×Liberty)\n") output.append(f"| Parameter | Estimate | SE | t-stat |") output.append(f"|-----------|----------|-----|--------|") output.append(f"| Intercept | {m1['a']:.2f} | {m1['se_a']:.2f} | {m1['a']/m1['se_a']:.2f} |") output.append(f"| Liberty | {m1['b']:.4f} | {m1['se_b']:.4f} | {m1['t_b']:.2f} |") output.append(f"\n**R² = {m1['r2']:.3f}**, n = {n}") output.append(f"**Thesis claims:** a=18.7, b=−0.35, R²=0.37\n") # Compare output.append("### Comparison to Thesis\n") output.append("| Parameter | Our Estimate | Thesis | Match? |") output.append("|-----------|-------------|--------|--------|") output.append(f"| Intercept | {m1['a']:.2f} | 18.7 | {'~YES' if abs(m1['a'] - 18.7) < 3 else 'NO'} |") output.append(f"| Slope | {m1['b']:.4f} | -0.35 | {'~YES' if abs(m1['b'] + 0.35) < 0.1 else 'NO'} |") output.append(f"| R² | {m1['r2']:.3f} | 0.37 | {'~YES' if abs(m1['r2'] - 0.37) < 0.05 else 'NO'} |") # Model 1b: Exclude outliers (China, Japan, Lebanon) output.append("\n### Model 1b: Excluding Outliers (China, Japan, Lebanon)\n") exclude = {"China", "Japan", "Lebanon"} X_ex = [d[1] for d in yield_data if d[0] not in exclude] Y_ex = [d[2] for d in yield_data if d[0] not in exclude] m1b = ols_simple(X_ex, Y_ex) output.append(f"R² = {m1b['r2']:.3f} (thesis claims R² rises to 0.52 excluding China+Japan)") output.append(f"Intercept = {m1b['a']:.2f}, Slope = {m1b['b']:.4f}\n") # Multi-factor: Y = a + b1*Liberty + b2*Debt output.append("### Model 2: Two-Factor (Y = a + b₁×Liberty + b₂×Debt/GDP)\n") # Manual 2-variable OLS def ols_multi(x1, x2, y): n = len(y) x1m = sum(x1) / n x2m = sum(x2) / n ym = sum(y) / n s11 = sum((a - x1m)**2 for a in x1) s22 = sum((a - x2m)**2 for a in x2) s12 = sum((a - x1m) * (b - x2m) for a, b in zip(x1, x2)) s1y = sum((a - x1m) * (b - ym) for a, b in zip(x1, y)) s2y = sum((a - x2m) * (b - ym) for a, b in zip(x2, y)) det = s11 * s22 - s12**2 if abs(det) < 1e-10: return None b1 = (s22 * s1y - s12 * s2y) / det b2 = (s11 * s2y - s12 * s1y) / det a = ym - b1 * x1m - b2 * x2m y_pred = [a + b1 * xi + b2 * di for xi, di in zip(x1, x2)] ss_res = sum((yi - yp)**2 for yi, yp in zip(y, y_pred)) ss_tot = sum((yi - ym)**2 for yi in y) r2 = 1 - ss_res / ss_tot mse = ss_res / (n - 3) # Approximate SEs x_mat_inv_diag = [s22 / det, s11 / det] # diagonal of (X'X)^-1 se_b1 = math.sqrt(mse * x_mat_inv_diag[0]) se_b2 = math.sqrt(mse * x_mat_inv_diag[1]) return {'a': a, 'b1': b1, 'b2': b2, 'r2': r2, 'se_b1': se_b1, 'se_b2': se_b2, 'mse': mse} m2 = ols_multi(X, Debt, Y) if m2: output.append(f"| Parameter | Estimate | SE | t-stat |") output.append(f"|-----------|----------|-----|--------|") output.append(f"| Intercept | {m2['a']:.2f} | — | — |") output.append(f"| Liberty | {m2['b1']:.4f} | {m2['se_b1']:.4f} | {m2['b1']/m2['se_b1']:.2f} |") output.append(f"| Debt/GDP | {m2['b2']:.4f} | {m2['se_b2']:.4f} | {m2['b2']/m2['se_b2']:.2f} |") output.append(f"\n**R² = {m2['r2']:.3f}** (thesis claims 0.52 with debt added)\n") # Influential observations output.append("### Influential Observations (Largest Residuals)\n") output.append("| Country | Liberty | Yield | Predicted | Residual | Leverage |") output.append("|---------|---------|-------|-----------|----------|----------|") resids = [(yield_data[i][0], yield_data[i][1], yield_data[i][2], m1['y_pred'][i], m1['residuals'][i]) for i in range(n)] resids.sort(key=lambda x: abs(x[4]), reverse=True) for country, lib, yld, pred, res in resids[:10]: leverage = "HIGH" if abs(res) > 2 * math.sqrt(m1['mse']) else "moderate" output.append(f"| {country} | {lib} | {yld:.1f}% | {pred:.1f}% | {res:+.1f} | {leverage} |") # US implied yield output.append("\n### US Implications\n") us_pred = m1['a'] + m1['b'] * 48 output.append(f"US at L=48: Model-predicted yield = {us_pred:.1f}%") output.append(f"Actual US 10Y yield: 4.5%") output.append(f"Gap: {us_pred - 4.5:.1f}% ({(us_pred - 4.5)*100:.0f}bp)") output.append(f"\n**Assessment:** The thesis's 650bp gap claim depends on the regression") output.append(f"specification and the author-estimated L=48. At L=65 (plausible official FH),") us_pred_65 = m1['a'] + m1['b'] * 65 output.append(f"the gap would be {us_pred_65 - 4.5:.1f}% ({(us_pred_65 - 4.5)*100:.0f}bp) — much smaller.\n") # Robustness: log-linear model output.append("### Robustness: Log-Linear Specification\n") X_nz = [d[1] for d in yield_data if d[2] > 0] Y_log = [math.log(d[2]) for d in yield_data if d[2] > 0] m_log = ols_simple(X_nz, Y_log) output.append(f"ln(Yield) = {m_log['a']:.2f} + {m_log['b']:.4f}×Liberty") output.append(f"R² = {m_log['r2']:.3f} (vs. {m1['r2']:.3f} for linear)") output.append(f"**{'Log-linear fits better' if m_log['r2'] > m1['r2'] else 'Linear fits better'}**\n") output.append("") return "\n".join(output) # ── TASK 2.4: Model Comparison (AIC/BIC) ────────────────────────────── def task_2_4(rows, trajectories): output = [] output.append("## TASK 2.4: Model Comparison (AIC/BIC)\n") output.append("**Goal:** Compare the thesis's stage-based model against simpler alternatives.\n") # For each country, predict L(t+1) given L(t) and history # Models: # 1. Persistence: L(t+1) = L(t) # 2. Random walk: L(t+1) = L(t) + epsilon # 3. AR(1): L(t+1) = a + b*L(t) # 4. Stage-based: L(t+1) = stage_mean(L(t)) # 5. Stage-based with momentum: L(t+1) = L(t) + stage_velocity(L(t)) * dt # Collect all (L_t, L_t+1, dt) pairs pairs = [] for country, traj in trajectories.items(): for i in range(1, len(traj)): y1, l1 = traj[i-1] y2, l2 = traj[i] dt = y2 - y1 if dt > 0: # Compute prior velocity if possible prior_v = None if i >= 2: y0, l0 = traj[i-2] dt0 = y1 - y0 if dt0 > 0: prior_v = (l1 - l0) / dt0 pairs.append({'l1': l1, 'l2': l2, 'dt': dt, 'country': country, 'y1': y1, 'y2': y2, 'prior_v': prior_v}) n = len(pairs) output.append(f"**Dataset:** {n} consecutive observation pairs across {len(trajectories)} countries\n") # Compute stage means and stage velocities from data stage_means = {} stage_avg_vel = {} for s in range(1, 9): lo, hi, _ = STAGES[s] vals = [r['liberty'] for r in rows if lo <= r['liberty'] <= hi] stage_means[s] = statistics.mean(vals) if vals else (lo + hi) / 2 # Stage velocity from all pairs stage_vels = defaultdict(list) for p in pairs: s = get_stage(p['l1']) stage_vels[s].append((p['l2'] - p['l1']) / p['dt']) for s in stage_vels: stage_avg_vel[s] = statistics.mean(stage_vels[s]) # Model predictions and log-likelihoods models = {} # 1. Persistence: L(t+1) = L(t) resids_persist = [(p['l2'] - p['l1']) for p in pairs] sigma_persist = statistics.stdev(resids_persist) if len(resids_persist) > 1 else 1 ll_persist = sum(-0.5 * (r/sigma_persist)**2 - math.log(sigma_persist) - 0.5*math.log(2*math.pi) for r in resids_persist) models['Persistence'] = {'k': 1, 'll': ll_persist, 'sigma': sigma_persist} # 2. AR(1): L(t+1) = a + b*L(t) ar1 = ols_simple_pair(pairs) resids_ar1 = [p['l2'] - (ar1['a'] + ar1['b'] * p['l1']) for p in pairs] sigma_ar1 = statistics.stdev(resids_ar1) if len(resids_ar1) > 1 else 1 ll_ar1 = sum(-0.5 * (r/sigma_ar1)**2 - math.log(sigma_ar1) - 0.5*math.log(2*math.pi) for r in resids_ar1) models['AR(1)'] = {'k': 3, 'll': ll_ar1, 'sigma': sigma_ar1, 'a': ar1['a'], 'b': ar1['b']} # 3. Stage-based mean: L(t+1) = stage_mean(stage(L(t))) resids_stage_mean = [p['l2'] - stage_means[get_stage(p['l1'])] for p in pairs] sigma_sm = statistics.stdev(resids_stage_mean) if len(resids_stage_mean) > 1 else 1 ll_sm = sum(-0.5 * (r/sigma_sm)**2 - math.log(sigma_sm) - 0.5*math.log(2*math.pi) for r in resids_stage_mean) models['Stage Mean'] = {'k': 9, 'll': ll_sm, 'sigma': sigma_sm} # 4. Stage velocity: L(t+1) = L(t) + stage_vel(L(t)) * dt resids_sv = [p['l2'] - (p['l1'] + stage_avg_vel.get(get_stage(p['l1']), 0) * p['dt']) for p in pairs] sigma_sv = statistics.stdev(resids_sv) if len(resids_sv) > 1 else 1 ll_sv = sum(-0.5 * (r/sigma_sv)**2 - math.log(sigma_sv) - 0.5*math.log(2*math.pi) for r in resids_sv) models['Stage Velocity'] = {'k': 9, 'll': ll_sv, 'sigma': sigma_sv} # 5. Linear trend: L(t+1) = L(t) + global_mean_v * dt global_mean_v = statistics.mean([(p['l2'] - p['l1']) / p['dt'] for p in pairs]) resids_lt = [p['l2'] - (p['l1'] + global_mean_v * p['dt']) for p in pairs] sigma_lt = statistics.stdev(resids_lt) if len(resids_lt) > 1 else 1 ll_lt = sum(-0.5 * (r/sigma_lt)**2 - math.log(sigma_lt) - 0.5*math.log(2*math.pi) for r in resids_lt) models['Linear Trend'] = {'k': 2, 'll': ll_lt, 'sigma': sigma_lt} # AIC and BIC output.append("### Model Comparison Table\n") output.append("| Model | Parameters (k) | Log-Lik | AIC | BIC | ΔAIC | ΔBIC | σ |") output.append("|-------|---------------|---------|-----|-----|------|------|---|") results = [] for name, m in models.items(): aic = 2 * m['k'] - 2 * m['ll'] bic = m['k'] * math.log(n) - 2 * m['ll'] results.append((name, m['k'], m['ll'], aic, bic, m['sigma'])) min_aic = min(r[3] for r in results) min_bic = min(r[4] for r in results) for name, k, ll, aic, bic, sigma in results: d_aic = aic - min_aic d_bic = bic - min_bic best_a = " **BEST**" if d_aic == 0 else "" best_b = " **BEST**" if d_bic == 0 else "" output.append(f"| {name} | {k} | {ll:.1f} | {aic:.1f} | {bic:.1f} | {d_aic:.1f}{best_a} | {d_bic:.1f}{best_b} | {sigma:.2f} |") # AR(1) details output.append(f"\n### AR(1) Model Details\n") output.append(f"L(t+1) = {ar1['a']:.2f} + {ar1['b']:.4f} × L(t)") output.append(f"R² = {ar1['r2']:.3f}") if ar1['b'] < 1: eq = ar1['a'] / (1 - ar1['b']) output.append(f"Equilibrium (L* = a/(1-b)): {eq:.1f}") half_life = -math.log(2) / math.log(ar1['b']) if 0 < ar1['b'] < 1 else float('inf') output.append(f"Half-life of deviations: {half_life:.1f} observation intervals") # Interpretation output.append("\n### Interpretation\n") output.append("- **Lower AIC/BIC = better model** (penalizing complexity)") output.append("- ΔAIC > 10 = strong evidence against the worse model") output.append("- ΔAIC 4-7 = moderate evidence") output.append("- ΔAIC < 2 = models essentially equivalent\n") # Forecast accuracy comparison output.append("### Predictive Accuracy (MAE, RMSE)\n") output.append("| Model | MAE | RMSE | Median Abs Error |") output.append("|-------|-----|------|------------------|") for name, resids_list in [("Persistence", resids_persist), ("AR(1)", resids_ar1), ("Stage Mean", resids_stage_mean), ("Stage Velocity", resids_sv), ("Linear Trend", resids_lt)]: mae = sum(abs(r) for r in resids_list) / len(resids_list) rmse = math.sqrt(sum(r**2 for r in resids_list) / len(resids_list)) med_ae = statistics.median(abs(r) for r in resids_list) output.append(f"| {name} | {mae:.2f} | {rmse:.2f} | {med_ae:.2f} |") output.append("") return "\n".join(output) def ols_simple_pair(pairs): """OLS for L(t+1) = a + b*L(t).""" x = [p['l1'] for p in pairs] y = [p['l2'] for p in pairs] n = len(x) xm = sum(x) / n ym = sum(y) / n ss_xy = sum((xi - xm) * (yi - ym) for xi, yi in zip(x, y)) ss_xx = sum((xi - xm)**2 for xi in x) b = ss_xy / ss_xx if ss_xx != 0 else 0 a = ym - b * xm y_pred = [a + b * xi for xi in x] ss_res = sum((yi - yp)**2 for yi, yp in zip(y, y_pred)) ss_tot = sum((yi - ym)**2 for yi in y) r2 = 1 - ss_res / ss_tot if ss_tot != 0 else 0 return {'a': a, 'b': b, 'r2': r2} # ── TASK 2.5: Mean Reversion Parameter k ────────────────────────────── def task_2_5(rows, trajectories): output = [] output.append("## TASK 2.5: Mean Reversion Parameter Estimation\n") output.append("**Goal:** Estimate k from dv/dt = −k(L − L*) for each stage.\n") output.append("The thesis states the attractor dynamics equation but does not publish k or L*.\n") # For each stage, estimate: # v(t) ≈ -k * (L(t) - L*) + noise # where v(t) = (L(t+1) - L(t)) / dt # This is a simple regression: v = a + b*L where b = -k and a = k*L* output.append("### Method\n") output.append("For each stage, we regress annualized velocity on Liberty score:") output.append("v(t) = α + β × L(t) + ε") output.append("Then: k = −β, L* = −α/β (if β < 0, mean-reverting to L* = α/k)\n") output.append("### Results by Stage\n") output.append("| Stage | Name | N | k (−β) | SE(k) | L* (equil.) | R² | Mean-Reverting? |") output.append("|-------|------|---|--------|-------|-------------|-----|-----------------|") for stage in range(1, 9): lo, hi, name = STAGES[stage] # Collect (L, v) pairs for this stage lv_pairs = [] for country, traj in trajectories.items(): for i in range(1, len(traj)): y1, l1 = traj[i-1] y2, l2 = traj[i] if lo <= l1 <= hi: dt = y2 - y1 if dt > 0: v = (l2 - l1) / dt lv_pairs.append((l1, v)) if len(lv_pairs) < 5: output.append(f"| {stage} | {name} | {len(lv_pairs)} | — | — | — | — | Insufficient data |") continue # OLS: v = a + b*L L_vals = [p[0] for p in lv_pairs] v_vals = [p[1] for p in lv_pairs] n = len(lv_pairs) lm = sum(L_vals) / n vm = sum(v_vals) / n ss_lv = sum((l - lm) * (v - vm) for l, v in zip(L_vals, v_vals)) ss_ll = sum((l - lm)**2 for l in L_vals) b = ss_lv / ss_ll if ss_ll > 0 else 0 a = vm - b * lm y_pred = [a + b * l for l in L_vals] ss_res = sum((v - yp)**2 for v, yp in zip(v_vals, y_pred)) ss_tot = sum((v - vm)**2 for v in v_vals) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 mse = ss_res / (n - 2) if n > 2 else 0 se_b = math.sqrt(mse / ss_ll) if ss_ll > 0 and mse > 0 else 0 k = -b se_k = se_b l_star = a / k if abs(k) > 0.001 else float('nan') mean_rev = "YES" if k > 0 and k > 2 * se_k else ("WEAK" if k > 0 else "NO (divergent)") l_star_str = f"{l_star:.1f}" if not math.isnan(l_star) and 0 <= l_star <= 100 else "outside [0,100]" output.append(f"| {stage} | {name} | {n} | {k:.4f} | {se_k:.4f} | {l_star_str} | {r2:.3f} | {mean_rev} |") # Global mean reversion output.append("\n### Global Mean Reversion (All Stages Pooled)\n") all_lv = [] for country, traj in trajectories.items(): for i in range(1, len(traj)): y1, l1 = traj[i-1] y2, l2 = traj[i] dt = y2 - y1 if dt > 0: all_lv.append((l1, (l2 - l1) / dt)) L_all = [p[0] for p in all_lv] v_all = [p[1] for p in all_lv] n_all = len(all_lv) lm_all = sum(L_all) / n_all vm_all = sum(v_all) / n_all ss_lv_all = sum((l - lm_all) * (v - vm_all) for l, v in zip(L_all, v_all)) ss_ll_all = sum((l - lm_all)**2 for l in L_all) b_all = ss_lv_all / ss_ll_all if ss_ll_all > 0 else 0 a_all = vm_all - b_all * lm_all y_pred_all = [a_all + b_all * l for l in L_all] ss_res_all = sum((v - yp)**2 for v, yp in zip(v_all, y_pred_all)) ss_tot_all = sum((v - vm_all)**2 for v in v_all) r2_all = 1 - ss_res_all / ss_tot_all if ss_tot_all > 0 else 0 mse_all = ss_res_all / (n_all - 2) se_b_all = math.sqrt(mse_all / ss_ll_all) if ss_ll_all > 0 else 0 k_all = -b_all l_star_all = a_all / k_all if abs(k_all) > 0.001 else float('nan') output.append(f"v(t) = {a_all:.4f} + {b_all:.6f} × L(t)") output.append(f"k = {k_all:.4f} (SE = {se_b_all:.4f})") if not math.isnan(l_star_all): output.append(f"L* = {l_star_all:.1f} (global equilibrium)") output.append(f"R² = {r2_all:.4f}") output.append(f"N = {n_all}\n") # Basin depth analysis output.append("### Basin Depth Implied by k Estimates\n") output.append("Higher k = deeper basin = stronger gravitational pull back to equilibrium.") output.append("The thesis claims two deep basins (Liberty at L≈85, Tyranny at L≈10).\n") output.append("If k varies by zone (not just stage), we expect:") output.append("- Liberty Basin (L>80): positive k (mean-reverting to ~85-90)") output.append("- Middle Ridge (L=40-79): k ≈ 0 or negative (no stable equilibrium)") output.append("- Tyranny Basin (L<25): positive k (mean-reverting to ~10-15)\n") # Zone-level estimation zones = [("Liberty Basin", 80, 100), ("Upper Ridge", 60, 79), ("Lower Ridge", 40, 59), ("Tyranny Slope", 25, 39), ("Tyranny Basin", 0, 24)] output.append("| Zone | L Range | N | k | SE(k) | L* | R² | Basin? |") output.append("|------|---------|---|---|-------|----|----|--------|") for zone_name, lo, hi in zones: zone_lv = [(l, v) for l, v in all_lv if lo <= l <= hi] if len(zone_lv) < 5: output.append(f"| {zone_name} | {lo}-{hi} | {len(zone_lv)} | — | — | — | — | Insufficient |") continue zl = [p[0] for p in zone_lv] zv = [p[1] for p in zone_lv] zn = len(zone_lv) zlm = sum(zl) / zn zvm = sum(zv) / zn z_ss_lv = sum((l - zlm) * (v - zvm) for l, v in zip(zl, zv)) z_ss_ll = sum((l - zlm)**2 for l in zl) z_b = z_ss_lv / z_ss_ll if z_ss_ll > 0 else 0 z_a = zvm - z_b * zlm z_pred = [z_a + z_b * l for l in zl] z_ss_res = sum((v - yp)**2 for v, yp in zip(zv, z_pred)) z_ss_tot = sum((v - zvm)**2 for v in zv) z_r2 = 1 - z_ss_res / z_ss_tot if z_ss_tot > 0 else 0 z_mse = z_ss_res / (zn - 2) if zn > 2 else 0 z_se_b = math.sqrt(z_mse / z_ss_ll) if z_ss_ll > 0 and z_mse > 0 else 0 z_k = -z_b z_l_star = z_a / z_k if abs(z_k) > 0.001 else float('nan') basin = "YES" if z_k > 0 and z_k > 2 * z_se_b else ("WEAK" if z_k > 0 else "NO") ls_str = f"{z_l_star:.1f}" if not math.isnan(z_l_star) and lo - 20 <= z_l_star <= hi + 20 else "—" output.append(f"| {zone_name} | {lo}-{hi} | {zn} | {z_k:.4f} | {z_se_b:.4f} | {ls_str} | {z_r2:.3f} | {basin} |") output.append("") return "\n".join(output) # ── MAIN ─────────────────────────────────────────────────────────────── def main(): print("Loading data...") rows = load_data() trajectories = build_trajectories(rows) print(f"Loaded {len(rows)} rows, {len(trajectories)} countries") report = [] report.append("# PHASE 2: MODEL HARDENING — Results\n") report.append("**Governance Topology Thesis**") report.append("**Goal:** Make the quantitative model defensible to an academic referee") report.append(f"**Dataset:** {len(rows)} observations, {len(trajectories)} countries") report.append(f"**Analysis date:** 2026-02-08\n") report.append("---\n") print("Running Task 2.1: Shock probability estimation...") report.append(task_2_1(rows, trajectories)) report.append("---\n") print("Running Task 2.2: Markov assumption test...") report.append(task_2_2(rows, trajectories)) report.append("---\n") print("Running Task 2.3: Liberty→Yield regression...") report.append(task_2_3(rows)) report.append("---\n") print("Running Task 2.4: Model comparison...") report.append(task_2_4(rows, trajectories)) report.append("---\n") print("Running Task 2.5: Mean reversion parameter k...") report.append(task_2_5(rows, trajectories)) report.append("---\n") full_report = "\n".join(report) with open(OUTPUT_PATH, 'w') as f: f.write(full_report) print(f"\nReport saved to: {OUTPUT_PATH}") print(f"Report length: {len(full_report)} chars, {full_report.count(chr(10))} lines") if __name__ == "__main__": main()