#!/usr/bin/env python3
"""
B1 -- Full GMM Model Comparison

Fits Gaussian Mixture Models (K=1..5) to the pooled liberty score
distribution (a latest-per-country cross-section is also summarized).
Implements the EM algorithm from scratch, computes BIC and AIC, reports
component parameters, and estimates bootstrap CIs.
"""

import csv
import math
import random
import statistics
from collections import defaultdict

random.seed(42)

# ============================================================
# 1. Load data
# ============================================================

data = []
with open('/tmp/pt-data/political-topology-flat.csv', 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        data.append(row)

# Pooled view: use ALL observations -- this is the distribution we want to
# characterize
liberty_scores = [float(row['liberty']) for row in data]
N = len(liberty_scores)

print(f"Loaded {N} liberty scores")
print(f"  Range: [{min(liberty_scores):.1f}, {max(liberty_scores):.1f}]")
print(f"  Mean: {statistics.mean(liberty_scores):.2f}")
print(f"  StDev: {statistics.stdev(liberty_scores):.2f}")

# Also prepare a cross-sectional view (most recent observation per country)
countries = defaultdict(list)
for row in data:
    countries[row['country']].append(row)

latest_scores = []
for c, rows in countries.items():
    rows.sort(key=lambda r: int(r['year']))
    latest_scores.append(float(rows[-1]['liberty']))

print(f"\nCross-sectional (latest per country): {len(latest_scores)} countries")
print(f"  Range: [{min(latest_scores):.1f}, {max(latest_scores):.1f}]")
print(f"  Mean: {statistics.mean(latest_scores):.2f}")

# Use the pooled distribution for the GMM (more data, better estimation)
X = liberty_scores

# ============================================================
# 2. EM Algorithm for Gaussian Mixture Model
# ============================================================

def gaussian_pdf(x, mu, sigma):
    """Univariate Gaussian PDF."""
    if sigma <= 0:
        sigma = 0.01
    coeff = 1.0 / (sigma * math.sqrt(2 * math.pi))
    exponent = -0.5 * ((x - mu) / sigma) ** 2
    # Guard against underflow in math.exp
    if exponent < -500:
        return 1e-300
    return coeff * math.exp(exponent)

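# EM alternates two steps until the log-likelihood stabilizes:
#   E-step: responsibility of component k for point x_i,
#     gamma_ik = w_k N(x_i | mu_k, sigma_k) / sum_j w_j N(x_i | mu_j, sigma_j)
#   M-step: re-estimate parameters from responsibility-weighted data,
#     N_k = sum_i gamma_ik,   w_k = N_k / N,
#     mu_k = (1/N_k) sum_i gamma_ik * x_i,
#     sigma_k^2 = (1/N_k) sum_i gamma_ik * (x_i - mu_k)^2
# Each iteration is guaranteed not to decrease the log-likelihood, but EM
# only finds a local optimum -- hence the multiple restarts in section 3.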
def em_gmm(X, K, max_iter=300, tol=1e-6, init_means=None):
    """
    Fit a K-component Gaussian Mixture Model using the EM algorithm.

    Returns: (weights, means, sigmas, log_likelihood, converged)
    """
    N = len(X)

    # Initialize means
    if init_means is not None:
        means = list(init_means)
    else:
        # K-means++ style initialization
        means = [X[random.randint(0, N - 1)]]
        for _ in range(K - 1):
            # Pick the next center with probability proportional to the
            # squared distance from the nearest existing center
            dists = [min((x - m) ** 2 for m in means) for x in X]
            total = sum(dists)
            if total == 0:
                means.append(X[random.randint(0, N - 1)])
                continue
            probs = [d / total for d in dists]
            r = random.random()
            cumsum = 0.0
            for i, p in enumerate(probs):
                cumsum += p
                if cumsum >= r:
                    means.append(X[i])
                    break

    overall_std = statistics.stdev(X) if len(X) > 1 else 10.0
    sigmas = [overall_std / K * 2] * K
    weights = [1.0 / K] * K

    prev_ll = -float('inf')
    ll = prev_ll
    for _ in range(max_iter):
        # E-step: compute responsibilities (N x K)
        gamma = []
        for x in X:
            row = [weights[k] * gaussian_pdf(x, means[k], sigmas[k])
                   for k in range(K)]
            total = sum(row)
            if total < 1e-300:
                row = [1.0 / K] * K
            else:
                row = [r / total for r in row]
            gamma.append(row)

        # M-step
        for k in range(K):
            Nk = sum(gamma[i][k] for i in range(N))
            if Nk < 1e-10:
                # Dead component -- reinitialize
                means[k] = X[random.randint(0, N - 1)]
                sigmas[k] = overall_std
                weights[k] = 1.0 / K
                continue
            weights[k] = Nk / N
            means[k] = sum(gamma[i][k] * X[i] for i in range(N)) / Nk
            var_k = sum(gamma[i][k] * (X[i] - means[k]) ** 2
                        for i in range(N)) / Nk
            sigmas[k] = math.sqrt(max(var_k, 0.01))

        # Compute log-likelihood
        ll = 0.0
        for x in X:
            p = sum(weights[k] * gaussian_pdf(x, means[k], sigmas[k])
                    for k in range(K))
            ll += math.log(p) if p > 0 else -700
        if abs(ll - prev_ll) < tol:
            return weights, means, sigmas, ll, True
        prev_ll = ll

    return weights, means, sigmas, ll, False

# ============================================================
# 3. Fit models for K=1,2,3,4,5
# ============================================================

print("\n" + "=" * 60)
print("GMM Model Fitting (K=1 to 5)")
print("=" * 60)

results = {}
N = len(X)

for K in range(1, 6):
    print(f"\nFitting K={K}...")
    # Run multiple initializations, keep the best
    best_ll = -float('inf')
    best_result = None
    n_runs = 20 if K > 1 else 1
    for run in range(n_runs):
        # Try several hand-picked initializations first, then random ones
        if K == 1:
            init_means = [statistics.mean(X)]
        elif K == 2:
            inits = [[25, 75], [20, 80], [30, 70], [15, 60], None]
            init_means = inits[run] if run < len(inits) else None
        elif K == 3:
            inits = [[15, 45, 80], [10, 40, 75], [20, 50, 85],
                     [12, 35, 78], None]
            init_means = inits[run] if run < len(inits) else None
        else:
            init_means = None
        try:
            w, m, s, ll, conv = em_gmm(X, K, init_means=init_means)
            if ll > best_ll:
                best_ll = ll
                best_result = (w, m, s, ll, conv)
        except Exception:
            pass

    if best_result is None:
        print(f"  FAILED to fit K={K}")
        continue

    w, m, s, ll, conv = best_result

    # Sort components by mean
    order = sorted(range(K), key=lambda k: m[k])
    w = [w[k] for k in order]
    m = [m[k] for k in order]
    s = [s[k] for k in order]

    # BIC and AIC
    n_params = 3 * K - 1  # K means + K sigmas + (K-1) free weights
    bic = -2 * ll + n_params * math.log(N)
    aic = -2 * ll + 2 * n_params

    results[K] = {
        'weights': w, 'means': m, 'sigmas': s, 'll': ll,
        'bic': bic, 'aic': aic, 'n_params': n_params, 'converged': conv,
    }

    print(f"  Log-likelihood: {ll:.2f}")
    print(f"  BIC: {bic:.2f}")
    print(f"  AIC: {aic:.2f}")
    print(f"  Converged: {conv}")
    print("  Components:")
    for k in range(K):
        print(f"    k={k+1}: weight={w[k]:.3f}, mean={m[k]:.2f}, sigma={s[k]:.2f}")

# ============================================================
# 4. Model comparison
# ============================================================
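# For a fit with maximized log-likelihood ln L and p free parameters on N
# observations:
#   AIC = -2 ln L + 2p
#   BIC = -2 ln L + p ln(N)
# Since ln(N) > 2 for N > 7, BIC penalizes extra components more heavily than
# AIC and so tends to select fewer components. By the usual Kass-Raftery
# guideline, dBIC > 10 counts as strong evidence against the weaker model.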
print("\n" + "=" * 60)
print("Model Comparison")
print("=" * 60)

print(f"\n{'K':>2} | {'n_params':>8} | {'LogLik':>10} | {'AIC':>10} | {'BIC':>10} | {'dBIC':>8}")
print("-" * 60)
best_bic = min(results[K]['bic'] for K in results)
for K in sorted(results.keys()):
    r = results[K]
    dbic = r['bic'] - best_bic
    marker = " <-- best" if dbic == 0 else ""
    print(f"{K:>2} | {r['n_params']:>8} | {r['ll']:>10.2f} | {r['aic']:>10.2f} | "
          f"{r['bic']:>10.2f} | {dbic:>8.2f}{marker}")

best_K = min(results, key=lambda K: results[K]['bic'])
print(f"\nBest model by BIC: K = {best_K}")
best_K_aic = min(results, key=lambda K: results[K]['aic'])
print(f"Best model by AIC: K = {best_K_aic}")

# ============================================================
# 5. Bootstrap CIs for best model
# ============================================================

print("\n" + "=" * 60)
print(f"Bootstrap CIs for K={best_K} (100 resamples)")
print("=" * 60)

n_boot = 100
boot_weights = [[] for _ in range(best_K)]
boot_means = [[] for _ in range(best_K)]
boot_sigmas = [[] for _ in range(best_K)]
ref_means = results[best_K]['means']

for b in range(n_boot):
    # Resample with replacement
    X_boot = random.choices(X, k=N)
    try:
        # Initialize near the full-sample solution to stabilize labels
        init_m = [m + random.gauss(0, 2) for m in ref_means]
        w, m, s, ll, conv = em_gmm(X_boot, best_K, init_means=init_m)
        # Sort by mean to match components across resamples
        order = sorted(range(best_K), key=lambda k: m[k])
        w = [w[k] for k in order]
        m = [m[k] for k in order]
        s = [s[k] for k in order]
        for k in range(best_K):
            boot_weights[k].append(w[k])
            boot_means[k].append(m[k])
            boot_sigmas[k].append(s[k])
    except Exception:
        pass
    if (b + 1) % 25 == 0:
        print(f"  Completed {b+1}/{n_boot} resamples")

print(f"\nK={best_K} Component Parameters with Bootstrap 95% CIs:")
print(f"{'Component':>10} | {'Weight':>20} | {'Mean':>20} | {'Sigma':>20}")
print("-" * 80)
component_summary = []
for k in range(best_K):
    # Percentile CIs from the sorted bootstrap draws
    bw = sorted(boot_weights[k])
    bm = sorted(boot_means[k])
    bs = sorted(boot_sigmas[k])
    n_b = len(bw)
    lo, hi = int(0.025 * n_b), int(0.975 * n_b)
    w_est = results[best_K]['weights'][k]
    m_est = results[best_K]['means'][k]
    s_est = results[best_K]['sigmas'][k]
    w_ci = f"{w_est:.3f} [{bw[lo]:.3f}, {bw[hi]:.3f}]"
    m_ci = f"{m_est:.2f} [{bm[lo]:.2f}, {bm[hi]:.2f}]"
    s_ci = f"{s_est:.2f} [{bs[lo]:.2f}, {bs[hi]:.2f}]"
    component_summary.append({
        'weight': w_est, 'weight_ci': (bw[lo], bw[hi]),
        'mean': m_est, 'mean_ci': (bm[lo], bm[hi]),
        'sigma': s_est, 'sigma_ci': (bs[lo], bs[hi]),
    })
    print(f"{'k=' + str(k+1):>10} | {w_ci:>20} | {m_ci:>20} | {s_ci:>20}")
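# Optional cross-check -- a sketch, assuming scikit-learn and numpy are
# available (neither is required by the rest of this script). sklearn's
# GaussianMixture should reproduce the from-scratch fits up to local optima;
# its BIC for 1-D full-covariance models uses the same 3K-1 parameter count.
try:
    import numpy as np
    from sklearn.mixture import GaussianMixture

    Xa = np.asarray(X).reshape(-1, 1)
    for K_chk in sorted(results.keys()):
        gm = GaussianMixture(n_components=K_chk, n_init=10,
                             random_state=42).fit(Xa)
        print(f"  sklearn cross-check K={K_chk}: BIC={gm.bic(Xa):.2f} "
              f"(from-scratch: {results[K_chk]['bic']:.2f})")
except ImportError:
    pass  # cross-check skipped; the from-scratch results stand alone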
# Also run the bootstrap for K=3 if it is not the best, since theory predicts 3
if best_K != 3 and 3 in results:
    print("\n" + "=" * 60)
    print("Bootstrap CIs for K=3 (theory-predicted, 100 resamples)")
    print("=" * 60)

    boot_weights_3 = [[] for _ in range(3)]
    boot_means_3 = [[] for _ in range(3)]
    boot_sigmas_3 = [[] for _ in range(3)]
    ref_means_3 = results[3]['means']

    for b in range(n_boot):
        X_boot = random.choices(X, k=N)
        try:
            init_m = [m + random.gauss(0, 2) for m in ref_means_3]
            w, m, s, ll, conv = em_gmm(X_boot, 3, init_means=init_m)
            order = sorted(range(3), key=lambda k: m[k])
            w = [w[k] for k in order]
            m = [m[k] for k in order]
            s = [s[k] for k in order]
            for k in range(3):
                boot_weights_3[k].append(w[k])
                boot_means_3[k].append(m[k])
                boot_sigmas_3[k].append(s[k])
        except Exception:
            pass

    print("\nK=3 Component Parameters with Bootstrap 95% CIs:")
    print(f"{'Component':>10} | {'Weight':>20} | {'Mean':>20} | {'Sigma':>20}")
    print("-" * 80)
    for k in range(3):
        bw = sorted(boot_weights_3[k])
        bm = sorted(boot_means_3[k])
        bs = sorted(boot_sigmas_3[k])
        n_b = len(bw)
        lo, hi = int(0.025 * n_b), int(0.975 * n_b)
        w_est = results[3]['weights'][k]
        m_est = results[3]['means'][k]
        s_est = results[3]['sigmas'][k]
        w_ci = f"{w_est:.3f} [{bw[lo]:.3f}, {bw[hi]:.3f}]"
        m_ci = f"{m_est:.2f} [{bm[lo]:.2f}, {bm[hi]:.2f}]"
        s_ci = f"{s_est:.2f} [{bs[lo]:.2f}, {bs[hi]:.2f}]"
        print(f"{'k=' + str(k+1):>10} | {w_ci:>20} | {m_ci:>20} | {s_ci:>20}")
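# Note on the intervals above: these are percentile bootstrap CIs. Sorting
# components by mean on every resample guards against label switching, but
# when two component means sit close together their bootstrap draws can still
# mix, so CIs for overlapping components should be read with caution.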
# ============================================================
# 6. Write results
# ============================================================

with open('/tmp/pt-data/gmm-model-comparison-results.md', 'w') as f:
    f.write("# B1: Gaussian Mixture Model Comparison\n\n")
    f.write("## Data Summary\n\n")
    f.write(f"- Pooled liberty scores: N = {N}\n")
    f.write(f"- Range: [{min(X):.1f}, {max(X):.1f}]\n")
    f.write(f"- Mean: {statistics.mean(X):.2f}, StDev: {statistics.stdev(X):.2f}\n\n")

    f.write("## Model Comparison Table\n\n")
    f.write("| K | Parameters | Log-Likelihood | AIC | BIC | dBIC | Preferred? |\n")
    f.write("|---|-----------|---------------|-----|-----|------|------------|\n")
    for K in sorted(results.keys()):
        r = results[K]
        dbic = r['bic'] - best_bic
        pref = "BIC-best" if K == best_K else (
            "AIC-best" if K == best_K_aic and K != best_K else "")
        f.write(f"| {K} | {r['n_params']} | {r['ll']:.2f} | {r['aic']:.2f} | "
                f"{r['bic']:.2f} | {dbic:.2f} | {pref} |\n")
    f.write(f"\n**Best model by BIC: K = {best_K}**\n")
    f.write(f"**Best model by AIC: K = {best_K_aic}**\n\n")

    f.write("### Interpretation\n\n")
    f.write("BIC penalizes model complexity more heavily than AIC and is preferred for\n")
    f.write("model selection. The BIC-optimal model identifies the number of distinct\n")
    f.write("clusters in the liberty score distribution that are statistically justified.\n\n")

    # Component details for all K
    for K in sorted(results.keys()):
        r = results[K]
        f.write(f"## K = {K} Component Details\n\n")
        f.write("| Component | Weight | Mean | Sigma | Interpretation |\n")
        f.write("|-----------|--------|------|-------|----------------|\n")
        for k in range(K):
            # Label each component by where its mean falls on the liberty scale
            m = r['means'][k]
            if m < 25:
                interp = "Tyranny basin"
            elif m < 40:
                interp = "Low hybrid"
            elif m < 60:
                interp = "Mid hybrid"
            elif m < 75:
                interp = "High hybrid / Partly free"
            else:
                interp = "Liberty basin"
            f.write(f"| {k+1} | {r['weights'][k]:.3f} | {r['means'][k]:.2f} | "
                    f"{r['sigmas'][k]:.2f} | {interp} |\n")
        f.write("\n")

    f.write(f"## Bootstrap 95% CIs for K={best_K} (BIC-best)\n\n")
    f.write("| Component | Weight [95% CI] | Mean [95% CI] | Sigma [95% CI] |\n")
    f.write("|-----------|----------------|--------------|----------------|\n")
    for k in range(best_K):
        cs = component_summary[k]
        f.write(f"| {k+1} | {cs['weight']:.3f} [{cs['weight_ci'][0]:.3f}, {cs['weight_ci'][1]:.3f}] | "
                f"{cs['mean']:.2f} [{cs['mean_ci'][0]:.2f}, {cs['mean_ci'][1]:.2f}] | "
                f"{cs['sigma']:.2f} [{cs['sigma_ci'][0]:.2f}, {cs['sigma_ci'][1]:.2f}] |\n")

    f.write("\n## Theoretical Implications\n\n")
    f.write("The Governance Topology thesis predicts a **triple-well** structure with:\n")
    f.write("1. A **tyranny basin** (low liberty, high retention)\n")
    f.write("2. A **hybrid/transition zone** (medium liberty, unstable)\n")
    f.write("3. A **liberty basin** (high liberty, high retention)\n\n")
    if best_K >= 3:
        f.write(f"The BIC-optimal K={best_K} model is consistent with the triple-well prediction.\n")
    elif best_K == 2:
        f.write(f"The BIC-optimal K={best_K} model suggests a simpler bimodal structure.\n")
        f.write("However, this may reflect the pooled distribution combining historical eras.\n")
        f.write("The K=3 model (dBIC from best shown above) should be examined for theoretical fit.\n")
    f.write("\n---\n*Generated by the B1 GMM model comparison script. "
            "EM algorithm implemented from scratch.*\n")

print("\nResults saved to /tmp/pt-data/gmm-model-comparison-results.md")