---
title: Data Cleaning Environment
emoji: 🧹
colorFrom: blue
colorTo: purple
sdk: docker
app_port: 7860
base_path: /web
---
# 🧹 Data Cleaning Environment

**A Reinforcement Learning Benchmark for Autonomous Data Cleaning Agents**

An OpenEnv-compatible reinforcement learning environment in which an LLM agent receives a dirty CSV dataset and must autonomously fix type errors, outliers, missing values, and schema inconsistencies until it matches a hidden ground truth, one step at a time.
```text
┌────────────────────────────────────────────────────────────────────┐
│  Dirty CSV → Agent Observes → Issues CleanAction → Reward          │
│                                                                    │
│  "N/A"  → FILL_MISSING(median)            → Score ↑ → +0.12 reward │
│  "2099" → SET_VALUE(row=3, "2024-01-15")  → Score ↑ → +0.08        │
│  " bob" → STANDARDIZE_COL("name")         → Score ↑ → +0.05        │
└────────────────────────────────────────────────────────────────────┘
```
## 📋 Table of Contents
- Overview
- Architecture
- Project Structure
- Tasks
- Action Space
- Observation Space
- Reward Function
- Quick Start
- Running Inference
- Environment API
- Configuration
- Deployment
- Development & Testing
- Troubleshooting
## 📖 Overview

The Data Cleaning Environment is a structured RL benchmark in which an LLM-powered agent must clean tabular datasets. The environment wraps a FastAPI WebSocket server following the OpenEnv protocol, making it compatible with any OpenEnv-based training or evaluation framework.

### Why This Matters

Real-world data teams spend an estimated 60–80% of their time on data cleaning. This environment trains agents to:

- Detect type errors, outliers, missing values, and schema inconsistencies
- Reason about which fix is most impactful at each step
- Self-correct from informative error feedback
- Terminate efficiently without over-cleaning
### Key Properties

| Property | Value |
|---|---|
| Protocol | OpenEnv (WebSocket + HTTP) |
| Action Space | Discrete (5 command types) |
| Observation | Full CSV state + grader feedback |
| Episode Structure | Reset → N × Step → Done |
| Concurrency | ✅ Multiple simultaneous sessions |
| State Management | Server-side, fully isolated per session |
## 🏗️ Architecture

```text
┌──────────────────────────────────────────────────────────────────────┐
│                     Agent (LLM / RL Policy)                          │
│              Qwen2.5-72B / Mistral / Custom Model                    │
└─────────────────────────┬───────────────────────────────┬────────────┘
       CleanAction (JSON) │                               │ CleanObservation
                          ▼                               │
┌─────────────────────────────────────────────────────────┴────────────┐
│                  DataCleaningEnv (client.py)                         │
│        OpenEnv EnvClient[CleanAction, CleanObservation, dict]        │
│                  WebSocket persistent connection                     │
└─────────────────────────┬────────────────────────────────────────────┘
                          │ WebSocket /ws
                          ▼
┌──────────────────────────────────────────────────────────────────────┐
│                  FastAPI Server (server/app.py)                      │
│              HTTP + WebSocket endpoints, sessions                    │
└─────────────────────────┬────────────────────────────────────────────┘
                          │
                          ▼
┌──────────────────────────────────────────────────────────────────────┐
│        DataCleaningEnvironment (server/data_cleaning_env.py)         │
│                                                                      │
│  ┌─────────────┐ ┌──────────────┐ ┌───────────┐ ┌────────────┐       │
│  │ dataset_    │ │ Action       │ │ Grader    │ │ Reward     │       │
│  │ factory.py  │ │ Dispatcher   │ │ Engine    │ │ Computer   │       │
│  │             │ │  SET_VALUE   │ │  grade()  │ │            │       │
│  │ easy/medium │ │  DROP_ROW    │ │  score    │ │ progress   │       │
│  │ /hard CSVs  │ │  STANDARD.   │ │  delta    │ │ efficiency │       │
│  │             │ │  FILL_MISS.  │ │           │ │ penalties  │       │
│  └─────────────┘ └──────────────┘ └───────────┘ └────────────┘       │
└──────────────────────────────────────────────────────────────────────┘
```
## 📂 Project Structure

```text
data_cleaning_env/
│
├── 📄 client.py             # DataCleaningEnv - OpenEnv client
├── 📄 models.py             # CleanAction, CleanObservation, CleanState (Pydantic)
├── 📄 inference.py          # Official evaluation entry point
├── 📄 dataset_factory.py    # Generates easy/medium/hard dirty→clean CSV pairs
├── 📄 graders.py            # Scoring engine - grade(agent_df vs clean_df)
├── 📄 openenv.yaml          # OpenEnv manifest (HuggingFace Spaces config)
├── 📄 pyproject.toml        # Project metadata and dependencies
│
└── server/
    ├── 📄 app.py                 # FastAPI application (HTTP + WebSocket)
    ├── 📄 data_cleaning_env.py   # Core environment logic (reset/step/state)
    ├── 📄 __init__.py
    └── 📄 Dockerfile             # Container image definition
```
## 🎯 Tasks

The environment ships three progressively harder tasks, each with fixed-seed deterministic datasets:

### 🟢 Easy – Sales Orders

| Property | Value |
|---|---|
| Dataset | ~100-row sales orders CSV |
| Dirty Issues | Cell-level type errors, a few missing values |
| Step Budget | 40 steps |
| Success Threshold | Score ≥ 0.95 |
| Primary Skills | SET_VALUE, FILL_MISSING |
What the agent needs to fix: Individual cells with wrong types (e.g., "N/A" in a price column, "abc" in a numeric field). Straightforward injected errors with clear ground truth.
### 🟡 Medium – Financial Transactions

| Property | Value |
|---|---|
| Dataset | ~200-row transaction log |
| Dirty Issues | Outlier rows, mixed date formats, missing amounts |
| Step Budget | 80 steps |
| Success Threshold | Score ≥ 0.85 |
| Primary Skills | DROP_ROW, STANDARDIZE_COL, FILL_MISSING |

What the agent needs to fix: Statistical outliers disguised as data, inconsistent date formats, missing numeric values. Crucially, some extreme values are valid; dropping them costs a false-positive penalty.
### 🔴 Hard – Multi-Schema Dataset

| Property | Value |
|---|---|
| Dataset | ~400-row multi-domain CSV |
| Dirty Issues | Cross-column inconsistencies, future-year dates, bulk missing data |
| Step Budget | 150 steps |
| Success Threshold | Score ≥ 0.80 |
| Primary Skills | All commands |
What the agent needs to fix: Everything from easy + medium, plus cascading schema issues across columns. Requires strategic planning about fix order.
## 🕹️ Action Space

Every step, the agent sends exactly one `CleanAction`:

```python
from models import CleanAction

# Fix a specific cell
CleanAction(command="SET_VALUE", row_index=3, column="price", value="29.99")

# Remove an entire row (use carefully – false positives are penalised)
CleanAction(command="DROP_ROW", row_index=17)

# Normalise a column's format (dates → YYYY-MM-DD, numbers → float, strings → stripped)
CleanAction(command="STANDARDIZE_COL", column="order_date")

# Fill all NaN values in a column using a strategy
CleanAction(command="FILL_MISSING", column="quantity", fill_strategy="median")

# Signal episode completion (only accepted when score ≥ task threshold)
CleanAction(command="DONE")
```
### Command Reference

| Command | `row_index` | `column` | `value` | `fill_strategy` |
|---|---|---|---|---|
| `SET_VALUE` | ✅ required | ✅ required | ✅ required | ❌ |
| `DROP_ROW` | ✅ required | ❌ | ❌ | ❌ |
| `STANDARDIZE_COL` | ❌ | ✅ required | ❌ | ❌ |
| `FILL_MISSING` | ❌ | ✅ required | ❌ | ✅ required |
| `DONE` | ❌ | ❌ | ❌ | ❌ |
### FILL_MISSING Strategies

| Strategy | Behaviour |
|---|---|
| `"mean"` | Replace NaN with column mean (numeric columns only) |
| `"median"` | Replace NaN with column median (numeric columns only) |
| `"mode"` | Replace NaN with most frequent value (any column) |
| `"drop"` | Remove rows where this column is NaN |
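The four strategies can be sketched in pure Python, with `None` standing in for NaN. This is an illustration only: the real environment applies the strategies to a pandas DataFrame column, not a plain list.

```python
from statistics import mean, median, mode

def fill_missing(values: list, strategy: str) -> list:
    """Illustrative sketch of the FILL_MISSING strategies above.

    `None` stands in for NaN; the real environment operates on a
    pandas DataFrame column rather than a plain list.
    """
    present = [v for v in values if v is not None]
    if strategy == "drop":
        return present  # "drop" removes the rows, not just the values
    fillers = {"mean": mean, "median": median, "mode": mode}
    fill = fillers[strategy](present)
    return [fill if v is None else v for v in values]

print(fill_missing([10.0, None, 30.0], "median"))  # → [10.0, 20.0, 30.0]
```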
> ⚠️ **Important:** `DROP_ROW` removes by positional row index (the row's current position in the CSV), not by a row ID field. Row indices shift after each drop.
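The index-shift caveat is easy to see with a plain Python list standing in for the table:

```python
# Hypothetical 5-row table; dropping by position shifts every later index.
rows = ["r0", "r1", "r2", "r3", "r4"]

del rows[1]   # removes "r1"
del rows[1]   # removes what is NOW at position 1, i.e. the original "r2"

print(rows)   # → ['r0', 'r3', 'r4']
```

To remove several rows safely, issue the `DROP_ROW` actions from the highest index down to the lowest, so earlier drops never shift the targets of later ones.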
## 👁️ Observation Space

After every `reset()` and `step()`, the agent receives a `CleanObservation`:

```python
@dataclass
class CleanObservation:
    # ── Task context (constant per episode) ──────────────────────
    task_id: str               # "easy" | "medium" | "hard"
    schema_hint: str           # Plain-English description of clean schema
    initial_dirty_cells: int   # Total dirty cells at episode start

    # ── Per-step state ───────────────────────────────────────────
    dirty_csv: str             # Full current CSV as string (all edits applied)
    current_score: float       # 0.0 – 1.0 (grader score vs ground truth)
    issues_remaining: int      # Approximate dirty cells still to fix
    step_number: int           # Steps taken so far
    max_steps: int             # Budget for this task

    # ── Last-action feedback ─────────────────────────────────────
    last_action_success: bool  # Whether previous action applied cleanly
    last_action_error: str     # Error message if success=False (else None)

    # ── Inherited ────────────────────────────────────────────────
    done: bool                 # True = episode ended
    reward: float | None       # Per-step reward (None after reset)
```
### Score Computation

The grader compares the agent's working DataFrame to the hidden ground-truth DataFrame:

```python
score = (initial_dirty_cells - remaining_dirty_cells) / initial_dirty_cells
```

A score of 1.0 means perfect agreement with ground truth.
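Under that formula, a minimal grader might look like the sketch below. The real `graders.py` compares pandas DataFrames and may normalise values before comparing; treat this as an illustration of the scoring arithmetic only.

```python
def count_dirty_cells(agent_rows, clean_rows):
    """Count cells that still differ from the hidden ground truth."""
    return sum(
        a != c
        for a_row, c_row in zip(agent_rows, clean_rows)
        for a, c in zip(a_row, c_row)
    )

def score(agent_rows, clean_rows, initial_dirty_cells):
    remaining = count_dirty_cells(agent_rows, clean_rows)
    return (initial_dirty_cells - remaining) / initial_dirty_cells

# One of two initially dirty cells fixed → score 0.5
agent = [["1", "2"], ["3", "oops"]]
clean = [["1", "2"], ["3", "4"]]
print(score(agent, clean, initial_dirty_cells=2))  # → 0.5
```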
## 💰 Reward Function

The reward is dense and shaped to guide efficient, precise cleaning:

```python
reward = progress_term
       + efficiency_bonus
       + false_positive_penalty
       + early_done_penalty
       + step_cost
```

| Component | Value | When |
|---|---|---|
| Progress | `current_score - previous_score` | Every step |
| Efficiency bonus | `+0.10 × (1 - steps_used/max_steps)` | Only when the task is solved this step |
| False-positive penalty | `-0.15` | DROP_ROW removes a valid-extreme row (medium task) |
| Early DONE penalty | `-0.20` | DONE called with score < 0.60 |
| Step cost | `-0.005` | Every step (discourages padding) |
| Premature DONE block | `-1.00` | DONE below task threshold; episode continues |

Reward range: `[-0.5, +1.0]` (clipped)
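The core per-step arithmetic can be sketched as follows, using the constants listed under Configuration. This is an assumption-laden sketch: the environment's actual `_compute_reward()` also handles the DROP_ROW false-positive and DONE penalties.

```python
STEP_COST = -0.005
EFFICIENCY_BONUS_WEIGHT = 0.10

def step_reward(prev_score, curr_score, steps_used, max_steps, solved):
    # Progress term plus per-step cost; efficiency bonus only on the
    # step that solves the task.
    reward = (curr_score - prev_score) + STEP_COST
    if solved:
        reward += EFFICIENCY_BONUS_WEIGHT * (1 - steps_used / max_steps)
    return max(-0.5, min(1.0, reward))  # clip to the documented range

print(step_reward(0.0, 0.12, 1, 40, solved=False))  # → 0.115
```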
### Termination Logic

The episode terminates when any of these is true:

- ✅ `current_score >= task_threshold` (auto-terminated, efficiency bonus awarded)
- ✅ Agent sends `DONE` and `current_score >= task_threshold` (accepted)
- ⏱️ `step_count >= max_steps` (budget exhausted)

`DONE` is refused if the score is below threshold; the episode continues with a `-1.0` reward signal.
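The rules above collapse into a small predicate (a sketch; the refused-DONE penalty is handled separately by the reward logic):

```python
def episode_done(score: float, threshold: float, step: int, max_steps: int) -> bool:
    # A DONE below threshold is refused, so it never ends the episode by
    # itself: only reaching the threshold or exhausting the budget terminates.
    return score >= threshold or step >= max_steps
```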
## 🚀 Quick Start

### Prerequisites

- Python 3.12+
- Docker Desktop (for the containerised server)
- A free HuggingFace token (for the inference LLM)

### 1. Clone & Install

```bash
git clone https://github.com/Code-Knight-Debjit/Data-Cleaning-Environment.git
cd Data-Cleaning-Environment

# Create virtual environment
python -m venv .venv

# Activate (Windows PowerShell)
.venv\Scripts\Activate.ps1

# Activate (macOS/Linux)
source .venv/bin/activate

# Install dependencies
pip install -e .
```

### 2. Build the Docker Image

```bash
docker build -t openenv-data_cleaning:latest -f server/Dockerfile .
```

### 3. Set Your HuggingFace Token

```bash
# Windows PowerShell
$env:HF_TOKEN = "hf_your_token_here"

# macOS / Linux
export HF_TOKEN="hf_your_token_here"
```

### 4. Run Inference

```bash
python inference.py
```

That's it! The script auto-starts the Docker container, runs the LLM agent through all three tasks (easy → medium → hard), and prints structured evaluation logs.
## 🤖 Running Inference

### Environment Variables

| Variable | Default | Description |
|---|---|---|
| `HF_TOKEN` | (required) | Your HuggingFace token for LLM API access |
| `API_BASE_URL` | `https://router.huggingface.co/v1` | LLM API endpoint |
| `MODEL_NAME` | `Qwen/Qwen2.5-72B-Instruct` | Model to use for inference |
| `LOCAL_IMAGE_NAME` | `openenv-data_cleaning:latest` | Docker image to launch |
| `ENV_BASE_URL` | `http://localhost:8000` | Direct server URL (if not using Docker) |
### Switching Models

```powershell
# Use Mistral (smaller, faster)
$env:MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.3"

# Use Llama
$env:MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"
```

### Connecting to a Running Server (skip Docker)

```powershell
$env:LOCAL_IMAGE_NAME = ""   # must be empty string
$env:ENV_BASE_URL = "http://localhost:8000"
python inference.py
```
### Expected Output

```text
API_BASE_URL     : https://router.huggingface.co/v1
MODEL_NAME       : Qwen/Qwen2.5-72B-Instruct
LOCAL_IMAGE_NAME : openenv-data_cleaning:latest
ENV_BASE_URL     : http://localhost:8000

[START] task=easy env=data_cleaning_env model=Qwen/Qwen2.5-72B-Instruct
[STEP] step=1 action=FILL_MISSING reward=0.12 done=false error=null
[STEP] step=2 action=SET_VALUE reward=0.08 done=false error=null
[STEP] step=3 action=STANDARDIZE_COL reward=0.05 done=false error=null
...
[END] success=true steps=18 score=0.97 rewards=0.12,0.08,...

[START] task=medium env=data_cleaning_env ...
...

────────────────────────────────────────────────────────
Task     Score    Reward   Steps   Pass
────────────────────────────────────────────────────────
easy     0.9712   1.3400   18      YES
medium   0.8823   2.1100   47      YES
hard     0.7640   1.8500   98      NO
────────────────────────────────────────────────────────
```
## 🔌 Environment API

### Using the Python Client Directly

```python
import asyncio

from client import DataCleaningEnv
from models import CleanAction

async def run():
    # Option A: Auto-start Docker container
    env = await DataCleaningEnv.from_docker_image("openenv-data_cleaning:latest")

    # Option B: Connect to an already-running server
    # env = DataCleaningEnv(base_url="http://localhost:8000")
    # await env.connect()

    try:
        # Reset for a specific task
        result = await env.reset(task_id="easy")
        obs = result.observation
        print(f"Score: {obs.current_score:.4f}")
        print(f"Issues: {obs.issues_remaining}")
        print(f"Schema: {obs.schema_hint}")

        # Take a step
        action = CleanAction(
            command="FILL_MISSING",
            column="price",
            fill_strategy="median",
        )
        result = await env.step(action)
        obs = result.observation
        print(f"Reward: {result.reward:.4f}")
        print(f"New score: {obs.current_score:.4f}")
        print(f"Action OK: {obs.last_action_success}")

        # Signal completion
        result = await env.step(CleanAction(command="DONE"))
    finally:
        await env.close()

asyncio.run(run())
```
### Using the Sync Wrapper

```python
from client import DataCleaningEnv
from models import CleanAction

env = DataCleaningEnv(base_url="http://localhost:8000").sync()

with env:
    result = env.reset(task_id="easy")
    result = env.step(CleanAction(command="STANDARDIZE_COL", column="order_date"))
    print(f"Score: {result.observation.current_score:.4f}")
```
### HTTP Endpoints

When the server is running, the following HTTP endpoints are available:

| Endpoint | Method | Description |
|---|---|---|
| `/health` | GET | Server health check |
| `/docs` | GET | Swagger / OpenAPI documentation |
| `/web` | GET | Interactive web UI |
| `/ws` | WebSocket | Persistent session endpoint |
## ⚙️ Configuration

### Step Budgets

```python
MAX_STEPS = {
    "easy": 40,
    "medium": 80,
    "hard": 150,
}
```

### Success Thresholds

```python
DONE_THRESHOLD = {
    "easy": 0.95,
    "medium": 0.85,
    "hard": 0.80,
}
```
### Reward Constants

| Constant | Value | Purpose |
|---|---|---|
| `STEP_COST` | `-0.005` | Per-step penalty to discourage padding |
| `EARLY_DONE_PENALTY` | `-0.20` | Penalty for DONE below score 0.60 |
| `EARLY_DONE_THRESHOLD` | `0.60` | Score floor for DONE without penalty |
| `FALSE_POSITIVE_PENALTY` | `-0.15` | Penalty for wrongly dropping a valid row |
| `EFFICIENCY_BONUS_WEIGHT` | `0.10` | Multiplier for early-completion bonus |
## ☁️ Deployment

### Deploy to HuggingFace Spaces

```bash
# Install the OpenEnv CLI
pip install openenv

# Authenticate with HuggingFace
huggingface-cli login

# Deploy (from the repo root where openenv.yaml lives)
openenv push

# Or deploy privately to a specific repo
openenv push --repo-id your-username/data-cleaning-env --private
```

After deployment, your environment will be live at:

```
https://huggingface.co/spaces/your-username/data-cleaning-env
```

With endpoints:

- Web UI: `/web`
- API Docs: `/docs`
- Health: `/health`
- WebSocket: `/ws`

### Connect to a HuggingFace Space

```python
env = await DataCleaningEnv.from_env("your-username/data-cleaning-env")

# or run locally with UV (no Docker needed)
env = await DataCleaningEnv.from_env("your-username/data-cleaning-env", use_docker=False)
```

### Run the Server Locally (Without Docker)

```bash
uvicorn server.app:app --reload --port 8000
```
## 🧪 Development & Testing

### Test the Environment Logic (No Server Needed)

```bash
# Runs a smoke test across all three tasks
python server/data_cleaning_env.py
```

Expected output:

```text
────────────────────────────────────────────────────────────────
TASK: EASY
────────────────────────────────────────────────────────────────
reset() → score=0.0000 issues=29 done=False
  CSV: 101 rows, 5 cols
  Hint: Sales orders dataset. price must be float...
step (bad col) → success=False error="Column 'DOES_NOT_EXIST' not found..."
step (fix row=3 col='price') → success=True score=0.0345 reward=0.0295
step (DONE, blocked) → done=False reward=-1.0 score=0.0345
...
All smoke tests passed.
```

### Test Pydantic Models

```bash
python models.py
```

### Test the Client Parser

```bash
python test_parse.py
```

### Run the Full Server Locally

```bash
uvicorn server.app:app --reload
# Open http://localhost:8000/docs for interactive API explorer
```
## 🔧 Troubleshooting

### `TypeError: Too few arguments for EnvClient`

**Cause:** Your `client.py` subclasses `EnvClient` with only 2 type parameters, but OpenEnv requires 3 (`ActT`, `ObsT`, `StateT`).

**Fix:**

```python
# ❌ Wrong
class DataCleaningEnv(EnvClient[CleanAction, CleanObservation]):

# ✅ Correct
class DataCleaningEnv(EnvClient[CleanAction, CleanObservation, dict]):
```

Also ensure `_parse_state` is implemented:

```python
def _parse_state(self, payload: dict) -> dict:
    return payload
```

### `ValidationError: Input should be 'SET_VALUE', 'DROP_ROW', ...`

**Cause:** Passing an invalid command string to `CleanAction`.

**Fix:** Only these 5 commands are valid:

```
"SET_VALUE" | "DROP_ROW" | "STANDARDIZE_COL" | "FILL_MISSING" | "DONE"
```

There is no `"drop_column"` – columns cannot be dropped, only rows.

### `UnboundLocalError: cannot access local variable 'env'`

**Cause 1:** The Docker image doesn't exist yet.

```bash
docker build -t openenv-data_cleaning:latest -f server/Dockerfile .
```

**Cause 2:** Stray test lines in `inference.py` reference `env` before it's assigned.

**Fix:** Remove any manually added lines like `action = CleanAction(...)` or `result = await env.step(action)` from inside `main()`. The `main()` function should only call `run_episode()`; all action logic belongs inside that function.

### `DONE rejected: score X < required Y`

This is expected behaviour, not a bug. The environment refuses premature termination; the agent should continue cleaning until the score meets the task threshold.

### HuggingFace Router returns 401

Ensure your token is set:

```powershell
$env:HF_TOKEN = "hf_your_token_here"
```

Get a free token at huggingface.co/settings/tokens.
## 🔄 Data Flow Diagram

```text
┌──────────────────────────────────┐
│   inference.py / custom agent    │
│                                  │
│  1. await env.reset(task_id=…)   │
│  2. obs = result.observation     │
│  3. build_prompt(obs) → LLM      │
│  4. parse_action(llm_output)     │
│  5. await env.step(action)       │
│  6. GOTO 2 until done            │
└───────────────┬──────────────────┘
                │
  CleanAction (JSON over WebSocket)
                │
                ▼
┌──────────────────────────────────┐
│     DataCleaningEnvironment      │
│                                  │
│  _apply_action()                 │
│    └─ mutates _dirty_df in place │
│                                  │
│  grade(agent_df vs clean_df)     │
│    └─ score ∈ [0.0, 1.0]         │
│                                  │
│  _compute_reward()               │
│    └─ progress + bonuses         │
│                                  │
│  _build_observation()            │
│    └─ CleanObservation           │
└──────────────────────────────────┘
```
## 🤝 Contributing

1. Fork the repository
2. Create a feature branch: `git checkout -b feature/my-improvement`
3. Run the smoke tests: `python server/data_cleaning_env.py`
4. Commit your changes: `git commit -m "feat: add my improvement"`
5. Push and open a Pull Request
## 📄 License

This project is licensed under the MIT License. See [LICENSE](LICENSE) for details.

Built with ❤️ using OpenEnv · FastAPI · Pydantic · HuggingFace