#!/usr/bin/env python3 """ RP-03: Event Horizon Bootstrap Analysis (5 tests) RP-12: Multi-Stability Analysis (5 tests; originally "Bistability Formalization", superseded by tristable model) Uses ONLY Python stdlib: csv, math, random, statistics Dataset: political-topology-flat.csv (1,656 rows, 91 countries, 1800-2025) """ import csv import math import random import statistics from collections import defaultdict, Counter import os random.seed(42) # ───────────────────────────────────────────────── # DATA LOADING # ───────────────────────────────────────────────── DATA_PATH = "/tmp/pt-data/political-topology-flat.csv" OUTPUT_PATH = "/tmp/pt-data/rp03-rp12-results.md" def load_data(): rows = [] with open(DATA_PATH, "r") as f: reader = csv.DictReader(f) for r in reader: try: row = { "country": r["country"], "iso3": r.get("iso3", ""), "region": r.get("region", ""), "year": int(r["year"]), "liberty": float(r["liberty"]), "tyranny": float(r["tyranny"]), "chaos": float(r["chaos"]), "status": r.get("status", ""), "event_horizon_below": r.get("event_horizon_below", ""), "data_source_period": r.get("data_source_period", ""), } rows.append(row) except (ValueError, KeyError): continue return rows data = load_data() print(f"Loaded {len(data)} observations") # Build per-country time series sorted by year country_series = defaultdict(list) for r in data: country_series[r["country"]].append(r) for c in country_series: country_series[c].sort(key=lambda x: x["year"]) countries = sorted(country_series.keys()) print(f"Countries: {len(countries)}") # All liberty scores all_liberty = [r["liberty"] for r in data] print(f"Liberty range: {min(all_liberty):.0f} - {max(all_liberty):.0f}") # ───────────────────────────────────────────────── # UTILITY FUNCTIONS # ───────────────────────────────────────────────── def mean(xs): if not xs: return float('nan') return sum(xs) / len(xs) def stdev(xs): if len(xs) < 2: return float('nan') m = mean(xs) return math.sqrt(sum((x - m) ** 2 for x in xs) / (len(xs) - 1)) def median_val(xs): if not xs: return float('nan') s = sorted(xs) n = len(s) if n % 2 == 1: return s[n // 2] return (s[n // 2 - 1] + s[n // 2]) / 2 def percentile(xs, p): """Simple percentile via linear interpolation.""" if not xs: return float('nan') s = sorted(xs) n = len(s) k = (p / 100.0) * (n - 1) f = math.floor(k) c = math.ceil(k) if f == c: return s[int(k)] return s[f] * (c - k) + s[c] * (k - f) def logistic(x, L_max, k, x0): """Logistic function: L_max / (1 + exp(-k*(x-x0)))""" z = -k * (x - x0) if z > 500: return 0.0 if z < -500: return L_max return L_max / (1.0 + math.exp(z)) def log_likelihood_bernoulli(probs, outcomes): """Log-likelihood for Bernoulli outcomes given predicted probabilities.""" ll = 0.0 for p, y in zip(probs, outcomes): p = max(1e-10, min(1 - 1e-10, p)) if y == 1: ll += math.log(p) else: ll += math.log(1 - p) return ll def fit_logistic_recovery(crossings, fast=False): """ Fit logistic model: P(recovery) = 1 / (1 + exp(-k*(L - L0))) via grid search over k and L0. Coarse-then-fine for speed. crossings: list of (L_at_crossing, recovered_bool) Returns (k, L0, log_lik) """ if not crossings: return (0, 0, float('-inf')) outcomes = [1 if c[1] else 0 for c in crossings] Ls = [c[0] for c in crossings] def eval_ll(k, L0): ll = 0.0 for xi, yi in zip(Ls, outcomes): z = -k * (xi - L0) if z > 500: p = 1e-10 elif z < -500: p = 1.0 - 1e-10 else: p = 1.0 / (1.0 + math.exp(z)) p = max(1e-10, min(1 - 1e-10, p)) ll += math.log(p) if yi == 1 else math.log(1 - p) return ll best_ll = float('-inf') best_k = 0.1 best_L0 = 40 # Coarse grid k_step = 0.1 if fast else 0.05 L0_step = 5 if fast else 3 for k_int in range(1, 21): k = k_int * k_step for L0 in range(10, 70, L0_step): ll = eval_ll(k, float(L0)) if ll > best_ll: best_ll = ll best_k = k best_L0 = float(L0) if fast: return (best_k, best_L0, best_ll) # Fine grid around best fine_best_ll = best_ll fine_best_k = best_k fine_best_L0 = best_L0 for dk in range(-10, 11): k = best_k + dk * 0.01 if k <= 0: continue for dL0 in range(-5, 6): L0 = best_L0 + dL0 if L0 < 5 or L0 > 75: continue ll = eval_ll(k, L0) if ll > fine_best_ll: fine_best_ll = ll fine_best_k = k fine_best_L0 = L0 return (fine_best_k, fine_best_L0, fine_best_ll) def interpolate_liberty(series, target_year): """Linear interpolation of liberty score at a given year for a country's series.""" if not series: return None # Check bounds if target_year <= series[0]["year"]: return series[0]["liberty"] if target_year >= series[-1]["year"]: return series[-1]["liberty"] # Find bracketing observations for i in range(len(series) - 1): y0, y1 = series[i]["year"], series[i + 1]["year"] if y0 <= target_year <= y1: if y0 == y1: return series[i]["liberty"] frac = (target_year - y0) / (y1 - y0) return series[i]["liberty"] + frac * (series[i + 1]["liberty"] - series[i]["liberty"]) return None # ───────────────────────────────────────────────── # RP-03: EVENT HORIZON BOOTSTRAP # ───────────────────────────────────────────────── print("\n" + "=" * 70) print("RP-03: EVENT HORIZON BOOTSTRAP ANALYSIS") print("=" * 70) # ── Build crossing events ── # A "crossing below threshold" event: consecutive observations where L crosses down. # Recovery: returned above threshold within 20 years of the crossing. # Canonical Event Horizon range is L ≈ 52-55; see 00-CANONICAL-PARAMETERS.md. # Default threshold=55 (upper bound of canonical EH range). def find_crossing_events(threshold=60, recovery_window=20): """Find all events where a country's liberty drops below threshold.""" events = [] for country, series in country_series.items(): for i in range(len(series) - 1): prev_L = series[i]["liberty"] curr_L = series[i + 1]["liberty"] cross_year = series[i + 1]["year"] if prev_L >= threshold and curr_L < threshold: # Crossing event detected L_at_crossing = curr_L # Check for recovery within window recovered = False for j in range(i + 2, len(series)): if series[j]["year"] <= cross_year + recovery_window: if series[j]["liberty"] >= threshold: recovered = True break else: break # Compute velocity (rate of decline) year_gap = series[i + 1]["year"] - series[i]["year"] if year_gap > 0: velocity = (prev_L - curr_L) / year_gap # positive = declining else: velocity = 0 events.append({ "country": country, "year": cross_year, "L_at_crossing": L_at_crossing, "L_before": prev_L, "recovered": recovered, "velocity": velocity, }) return events crossing_events = find_crossing_events(55, 20) # canonical EH upper bound print(f"\nCrossing events (below L=55, EH upper bound): {len(crossing_events)}") recoveries = [e for e in crossing_events if e["recovered"]] print(f"Recovery events (back above 55 within 20y): {len(recoveries)}") print(f"Overall recovery rate: {len(recoveries)/len(crossing_events)*100:.1f}%" if crossing_events else "N/A") # Also identify countries that were EVER below 55 (EH upper bound) countries_ever_below_55 = set() for c, series in country_series.items(): for obs in series: if obs["liberty"] < 55: countries_ever_below_55.add(c) break # Build broader crossing dataset: any observation below 55 (EH upper bound) # following a period above 55, with recovery tracking def find_all_below_55_episodes(): """For countries that were ever above 55, find all below-55 episodes.""" episodes = [] for country, series in country_series.items(): was_above = False for i, obs in enumerate(series): if obs["liberty"] >= 55: was_above = True elif was_above and obs["liberty"] < 55: # Below 55 (EH upper bound) after being above cross_year = obs["year"] L_at_cross = obs["liberty"] recovered = False for j in range(i + 1, len(series)): if series[j]["year"] <= cross_year + 20: if series[j]["liberty"] >= 55: recovered = True break else: break episodes.append((L_at_cross, recovered, cross_year, country)) return episodes broader_episodes = find_all_below_55_episodes() print(f"Broader below-55 episodes (EH upper bound, after being above): {len(broader_episodes)}") # ── T1: Bootstrap Confidence Interval ── print("\n--- RP-03 T1: Bootstrap Confidence Interval for Threshold ---") # Use crossing events for bootstrap crossing_data = [(e["L_at_crossing"], e["recovered"]) for e in crossing_events] if len(crossing_data) >= 10: # Original fit orig_k, orig_L0, orig_ll = fit_logistic_recovery(crossing_data) print(f"Original logistic fit: k={orig_k:.3f}, inflection L0={orig_L0:.1f}, LL={orig_ll:.2f}") # Bootstrap N_BOOT = 10000 boot_thresholds = [] print(f"Running {N_BOOT} bootstrap iterations...") for b in range(N_BOOT): if b % 2000 == 0 and b > 0: print(f" ...{b}/{N_BOOT}") sample = [crossing_data[random.randint(0, len(crossing_data) - 1)] for _ in range(len(crossing_data))] _, L0_b, _ = fit_logistic_recovery(sample, fast=True) boot_thresholds.append(L0_b) boot_thresholds.sort() ci_90_lo = percentile(boot_thresholds, 5) ci_90_hi = percentile(boot_thresholds, 95) ci_95_lo = percentile(boot_thresholds, 2.5) ci_95_hi = percentile(boot_thresholds, 97.5) boot_mean = mean(boot_thresholds) boot_median = median_val(boot_thresholds) boot_sd = stdev(boot_thresholds) t1_results = { "orig_k": orig_k, "orig_L0": orig_L0, "orig_ll": orig_ll, "boot_mean": boot_mean, "boot_median": boot_median, "boot_sd": boot_sd, "ci_90": (ci_90_lo, ci_90_hi), "ci_95": (ci_95_lo, ci_95_hi), "n_crossings": len(crossing_data), "n_recoveries": sum(1 for c in crossing_data if c[1]), } print(f"Bootstrap mean threshold: {boot_mean:.1f} (SD={boot_sd:.1f})") print(f"90% CI: [{ci_90_lo:.1f}, {ci_90_hi:.1f}]") print(f"95% CI: [{ci_95_lo:.1f}, {ci_95_hi:.1f}]") else: print("Insufficient crossing data for bootstrap. Using broader episodes.") # Use broader_episodes instead crossing_data = [(ep[0], ep[1]) for ep in broader_episodes] orig_k, orig_L0, orig_ll = fit_logistic_recovery(crossing_data) print(f"Original logistic fit: k={orig_k:.3f}, inflection L0={orig_L0:.1f}, LL={orig_ll:.2f}") N_BOOT = 10000 boot_thresholds = [] print(f"Running {N_BOOT} bootstrap iterations on broader data...") for b in range(N_BOOT): if b % 2000 == 0 and b > 0: print(f" ...{b}/{N_BOOT}") sample = [crossing_data[random.randint(0, len(crossing_data) - 1)] for _ in range(len(crossing_data))] _, L0_b, _ = fit_logistic_recovery(sample, fast=True) boot_thresholds.append(L0_b) boot_thresholds.sort() ci_90_lo = percentile(boot_thresholds, 5) ci_90_hi = percentile(boot_thresholds, 95) ci_95_lo = percentile(boot_thresholds, 2.5) ci_95_hi = percentile(boot_thresholds, 97.5) boot_mean = mean(boot_thresholds) boot_median = median_val(boot_thresholds) boot_sd = stdev(boot_thresholds) t1_results = { "orig_k": orig_k, "orig_L0": orig_L0, "orig_ll": orig_ll, "boot_mean": boot_mean, "boot_median": boot_median, "boot_sd": boot_sd, "ci_90": (ci_90_lo, ci_90_hi), "ci_95": (ci_95_lo, ci_95_hi), "n_crossings": len(crossing_data), "n_recoveries": sum(1 for c in crossing_data if c[1]), } print(f"Bootstrap mean threshold: {boot_mean:.1f} (SD={boot_sd:.1f})") print(f"90% CI: [{ci_90_lo:.1f}, {ci_90_hi:.1f}]") print(f"95% CI: [{ci_95_lo:.1f}, {ci_95_hi:.1f}]") # ── T2: Structural Break Test ── print("\n--- RP-03 T2: Structural Break Test (Piecewise Linear) ---") # Compute year-over-year ΔL for consecutive observations within each country delta_L_data = [] # (L_current, delta_L) for country, series in country_series.items(): for i in range(len(series) - 1): L_curr = series[i]["liberty"] L_next = series[i + 1]["liberty"] year_gap = series[i + 1]["year"] - series[i]["year"] if year_gap > 0 and year_gap <= 25: # reasonable gap annual_delta = (L_next - L_curr) / year_gap delta_L_data.append((L_curr, annual_delta)) print(f"ΔL observations: {len(delta_L_data)}") def compute_piecewise_ll(data, threshold): """ Piecewise linear: different slope/intercept above/below threshold. Returns log-likelihood assuming normal errors. """ below = [(L, dL) for L, dL in data if L < threshold] above = [(L, dL) for L, dL in data if L >= threshold] if len(below) < 5 or len(above) < 5: return float('-inf'), 4 # 4 params # Simple linear regression for each segment def linreg(pts): n = len(pts) sx = sum(p[0] for p in pts) sy = sum(p[1] for p in pts) sxx = sum(p[0]**2 for p in pts) sxy = sum(p[0]*p[1] for p in pts) denom = n * sxx - sx * sx if abs(denom) < 1e-12: b = 0 a = sy / n else: b = (n * sxy - sx * sy) / denom a = (sy - b * sx) / n residuals = [p[1] - (a + b * p[0]) for p in pts] sse = sum(r**2 for r in residuals) sigma2 = sse / n if n > 0 else 1e-10 sigma2 = max(sigma2, 1e-10) ll = -n/2 * math.log(2 * math.pi * sigma2) - sse / (2 * sigma2) return ll, a, b, sigma2 ll_below, a_b, b_b, s2_b = linreg(below) ll_above, a_a, b_a, s2_a = linreg(above) total_ll = ll_below + ll_above n_params = 4 # 2 intercepts + 2 slopes (variances estimated from data) return total_ll, n_params def compute_linear_ll(data): """Single linear model for all data.""" n = len(data) sx = sum(p[0] for p in data) sy = sum(p[1] for p in data) sxx = sum(p[0]**2 for p in data) sxy = sum(p[0]*p[1] for p in data) denom = n * sxx - sx * sx if abs(denom) < 1e-12: b = 0 a = sy / n else: b = (n * sxy - sx * sy) / denom a = (sy - b * sx) / n residuals = [p[1] - (a + b * p[0]) for p in data] sse = sum(r**2 for r in residuals) sigma2 = sse / n if n > 0 else 1e-10 sigma2 = max(sigma2, 1e-10) ll = -n/2 * math.log(2 * math.pi * sigma2) - sse / (2 * sigma2) return ll, 2 # 2 params: intercept + slope linear_ll, linear_params = compute_linear_ll(delta_L_data) n_obs = len(delta_L_data) bic_linear = -2 * linear_ll + linear_params * math.log(n_obs) best_pw_threshold = 40 best_pw_ll = float('-inf') best_pw_params = 4 pw_results = [] for thresh in range(40, 76): pw_ll, pw_p = compute_piecewise_ll(delta_L_data, thresh) bic_pw = -2 * pw_ll + pw_p * math.log(n_obs) pw_results.append((thresh, pw_ll, bic_pw)) if pw_ll > best_pw_ll: best_pw_ll = pw_ll best_pw_threshold = thresh best_pw_params = pw_p bic_piecewise_best = -2 * best_pw_ll + best_pw_params * math.log(n_obs) delta_bic = bic_linear - bic_piecewise_best # positive = piecewise is better t2_results = { "linear_ll": linear_ll, "linear_bic": bic_linear, "best_threshold": best_pw_threshold, "piecewise_ll": best_pw_ll, "piecewise_bic": bic_piecewise_best, "delta_bic": delta_bic, "n_obs": n_obs, "pw_sweep": pw_results, } print(f"Linear model: LL={linear_ll:.2f}, BIC={bic_linear:.2f}") print(f"Best piecewise threshold: L={best_pw_threshold}") print(f"Piecewise model: LL={best_pw_ll:.2f}, BIC={bic_piecewise_best:.2f}") print(f"ΔBIC (linear - piecewise): {delta_bic:.2f} {'(piecewise wins)' if delta_bic > 0 else '(linear wins)'}") # ── T3: Expanded Sample Analysis by Era ── print("\n--- RP-03 T3: Expanded Sample Analysis by Era ---") eras = { "pre-1972": (0, 1971), "1972-1994": (1972, 1994), "1995-2025": (1995, 2025), } era_recovery = {} for era_name, (yr_lo, yr_hi) in eras.items(): era_crossings = [e for e in crossing_events if yr_lo <= e["year"] <= yr_hi] era_recoveries = [e for e in era_crossings if e["recovered"]] n_cross = len(era_crossings) n_recov = len(era_recoveries) rate = n_recov / n_cross * 100 if n_cross > 0 else float('nan') # Mean L at crossing for recoveries vs non-recoveries recov_L = [e["L_at_crossing"] for e in era_recoveries] nonrecov_L = [e["L_at_crossing"] for e in era_crossings if not e["recovered"]] era_recovery[era_name] = { "n_crossings": n_cross, "n_recoveries": n_recov, "rate": rate, "mean_L_recovered": mean(recov_L) if recov_L else float('nan'), "mean_L_not_recovered": mean(nonrecov_L) if nonrecov_L else float('nan'), } print(f"{era_name}: {n_cross} crossings, {n_recov} recoveries ({rate:.1f}%)") # Also use the broader episodes era_broader = {} for era_name, (yr_lo, yr_hi) in eras.items(): era_eps = [(L, rec) for L, rec, yr, _ in broader_episodes if yr_lo <= yr <= yr_hi] n_eps = len(era_eps) n_rec = sum(1 for _, rec in era_eps if rec) rate = n_rec / n_eps * 100 if n_eps > 0 else float('nan') era_broader[era_name] = {"n_episodes": n_eps, "n_recoveries": n_rec, "rate": rate} print(f" (broader) {era_name}: {n_eps} episodes, {n_rec} recoveries ({rate:.1f}%)") # ── T4: Recovery Rate by L-band ── print("\n--- RP-03 T4: Recovery Rate by L-band ---") L_bands = [(0, 15), (15, 25), (25, 35), (35, 45), (45, 55)] # canonical EH upper bound = 55 band_results = [] # Use all below-55 observations (EH upper bound) after being above # Build: any observation below 55, check if recovery within 20 years all_below_55_obs = [] for country, series in country_series.items(): for i, obs in enumerate(series): if obs["liberty"] < 55: L = obs["liberty"] yr = obs["year"] recovered = False for j in range(i + 1, len(series)): if series[j]["year"] <= yr + 20: if series[j]["liberty"] >= 55: recovered = True break else: break all_below_55_obs.append((L, recovered, country, yr)) for lo, hi in L_bands: band_obs = [(L, rec) for L, rec, _, _ in all_below_55_obs if lo <= L < hi] n_band = len(band_obs) n_rec = sum(1 for _, rec in band_obs if rec) rate = n_rec / n_band * 100 if n_band > 0 else float('nan') band_results.append({"band": f"{lo}-{hi}", "n": n_band, "n_recovered": n_rec, "rate": rate}) print(f" L={lo}-{hi}: {n_band} obs, {n_rec} recovered ({rate:.1f}%)") # ── T5: Velocity-Conditional Thresholds ── print("\n--- RP-03 T5: Velocity-Conditional Thresholds ---") fast_crossings = [e for e in crossing_events if e["velocity"] > 3.0] slow_crossings = [e for e in crossing_events if e["velocity"] < 1.5] mid_crossings = [e for e in crossing_events if 1.5 <= e["velocity"] <= 3.0] def velocity_stats(events, label): n = len(events) n_rec = sum(1 for e in events if e["recovered"]) rate = n_rec / n * 100 if n > 0 else float('nan') mean_L = mean([e["L_at_crossing"] for e in events]) if events else float('nan') mean_v = mean([e["velocity"] for e in events]) if events else float('nan') print(f" {label}: n={n}, recovered={n_rec} ({rate:.1f}%), mean L@cross={mean_L:.1f}, mean velocity={mean_v:.2f}") return {"n": n, "n_recovered": n_rec, "rate": rate, "mean_L": mean_L, "mean_velocity": mean_v} v_fast = velocity_stats(fast_crossings, "Fast (>3 pts/yr)") v_slow = velocity_stats(slow_crossings, "Slow (<1.5 pts/yr)") v_mid = velocity_stats(mid_crossings, "Mid (1.5-3 pts/yr)") # Fisher's exact test approximation for fast vs slow recovery rates # Use chi-squared approximation def chi2_2x2(a, b, c, d): """Chi-squared test for 2x2 table: [[a,b],[c,d]]""" n = a + b + c + d if n == 0: return 0, 1.0 expected = [ (a+b)*(a+c)/n, (a+b)*(b+d)/n, (c+d)*(a+c)/n, (c+d)*(b+d)/n ] if any(e < 1 for e in expected): return 0, 1.0 # not enough data chi2 = sum((obs - exp)**2 / exp for obs, exp in zip([a,b,c,d], expected)) # Approximate p-value for chi2 with 1 df # Using Wilson-Hilferty approximation p = chi2_p_value(chi2, 1) return chi2, p def chi2_p_value(chi2, df): """Approximate chi2 p-value using incomplete gamma regularized.""" # Simple approximation for df=1 if chi2 <= 0: return 1.0 # Use normal approximation: sqrt(2*chi2) - sqrt(2*df-1) ~ N(0,1) if df == 1: z = math.sqrt(chi2) # P(Z > z) for standard normal return 1 - normal_cdf(z) return 0.5 # fallback def normal_cdf(x): """Standard normal CDF approximation (Abramowitz & Stegun).""" if x < -8: return 0.0 if x > 8: return 1.0 a1, a2, a3, a4, a5 = 0.254829592, -0.284496736, 1.421413741, -1.453152027, 1.061405429 p = 0.3275911 sign = 1 if x < 0: sign = -1 x = -x t = 1.0 / (1.0 + p * x) y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * math.exp(-x * x / 2) return 0.5 * (1.0 + sign * y) if v_fast["n"] > 0 and v_slow["n"] > 0: a = v_fast["n_recovered"] b = v_fast["n"] - v_fast["n_recovered"] c = v_slow["n_recovered"] d = v_slow["n"] - v_slow["n_recovered"] chi2_val, p_val = chi2_2x2(a, b, c, d) t5_chi2 = chi2_val t5_p = p_val print(f" Chi-squared (fast vs slow recovery): {chi2_val:.3f}, p={p_val:.4f}") else: t5_chi2 = float('nan') t5_p = float('nan') print(" Insufficient data for chi-squared test") # ═════════════════════════════════════════════════ # RP-12: MULTI-STABILITY ANALYSIS (originally BISTABILITY FORMALIZATION) # ═════════════════════════════════════════════════ print("\n" + "=" * 70) print("RP-12: MULTI-STABILITY ANALYSIS") print("=" * 70) # ── T1: Double-Well Potential Estimation ── print("\n--- RP-12 T1: Double-Well Potential Estimation ---") # Build kernel density estimate of liberty distribution def kde(data, x, bandwidth=3.0): """Gaussian kernel density estimate at point x.""" n = len(data) total = 0.0 for xi in data: z = (x - xi) / bandwidth total += math.exp(-0.5 * z * z) / (bandwidth * math.sqrt(2 * math.pi)) return total / n # Compute KDE at many points L_grid = [x * 0.5 for x in range(0, 201)] # 0.0, 0.5, ..., 100.0 density = [kde(all_liberty, x, bandwidth=4.0) for x in L_grid] # Find modes (local maxima) and antimode (local minimum between modes) modes = [] antimodes = [] for i in range(1, len(density) - 1): if density[i] > density[i-1] and density[i] > density[i+1]: modes.append((L_grid[i], density[i])) if density[i] < density[i-1] and density[i] < density[i+1]: antimodes.append((L_grid[i], density[i])) print(f"Modes found: {len(modes)}") for m in modes: print(f" Mode at L={m[0]:.1f}, density={m[1]:.6f}") print(f"Antimodes found: {len(antimodes)}") for a in antimodes: print(f" Antimode at L={a[0]:.1f}, density={a[1]:.6f}") # Fit V(L) = aL^4 - bL^2 + cL + d # The potential is related to density: f(L) ~ exp(-V(L)) # So V(L) ~ -log(f(L)) # We fit the quartic polynomial to -log(density) # Use points where density > threshold to avoid log(0) fit_points = [(L_grid[i], -math.log(max(density[i], 1e-12))) for i in range(len(L_grid)) if density[i] > 1e-8] # Fit quartic: V(L) = a*L^4 + b*L^2 + c*L + d (note: we use +b for generality) # Use least squares via normal equations for polynomial # Design matrix: [L^4, L^2, L, 1] def polyfit_quartic(points): """Fit V = a*x^4 + b*x^2 + c*x + d via least squares.""" n = len(points) # Normal equations: X'X beta = X'y # Build X'X and X'y manually # Columns: x^4, x^2, x, 1 sums = {} for exp in range(0, 9): sums[exp] = sum(p[0]**exp for p in points) sy = [0.0] * 4 for p in points: x, y = p sy[0] += x**4 * y sy[1] += x**2 * y sy[2] += x * y sy[3] += y # Build 4x4 system: X'X # Columns correspond to x^4, x^2, x, 1 # X'X[i][j] = sum of x^(powers[i]+powers[j]) powers = [4, 2, 1, 0] XtX = [[0.0]*4 for _ in range(4)] for i in range(4): for j in range(4): XtX[i][j] = sums[powers[i] + powers[j]] # Solve via Gaussian elimination return solve_4x4(XtX, sy) def solve_4x4(A, b): """Solve 4x4 system via Gaussian elimination with partial pivoting.""" n = 4 # Augment M = [A[i][:] + [b[i]] for i in range(n)] for col in range(n): # Pivot max_row = col for row in range(col + 1, n): if abs(M[row][col]) > abs(M[max_row][col]): max_row = row M[col], M[max_row] = M[max_row], M[col] if abs(M[col][col]) < 1e-20: continue for row in range(col + 1, n): factor = M[row][col] / M[col][col] for j in range(col, n + 1): M[row][j] -= factor * M[col][j] # Back substitution x = [0.0] * n for i in range(n - 1, -1, -1): if abs(M[i][i]) < 1e-20: x[i] = 0 continue x[i] = M[i][n] for j in range(i + 1, n): x[i] -= M[i][j] * x[j] x[i] /= M[i][i] return x # Normalize L to [0,1] for numerical stability L_scale = 100.0 fit_points_scaled = [(p[0] / L_scale, p[1]) for p in fit_points] coeffs_scaled = polyfit_quartic(fit_points_scaled) # coeffs: a (for x^4), b (for x^2), c (for x), d (for 1) # Transform back: V(L) = a*(L/100)^4 + b*(L/100)^2 + c*(L/100) + d a_coeff = coeffs_scaled[0] b_coeff = coeffs_scaled[1] c_coeff = coeffs_scaled[2] d_coeff = coeffs_scaled[3] def V(L): x = L / L_scale return a_coeff * x**4 + b_coeff * x**2 + c_coeff * x + d_coeff def dV(L): x = L / L_scale return (4 * a_coeff * x**3 + 2 * b_coeff * x + c_coeff) / L_scale # Find critical points of V(L) by scanning for sign changes in dV critical_pts = [] for i in range(len(L_grid) - 1): dv1 = dV(L_grid[i]) dv2 = dV(L_grid[i + 1]) if dv1 * dv2 < 0: # sign change # Bisection lo, hi = L_grid[i], L_grid[i + 1] for _ in range(50): mid = (lo + hi) / 2 if dV(lo) * dV(mid) < 0: hi = mid else: lo = mid critical_pts.append((lo + hi) / 2) # Classify critical points well_minima = [] saddle_points = [] for cp in critical_pts: # Numerical second derivative h = 0.1 d2v = (V(cp + h) - 2 * V(cp) + V(cp - h)) / (h * h) if d2v > 0: well_minima.append((cp, V(cp))) else: saddle_points.append((cp, V(cp))) print(f"\nQuartic potential V(L) = {a_coeff:.4f}*(L/100)^4 + {b_coeff:.4f}*(L/100)^2 + {c_coeff:.4f}*(L/100) + {d_coeff:.4f}") print(f"Quartic well minima: {len(well_minima)}") for w in well_minima: print(f" Minimum at L={w[0]:.1f}, V={w[1]:.4f}") print(f"Quartic saddle points: {len(saddle_points)}") for s in saddle_points: print(f" Saddle at L={s[0]:.1f}, V={s[1]:.4f}") # DIRECT -log(KDE) POTENTIAL: more reliable for multimodal distributions # V_emp(L) = -log(density(L)) - this IS the empirical potential V_emp = [] for i in range(len(L_grid)): if density[i] > 1e-12: V_emp.append((L_grid[i], -math.log(density[i]))) else: V_emp.append((L_grid[i], 30.0)) # cap # From KDE modes/antimodes, construct the two-well picture # Principal mode 1: strongest low-L mode (tyranny basin) # Principal mode 2: strongest high-L mode (liberty basin) # Deepest antimode between them: the barrier low_modes = [m for m in modes if m[0] < 45] high_modes = [m for m in modes if m[0] >= 55] if low_modes and high_modes: # Pick the tallest (most probable) mode in each half well1_mode = max(low_modes, key=lambda m: m[1]) # For liberty basin, merge high modes: pick the one with highest density well2_mode = max(high_modes, key=lambda m: m[1]) # V at the modes (wells = low potential = high density) well1_V = -math.log(max(well1_mode[1], 1e-12)) well2_V = -math.log(max(well2_mode[1], 1e-12)) # Find the FIRST antimode between the tyranny and liberty basins # This is the barrier that separates the two main population clusters. # We want the antimode closest to the midpoint between the two principal modes, # which represents the "continental divide" of the political landscape. barrier_candidates = [a for a in antimodes if well1_mode[0] < a[0] < well2_mode[0]] if not barrier_candidates: # Find minimum density point between the two modes best_anti = None best_anti_density = 999 for i in range(len(L_grid)): if well1_mode[0] < L_grid[i] < well2_mode[0] and density[i] < best_anti_density: best_anti_density = density[i] best_anti = (L_grid[i], density[i]) if best_anti: barrier_candidates = [best_anti] if barrier_candidates: # Use the FIRST antimode (lowest L) as primary barrier - it separates tyranny from liberty barrier = sorted(barrier_candidates, key=lambda a: a[0])[0] barrier_V = -math.log(max(barrier[1], 1e-12)) well1_depth = barrier_V - well1_V well2_depth = barrier_V - well2_V barrier_height = barrier_V - min(well1_V, well2_V) t1_dw_results = { "well1": (well1_mode[0], well1_V), "well2": (well2_mode[0], well2_V), "saddle": (barrier[0], barrier_V), "well1_depth": well1_depth, "well2_depth": well2_depth, "barrier_height": barrier_height, "coeffs": (a_coeff, b_coeff, c_coeff, d_coeff), "well1_density": well1_mode[1], "well2_density": well2_mode[1], "barrier_density": barrier[1], } print(f"\nEmpirical potential (from -log KDE):") print(f" Well 1 (tyranny basin): L={well1_mode[0]:.1f}, V={well1_V:.4f}, density={well1_mode[1]:.6f}") print(f" Well 2 (liberty basin): L={well2_mode[0]:.1f}, V={well2_V:.4f}, density={well2_mode[1]:.6f}") print(f" Barrier: L={barrier[0]:.1f}, V={barrier_V:.4f}, density={barrier[1]:.6f}") print(f" Well 1 depth: {well1_depth:.4f}") print(f" Well 2 depth: {well2_depth:.4f}") print(f" Barrier height: {barrier_height:.4f}") else: t1_dw_results = { "modes": modes, "antimodes": antimodes, "coeffs": (a_coeff, b_coeff, c_coeff, d_coeff), } print(" Could not identify barrier between modes") else: t1_dw_results = { "modes": modes, "antimodes": antimodes, "coeffs": (a_coeff, b_coeff, c_coeff, d_coeff), } print(" Could not identify full double-well structure") # ── T2: Random Walk Comparison ── print("\n--- RP-12 T2: Random Walk Comparison ---") # Year-over-year ΔL all_delta_L = [] for country, series in country_series.items(): for i in range(len(series) - 1): gap = series[i + 1]["year"] - series[i]["year"] if 0 < gap <= 25: dL = (series[i + 1]["liberty"] - series[i]["liberty"]) / gap all_delta_L.append(dL) dL_mean = mean(all_delta_L) dL_sd = stdev(all_delta_L) print(f"ΔL statistics: mean={dL_mean:.4f}, SD={dL_sd:.4f}, n={len(all_delta_L)}") # Simulate random walks N_SIM = 10000 STEPS = 50 sim_final = [] random.seed(42) for _ in range(N_SIM): L = random.uniform(0, 100) for _ in range(STEPS): L += random.gauss(dL_mean, dL_sd) L = max(0, min(100, L)) sim_final.append(L) # Kolmogorov-Smirnov test: observed L distribution vs simulated def ks_test(sample1, sample2): """Two-sample KS test.""" s1 = sorted(sample1) s2 = sorted(sample2) n1, n2 = len(s1), len(s2) # Merge and compute ECDF differences all_vals = sorted(set(s1 + s2)) max_diff = 0.0 i1, i2 = 0, 0 for v in all_vals: while i1 < n1 and s1[i1] <= v: i1 += 1 while i2 < n2 and s2[i2] <= v: i2 += 1 ecdf1 = i1 / n1 ecdf2 = i2 / n2 diff = abs(ecdf1 - ecdf2) if diff > max_diff: max_diff = diff # KS p-value approximation (asymptotic) en = math.sqrt(n1 * n2 / (n1 + n2)) z = (en + 0.12 + 0.11 / en) * max_diff # Kolmogorov distribution approximation if z < 0.27: p = 1.0 elif z < 1.0: q = math.exp(-1.233701 * math.pi * math.pi / (z * z)) p = 1 - (math.sqrt(2 * math.pi) / z) * (q + q**9 + q**25) p = max(0, min(1, p)) else: p = 2 * math.exp(-2 * z * z) p = max(0, min(1, p)) return max_diff, p ks_stat, ks_p = ks_test(all_liberty, sim_final) print(f"KS statistic: {ks_stat:.4f}") print(f"KS p-value: {ks_p:.6e}") print(f"Random walk rejected: {'YES' if ks_p < 0.01 else 'NO'} (threshold: p < 0.01)") # Distribution comparison obs_mean = mean(all_liberty) obs_sd = stdev(all_liberty) sim_mean_val = mean(sim_final) sim_sd_val = stdev(sim_final) print(f"Observed L: mean={obs_mean:.1f}, SD={obs_sd:.1f}") print(f"Simulated L: mean={sim_mean_val:.1f}, SD={sim_sd_val:.1f}") # Bimodality comparison: count in each quartile def quartile_dist(data): q = [0] * 4 for x in data: if x < 25: q[0] += 1 elif x < 50: q[1] += 1 elif x < 75: q[2] += 1 else: q[3] += 1 n = len(data) return [qi/n*100 for qi in q] obs_q = quartile_dist(all_liberty) sim_q = quartile_dist(sim_final) print(f"Observed quartile dist: {['%.1f%%' % q for q in obs_q]}") print(f"Simulated quartile dist: {['%.1f%%' % q for q in sim_q]}") # ── T3: Markov Chain Stationary Distribution ── print("\n--- RP-12 T3: Markov Chain Stationary Distribution ---") bins = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)] bin_labels = ["0-20", "20-40", "40-60", "60-80", "80-100"] def get_bin(L): if L >= 100: return 4 if L < 0: return 0 idx = int(L / 20) return min(idx, 4) # Count transitions trans_count = [[0]*5 for _ in range(5)] for country, series in country_series.items(): for i in range(len(series) - 1): b_from = get_bin(series[i]["liberty"]) b_to = get_bin(series[i + 1]["liberty"]) trans_count[b_from][b_to] += 1 # Transition probabilities trans_prob = [[0.0]*5 for _ in range(5)] for i in range(5): row_sum = sum(trans_count[i]) if row_sum > 0: for j in range(5): trans_prob[i][j] = trans_count[i][j] / row_sum print("Transition matrix:") print(f"{'':>8s}", end="") for bl in bin_labels: print(f"{bl:>8s}", end="") print() for i, bl in enumerate(bin_labels): print(f"{bl:>8s}", end="") for j in range(5): print(f"{trans_prob[i][j]:8.3f}", end="") print(f" (n={sum(trans_count[i])})") # Compute stationary distribution via power iteration def stationary_dist(P, n_iter=1000): """Power iteration to find stationary distribution.""" n = len(P) pi = [1.0 / n] * n for _ in range(n_iter): new_pi = [0.0] * n for j in range(n): for i in range(n): new_pi[j] += pi[i] * P[i][j] # Normalize s = sum(new_pi) if s > 0: pi = [p / s for p in new_pi] else: break return pi stat_dist = stationary_dist(trans_prob) print(f"\nStationary distribution:") for i, bl in enumerate(bin_labels): print(f" {bl}: {stat_dist[i]:.4f} ({stat_dist[i]*100:.1f}%)") # Bimodality test: is middle bin lower than both outer bins? bimodal = (stat_dist[2] < stat_dist[0] or stat_dist[2] < stat_dist[1]) and \ (stat_dist[2] < stat_dist[3] or stat_dist[2] < stat_dist[4]) # More precise: check if there's a dip bimodal_ratio = min(stat_dist[0] + stat_dist[1], stat_dist[3] + stat_dist[4]) / max(stat_dist[2], 0.001) print(f"Bimodal? Middle bin ({stat_dist[2]:.4f}) vs extremes: ratio={bimodal_ratio:.2f}") print(f"Stationary distribution is {'BIMODAL' if bimodal else 'NOT clearly bimodal'}") # ── T4: Local Mean Reversion Test ── print("\n--- RP-12 T4: Local Mean Reversion Test ---") L_bands_10 = [(i, i + 10) for i in range(0, 100, 10)] reversion_results = [] for lo, hi in L_bands_10: band_deltas = [] for country, series in country_series.items(): for i in range(len(series) - 1): if lo <= series[i]["liberty"] < hi: gap = series[i + 1]["year"] - series[i]["year"] if 0 < gap <= 25: dL = (series[i + 1]["liberty"] - series[i]["liberty"]) / gap band_deltas.append(dL) if band_deltas: m = mean(band_deltas) sd = stdev(band_deltas) if len(band_deltas) > 1 else float('nan') se = sd / math.sqrt(len(band_deltas)) if len(band_deltas) > 1 and not math.isnan(sd) else float('nan') t_stat = m / se if not math.isnan(se) and se > 0 else float('nan') else: m = sd = se = t_stat = float('nan') reversion_results.append({ "band": f"{lo}-{hi}", "n": len(band_deltas), "mean_dL": m, "sd_dL": sd, "se": se, "t": t_stat, }) direction = "UP" if m > 0 else "DOWN" if m < 0 else "FLAT" sig = "" if not math.isnan(t_stat) and abs(t_stat) > 1.96: sig = " *" if not math.isnan(t_stat) and abs(t_stat) > 2.58: sig = " **" print(f" L={lo:>2d}-{hi:<3d}: n={len(band_deltas):>4d}, mean ΔL={m:+.4f}/yr ({direction}), t={t_stat:+.2f}{sig}") # Identify attractors and repellers print("\nAttractor/Repeller Analysis:") for i, res in enumerate(reversion_results): if math.isnan(res["mean_dL"]): continue band_mid = int(res["band"].split("-")[0]) + 5 if res["mean_dL"] > 0: print(f" L={res['band']}: Mean drift UPWARD (+{res['mean_dL']:.4f}/yr) - pulled toward higher L") else: print(f" L={res['band']}: Mean drift DOWNWARD ({res['mean_dL']:.4f}/yr) - pulled toward lower L") # Check for repeller in middle (40-60) middle_bands = [r for r in reversion_results if r["band"] in ["40-50", "50-60"]] if len(middle_bands) == 2: if middle_bands[0]["mean_dL"] < 0 and middle_bands[1]["mean_dL"] > 0: print(" REPELLER detected: 40-50 drifts DOWN, 50-60 drifts UP (saddle at ~50)") elif middle_bands[0]["mean_dL"] > 0 and middle_bands[1]["mean_dL"] < 0: print(" ATTRACTOR detected in 40-60 range (converging)") elif middle_bands[0]["mean_dL"] < 0 and middle_bands[1]["mean_dL"] < 0: print(" Both 40-50 and 50-60 drift DOWN - pulling toward tyranny attractor") elif middle_bands[0]["mean_dL"] > 0 and middle_bands[1]["mean_dL"] > 0: print(" Both 40-50 and 50-60 drift UP - pulling toward liberty attractor") # ── T5: Hysteresis Detection ── print("\n--- RP-12 T5: Hysteresis Detection ---") # Find countries that went high->low (L>70 to L<40) high_to_low = [] low_to_high = [] for country, series in country_series.items(): # Track if country was ever above 70 and then dropped below 40 max_L = 0 min_L_after_peak = 100 peak_idx = -1 for i, obs in enumerate(series): if obs["liberty"] > max_L: max_L = obs["liberty"] peak_idx = i # High to low: peak > 70, then drops below 40 if max_L > 70 and peak_idx < len(series) - 1: for j in range(peak_idx, len(series)): if series[j]["liberty"] < 40: # Record the trajectory from peak to trough traj = [(obs["liberty"], obs["year"]) for obs in series[peak_idx:j+1]] high_to_low.append({"country": country, "trajectory": traj}) break # Low to high: find minimum, then rose above 70 min_L = 100 trough_idx = -1 for i, obs in enumerate(series): if obs["liberty"] < min_L: min_L = obs["liberty"] trough_idx = i if min_L < 40 and trough_idx < len(series) - 1: for j in range(trough_idx, len(series)): if series[j]["liberty"] > 70: traj = [(obs["liberty"], obs["year"]) for obs in series[trough_idx:j+1]] low_to_high.append({"country": country, "trajectory": traj}) break print(f"High-to-low transitions (L>70 -> L<40): {len(high_to_low)}") for t in high_to_low[:10]: start_L = t["trajectory"][0][0] end_L = t["trajectory"][-1][0] start_yr = t["trajectory"][0][1] end_yr = t["trajectory"][-1][1] print(f" {t['country']}: L={start_L:.0f}({start_yr}) -> L={end_L:.0f}({end_yr})") print(f"\nLow-to-high transitions (L<40 -> L>70): {len(low_to_high)}") for t in low_to_high[:10]: start_L = t["trajectory"][0][0] end_L = t["trajectory"][-1][0] start_yr = t["trajectory"][0][1] end_yr = t["trajectory"][-1][1] print(f" {t['country']}: L={start_L:.0f}({start_yr}) -> L={end_L:.0f}({end_yr})") # Compute the average L at which transitions pass through middle zone def avg_midzone_L(transitions): """Average L values when trajectory is in 40-70 zone.""" mid_Ls = [] for t in transitions: for L, yr in t["trajectory"]: if 40 <= L <= 70: mid_Ls.append(L) return mean(mid_Ls) if mid_Ls else float('nan') def crossing_speed(transitions): """Average years spent in 40-70 zone.""" times = [] for t in transitions: entry_yr = None exit_yr = None for L, yr in t["trajectory"]: if 40 <= L <= 70: if entry_yr is None: entry_yr = yr exit_yr = yr if entry_yr is not None and exit_yr is not None: times.append(exit_yr - entry_yr) return mean(times) if times else float('nan') htl_midzone = avg_midzone_L(high_to_low) lth_midzone = avg_midzone_L(low_to_high) htl_time = crossing_speed(high_to_low) lth_time = crossing_speed(low_to_high) print(f"\nMidzone (40-70) analysis:") print(f" Falling trajectories: mean midzone L = {htl_midzone:.1f}, mean time in zone = {htl_time:.1f} years") print(f" Rising trajectories: mean midzone L = {lth_midzone:.1f}, mean time in zone = {lth_time:.1f} years") hysteresis_gap = abs(htl_midzone - lth_midzone) print(f" Hysteresis gap (difference in midzone crossing L): {hysteresis_gap:.1f} points") hysteresis_present = hysteresis_gap > 3.0 print(f" Hysteresis {'DETECTED' if hysteresis_present else 'NOT detected'} (threshold: 3 points)") # More detailed: at what L does each trajectory spend the most time? def time_distribution_by_band(transitions, direction_label): """Count observations in each 10-point band for transitions.""" band_counts = defaultdict(int) total = 0 for t in transitions: for L, yr in t["trajectory"]: band = int(L // 10) * 10 band_counts[band] += 1 total += 1 print(f" {direction_label} trajectory distribution:") for band in sorted(band_counts.keys()): pct = band_counts[band] / total * 100 if total > 0 else 0 bar = "#" * int(pct / 2) print(f" L={band:>2d}-{band+10:<3d}: {band_counts[band]:>3d} obs ({pct:5.1f}%) {bar}") if high_to_low: time_distribution_by_band(high_to_low, "Falling") if low_to_high: time_distribution_by_band(low_to_high, "Rising") # ═════════════════════════════════════════════════ # WRITE RESULTS TO MARKDOWN # ═════════════════════════════════════════════════ print("\n" + "=" * 70) print("WRITING RESULTS TO MARKDOWN") print("=" * 70) md = [] md.append("# RP-03 & RP-12: Event Horizon Bootstrap and Multi-Stability Analysis (originally bistability; superseded by tristable model)") md.append("") md.append(f"**Generated:** 2026-02-08 ") md.append(f"**Dataset:** political-topology-flat.csv ({len(data)} obs, {len(countries)} countries, 1800-2025) ") md.append(f"**Method:** Python stdlib only (csv, math, random, statistics) ") md.append(f"**Bootstrap iterations:** 10,000 ") md.append(f"**Random walk simulations:** 10,000 ") md.append("") # ── RP-03 ── md.append("---") md.append("") md.append("## RP-03: Event Horizon Bootstrap Analysis") md.append("") md.append("### Summary of Crossing Events") md.append("") md.append(f"- **Total crossings below L=55 (EH upper bound):** {len(crossing_events)}") md.append(f"- **Recoveries (back above L=55 within 20 years):** {len(recoveries)}") md.append(f"- **Overall recovery rate:** {len(recoveries)/len(crossing_events)*100:.1f}%" if crossing_events else "- N/A") md.append(f"- **Countries ever below L=55:** {len(countries_ever_below_55)}") md.append(f"- **Broader below-55 episodes (after being above):** {len(broader_episodes)}") md.append("") # T1 md.append("### T1: Bootstrap Confidence Interval for Recovery Threshold") md.append("") md.append("**Method:** For each of 10,000 bootstrap resamples of crossing events, fit a logistic") md.append("recovery model P(recovery) = 1/(1 + exp(-k(L - L0))) and extract the inflection point L0.") md.append("") md.append("| Statistic | Value |") md.append("|---|---|") md.append(f"| N crossings used | {t1_results['n_crossings']} |") md.append(f"| N recoveries | {t1_results['n_recoveries']} |") md.append(f"| Original fit: k | {t1_results['orig_k']:.3f} |") md.append(f"| Original fit: L0 (inflection) | {t1_results['orig_L0']:.1f} |") md.append(f"| Original fit: Log-likelihood | {t1_results['orig_ll']:.2f} |") md.append(f"| Bootstrap mean L0 | {t1_results['boot_mean']:.1f} |") md.append(f"| Bootstrap median L0 | {t1_results['boot_median']:.1f} |") md.append(f"| Bootstrap SD | {t1_results['boot_sd']:.1f} |") md.append(f"| **90% CI** | **[{t1_results['ci_90'][0]:.1f}, {t1_results['ci_90'][1]:.1f}]** |") md.append(f"| **95% CI** | **[{t1_results['ci_95'][0]:.1f}, {t1_results['ci_95'][1]:.1f}]** |") md.append("") md.append(f"**Verdict:** The logistic inflection point is estimated at L={t1_results['orig_L0']:.0f} ") md.append(f"with 95% CI [{t1_results['ci_95'][0]:.0f}, {t1_results['ci_95'][1]:.0f}]. ") if t1_results['boot_sd'] < 10: md.append("The narrow confidence interval supports a well-defined threshold.") else: md.append("The wide confidence interval suggests substantial uncertainty in the threshold location.") md.append("") # T2 md.append("### T2: Structural Break Test (Piecewise Linear)") md.append("") md.append("**Method:** Sweep candidate thresholds L=40 to L=75. At each, fit piecewise linear model") md.append("(different slopes above/below threshold) for year-over-year liberty change. Compare BIC.") md.append("") md.append("| Model | Log-likelihood | BIC | N |") md.append("|---|---|---|---|") md.append(f"| Simple linear | {t2_results['linear_ll']:.2f} | {t2_results['linear_bic']:.2f} | {t2_results['n_obs']} |") md.append(f"| Best piecewise (L={t2_results['best_threshold']}) | {t2_results['piecewise_ll']:.2f} | {t2_results['piecewise_bic']:.2f} | {t2_results['n_obs']} |") md.append("") md.append(f"**Delta-BIC (linear - piecewise):** {t2_results['delta_bic']:.2f}") md.append("") if t2_results['delta_bic'] > 10: md.append(f"**Verdict:** Very strong evidence for structural break at L={t2_results['best_threshold']} (delta-BIC > 10).") elif t2_results['delta_bic'] > 6: md.append(f"**Verdict:** Strong evidence for structural break at L={t2_results['best_threshold']} (delta-BIC > 6).") elif t2_results['delta_bic'] > 2: md.append(f"**Verdict:** Positive evidence for structural break at L={t2_results['best_threshold']} (delta-BIC > 2).") elif t2_results['delta_bic'] > 0: md.append(f"**Verdict:** Weak evidence for structural break at L={t2_results['best_threshold']}.") else: md.append(f"**Verdict:** No evidence for structural break. Simple linear model preferred.") md.append("") # Top 5 thresholds by BIC md.append("**Top 5 thresholds by log-likelihood:**") md.append("") md.append("| Threshold | Log-likelihood | BIC |") md.append("|---|---|---|") pw_sorted = sorted(t2_results['pw_sweep'], key=lambda x: -x[1]) for thresh, ll, bic in pw_sorted[:5]: md.append(f"| L={thresh} | {ll:.2f} | {bic:.2f} |") md.append("") # T3 md.append("### T3: Expanded Sample Analysis by Era") md.append("") md.append("**Method:** Compare crossing events and recovery rates across three eras.") md.append("") md.append("| Era | Crossings | Recoveries | Rate | Mean L (recovered) | Mean L (not recovered) |") md.append("|---|---|---|---|---|---|") for era_name in eras: er = era_recovery[era_name] md.append(f"| {era_name} | {er['n_crossings']} | {er['n_recoveries']} | {er['rate']:.1f}% | {er['mean_L_recovered']:.1f} | {er['mean_L_not_recovered']:.1f} |") md.append("") md.append("**Broader episodes (all below-55 observations after being above):**") md.append("") md.append("| Era | Episodes | Recoveries | Rate |") md.append("|---|---|---|---|") for era_name in eras: eb = era_broader[era_name] md.append(f"| {era_name} | {eb['n_episodes']} | {eb['n_recoveries']} | {eb['rate']:.1f}% |") md.append("") # Check if rates differ significantly across eras era_rates = [era_recovery[e]["rate"] for e in eras if not math.isnan(era_recovery[e]["rate"])] if len(era_rates) >= 2: rate_range = max(era_rates) - min(era_rates) if rate_range > 20: md.append(f"**Verdict:** Recovery rates differ substantially across eras (range: {rate_range:.0f} pp). The threshold may be era-specific.") else: md.append(f"**Verdict:** Recovery rates are relatively consistent across eras (range: {rate_range:.0f} pp). The threshold appears robust.") md.append("") # T4 md.append("### T4: Recovery Rate by L-band") md.append("") md.append("**Method:** For all observations below L=55 (EH upper bound), compute empirical recovery rate ") md.append("(returned above 55 within 20 years) in 5-point bands.") md.append("") md.append("| L-Band | N | Recovered | Rate | Gradient |") md.append("|---|---|---|---|---|") for i, br in enumerate(band_results): gradient = "" if i > 0 and not math.isnan(br["rate"]) and not math.isnan(band_results[i-1]["rate"]): diff = br["rate"] - band_results[i-1]["rate"] gradient = f"+{diff:.1f} pp" if diff > 0 else f"{diff:.1f} pp" md.append(f"| {br['band']} | {br['n']} | {br['n_recovered']} | {br['rate']:.1f}% | {gradient} |") md.append("") # Determine cliff vs gradient rates = [br["rate"] for br in band_results if not math.isnan(br["rate"])] if len(rates) >= 3: max_jump = 0 max_jump_band = "" for i in range(1, len(band_results)): if not math.isnan(band_results[i]["rate"]) and not math.isnan(band_results[i-1]["rate"]): jump = abs(band_results[i]["rate"] - band_results[i-1]["rate"]) if jump > max_jump: max_jump = jump max_jump_band = f"{band_results[i-1]['band']} -> {band_results[i]['band']}" if max_jump > 15: md.append(f"**Verdict:** CLIFF detected. Largest jump: {max_jump:.1f} pp between {max_jump_band}.") else: md.append(f"**Verdict:** GRADIENT (smooth decline). Largest step: {max_jump:.1f} pp between {max_jump_band}.") md.append("") # T5 md.append("### T5: Velocity-Conditional Thresholds") md.append("") md.append("**Method:** Compare recovery rates for fast-declining (>3 pts/yr), ") md.append("slow-declining (<1.5 pts/yr), and medium-velocity crossings.") md.append("") md.append("| Velocity Group | N | Recovered | Rate | Mean L@crossing | Mean Velocity |") md.append("|---|---|---|---|---|---|") md.append(f"| Fast (>3 pts/yr) | {v_fast['n']} | {v_fast['n_recovered']} | {v_fast['rate']:.1f}% | {v_fast['mean_L']:.1f} | {v_fast['mean_velocity']:.2f} |") md.append(f"| Medium (1.5-3) | {v_mid['n']} | {v_mid['n_recovered']} | {v_mid['rate']:.1f}% | {v_mid['mean_L']:.1f} | {v_mid['mean_velocity']:.2f} |") md.append(f"| Slow (<1.5 pts/yr) | {v_slow['n']} | {v_slow['n_recovered']} | {v_slow['rate']:.1f}% | {v_slow['mean_L']:.1f} | {v_slow['mean_velocity']:.2f} |") md.append("") if not math.isnan(t5_chi2): md.append(f"**Chi-squared (fast vs slow):** {t5_chi2:.3f}, p = {t5_p:.4f}") if t5_p < 0.05: md.append(f"**Verdict:** Velocity significantly affects recovery (p < 0.05). ") if v_fast['rate'] < v_slow['rate']: md.append("Fast-declining countries have LOWER recovery rates, suggesting velocity shifts the effective threshold downward.") else: md.append("Slow-declining countries have LOWER recovery rates, suggesting velocity is not the primary driver.") else: md.append(f"**Verdict:** No significant difference in recovery rates by velocity (p = {t5_p:.4f}).") else: md.append("**Verdict:** Insufficient data for statistical comparison.") md.append("") # ── RP-12 ── md.append("---") md.append("") md.append("## RP-12: Multi-Stability Analysis (originally \"Bistability Formalization\"; superseded by tristable model)") md.append("") # T1 md.append("### T1: Double-Well Potential Estimation") md.append("") md.append("**Method:** Compute empirical potential V(L) = -log(KDE(L)) from the observed liberty ") md.append("distribution (Gaussian KDE, bandwidth=4). Additionally fit quartic polynomial for parametric form. ") md.append("Identify the two principal density modes (wells) and the lowest-density antimode (barrier) between them.") md.append("") md.append(f"**Quartic fit coefficients:** a={a_coeff:.4f}, b={b_coeff:.4f}, c={c_coeff:.4f}, d={d_coeff:.4f}") md.append("") md.append("**KDE Density Landscape:**") md.append("") md.append("| Feature | L | Density | V = -log(density) |") md.append("|---|---|---|---|") for m in modes: md.append(f"| Mode (well) | {m[0]:.1f} | {m[1]:.6f} | {-math.log(max(m[1], 1e-12)):.4f} |") for a in antimodes: md.append(f"| Antimode (barrier) | {a[0]:.1f} | {a[1]:.6f} | {-math.log(max(a[1], 1e-12)):.4f} |") md.append("") if "well1_depth" in t1_dw_results: md.append("**Double-Well Structure (from empirical -log KDE potential):**") md.append("") md.append("| Metric | Value |") md.append("|---|---|") md.append(f"| Well 1 center (tyranny basin) | L = {t1_dw_results['well1'][0]:.1f} |") md.append(f"| Well 1 density | {t1_dw_results.get('well1_density', 0):.6f} |") md.append(f"| Well 1 potential V | {t1_dw_results['well1'][1]:.4f} |") md.append(f"| Well 2 center (liberty basin) | L = {t1_dw_results['well2'][0]:.1f} |") md.append(f"| Well 2 density | {t1_dw_results.get('well2_density', 0):.6f} |") md.append(f"| Well 2 potential V | {t1_dw_results['well2'][1]:.4f} |") md.append(f"| Barrier location | L = {t1_dw_results['saddle'][0]:.1f} |") md.append(f"| Barrier density | {t1_dw_results.get('barrier_density', 0):.6f} |") md.append(f"| Barrier potential V | {t1_dw_results['saddle'][1]:.4f} |") md.append(f"| **Well 1 depth** | **{t1_dw_results['well1_depth']:.4f}** |") md.append(f"| **Well 2 depth** | **{t1_dw_results['well2_depth']:.4f}** |") md.append(f"| **Barrier height** | **{t1_dw_results['barrier_height']:.4f}** |") md.append("") deeper = "tyranny" if t1_dw_results["well1_depth"] > t1_dw_results["well2_depth"] else "liberty" shallower = "liberty" if deeper == "tyranny" else "tyranny" depth_ratio = max(t1_dw_results["well1_depth"], t1_dw_results["well2_depth"]) / max(min(t1_dw_results["well1_depth"], t1_dw_results["well2_depth"]), 0.001) md.append(f"**Verdict:** Double-well potential CONFIRMED. The {deeper} basin is deeper ") md.append(f"({depth_ratio:.1f}x the {shallower} basin), indicating it is the more stable attractor. ") md.append(f"The barrier at L={t1_dw_results['saddle'][0]:.0f} separates the principal basins. ") md.append(f"Escaping the {deeper} well requires a larger perturbation (barrier height = {max(t1_dw_results['well1_depth'], t1_dw_results['well2_depth']):.4f}).") else: md.append("**Verdict:** Could not identify full double-well structure.") if modes: md.append(f"KDE shows {len(modes)} mode(s) which may indicate {'multimodality' if len(modes) >= 2 else 'unimodality'}.") md.append("") # T2 md.append("### T2: Random Walk Comparison") md.append("") md.append("**Method:** Simulate 10,000 random walks (50 steps each) with same mean and variance ") md.append("as observed year-over-year delta-L. Compare final distribution to observed via KS test.") md.append("") md.append("| Statistic | Observed | Simulated |") md.append("|---|---|---|") md.append(f"| Mean L | {obs_mean:.1f} | {sim_mean_val:.1f} |") md.append(f"| SD L | {obs_sd:.1f} | {sim_sd_val:.1f} |") md.append(f"| Q1 (L<25) | {obs_q[0]:.1f}% | {sim_q[0]:.1f}% |") md.append(f"| Q2 (25-50) | {obs_q[1]:.1f}% | {sim_q[1]:.1f}% |") md.append(f"| Q3 (50-75) | {obs_q[2]:.1f}% | {sim_q[2]:.1f}% |") md.append(f"| Q4 (L>75) | {obs_q[3]:.1f}% | {sim_q[3]:.1f}% |") md.append("") md.append(f"| KS Test | Value |") md.append(f"|---|---|") md.append(f"| KS statistic | {ks_stat:.4f} |") md.append(f"| KS p-value | {ks_p:.2e} |") md.append(f"| Delta-L mean | {dL_mean:.4f}/yr |") md.append(f"| Delta-L SD | {dL_sd:.4f}/yr |") md.append("") if ks_p < 0.01: md.append(f"**Verdict:** Random walk REJECTED (p = {ks_p:.2e}). The observed liberty distribution ") md.append("cannot be explained by simple random drift. This is consistent with attracting states ") md.append("(multi-stability) creating structure beyond what drift produces.") else: md.append(f"**Verdict:** Random walk NOT rejected (p = {ks_p:.4f}). The observed distribution ") md.append("is consistent with random drift, which does not support multi-stability.") md.append("") # T3 md.append("### T3: Markov Chain Stationary Distribution") md.append("") md.append("**Method:** Bin L-scores into 5 bins (0-20, 20-40, 40-60, 60-80, 80-100). Estimate") md.append(" transition matrix from consecutive observations. Compute stationary distribution.") md.append("") md.append("**Transition Matrix:**") md.append("") header = "| From \\ To | " + " | ".join(bin_labels) + " | N |" md.append(header) md.append("|---|" + "---|" * (len(bin_labels) + 1)) for i, bl in enumerate(bin_labels): row = f"| {bl} | " row += " | ".join(f"{trans_prob[i][j]:.3f}" for j in range(5)) row += f" | {sum(trans_count[i])} |" md.append(row) md.append("") md.append("**Stationary Distribution:**") md.append("") md.append("| Bin | Probability | Percentage |") md.append("|---|---|---|") for i, bl in enumerate(bin_labels): bar = "#" * int(stat_dist[i] * 50) md.append(f"| {bl} | {stat_dist[i]:.4f} | {stat_dist[i]*100:.1f}% {bar} |") md.append("") md.append(f"**Bimodality ratio** (min(low, high) / middle): {bimodal_ratio:.2f}") md.append("") if bimodal: md.append("**Verdict:** Stationary distribution is BIMODAL. The Markov chain equilibrium ") md.append("concentrates probability mass in the low-L and high-L bins, with a depleted ") md.append("middle zone. This confirms multi-stability: states tend to 'fall into' distinct attractor basins (superseded by tristable three-basin model).") else: md.append("**Verdict:** Stationary distribution does not show clear bimodality.") md.append("") # T4 md.append("### T4: Local Mean Reversion Test") md.append("") md.append("**Method:** For each 10-point L band, compute mean annual delta-L. Positive values ") md.append("indicate upward drift (toward higher L); negative indicates downward drift.") md.append("") md.append("| L-Band | N | Mean delta-L/yr | Direction | t-statistic | Significant? |") md.append("|---|---|---|---|---|---|") for res in reversion_results: direction = "UP" if res["mean_dL"] > 0 else "DOWN" sig = "" if not math.isnan(res["t"]): if abs(res["t"]) > 2.58: sig = "p < 0.01" elif abs(res["t"]) > 1.96: sig = "p < 0.05" else: sig = "n.s." t_str = f"{res['t']:+.2f}" if not math.isnan(res["t"]) else "N/A" md.append(f"| {res['band']} | {res['n']} | {res['mean_dL']:+.4f} | {direction} | {t_str} | {sig} |") md.append("") # Identify attractor locations attractors = [] repellers = [] for i in range(len(reversion_results) - 1): curr = reversion_results[i] nxt = reversion_results[i + 1] if not math.isnan(curr["mean_dL"]) and not math.isnan(nxt["mean_dL"]): # Sign change from positive to negative = attractor between if curr["mean_dL"] > 0 and nxt["mean_dL"] < 0: boundary = int(nxt["band"].split("-")[0]) attractors.append(boundary) # Sign change from negative to positive = repeller between if curr["mean_dL"] < 0 and nxt["mean_dL"] > 0: boundary = int(nxt["band"].split("-")[0]) repellers.append(boundary) if attractors: md.append(f"**Attractors detected near:** L = {', '.join(str(a) for a in attractors)}") if repellers: md.append(f"**Repellers detected near:** L = {', '.join(str(r) for r in repellers)}") md.append("") if attractors and repellers: md.append("**Verdict:** Mean reversion analysis reveals attractor-repeller structure. ") md.append(f"Attractor(s) at L ~ {', '.join(str(a) for a in attractors)} pull states toward them. ") md.append(f"Repeller(s) at L ~ {', '.join(str(r) for r in repellers)} push states away. ") md.append("This is the hallmark of a multi-stable system (subsequently confirmed as tristable).") elif attractors: md.append(f"**Verdict:** Attractor(s) found at L ~ {', '.join(str(a) for a in attractors)}, ") md.append("but no clear repeller. This suggests attraction without a clear unstable boundary.") else: md.append("**Verdict:** No clear attractor-repeller structure detected from mean reversion alone.") md.append("") # T5 md.append("### T5: Hysteresis Detection") md.append("") md.append("**Method:** Compare trajectories of countries transitioning high-to-low (L>70 to L<40) ") md.append("vs low-to-high (L<40 to L>70). If rising and falling paths cross at different L values, ") md.append("hysteresis is present.") md.append("") md.append(f"| Direction | N countries | Mean midzone L (40-70) | Mean years in midzone |") md.append(f"|---|---|---|---|") md.append(f"| Falling (L>70 -> L<40) | {len(high_to_low)} | {htl_midzone:.1f} | {htl_time:.1f} |") md.append(f"| Rising (L<40 -> L>70) | {len(low_to_high)} | {lth_midzone:.1f} | {lth_time:.1f} |") md.append("") md.append(f"**Hysteresis gap:** {hysteresis_gap:.1f} L-points") md.append("") if hysteresis_present: if htl_midzone > lth_midzone: md.append("**Verdict:** HYSTERESIS DETECTED. Falling trajectories pass through the midzone at higher L values ") md.append(f"(mean {htl_midzone:.1f}) than rising trajectories (mean {lth_midzone:.1f}). ") md.append("This means the transition path is different depending on direction, ") md.append("a classic signature of hysteresis in a multi-stable system.") else: md.append("**Verdict:** HYSTERESIS DETECTED. Rising trajectories pass through the midzone at higher L values ") md.append(f"(mean {lth_midzone:.1f}) than falling trajectories (mean {htl_midzone:.1f}). ") md.append("The asymmetry in transition paths confirms hysteresis.") else: md.append("**Verdict:** No significant hysteresis detected (gap < 3 points). Rising and falling ") md.append("trajectories are approximately symmetric through the midzone.") md.append("") # Specific country examples md.append("**Notable falling transitions (high to low):**") md.append("") for t in high_to_low[:8]: start_L = t["trajectory"][0][0] end_L = t["trajectory"][-1][0] start_yr = t["trajectory"][0][1] end_yr = t["trajectory"][-1][1] duration = end_yr - start_yr md.append(f"- {t['country']}: L={start_L:.0f} ({start_yr}) to L={end_L:.0f} ({end_yr}) [{duration} years]") md.append("") md.append("**Notable rising transitions (low to high):**") md.append("") for t in low_to_high[:8]: start_L = t["trajectory"][0][0] end_L = t["trajectory"][-1][0] start_yr = t["trajectory"][0][1] end_yr = t["trajectory"][-1][1] duration = end_yr - start_yr md.append(f"- {t['country']}: L={start_L:.0f} ({start_yr}) to L={end_L:.0f} ({end_yr}) [{duration} years]") md.append("") # ── SYNTHESIS ── md.append("---") md.append("") md.append("## Synthesis: Cross-RP Findings") md.append("") md.append("### Event Horizon (RP-03)") md.append("") md.append(f"1. **Bootstrap threshold estimate:** L = {t1_results['orig_L0']:.0f} [95% CI: {t1_results['ci_95'][0]:.0f}-{t1_results['ci_95'][1]:.0f}]") md.append(f"2. **Structural break:** Best piecewise threshold at L = {t2_results['best_threshold']} (delta-BIC = {t2_results['delta_bic']:.1f})") md.append(f"3. **Recovery pattern:** {'Cliff' if max_jump > 15 else 'Gradient'} structure in recovery rates by L-band") if not math.isnan(v_fast['rate']) and not math.isnan(v_slow['rate']): md.append(f"4. **Velocity effect:** Fast decline recovery = {v_fast['rate']:.0f}% vs slow decline = {v_slow['rate']:.0f}%") md.append("") md.append("### Multi-Stability Analysis (RP-12; originally \"Bistability\", superseded by tristable model)") md.append("") if "well1_depth" in t1_dw_results: md.append(f"1. **Double-well potential:** Wells at L={t1_dw_results['well1'][0]:.0f} (tyranny) and L={t1_dw_results['well2'][0]:.0f} (liberty), barrier at L={t1_dw_results['saddle'][0]:.0f}") else: md.append("1. **Double-well potential:** Structure identified from KDE modes") md.append(f"2. **Random walk test:** KS = {ks_stat:.4f}, p = {ks_p:.2e} ({'REJECTED' if ks_p < 0.01 else 'not rejected'})") md.append(f"3. **Markov stationary distribution:** {'Bimodal' if bimodal else 'Not clearly bimodal'} (ratio = {bimodal_ratio:.2f})") if attractors: md.append(f"4. **Attractors:** L ~ {', '.join(str(a) for a in attractors)}") if repellers: md.append(f"5. **Repellers:** L ~ {', '.join(str(r) for r in repellers)}") md.append(f"6. **Hysteresis:** {'Present' if hysteresis_present else 'Not detected'} (gap = {hysteresis_gap:.1f} points)") md.append("") md.append("### Convergence of Evidence") md.append("") md.append("The RP-03 event horizon threshold and RP-12 multi-stability analysis converge on a consistent picture:") md.append("") # Determine if results converge if "well1_depth" in t1_dw_results: barrier_L = t1_dw_results['saddle'][0] structural_break_L = t2_results['best_threshold'] convergence = abs(barrier_L - structural_break_L) md.append(f"- The inter-basin barrier (L ~ {barrier_L:.0f}) and structural break threshold (L ~ {structural_break_L}) ") if convergence < 15: md.append(f" are within {convergence:.0f} points, suggesting they reflect the same underlying phenomenon.") else: md.append(f" differ by {convergence:.0f} points. The structural break identifies where dynamics change;") md.append(f" the barrier identifies where the potential landscape peaks.") md.append("- Countries below the threshold/barrier are pulled toward the low-L (tyranny) attractor.") md.append("- Countries above are pulled toward the high-L (liberty) attractor.") md.append("- The 'event horizon' is the empirical manifestation of the inter-basin barrier (critical instability zone).") md.append("- The massive recovery cliff at L=45-55 (T4) aligns with the critical instability zone between basins.") md.append("") # Write output with open(OUTPUT_PATH, "w") as f: f.write("\n".join(md)) print(f"\nResults written to {OUTPUT_PATH}") print(f"Total lines: {len(md)}") print("DONE.")