import json
import os
import tempfile
import uuid
from datetime import datetime
from typing import Tuple

import matplotlib.pyplot as plt
import pandas as pd
from dotenv import load_dotenv
from huggingface_hub import HfApi
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from pandasai import Agent, SmartDataframe
from pandasai.llm import HuggingFaceTextGen
from PIL import Image
|
|
| |
# Load variables from a .env file into the process environment before the
# tokens below are read.
load_dotenv(override=True)

# API credentials; each is None when the corresponding variable is unset.
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")

# Startup diagnostics: report only whether a token is present. Never print
# any part of the token value -- stdout usually ends up in shared logs.
print(f"Debug - Groq Token: {'Present' if Groq_Token else 'Missing'}")
print(f"Debug - Gemini Token: {'Present' if gemini_token else 'Missing'}")
|
|
# Display name -> provider model identifier, grouped by the backend that the
# functions in this module use for each name ("gemini-pro" goes to Google
# Generative AI, every other name goes to Groq).
_GROQ_MODELS = {
    "gpt-oss-20b": "openai/gpt-oss-20b",
    "gpt-oss-120b": "openai/gpt-oss-120b",
    "llama3.1": "llama-3.1-8b-instant",
    "llama3.3": "llama-3.3-70b-versatile",
    "deepseek-R1": "deepseek-r1-distill-llama-70b",
    "llama4 maverik": "meta-llama/llama-4-maverick-17b-128e-instruct",
    "llama4 scout": "meta-llama/llama-4-scout-17b-16e-instruct",
}
_GEMINI_MODELS = {
    "gemini-pro": "gemini-1.5-pro",
}

# Public lookup table; insertion order is Groq entries first, then Gemini.
models = {**_GROQ_MODELS, **_GEMINI_MODELS}
|
|
def log_interaction(user_query, model_name, response_content, generated_code, execution_time, error_message=None, is_image=False):
    """Upload a one-row parquet record of a chat interaction to a Hugging Face dataset.

    Logging is best-effort: when ``hf_token`` is missing or blank the call is
    skipped with a warning, and any exception raised while building or
    uploading the record is printed rather than propagated.

    Args:
        user_query: The question the user asked.
        model_name: Name of the model that handled the question.
        response_content: The response; stored as ``str(response_content)``.
        generated_code: Code produced for the question (``None`` is stored as "").
        execution_time: Elapsed time in seconds.
        error_message: Error text, or ``None`` on success (stored as "").
        is_image: Whether the response is an image file.

    Returns:
        None.
    """
    try:
        if not hf_token or not hf_token.strip():
            print("Warning: HF_TOKEN not available, skipping logging")
            return

        log_entry = {
            "timestamp": datetime.now().isoformat(),
            # A fresh UUID is generated for every call.
            "session_id": str(uuid.uuid4()),
            "user_query": user_query,
            "model_name": model_name,
            "response_content": str(response_content),
            "generated_code": generated_code or "",
            "execution_time_seconds": execution_time,
            "error_message": error_message or "",
            "is_image_output": is_image,
            "success": error_message is None,
        }
        df = pd.DataFrame([log_entry])

        # Timestamp plus a random suffix keeps file names unique in the repo.
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        random_id = str(uuid.uuid4())[:8]
        filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"

        # Stage the parquet file in a temporary directory so it is removed
        # even when the upload fails, and so the code does not depend on a
        # hard-coded /tmp path.
        with tempfile.TemporaryDirectory() as staging_dir:
            local_path = os.path.join(staging_dir, filename)
            df.to_parquet(local_path, index=False)

            api = HfApi(token=hf_token)
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=f"data/{filename}",
                repo_id="SustainabilityLabIITGN/VayuChat_logs",
                repo_type="dataset",
            )

        print(f"Successfully logged interaction to HuggingFace: {filename}")

    except Exception as e:
        # Deliberate best-effort: a logging failure must not break the chat flow.
        print(f"Error logging interaction: {e}")
|
|
def preprocess_and_load_df(path: str) -> pd.DataFrame:
    """Read a CSV file and parse its ``Timestamp`` column as datetimes.

    Args:
        path: Location of the CSV file; it must contain a ``Timestamp`` column.

    Returns:
        The loaded DataFrame with ``Timestamp`` converted by ``pd.to_datetime``.

    Raises:
        Exception: If reading or parsing fails. The message is prefixed with
            "Error loading dataframe:" and the original error is attached as
            ``__cause__``.
    """
    try:
        df = pd.read_csv(path)
        df["Timestamp"] = pd.to_datetime(df["Timestamp"])
        return df
    except Exception as e:
        # Chain explicitly so the underlying traceback is preserved.
        raise Exception(f"Error loading dataframe: {e}") from e
|
|
def _build_llm(name: str):
    """Create the chat model client for the display name ``name``.

    "gemini-pro" yields a ``ChatGoogleGenerativeAI`` client; every other name
    yields a ``ChatGroq`` client. Both use ``temperature=0.1``.

    Raises:
        ValueError: If the required API token is missing or blank, or if
            ``name`` is not a key of ``models``.
    """
    if name == "gemini-pro":
        if not gemini_token or not gemini_token.strip():
            raise ValueError("Gemini API token not available or empty")
        return ChatGoogleGenerativeAI(
            model=models[name],
            google_api_key=gemini_token,
            temperature=0.1,
        )

    if not Groq_Token or not Groq_Token.strip():
        raise ValueError("Groq API token not available or empty")
    if name not in models:
        # Without this check an unknown name surfaces as a bare KeyError.
        raise ValueError(
            f"Unknown model name {name!r}; expected one of: {', '.join(models)}"
        )
    return ChatGroq(
        model=models[name],
        api_key=Groq_Token,
        temperature=0.1,
    )


def load_agent(df: pd.DataFrame, context: str, inference_server: str, name="mistral") -> Agent:
    """Create a pandasai ``Agent`` over ``df`` backed by the selected model.

    Args:
        df: Data the agent will answer questions about.
        context: Optional message added to the agent via ``add_message``;
            skipped when falsy.
        inference_server: Not used by this function; kept so existing callers
            keep working.
        name: Key into ``models``. NOTE: the default "mistral" is not a key of
            ``models``, so callers must pass a valid name.

    Returns:
        The configured ``Agent`` (caching disabled).

    Raises:
        Exception: Wraps any failure with the prefix "Error loading agent:";
            the original error is attached as ``__cause__``.
    """
    try:
        llm = _build_llm(name)
        agent = Agent(df, config={"llm": llm, "enable_cache": False, "options": {"wait_for_model": True}})
        if context:
            agent.add_message(context)
        return agent
    except Exception as e:
        raise Exception(f"Error loading agent: {e}") from e


def load_smart_df(df: pd.DataFrame, inference_server: str, name="mistral") -> SmartDataframe:
    """Wrap ``df`` in a pandasai ``SmartDataframe`` backed by the selected model.

    Args:
        df: Data to wrap.
        inference_server: Not used by this function; kept so existing callers
            keep working.
        name: Key into ``models``. NOTE: the default "mistral" is not a key of
            ``models``, so callers must pass a valid name.

    Returns:
        The configured ``SmartDataframe`` (``max_retries=5``, caching disabled).

    Raises:
        Exception: Wraps any failure with the prefix
            "Error loading smart dataframe:"; the original error is attached
            as ``__cause__``.
    """
    try:
        llm = _build_llm(name)
        return SmartDataframe(df, config={"llm": llm, "max_retries": 5, "enable_cache": False})
    except Exception as e:
        raise Exception(f"Error loading smart dataframe: {e}") from e
|
|
def get_from_user(prompt):
    """Wrap ``prompt`` as a chat message dict with the "user" role."""
    message = dict(role="user", content=prompt)
    return message
|
|
def ask_agent(agent: Agent, prompt: str) -> dict:
    """Send ``prompt`` to ``agent``, log the exchange, and return a message dict.

    The returned dict always has the keys ``role``, ``content``, ``gen_code``,
    ``ex_code``, ``last_prompt`` and ``error``. On failure ``content`` is
    "Error: <message>" and ``error`` holds the message; on success ``error``
    is ``None``. Exceptions are reported in the result, not raised.
    """
    started_at = datetime.now()
    try:
        reply = agent.chat(prompt)
        elapsed = (datetime.now() - started_at).total_seconds()

        # These attributes are read defensively in case the agent lacks them.
        code_generated = getattr(agent, 'last_code_generated', '')
        code_executed = getattr(agent, 'last_code_executed', '')
        prompt_sent = getattr(agent, 'last_prompt', prompt)

        # A string reply ending in an image extension is treated as a plot file.
        looks_like_image = isinstance(reply, str) and reply.endswith(('.png', '.jpg', '.jpeg'))

        log_interaction(
            user_query=prompt,
            model_name="pandas_ai_agent",
            response_content=reply,
            generated_code=code_generated,
            execution_time=elapsed,
            error_message=None,
            is_image=looks_like_image,
        )

        return dict(
            role="assistant",
            content=reply,
            gen_code=code_generated,
            ex_code=code_executed,
            last_prompt=prompt_sent,
            error=None,
        )
    except Exception as exc:
        elapsed = (datetime.now() - started_at).total_seconds()
        failure = str(exc)
        failure_text = f"Error: {failure}"

        log_interaction(
            user_query=prompt,
            model_name="pandas_ai_agent",
            response_content=failure_text,
            generated_code="",
            execution_time=elapsed,
            error_message=failure,
            is_image=False,
        )

        return dict(
            role="assistant",
            content=failure_text,
            gen_code="",
            ex_code="",
            last_prompt=prompt,
            error=failure,
        )
|
|
def decorate_with_code(response: dict) -> str:
    """Render the generated code and the prompt as collapsible HTML sections.

    Reads ``gen_code`` and ``last_prompt`` from ``response`` (with fallback
    text when a key is absent). The second ``<details>`` element is left
    open; the caller is expected to append the closing tag.
    """
    code_text = response.get("gen_code", "No code generated")
    prompt_text = response.get("last_prompt", "No prompt")

    pieces = [
        "<details>",
        "<summary>Generated Code</summary>",
        "",
        "```python",
        f"{code_text}",
        "```",
        "</details>",
        "",
        "<details>",
        "<summary>Prompt</summary>",
        "",
        f"{prompt_text}",
        "",  # yields the trailing newline after the prompt
    ]
    return "\n".join(pieces)
|
|
def show_response(st, response):
    """Render a chat response in Streamlit as an image or as markdown.

    ``response["content"]`` is first tried as an image via ``Image.open``.
    If that succeeds the image is shown (preceded by the code details when
    ``gen_code`` is set); otherwise the content is rendered as markdown.

    Args:
        st: The Streamlit module (or an object exposing ``chat_message``,
            ``markdown``, ``image`` and ``error``).
        response: Message dict with at least a ``role`` key; ``content`` and
            ``gen_code`` are optional.

    Returns:
        ``{"is_image": True}`` when an image was displayed, otherwise
        ``{"is_image": False}`` (also after a display error, which is
        reported through ``st.error``).
    """
    try:
        with st.chat_message(response["role"]):
            content = response.get("content", "No content")
            has_code = bool(response.get("gen_code"))

            # Only the open attempt is guarded: content that is not a
            # readable image simply means "render as text". Rendering errors
            # are left to the outer handler so nothing is displayed twice.
            try:
                image = Image.open(content)
            except Exception:
                image = None

            if image is not None:
                if has_code:
                    # decorate_with_code leaves its last <details> open.
                    st.markdown(decorate_with_code(response) + "</details>", unsafe_allow_html=True)
                st.image(image)
                return {"is_image": True}

            if has_code:
                display_content = decorate_with_code(response) + f"</details>\n\n{content}"
            else:
                display_content = content
            st.markdown(display_content, unsafe_allow_html=True)
            return {"is_image": False}
    except Exception as e:
        st.error(f"Error displaying response: {e}")
        return {"is_image": False}
|
|
# Instructions prepended to every code-generation request.
_SYSTEM_PROMPT = (
    "You are a helpful assistant that generates Python code for data analysis.\n"
    "\n"
    "Rules:\n"
    "1. Always save your final result in a variable called 'answer'\n"
    "2. If creating a plot, save it with plt.savefig() and store the filename in 'answer'\n"
    "3. If returning text/numbers, store the result directly in 'answer'\n"
    "4. Use descriptive variable names and add comments\n"
    "5. Handle potential errors gracefully\n"
    "6. For plots, use unique filenames to avoid conflicts\n"
)

# A string answer ending in one of these suffixes is treated as a saved plot.
_IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg')


def _log_and_respond(question, model_name, start_time, content, *,
                     logged_content=None, gen_code="", error=None, is_image=False):
    """Log one interaction and build the assistant message dict for it."""
    execution_time = (datetime.now() - start_time).total_seconds()
    log_interaction(
        user_query=question,
        model_name=model_name,
        response_content=content if logged_content is None else logged_content,
        generated_code=gen_code,
        execution_time=execution_time,
        error_message=error,
        is_image=is_image,
    )
    return {
        "role": "assistant",
        "content": content,
        "gen_code": gen_code,
        "ex_code": gen_code,
        "last_prompt": question,
        "error": error,
    }


def ask_question(model_name, question):
    """Answer ``question`` about Data.csv by generating and running Python code.

    The function reloads API tokens from the environment, builds the chat
    model for ``model_name``, asks it to complete a code template, executes
    the returned code and reads the result from its ``answer`` variable.
    Every outcome (success or failure) is logged through ``log_interaction``.

    Args:
        model_name: Key into ``models``; "gemini-pro" uses Google Generative
            AI, anything else uses Groq.
        question: The user's question.

    Returns:
        A dict with keys ``role``, ``content``, ``gen_code``, ``ex_code``,
        ``last_prompt`` and ``error``. ``error`` is ``None`` on success;
        failures are reported in the dict rather than raised.
    """
    # NOTE(review): the leading "β" in the user-facing messages below looks
    # like a mis-encoded emoji (probably a cross mark) -- confirm the intended
    # character before changing these strings.
    start_time = datetime.now()
    try:
        # Re-read .env so a token changed after startup is picked up.
        load_dotenv(override=True)
        fresh_groq_token = os.getenv("GROQ_API_KEY")
        fresh_gemini_token = os.getenv("GEMINI_TOKEN")

        print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")

        if model_name == "gemini-pro":
            if not fresh_gemini_token or not fresh_gemini_token.strip():
                return _log_and_respond(
                    question, model_name, start_time,
                    content="β Gemini API token not available or empty. Please set GEMINI_TOKEN in your environment variables.",
                    logged_content="β Gemini API token not available or empty",
                    error="Missing or empty API token",
                )
            llm = ChatGoogleGenerativeAI(
                model=models[model_name],
                google_api_key=fresh_gemini_token,
                temperature=0,
            )
        else:
            if not fresh_groq_token or not fresh_groq_token.strip():
                return _log_and_respond(
                    question, model_name, start_time,
                    content="β Groq API token not available or empty. Please set GROQ_API_KEY in your environment variables and restart the application.",
                    logged_content="β Groq API token not available or empty",
                    error="Missing or empty API token",
                )

            try:
                llm = ChatGroq(
                    model=models[model_name],
                    api_key=fresh_groq_token,
                    temperature=0.1,
                )
                # Probe request so key problems get a dedicated message.
                # Note this costs one extra API call per question.
                llm.invoke("Test")
                print("API key test successful")
            except Exception as api_error:
                error_msg = str(api_error)
                lowered = error_msg.lower()
                if "organization_restricted" in lowered or "unauthorized" in lowered:
                    response_content = "β API Key Error: Your Groq API key appears to be invalid, expired, or restricted. Please check your API key in the .env file."
                    log_error_msg = f"API key validation failed: {error_msg}"
                else:
                    response_content = f"β API Connection Error: {error_msg}"
                    log_error_msg = error_msg
                return _log_and_respond(
                    question, model_name, start_time,
                    content=response_content,
                    error=log_error_msg,
                )

        if not os.path.exists("Data.csv"):
            return _log_and_respond(
                question, model_name, start_time,
                content="β Data.csv file not found. Please ensure the data file is in the correct location.",
                logged_content="β Data.csv file not found",
                error="Data file not found",
            )

        # Only the column dtypes are needed for the prompt.
        df_check = pd.read_csv("Data.csv")
        df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
        df_check = df_check.head(5)

        parameters = {"font.size": 12, "figure.dpi": 600}
        dtype_comment = "\n".join(
            f"# {line}" for line in str(df_check.dtypes).split("\n")
        )

        # The question is embedded in a '#' comment inside code that is later
        # executed. Collapsing all whitespace keeps it on one line so that
        # user text can never escape the comment and run as code.
        single_line_question = " ".join(question.split())

        # Code that is both shown to the model and run ahead of its answer.
        preamble = "\n".join([
            "",
            "import pandas as pd",
            "import matplotlib.pyplot as plt",
            "import uuid",
            "",
            f"plt.rcParams.update({parameters})",
            "",
            'df = pd.read_csv("Data.csv")',
            'df["Timestamp"] = pd.to_datetime(df["Timestamp"])',
            "",
            "# Available columns and data types:",
            dtype_comment,
            "",
            f"# Question: {single_line_question}",
            "# Generate code to answer the question and save result in 'answer' variable",
            "# If creating a plot, save it with a unique filename and store the filename in 'answer'",
            "# If returning text/numbers, store the result directly in 'answer'",
            "",
        ])
        template = f"```python{preamble}```"

        query = (
            f"{_SYSTEM_PROMPT}\n"
            "\n"
            "Complete the following code to answer the user's question:\n"
            "\n"
            f"{template}\n"
        )

        response = llm.invoke(query)
        answer = response.content

        full_code = ""
        try:
            # Prefer the first fenced python block; otherwise use the reply as-is.
            if "```python" in answer:
                code_part = answer.split("```python")[1].split("```")[0]
            else:
                code_part = answer

            full_code = f"\n{preamble}\n{code_part}\n"

            # SECURITY: this executes model-generated code with full access to
            # the process (filesystem, environment, network). Run this service
            # only in an isolated environment.
            #
            # A single namespace is used on purpose: with separate globals and
            # locals dicts the code runs as if inside a class body, so functions
            # and comprehensions it defines cannot see its top-level variables.
            namespace = {"pd": pd, "plt": plt, "os": os, "uuid": uuid}
            exec(full_code, namespace)

            answer_result = namespace.get(
                "answer", "No answer variable found in generated code"
            )
            is_image = isinstance(answer_result, str) and answer_result.endswith(_IMAGE_SUFFIXES)

            return _log_and_respond(
                question, model_name, start_time,
                content=answer_result,
                logged_content=str(answer_result),
                gen_code=full_code,
                error=None,
                is_image=is_image,
            )

        except Exception as code_error:
            error_msg = str(code_error)
            return _log_and_respond(
                question, model_name, start_time,
                content=f"β Error executing generated code: {error_msg}",
                gen_code=full_code,
                error=error_msg,
            )

    except Exception as e:
        error_msg = str(e)
        lowered = error_msg.lower()

        if "organization_restricted" in lowered:
            response_content = "β API Organization Restricted: Your API key access has been restricted. Please check your Groq API key or try generating a new one."
            log_error_msg = "API access restricted"
        elif "rate_limit" in lowered:
            response_content = "β Rate limit exceeded. Please wait a moment and try again."
            log_error_msg = "Rate limit exceeded"
        else:
            response_content = f"β Error: {error_msg}"
            log_error_msg = error_msg

        return _log_and_respond(
            question, model_name, start_time,
            content=response_content,
            error=log_error_msg,
        )