from __future__ import annotations import argparse import csv import json import math import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] sys.path.append(str(ROOT)) import joblib import matplotlib.pyplot as plt import numpy as np import torch from src.common.config import Config from src.common.experiment_paths import config_for_stage, model_checkpoint_for_tag, normalize_tag, processed_path_for_tag from src.data.curve_processing import clean_curve_for_dataset, is_valid_curve, resample_curve_to_features from src.data.param_features import param_feature_transform_from_meta, transform_param_features from src.data.params import Params, Schedule from src.data.runner_client import CppRunner, read_result_bin from src.data.schedule_features import build_schedule_model_vector from src.evaluation.autofit_objective import dual_log_objective from src.models.forward_surrogate import ForwardSurrogate DEFAULT_CASE = { "config": "configs/data_gen_family_random.yaml", "tag": "family_random_mixed_50k", "params": { "k": 0.025, "skin": 0.0, "wellboreC": 0.01, "phi": 0.0245, "h": 9.144, "Cf": 0.0004315, }, } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Validate whether surrogate preserves local autofit objective ranking around a target case" ) parser.add_argument("--config", type=str, default=DEFAULT_CASE["config"]) parser.add_argument("--stage", choices=["fixed_case", "case_neighborhood", "family_random", "family_random_v2_q"], default=None) parser.add_argument("--processed", type=str, default=None) parser.add_argument("--model", type=str, default=None) parser.add_argument("--output-dir", type=str, default=None) parser.add_argument("--tag", type=str, default=DEFAULT_CASE["tag"]) parser.add_argument("--well-index", type=int, default=0) parser.add_argument("--solver-timeout", type=int, default=120) parser.add_argument("--n-candidates", type=int, default=64) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--max-attempts-factor", type=int, default=4) parser.add_argument( "--span-frac", type=float, default=0.08, help="Local box width as a fraction of each parameter range in transformed space", ) parser.add_argument( "--reuse-runner", action="store_true", help="Reuse one long-lived runner server across candidates; faster but less robust", ) parser.add_argument("--k", type=float, default=DEFAULT_CASE["params"]["k"]) parser.add_argument("--skin", type=float, default=DEFAULT_CASE["params"]["skin"]) parser.add_argument("--wellboreC", type=float, default=DEFAULT_CASE["params"]["wellboreC"]) parser.add_argument("--phi", type=float, default=DEFAULT_CASE["params"]["phi"]) parser.add_argument("--h", type=float, default=DEFAULT_CASE["params"]["h"]) parser.add_argument("--Cf", type=float, default=DEFAULT_CASE["params"]["Cf"]) return parser.parse_args() def infer_curve_layout(meta: dict, curve_dim: int) -> dict: curve_layout = meta.get("curve_layout") if curve_layout is not None: return curve_layout n_time_points = curve_dim // 3 return { "n_time_points": int(n_time_points), "parts": [ {"name": "log_pressure", "start": 0, "end": n_time_points}, {"name": "log_derivative", "start": n_time_points, "end": 2 * n_time_points}, {"name": "slope", "start": 2 * n_time_points, "end": 3 * n_time_points}, ], } def resolve_case_schedule(cfg: Config) -> Schedule: schedule_cfg = cfg.raw["schedule"]["case_schedule"] timeQ = list(map(float, schedule_cfg["timeQ"])) q = list(map(float, schedule_cfg["q"])) policy = cfg.raw["schedule"].get("section_policy", {}) or {} mode = str(policy.get("mode", "fixed_last")).lower() n_sections = len(timeQ) if mode == "fixed_last": section_index = n_sections elif mode == "fixed_value": section_index = int(np.clip(int(policy.get("fixed_value", n_sections)), 1, n_sections)) else: section_index = int(np.clip(int(schedule_cfg.get("default_section_index", n_sections)), 1, n_sections)) schedule = Schedule(sectionIndex=section_index, timeQ=timeQ, q=q) if not schedule.validate(): raise ValueError("Invalid case_schedule in config") return schedule def resolve_paths(args: argparse.Namespace) -> tuple[Config, Path, Path, Path]: tag = normalize_tag(args.tag) config_path = args.config if config_path is None: config_path = str(config_for_stage(args.stage) or Path(DEFAULT_CASE["config"])) processed_path = (Path(args.processed) if args.processed is not None else processed_path_for_tag(tag)).resolve() model_path = (Path(args.model) if args.model is not None else model_checkpoint_for_tag(tag, use_schedule=True)).resolve() if args.output_dir is not None: output_dir = Path(args.output_dir).resolve() else: output_dir = (Path("results") / f"autofit_local_validation_{tag}").resolve() return Config(config_path), processed_path, model_path, output_dir def build_params_from_args(args: argparse.Namespace, schedule: Schedule) -> Params: return Params( k=float(args.k), skin=float(args.skin), wellboreC=float(args.wellboreC), phi=float(args.phi), h=float(args.h), Cf=float(args.Cf), schedule=schedule, ) def build_schedule_vector(cfg: Config, schedule: Schedule) -> np.ndarray: return build_schedule_model_vector(cfg, schedule) def load_model(checkpoint_path: Path) -> tuple[ForwardSurrogate, bool, torch.device]: checkpoint = torch.load(checkpoint_path, map_location="cpu") use_schedule = bool(checkpoint.get("use_schedule", True)) model = ForwardSurrogate( param_dim=int(checkpoint["param_dim"]), schedule_dim=int(checkpoint["schedule_dim"]), curve_dim=int(checkpoint["curve_dim"]), hidden_dim=int(checkpoint["hidden_dim"]), dropout=float(checkpoint["dropout"]), use_schedule=use_schedule, ) model.load_state_dict(checkpoint["model_state_dict"]) model.eval() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = model.to(device) return model, use_schedule, device def predict_surrogate_curve( processed: dict, model: ForwardSurrogate, device: torch.device, use_schedule: bool, params: Params, schedule: Schedule, cfg: Config, ) -> np.ndarray: scaler_params = processed["scaler_params"] scaler_schedule = processed["scaler_schedule"] scaler_curve = processed["scaler_curve"] param_transform = param_feature_transform_from_meta(processed.get("meta", {})) params_vec = np.asarray( [params.k, params.skin, params.wellboreC, params.phi, params.h, params.Cf], dtype=np.float32, ).reshape(1, -1) schedule_vec = build_schedule_vector(cfg, schedule).reshape(1, -1) params_x = scaler_params.transform(transform_param_features(params_vec, param_transform)).astype(np.float32) schedule_x = scaler_schedule.transform(schedule_vec).astype(np.float32) with torch.no_grad(): params_t = torch.tensor(params_x, dtype=torch.float32, device=device) schedule_t = torch.tensor(schedule_x, dtype=torch.float32, device=device) if use_schedule else None pred_scaled = model(params_t, schedule_t).cpu().numpy() return scaler_curve.inverse_transform(pred_scaled)[0].astype(np.float32) def run_solver_and_extract_curve( runner: CppRunner, cfg: Config, params: Params, well_index: int, timeout: int, ) -> tuple[np.ndarray, dict]: ok = runner.run_simulation(params, timeout=timeout, override_schedule=params.schedule, include_schedule=True) result = read_result_bin(runner.result_bin) if runner.result_bin.exists() else None if not ok and result is None: raise RuntimeError("Solver failed and result.bin is unavailable") if result is None or not result["loglog"]: raise RuntimeError("Solver produced no loglog data") if well_index < 0 or well_index >= len(result["loglog"]): raise IndexError(f"well-index={well_index} out of range; n_wells={len(result['loglog'])}") loglog = result["loglog"][well_index] t = np.asarray(loglog["t"], dtype=np.float64) p = np.asarray(loglog["p"], dtype=np.float64) d = np.asarray(loglog["deriv"], dtype=np.float64) cleaned = clean_curve_for_dataset(cfg, t, p, d) if cleaned is None: raise RuntimeError("Curve invalid after cleaning") t_clean, p_clean, d_clean = cleaned valid, reason = is_valid_curve(cfg, t_clean, p_clean, d_clean) if not valid: raise RuntimeError(f"Curve failed validation: {reason}") curve_feat = resample_curve_to_features(cfg, t_clean, p_clean, d_clean) raw = { "t": t_clean.tolist(), "p": p_clean.tolist(), "d": d_clean.tolist(), "n_steps": int(result["nSteps"]), "n_wells": int(result["nWells"]), } return curve_feat, raw def _corr_pearson(x: np.ndarray, y: np.ndarray) -> float: if x.size < 2: return float("nan") if np.std(x) < 1e-12 or np.std(y) < 1e-12: return float("nan") return float(np.corrcoef(x, y)[0, 1]) def _corr_spearman(x: np.ndarray, y: np.ndarray) -> float: if x.size < 2: return float("nan") xr = np.argsort(np.argsort(x)) yr = np.argsort(np.argsort(y)) return _corr_pearson(xr.astype(np.float64), yr.astype(np.float64)) def _rank_positions(values: np.ndarray) -> np.ndarray: order = np.argsort(values) rank = np.empty_like(order) rank[order] = np.arange(order.size) return rank def _fixed_param_value(cfg: Config, name: str) -> float | None: fixed_cfg = ((cfg.raw["params"].get("fixed_params", {}) or {}).get(name, {}) or {}) if bool(fixed_cfg.get("enabled", False)): return float(fixed_cfg["value"]) return None def _search_param_names(cfg: Config) -> list[str]: active_names = list(cfg.raw["params"].get("active_param_names", cfg.raw["params"]["all_physical_param_names"])) names = [name for name in active_names if _fixed_param_value(cfg, name) is None] if not names: raise ValueError("No searchable parameters: active_param_names is empty after removing fixed_params") return names def _sample_local_candidates(cfg: Config, base: Params, n: int, seed: int, span_frac: float) -> list[Params]: rng = np.random.RandomState(seed) all_names = list(cfg.raw["params"]["all_physical_param_names"]) names = _search_param_names(cfg) log_params = set(cfg.raw["params"]["log_params"]) base_dict = { "k": float(base.k), "skin": float(base.skin), "wellboreC": float(base.wellboreC), "phi": float(base.phi), "h": float(base.h), "Cf": float(base.Cf), } samples: list[Params] = [] for _ in range(n): cand = dict(base_dict) for name in all_names: fixed_value = _fixed_param_value(cfg, name) if fixed_value is not None: cand[name] = fixed_value continue lo, hi = map(float, cfg.raw["params"]["ranges"][name]) base_val = float(base_dict[name]) if name not in names: cand[name] = base_val continue if name in log_params: lo_t = math.log10(max(lo, 1e-30)) hi_t = math.log10(max(hi, 1e-30)) base_t = math.log10(max(base_val, 1e-30)) span = span_frac * (hi_t - lo_t) v_t = np.clip(rng.uniform(base_t - span, base_t + span), lo_t, hi_t) cand[name] = float(10 ** v_t) else: span = span_frac * (hi - lo) v = np.clip(rng.uniform(base_val - span, base_val + span), lo, hi) cand[name] = float(v) samples.append( Params( k=cand["k"], skin=cand["skin"], wellboreC=cand["wellboreC"], phi=cand["phi"], h=cand["h"], Cf=cand["Cf"], schedule=base.schedule, ) ) return samples def _params_to_dict(params: Params) -> dict: return { "k": float(params.k), "skin": float(params.skin), "wellboreC": float(params.wellboreC), "phi": float(params.phi), "h": float(params.h), "Cf": float(params.Cf), } def _plot_scatter(csv_rows: list[dict], output_path: Path) -> None: solver_obj = np.asarray([float(r["solver_objective"]) for r in csv_rows], dtype=np.float64) surrogate_obj = np.asarray([float(r["surrogate_objective"]) for r in csv_rows], dtype=np.float64) plt.figure(figsize=(7, 6)) plt.scatter(solver_obj, surrogate_obj, alpha=0.75) if solver_obj.size >= 2: lo = min(float(np.min(solver_obj)), float(np.min(surrogate_obj))) hi = max(float(np.max(solver_obj)), float(np.max(surrogate_obj))) plt.plot([lo, hi], [lo, hi], linestyle="--", linewidth=1) plt.xlabel("Solver objective to target") plt.ylabel("Surrogate objective to target") plt.title("Local Candidate Ranking Validation") plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(output_path, dpi=150, bbox_inches="tight") plt.close() def main() -> None: args = parse_args() cfg, processed_path, model_path, output_dir = resolve_paths(args) output_dir.mkdir(parents=True, exist_ok=True) if not processed_path.exists(): raise FileNotFoundError(f"Processed dataset not found: {processed_path}") if not model_path.exists(): raise FileNotFoundError(f"Model checkpoint not found: {model_path}") processed = joblib.load(processed_path) curve_layout = infer_curve_layout(processed["meta"], int(processed["meta"]["curve_dim"])) schedule = resolve_case_schedule(cfg) target_params = build_params_from_args(args, schedule) search_names = _search_param_names(cfg) model, use_schedule, device = load_model(model_path) shared_runner = None def make_runner(tag: str) -> CppRunner: temp_dir = output_dir / "_runner_temp" / tag temp_dir.mkdir(parents=True, exist_ok=True) return CppRunner(cfg=cfg, auto_init=False, temp_dir=temp_dir) try: if args.reuse_runner: shared_runner = make_runner("shared") target_runner = shared_runner else: target_runner = make_runner("target") print("Running target solver case...") target_curve, target_raw = run_solver_and_extract_curve( runner=target_runner, cfg=cfg, params=target_params, well_index=args.well_index, timeout=args.solver_timeout, ) if not args.reuse_runner: target_runner.close() max_attempts = max(int(args.n_candidates), int(args.n_candidates) * int(args.max_attempts_factor)) candidates = _sample_local_candidates( cfg=cfg, base=target_params, n=max_attempts, seed=int(args.seed), span_frac=float(args.span_frac), ) rows: list[dict] = [] failed_rows: list[dict] = [] for attempt_idx, cand in enumerate(candidates): if len(rows) >= int(args.n_candidates): break runner = shared_runner if shared_runner is not None else make_runner(f"cand_{attempt_idx:04d}") try: solver_curve, _ = run_solver_and_extract_curve( runner=runner, cfg=cfg, params=cand, well_index=args.well_index, timeout=args.solver_timeout, ) pred_curve = predict_surrogate_curve( processed=processed, model=model, device=device, use_schedule=use_schedule, params=cand, schedule=schedule, cfg=cfg, ) solver_obj = dual_log_objective(target_curve, solver_curve, curve_layout) surrogate_obj = dual_log_objective(target_curve, pred_curve, curve_layout) rows.append( { "candidate_id": len(rows), "attempt_id": attempt_idx, "k": cand.k, "skin": cand.skin, "wellboreC": cand.wellboreC, "phi": cand.phi, "h": cand.h, "Cf": cand.Cf, "solver_objective": solver_obj["dual_log_objective"], "solver_p_obj": solver_obj["log_pressure_objective"], "solver_d_obj": solver_obj["log_derivative_objective"], "surrogate_objective": surrogate_obj["dual_log_objective"], "surrogate_p_obj": surrogate_obj["log_pressure_objective"], "surrogate_d_obj": surrogate_obj["log_derivative_objective"], "objective_gap": surrogate_obj["dual_log_objective"] - solver_obj["dual_log_objective"], } ) except Exception as exc: failed_row = {"attempt_id": attempt_idx, "reason": str(exc)} failed_row.update(_params_to_dict(cand)) failed_rows.append(failed_row) print( "[warn] candidate attempt " f"{attempt_idx} skipped: {exc} | " f"k={cand.k:.6g}, skin={cand.skin:.6g}, C={cand.wellboreC:.6g}, " f"phi={cand.phi:.6g}, h={cand.h:.6g}, Cf={cand.Cf:.6g}" ) finally: if shared_runner is None: runner.close() if not rows: raise RuntimeError("No valid candidate evaluations completed") solver_obj = np.asarray([float(r["solver_objective"]) for r in rows], dtype=np.float64) surrogate_obj = np.asarray([float(r["surrogate_objective"]) for r in rows], dtype=np.float64) solver_rank = _rank_positions(solver_obj) surrogate_rank = _rank_positions(surrogate_obj) for i, row in enumerate(rows): row["solver_rank"] = int(solver_rank[i]) row["surrogate_rank"] = int(surrogate_rank[i]) row["rank_gap"] = int(surrogate_rank[i] - solver_rank[i]) top5_solver = set(np.argsort(solver_obj)[: min(5, solver_obj.size)].tolist()) top5_sur = set(np.argsort(surrogate_obj)[: min(5, surrogate_obj.size)].tolist()) top10_solver = set(np.argsort(solver_obj)[: min(10, solver_obj.size)].tolist()) top10_sur = set(np.argsort(surrogate_obj)[: min(10, surrogate_obj.size)].tolist()) best_solver_idx = int(np.argmin(solver_obj)) best_surrogate_idx = int(np.argmin(surrogate_obj)) summary = { "config_path": str(cfg.path), "processed_path": str(processed_path), "model_path": str(model_path), "target_params": { "k": target_params.k, "skin": target_params.skin, "wellboreC": target_params.wellboreC, "phi": target_params.phi, "h": target_params.h, "Cf": target_params.Cf, }, "schedule": { "sectionIndex": schedule.sectionIndex, "timeQ": schedule.timeQ, "q": schedule.q, }, "n_requested": int(args.n_candidates), "n_valid": int(len(rows)), "span_frac": float(args.span_frac), "search_param_names": search_names, "pearson_objective": _corr_pearson(solver_obj, surrogate_obj), "spearman_objective": _corr_spearman(solver_obj, surrogate_obj), "mean_objective_gap": float(np.mean(surrogate_obj - solver_obj)), "mae_objective_gap": float(np.mean(np.abs(surrogate_obj - solver_obj))), "top5_overlap": int(len(top5_solver & top5_sur)), "top10_overlap": int(len(top10_solver & top10_sur)), "best_solver_candidate_id": best_solver_idx, "best_solver_candidate_surrogate_rank": int(surrogate_rank[best_solver_idx]), "best_surrogate_candidate_id": best_surrogate_idx, "best_surrogate_candidate_solver_rank": int(solver_rank[best_surrogate_idx]), "target_solver_raw_loglog": target_raw, } csv_path = output_dir / "candidate_objectives.csv" with open(csv_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter( f, fieldnames=[ "candidate_id", "attempt_id", "k", "skin", "wellboreC", "phi", "h", "Cf", "solver_objective", "solver_p_obj", "solver_d_obj", "surrogate_objective", "surrogate_p_obj", "surrogate_d_obj", "objective_gap", "solver_rank", "surrogate_rank", "rank_gap", ], ) writer.writeheader() writer.writerows(rows) failed_csv_path = output_dir / "failed_candidates.csv" with open(failed_csv_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter( f, fieldnames=["attempt_id", "reason", "k", "skin", "wellboreC", "phi", "h", "Cf"], ) writer.writeheader() writer.writerows(failed_rows) with open(output_dir / "summary.json", "w", encoding="utf-8") as f: json.dump(summary, f, ensure_ascii=False, indent=2) _plot_scatter(rows, output_dir / "objective_scatter.png") print("\nLocal autofit validation complete.") print(f"Output dir: {output_dir}") print(f"Valid candidates={len(rows)}/{args.n_candidates}") print( f"Objective Pearson={summary['pearson_objective']:.6f}, " f"Spearman={summary['spearman_objective']:.6f}, " f"Top5 overlap={summary['top5_overlap']}, Top10 overlap={summary['top10_overlap']}" ) print( f"Best solver candidate id={summary['best_solver_candidate_id']}, " f"its surrogate rank={summary['best_solver_candidate_surrogate_rank']}" ) print( f"Best surrogate candidate id={summary['best_surrogate_candidate_id']}, " f"its solver rank={summary['best_surrogate_candidate_solver_rank']}" ) print(f"CSV saved: {csv_path}") print(f"Failed candidates saved: {failed_csv_path}") print(f"Summary saved: {output_dir / 'summary.json'}") print(f"Plot saved: {output_dir / 'objective_scatter.png'}") finally: if shared_runner is not None: shared_runner.close() if __name__ == "__main__": main()