| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Set, Tuple, Callable |
|
|
| from patch_proof import ( |
| Patch, |
| apply_patch_to_counterexample, |
| check_assumption_satisfied, |
| recheck_formula_with_model_search, |
| ) |
|
|
| Edge = Tuple[int, int] |
|
|
|
|
| @dataclass |
| class PatchCandidate: |
| patch: Patch |
| patch_size: int |
| assumption_satisfied: bool |
| still_counterexample: Optional[bool] |
| note: str |
|
|
|
|
| def _edges_of_worlds(worlds: int) -> List[Edge]: |
| """世界数Nに対して取りうる全辺候補。MVPは全組み合わせ。""" |
| return [(i, j) for i in range(worlds) for j in range(worlds)] |
|
|
|
|
| def _patch_size(p: Patch) -> int: |
| return len(p.add_edges) + len(p.remove_edges) |
|
|
|
|
| def _merge_patch(a: Patch, b: Patch) -> Patch: |
| |
| add = set(a.add_edges) | set(b.add_edges) |
| rem = set(a.remove_edges) | set(b.remove_edges) |
| |
| add = add - rem |
| return Patch(add_edges=sorted(add), remove_edges=sorted(rem)) |
|
|
|
|
| |
| |
| |
|
|
| def candidate_patches_for_assumption(worlds: int, assumption: str, max_extra: int = 2) -> List[Patch]: |
| """ |
| 「assumption を満たす方向」のパッチ候補を生成する。 |
| max_extra: “最小っぽい候補”を少しだけ増やすための緩さ。 |
| """ |
| all_edges = _edges_of_worlds(worlds) |
|
|
| if assumption == "assume:reflexive": |
| |
| base = Patch(add_edges=[(i, i) for i in range(worlds)], remove_edges=[]) |
| |
| return [base] |
|
|
| if assumption == "assume:symmetric": |
| |
| |
| |
| |
| return [Patch(add_edges=[], remove_edges=[])] |
|
|
| if assumption == "assume:transitive": |
| |
| |
| return [Patch(add_edges=[], remove_edges=[])] |
|
|
| if assumption == "assume:euclidean": |
| |
| return [Patch(add_edges=[], remove_edges=[])] |
|
|
| return [] |
|
|
|
|
| def patch_to_satisfy_assumption_minimal( |
| cex: Dict[str, Any], |
| assumption: str, |
| limit_k: int = 3, |
| ) -> List[Patch]: |
| """ |
| 現反例モデル cex を、assumption を満たすように最小変更で修正する候補を返す。 |
| MVP: |
| - reflexive: 必要な self-loop を全部追加(定義上それが最小) |
| - symmetric/transitive/euclidean: “不足している辺だけ add” を greedily 作る |
| """ |
| worlds = int(cex.get("n_worlds") or cex.get("worlds") or 0) |
| |
| raw_edges = cex.get("edges") or [] |
| edges = set() |
| for e in raw_edges: |
| if len(e) >= 2: |
| edges.add((int(e[0]), int(e[1]))) |
|
|
| if assumption == "assume:reflexive": |
| add = [(i, i) for i in range(worlds) if (i, i) not in edges] |
| return [Patch(add_edges=add, remove_edges=[])] |
|
|
| if assumption == "assume:symmetric": |
| add: List[Edge] = [] |
| for (a, b) in list(edges): |
| if (b, a) not in edges: |
| add.append((b, a)) |
| |
| if len(add) > limit_k: |
| add = add[:limit_k] |
| return [Patch(add_edges=add, remove_edges=[])] |
|
|
| if assumption == "assume:transitive": |
| |
| missing: List[Edge] = [] |
| for (x, y) in edges: |
| for (y2, z) in edges: |
| if y2 == y and (x, z) not in edges: |
| missing.append((x, z)) |
| |
| missing = list(dict.fromkeys(missing)) |
| if not missing: |
| return [Patch(add_edges=[], remove_edges=[])] |
| |
| out = [Patch(add_edges=[e], remove_edges=[]) for e in missing[:limit_k]] |
| out.append(Patch(add_edges=missing[:limit_k], remove_edges=[])) |
| return out |
|
|
| if assumption == "assume:euclidean": |
| missing: List[Edge] = [] |
| for (a, b) in edges: |
| for (a2, c) in edges: |
| if a2 == a and (b, c) not in edges: |
| missing.append((b, c)) |
| missing = list(dict.fromkeys(missing)) |
| if not missing: |
| return [Patch(add_edges=[], remove_edges=[])] |
| out = [Patch(add_edges=[e], remove_edges=[]) for e in missing[:limit_k]] |
| out.append(Patch(add_edges=missing[:limit_k], remove_edges=[])) |
| return out |
|
|
| return [] |
|
|
|
|
| |
| |
| |
|
|
| def find_minimal_patches( |
| model_checker_fn: Callable[[str, Dict[str, Any]], bool], |
| formula: str, |
| base_assumptions: List[str], |
| cex: Dict[str, Any], |
| goal_assumption: str, |
| limit_k: int = 3, |
| max_results: int = 5, |
| ) -> List[Dict[str, Any]]: |
| """ |
| goal_assumption を満たしつつ、同じ反例モデルがもう反例でなくなるパッチを列挙。 |
| limit_k: transitive/euclidean の欠けedge候補の探索幅 |
| """ |
| |
| patches = patch_to_satisfy_assumption_minimal(cex, goal_assumption, limit_k=limit_k) |
|
|
| results: List[PatchCandidate] = [] |
|
|
| for p in patches: |
| patched = apply_patch_to_counterexample(cex, p) |
| satisfied = check_assumption_satisfied(patched, goal_assumption) |
|
|
| recheck = recheck_formula_with_model_search( |
| model_checker_fn=model_checker_fn, |
| formula=formula, |
| assumptions=(base_assumptions + [goal_assumption]), |
| patched_cex=patched, |
| ) |
|
|
| results.append(PatchCandidate( |
| patch=p, |
| patch_size=_patch_size(p), |
| assumption_satisfied=satisfied, |
| still_counterexample=recheck.get("still_counterexample"), |
| note=recheck.get("note", ""), |
| )) |
|
|
| |
| |
| def key(pc: PatchCandidate): |
| |
| |
| |
| |
| if not pc.assumption_satisfied: |
| rank = 2 |
| elif pc.still_counterexample is False: |
| rank = 0 |
| else: |
| rank = 1 |
| return (rank, pc.patch_size) |
|
|
| results.sort(key=key) |
|
|
| out: List[Dict[str, Any]] = [] |
| for pc in results[:max_results]: |
| out.append({ |
| "goal": f"make_assumption_true: {goal_assumption}", |
| "patch": { |
| "add_edges": [list(e) for e in pc.patch.add_edges], |
| "remove_edges": [list(e) for e in pc.patch.remove_edges], |
| }, |
| "patch_size": pc.patch_size, |
| "assumption_satisfied": pc.assumption_satisfied, |
| "still_counterexample": pc.still_counterexample, |
| "note": pc.note, |
| }) |
| return out |