#!/usr/bin/env python3
"""
B8 — Derive Stage Boundaries from Data (Governance Topology thesis)

Implements:
1. BIC-penalized segmented regression (change-point detection)
2. Hidden Markov Model with EM algorithm + Viterbi decoding
3. Comparison of data-derived boundaries with imposed 8-stage system

Uses Python stdlib only (csv, math, random, statistics, collections).
"""
import csv
import math
import random
from collections import defaultdict, Counter

random.seed(42)

# ── 1. Load data ────────────────────────────────────────────────────────────
# Each CSV row carries a country, a year, and a liberty score in [0, 100].
liberty_scores = []
rows_full = []
with open("/tmp/pt-data/political-topology-flat.csv", newline="") as f:
    reader = csv.DictReader(f)
    for r in reader:
        lib = float(r["liberty"])
        liberty_scores.append(lib)
        rows_full.append({
            "country": r["country"],
            "year": int(r["year"]),
            "liberty": lib,
        })

N = len(liberty_scores)
liberty_sorted = sorted(liberty_scores)
print(f"Loaded {N} observations")
print(f"Liberty range: [{min(liberty_scores)}, {max(liberty_scores)}]")
print(f"Mean: {sum(liberty_scores)/N:.2f}, Median: {liberty_sorted[N//2]}")

# Distribution summary: 5-point-wide histogram bins over [0, 100).
print("\nLiberty score distribution (histogram):")
bins = list(range(0, 101, 5))
for i in range(len(bins)-1):
    lo, hi = bins[i], bins[i+1]
    count = sum(1 for x in liberty_scores if lo <= x < hi)
    bar = "#" * (count // 2)
    print(f" [{lo:>3}-{hi:>3}): {count:>4} {bar}")

# ── 2. BIC-penalized segmented regression (change-point detection) ──────────

def within_segment_variance(data, breakpoints):
    """
    Compute total within-segment sum of squared residuals (RSS).

    breakpoints are the boundary values that divide data into segments.
    Data should be sorted.

    Returns (rss, segment_info) where segment_info is a list of dicts with
    keys "range", "n", "mean", "ss" for each non-empty segment.
    """
    # Segments are [min, bp1], (bp1, bp2], ..., (bpK, max]: the first segment
    # is closed on the left, subsequent segments are half-open on the left.
    all_bounds = [min(data) - 1] + list(breakpoints) + [max(data) + 1]
    rss = 0.0
    segment_info = []
    for i in range(len(all_bounds) - 1):
        lo, hi = all_bounds[i], all_bounds[i+1]
        seg = [x for x in data if (x > lo if i > 0 else x >= lo) and x <= hi]
        if not seg:
            continue
        mean_seg = sum(seg) / len(seg)
        ss = sum((x - mean_seg) ** 2 for x in seg)
        rss += ss
        segment_info.append({
            "range": (lo, hi),
            "n": len(seg),
            "mean": mean_seg,
            "ss": ss,
        })
    return rss, segment_info


def compute_bic(n, rss, k):
    """
    BIC = n * log(RSS/n) + num_params * log(n)

    k = number of segments; parameters = k segments * 2 (mean + variance)
    + (k-1) breakpoints = 3k - 1.
    """
    if rss <= 0:
        rss = 1e-10  # guard against log(0) for a perfect fit
    num_params = 3 * k - 1
    bic = n * math.log(rss / n) + num_params * math.log(n)
    return bic


def _refine_breakpoints(data_sorted, breakpoints, candidate_points, max_sweeps):
    """
    Coordinate-descent refinement of a breakpoint vector (in place).

    Repeatedly sweeps over the breakpoints; each one is moved to the candidate
    value that minimizes the total within-segment RSS while respecting the
    strict ordering constraint against its neighbours. Stops early when a full
    sweep makes no change. Returns the (mutated) breakpoints list.
    """
    for _ in range(max_sweeps):
        improved = False
        for bp_idx in range(len(breakpoints)):
            current_best_rss = float('inf')
            current_best_val = breakpoints[bp_idx]
            for candidate in candidate_points:
                # Ordering constraints: breakpoints must stay strictly increasing.
                if bp_idx > 0 and candidate <= breakpoints[bp_idx - 1]:
                    continue
                if bp_idx < len(breakpoints) - 1 and candidate >= breakpoints[bp_idx + 1]:
                    continue
                trial_bp = breakpoints.copy()
                trial_bp[bp_idx] = candidate
                rss, _ = within_segment_variance(data_sorted, trial_bp)
                if rss < current_best_rss:
                    current_best_rss = rss
                    current_best_val = candidate
            if current_best_val != breakpoints[bp_idx]:
                breakpoints[bp_idx] = current_best_val
                improved = True
        if not improved:
            break
    return breakpoints


def _quantile_breakpoints(data_sorted, K):
    """Initial K-1 breakpoints at the empirical K-quantiles of the data."""
    n = len(data_sorted)
    return [data_sorted[int(n * i / K)] for i in range(1, K)]


def find_optimal_breakpoints_dp(data_sorted, K, candidate_points=None):
    """
    Find K-1 optimal breakpoints for K segments, minimizing within-segment
    sum of squares.

    K == 2 uses exhaustive search over candidate points; 3 <= K <= 5 uses
    quantile initialization + coordinate-descent refinement; K > 5 adds
    multi-start (10 restarts, first one quantile-based, the rest random).
    For efficiency with large datasets, candidate breakpoints default to
    integer liberty values 1..99.

    Returns (breakpoints, rss).
    """
    if candidate_points is None:
        candidate_points = list(range(1, 100))
    n = len(data_sorted)

    def segment_ss(start_idx, end_idx):
        """Sum of squared deviations from mean for data_sorted[start_idx:end_idx+1]."""
        if start_idx > end_idx:
            return float('inf'), 0
        seg = data_sorted[start_idx:end_idx+1]
        n_s = len(seg)
        if n_s == 0:
            return 0.0, 0
        mean_s = sum(seg) / n_s
        ss = sum((x - mean_s)**2 for x in seg)
        return ss, n_s

    if K == 1:
        # Single segment: no breakpoints, RSS over the whole data.
        ss, _ = segment_ss(0, n-1)
        return [], ss

    best_bp = None
    best_rss = float('inf')
    if K == 2:
        # Exhaustive search over the single breakpoint.
        for bp_val in candidate_points:
            rss, _ = within_segment_variance(data_sorted, [bp_val])
            if rss < best_rss:
                best_rss = rss
                best_bp = [bp_val]
    elif K <= 5:
        # Quantile initialization + refinement (up to 50 sweeps).
        breakpoints = _quantile_breakpoints(data_sorted, K)
        best_bp = _refine_breakpoints(data_sorted, breakpoints, candidate_points, 50)
        best_rss, _ = within_segment_variance(data_sorted, best_bp)
    else:
        # K = 6, 7, 8: multi-start refinement (up to 30 sweeps per start).
        overall_best_rss = float('inf')
        overall_best_bp = None
        for start in range(10):
            if start == 0:
                breakpoints = _quantile_breakpoints(data_sorted, K)
            else:
                breakpoints = sorted(random.sample(candidate_points, K-1))
            breakpoints = _refine_breakpoints(data_sorted, breakpoints, candidate_points, 30)
            rss, _ = within_segment_variance(data_sorted, breakpoints)
            if rss < overall_best_rss:
                overall_best_rss = rss
                overall_best_bp = breakpoints.copy()
        best_bp = overall_best_bp
        best_rss = overall_best_rss
    return best_bp, best_rss


print("\n" + "="*80)
print("CHANGE-POINT DETECTION: BIC-PENALIZED SEGMENTED REGRESSION")
print("="*80)

# Use integer candidate breakpoints
candidates = list(range(1, 100))
results_seg = {}
for K in range(2, 9):
    print(f"\nFitting K={K} segments...")
    bp, rss = find_optimal_breakpoints_dp(liberty_sorted, K, candidates)
    bic = compute_bic(N, rss, K)
    _, seg_info = within_segment_variance(liberty_sorted, bp)
    results_seg[K] = {
        "breakpoints": bp,
        "rss": rss,
        "bic": bic,
        "segments": seg_info,
    }
    print(f" Breakpoints: {bp}")
    print(f" RSS = {rss:.2f}, BIC = {bic:.2f}")
    for s in seg_info:
        print(f" Segment: n={s['n']}, mean={s['mean']:.1f}, range={s['range']}")

# Find optimal K (lowest BIC wins)
print("\n\nBIC comparison:")
print(f" {'K':>3} {'BIC':>12} {'RSS':>12} {'Breakpoints'}")
best_K = min(results_seg, key=lambda k: results_seg[k]['bic'])
for K in range(2, 9):
    r = results_seg[K]
    marker = " <-- BEST" if K == best_K else ""
    print(f" {K:>3} {r['bic']:>12.2f} {r['rss']:>12.2f} {r['breakpoints']}{marker}")
print(f"\nOptimal number of segments: K = {best_K}")
print(f"Optimal breakpoints: {results_seg[best_K]['breakpoints']}")

# ── 3. Hidden Markov Model ──────────────────────────────────────────────────
print("\n" + "="*80)
print("HIDDEN MARKOV MODEL: EM ESTIMATION + VITERBI DECODING")
print("="*80)

# Prepare time-series data: for each country, (year, liberty) sorted by year.
# HMM observations are liberty scores in temporal order.
country_series = defaultdict(list)
for r in rows_full:
    country_series[r["country"]].append((r["year"], r["liberty"]))
for c in country_series:
    country_series[c].sort()

# All time series concatenated, with per-country boundaries tracked.
all_series = []
series_boundaries = []  # (start_idx, end_idx, country) for each country
idx = 0
for country in sorted(country_series.keys()):
    series = [lib for _, lib in country_series[country]]
    start = idx
    all_series.extend(series)
    idx += len(series)
    series_boundaries.append((start, idx - 1, country))

T_total = len(all_series)
print(f"Total observations in time series: {T_total}")
print(f"Number of country series: {len(series_boundaries)}")


def gaussian_pdf(x, mu, sigma):
    """Normal distribution PDF (sigma floored at 1e-6 to avoid division by zero)."""
    if sigma < 1e-6:
        sigma = 1e-6
    return (1.0 / (sigma * math.sqrt(2 * math.pi))) * math.exp(-0.5 * ((x - mu) / sigma) ** 2)


def log_gaussian_pdf(x, mu, sigma):
    """Log of normal PDF (sigma floored at 1e-6 to avoid log(0))."""
    if sigma < 1e-6:
        sigma = 1e-6
    return -0.5 * math.log(2 * math.pi) - math.log(sigma) - 0.5 * ((x - mu) / sigma) ** 2


def hmm_em(observations_list, K, max_iter=100, tol=1e-6):
    """
    EM (Baum-Welch) algorithm for a Gaussian-emission HMM.

    observations_list: list of observation sequences (one per country)
    K: number of hidden states
    Returns: pi, A, means, stds, log_likelihood

    Forward/backward recursions run in log space for numerical stability;
    probabilities are floored at 1e-300 before taking logs.
    """
    # ── Initialization ──
    obs_flat = []
    for seq in observations_list:
        obs_flat.extend(seq)
    obs_min = min(obs_flat)
    obs_max = max(obs_flat)
    # Means evenly spaced over the observed range + small random perturbation.
    means = [obs_min + (obs_max - obs_min) * (i + 0.5) / K + random.gauss(0, 2)
             for i in range(K)]
    means.sort()
    stds = [(obs_max - obs_min) / (2 * K)] * K
    # Transition matrix: slightly favor staying in the same state.
    A = [[0.0] * K for _ in range(K)]
    for i in range(K):
        for j in range(K):
            if i == j:
                A[i][j] = 0.7
            else:
                A[i][j] = 0.3 / (K - 1) if K > 1 else 0.0
    # Initial state distribution: uniform.
    pi = [1.0 / K] * K

    prev_ll = -float('inf')
    for iteration in range(max_iter):
        # ── E-step: forward-backward over each sequence ──
        total_log_likelihood = 0.0
        # Accumulators for the M-step (summed over all sequences and times).
        gamma_sum = [0.0] * K        # sum of gamma over all t
        gamma_obs_sum = [0.0] * K    # sum of gamma * obs
        gamma_obs2_sum = [0.0] * K   # sum of gamma * obs^2
        xi_sum = [[0.0] * K for _ in range(K)]  # sum of xi(i,j)
        gamma_init_sum = [0.0] * K   # sum of gamma at t=0

        for seq in observations_list:
            T = len(seq)
            if T == 0:
                continue
            # Forward pass: log_alpha[t][k] = log P(o1..ot, qt=k)
            log_alpha = [[0.0] * K for _ in range(T)]
            for k in range(K):
                log_alpha[0][k] = (math.log(max(pi[k], 1e-300))
                                   + log_gaussian_pdf(seq[0], means[k], stds[k]))
            for t in range(1, T):
                for j in range(K):
                    # log-sum-exp of alpha[t-1][i] * A[i][j] over i
                    log_terms = [log_alpha[t-1][i] + math.log(max(A[i][j], 1e-300))
                                 for i in range(K)]
                    max_log = max(log_terms)
                    log_sum = max_log + math.log(sum(math.exp(lt - max_log) for lt in log_terms))
                    log_alpha[t][j] = log_sum + log_gaussian_pdf(seq[t], means[j], stds[j])
            # Sequence log-likelihood via log-sum-exp over final alphas.
            max_final = max(log_alpha[T-1])
            seq_ll = max_final + math.log(sum(math.exp(la - max_final) for la in log_alpha[T-1]))
            total_log_likelihood += seq_ll

            # Backward pass: log_beta[t][k] = log P(o_{t+1}..o_T | qt=k)
            log_beta = [[0.0] * K for _ in range(T)]
            for k in range(K):
                log_beta[T-1][k] = 0.0  # log(1) = 0
            for t in range(T-2, -1, -1):
                for i in range(K):
                    log_terms = [
                        math.log(max(A[i][j], 1e-300))
                        + log_gaussian_pdf(seq[t+1], means[j], stds[j])
                        + log_beta[t+1][j]
                        for j in range(K)
                    ]
                    max_log = max(log_terms)
                    log_beta[t][i] = max_log + math.log(sum(math.exp(lt - max_log)
                                                            for lt in log_terms))

            # Posteriors: gamma (state occupancy) and xi (transition counts).
            for t in range(T):
                # gamma[t][k] = P(qt=k | O, lambda)
                log_gamma = [log_alpha[t][k] + log_beta[t][k] for k in range(K)]
                max_lg = max(log_gamma)
                log_denom = max_lg + math.log(sum(math.exp(lg - max_lg) for lg in log_gamma))
                gamma_t = [math.exp(lg - log_denom) for lg in log_gamma]
                for k in range(K):
                    gamma_sum[k] += gamma_t[k]
                    gamma_obs_sum[k] += gamma_t[k] * seq[t]
                    gamma_obs2_sum[k] += gamma_t[k] * seq[t] ** 2
                if t == 0:
                    for k in range(K):
                        gamma_init_sum[k] += gamma_t[k]
                if t < T - 1:
                    log_xi_vals = [[0.0] * K for _ in range(K)]
                    log_xi_all = []
                    for i in range(K):
                        for j in range(K):
                            val = (log_alpha[t][i] + math.log(max(A[i][j], 1e-300))
                                   + log_gaussian_pdf(seq[t+1], means[j], stds[j])
                                   + log_beta[t+1][j])
                            log_xi_vals[i][j] = val
                            log_xi_all.append(val)
                    max_xi = max(log_xi_all)
                    log_xi_denom = max_xi + math.log(sum(math.exp(v - max_xi)
                                                         for v in log_xi_all))
                    for i in range(K):
                        for j in range(K):
                            xi_sum[i][j] += math.exp(log_xi_vals[i][j] - log_xi_denom)

        # ── M-step ──
        # Update initial distribution.
        total_init = sum(gamma_init_sum)
        if total_init > 0:
            pi = [g / total_init for g in gamma_init_sum]
        # Update transition matrix. Rows with no expected transitions are left
        # unchanged (they are already normalized from the previous iteration).
        for i in range(K):
            row_sum = sum(xi_sum[i])
            if row_sum > 0:
                for j in range(K):
                    A[i][j] = max(xi_sum[i][j] / row_sum, 1e-6)
                # Renormalize after flooring entries at 1e-6.
                rs = sum(A[i])
                A[i] = [a / rs for a in A[i]]
        # Update emission means and stds (variance floored at 1.0).
        for k in range(K):
            if gamma_sum[k] > 1e-10:
                means[k] = gamma_obs_sum[k] / gamma_sum[k]
                var = gamma_obs2_sum[k] / gamma_sum[k] - means[k] ** 2
                stds[k] = math.sqrt(max(var, 1.0))

        # Check convergence on the change in total log-likelihood.
        if abs(total_log_likelihood - prev_ll) < tol:
            print(f" Converged at iteration {iteration+1}, LL={total_log_likelihood:.2f}")
            break
        prev_ll = total_log_likelihood
    else:
        print(f" Max iterations reached, LL={total_log_likelihood:.2f}")

    return pi, A, means, stds, total_log_likelihood


def viterbi(seq, pi, A, means, stds, K):
    """Viterbi algorithm: most likely hidden-state sequence for one observation sequence."""
    T = len(seq)
    if T == 0:
        return []
    # delta[t][j]: best log-probability of any path ending in state j at time t;
    # psi[t][j]: argmax predecessor state for backtracking.
    delta = [[0.0] * K for _ in range(T)]
    psi = [[0] * K for _ in range(T)]
    for k in range(K):
        delta[0][k] = math.log(max(pi[k], 1e-300)) + log_gaussian_pdf(seq[0], means[k], stds[k])
    for t in range(1, T):
        for j in range(K):
            candidates = [delta[t-1][i] + math.log(max(A[i][j], 1e-300)) for i in range(K)]
            best_i = max(range(K), key=lambda i: candidates[i])
            delta[t][j] = candidates[best_i] + log_gaussian_pdf(seq[t], means[j], stds[j])
            psi[t][j] = best_i
    # Backtrack from the best final state.
    states = [0] * T
    states[T-1] = max(range(K), key=lambda k: delta[T-1][k])
    for t in range(T-2, -1, -1):
        states[t] = psi[t+1][states[t+1]]
    return states


def hmm_bic(log_likelihood, n, K):
    """
    BIC for a Gaussian-emission HMM with K states.

    Parameters: K-1 initial probs + K*(K-1) transition + K means + K variances
    = K^2 + 2K - 1.
    """
    n_params = K * K + 2 * K - 1
    return -2 * log_likelihood + n_params * math.log(n)


# Prepare observation sequences (one per country); singletons carry no
# transition information, so only sequences of length >= 2 are used.
obs_sequences = []
for country in sorted(country_series.keys()):
    seq = [lib for _, lib in country_series[country]]
    if len(seq) >= 2:
        obs_sequences.append(seq)

print(f"\nUsing {len(obs_sequences)} country sequences for HMM")

hmm_results = {}
for K in [2, 3, 4, 5]:
    print(f"\nFitting HMM with K={K} states...")
    # Multiple random restarts; keep the best log-likelihood.
    best_ll = -float('inf')
    best_params = None
    for restart in range(5):
        random.seed(42 + restart * 17)  # deterministic per-restart seed
        try:
            pi, A, means, stds, ll = hmm_em(obs_sequences, K, max_iter=100)
            if ll > best_ll:
                best_ll = ll
                best_params = (pi, A, means[:], stds[:], ll)
        except Exception as e:
            # Best-effort: a failed restart is reported but does not abort the fit.
            print(f" Restart {restart} failed: {e}")
            continue
    if best_params is None:
        print(f" K={K}: All restarts failed")
        continue
    pi, A, means, stds, ll = best_params
    bic = hmm_bic(ll, T_total, K)
    # Sort states by mean (low to high liberty); pi/A stay in original order,
    # with state_order recording the mapping for later reordering.
    state_order = sorted(range(K), key=lambda k: means[k])
    sorted_means = [means[k] for k in state_order]
    sorted_stds = [stds[k] for k in state_order]
    hmm_results[K] = {
        "pi": pi,
        "A": A,
        "means": sorted_means,
        "stds": sorted_stds,
        "ll": ll,
        "bic": bic,
        "state_order": state_order,
    }
    print(f" Log-likelihood: {ll:.2f}")
    print(f" BIC: {bic:.2f}")
    print(f" States (sorted by mean):")
    for i, k in enumerate(state_order):
        print(f" State {i+1}: mean={means[k]:.1f}, std={stds[k]:.1f}")
    # Transition matrix (reordered to the sorted state labels).
    print(f" Transition matrix (rows=from, cols=to):")
    header = " " + " ".join(f"S{i+1:>3}" for i in range(K))
    print(f" {header}")
    for i, ki in enumerate(state_order):
        row = f" S{i+1}: " + " ".join(f"{A[ki][kj]:.3f}" for kj in state_order)
        print(row)

# Find optimal K by BIC
print("\n\nHMM BIC comparison:")
print(f" {'K':>3} {'BIC':>14} {'Log-lik':>14} State means")
best_hmm_K = min(hmm_results, key=lambda k: hmm_results[k]['bic'])
for K in sorted(hmm_results.keys()):
    r = hmm_results[K]
    marker = " <-- BEST" if K == best_hmm_K else ""
    means_str = ", ".join(f"{m:.1f}" for m in r['means'])
    print(f" {K:>3} {r['bic']:>14.2f} {r['ll']:>14.2f} [{means_str}]{marker}")

# ── 4. Derive natural boundaries ────────────────────────────────────────────
print("\n" + "="*80)
print("NATURAL BOUNDARY DERIVATION")
print("="*80)

# From segmented regression
print(f"\nSegmented regression (optimal K={best_K}):")
best_seg = results_seg[best_K]
print(f" Breakpoints: {best_seg['breakpoints']}")
for seg in best_seg['segments']:
    print(f" Segment: {seg['range']}, n={seg['n']}, mean={seg['mean']:.1f}")

# From HMM: boundaries between adjacent states, weighted by their std devs.
print(f"\nHMM-derived boundaries (optimal K={best_hmm_K}):")
hmm_best = hmm_results[best_hmm_K]
hmm_boundaries = []
for i in range(len(hmm_best['means']) - 1):
    # Ideal boundary is where the posterior probability of adjacent states is
    # equal, i.e. solve (x-m1)^2/s1^2 = (x-m2)^2/s2^2 + 2*log(s1/s2).
    # Simplified here to the std-dev-weighted midpoint of the two means.
    m1, s1 = hmm_best['means'][i], hmm_best['stds'][i]
    m2, s2 = hmm_best['means'][i+1], hmm_best['stds'][i+1]
    boundary = (m1 * s2 + m2 * s1) / (s1 + s2)
    hmm_boundaries.append(round(boundary, 1))
print(f" Boundaries: {hmm_boundaries}")
for i in range(len(hmm_best['means'])):
    lo = hmm_boundaries[i-1] if i > 0 else 0
    hi = hmm_boundaries[i] if i < len(hmm_boundaries) else 100
    print(f" Regime {i+1}: [{lo:.0f}, {hi:.0f}], mean={hmm_best['means'][i]:.1f}, std={hmm_best['stds'][i]:.1f}")

# ── 5. Comparison with imposed 8-stage system ───────────────────────────────
print("\n" + "="*80)
print("COMPARISON: DATA-DERIVED vs IMPOSED BOUNDARIES")
print("="*80)

imposed_boundaries = [25, 35, 50, 60, 70, 80, 85]
print(f"\nImposed 8-stage boundaries: {imposed_boundaries}")
print(f"Segmented regression (K={best_K}) boundaries: {best_seg['breakpoints']}")
print(f"HMM (K={best_hmm_K}) boundaries: {hmm_boundaries}")

# BIC of the imposed system, computed the same way as the fitted segmentations.
rss_imposed, seg_imposed = within_segment_variance(liberty_sorted, imposed_boundaries)
bic_imposed = compute_bic(N, rss_imposed, 8)
print(f"\nImposed 8-stage BIC: {bic_imposed:.2f} (RSS={rss_imposed:.2f})")
print(f"Optimal segmented BIC: {results_seg[best_K]['bic']:.2f} (K={best_K})")

# Also report the data-optimal 8-segment solution specifically.
if 8 in results_seg:
    print(f"Data-optimal 8-segment BIC: {results_seg[8]['bic']:.2f}")
    print(f"Data-optimal 8-segment breakpoints: {results_seg[8]['breakpoints']}")

# Viterbi decoding with best HMM to show state assignments
print("\n\nViterbi state assignments (sample countries):")
sample_countries = ["United States", "Russia", "China", "United Kingdom", "Brazil",
                    "India", "South Africa", "Germany", "Japan", "Nigeria"]
best_hmm = hmm_results[best_hmm_K]
pi_best = best_hmm['pi']
A_best = best_hmm['A']
# Viterbi needs means/stds in the HMM's original (unsorted) state indexing;
# re-map the sorted values back via state_order.
means_orig = [0.0] * best_hmm_K
stds_orig = [0.0] * best_hmm_K
for new_idx, old_idx in enumerate(best_hmm['state_order']):
    means_orig[old_idx] = best_hmm['means'][new_idx]
    stds_orig[old_idx] = best_hmm['stds'][new_idx]


def _decode_country(country):
    """Viterbi-decode one country's series.

    Returns (years, liberty values, 1-based state labels sorted by mean),
    using the module-level best-HMM parameters. Shared by the console report
    and the markdown report so the decoding logic exists in one place.
    """
    seq = [lib for _, lib in country_series[country]]
    years = [yr for yr, _ in country_series[country]]
    states = viterbi(seq, pi_best, A_best, means_orig, stds_orig, best_hmm_K)
    # Map raw state indices to sorted labels (R1 = lowest-mean state).
    inv_order = {old: new for new, old in enumerate(best_hmm['state_order'])}
    mapped_states = [inv_order[s] + 1 for s in states]
    return years, seq, mapped_states


for country in sample_countries:
    if country not in country_series:
        continue
    years, seq, mapped_states = _decode_country(country)
    print(f"\n {country}:")
    for yr, lib, st in zip(years, seq, mapped_states):
        print(f" {yr}: liberty={lib:.0f}, HMM state=R{st}")

# ── 6. Write results ────────────────────────────────────────────────────────
with open("/tmp/pt-data/stage-derivation-results.md", "w") as f:
    f.write("# B8: Deriving Stage Boundaries from Data\n\n")
    f.write("## Methodology\n\n")
    f.write("Two independent approaches to identify natural regime boundaries in the liberty score distribution:\n\n")
    f.write("1. **BIC-penalized segmented regression**: Change-point detection minimizing within-segment variance\n")
    f.write("2. **Hidden Markov Model**: EM-estimated Gaussian HMM with Viterbi decoding\n\n")
    f.write(f"Dataset: {N} observations across {len(country_series)} countries\n\n")

    # Distribution summary
    f.write("## Liberty Score Distribution\n\n")
    f.write("| Range | Count | Pct |\n")
    f.write("|-------|-------|-----|\n")
    for i in range(len(bins)-1):
        lo, hi = bins[i], bins[i+1]
        count = sum(1 for x in liberty_scores if lo <= x < hi)
        pct = 100 * count / N
        f.write(f"| {lo}-{hi-1} | {count} | {pct:.1f}% |\n")
    f.write("\n")

    # Segmented regression results
    f.write("## Segmented Regression Results\n\n")
    f.write("| K (segments) | BIC | RSS | Breakpoints |\n")
    f.write("|-------------|------|-----|-------------|\n")
    for K in range(2, 9):
        r = results_seg[K]
        marker = " **BEST**" if K == best_K else ""
        bp_str = ", ".join(str(b) for b in r['breakpoints'])
        f.write(f"| {K} | {r['bic']:.1f} | {r['rss']:.1f} | [{bp_str}]{marker} |\n")
    f.write("\n")
    f.write(f"**Optimal model**: K = {best_K} segments\n\n")
    f.write("Segment details:\n\n")
    f.write("| Segment | Range | N | Mean Liberty | Within-SS |\n")
    f.write("|---------|-------|---|-------------|----------|\n")
    for i, seg in enumerate(best_seg['segments']):
        lo = seg['range'][0]
        hi = seg['range'][1]
        f.write(f"| {i+1} | {lo:.0f}-{hi:.0f} | {seg['n']} | {seg['mean']:.1f} | {seg['ss']:.1f} |\n")
    f.write("\n")

    # HMM results
    f.write("## Hidden Markov Model Results\n\n")
    f.write("| K (states) | BIC | Log-Likelihood | State Means |\n")
    f.write("|-----------|------|----------------|-------------|\n")
    for K in sorted(hmm_results.keys()):
        r = hmm_results[K]
        marker = " **BEST**" if K == best_hmm_K else ""
        means_str = ", ".join(f"{m:.1f}" for m in r['means'])
        f.write(f"| {K} | {r['bic']:.1f} | {r['ll']:.1f} | [{means_str}]{marker} |\n")
    f.write("\n")
    f.write(f"**Optimal model**: K = {best_hmm_K} states\n\n")
    f.write("State details:\n\n")
    f.write("| State | Mean | Std Dev | Boundary |\n")
    f.write("|-------|------|---------|----------|\n")
    for i in range(len(hmm_best['means'])):
        boundary = f"{hmm_boundaries[i]:.1f}" if i < len(hmm_boundaries) else "—"
        f.write(f"| R{i+1} | {hmm_best['means'][i]:.1f} | {hmm_best['stds'][i]:.1f} | {boundary} |\n")
    f.write("\n")
    f.write("Transition matrix:\n\n")
    header = "| |"
    for i in range(best_hmm_K):
        header += f" R{i+1} |"
    f.write(header + "\n")
    f.write("|" + "---|" * (best_hmm_K + 1) + "\n")
    for i, ki in enumerate(hmm_best['state_order']):
        row = f"| R{i+1} |"
        for kj in hmm_best['state_order']:
            row += f" {A_best[ki][kj]:.3f} |"
        f.write(row + "\n")
    f.write("\n")

    # Comparison
    f.write("## Comparison: Data-Derived vs Imposed Boundaries\n\n")
    f.write("| System | K | Boundaries | BIC |\n")
    f.write("|--------|---|-----------|-----|\n")
    f.write(f"| Imposed 8-stage | 8 | [25, 35, 50, 60, 70, 80, 85] | {bic_imposed:.1f} |\n")
    if 8 in results_seg:
        bp8 = ", ".join(str(b) for b in results_seg[8]['breakpoints'])
        f.write(f"| Data-optimal 8-segment | 8 | [{bp8}] | {results_seg[8]['bic']:.1f} |\n")
    bp_best = ", ".join(str(b) for b in results_seg[best_K]['breakpoints'])
    f.write(f"| BIC-optimal segmented | {best_K} | [{bp_best}] | {results_seg[best_K]['bic']:.1f} |\n")
    hmm_bp = ", ".join(f"{b:.1f}" for b in hmm_boundaries)
    f.write(f"| BIC-optimal HMM | {best_hmm_K} | [{hmm_bp}] | {hmm_results[best_hmm_K]['bic']:.1f} |\n")
    f.write("\n")

    # Key findings
    f.write("## Key Findings\n\n")
    f.write(f"1. **Optimal number of regimes**: Segmented regression favors **{best_K}** segments; ")
    f.write(f"HMM favors **{best_hmm_K}** states. ")
    if best_K <= 5 and best_hmm_K <= 5:
        f.write("Both methods converge on fewer regimes than the imposed 8-stage system.\n\n")
    else:
        f.write("\n\n")
    f.write("2. **Natural breakpoints** (segmented regression):\n")
    for bp in best_seg['breakpoints']:
        f.write(f" - Liberty = {bp}\n")
    f.write("\n")
    f.write("3. **HMM-derived boundaries**:\n")
    for bp in hmm_boundaries:
        f.write(f" - Liberty = {bp:.1f}\n")
    f.write("\n")
    # Compare imposed vs data
    f.write("4. **Imposed vs data-optimal 8 segments**: ")
    if 8 in results_seg:
        bic_diff = bic_imposed - results_seg[8]['bic']
        f.write(f"The imposed boundaries have a BIC {bic_diff:.1f} higher than the data-optimal ")
        f.write(f"8-segment solution, suggesting the imposed boundaries are ")
        if bic_diff > 10:
            f.write("substantially suboptimal.\n\n")
        elif bic_diff > 2:
            f.write("moderately suboptimal.\n\n")
        else:
            f.write("reasonably close to optimal.\n\n")
    else:
        f.write("N/A\n\n")
    f.write("5. **Interpretation**: The data suggests the liberty score space is best described by ")
    f.write(f"approximately **{min(best_K, best_hmm_K)}-{max(best_K, best_hmm_K)} distinct regimes**, ")
    f.write("rather than the 8 stages imposed by the classification system. ")
    f.write("The natural boundaries cluster around key transition thresholds in political development.\n\n")

    # Viterbi examples
    f.write("## Viterbi State Assignments (Sample Countries)\n\n")
    for country in sample_countries:
        if country not in country_series:
            continue
        years, seq, mapped_states = _decode_country(country)
        f.write(f"### {country}\n\n")
        f.write("| Year | Liberty | HMM State |\n")
        f.write("|------|---------|----------|\n")
        for yr, lib, st in zip(years, seq, mapped_states):
            f.write(f"| {yr} | {lib:.0f} | R{st} |\n")
        f.write("\n")

print("\n\nResults saved to /tmp/pt-data/stage-derivation-results.md")