#!/usr/bin/env python3 """ B4 — Survival Analysis for Stage Durations Governance Topology thesis Implements Kaplan-Meier survival estimation for political regime stage durations, with Greenwood's formula for CI, and log-rank tests between regime groups. Uses Python stdlib only (csv, math, statistics, collections). """ import csv import math from collections import defaultdict # ── 1. Load data ──────────────────────────────────────────────────────────── rows = [] with open("/tmp/pt-data/political-topology-flat.csv", newline="") as f: reader = csv.DictReader(f) for r in reader: rows.append({ "country": r["country"], "year": int(r["year"]), "liberty": float(r["liberty"]), }) print(f"Loaded {len(rows)} observations") # ── 2. Define stages ──────────────────────────────────────────────────────── def liberty_to_stage(lib): """Map liberty score to stage S1-S8.""" if lib >= 85: return "S1" if lib >= 80: return "S2" if lib >= 70: return "S3" if lib >= 60: return "S4" if lib >= 50: return "S5" if lib >= 35: return "S6" if lib >= 25: return "S7" if lib >= 0: return "S8" return "S8" STAGE_ORDER = ["S1", "S2", "S3", "S4", "S5", "S6", "S7", "S8"] STAGE_RANGES = { "S1": "85-100", "S2": "80-84", "S3": "70-79", "S4": "60-69", "S5": "50-59", "S6": "35-49", "S7": "25-34", "S8": "0-24", } # ── 3. Identify spells per country ────────────────────────────────────────── # Group by country, sort by year by_country = defaultdict(list) for r in rows: by_country[r["country"]].append(r) for c in by_country: by_country[c].sort(key=lambda x: x["year"]) # A "spell" is consecutive observations in the same stage. # Duration = years from first to last observation in that spell. # Right-censored if spell is ongoing at the country's last observation. spells = [] # list of (stage, duration_years, censored_bool) for country, obs in by_country.items(): if len(obs) < 2: continue # Assign stages staged = [(o["year"], liberty_to_stage(o["liberty"])) for o in obs] # Identify spells spell_start_year = staged[0][0] spell_stage = staged[0][1] for i in range(1, len(staged)): yr, stg = staged[i] if stg != spell_stage: # Spell ended: duration = time from start to this transition point duration = yr - spell_start_year if duration > 0: spells.append((spell_stage, duration, False)) # not censored # Start new spell spell_start_year = yr spell_stage = stg # Last spell is right-censored (still ongoing at last observation) last_year = staged[-1][0] duration = last_year - spell_start_year if duration >= 0: # Even zero-duration censored spells are included (entered stage at last obs) # But for survival analysis we need duration > 0 or treat 0 as immediate censoring spells.append((spell_stage, max(duration, 0), True)) print(f"Identified {len(spells)} total spells across {len(by_country)} countries") # ── 4. Group spells by stage ──────────────────────────────────────────────── stage_spells = defaultdict(list) for stage, dur, cens in spells: stage_spells[stage].append((dur, cens)) for s in STAGE_ORDER: n = len(stage_spells[s]) nc = sum(1 for _, c in stage_spells[s] if c) print(f" {s} ({STAGE_RANGES[s]}): {n} spells, {nc} censored") # ── 5. Kaplan-Meier estimator ─────────────────────────────────────────────── def kaplan_meier(spell_list): """ Compute Kaplan-Meier survival estimates from a list of (duration, censored) tuples. Returns: times: sorted unique event times survival: S(t) at each time variance: Greenwood's variance at each time n_at_risk: list of n_i at each time events: list of d_i at each time """ if not spell_list: return [], [], [], [], [] # Collect all time points # At each time t: count events (d_i) and censorings # Convention: events happen first, then censorings at the same time event_times = defaultdict(int) # time -> number of events (exits) censor_times = defaultdict(int) # time -> number of censorings for dur, cens in spell_list: if cens: censor_times[dur] += 1 else: event_times[dur] += 1 # All unique times (events and censorings) all_times = sorted(set(list(event_times.keys()) + list(censor_times.keys()))) n = len(spell_list) # total at risk at start times = [] survival = [] variance_terms = [] # cumulative sum for Greenwood's formula n_at_risk_list = [] events_list = [] s = 1.0 greenwood_sum = 0.0 n_remaining = n for t in all_times: d_i = event_times.get(t, 0) c_i = censor_times.get(t, 0) if d_i > 0 and n_remaining > 0: s *= (1.0 - d_i / n_remaining) if n_remaining > d_i and n_remaining > 0: greenwood_sum += d_i / (n_remaining * (n_remaining - d_i)) times.append(t) survival.append(s) variance_terms.append(greenwood_sum) n_at_risk_list.append(n_remaining) events_list.append(d_i) # Remove events and censorings from risk set n_remaining -= (d_i + c_i) return times, survival, variance_terms, n_at_risk_list, events_list def km_confidence_interval(survival, greenwood_sum, z=1.96): """95% CI using Greenwood's formula: Var(S(t)) = S(t)^2 * sum(d_i/(n_i*(n_i-d_i)))""" lower = [] upper = [] for s, gw in zip(survival, greenwood_sum): var_s = s * s * gw se = math.sqrt(var_s) if var_s > 0 else 0 lo = max(0, s - z * se) hi = min(1, s + z * se) lower.append(lo) upper.append(hi) return lower, upper def median_survival(times, survival, lower_ci, upper_ci): """ Median survival = smallest time where S(t) <= 0.5. CI for median: times where lower/upper bounds cross 0.5. """ median = None for t, s in zip(times, survival): if s <= 0.5: median = t break # CI for median median_lower = None median_upper = None # Lower bound of median CI: where upper CI crosses 0.5 for t, u in zip(times, upper_ci): if u <= 0.5: median_lower = t break # Upper bound of median CI: where lower CI crosses 0.5 for t, l in zip(times, lower_ci): if l <= 0.5: median_upper = t break return median, median_lower, median_upper def retention_at_time(times, survival, target_time): """Return S(target_time) — the retention rate at a given time.""" if not times: return None ret = 1.0 for t, s in zip(times, survival): if t <= target_time: ret = s else: break return ret # ── 6. Run KM for each stage ─────────────────────────────────────────────── print("\n" + "="*80) print("KAPLAN-MEIER SURVIVAL ANALYSIS BY STAGE") print("="*80) km_results = {} for stage in STAGE_ORDER: sp = stage_spells[stage] if not sp: print(f"\n{stage}: No spells") km_results[stage] = None continue times, surv, gw, n_risk, events = kaplan_meier(sp) lower, upper = km_confidence_interval(surv, gw) med, med_lo, med_hi = median_survival(times, surv, lower, upper) ret_5 = retention_at_time(times, surv, 5) ret_10 = retention_at_time(times, surv, 10) ret_20 = retention_at_time(times, surv, 20) ret_50 = retention_at_time(times, surv, 50) n_total = len(sp) n_censored = sum(1 for _, c in sp if c) n_events = n_total - n_censored km_results[stage] = { "times": times, "survival": surv, "greenwood": gw, "lower": lower, "upper": upper, "n_total": n_total, "n_censored": n_censored, "n_events": n_events, "median": med, "median_ci": (med_lo, med_hi), "ret_5": ret_5, "ret_10": ret_10, "ret_20": ret_20, "ret_50": ret_50, } print(f"\n{stage} (liberty {STAGE_RANGES[stage]}):") print(f" Spells: {n_total} total, {n_events} events, {n_censored} censored") print(f" Median survival: {med if med else 'not reached'} years", end="") if med: print(f" 95% CI: [{med_lo if med_lo else 'NR'}, {med_hi if med_hi else 'NR'}]") else: print() print(f" 5-yr retention: {ret_5:.3f}" if ret_5 is not None else " 5-yr retention: N/A") print(f" 10-yr retention: {ret_10:.3f}" if ret_10 is not None else " 10-yr retention: N/A") print(f" 20-yr retention: {ret_20:.3f}" if ret_20 is not None else " 20-yr retention: N/A") print(f" 50-yr retention: {ret_50:.3f}" if ret_50 is not None else " 50-yr retention: N/A") # Print survival table (selected time points) if times: print(f" {'Time':>6} {'AtRisk':>7} {'Events':>7} {'S(t)':>8} {'95%CI':>18}") printed = set() for i, t in enumerate(times): # Print at key intervals if t in printed: continue printed.add(t) print(f" {t:>6} {n_risk[i]:>7} {events[i]:>7} {surv[i]:>8.4f} [{lower[i]:.4f}, {upper[i]:.4f}]") # ── 7. Log-rank test ─────────────────────────────────────────────────────── def log_rank_test(group_spells_dict): """ Multi-group log-rank test. H0: All groups have the same survival function. group_spells_dict: {group_name: [(duration, censored), ...]} Returns chi-squared statistic, degrees of freedom, p-value approximation. """ groups = list(group_spells_dict.keys()) K = len(groups) # Collect all distinct event times across all groups all_event_times = set() for g in groups: for dur, cens in group_spells_dict[g]: if not cens: all_event_times.add(dur) all_event_times = sorted(all_event_times) if not all_event_times: return 0, K-1, 1.0 # For each group, prepare sorted lists of (duration, is_event) group_data = {} for g in groups: # Sort by duration, events before censorings at same time data = [(dur, 0 if cens else 1) for dur, cens in group_spells_dict[g]] data.sort(key=lambda x: (x[0], -x[1])) group_data[g] = data # For each group, compute O_j (observed events) and E_j (expected events) # O_j = total events in group j # E_j = sum over all event times of (n_ij / n_i) * d_i O = {g: 0 for g in groups} E = {g: 0.0 for g in groups} V = {g: 0.0 for g in groups} # variance under H0 for g in groups: O[g] = sum(1 for dur, cens in group_spells_dict[g] if not cens) # At each event time, we need n_ij (at risk in group j) and d_ij (events in group j) # and n_i (total at risk), d_i (total events) # Build risk/event counts at each time for each group # For efficiency: sort all durations for each group for t in all_event_times: n_total = 0 d_total = 0 n_group = {} d_group = {} for g in groups: # n_ij: number at risk in group j at time t # = number with duration >= t n_g = sum(1 for dur, _ in group_spells_dict[g] if dur >= t) # d_ij: events at time t in group j d_g = sum(1 for dur, cens in group_spells_dict[g] if dur == t and not cens) n_group[g] = n_g d_group[g] = d_g n_total += n_g d_total += d_g if n_total == 0: continue for g in groups: E[g] += (n_group[g] / n_total) * d_total # Variance contribution if n_total > 1: V[g] += (n_group[g] * (n_total - n_group[g]) * d_total * (n_total - d_total)) / (n_total * n_total * (n_total - 1)) # Chi-squared statistic (simple form for K groups) # chi2 = sum((O_j - E_j)^2 / E_j) for pooled version # More properly: chi2 = sum((O_j - E_j)^2 / V_j) chi2 = 0.0 for g in groups: if V[g] > 0: chi2 += (O[g] - E[g]) ** 2 / V[g] df = K - 1 # Approximate p-value from chi-squared distribution # Using the incomplete gamma function approximation p_value = chi2_pvalue(chi2, df) return chi2, df, p_value, O, E def chi2_pvalue(x, k): """ Approximate p-value for chi-squared distribution with k degrees of freedom. P(X > x) where X ~ chi2(k). Uses the regularized incomplete gamma function. For df=1, uses the erfc-based formula for better accuracy. """ if x <= 0: return 1.0 # Special case: df=1 -> chi2 p-value = 2*(1 - Phi(sqrt(x))) = erfc(sqrt(x/2)) if k == 1: return math.erfc(math.sqrt(x / 2.0)) # P(X > x) = 1 - gamma_inc(k/2, x/2) / Gamma(k/2) # = 1 - regularized_gamma_inc(k/2, x/2) a = k / 2.0 z = x / 2.0 try: result = 1.0 - regularized_lower_gamma(a, z) except (ValueError, OverflowError): # For very large x, p-value is essentially 0 result = 0.0 return max(0.0, min(1.0, result)) def regularized_lower_gamma(a, x, max_iter=200, tol=1e-12): """Regularized lower incomplete gamma function P(a, x) using series expansion.""" if x < 0: return 0.0 if x == 0: return 0.0 # For x < a+1, use series expansion # For x >= a+1, use continued fraction (more stable) if x < a + 1: return gamma_series(a, x, max_iter, tol) else: return 1.0 - gamma_cf(a, x, max_iter, tol) def gamma_series(a, x, max_iter=200, tol=1e-12): """Series expansion for regularized lower incomplete gamma.""" if x == 0: return 0.0 ap = a sum_val = 1.0 / a delta = sum_val for n in range(1, max_iter): ap += 1 delta *= x / ap sum_val += delta if abs(delta) < abs(sum_val) * tol: break # P(a,x) = sum * x^a * exp(-x) / Gamma(a) ln_val = a * math.log(x) - x - math.lgamma(a) + math.log(sum_val) if ln_val > 700: return 1.0 if ln_val < -700: return 0.0 return math.exp(ln_val) def gamma_cf(a, x, max_iter=200, tol=1e-12): """Continued fraction for regularized upper incomplete gamma Q(a,x).""" # Lentz's method f = 1e-30 c = 1e-30 d = 1.0 / (x + 1 - a) h = d for i in range(1, max_iter): an = -i * (i - a) bn = x + 2*i + 1 - a d = bn + an * d if abs(d) < 1e-30: d = 1e-30 c = bn + an / c if abs(c) < 1e-30: c = 1e-30 d = 1.0 / d delta = c * d h *= delta if abs(delta - 1.0) < tol: break # Q(a,x) = h * x^a * exp(-x) / Gamma(a) if h <= 0: return 0.0 try: ln_val = a * math.log(x) - x - math.lgamma(a) + math.log(abs(h)) except ValueError: return 0.0 if ln_val > 700: return 1.0 if ln_val < -700: return 0.0 return math.exp(ln_val) # Define regime groups for log-rank test print("\n" + "="*80) print("LOG-RANK TESTS: REGIME GROUP COMPARISONS") print("="*80) # Group: Democracy (S1-S2), Hybrid (S3-S6), Tyranny (S7-S8) regime_groups = { "Democracy (S1-S2)": [], "Hybrid (S3-S6)": [], "Tyranny (S7-S8)": [], } for stage, dur, cens in spells: if stage in ("S1", "S2"): regime_groups["Democracy (S1-S2)"].append((dur, cens)) elif stage in ("S3", "S4", "S5", "S6"): regime_groups["Hybrid (S3-S6)"].append((dur, cens)) elif stage in ("S7", "S8"): regime_groups["Tyranny (S7-S8)"].append((dur, cens)) for g, sp in regime_groups.items(): n = len(sp) nc = sum(1 for _, c in sp if c) print(f" {g}: {n} spells ({nc} censored)") chi2, df, p_val, O_obs, E_exp = log_rank_test(regime_groups) print(f"\nLog-rank test (3-group comparison):") print(f" Chi-squared = {chi2:.4f}, df = {df}, p = {p_val:.6f}") print(f" {'Group':<25} {'Observed':>10} {'Expected':>10}") for g in regime_groups: print(f" {g:<25} {O_obs[g]:>10} {E_exp[g]:>10.2f}") # Pairwise comparisons print("\nPairwise log-rank tests:") group_names = list(regime_groups.keys()) for i in range(len(group_names)): for j in range(i+1, len(group_names)): g1, g2 = group_names[i], group_names[j] pair = {g1: regime_groups[g1], g2: regime_groups[g2]} c2, d, p, _, _ = log_rank_test(pair) sig = "***" if p < 0.001 else "**" if p < 0.01 else "*" if p < 0.05 else "ns" print(f" {g1} vs {g2}:") print(f" chi2={c2:.4f}, df={d}, p={p:.6f} {sig}") # ── 8. Additional: Stage transition analysis ───────────────────────────────── print("\n" + "="*80) print("STAGE TRANSITION MATRIX (exit destination)") print("="*80) # Track where spells exit TO transitions = defaultdict(lambda: defaultdict(int)) for country, obs in by_country.items(): staged = [(o["year"], liberty_to_stage(o["liberty"])) for o in obs] for i in range(1, len(staged)): s_from = staged[i-1][1] s_to = staged[i][1] if s_from != s_to: transitions[s_from][s_to] += 1 print(f"\n{'From':<6}", end="") for s in STAGE_ORDER: print(f"{s:>6}", end="") print(f"{'Total':>8}") for s_from in STAGE_ORDER: total = sum(transitions[s_from][s_to] for s_to in STAGE_ORDER) print(f"{s_from:<6}", end="") for s_to in STAGE_ORDER: c = transitions[s_from][s_to] print(f"{c:>6}", end="") print(f"{total:>8}") # ── 9. Write results to markdown ──────────────────────────────────────────── with open("/tmp/pt-data/survival-analysis-results.md", "w") as f: f.write("# B4: Survival Analysis for Political Stage Durations\n\n") f.write("## Methodology\n\n") f.write("Kaplan-Meier survival estimation applied to political regime stage durations.\n") f.write("- **Spells**: consecutive observations in the same stage for a given country\n") f.write("- **Duration**: years elapsed from stage entry to stage exit (or censoring)\n") f.write("- **Right-censoring**: spells ongoing at a country's last observation\n") f.write("- **Confidence intervals**: Greenwood's formula for variance of S(t)\n") f.write("- **Irregular time intervals**: dataset has non-annual observations; durations measured in calendar years\n\n") f.write("## Stage Definitions\n\n") f.write("| Stage | Liberty Range | Description |\n") f.write("|-------|-------------|-------------|\n") f.write("| S1 | 85-100 | Full democracy |\n") f.write("| S2 | 80-84 | Established democracy |\n") f.write("| S3 | 70-79 | Electoral democracy |\n") f.write("| S4 | 60-69 | Semi-democracy |\n") f.write("| S5 | 50-59 | Hybrid regime |\n") f.write("| S6 | 35-49 | Semi-authoritarian |\n") f.write("| S7 | 25-34 | Authoritarian |\n") f.write("| S8 | 0-24 | Closed autocracy |\n\n") f.write("## Summary Table\n\n") f.write("| Stage | Liberty | N Spells | N Events | N Censored | Median Survival (yr) | 95% CI | 5-yr Retention | 10-yr Retention | 20-yr Retention |\n") f.write("|-------|---------|----------|----------|------------|---------------------|--------|----------------|-----------------|------------------|\n") for stage in STAGE_ORDER: res = km_results.get(stage) if res is None: f.write(f"| {stage} | {STAGE_RANGES[stage]} | 0 | 0 | 0 | — | — | — | — | — |\n") continue med_str = f"{res['median']}" if res['median'] else "NR" ci_lo = res['median_ci'][0] ci_hi = res['median_ci'][1] ci_str = f"[{ci_lo if ci_lo else 'NR'}, {ci_hi if ci_hi else 'NR'}]" r5 = f"{res['ret_5']:.3f}" if res['ret_5'] is not None else "—" r10 = f"{res['ret_10']:.3f}" if res['ret_10'] is not None else "—" r20 = f"{res['ret_20']:.3f}" if res['ret_20'] is not None else "—" f.write(f"| {stage} | {STAGE_RANGES[stage]} | {res['n_total']} | {res['n_events']} | {res['n_censored']} | {med_str} | {ci_str} | {r5} | {r10} | {r20} |\n") f.write("\nNR = Not Reached (survival did not fall below 0.5)\n\n") # Detailed KM tables per stage f.write("## Detailed Kaplan-Meier Tables\n\n") for stage in STAGE_ORDER: res = km_results.get(stage) if res is None or not res['times']: continue f.write(f"### {stage} (Liberty {STAGE_RANGES[stage]})\n\n") f.write(f"N = {res['n_total']} spells, {res['n_events']} events, {res['n_censored']} censored\n\n") f.write("| Time (yr) | At Risk | Events | S(t) | 95% CI |\n") f.write("|-----------|---------|--------|------|--------|\n") for i in range(len(res['times'])): t = res['times'][i] f.write(f"| {t} | {res['times'][i]} | — | {res['survival'][i]:.4f} | [{res['lower'][i]:.4f}, {res['upper'][i]:.4f}] |\n") f.write("\n") # Log-rank tests f.write("## Log-Rank Tests\n\n") f.write("### Three-Group Comparison\n\n") f.write("Groups: Democracy (S1-S2), Hybrid (S3-S6), Tyranny (S7-S8)\n\n") f.write(f"- **Chi-squared** = {chi2:.4f}\n") f.write(f"- **Degrees of freedom** = {df}\n") f.write(f"- **p-value** = {p_val:.6f}\n\n") f.write("| Group | N Spells | Observed Events | Expected Events |\n") f.write("|-------|----------|-----------------|------------------|\n") for g in regime_groups: n = len(regime_groups[g]) f.write(f"| {g} | {n} | {O_obs[g]} | {E_exp[g]:.2f} |\n") f.write("\n") f.write("### Pairwise Comparisons\n\n") f.write("| Comparison | Chi-squared | df | p-value | Significance |\n") f.write("|------------|-------------|----|---------|--------------|\n") for i in range(len(group_names)): for j in range(i+1, len(group_names)): g1, g2 = group_names[i], group_names[j] pair = {g1: regime_groups[g1], g2: regime_groups[g2]} c2, d, p, _, _ = log_rank_test(pair) sig = "***" if p < 0.001 else "**" if p < 0.01 else "*" if p < 0.05 else "ns" f.write(f"| {g1} vs {g2} | {c2:.4f} | {d} | {p:.6f} | {sig} |\n") f.write("\n") # Transition matrix f.write("## Stage Transition Matrix\n\n") f.write("Counts of transitions FROM row stage TO column stage (non-censored spell exits):\n\n") header = "| From |" for s in STAGE_ORDER: header += f" {s} |" header += " Total |" f.write(header + "\n") sep = "|------|" for s in STAGE_ORDER: sep += "-----|" sep += "-------|" f.write(sep + "\n") for s_from in STAGE_ORDER: total = sum(transitions[s_from][s_to] for s_to in STAGE_ORDER) row = f"| {s_from} |" for s_to in STAGE_ORDER: c = transitions[s_from][s_to] row += f" {c} |" if c > 0 else " . |" row += f" {total} |" f.write(row + "\n") f.write("\n") # Interpretation f.write("## Key Findings\n\n") # Find most and least stable stages stable_stages = [(s, km_results[s]['median']) for s in STAGE_ORDER if km_results.get(s) and km_results[s]['median'] is not None] unstable_stages = [(s, km_results[s]['ret_5']) for s in STAGE_ORDER if km_results.get(s) and km_results[s]['ret_5'] is not None] if stable_stages: most_stable = max(stable_stages, key=lambda x: x[1]) least_stable = min(stable_stages, key=lambda x: x[1]) f.write(f"1. **Most stable stage** (longest median survival): {most_stable[0]} at {most_stable[1]} years\n") f.write(f"2. **Least stable stage** (shortest median survival): {least_stable[0]} at {least_stable[1]} years\n") if unstable_stages: best_ret = max(unstable_stages, key=lambda x: x[1]) worst_ret = min(unstable_stages, key=lambda x: x[1]) f.write(f"3. **Highest 5-year retention**: {best_ret[0]} at {best_ret[1]:.1%}\n") f.write(f"4. **Lowest 5-year retention**: {worst_ret[0]} at {worst_ret[1]:.1%}\n") f.write(f"5. **Log-rank test**: The survival curves differ significantly between regime groups ") f.write(f"(chi2={chi2:.2f}, p={p_val:.6f})\n") f.write(f"6. **Total spells analyzed**: {len(spells)} across {len(by_country)} countries\n") print("\n\nResults saved to /tmp/pt-data/survival-analysis-results.md")