#!/usr/bin/env python3
"""
fix05 -- Potential Function Bootstrap with 1000 Resamples + Post-1972 Stationarity Test

Reproduces the core KDE + parametric potential logic from b2_potential_function.py with:
- Bootstrap target: 1000 resamples (time-budgeted, falls back gracefully)
- Convergence diagnostics: CI width comparison at half vs full resample count
- Post-1972 subsample analysis to test stationarity assumption

Uses only stdlib: csv, math, random, statistics, collections

Performance notes:
- The KDE step is the bottleneck: 200 grid pts * 1656 data pts = 331K kernel evals.
- For bootstrap, we use a coarser grid (100 pts) and fewer NM iterations.
- Time budget: 8 minutes for bootstrap section.
"""
import csv
import math
import random
import statistics
import time
from collections import defaultdict

# Fixed seed: bootstrap resamples and fit restarts are reproducible run-to-run.
random.seed(42)

# Module-level aliases: local-ish name lookups are cheaper inside the hot KDE loops.
EXP = math.exp
LOG = math.log
SQRT_2PI = math.sqrt(2 * math.pi)

# ============================================================
# 1. Load data
# ============================================================
with open('/tmp/pt-data/political-topology-flat.csv', 'r') as f:
    data = list(csv.DictReader(f))

# Liberty scores for the full sample and the post-1972 subsample (stationarity test).
liberty_all = [float(row['liberty']) for row in data]
N_all = len(liberty_all)
liberty_post72 = [float(row['liberty']) for row in data if int(row['year']) >= 1972]
N_post72 = len(liberty_post72)

print(f"Full sample: N = {N_all}", flush=True)
print(f"Post-1972 sample: N = {N_post72}", flush=True)
# ============================================================
# 2. Optimized KDE
# ============================================================
def kde_fast(x_grid, data, h):
    """Gaussian KDE of `data` with bandwidth `h`, evaluated at each grid point.

    The kernel is inlined and all loop invariants hoisted so the double loop
    (grid points x data points) stays as cheap as pure Python allows.
    """
    inv_n = 1.0 / len(data)
    inv_h = 1.0 / h
    coeff = inv_n / (h * SQRT_2PI)
    neg_k = -0.5 * inv_h * inv_h
    return [
        sum(EXP(neg_k * (x - xi) * (x - xi)) for xi in data) * coeff
        for x in x_grid
    ]

def compute_bandwidth(scores, N):
    """Silverman-style rule-of-thumb bandwidth, floored at 3.0.

    With fewer than two scores the spread defaults to 10.0; the IQR is
    approximated from the standard deviation (IQR ~= 1.35 * sigma for a normal).
    """
    spread = statistics.stdev(scores) if len(scores) > 1 else 10.0
    iqr_est = spread * 1.35
    h = 0.9 * min(spread, iqr_est / 1.34) * N ** (-0.2)
    return max(h, 3.0)

# ============================================================
# 3. Parametric potential models + Nelder-Mead (from b2)
# ============================================================
def triple_gaussian_potential(x, params):
    """V(x) = -log p(x) + C for a three-component Gaussian mixture density.

    Returns the cap 30.0 where the mixture density underflows.
    """
    w1, mu1, s1, w2, mu2, s2, w3, mu3, s3, C = params
    mix = 0.0
    for w, mu, s in ((w1, mu1, s1), (w2, mu2, s2), (w3, mu3, s3)):
        inv_s = 1.0 / max(s, 0.01)  # sigma floor avoids division blow-up
        z = (x - mu) * inv_s
        mix += w * inv_s * EXP(-0.5 * z * z)
    p = mix / SQRT_2PI
    return 30.0 if p <= 1e-20 else -LOG(p) + C

def double_gaussian_potential(x, params):
    """V(x) = -log p(x) + C for a two-component Gaussian mixture density."""
    w1, mu1, s1, w2, mu2, s2, C = params
    mix = 0.0
    for w, mu, s in ((w1, mu1, s1), (w2, mu2, s2)):
        inv_s = 1.0 / max(s, 0.01)
        z = (x - mu) * inv_s
        mix += w * inv_s * EXP(-0.5 * z * z)
    p = mix / SQRT_2PI
    return 30.0 if p <= 1e-20 else -LOG(p) + C

def single_gaussian_potential(x, params):
    """Quadratic potential of a single Gaussian: 0.5*((x-mu)/sigma)^2 + C."""
    mu, sigma, C = params
    z = (x - mu) / max(sigma, 0.01)
    return 0.5 * z ** 2 + C

def rss_fast(x_grid, V_empirical, model_func, params):
    """Residual sum of squares between the empirical potential and a model curve."""
    total = 0.0
    for i, x in enumerate(x_grid):
        resid = V_empirical[i] - model_func(x, params)
        total += resid * resid
    return total

def nelder_mead(x_grid, V_emp, model_func, init_params, bounds, max_iter=2000, tol=1e-10):
    """Box-constrained Nelder-Mead minimization of rss_fast over `params`.

    Candidate points are clipped into `bounds` before every cost evaluation,
    so the returned parameters always respect the box constraints.
    Coefficients: reflection 1, expansion 2, contraction 0.5, shrink 0.5.
    Returns (best_params_clipped, best_rss).
    """
    dim = len(init_params)

    def clip(p):
        # Project a candidate back into the rectangular feasible region.
        return [max(lb, min(ub, p[i])) for i, (lb, ub) in enumerate(bounds)]

    def cost(p):
        return rss_fast(x_grid, V_emp, model_func, clip(p))

    # Initial simplex: the start point plus one perturbed vertex per axis.
    pts = [list(init_params)]
    for axis in range(dim):
        vertex = list(init_params)
        vertex[axis] += max(abs(vertex[axis]) * 0.1, 0.5)
        pts.append(vertex)
    fvals = [cost(p) for p in pts]

    for _ in range(max_iter):
        # Keep the simplex sorted best-to-worst (stable sort preserves ties).
        order = sorted(range(dim + 1), key=fvals.__getitem__)
        pts = [pts[i] for i in order]
        fvals = [fvals[i] for i in order]
        if max(fvals) - min(fvals) < tol:
            break

        worst = pts[-1]
        cen = [sum(p[j] for p in pts[:dim]) / dim for j in range(dim)]
        refl = [cen[j] + (cen[j] - worst[j]) for j in range(dim)]
        f_refl = cost(refl)

        if f_refl < fvals[0]:
            # New best: try expanding further along the reflection direction.
            expand = [cen[j] + 2.0 * (refl[j] - cen[j]) for j in range(dim)]
            f_exp = cost(expand)
            if f_exp < f_refl:
                pts[-1], fvals[-1] = expand, f_exp
            else:
                pts[-1], fvals[-1] = refl, f_refl
        elif f_refl < fvals[-2]:
            # Middling improvement (fvals[0] <= f_refl here): accept reflection.
            pts[-1], fvals[-1] = refl, f_refl
        else:
            # Reflection failed: contract toward the centroid, else shrink.
            contr = [cen[j] + 0.5 * (worst[j] - cen[j]) for j in range(dim)]
            f_con = cost(contr)
            if f_con < fvals[-1]:
                pts[-1], fvals[-1] = contr, f_con
            else:
                anchor = pts[0]
                for i in range(1, dim + 1):
                    pts[i] = [anchor[j] + 0.5 * (pts[i][j] - anchor[j])
                              for j in range(dim)]
                    fvals[i] = cost(pts[i])

    winner = min(range(dim + 1), key=fvals.__getitem__)
    return clip(pts[winner]), fvals[winner]
# ============================================================
# 4. Full potential estimation pipeline
# ============================================================
def estimate_potential(scores, label, n_grid_pts=200):
    """KDE -> empirical potential V = -log(density) -> extrema scan -> model fits.

    Fits single/double/triple-Gaussian-mixture potentials by multi-start
    Nelder-Mead and ranks them by BIC.

    NOTE: results depend on the seeded module-level RNG call sequence
    (random.uniform draws in the restart loops), so the fitting loops must
    run in this exact order.
    """
    N = len(scores)
    h = compute_bandwidth(scores, N)
    print(f"\n{'='*60}", flush=True)
    print(f"Potential estimation: {label} (N={N}, h={h:.2f})", flush=True)
    print(f"{'='*60}", flush=True)
    # Evaluation grid over the liberty scale: first point at 100/n_grid_pts, last at 100.
    x_grid = [i * (100.0 / n_grid_pts) for i in range(1, n_grid_pts + 1)]
    density = kde_fast(x_grid, scores, h)
    dx = x_grid[1] - x_grid[0]
    # Renormalize so the discretized density integrates to 1 on the grid.
    total = sum(d * dx for d in density)
    density = [d / total for d in density]
    potential = []
    for p in density:
        if p > 1e-10:
            potential.append(-LOG(p))
        else:
            potential.append(30.0)  # cap where the density underflows
    # Shift so the global minimum of the potential sits at 0.
    min_V = min(potential)
    potential = [v - min_V for v in potential]
    # Extrema must beat both 1- and 2-step neighbors (suppresses grid-level noise).
    local_mins = []
    local_maxs = []
    for i in range(2, len(potential) - 2):
        if potential[i] < potential[i-1] and potential[i] < potential[i+1]:
            if potential[i] < potential[i-2] and potential[i] < potential[i+2]:
                local_mins.append((x_grid[i], potential[i]))
        if potential[i] > potential[i-1] and potential[i] > potential[i+1]:
            if potential[i] > potential[i-2] and potential[i] > potential[i+2]:
                local_maxs.append((x_grid[i], potential[i]))
    print(f" Wells: {len(local_mins)}", flush=True)
    for loc, depth in local_mins:
        print(f" L={loc:.1f}, V={depth:.4f}", flush=True)
    print(f" Barriers: {len(local_maxs)}", flush=True)
    for loc, height in local_maxs:
        print(f" L={loc:.1f}, V={height:.4f}", flush=True)
    # Fit models
    # Single Gaussian: 5 random restarts, default NM iteration budget.
    bounds_s = [(0, 100), (1, 50), (-20, 20)]
    best_s_rss, best_s_p = float('inf'), None
    for _ in range(5):
        init = [random.uniform(30, 60), random.uniform(15, 35), random.uniform(-5, 5)]
        p, r = nelder_mead(x_grid, potential, single_gaussian_potential, init, bounds_s)
        if r < best_s_rss:
            best_s_rss, best_s_p = r, p
    # Double Gaussian mixture: 20 restarts, larger NM budget.
    bounds_d = [(0.01,1.0),(0,50),(1,40),(0.01,1.0),(50,100),(1,40),(-20,20)]
    best_d_rss, best_d_p = float('inf'), None
    for _ in range(20):
        init = [random.uniform(0.2,0.6), random.uniform(10,35), random.uniform(5,20),
                random.uniform(0.2,0.6), random.uniform(60,90), random.uniform(5,20),
                random.uniform(-5,5)]
        p, r = nelder_mead(x_grid, potential, double_gaussian_potential, init, bounds_d, max_iter=3000)
        if r < best_d_rss:
            best_d_rss, best_d_p = r, p
    # Triple Gaussian mixture: 30 restarts, largest NM budget.
    bounds_t = [(0.01,1.0),(0,30),(1,30),(0.01,1.0),(25,65),(1,30), (0.01,1.0),(60,100),(1,30),(-20,20)]
    best_t_rss, best_t_p = float('inf'), None
    for _ in range(30):
        init = [random.uniform(0.15,0.5), random.uniform(5,25), random.uniform(3,15),
                random.uniform(0.15,0.5), random.uniform(30,55), random.uniform(5,20),
                random.uniform(0.15,0.5), random.uniform(65,92), random.uniform(3,15),
                random.uniform(-5,5)]
        p, r = nelder_mead(x_grid, potential, triple_gaussian_potential, init, bounds_t, max_iter=5000)
        if r < best_t_rss:
            best_t_rss, best_t_p = r, p
    ng = len(x_grid)
    def bic_rss(rss, n, k):
        # Gaussian-residual BIC computed from an RSS: n*log(rss/n) + k*log(n).
        if rss <= 0:
            rss = 1e-10  # guard against log(0) on a perfect fit
        return n * LOG(rss / n) + k * LOG(n)
    models = {
        'Single': {'rss': best_s_rss, 'n_params': 3, 'params': best_s_p},
        'Double': {'rss': best_d_rss, 'n_params': 7, 'params': best_d_p},
        'Triple': {'rss': best_t_rss, 'n_params': 10, 'params': best_t_p},
    }
    for nm, m in models.items():
        m['bic'] = bic_rss(m['rss'], ng, m['n_params'])
    # Lowest BIC wins.
    best_model = min(models, key=lambda m: models[m]['bic'])
    print(f" Best model: {best_model}", flush=True)
    for nm in ['Single', 'Double', 'Triple']:
        m = models[nm]
        print(f" {nm}: RSS={m['rss']:.4f}, BIC={m['bic']:.2f}", flush=True)
    return {
        'x_grid': x_grid, 'potential': potential, 'h': h,
        'local_mins': local_mins, 'local_maxs': local_maxs,
        'models': models, 'best_model': best_model,
        'triple_params': best_t_p, 'bounds_triple': bounds_t
    }

# ============================================================
# 5. Run on full sample and post-1972
# ============================================================
res_all = estimate_potential(liberty_all, "Full sample")
res_post72 = estimate_potential(liberty_post72, "Post-1972")
# ============================================================
# 6. Bootstrap with time budget
# ============================================================
TARGET_BOOT = 1000
TIME_BUDGET = 480  # 8 minutes

print(f"\n{'='*60}", flush=True)
print(f"Bootstrap triple-well (target {TARGET_BOOT}, budget {TIME_BUDGET}s)", flush=True)
print(f"{'='*60}", flush=True)

# Full-sample triple-well fit is the anchor: bootstrap restarts perturb around it.
w1, mu1, s1, w2, mu2, s2, w3, mu3, s3, C = res_all['triple_params']
bounds_t = res_all['bounds_triple']

# Use coarser grid for bootstrap speed (100 pts instead of 200)
x_grid_boot = [i * 1.0 for i in range(1, 101)]  # 1.0 to 100.0
h = res_all['h']
dx_boot = x_grid_boot[1] - x_grid_boot[0]

boot_results = []
t0 = time.time()
n_done = 0
for b in range(TARGET_BOOT):
    # Resample with replacement, rebuild the empirical potential on the coarse grid.
    X_boot = random.choices(liberty_all, k=N_all)
    d_boot = kde_fast(x_grid_boot, X_boot, h)
    total_b = sum(d * dx_boot for d in d_boot)
    if total_b > 0:
        d_boot = [d / total_b for d in d_boot]
    else:
        # Degenerate resample (density vanished on the grid): count it and move on.
        n_done = b + 1
        continue
    V_boot = []
    for p in d_boot:
        if p > 1e-10:
            V_boot.append(-LOG(p))
        else:
            V_boot.append(30.0)  # same underflow cap as the main pipeline
    min_Vb = min(V_boot)
    V_boot = [v - min_Vb for v in V_boot]
    best_b_rss = float('inf')
    best_b_params = None
    for _ in range(2):  # 2 trials for speed
        # Start near the full-sample optimum with small Gaussian jitter, clipped to bounds.
        init = [w1 + random.gauss(0, 0.05), mu1 + random.gauss(0, 3), s1 + random.gauss(0, 1),
                w2 + random.gauss(0, 0.05), mu2 + random.gauss(0, 3), s2 + random.gauss(0, 1),
                w3 + random.gauss(0, 0.05), mu3 + random.gauss(0, 3), s3 + random.gauss(0, 1),
                C + random.gauss(0, 0.5)]
        for i in range(10):
            init[i] = max(bounds_t[i][0], min(bounds_t[i][1], init[i]))
        try:
            params_b, rss_b = nelder_mead(x_grid_boot, V_boot, triple_gaussian_potential,
                                          init, bounds_t, max_iter=1000)
            if rss_b < best_b_rss:
                best_b_rss = rss_b
                best_b_params = params_b
        except Exception:
            # Best-effort: a failed fit just loses one trial, not the whole bootstrap.
            pass
    if best_b_params is not None:
        # Sort components by location so well k always means the k-th well left-to-right.
        wells = [(best_b_params[0], best_b_params[1], best_b_params[2]),
                 (best_b_params[3], best_b_params[4], best_b_params[5]),
                 (best_b_params[6], best_b_params[7], best_b_params[8])]
        wells.sort(key=lambda x: x[1])
        boot_results.append({
            'w': [wells[0][0], wells[1][0], wells[2][0]],
            'mu': [wells[0][1], wells[1][1], wells[2][1]],
            'sigma': [wells[0][2], wells[1][2], wells[2][2]],
            'rss': best_b_rss
        })
    n_done = b + 1
    if n_done % 25 == 0:
        # Progress + budget check every 25 resamples.
        elapsed = time.time() - t0
        rate = n_done / elapsed
        eta = (TARGET_BOOT - n_done) / rate if rate > 0 else 0
        print(f" {n_done}/{TARGET_BOOT} ({len(boot_results)} valid, {elapsed:.0f}s, ETA {eta:.0f}s)", flush=True)
        if elapsed > TIME_BUDGET:
            print(f" TIME BUDGET REACHED at {n_done}", flush=True)
            break

total_time = time.time() - t0
n_boot_actual = n_done
n_valid = len(boot_results)
print(f"\nCompleted {n_boot_actual} resamples in {total_time:.1f}s ({n_valid} valid)", flush=True)

# ============================================================
# 7. Convergence diagnostics
# ============================================================
print(f"\n{'='*60}", flush=True)
print("Convergence Diagnostics", flush=True)
print(f"{'='*60}", flush=True)

well_names = ['Tyranny', 'Hybrid', 'Liberty']
convergence_table = []

def compute_ci(vals, n_use):
    """Percentile 95% CI from the first n_use bootstrap values."""
    subset = sorted(vals[:n_use])
    lo_idx = int(0.025 * n_use)
    hi_idx = min(int(0.975 * n_use), n_use - 1)
    return subset[lo_idx], subset[hi_idx]

half_n = max(n_valid // 2, 50)
for k in range(3):
    mu_vals = [r['mu'][k] for r in boot_results]
    w_vals = [r['w'][k] for r in boot_results]
    if len(mu_vals) < 100:
        # Too few valid fits for a meaningful half-vs-full comparison.
        print(f" {well_names[k]}: only {len(mu_vals)} valid, skipping", flush=True)
        continue
    # CI width at half the resamples vs all resamples: small change => converged.
    ci_half = compute_ci(mu_vals, half_n)
    ci_full = compute_ci(mu_vals, len(mu_vals))
    w_h = ci_half[1] - ci_half[0]
    w_f = ci_full[1] - ci_full[0]
    ch = abs(w_f - w_h) / w_h * 100 if w_h > 0 else 0
    wci_half = compute_ci(w_vals, half_n)
    wci_full = compute_ci(w_vals, len(w_vals))
    ww_h = wci_half[1] - wci_half[0]
    ww_f = wci_full[1] - wci_full[0]
    wch = abs(ww_f - ww_h) / ww_h * 100 if ww_h > 0 else 0
    convergence_table.append({
        'name': well_names[k],
        'mu_ci_half': ci_half, 'mu_ci_full': ci_full, 'mu_change': ch,
        'w_ci_half': wci_half, 'w_ci_full': wci_full, 'w_change': wch,
        'n_half': half_n, 'n_full': len(mu_vals)
    })
    print(f"\n {well_names[k]} (n={len(mu_vals)}):", flush=True)
    print(f" Location CI @{half_n}: [{ci_half[0]:.2f}, {ci_half[1]:.2f}] w={w_h:.2f}", flush=True)
    print(f" Location CI @{len(mu_vals)}: [{ci_full[0]:.2f}, {ci_full[1]:.2f}] w={w_f:.2f}", flush=True)
    print(f" Change: {ch:.1f}%", flush=True)

# ============================================================
# 8. Final CIs
# ============================================================
print(f"\n{'='*60}", flush=True)
print(f"Final Bootstrap 95% CIs (B={n_valid})", flush=True)
print(f"{'='*60}", flush=True)
well_cis = {}
for k in range(3):
    boot_w = sorted([r['w'][k] for r in boot_results])
    boot_mu = sorted([r['mu'][k] for r in boot_results])
    boot_s = sorted([r['sigma'][k] for r in boot_results])
    nb = len(boot_w)
    if nb == 0:
        # FIX: with zero valid bootstrap fits the original code raised IndexError
        # here (boot_w[lo]) and the report below would KeyError on well_cis.
        # Record NaN CIs so the script degrades gracefully instead of crashing.
        nan_ci = (float('nan'), float('nan'))
        well_cis[well_names[k]] = {'w': nan_ci, 'mu': nan_ci, 'sigma': nan_ci}
        print(f" {well_names[k]}: no valid bootstrap fits", flush=True)
        continue
    lo, hi = int(0.025 * nb), min(int(0.975 * nb), nb - 1)
    well_cis[well_names[k]] = {
        'w': (boot_w[lo], boot_w[hi]),
        'mu': (boot_mu[lo], boot_mu[hi]),
        'sigma': (boot_s[lo], boot_s[hi])
    }
    print(f" {well_names[k]}: w=[{boot_w[lo]:.3f},{boot_w[hi]:.3f}] "
          f"mu=[{boot_mu[lo]:.2f},{boot_mu[hi]:.2f}] "
          f"sig=[{boot_s[lo]:.2f},{boot_s[hi]:.2f}]", flush=True)

# ============================================================
# 9. Write results
# ============================================================
out_path = '/tmp/pt-data/fix05-potential-1000-results.md'
with open(out_path, 'w') as f:
    f.write("# fix05: Potential Function Bootstrap with Increased Resamples + Stationarity Test\n\n")
    if n_boot_actual < 1000:
        f.write(f"**Note**: Target was 1000 resamples but runtime budget limited to "
                f"{n_boot_actual} ({n_valid} valid). Pure-Python KDE + Nelder-Mead on N={N_all} "
                f"is computationally intensive without NumPy.\n\n")
    # --- Full-sample section ---
    f.write("## Full Sample Potential Estimation\n\n")
    f.write(f"- N = {N_all}, bandwidth h = {res_all['h']:.2f}\n\n")
    f.write("### Empirical Wells\n\n")
    f.write("| Location | V(L) | Interpretation |\n")
    f.write("|----------|------|----------------|\n")
    for loc, depth in res_all['local_mins']:
        interp = "Tyranny" if loc < 30 else ("Hybrid" if loc < 65 else "Liberty")
        f.write(f"| {loc:.1f} | {depth:.4f} | {interp} |\n")
    f.write("\n### Empirical Barriers\n\n")
    f.write("| Location | V(L) |\n")
    f.write("|----------|------|\n")
    for loc, height in res_all['local_maxs']:
        f.write(f"| {loc:.1f} | {height:.4f} |\n")
    f.write("\n### Model Comparison (Full Sample)\n\n")
    f.write("| Model | Params | RSS | BIC | Best? |\n")
    f.write("|-------|--------|-----|-----|-------|\n")
    best_bic_val = min(m['bic'] for m in res_all['models'].values())
    for nm in ['Single', 'Double', 'Triple']:
        m = res_all['models'][nm]
        is_best = "Yes" if m['bic'] == best_bic_val else ""
        f.write(f"| {nm} | {m['n_params']} | {m['rss']:.4f} | {m['bic']:.2f} | {is_best} |\n")
    # --- Post-1972 section (stationarity test) ---
    f.write("\n## Post-1972 Potential Estimation (Stationarity Test)\n\n")
    f.write(f"- N = {N_post72}, bandwidth h = {res_post72['h']:.2f}\n\n")
    f.write("### Empirical Wells (Post-1972)\n\n")
    f.write("| Location | V(L) | Interpretation |\n")
    f.write("|----------|------|----------------|\n")
    for loc, depth in res_post72['local_mins']:
        interp = "Tyranny" if loc < 30 else ("Hybrid" if loc < 65 else "Liberty")
        f.write(f"| {loc:.1f} | {depth:.4f} | {interp} |\n")
    f.write("\n### Model Comparison (Post-1972)\n\n")
    f.write("| Model | Params | RSS | BIC | Best? |\n")
    f.write("|-------|--------|-----|-----|-------|\n")
    best_bic_72 = min(m['bic'] for m in res_post72['models'].values())
    for nm in ['Single', 'Double', 'Triple']:
        m = res_post72['models'][nm]
        is_best = "Yes" if m['bic'] == best_bic_72 else ""
        f.write(f"| {nm} | {m['n_params']} | {m['rss']:.4f} | {m['bic']:.2f} | {is_best} |\n")
    f.write("\n### Stationarity Assessment\n\n")
    best_all = res_all['best_model']
    best_72 = res_post72['best_model']
    if best_all == best_72:
        f.write(f"Both full sample and post-1972 select **{best_all}** as the best model. "
                f"The potential landscape structure is temporally stable.\n\n")
    else:
        f.write(f"Full sample selects **{best_all}**, post-1972 selects **{best_72}**. "
                f"The potential structure may not be stationary across historical periods.\n\n")
    f.write("#### Well Location Comparison\n\n")
    f.write("| Feature | Full Sample | Post-1972 |\n")
    f.write("|---------|------------|----------|\n")
    f.write(f"| N wells | {len(res_all['local_mins'])} | {len(res_post72['local_mins'])} |\n")
    f.write(f"| N barriers | {len(res_all['local_maxs'])} | {len(res_post72['local_maxs'])} |\n")
    f.write(f"| Best model | {best_all} | {best_72} |\n")
    for i, (loc_a, _) in enumerate(res_all['local_mins'][:3]):
        loc_72 = res_post72['local_mins'][i][0] if i < len(res_post72['local_mins']) else "N/A"
        f.write(f"| Well {i+1} location | {loc_a:.1f} | {loc_72 if isinstance(loc_72, str) else f'{loc_72:.1f}'} |\n")
    # --- Convergence diagnostics table ---
    f.write(f"\n## Convergence Diagnostics: {half_n} vs {n_valid} Resamples\n\n")
    f.write(f"| Well | Param | CI @{half_n} | CI @{n_valid} | Width @{half_n} | Width @{n_valid} | Change |\n")
    f.write("|------|-------|---------|----------|-----------|------------|--------|\n")
    for row in convergence_table:
        c5 = row['mu_ci_half']
        cf = row['mu_ci_full']
        wh = c5[1] - c5[0]
        wfl = cf[1] - cf[0]
        f.write(f"| {row['name']} | Location | [{c5[0]:.2f}, {c5[1]:.2f}] | [{cf[0]:.2f}, {cf[1]:.2f}] | "
                f"{wh:.2f} | {wfl:.2f} | {row['mu_change']:.1f}% |\n")
        c5w = row['w_ci_half']
        cfw = row['w_ci_full']
        wwh = c5w[1] - c5w[0]
        wwfl = cfw[1] - cfw[0]
        f.write(f"| {row['name']} | Weight | [{c5w[0]:.3f}, {c5w[1]:.3f}] | [{cfw[0]:.3f}, {cfw[1]:.3f}] | "
                f"{wwh:.3f} | {wwfl:.3f} | {row['w_change']:.1f}% |\n")
    f.write("\n")
    if convergence_table and all(r['mu_change'] < 10 for r in convergence_table):
        f.write("All location CI widths changed by <10%. Bootstrap has converged.\n\n")
    elif convergence_table:
        f.write("Some CI widths changed by >=10%. More resamples may refine CIs further.\n\n")
    # --- Final CI table ---
    f.write(f"## Bootstrap 95% CIs (Triple-Well, B={n_valid})\n\n")
    f.write("| Well | Weight [95% CI] | Location [95% CI] | Width [95% CI] |\n")
    f.write("|------|----------------|------------------|----------------|\n")
    for nm in ['Tyranny', 'Hybrid', 'Liberty']:
        wci = well_cis[nm]
        f.write(f"| {nm} | [{wci['w'][0]:.3f}, {wci['w'][1]:.3f}] | "
                f"[{wci['mu'][0]:.2f}, {wci['mu'][1]:.2f}] | "
                f"[{wci['sigma'][0]:.2f}, {wci['sigma'][1]:.2f}] |\n")
    f.write(f"\nBootstrap completed in {total_time:.1f}s ({n_boot_actual} attempted, {n_valid} valid)\n\n")
    f.write("---\n*Generated by fix05_potential_bootstrap_1000.py*\n")

print(f"\nResults saved to {out_path}", flush=True)