#!/usr/bin/env python3 """ fix06 -- Known-Answer Validation Tests 1. OLS validation: Y = 2 + 3*X + noise, verify recovered coefficients 2. GMM validation: known 3-component mixture, verify recovered means 3. Bootstrap CI validation: known normal, verify CI covers true mean 4. KDE validation: bimodal data, verify two modes found Uses only stdlib: csv, math, random, statistics """ import math import random import statistics import time random.seed(42) results = [] # (test_name, pass_fail, details) # ============================================================ # Test 1: OLS Validation # ============================================================ print("=" * 60) print("Test 1: OLS Validation (Y = 2 + 3*X + noise)") print("=" * 60) n_ols = 500 true_a, true_b = 2.0, 3.0 X_ols = [random.uniform(0, 100) for _ in range(n_ols)] Y_ols = [true_a + true_b * x + random.gauss(0, 5) for x in X_ols] # OLS: b = Cov(X,Y)/Var(X), a = mean(Y) - b*mean(X) mean_x = statistics.mean(X_ols) mean_y = statistics.mean(Y_ols) cov_xy = sum((X_ols[i] - mean_x) * (Y_ols[i] - mean_y) for i in range(n_ols)) / (n_ols - 1) var_x = sum((x - mean_x) ** 2 for x in X_ols) / (n_ols - 1) b_hat = cov_xy / var_x a_hat = mean_y - b_hat * mean_x # R-squared Y_pred = [a_hat + b_hat * x for x in X_ols] ss_res = sum((Y_ols[i] - Y_pred[i]) ** 2 for i in range(n_ols)) ss_tot = sum((y - mean_y) ** 2 for y in Y_ols) r_sq = 1 - ss_res / ss_tot tol_a = 1.0 # intercept tolerance tol_b = 0.1 # slope tolerance pass_a = abs(a_hat - true_a) < tol_a pass_b = abs(b_hat - true_b) < tol_b pass_ols = pass_a and pass_b detail = (f"True: a={true_a}, b={true_b} | " f"Est: a={a_hat:.4f} (err={abs(a_hat-true_a):.4f}, tol={tol_a}), " f"b={b_hat:.4f} (err={abs(b_hat-true_b):.4f}, tol={tol_b}) | " f"R^2={r_sq:.6f}") results.append(("OLS coefficient recovery", "PASS" if pass_ols else "FAIL", detail)) print(f" Intercept: true={true_a}, estimated={a_hat:.4f}, error={abs(a_hat-true_a):.4f} {'PASS' if pass_a else 'FAIL'}") print(f" Slope: true={true_b}, estimated={b_hat:.4f}, error={abs(b_hat-true_b):.4f} {'PASS' if pass_b else 'FAIL'}") print(f" R^2 = {r_sq:.6f}") print(f" Result: {'PASS' if pass_ols else 'FAIL'}") # ============================================================ # Test 2: GMM Validation (known 3-component mixture) # ============================================================ print("\n" + "=" * 60) print("Test 2: GMM Validation (0.3*N(10,3) + 0.3*N(50,8) + 0.4*N(85,5))") print("=" * 60) true_weights = [0.3, 0.3, 0.4] true_means = [10.0, 50.0, 85.0] true_sigmas = [3.0, 8.0, 5.0] # Generate 500 samples from the mixture n_gmm = 500 gmm_data = [] for _ in range(n_gmm): r = random.random() if r < 0.3: gmm_data.append(random.gauss(10, 3)) elif r < 0.6: gmm_data.append(random.gauss(50, 8)) else: gmm_data.append(random.gauss(85, 5)) # EM algorithm (same as in fix04) def gaussian_pdf(x, mu, sigma): if sigma <= 0: sigma = 0.01 coeff = 1.0 / (sigma * math.sqrt(2 * math.pi)) exponent = -0.5 * ((x - mu) / sigma) ** 2 if exponent < -500: return 1e-300 return coeff * math.exp(exponent) def em_gmm(X, K, max_iter=300, tol=1e-6, init_means=None): N = len(X) if init_means is not None: means = list(init_means) else: means = [X[random.randint(0, N-1)]] for _ in range(K - 1): dists = [min((x - m) ** 2 for m in means) for x in X] total = sum(dists) if total == 0: means.append(X[random.randint(0, N-1)]) continue probs = [d / total for d in dists] r = random.random() cumsum = 0 for i, p in enumerate(probs): cumsum += p if cumsum >= r: means.append(X[i]) break overall_std = statistics.stdev(X) if len(X) > 1 else 10.0 sigmas = [overall_std / K * 2] * K weights = [1.0 / K] * K prev_ll = -float('inf') for iteration in range(max_iter): gamma = [] for x in X: row = [] for k in range(K): row.append(weights[k] * gaussian_pdf(x, means[k], sigmas[k])) total = sum(row) if total < 1e-300: row = [1.0 / K] * K else: row = [r / total for r in row] gamma.append(row) for k in range(K): Nk = sum(gamma[i][k] for i in range(N)) if Nk < 1e-10: means[k] = X[random.randint(0, N-1)] sigmas[k] = overall_std weights[k] = 1.0 / K continue weights[k] = Nk / N means[k] = sum(gamma[i][k] * X[i] for i in range(N)) / Nk var_k = sum(gamma[i][k] * (X[i] - means[k]) ** 2 for i in range(N)) / Nk sigmas[k] = math.sqrt(max(var_k, 0.01)) ll = 0 for x in X: p = sum(weights[k] * gaussian_pdf(x, means[k], sigmas[k]) for k in range(K)) if p > 0: ll += math.log(p) else: ll += -700 if abs(ll - prev_ll) < tol: return weights, means, sigmas, ll, True prev_ll = ll return weights, means, sigmas, ll, False # Fit K=3 with multiple restarts best_ll = -float('inf') best_result = None for run in range(30): inits_list = [[10, 50, 85], [8, 45, 80], [12, 55, 90], [5, 40, 75], None] init_m = inits_list[run] if run < len(inits_list) else None try: w, m, s, ll, conv = em_gmm(gmm_data, 3, init_means=init_m) if ll > best_ll: best_ll = ll best_result = (w, m, s, ll, conv) except Exception: pass w, m, s, ll, conv = best_result order = sorted(range(3), key=lambda k: m[k]) w = [w[k] for k in order] m = [m[k] for k in order] s = [s[k] for k in order] mean_tol = 2.0 # within 2 units pass_means = all(abs(m[k] - true_means[k]) < mean_tol for k in range(3)) detail_parts = [] for k in range(3): err = abs(m[k] - true_means[k]) ok = "ok" if err < mean_tol else "FAIL" detail_parts.append(f"k{k+1}: true={true_means[k]:.1f}, est={m[k]:.2f}, err={err:.2f} ({ok})") print(f" Component {k+1}: true_mean={true_means[k]:.1f}, est_mean={m[k]:.2f}, " f"true_w={true_weights[k]:.1f}, est_w={w[k]:.3f}, " f"true_s={true_sigmas[k]:.1f}, est_s={s[k]:.2f}, " f"mean_err={err:.2f} {'PASS' if err < mean_tol else 'FAIL'}") results.append(("GMM mean recovery (K=3)", "PASS" if pass_means else "FAIL", " | ".join(detail_parts))) print(f" Result: {'PASS' if pass_means else 'FAIL'}") # ============================================================ # Test 3: Bootstrap CI validation # ============================================================ print("\n" + "=" * 60) print("Test 3: Bootstrap CI of mean covers true value") print("=" * 60) true_mu = 50.0 true_sigma = 10.0 n_ci = 200 ci_data = [random.gauss(true_mu, true_sigma) for _ in range(n_ci)] n_boot_ci = 2000 boot_means = [] for _ in range(n_boot_ci): sample = random.choices(ci_data, k=n_ci) boot_means.append(statistics.mean(sample)) boot_means.sort() lo_idx = int(0.025 * n_boot_ci) hi_idx = int(0.975 * n_boot_ci) ci_lo = boot_means[lo_idx] ci_hi = boot_means[hi_idx] covers = ci_lo <= true_mu <= ci_hi sample_mean = statistics.mean(ci_data) detail = (f"True mu={true_mu}, sample mean={sample_mean:.2f}, " f"95% CI=[{ci_lo:.2f}, {ci_hi:.2f}], covers={covers}") results.append(("Bootstrap CI covers true mean", "PASS" if covers else "FAIL", detail)) print(f" Sample mean: {sample_mean:.2f}") print(f" Bootstrap 95% CI: [{ci_lo:.2f}, {ci_hi:.2f}]") print(f" True mean {true_mu} in CI: {covers}") print(f" Result: {'PASS' if covers else 'FAIL'}") # Also test coverage rate over 200 simulations print("\n Coverage rate test (200 simulations)...") n_coverage_sims = 200 n_boot_quick = 500 covered_count = 0 for sim in range(n_coverage_sims): sim_data = [random.gauss(true_mu, true_sigma) for _ in range(n_ci)] bm = [] for _ in range(n_boot_quick): samp = random.choices(sim_data, k=n_ci) bm.append(statistics.mean(samp)) bm.sort() lo = bm[int(0.025 * n_boot_quick)] hi = bm[int(0.975 * n_boot_quick)] if lo <= true_mu <= hi: covered_count += 1 coverage_rate = covered_count / n_coverage_sims # 95% CI should cover approximately 95% of the time; accept 88-99% pass_coverage = 0.88 <= coverage_rate <= 0.99 detail_cov = f"Coverage rate: {coverage_rate:.1%} ({covered_count}/{n_coverage_sims}), expected ~95%" results.append(("Bootstrap coverage rate ~95%", "PASS" if pass_coverage else "FAIL", detail_cov)) print(f" Coverage rate: {coverage_rate:.1%} ({covered_count}/{n_coverage_sims})") print(f" Result: {'PASS' if pass_coverage else 'FAIL'}") # ============================================================ # Test 4: KDE mode detection # ============================================================ print("\n" + "=" * 60) print("Test 4: KDE detects both modes of bimodal data") print("=" * 60) # Generate bimodal data: N(25,5) + N(75,5) n_bimodal = 1000 bimodal_data = [] for _ in range(n_bimodal // 2): bimodal_data.append(random.gauss(25, 5)) bimodal_data.append(random.gauss(75, 5)) # KDE def gaussian_kernel(x, xi, h): z = (x - xi) / h return math.exp(-0.5 * z * z) / (h * math.sqrt(2 * math.pi)) def kde(x_grid, data, bandwidth): n = len(data) density = [] for x in x_grid: p = sum(gaussian_kernel(x, xi, bandwidth) for xi in data) / n density.append(p) return density # Use a bandwidth that should resolve two modes well separated by 50 units h_kde = 5.0 x_grid = [i * 0.5 for i in range(1, 201)] # 0.5 to 100 density = kde(x_grid, bimodal_data, h_kde) # Find local maxima (modes) modes = [] for i in range(2, len(density) - 2): if density[i] > density[i-1] and density[i] > density[i+1]: if density[i] > density[i-2] and density[i] > density[i+2]: modes.append((x_grid[i], density[i])) # We expect two modes near 25 and 75 n_modes = len(modes) print(f" Bandwidth: {h_kde}") print(f" Modes found: {n_modes}") for loc, d in modes: print(f" L={loc:.1f}, density={d:.6f}") # Check: at least 2 modes, and at least one near 25 and one near 75 mode_locs = [m[0] for m in modes] has_low = any(15 <= m <= 35 for m in mode_locs) has_high = any(65 <= m <= 85 for m in mode_locs) pass_kde = n_modes >= 2 and has_low and has_high detail_kde = (f"Modes found: {n_modes} at locations {[f'{m:.1f}' for m in mode_locs]}. " f"Low mode (~25): {'found' if has_low else 'MISSING'}. " f"High mode (~75): {'found' if has_high else 'MISSING'}.") results.append(("KDE detects bimodal modes", "PASS" if pass_kde else "FAIL", detail_kde)) print(f" Low mode (15-35): {'found' if has_low else 'MISSING'}") print(f" High mode (65-85): {'found' if has_high else 'MISSING'}") print(f" Result: {'PASS' if pass_kde else 'FAIL'}") # ============================================================ # Summary # ============================================================ print("\n" + "=" * 60) print("VALIDATION SUMMARY") print("=" * 60) n_pass = sum(1 for _, pf, _ in results if pf == "PASS") n_fail = sum(1 for _, pf, _ in results if pf == "FAIL") n_total = len(results) for name, pf, detail in results: print(f" [{pf}] {name}") print(f" {detail}") print(f"\n {n_pass}/{n_total} PASSED, {n_fail}/{n_total} FAILED") if n_fail == 0: print("\n ALL VALIDATION TESTS PASSED") else: print(f"\n WARNING: {n_fail} test(s) failed") # ============================================================ # Write results # ============================================================ out_path = '/tmp/pt-data/fix06-validation-results.md' with open(out_path, 'w') as f: f.write("# fix06: Known-Answer Validation Tests\n\n") f.write("## Summary\n\n") f.write(f"- **{n_pass}/{n_total} PASSED**, {n_fail}/{n_total} FAILED\n\n") f.write("## Results\n\n") f.write("| Test | Result | Details |\n") f.write("|------|--------|--------|\n") for name, pf, detail in results: f.write(f"| {name} | **{pf}** | {detail} |\n") f.write("\n## Test Descriptions\n\n") f.write("### 1. OLS Coefficient Recovery\n\n") f.write("Generated N=500 samples from Y = 2 + 3*X + N(0,5). Fit OLS via closed-form.\n") f.write(f"- Intercept tolerance: {tol_a}\n") f.write(f"- Slope tolerance: {tol_b}\n") f.write(f"- Estimated: a = {a_hat:.4f}, b = {b_hat:.4f}, R^2 = {r_sq:.6f}\n\n") f.write("### 2. GMM Mean Recovery\n\n") f.write("Generated N=500 samples from 0.3*N(10,3) + 0.3*N(50,8) + 0.4*N(85,5). Fit K=3 GMM.\n") f.write(f"- Mean tolerance: {mean_tol} units\n") f.write("- Recovered:\n\n") f.write("| Component | True Mean | Est Mean | True Weight | Est Weight | True Sigma | Est Sigma |\n") f.write("|-----------|-----------|----------|-------------|-----------|------------|----------|\n") for k in range(3): f.write(f"| {k+1} | {true_means[k]:.1f} | {m[k]:.2f} | {true_weights[k]:.1f} | {w[k]:.3f} | {true_sigmas[k]:.1f} | {s[k]:.2f} |\n") f.write("\n") f.write("### 3. Bootstrap CI Coverage\n\n") f.write(f"- Single test: N=200 from N(50,10), B=2000 bootstrap. CI=[{ci_lo:.2f}, {ci_hi:.2f}] covers {true_mu}\n") f.write(f"- Coverage rate: {n_coverage_sims} simulations, each N=200, B={n_boot_quick}. " f"Rate = {coverage_rate:.1%} (expected ~95%)\n\n") f.write("### 4. KDE Bimodal Detection\n\n") f.write(f"Generated N=1000 from equal mixture of N(25,5) and N(75,5). KDE with h={h_kde}.\n") f.write(f"- Modes found: {n_modes} at {[f'{m:.1f}' for m in mode_locs]}\n") f.write(f"- Expected: modes near 25 and 75\n\n") f.write("---\n*Generated by fix06_validation_tests.py*\n") print(f"\nResults saved to {out_path}")