#!/usr/bin/env python3
"""
A4 -- Statistical Inference: Country-clustered standard errors, confidence intervals.

AR(1) model: L(t+1) = alpha + beta * L(t) + epsilon
OLS with country-clustered standard errors.
Zone velocity analysis with starting-zone assignment (Fix 3).
Event horizon bootstrap CI.
Stage retention rate CIs.
"""
import csv
import math
import random
import statistics
from collections import defaultdict

random.seed(42)

# ============================================================
# 1. Load data
# ============================================================
data = []
with open('/tmp/pt-data/political-topology-flat.csv', 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        data.append(row)

# Group by country, sort by year
countries = defaultdict(list)
for row in data:
    countries[row['country']].append(row)
for c in countries:
    countries[c].sort(key=lambda r: int(r['year']))

print(f"Loaded {len(data)} observations across {len(countries)} countries")

# ============================================================
# 2. Build AR(1) regression pairs: (L_t, L_{t+1}) within country
# ============================================================
pairs = []  # list of (country, L_t, L_t1)
for c, rows in countries.items():
    for i in range(len(rows) - 1):
        L_t = float(rows[i]['liberty'])
        L_t1 = float(rows[i + 1]['liberty'])
        pairs.append((c, L_t, L_t1))

print(f"AR(1) pairs: {len(pairs)}")

# ============================================================
# 3. OLS estimation: L_{t+1} = alpha + beta * L_t
# ============================================================
n = len(pairs)
sum_x = sum(p[1] for p in pairs)
sum_y = sum(p[2] for p in pairs)
sum_xx = sum(p[1] ** 2 for p in pairs)
sum_xy = sum(p[1] * p[2] for p in pairs)

beta_hat = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x ** 2)
alpha_hat = (sum_y - beta_hat * sum_x) / n

print(f"\nOLS Estimates:")
print(f" alpha = {alpha_hat:.4f}")
print(f" beta = {beta_hat:.4f}")

# Residuals
residuals = []
for c, L_t, L_t1 in pairs:
    e = L_t1 - alpha_hat - beta_hat * L_t
    residuals.append(e)
sse = sum(e ** 2 for e in residuals)
rmse = math.sqrt(sse / n)
print(f" RMSE = {rmse:.4f}")
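# --- Added sanity check (not part of the original analysis): the slope from the
# normal equations above should equal cov(L_t, L_{t+1}) / var(L_t), and the
# intercept should equal mean(L_{t+1}) - beta * mean(L_t). These are exact
# algebraic identities, so any mismatch beyond floating-point noise would flag
# a transcription error in the formulas above. The names mean_x, mean_y,
# beta_check, alpha_check are introduced only for this check.
mean_x = sum_x / n
mean_y = sum_y / n
beta_check = (sum((x - mean_x) * (y - mean_y) for _, x, y in pairs)
              / sum((x - mean_x) ** 2 for _, x, y in pairs))
alpha_check = mean_y - beta_check * mean_x
assert abs(beta_check - beta_hat) < 1e-6, "OLS slope cross-check failed"
assert abs(alpha_check - alpha_hat) < 1e-6, "OLS intercept cross-check failed"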
# ============================================================
# 4. Country-clustered standard errors
# ============================================================
# Bread matrix: (X'X)^{-1} for 2x2
# X = [1, L_t] for each obs
# X'X = [[n, sum_x], [sum_x, sum_xx]]
det_xx = n * sum_xx - sum_x ** 2
inv_xx = [[sum_xx / det_xx, -sum_x / det_xx],
          [-sum_x / det_xx, n / det_xx]]

# Meat matrix: sum over clusters of (X_g' e_g)(X_g' e_g)'
cluster_groups = defaultdict(list)
for i, (c, L_t, L_t1) in enumerate(pairs):
    cluster_groups[c].append(i)

G = len(cluster_groups)  # number of clusters

meat = [[0.0, 0.0], [0.0, 0.0]]
for c, indices in cluster_groups.items():
    # X_g' e_g = sum_i_in_g x_i * e_i
    s0 = sum(residuals[i] for i in indices)                # sum of 1 * e_i
    s1 = sum(pairs[i][1] * residuals[i] for i in indices)  # sum of L_t * e_i
    meat[0][0] += s0 * s0
    meat[0][1] += s0 * s1
    meat[1][0] += s1 * s0
    meat[1][1] += s1 * s1

# Small-sample correction: G/(G-1) * n/(n-k) where k=2
correction = (G / (G - 1)) * (n / (n - 2))

# V = (X'X)^{-1} M (X'X)^{-1} * correction
# First: M_adj = meat * correction
def mat_mul_2x2(A, B):
    return [[A[0][0]*B[0][0] + A[0][1]*B[1][0], A[0][0]*B[0][1] + A[0][1]*B[1][1]],
            [A[1][0]*B[0][0] + A[1][1]*B[1][0], A[1][0]*B[0][1] + A[1][1]*B[1][1]]]

meat_adj = [[meat[i][j] * correction for j in range(2)] for i in range(2)]
temp = mat_mul_2x2(inv_xx, meat_adj)
V_cluster = mat_mul_2x2(temp, inv_xx)

se_alpha_cluster = math.sqrt(V_cluster[0][0])
se_beta_cluster = math.sqrt(V_cluster[1][1])

ci_alpha = (alpha_hat - 1.96 * se_alpha_cluster, alpha_hat + 1.96 * se_alpha_cluster)
ci_beta = (beta_hat - 1.96 * se_beta_cluster, beta_hat + 1.96 * se_beta_cluster)

print(f"\nCountry-Clustered Standard Errors:")
print(f" SE(alpha) = {se_alpha_cluster:.4f}")
print(f" SE(beta) = {se_beta_cluster:.4f}")
print(f" 95% CI(alpha): [{ci_alpha[0]:.4f}, {ci_alpha[1]:.4f}]")
print(f" 95% CI(beta): [{ci_beta[0]:.4f}, {ci_beta[1]:.4f}]")

# Also compute standard (non-clustered) SEs for comparison
s2 = sse / (n - 2)
se_alpha_ols = math.sqrt(s2 * sum_xx / det_xx)
se_beta_ols = math.sqrt(s2 * n / det_xx)
print(f"\n (Comparison) OLS SE(alpha) = {se_alpha_ols:.4f}")
print(f" (Comparison) OLS SE(beta) = {se_beta_ols:.4f}")
print(f" Clustering inflation factor for alpha: {se_alpha_cluster/se_alpha_ols:.2f}x")
print(f" Clustering inflation factor for beta: {se_beta_cluster/se_beta_ols:.2f}x")
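# --- Added note and illustrative cross-check (not part of the original run):
# the small-sample correction above uses G/(G-1) * n/(n-k); the convention in
# e.g. Stata's cluster-robust VCE is G/(G-1) * (n-1)/(n-k), a negligible
# difference for large n. If numpy is available (an assumption; this script is
# otherwise pure-stdlib), the same sandwich V = (X'X)^{-1} M (X'X)^{-1} can be
# verified in matrix form:
#
#   import numpy as np
#   X = np.column_stack([np.ones(n), [p[1] for p in pairs]])
#   e = np.array(residuals)
#   bread = np.linalg.inv(X.T @ X)
#   meat_np = sum(np.outer(X[ix].T @ e[ix], X[ix].T @ e[ix])
#                 for ix in (np.array(v) for v in cluster_groups.values()))
#   V_np = correction * bread @ meat_np @ bread
#   # np.sqrt(np.diag(V_np)) should match (se_alpha_cluster, se_beta_cluster)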
# ============================================================
# 5. Zone velocity analysis with STARTING-zone assignment (Fix 3)
# ============================================================
zones = {
    'Liberty Basin (L>70)': lambda L: L > 70,
    'Hybrid Zone (30-70)': lambda L: 30 <= L <= 70,
    'Tyranny Basin (L<30)': lambda L: L < 30
}

print("\n" + "=" * 60)
print("Zone Velocity Analysis (Starting-zone assignment)")
print("=" * 60)

zone_results = {}
for zname, zfunc in zones.items():
    # Velocity = L_{t+1} - L_t, assigned by STARTING zone (L_t)
    zone_pairs = [(c, L_t, L_t1) for c, L_t, L_t1 in pairs if zfunc(L_t)]
    velocities = [L_t1 - L_t for c, L_t, L_t1 in zone_pairs]

    if not velocities:
        print(f"\n{zname}: No observations")
        continue

    mean_vel = statistics.mean(velocities)

    # Country-clustered SE for mean velocity
    zone_cluster = defaultdict(list)
    for c, L_t, L_t1 in zone_pairs:
        zone_cluster[c].append(L_t1 - L_t)
    G_z = len(zone_cluster)
    n_z = len(velocities)

    # Clustered variance of mean: (G/(G-1)) * (1/n^2) * sum_g (sum_i_in_g (v_i - vbar))^2
    if G_z > 1:
        sum_sq_cluster = sum(
            (sum(v - mean_vel for v in vels)) ** 2
            for vels in zone_cluster.values()
        )
        var_clustered = (G_z / (G_z - 1)) * (1 / n_z ** 2) * sum_sq_cluster
        se_clustered = math.sqrt(var_clustered)
    else:
        se_clustered = float('nan')

    ci_low = mean_vel - 1.96 * se_clustered
    ci_high = mean_vel + 1.96 * se_clustered

    zone_results[zname] = {
        'n': n_z, 'n_countries': G_z, 'mean': mean_vel,
        'se': se_clustered, 'ci_low': ci_low, 'ci_high': ci_high
    }

    print(f"\n{zname}:")
    print(f" N = {n_z} transitions across {G_z} countries")
    print(f" Mean velocity = {mean_vel:+.3f}")
    print(f" Clustered SE = {se_clustered:.3f}")
    print(f" 95% CI: [{ci_low:+.3f}, {ci_high:+.3f}]")

    # Interpretation
    if ci_low > 0:
        print(f" --> Significantly positive (upward drift)")
    elif ci_high < 0:
        print(f" --> Significantly negative (downward drift)")
    else:
        print(f" --> NOT significantly different from zero (attractor behavior)")
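# --- Added note (illustrative, not in the original): the clustered variance
# used above is the standard cluster-robust variance of a sample mean,
#
#   Var(vbar) = G/(G-1) * (1/n^2) * sum_g ( sum_{i in g} (v_i - vbar) )^2,
#
# which reduces to the usual s^2/n when every cluster contains one observation.
# Inside the loop above, an equivalent country-level cluster bootstrap
# cross-check could be added (sketch only; boot_means and draw are hypothetical
# names introduced here):
#
#   zone_countries = list(zone_cluster.keys())
#   boot_means = []
#   for _ in range(1000):
#       draw = random.choices(zone_countries, k=len(zone_countries))
#       boot_means.append(statistics.mean(v for cc in draw for v in zone_cluster[cc]))
#   # statistics.stdev(boot_means) should be close to se_clustered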
# ============================================================
# 6. Event Horizon Bootstrap CI
# ============================================================
print("\n" + "=" * 60)
print("Event Horizon Threshold Bootstrap CI")
print("=" * 60)

# Identify the breakpoint: below what liberty score is recovery extremely rare?
# Method: for each threshold L*, compute the fraction of observations with L_t < L*
# that have L_{t+1} > L_t (positive velocity). Find where this fraction drops sharply.

# First, compute observed recovery rates at different thresholds
thresholds = list(range(5, 55, 1))

def compute_recovery_rates(pair_list):
    """For each threshold, compute the fraction with positive velocity below that threshold."""
    rates = {}
    for thresh in thresholds:
        below = [(c, L_t, L_t1) for c, L_t, L_t1 in pair_list if L_t < thresh]
        if len(below) >= 5:
            recovery = sum(1 for c, L_t, L_t1 in below if L_t1 > L_t)
            rates[thresh] = recovery / len(below)
        else:
            rates[thresh] = None
    return rates

def find_breakpoint(pair_list, target_rate=0.30):
    """Find the threshold where the recovery rate drops below target_rate.

    Use the highest threshold where the rate is below target as the event horizon."""
    rates = compute_recovery_rates(pair_list)
    # Scan from high to low; the first threshold with rate < target is the highest such threshold
    breakpoint = None
    for thresh in sorted(rates.keys(), reverse=True):
        if rates[thresh] is not None and rates[thresh] < target_rate:
            breakpoint = thresh
            break
    return breakpoint

# Observed breakpoint
obs_rates = compute_recovery_rates(pairs)
obs_breakpoint = find_breakpoint(pairs)

print(f"\nRecovery rates by liberty score threshold:")
for t in [10, 15, 20, 25, 30, 35, 40, 45, 50]:
    if obs_rates.get(t) is not None:
        print(f" L < {t}: recovery rate = {obs_rates[t]:.3f}")

print(f"\nObserved event horizon breakpoint (recovery < 30%): L = {obs_breakpoint}")

# An alternative method (locating the largest single drop in recovery rate across
# thresholds) was considered but not pursued. Instead, the event horizon is
# estimated from where the binned improvement probability P(L_t+1 > L_t) crosses
# 0.5 as a function of the starting liberty score.

def find_event_horizon_logistic(pair_list):
    """Find the event horizon as the liberty score where P(improvement) = 0.5,
    using a simple binned logistic-like approach."""
    bins = defaultdict(lambda: [0, 0])  # [positive_velocity_count, total]
    for c, L_t, L_t1 in pair_list:
        b = int(L_t / 5) * 5
        bins[b][1] += 1
        if L_t1 > L_t:
            bins[b][0] += 1
    results = {}
    for b in sorted(bins.keys()):
        if bins[b][1] >= 3:
            results[b] = bins[b][0] / bins[b][1]
    return results

binned_rates = find_event_horizon_logistic(pairs)

print(f"\nBinned improvement rates (P(L_t+1 > L_t) by starting L bin):")
for b in sorted(binned_rates.keys()):
    bar = '#' * int(binned_rates[b] * 40)
    print(f" L in [{b:2d},{b+5:2d}): P(improve) = {binned_rates[b]:.3f} {bar}")

# Find where improvement probability crosses 0.5
eh_estimate = None
sorted_bins = sorted(binned_rates.keys())
for i in range(len(sorted_bins) - 1):
    b1, b2 = sorted_bins[i], sorted_bins[i+1]
    r1, r2 = binned_rates[b1], binned_rates[b2]
    if r1 < 0.5 <= r2:
        # Linear interpolation within the bin
        eh_estimate = b1 + 5 * (0.5 - r1) / (r2 - r1)
        break

if eh_estimate is None:
    # Fallback: find the boundary between low and high improvement
    for i in range(len(sorted_bins) - 1):
        b1, b2 = sorted_bins[i], sorted_bins[i+1]
        if binned_rates[b1] < 0.4 and binned_rates[b2] >= 0.4:
            eh_estimate = b1 + 5 * (0.4 - binned_rates[b1]) / (binned_rates[b2] - binned_rates[b1])
            break

if eh_estimate is None:
    eh_estimate = 20  # default

print(f"\nEvent horizon estimate (P(improve)=0.5 crossing): L ~ {eh_estimate:.1f}")
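# --- Added explanatory note (descriptive only): the bootstrap below resamples
# whole countries with replacement rather than individual country-year
# transitions. This preserves the within-country serial dependence of liberty
# scores, making the CI a cluster-bootstrap analogue of the country-clustered
# standard errors used above. The 95% CI is taken from the 2.5th and 97.5th
# percentiles of the bootstrap distribution (percentile method).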
# Bootstrap CI
n_boot = 1000
boot_estimates = []
country_list = list(cluster_groups.keys())

for b in range(n_boot):
    # Resample countries (cluster bootstrap)
    boot_countries = random.choices(country_list, k=len(country_list))
    boot_pairs = []
    for c in boot_countries:
        rows = countries[c]
        for i in range(len(rows) - 1):
            L_t = float(rows[i]['liberty'])
            L_t1 = float(rows[i + 1]['liberty'])
            boot_pairs.append((c, L_t, L_t1))

    boot_binned = find_event_horizon_logistic(boot_pairs)
    sorted_b = sorted(boot_binned.keys())
    est = None
    for i in range(len(sorted_b) - 1):
        b1, b2 = sorted_b[i], sorted_b[i+1]
        r1, r2 = boot_binned[b1], boot_binned[b2]
        if r1 < 0.5 <= r2:
            est = b1 + 5 * (0.5 - r1) / (r2 - r1)
            break
    if est is None:
        for i in range(len(sorted_b) - 1):
            b1, b2 = sorted_b[i], sorted_b[i+1]
            if boot_binned[b1] < 0.4 and boot_binned[b2] >= 0.4:
                est = b1 + 5 * (0.4 - boot_binned[b1]) / (boot_binned[b2] - boot_binned[b1])
                break
    if est is not None:
        boot_estimates.append(est)

boot_estimates.sort()
ci_low_eh = boot_estimates[int(0.025 * len(boot_estimates))]
ci_high_eh = boot_estimates[int(0.975 * len(boot_estimates))]
boot_mean_eh = statistics.mean(boot_estimates)
boot_se_eh = statistics.stdev(boot_estimates)

print(f"\nBootstrap Event Horizon (N={len(boot_estimates)} valid resamples):")
print(f" Point estimate: L = {eh_estimate:.1f}")
print(f" Bootstrap mean: L = {boot_mean_eh:.1f}")
print(f" Bootstrap SE: {boot_se_eh:.2f}")
print(f" 95% CI: [{ci_low_eh:.1f}, {ci_high_eh:.1f}]")

# ============================================================
# 7. Stage Retention Rates with 95% CIs
# ============================================================
print("\n" + "=" * 60)
print("Stage Retention Rates with 95% CIs")
print("=" * 60)

# Define stages by liberty ranges (aligned with the Free / Partly Free / Not Free
# status categories)
stages = {
    'Free (L>70)': lambda L: L > 70,
    'Partly Free (30-70)': lambda L: 30 <= L <= 70,
    'Not Free (L<30)': lambda L: L < 30
}

retention_results = {}
for sname, sfunc in stages.items():
    # Transitions where the starting state is in this stage
    stage_pairs = [(c, L_t, L_t1) for c, L_t, L_t1 in pairs if sfunc(L_t)]
    retained = sum(1 for c, L_t, L_t1 in stage_pairs if sfunc(L_t1))
    n_s = len(stage_pairs)
    if n_s == 0:
        continue
    rate = retained / n_s

    # Country-clustered CI via bootstrap
    boot_rates = []
    for _ in range(n_boot):
        boot_c = random.choices(country_list, k=len(country_list))
        b_pairs = []
        for c in boot_c:
            rows = countries[c]
            for i in range(len(rows) - 1):
                L_t = float(rows[i]['liberty'])
                L_t1 = float(rows[i + 1]['liberty'])
                if sfunc(L_t):
                    b_pairs.append((c, L_t, L_t1))
        if len(b_pairs) > 0:
            b_retained = sum(1 for c, L_t, L_t1 in b_pairs if sfunc(L_t1))
            boot_rates.append(b_retained / len(b_pairs))

    boot_rates.sort()
    if len(boot_rates) > 20:
        ci_lo = boot_rates[int(0.025 * len(boot_rates))]
        ci_hi = boot_rates[int(0.975 * len(boot_rates))]
        se_boot = statistics.stdev(boot_rates)
    else:
        ci_lo = ci_hi = se_boot = float('nan')

    retention_results[sname] = {
        'n': n_s, 'rate': rate, 'se': se_boot,
        'ci_low': ci_lo, 'ci_high': ci_hi
    }

    print(f"\n{sname}:")
    print(f" N transitions = {n_s}")
    print(f" Retention rate = {rate:.3f} ({rate*100:.1f}%)")
    print(f" Bootstrap SE = {se_boot:.3f}")
    print(f" 95% CI: [{ci_lo:.3f}, {ci_hi:.3f}]")
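# --- Added comparison sketch (not executed; for reference only): a naive
# binomial (Wilson) 95% interval for each retention rate ignores the
# within-country clustering and would typically be narrower than the
# cluster-bootstrap CI above. wilson_ci is a hypothetical helper:
#
#   def wilson_ci(successes, trials, z=1.96):
#       p = successes / trials
#       denom = 1 + z**2 / trials
#       center = (p + z**2 / (2 * trials)) / denom
#       half = z * math.sqrt(p * (1 - p) / trials + z**2 / (4 * trials**2)) / denom
#       return center - half, center + half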
# ============================================================
# 8. Transition Matrix
# ============================================================
print("\n" + "=" * 60)
print("Transition Matrix")
print("=" * 60)

stage_names = ['Free (L>70)', 'Partly Free (30-70)', 'Not Free (L<30)']
stage_funcs = [lambda L: L > 70, lambda L: 30 <= L <= 70, lambda L: L < 30]

trans_matrix = {}
for i, (sn_from, sf_from) in enumerate(zip(stage_names, stage_funcs)):
    row_pairs = [(c, L_t, L_t1) for c, L_t, L_t1 in pairs if sf_from(L_t)]
    n_from = len(row_pairs)
    for j, (sn_to, sf_to) in enumerate(zip(stage_names, stage_funcs)):
        count = sum(1 for c, L_t, L_t1 in row_pairs if sf_to(L_t1))
        rate = count / n_from if n_from > 0 else 0
        trans_matrix[(sn_from, sn_to)] = {'count': count, 'rate': rate, 'n_from': n_from}

# Print matrix
from_to = 'From \\ To'
header = f"{from_to:<25}" + "".join(f"{sn:>25}" for sn in stage_names)
print(header)
print("-" * len(header))
for sn_from in stage_names:
    row = f"{sn_from:<25}"
    for sn_to in stage_names:
        t = trans_matrix[(sn_from, sn_to)]
        row += f"{t['rate']:.3f} ({t['count']:>4d}/{t['n_from']:<4d}) "
    print(row)
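# --- Added sketch (illustrative, not executed): per-cell 95% CIs for the
# transition rates could be obtained with the same country-level cluster
# bootstrap used elsewhere in this script. cell_boot, draw, bp, and row_bp
# are hypothetical names introduced only for this sketch:
#
#   cell_boot = defaultdict(list)
#   for _ in range(n_boot):
#       draw = random.choices(country_list, k=len(country_list))
#       bp = [(c, float(r1['liberty']), float(r2['liberty']))
#             for c in draw
#             for r1, r2 in zip(countries[c], countries[c][1:])]
#       for sn_f, sf_f in zip(stage_names, stage_funcs):
#           row_bp = [p for p in bp if sf_f(p[1])]
#           if row_bp:
#               for sn_t, sf_t in zip(stage_names, stage_funcs):
#                   cell_boot[(sn_f, sn_t)].append(
#                       sum(1 for p in row_bp if sf_t(p[2])) / len(row_bp))
#   # percentile CIs per cell: sorted(vals)[int(0.025*len(vals))], sorted(vals)[int(0.975*len(vals))]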
f.write("This prevents selection bias from counting only transitions that stayed in-zone.\n\n") for zname in ['Liberty Basin (L>70)', 'Hybrid Zone (30-70)', 'Tyranny Basin (L<30)']: if zname in zone_results: zr = zone_results[zname] if zr['ci_low'] <= 0 <= zr['ci_high']: f.write(f"- **{zname}**: Mean velocity {zr['mean']:+.3f} is NOT significantly different from zero.\n") f.write(f" This is consistent with attractor behavior (states tend to stay).\n") elif zr['ci_low'] > 0: f.write(f"- **{zname}**: Mean velocity {zr['mean']:+.3f} is significantly POSITIVE.\n") f.write(f" States in this zone tend to improve.\n") else: f.write(f"- **{zname}**: Mean velocity {zr['mean']:+.3f} is significantly NEGATIVE.\n") f.write(f" States in this zone tend to decline.\n") f.write("\n## Event Horizon Analysis\n\n") f.write(f"The event horizon is estimated as the liberty score where the probability of\n") f.write(f"improvement (L_t+1 > L_t) crosses 50%.\n\n") f.write(f"- **Point estimate**: L = {eh_estimate:.1f}\n") f.write(f"- **Bootstrap mean**: L = {boot_mean_eh:.1f}\n") f.write(f"- **Bootstrap SE**: {boot_se_eh:.2f}\n") f.write(f"- **95% CI**: [{ci_low_eh:.1f}, {ci_high_eh:.1f}]\n\n") f.write("### Binned Improvement Rates\n\n") f.write("| Liberty Score Bin | P(Improvement) | Interpretation |\n") f.write("|-------------------|---------------|----------------|\n") for b in sorted(binned_rates.keys()): interp = "Below event horizon" if binned_rates[b] < 0.5 else "Above event horizon" f.write(f"| [{b}, {b+5}) | {binned_rates[b]:.3f} | {interp} |\n") f.write("\n## Stage Retention Rates\n\n") f.write("| Stage | N Transitions | Retention Rate | Bootstrap SE | 95% CI |\n") f.write("|-------|--------------|---------------|-------------|--------|\n") for sname in stage_names: if sname in retention_results: rr = retention_results[sname] f.write(f"| {sname} | {rr['n']} | {rr['rate']:.3f} ({rr['rate']*100:.1f}%) | {rr['se']:.3f} | [{rr['ci_low']:.3f}, {rr['ci_high']:.3f}] |\n") f.write("\n### Retention Rate Interpretation\n\n") f.write("High retention rates in both Free and Not Free categories confirm\n") f.write("the 'sticky basins' hypothesis: once a country enters either the\n") f.write("liberty or tyranny basin, it tends to remain there. The Partly Free\n") f.write("zone shows lower retention, consistent with its role as an unstable\n") f.write("transition zone between the two basins of attraction.\n") f.write("\n## Transition Matrix\n\n") f.write("| From \\ To | Free (L>70) | Partly Free (30-70) | Not Free (L<30) |\n") f.write("|-----------|-------------|--------------------|-----------------|\n") for sn_from in stage_names: row = f"| {sn_from} |" for sn_to in stage_names: t = trans_matrix[(sn_from, sn_to)] row += f" {t['rate']:.3f} ({t['count']}/{t['n_from']}) |" f.write(row + "\n") f.write("\n---\n*Generated by A4 statistical inference script. All standard errors are country-clustered.*\n") print("\n\nResults saved to /tmp/pt-data/statistical-inference-results.md")