| import os |
| import json |
| import re |
| import sys |
| import time |
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| try: |
| from avh_math.input_normalize import normalize_input as normalize_input_shared |
| except Exception: |
| normalize_input_shared = None |
|
|
| |
| from decomposer import decompose_problem |
| from rewrite_engine import apply_rewrites |
| from energy import score_diffs |
| from proof_trace import Trace |
| from proof_sketch import generate_proof_sketch |
| from counterexample_synth import synth_cex_templates_from_kb, match_cex_template |
| from explainer import explain_formula |
| from repair import suggest_repairs |
| from counterexample_delta import explain_counterexample_delta |
| from patch_search import find_minimal_patches |
| from assumption_minimizer import minimize_assumptions_bfs |
| from patch_proof import ( |
| apply_patch_to_counterexample, |
| check_assumption_satisfied, |
| recheck_formula_with_model_search, |
| model_search_wrapper |
| ) |
| from search_orchestrator import run_beam_parallel |
| from verifier import check_model, find_counterexample, VerifyConfig |
|
|
| |
| try: |
| from solvers.registry import SolverRegistry |
| from solvers.logic import PropositionalSolver |
| from solvers.linear_algebra import LinearAlgebraSolver |
| from solvers.backoff import BackoffSolver |
| from solvers.modal_solver import ModalSolver |
| except ImportError: |
| from avh_math.solvers.registry import SolverRegistry |
| from avh_math.solvers.logic import PropositionalSolver |
| from avh_math.solvers.linear_algebra import LinearAlgebraSolver |
| from avh_math.solvers.backoff import BackoffSolver |
| from avh_math.solvers.modal_solver import ModalSolver |
|
|
| |
| _STRUCT_DOMAIN_RE = re.compile(r"(?im)^\s*Domain\s*:\s*([a-zA-Z0-9_]+)\s*$") |
| _STRUCT_ASSUME_RE = re.compile(r"(?im)^\s*Assumption(?:s)?\s*:\s*(.+?)\s*$") |
| _STRUCT_FORMULA_RE = re.compile(r"(?im)^\s*Formula\s*:\s*(.+?)\s*$") |
|
|
| def _normalize_modal_words(s: str) -> str: |
| s = re.sub(r"\bbox\b", "□", s, flags=re.IGNORECASE) |
| s = re.sub(r"\bdiamond\b", "◇", s, flags=re.IGNORECASE) |
| return s |
|
|
| def parse_structured_header(q: str) -> Dict[str, Any]: |
| q = q or "" |
| dom = None |
| m = _STRUCT_DOMAIN_RE.search(q) |
| if m: |
| dom = m.group(1).strip().lower() |
| assumptions: List[str] = [] |
| for m in _STRUCT_ASSUME_RE.finditer(q): |
| parts = re.split(r"[,\\s]+", m.group(1).strip().lower()) |
| assumptions.extend([p for p in parts if p]) |
| formula = None |
| m = _STRUCT_FORMULA_RE.search(q) |
| if m: |
| formula = m.group(1).strip() |
| return {"domain": dom, "assumptions": assumptions, "formula": formula} |
|
|
| _FORMULA_CHARS = re.compile(r"(?:<->|->|\[\]|<>|[()~&|]|□|◇|[A-Za-z][A-Za-z0-9_]*|[⊤⊥TF])") |
|
|
| def rebuild_formula_only(s: str) -> str: |
| toks = _FORMULA_CHARS.findall(s or "") |
| if not toks: return "" |
| cand = " ".join(toks) |
| cand = re.sub(r"\s+", " ", cand).strip() |
| |
| cand = re.sub(r"\[\]\s+(?=[A-Za-z(~\[])", "[]", cand) |
| cand = re.sub(r"<>\\s+(?=[A-Za-z(~\\[])", "<>", cand) |
| cand = re.sub(r"□\\s+(?=[A-Za-z(~\\[])", "□", cand) |
| cand = re.sub(r"◇\\s+(?=[A-Za-z(~\\[])", "◇", cand) |
| cand = re.sub(r"\[\]\s+\[\]\s*", "[][]", cand) |
| cand = re.sub(r"<>\\s+<>\\s*", "<><>", cand) |
| cand = re.sub(r"\b(a|an|the)\b\s*$", "", cand, flags=re.IGNORECASE).strip() |
| cand = re.sub(r"\b(tautology|valid|satisfiable|unsatisfiable)\b\s*$", "", cand, flags=re.IGNORECASE).strip() |
| return cand |
|
|
| def extract_logic_formula(text: str) -> Optional[str]: |
| s = (text or "").strip() |
| s = _normalize_modal_words(s) |
| m = _STRUCT_FORMULA_RE.search(s) |
| if m: |
| cand = rebuild_formula_only(m.group(1).strip()) |
| if any(op in cand for op in ("->", "<->", "&", "|", "~", "□", "◇", "[]", "<>")): return cand |
| m = re.search(r"\bformula\b\s*[:\-]?\s*(.+)", s, flags=re.IGNORECASE) |
| if m: |
| tail = re.sub(r"[?.!。!]+", "", m.group(1).strip()).strip() |
| cand = rebuild_formula_only(tail) |
| if any(op in cand for op in ("->", "<->", "&", "|", "~", "□", "◇", "[]", "<>")): return cand |
| m = re.search(r"(?:において|にて)\s*(.+?)\s*(?:は|が)\s*(?:恒真|妥当|成立|成り立つ|反例|偽).*$", s) |
| if m: |
| cand = rebuild_formula_only(m.group(1).strip()) |
| if any(op in cand for op in ("->", "<->", "&", "|", "~", "□", "◇", "[]", "<>")): return cand |
| cand = rebuild_formula_only(s) |
| if any(op in cand for op in ("->", "<->", "&", "|", "~", "□", "◇", "[]", "<>")): return cand |
| return None |
|
|
| _QUOTED_FORMULA_RE = re.compile(r'["“”]([^"“”]+)["“”]|「([^」]+)」|『([^』]+)』') |
|
|
| def extract_quoted_formula(text: str) -> Optional[str]: |
| for m in _QUOTED_FORMULA_RE.finditer(text or ""): |
| frag = next((g for g in m.groups() if g), "") |
| cand = rebuild_formula_only(frag) |
| if any(op in cand for op in ("->", "<->", "&", "|", "~", "□", "◇", "[]", "<>")): |
| return cand |
| return None |
|
|
| def coarse_domain_guess(text: str) -> str: |
| s = (text or "").lower() |
| if "kripke" in s or "□" in s or "◇" in s or "modal" in s: return "modal_logic" |
| if any(k in s for k in ("∀","∃","forall","exists","predicate")): return "first_order_logic" |
| if any(k in s for k in ("matrix","行列"," 対称行列","rank","det")): return "linear_algebra" |
| if any(k in s for k in ("->","<->","&","|","~","⊤","⊥")): return "propositional_logic" |
| return "unknown" |
|
|
| def normalize_input(text: str) -> Dict[str, Any]: |
| raw = (text or "").strip() |
| if normalize_input_shared: |
| raw = normalize_input_shared(raw) |
| hdr = parse_structured_header(raw) |
| raw2 = _normalize_modal_words(raw) |
| dom = hdr["domain"] or coarse_domain_guess(raw2) |
| assumptions = hdr["assumptions"][:] if hdr["assumptions"] else [] |
| formula = None |
| if hdr["formula"]: formula = rebuild_formula_only(hdr["formula"]) |
| if not formula: |
| formula = extract_quoted_formula(raw2) |
| if not formula: |
| formula = extract_logic_formula(raw2) |
| injected = [] |
| if dom and dom != "unknown": injected.append(f"Domain: {dom}") |
| if assumptions: injected.append("Assumptions: " + ", ".join(assumptions)) |
| if formula: |
| norm = ("\n".join(injected) + "\n" if injected else "") + f"Formula: {formula}" |
| else: |
| norm = ("\n".join(injected) + "\n" if injected else "") + raw2 |
| return {"raw": raw, "normalized": norm, "domain": dom, "assumptions": assumptions, "formula": formula} |
|
|
| @dataclass |
| class CandidateEval: |
| formula: str |
| status: str |
| energy: int |
| energy_breakdown: Dict[str, int] |
| diffs: List[str] |
| counterexample: Optional[Dict[str, Any]] |
| audit: List[str] |
| proof_sketch: Optional[Dict[str, Any]] = None |
| cex_explain: Optional[Dict[str, Any]] = None |
| explanation: Optional[Dict[str, Any]] = None |
| repair_suggestions: Optional[List[Dict[str, Any]]] = None |
| counterexample_delta: Optional[List[Dict[str, Any]]] = None |
| counterexample_patch_proof: Optional[Dict[str, Any]] = None |
| minimal_patches: Optional[List[Dict[str, Any]]] = None |
| minimal_assumption_sets: Optional[List[List[str]]] = None |
|
|
| @dataclass |
| class SolveResult: |
| ok: bool |
| assumptions: List[str] |
| ranked: List[CandidateEval] |
| best_valid: List[str] |
| trace: List[str] |
| evidence_map: Optional[Dict[str, Any]] = None |
|
|
| class MathEngine: |
| def __init__(self, db_dir: str): |
| self.db_dir = db_dir |
| self.kb_path = os.path.join(db_dir, "foundation_kb.jsonl") |
| self.tactics_path = os.path.join(db_dir, "tactics.json") |
| self.knowledge_db_path = os.path.join(db_dir, "knowledge_db.json") |
| self.tactics = self._load_json(self.tactics_path, {}) |
| self.knowledge_db = self._load_json(self.knowledge_db_path, {}) |
| |
| |
| self.registry = SolverRegistry() |
| self.registry.register(PropositionalSolver()) |
| self.registry.register(LinearAlgebraSolver()) |
| self.registry.register(ModalSolver()) |
| self.registry.register(BackoffSolver()) |
|
|
| def _load_json(self, path: str, default: Any) -> Any: |
| if not os.path.exists(path): |
| return default |
| try: |
| with open(path, "r", encoding="utf-8") as f: |
| return json.load(f) |
| except: |
| return default |
|
|
| def solve(self, text: str) -> SolveResult: |
| trace = Trace() |
| trace.add("[PHASE0] Universal normalizer starting.") |
|
|
| |
| norm = normalize_input(text) |
| trace.add(f"[PHASE0] domain={norm['domain']} formula={'yes' if norm['formula'] else 'no'}") |
| trace.add(f"[PHASE0] normalized='{norm['normalized'][:160]}'") |
|
|
| |
| spec_obj = decompose_problem(norm["normalized"], self.knowledge_db) |
| |
| spec = { |
| "assumptions": list(dict.fromkeys((norm["assumptions"] or []) + (getattr(spec_obj, 'assumptions', []) or []))), |
| "candidates": getattr(spec_obj, 'candidates', []) or [], |
| "atoms": getattr(spec_obj, 'atoms', []) or [], |
| "domain": getattr(spec_obj, "domain", "unknown") or norm["domain"] |
| } |
|
|
| if spec.get("domain") == "modal_logic" and spec.get("candidates"): |
| if os.environ.get("AVH_MODAL_INTERNAL") == "1": |
| try: |
| from avh_math.solvers.modal_parse import to_internal_modal, ModalParseError |
| spec["candidates"] = [to_internal_modal(f) for f in spec["candidates"]] |
| trace.add("[PHASE1] modal surface -> internal conversion applied.") |
| except ModalParseError as e: |
| trace.add(f"[PHASE1] modal parse failed: {e}") |
| else: |
| trace.add("[PHASE1] modal internal conversion skipped (surface syntax preserved).") |
|
|
| core_formula = getattr(spec_obj, "core_formula", "") or "" |
| if not spec["candidates"] and core_formula: |
| spec["candidates"] = [core_formula] |
| trace.add("[PHASE1] candidates were empty; injected from core_formula.") |
|
|
| |
| if not spec["candidates"] and norm["formula"]: |
| spec["candidates"] = [norm["formula"]] |
| trace.add("[PHASE1] candidates were empty; injected from extracted formula.") |
|
|
| |
| if not spec["candidates"] and norm["formula"]: |
| spec["candidates"] = [norm["formula"]] |
| trace.add("[PHASE1] candidates were empty; injected from extracted formula.") |
|
|
| |
| if not spec["candidates"] and norm["formula"]: |
| spec["candidates"] = [norm["formula"]] |
| trace.add("[PHASE1] candidates were empty; injected from extracted formula.") |
|
|
| trace.add(f"[PHASE1] Problem decomposed. Domain hint: {spec['domain']}") |
|
|
| |
| limits = {"time_ms": 10000} |
| solver_res = self.registry.solve(norm["normalized"], spec, limits) |
| |
| if solver_res.status in ("proved", "disproved", "likely_true"): |
| trace.add(f"[PHASE S] Solved by {solver_res.status} logic. Answer: {solver_res.answer}") |
| |
| ranked = [] |
| |
| for f in spec["candidates"]: |
| status = "unknown" |
| ce = None |
| audit = ["Phase S: Registry Process"] |
| |
| if "results" in solver_res.evidence: |
| for r in solver_res.evidence["results"]: |
| if r["formula"] == f: |
| status = "valid" if r["status"] == "proved" else "invalid" |
| ce = r["evidence"].get("counterexample") |
| break |
| elif solver_res.status == "proved" and f in solver_res.answer: |
| status = "valid" |
| |
| ranked.append(CandidateEval( |
| formula=f, |
| status=status, |
| energy=0 if status == "valid" else 100, |
| energy_breakdown={}, |
| diffs=["diff:counterexample_found"] if status == "invalid" else [], |
| counterexample=ce, |
| audit=audit |
| )) |
| |
| best_valid = [c.formula for c in ranked if c.status == "valid"] |
| |
| evidence_map_serializable = { |
| tag: [{"start": s.start, "end": s.end, "text": s.text} for s in spans] |
| for tag, spans in getattr(spec_obj, 'evidence_map', {}).items() |
| } |
|
|
| return SolveResult( |
| ok=True, |
| assumptions=spec["assumptions"], |
| ranked=ranked, |
| best_valid=best_valid, |
| trace=trace.lines, |
| evidence_map=evidence_map_serializable |
| ) |
|
|
| |
| trace.add("[PHASE1] No definitive answer from registry. Falling back to Beam Search.") |
| beam_results = run_beam_parallel( |
| formulas=spec["candidates"], |
| atoms=spec["atoms"], |
| assumptions=spec["assumptions"], |
| tactics_db=self.tactics |
| ) |
|
|
| ranked = [] |
| for br in beam_results: |
| raw_status = br.final_status |
| status = "invalid" if raw_status == "invalid" else "unknown" |
| ce = br.best_counterexample |
| ranked.append(CandidateEval( |
| formula=br.formula, |
| status=status, |
| energy=0 if status == "valid" else 100, |
| energy_breakdown={}, |
| diffs=["diff:counterexample_found"] if status == "invalid" else [], |
| counterexample=ce, |
| audit=[o.tactic_id for o in br.outcomes] if br.outcomes else [] |
| )) |
|
|
| ranked = sorted(ranked, key=lambda x: (x.energy, 0 if x.status == "valid" else 1)) |
| best_valid = [c.formula for c in ranked if c.status == "valid"] |
|
|
| evidence_map_serializable = { |
| tag: [{"start": s.start, "end": s.end, "text": s.text} for s in spans] |
| for tag, spans in getattr(spec_obj, 'evidence_map', {}).items() |
| } |
|
|
| return SolveResult( |
| ok=True, |
| assumptions=spec["assumptions"], |
| ranked=ranked, |
| best_valid=best_valid, |
| trace=trace.lines, |
| evidence_map=evidence_map_serializable |
| ) |
|
|
| |
| AVHEngine = MathEngine |
|
|