Spaces:
Sleeping
Sleeping
| """ | |
| inference.py | |
| ------------ | |
| Official submission inference script for the Data Cleaning Pipeline environment. | |
| Environment variables: | |
| API_BASE_URL LLM endpoint. Default: HuggingFace free router. | |
| MODEL_NAME Model to use. Default: Qwen/Qwen2.5-72B-Instruct (free). | |
| HF_TOKEN Your HuggingFace token (hf_...). | |
| ENV_BASE_URL The running environment URL. | |
| Set this to your HuggingFace Space URL, e.g.: | |
| https://CodeKnightDebjit-data-cleaning-env.hf.space | |
| NOTE: Do NOT use LOCAL_IMAGE_NAME / from_docker_image() in submitted scripts. | |
| The evaluator machine does not have your local Docker image β it connects to | |
| your live HF Space via ENV_BASE_URL. | |
| STDOUT FORMAT (evaluator parses these exactly): | |
| [START] task=<n> env=<benchmark> model=<model> | |
| [STEP] step=<n> action=<str> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> score=<0.00> rewards=<r1,r2,...> | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from typing import Any, Dict, List, Optional | |
| from openai import OpenAI | |
| try: | |
| from client import DataCleaningEnv | |
| from models import CleanAction, MAX_STEPS, DONE_THRESHOLD | |
| except ImportError: | |
| sys.path.insert(0, os.path.dirname(__file__)) | |
| from client import DataCleaningEnv | |
| from models import CleanAction, MAX_STEPS, DONE_THRESHOLD | |
| # ββ Configuration ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ENV_BASE_URL must point to your live HuggingFace Space. | |
| # The evaluator sets this automatically when it runs your script. | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") | |
| HF_TOKEN = os.getenv("HF_TOKEN", "") | |
| ENV_BASE_URL = os.getenv("ENV_BASE_URL", "https://CodeKnightDebjit-data-cleaning-env.hf.space") | |
| BENCHMARK = "data_cleaning_env" | |
| TASK_IDS = ["easy", "medium", "hard"] | |
| STEP_LIMITS = {"easy": 25, "medium": 50, "hard": 80} | |
| # ββ System prompt ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SYSTEM_PROMPT = """You are a deterministic data cleaning agent. | |
| Your task is to clean a dataset step-by-step using valid actions. | |
| You are operating inside an environment with strict rules. | |
| -------------------------------------------------- | |
| ## INPUT PROVIDED EACH STEP | |
| You will receive: | |
| 1. Column schema (LIST OF VALID COLUMN NAMES β CASE SENSITIVE) | |
| 2. Column status: | |
| - missing values count | |
| - whether standardized (true/false) | |
| 3. Remaining issues (global state) | |
| 4. Previous actions taken | |
| -------------------------------------------------- | |
| ## OBJECTIVE | |
| Fully clean the dataset with MINIMUM steps. | |
| A dataset is CLEAN only if: | |
| - No missing values remain | |
| - All columns are standardized | |
| - No invalid formats exist | |
| -------------------------------------------------- | |
| ## STRICT RULES (MUST FOLLOW) | |
| ### 1. NEVER TERMINATE EARLY | |
| You MUST NOT output DONE unless: | |
| - ALL columns have missing = 0 | |
| - ALL columns have standardized = true | |
| - remaining_issues is empty | |
| If ANY issue remains β DO NOT output DONE. | |
| -------------------------------------------------- | |
| ### 2. USE ONLY VALID COLUMNS | |
| - You MUST use EXACT column names from schema | |
| - Column names are CASE SENSITIVE | |
| - NEVER invent new column names | |
| -------------------------------------------------- | |
| ### 3. PRIORITIZE COLUMN-LEVEL ACTIONS | |
| Preferred actions: | |
| - FILL_MISSING (fixes entire column) | |
| - STANDARDIZE_COL (fixes formatting) | |
| Avoid: | |
| - SET_VALUE (only for single isolated errors) | |
| NEVER fix a full column using repeated SET_VALUE. | |
| -------------------------------------------------- | |
| ### 4. DO NOT REPEAT ACTIONS | |
| - Do NOT apply the same action repeatedly on the same column | |
| - Do NOT standardize an already standardized column | |
| - Do NOT fill missing if missing = 0 | |
| -------------------------------------------------- | |
| ### 5. AVOID DESTRUCTIVE ACTIONS | |
| - DROP_ROW should be used ONLY when absolutely necessary | |
| -------------------------------------------------- | |
| ## OUTPUT FORMAT (STRICT JSON ONLY) | |
| Return ONLY one of these β no explanation, no markdown: | |
| {"action": "FILL_MISSING", "column": "<col>", "strategy": "<mean|median|mode>"} | |
| {"action": "STANDARDIZE_COL", "column": "<col>"} | |
| {"action": "SET_VALUE", "column": "<col>", "row": <int>, "value": "<str>"} | |
| {"action": "DROP_ROW", "row": <int>} | |
| {"action": "DONE"} | |
| -------------------------------------------------- | |
| ## FAILURE CONDITIONS (AVOID THESE) | |
| - DONE prematurely β penalty -1.0 | |
| - Invalid column names β action fails | |
| - Repeated same action β wasted step | |
| -------------------------------------------------- | |
| Every step must move the dataset closer to a fully clean state.""" | |
| # ββ Official log format ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def log_start(task: str, env: str, model: str) -> None: | |
| print(f"[START] task={task} env={env} model={model}", flush=True) | |
| def log_step(step: int, action: str, reward: float, done: bool, | |
| error: Optional[str]) -> None: | |
| error_val = error if error else "null" | |
| print( | |
| f"[STEP] step={step} action={action[:80].replace(chr(10), ' ')} " | |
| f"reward={reward:.2f} done={str(done).lower()} error={error_val}", | |
| flush=True, | |
| ) | |
| def log_end(success: bool, steps: int, score: float, | |
| rewards: List[float]) -> None: | |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) | |
| print( | |
| f"[END] success={str(success).lower()} steps={steps} " | |
| f"score={score:.2f} rewards={rewards_str}", | |
| flush=True, | |
| ) | |
| # ββ Prompt builder βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _col_status_block(column_status: Dict[str, Any]) -> str: | |
| if not column_status: | |
| return " (not available)" | |
| lines = [] | |
| for col, s in column_status.items(): | |
| missing = s.get("missing", 0) | |
| standardized = s.get("standardized", True) | |
| issues = s.get("issues", []) | |
| flag = "OK" if (missing == 0 and standardized) else "NEEDS_FIX" | |
| issue_str = ", ".join(issues) if issues else "" | |
| lines.append( | |
| f" {col:<26} missing={missing:<3} standardized={str(standardized).lower():<5}" | |
| + (f" issues=[{issue_str}]" if issue_str else "") | |
| + f" β {flag}" | |
| ) | |
| return "\n".join(lines) | |
| def build_user_prompt(obs, history: List[str]) -> str: | |
| col_status: Dict[str, Any] = getattr(obs, "column_status", {}) | |
| valid_columns = list(col_status.keys()) | |
| broken = [c for c, s in col_status.items() | |
| if s.get("missing", 0) > 0 or not s.get("standardized", True)] | |
| rows = obs.dirty_csv.strip().split("\n") | |
| preview = "\n".join(rows[:21]) | |
| all_clean = len(broken) == 0 | |
| done_hint = ( | |
| "ALL columns clean β you MAY output DONE" | |
| if all_clean else | |
| f"{len(broken)} column(s) still broken β DO NOT output DONE" | |
| ) | |
| history_block = "\n".join(f" {h}" for h in history[-6:]) if history else " none" | |
| return f"""-------------------------------------------------- | |
| ## COLUMN SCHEMA (EXACT CASE-SENSITIVE NAMES β USE THESE EXACTLY) | |
| {chr(10).join(f' - {c}' for c in valid_columns)} | |
| -------------------------------------------------- | |
| ## COLUMN STATUS | |
| {_col_status_block(col_status)} | |
| -------------------------------------------------- | |
| ## GLOBAL STATE | |
| Task: {obs.task_id} | |
| Step: {obs.step_number} / {obs.max_steps} | |
| Score: {obs.current_score:.4f} (need >= {DONE_THRESHOLD[obs.task_id]:.2f}) | |
| Remaining issues: {obs.issues_remaining} | |
| Broken columns: {broken} | |
| DONE status: {done_hint} | |
| -------------------------------------------------- | |
| ## SCHEMA HINT | |
| {obs.schema_hint} | |
| -------------------------------------------------- | |
| ## CSV PREVIEW (first 20 rows) | |
| {preview} | |
| -------------------------------------------------- | |
| ## PREVIOUS ACTIONS | |
| {history_block} | |
| -------------------------------------------------- | |
| Return ONLY valid JSON β no explanation, no markdown.""" | |
| # ββ Action parsing βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| COMMAND_MAP = { | |
| "FILL_MISSING": "FILL_MISSING", | |
| "STANDARDIZE_COL": "STANDARDIZE_COL", | |
| "STANDARDIZE": "STANDARDIZE_COL", | |
| "SET_VALUE": "SET_VALUE", | |
| "DROP_ROW": "DROP_ROW", | |
| "DROP": "DROP_ROW", | |
| } | |
| VALID_STRATEGIES = {"mean", "median", "mode", "drop"} | |
| def parse_action(raw: str, valid_columns: List[str]) -> CleanAction: | |
| text = raw.strip() | |
| if text.startswith("```"): | |
| lines = text.split("\n") | |
| inner = lines[1:-1] if lines[-1].strip().startswith("```") else lines[1:] | |
| text = "\n".join(inner).strip() | |
| m = re.search(r"\{[^{}]*\}", text, re.DOTALL) | |
| if not m: | |
| return CleanAction(command="DONE") | |
| try: | |
| data: Dict[str, Any] = json.loads(m.group()) | |
| except json.JSONDecodeError: | |
| return CleanAction(command="DONE") | |
| action_raw = str(data.get("action", "DONE")).strip().upper().replace(" ", "_") | |
| if action_raw == "DONE": | |
| return CleanAction(command="DONE") | |
| command = COMMAND_MAP.get(action_raw) | |
| if command is None: | |
| return CleanAction(command="DONE") | |
| # Validate column name (case-sensitive, with case-insensitive fallback) | |
| column = data.get("column") | |
| if column is not None and valid_columns: | |
| if column not in valid_columns: | |
| col_lower = {c.lower(): c for c in valid_columns} | |
| column = col_lower.get(str(column).lower()) # None if no match | |
| # strategy β fill_strategy | |
| fill_strategy = data.get("strategy") or data.get("fill_strategy") | |
| if fill_strategy and str(fill_strategy).lower() not in VALID_STRATEGIES: | |
| fill_strategy = "median" | |
| # row β row_index | |
| row_raw = data.get("row") if data.get("row") is not None else data.get("row_index") | |
| row_index = None | |
| if row_raw is not None: | |
| try: | |
| row_index = int(row_raw) | |
| except (TypeError, ValueError): | |
| pass | |
| value = data.get("value") | |
| try: | |
| return CleanAction( | |
| command = command, | |
| column = column, | |
| fill_strategy = fill_strategy, | |
| row_index = row_index, | |
| value = str(value) if value is not None else None, | |
| ) | |
| except Exception: | |
| return CleanAction(command="DONE") | |
| def call_llm(client: OpenAI, messages: list) -> str: | |
| response = client.chat.completions.create( | |
| model = MODEL_NAME, | |
| messages = messages, | |
| max_tokens = 100, | |
| temperature = 0.0, | |
| ) | |
| return (response.choices[0].message.content or "").strip() | |
| # ββ Episode runner βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def run_episode(env, client: OpenAI, task_id: str) -> dict: | |
| max_steps = STEP_LIMITS[task_id] | |
| threshold = DONE_THRESHOLD[task_id] | |
| rewards: List[float] = [] | |
| steps_taken = 0 | |
| score = 0.0 | |
| success = False | |
| history: List[str] = [] | |
| log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME) | |
| try: | |
| result = await env.reset(task_id=task_id) | |
| obs = result.observation | |
| valid_columns: List[str] = list(getattr(obs, "column_status", {}).keys()) | |
| messages = [{"role": "system", "content": SYSTEM_PROMPT}] | |
| for step in range(1, max_steps + 1): | |
| if obs.done: | |
| break | |
| steps_taken = step | |
| messages.append({"role": "user", "content": build_user_prompt(obs, history)}) | |
| try: | |
| raw = call_llm(client, messages) | |
| action = parse_action(raw, valid_columns) | |
| messages.append({"role": "assistant", "content": raw}) | |
| except Exception as exc: | |
| log_step(step, "DONE", 0.00, True, str(exc)[:120]) | |
| rewards.append(0.0) | |
| break | |
| # Keep system + last 10 turns inside free-tier context limit | |
| if len(messages) > 21: | |
| messages = [messages[0]] + messages[-20:] | |
| result = await env.step(action) | |
| obs = result.observation | |
| if getattr(obs, "column_status", {}): | |
| valid_columns = list(obs.column_status.keys()) | |
| reward = result.reward or 0.0 | |
| rewards.append(reward) | |
| score = obs.current_score | |
| log_step( | |
| step = step, | |
| action = action.command, | |
| reward = reward, | |
| done = obs.done, | |
| error = obs.last_action_error, | |
| ) | |
| parts = [f"step {step}: {action.command}"] | |
| if action.column: parts.append(f"col={action.column}") | |
| if action.fill_strategy: parts.append(f"strategy={action.fill_strategy}") | |
| parts.append(f"score={score:.4f}") | |
| if obs.last_action_error: | |
| parts.append(f"[BLOCKED: {obs.last_action_error[:60]}]") | |
| history.append(" ".join(parts)) | |
| if obs.done or score >= threshold: | |
| break | |
| success = score >= threshold | |
| except Exception as episode_err: | |
| # Catch-all so [END] is always emitted even if the episode crashes | |
| print(f"[DEBUG] Episode error: {episode_err}", flush=True) | |
| log_end(success=False, steps=steps_taken, score=score, rewards=rewards) | |
| return {"task_id": task_id, "score": score, "reward": sum(rewards), | |
| "steps": steps_taken, "success": False} | |
| log_end(success=success, steps=steps_taken, score=score, rewards=rewards) | |
| return {"task_id": task_id, "score": score, "reward": sum(rewards), | |
| "steps": steps_taken, "success": success} | |
| # ββ Entry point ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def main() -> None: | |
| if not HF_TOKEN: | |
| print( | |
| "ERROR: HF_TOKEN is not set.\n" | |
| "1. Go to https://huggingface.co/settings/tokens\n" | |
| "2. Create a Read token and copy it\n" | |
| "3. Set it: $env:HF_TOKEN='hf_xxxxxxxxxxxx' (PowerShell)\n" | |
| " export HF_TOKEN='hf_xxxxxxxxxxxx' (bash)\n" | |
| "4. Run: python inference.py", | |
| file=sys.stderr, | |
| ) | |
| sys.exit(1) | |
| print(f"API_BASE_URL : {API_BASE_URL}", flush=True) | |
| print(f"MODEL_NAME : {MODEL_NAME}", flush=True) | |
| print(f"ENV_BASE_URL : {ENV_BASE_URL}", flush=True) | |
| print("", flush=True) | |
| llm = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN) | |
| # Always connect via URL β no Docker on the evaluator machine | |
| env = DataCleaningEnv(base_url=ENV_BASE_URL) | |
| await env.connect() | |
| results = [] | |
| try: | |
| for task_id in TASK_IDS: | |
| summary = await run_episode(env, llm, task_id) | |
| results.append(summary) | |
| print("", flush=True) | |
| finally: | |
| try: | |
| await env.close() | |
| except Exception: | |
| pass | |
| print("=" * 56, flush=True) | |
| print(f"{'Task':<10} {'Score':>7} {'Reward':>9} {'Steps':>6} {'Pass':>5}") | |
| print("-" * 56, flush=True) | |
| for r in results: | |
| print( | |
| f"{r['task_id']:<10} {r['score']:>7.4f} {r['reward']:>9.4f} " | |
| f"{r['steps']:>6} {'YES' if r['success'] else 'NO':>4}", | |
| flush=True, | |
| ) | |
| print("=" * 56, flush=True) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |