Table of Contents
- Introduction
- Prerequisites
- Project Setup
- Method 1: OpenAI + Pandas Approach
- Method 2: LangChain Agents Approach
- Advanced Features
- Building a Web Interface
- Best Practices & Security
- Troubleshooting
- Conclusion
Introduction
Imagine being able to ask your data questions in plain English and getting intelligent answers backed by actual analysis. Instead of writing complex SQL queries or pandas code, you could simply ask “What are the top 5 products by sales?” or “Show me the correlation between price and customer satisfaction.”
This tutorial will show you how to build an AI-powered data analysis system that:
- Accepts natural language questions about your CSV data
- Automatically generates and executes pandas code
- Returns both code explanations and visualizations
- Handles complex analytical queries intelligently
We’ll explore two main approaches:
- OpenAI + Pandas: Direct integration with OpenAI’s API for code generation
- LangChain Agents: Using LangChain’s agent framework for more sophisticated workflows
Prerequisites
Required Knowledge:
- Basic Python programming
- Familiarity with pandas for data manipulation
- Understanding of APIs and environment variables
- Basic knowledge of data analysis concepts
Required Tools:
- Python 3.8+
- OpenAI API key (get one at https://platform.openai.com)
- Code editor (VS Code recommended)
Project Setup
1. Create Project Directory
mkdir llm-csv-analyzer
cd llm-csv-analyzer
2. Set Up Virtual Environment
# Create virtual environment
python -m venv venv
# Activate virtual environment
# On Windows:
venv\Scripts\activate
# On macOS/Linux:
source venv/bin/activate
3. Install Dependencies
pip install openai pandas matplotlib seaborn plotly langchain langchain-openai python-dotenv streamlit jupyter
4. Create Environment File
Create a .env
file in your project root:
OPENAI_API_KEY=your_openai_api_key_here
5. Project Structure
llm-csv-analyzer/
├── .env
├── requirements.txt
├── data/
│ └── sample_data.csv
├── src/
│ ├── __init__.py
│ ├── openai_analyzer.py
│ ├── langchain_analyzer.py
│ └── utils.py
├── notebooks/
│ └── exploration.ipynb
├── web_app/
│ └── streamlit_app.py
└── README.md
Method 1: OpenAI + Pandas Approach
This approach directly uses OpenAI’s API to generate pandas code based on natural language queries.
Core Implementation
Create src/openai_analyzer.py
:
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from openai import OpenAI
from dotenv import load_dotenv
import ast
import sys
from io import StringIO
import warnings
warnings.filterwarnings('ignore')
load_dotenv()
class OpenAICSVAnalyzer:
def __init__(self, csv_path):
self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
self.df = pd.read_csv(csv_path)
self.csv_path = csv_path
self.analysis_history = []
# Get basic info about the dataset
self.dataset_info = self._get_dataset_info()
def _get_dataset_info(self):
"""Get comprehensive information about the dataset"""
info = {
'shape': self.df.shape,
'columns': list(self.df.columns),
'dtypes': self.df.dtypes.to_dict(),
'sample_data': self.df.head(3).to_dict(),
'null_counts': self.df.isnull().sum().to_dict(),
'numeric_columns': self.df.select_dtypes(include=['number']).columns.tolist(),
'categorical_columns': self.df.select_dtypes(include=['object']).columns.tolist()
}
return info
def _create_system_prompt(self):
"""Create a comprehensive system prompt with dataset context"""
prompt = f"""
You are an expert data analyst. You have access to a pandas DataFrame called 'df' with the following characteristics:
DATASET INFORMATION:
- Shape: {self.dataset_info['shape']} (rows, columns)
- Columns: {', '.join(self.dataset_info['columns'])}
- Data types: {self.dataset_info['dtypes']}
- Numeric columns: {', '.join(self.dataset_info['numeric_columns'])}
- Categorical columns: {', '.join(self.dataset_info['categorical_columns'])}
- Missing values: {self.dataset_info['null_counts']}
SAMPLE DATA:
{pd.DataFrame(self.dataset_info['sample_data']).to_string()}
INSTRUCTIONS:
1. Generate ONLY executable pandas/matplotlib/seaborn/plotly code
2. Always use the variable name 'df' for the DataFrame
3. Handle missing values appropriately
4. For visualizations, use matplotlib, seaborn, or plotly
5. Print results using print() statements
6. Include comments explaining your approach
7. Make sure code is production-ready and handles edge cases
AVAILABLE LIBRARIES:
- pandas as pd
- matplotlib.pyplot as plt
- seaborn as sns
- plotly.express as px
- plotly.graph_objects as go
- numpy as np (if needed)
RESPONSE FORMAT:
Return ONLY the Python code, no explanations outside the code comments.
"""
return prompt
def ask_question(self, question, include_visualization=True):
"""Process a natural language question about the data"""
# Create the prompt
system_prompt = self._create_system_prompt()
user_prompt = f"""
Question: {question}
Please generate pandas code to answer this question.
{'Include a relevant visualization if appropriate.' if include_visualization else 'Do not include visualizations.'}
"""
try:
# Get response from OpenAI
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
max_tokens=1500,
temperature=0.1
)
generated_code = response.choices[0].message.content.strip()
# Clean the code (remove markdown formatting if present)
if generated_code.startswith('```python'):
generated_code = generated_code[9:-3]
elif generated_code.startswith('```'):
generated_code = generated_code[3:-3]
# Execute the code
result = self._execute_code(generated_code)
# Store in history
self.analysis_history.append({
'question': question,
'code': generated_code,
'result': result,
'success': result['success']
})
return {
'question': question,
'generated_code': generated_code,
'execution_result': result,
'success': result['success']
}
except Exception as e:
error_result = {
'question': question,
'generated_code': '',
'execution_result': {'success': False, 'error': str(e), 'output': ''},
'success': False
}
self.analysis_history.append(error_result)
return error_result
def _execute_code(self, code):
"""Safely execute the generated pandas code"""
# Create a safe execution environment
safe_globals = {
'df': self.df.copy(),
'pd': pd,
'plt': plt,
'sns': sns,
'px': px,
'go': go,
'print': print
}
# Capture output
old_stdout = sys.stdout
sys.stdout = captured_output = StringIO()
try:
# Execute the code
exec(code, safe_globals)
# Get the output
output = captured_output.getvalue()
return {
'success': True,
'output': output,
'error': None
}
except Exception as e:
return {
'success': False,
'output': captured_output.getvalue(),
'error': str(e)
}
finally:
sys.stdout = old_stdout
def get_summary(self):
"""Get a summary of the dataset"""
summary_question = "Provide a comprehensive summary of this dataset including key statistics, data types, and any interesting patterns you notice."
return self.ask_question(summary_question, include_visualization=True)
def suggest_questions(self):
"""Generate suggested questions based on the data"""
prompt = f"""
Based on this dataset with columns {self.dataset_info['columns']} and shape {self.dataset_info['shape']},
suggest 5 interesting analytical questions that would provide valuable insights.
Format as a simple list:
1. Question 1
2. Question 2
...
"""
try:
response = self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=300,
temperature=0.7
)
return response.choices[0].message.content.strip()
except Exception as e:
return f"Error generating suggestions: {str(e)}"
Usage Examples
Create a sample script examples/basic_usage.py
:
from src.openai_analyzer import OpenAICSVAnalyzer
import pandas as pd
# Create sample data
sample_data = {
'product': ['Laptop', 'Phone', 'Tablet', 'Watch', 'Headphones'] * 100,
'price': [800, 600, 400, 300, 150] * 100,
'sales': [50, 80, 60, 40, 90] * 100,
'rating': [4.5, 4.2, 4.0, 3.8, 4.6] * 100,
'category': ['Electronics', 'Electronics', 'Electronics', 'Wearables', 'Audio'] * 100
}
df = pd.DataFrame(sample_data)
df.to_csv('data/sample_sales.csv', index=False)
# Initialize analyzer
analyzer = OpenAICSVAnalyzer('data/sample_sales.csv')
# Ask questions
questions = [
"What are the top 5 products by total sales revenue?",
"Show me the correlation between price and rating",
"What's the average price by category?",
"Create a visualization showing sales distribution by product",
"Which product has the highest profit margin if cost is 60% of price?"
]
for question in questions:
print(f"\n{'='*60}")
print(f"QUESTION: {question}")
print('='*60)
result = analyzer.ask_question(question)
if result['success']:
print("GENERATED CODE:")
print(result['generated_code'])
print("\nOUTPUT:")
print(result['execution_result']['output'])
else:
print("ERROR:")
print(result['execution_result']['error'])
Method 2: LangChain Agents Approach
This approach uses LangChain’s agent framework for more sophisticated analysis workflows.
Core Implementation
Create src/langchain_analyzer.py
:
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from langchain.agents import create_pandas_dataframe_agent
from langchain_openai import ChatOpenAI
from langchain.agents.agent_types import AgentType
from dotenv import load_dotenv
import warnings
warnings.filterwarnings('ignore')
load_dotenv()
class LangChainCSVAnalyzer:
def __init__(self, csv_path, model_name="gpt-4"):
self.csv_path = csv_path
self.df = pd.read_csv(csv_path)
# Initialize the LLM
self.llm = ChatOpenAI(
model_name=model_name,
temperature=0,
openai_api_key=os.getenv('OPENAI_API_KEY')
)
# Create the pandas dataframe agent
self.agent = create_pandas_dataframe_agent(
llm=self.llm,
df=self.df,
verbose=True,
agent_type=AgentType.OPENAI_FUNCTIONS,
allow_dangerous_code=True # Required for code execution
)
self.conversation_history = []
def ask_question(self, question):
"""Ask a question about the data using the LangChain agent"""
try:
# Add context about available plotting libraries
enhanced_question = f"""
{question}
You have access to matplotlib.pyplot as plt and seaborn as sns for visualizations.
If creating plots, make sure to use plt.show() to display them.
Provide clear, actionable insights based on the data.
"""
# Get response from agent
response = self.agent.run(enhanced_question)
# Store in conversation history
self.conversation_history.append({
'question': question,
'response': response,
'success': True
})
return {
'question': question,
'response': response,
'success': True
}
except Exception as e:
error_response = {
'question': question,
'response': f"Error: {str(e)}",
'success': False
}
self.conversation_history.append(error_response)
return error_response
def get_data_summary(self):
"""Get a comprehensive summary of the dataset"""
summary_prompt = """
Provide a comprehensive analysis of this dataset including:
1. Basic statistics (shape, data types, missing values)
2. Summary statistics for numerical columns
3. Unique values for categorical columns
4. Any data quality issues
5. Suggestions for potential analyses
"""
return self.ask_question(summary_prompt)
def perform_multi_step_analysis(self, analysis_goal):
"""Perform a complex, multi-step analysis"""
planning_prompt = f"""
I want to perform the following analysis: {analysis_goal}
Please break this down into steps and execute each step:
1. First, examine the relevant columns and data quality
2. Perform necessary data cleaning or preprocessing
3. Execute the main analysis
4. Create appropriate visualizations
5. Provide insights and conclusions
Execute each step and provide detailed explanations.
"""
return self.ask_question(planning_prompt)
def compare_segments(self, segment_column, metric_column, comparison_type="mean"):
"""Compare different segments of data"""
comparison_prompt = f"""
Compare different segments in the '{segment_column}' column based on '{metric_column}'.
Please:
1. Calculate {comparison_type} values for each segment
2. Create a visualization comparing segments
3. Identify the top and bottom performing segments
4. Provide insights about the differences
"""
return self.ask_question(comparison_prompt)
def find_correlations(self, target_column=None):
"""Find correlations in the data"""
if target_column:
correlation_prompt = f"""
Analyze correlations between '{target_column}' and other numerical columns.
Please:
1. Calculate correlation coefficients
2. Create a correlation heatmap
3. Identify the strongest positive and negative correlations
4. Explain what these correlations might mean
"""
else:
correlation_prompt = """
Analyze correlations between all numerical columns in the dataset.
Please:
1. Create a correlation matrix
2. Generate a heatmap visualization
3. Identify the strongest correlations
4. Provide insights about relationships in the data
"""
return self.ask_question(correlation_prompt)
def detect_outliers(self, columns=None):
"""Detect outliers in specified columns or all numerical columns"""
if columns:
outlier_prompt = f"""
Detect outliers in the following columns: {', '.join(columns)}
Please:
1. Use multiple methods (IQR, Z-score, isolation forest if appropriate)
2. Create box plots to visualize outliers
3. Show the actual outlier values
4. Suggest whether these outliers should be investigated or removed
"""
else:
outlier_prompt = """
Detect outliers in all numerical columns of the dataset.
Please:
1. Use appropriate statistical methods
2. Create visualizations showing outliers
3. Summarize findings for each column
4. Provide recommendations for handling outliers
"""
return self.ask_question(outlier_prompt)
def get_conversation_summary(self):
"""Get a summary of the conversation history"""
if not self.conversation_history:
return "No questions have been asked yet."
summary = "Conversation Summary:\n" + "="*50 + "\n"
for i, item in enumerate(self.conversation_history, 1):
summary += f"\nQ{i}: {item['question']}\n"
if item['success']:
# Truncate long responses
response = item['response']
if len(response) > 200:
response = response[:200] + "... [truncated]"
summary += f"A{i}: {response}\n"
else:
summary += f"A{i}: [Error occurred]\n"
return summary
Advanced LangChain Features
Create src/advanced_langchain.py
:
from langchain.tools import BaseTool
from langchain.agents import Tool
from langchain.memory import ConversationBufferMemory
from typing import Type
from pydantic import BaseModel, Field
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
class PlotlyVisualizationTool(BaseTool):
name = "plotly_visualization"
description = "Create interactive visualizations using Plotly"
def _run(self, query: str) -> str:
"""Execute the visualization request"""
try:
# This would contain logic to parse the query and create appropriate Plotly charts
return f"Created visualization for: {query}"
except Exception as e:
return f"Error creating visualization: {str(e)}"
def _arun(self, query: str):
raise NotImplementedError("Async not implemented")
class AdvancedLangChainAnalyzer:
def __init__(self, csv_path):
self.df = pd.read_csv(csv_path)
# Initialize memory for conversation context
self.memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# Create custom tools
self.custom_tools = [
Tool(
name="Data Statistics",
func=self._get_statistics,
description="Get comprehensive statistics about the dataset"
),
Tool(
name="Create Visualization",
func=self._create_visualization,
description="Create various types of visualizations"
),
PlotlyVisualizationTool(),
]
def _get_statistics(self, column_name: str) -> str:
"""Get detailed statistics for a specific column"""
if column_name not in self.df.columns:
return f"Column '{column_name}' not found in dataset"
col_data = self.df[column_name]
if col_data.dtype in ['int64', 'float64']:
stats = {
'count': col_data.count(),
'mean': col_data.mean(),
'median': col_data.median(),
'std': col_data.std(),
'min': col_data.min(),
'max': col_data.max(),
'missing': col_data.isnull().sum()
}
return f"Statistics for {column_name}: {stats}"
else:
stats = {
'count': col_data.count(),
'unique': col_data.nunique(),
'top_value': col_data.mode().iloc[0] if len(col_data.mode()) > 0 else 'N/A',
'missing': col_data.isnull().sum()
}
return f"Statistics for {column_name}: {stats}"
def _create_visualization(self, chart_request: str) -> str:
"""Create visualizations based on natural language requests"""
# This would contain logic to parse the request and create appropriate charts
return f"Created visualization: {chart_request}"
Advanced Features
1. Data Quality Assessment
Create src/data_quality.py
:
import pandas as pd
import numpy as np
from typing import Dict, List, Any
class DataQualityAnalyzer:
def __init__(self, df: pd.DataFrame):
self.df = df
def assess_data_quality(self) -> Dict[str, Any]:
"""Comprehensive data quality assessment"""
assessment = {
'basic_info': self._get_basic_info(),
'missing_data': self._analyze_missing_data(),
'duplicates': self._analyze_duplicates(),
'data_types': self._analyze_data_types(),
'outliers': self._detect_outliers(),
'consistency': self._check_consistency(),
'completeness_score': self._calculate_completeness_score()
}
return assessment
def _get_basic_info(self) -> Dict[str, Any]:
return {
'shape': self.df.shape,
'memory_usage': self.df.memory_usage(deep=True).sum(),
'columns': list(self.df.columns)
}
def _analyze_missing_data(self) -> Dict[str, Any]:
missing_data = self.df.isnull().sum()
missing_percentage = (missing_data / len(self.df)) * 100
return {
'missing_counts': missing_data.to_dict(),
'missing_percentages': missing_percentage.to_dict(),
'columns_with_missing': missing_data[missing_data > 0].index.tolist()
}
def _analyze_duplicates(self) -> Dict[str, Any]:
duplicate_rows = self.df.duplicated().sum()
duplicate_percentage = (duplicate_rows / len(self.df)) * 100
return {
'duplicate_rows': int(duplicate_rows),
'duplicate_percentage': float(duplicate_percentage),
'unique_rows': len(self.df) - duplicate_rows
}
def _analyze_data_types(self) -> Dict[str, Any]:
type_counts = self.df.dtypes.value_counts().to_dict()
return {
'type_distribution': {str(k): int(v) for k, v in type_counts.items()},
'column_types': self.df.dtypes.to_dict()
}
def _detect_outliers(self) -> Dict[str, Any]:
numeric_columns = self.df.select_dtypes(include=[np.number]).columns
outliers = {}
for col in numeric_columns:
Q1 = self.df[col].quantile(0.25)
Q3 = self.df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
column_outliers = self.df[(self.df[col] < lower_bound) | (self.df[col] > upper_bound)]
outliers[col] = {
'count': len(column_outliers),
'percentage': (len(column_outliers) / len(self.df)) * 100,
'bounds': {'lower': lower_bound, 'upper': upper_bound}
}
return outliers
def _check_consistency(self) -> Dict[str, Any]:
consistency_issues = []
# Check for mixed case in string columns
string_columns = self.df.select_dtypes(include=['object']).columns
for col in string_columns:
if self.df[col].dtype == 'object':
unique_values = self.df[col].dropna().unique()
if len(unique_values) != len([str(val).upper() for val in unique_values]):
consistency_issues.append(f"Mixed case in column: {col}")
return {'issues': consistency_issues}
def _calculate_completeness_score(self) -> float:
"""Calculate overall data completeness score"""
total_cells = self.df.size
missing_cells = self.df.isnull().sum().sum()
completeness = ((total_cells - missing_cells) / total_cells) * 100
return round(completeness, 2)
2. Smart Query Suggestions
Create src/query_suggestions.py
:
import pandas as pd
from typing import List, Dict
import re
class SmartQuerySuggester:
def __init__(self, df: pd.DataFrame):
self.df = df
self.numeric_columns = df.select_dtypes(include=['number']).columns.tolist()
self.categorical_columns = df.select_dtypes(include=['object']).columns.tolist()
self.datetime_columns = df.select_dtypes(include=['datetime64']).columns.tolist()
def generate_suggestions(self) -> Dict[str, List[str]]:
"""Generate smart query suggestions based on data characteristics"""
suggestions = {
'descriptive': self._generate_descriptive_queries(),
'comparative': self._generate_comparative_queries(),
'correlation': self._generate_correlation_queries(),
'trend': self._generate_trend_queries(),
'distribution': self._generate_distribution_queries()
}
return suggestions
def _generate_descriptive_queries(self) -> List[str]:
"""Generate descriptive analysis queries"""
queries = [
"What are the basic statistics of this dataset?",
"Show me a summary of all numerical columns",
"What are the data types and missing values in each column?"
]
if self.numeric_columns:
queries.extend([
f"What are the top 10 highest values in {self.numeric_columns[0]}?",
f"What is the distribution of {self.numeric_columns[0]}?"
])
if self.categorical_columns:
queries.extend([
f"What are the unique values in {self.categorical_columns[0]}?",
f"Show me the frequency distribution of {self.categorical_columns[0]}"
])
return queries
def _generate_comparative_queries(self) -> List[str]:
"""Generate comparative analysis queries"""
queries = []
if len(self.categorical_columns) >= 1 and len(self.numeric_columns) >= 1:
cat_col = self.categorical_columns[0]
num_col = self.numeric_columns[0]
queries.extend([
f"Compare the average {num_col} across different {cat_col}",
f"Which {cat_col} has the highest {num_col}?",
f"Show me a box plot of {num_col} by {cat_col}"
])
if len(self.numeric_columns) >= 2:
queries.extend([
f"Compare {self.numeric_columns[0]} vs {self.numeric_columns[1]}",
"Which numerical columns have the most similar distributions?"
])
return queries
def _generate_correlation_queries(self) -> List[str]:
"""Generate correlation analysis queries"""
queries = []
if len(self.numeric_columns) >= 2:
queries.extend([
"Show me the correlation matrix for all numerical columns",
f"What is the correlation between {self.numeric_columns[0]} and {self.numeric_columns[1]}?",
"Which variables are most strongly correlated?",
"Create a heatmap of correlations"
])
return queries
def _generate_trend_queries(self) -> List[str]:
"""Generate trend analysis queries"""
queries = []
if self.datetime_columns and self.numeric_columns:
date_col = self.datetime_columns[0]
num_col = self.numeric_columns[0]
queries.extend([
f"Show me the trend of {num_col} over {date_col}",
f"What is the monthly/yearly pattern in {num_col}?",
"Are there any seasonal patterns in the data?"
])
return queries
def _generate_distribution_queries(self) -> List[str]:
"""Generate distribution analysis queries"""
queries = []
if self.numeric_columns:
queries.extend([
f"Show me the distribution of {self.numeric_columns[0]}",
"Which columns have normal distributions?",
"Are there any outliers in the numerical data?",
"Create histograms for all numerical columns"
])
return queries
Building a Web Interface
Create web_app/streamlit_app.py
:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from io import StringIO
import sys
import os
# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.openai_analyzer import OpenAICSVAnalyzer
from src.langchain_analyzer import LangChainCSVAnalyzer
from src.data_quality import DataQualityAnalyzer
from src.query_suggestions import SmartQuerySuggester
# Page configuration
st.set_page_config(
page_title="LLM-Powered CSV Analyzer",
page_icon="📊",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS
st.markdown("""
<style>
.main > div {
padding-top: 2rem;
}
.stAlert {
margin-top: 1rem;
}
.metric-card {
background-color: #f0f2f6;
padding: 1rem;
border-radius: 0.5rem;
margin: 0.5rem 0;
}
</style>
""", unsafe_allow_html=True)
def main():
st.title("🤖 LLM-Powered CSV Analyzer")
st.markdown("Ask questions about your CSV data in plain English!")
# Sidebar for configuration
with st.sidebar:
st.header("Configuration")
# API Key input
api_key = st.text_input("OpenAI API Key", type="password",
help="Enter your OpenAI API key")
if api_key:
os.environ['OPENAI_API_KEY'] = api_key
# Method selection
analysis_method = st.selectbox(
"Choose Analysis Method",
["OpenAI + Pandas", "LangChain Agents"],
help="Select the AI method for analyzing your data"
)
# Model selection
if analysis_method == "OpenAI + Pandas":
model_choice = st.selectbox("Model", ["gpt-4", "gpt-3.5-turbo"])
else:
model_choice = st.selectbox("Model", ["gpt-4", "gpt-3.5-turbo"])
# File upload
uploaded_file = st.file_uploader(
"Upload your CSV file",
type=['csv'],
help="Upload a CSV file to start analyzing"
)
if uploaded_file is not None:
try:
# Load the data
df = pd.read_csv(uploaded_file)
# Save uploaded file temporarily
temp_path = f"temp_{uploaded_file.name}"
df.to_csv(temp_path, index=False)
# Display basic info
st.success(f"✅ File uploaded successfully! Shape: {df.shape}")
# Create tabs for different functionalities
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"📊 Data Overview",
"🤖 AI Analysis",
"📈 Data Quality",
"💡 Smart Suggestions",
"📋 Analysis History"
])
# Initialize analyzers
if api_key:
if analysis_method == "OpenAI + Pandas":
analyzer = OpenAICSVAnalyzer(temp_path)
else:
analyzer = LangChainCSVAnalyzer(temp_path, model_choice)
quality_analyzer = DataQualityAnalyzer(df)
suggester = SmartQuerySuggester(df)
with tab1:
display_data_overview(df)
with tab2:
if api_key:
display_ai_analysis(analyzer, analysis_method)
else:
st.warning("Please enter your OpenAI API key in the sidebar to use AI analysis.")
with tab3:
if api_key:
display_data_quality(quality_analyzer)
else:
st.warning("Please enter your OpenAI API key in the sidebar.")
with tab4:
if api_key:
display_smart_suggestions(suggester, analyzer if api_key else None)
else:
st.warning("Please enter your OpenAI API key in the sidebar.")
with tab5:
if api_key and 'analysis_history' in st.session_state:
display_analysis_history()
else:
st.info("Analysis history will appear here after you ask questions.")
# Clean up temp file
if os.path.exists(temp_path):
os.remove(temp_path)
except Exception as e:
st.error(f"Error processing file: {str(e)}")
else:
st.info("👆 Upload a CSV file to get started!")
# Show example data
st.subheader("Example Data Format")
example_data = pd.DataFrame({
'product': ['Laptop', 'Phone', 'Tablet'],
'price': [1000, 800, 600],
'sales': [50, 120, 80],
'rating': [4.5, 4.2, 4.0]
})
st.dataframe(example_data)
def display_data_overview(df):
"""Display basic data overview"""
st.subheader("📊 Data Overview")
# Basic metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Rows", f"{df.shape[0]:,}")
with col2:
st.metric("Columns", df.shape[1])
with col3:
st.metric("Missing Values", f"{df.isnull().sum().sum():,}")
with col4:
st.metric("Memory Usage", f"{df.memory_usage(deep=True).sum() / 1024:.1f} KB")
# Data preview
st.subheader("Data Preview")
st.dataframe(df.head(10), use_container_width=True)
# Column information
st.subheader("Column Information")
col_info = pd.DataFrame({
'Column': df.columns,
'Data Type': df.dtypes.astype(str),
'Non-Null Count': df.count(),
'Null Count': df.isnull().sum(),
'Unique Values': df.nunique()
})
st.dataframe(col_info, use_container_width=True)
def display_ai_analysis(analyzer, method):
"""Display AI analysis interface"""
st.subheader("🤖 AI-Powered Analysis")
# Initialize session state for history
if 'analysis_history' not in st.session_state:
st.session_state.analysis_history = []
# Question input
question = st.text_area(
"Ask a question about your data:",
placeholder="e.g., What are the top 5 products by sales? Show me the correlation between price and rating.",
height=100
)
# Analysis options
col1, col2 = st.columns([3, 1])
with col1:
include_viz = st.checkbox("Include visualizations", value=True)
with col2:
analyze_button = st.button("🔍 Analyze", type="primary")
if analyze_button and question:
with st.spinner("🤖 AI is analyzing your data..."):
try:
if method == "OpenAI + Pandas":
result = analyzer.ask_question(question, include_visualization=include_viz)
else:
result = analyzer.ask_question(question)
# Store in session state
st.session_state.analysis_history.append(result)
# Display results
if result['success']:
st.success("✅ Analysis completed!")
# Show generated code (for OpenAI method)
if method == "OpenAI + Pandas":
with st.expander("🔧 Generated Code"):
st.code(result['generated_code'], language='python')
# Show results
st.subheader("📊 Results")
if method == "OpenAI + Pandas":
output = result['execution_result']['output']
if output.strip():
st.text(output)
else:
st.info("Analysis completed - check for any generated visualizations above.")
else:
st.write(result['response'])
else:
st.error("❌ Analysis failed")
if method == "OpenAI + Pandas":
st.error(result['execution_result']['error'])
else:
st.error(result['response'])
except Exception as e:
st.error(f"Error during analysis: {str(e)}")
# Quick analysis buttons
st.subheader("🚀 Quick Analysis")
quick_col1, quick_col2, quick_col3 = st.columns(3)
with quick_col1:
if st.button("📈 Get Data Summary"):
with st.spinner("Generating summary..."):
if method == "OpenAI + Pandas":
result = analyzer.get_summary()
else:
result = analyzer.get_data_summary()
st.session_state.analysis_history.append(result)
st.rerun()
with quick_col2:
if st.button("🔍 Find Correlations"):
question = "Show me correlations between numerical columns with a heatmap"
with st.spinner("Finding correlations..."):
if method == "OpenAI + Pandas":
result = analyzer.ask_question(question)
else:
result = analyzer.find_correlations()
st.session_state.analysis_history.append(result)
st.rerun()
with quick_col3:
if st.button("⚠️ Detect Outliers"):
question = "Detect and visualize outliers in numerical columns"
with st.spinner("Detecting outliers..."):
if method == "OpenAI + Pandas":
result = analyzer.ask_question(question)
else:
result = analyzer.detect_outliers()
st.session_state.analysis_history.append(result)
st.rerun()
def display_data_quality(quality_analyzer):
"""Display data quality assessment"""
st.subheader("📈 Data Quality Assessment")
with st.spinner("Analyzing data quality..."):
quality_report = quality_analyzer.assess_data_quality()
# Overall completeness score
completeness = quality_report['completeness_score']
st.metric("Overall Data Completeness", f"{completeness}%")
# Create quality indicator
if completeness >= 95:
st.success("🟢 Excellent data quality!")
elif completeness >= 80:
st.warning("🟡 Good data quality with some issues")
else:
st.error("🔴 Data quality needs attention")
# Missing data analysis
st.subheader("Missing Data Analysis")
missing_data = quality_report['missing_data']
if missing_data['columns_with_missing']:
missing_df = pd.DataFrame({
'Column': missing_data['columns_with_missing'],
'Missing Count': [missing_data['missing_counts'][col] for col in missing_data['columns_with_missing']],
'Missing %': [f"{missing_data['missing_percentages'][col]:.1f}%" for col in missing_data['columns_with_missing']]
})
st.dataframe(missing_df, use_container_width=True)
# Visualization of missing data
fig = px.bar(missing_df, x='Column', y='Missing Count',
title='Missing Values by Column')
st.plotly_chart(fig, use_container_width=True)
else:
st.success("✅ No missing values detected!")
# Duplicates analysis
st.subheader("Duplicate Records")
duplicates = quality_report['duplicates']
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Duplicate Rows", duplicates['duplicate_rows'])
with col2:
st.metric("Duplicate %", f"{duplicates['duplicate_percentage']:.1f}%")
with col3:
st.metric("Unique Rows", duplicates['unique_rows'])
# Outliers analysis
st.subheader("Outliers Detection")
outliers = quality_report['outliers']
if outliers:
outlier_data = []
for col, info in outliers.items():
outlier_data.append({
'Column': col,
'Outlier Count': info['count'],
'Outlier %': f"{info['percentage']:.1f}%",
'Lower Bound': f"{info['bounds']['lower']:.2f}",
'Upper Bound': f"{info['bounds']['upper']:.2f}"
})
outlier_df = pd.DataFrame(outlier_data)
st.dataframe(outlier_df, use_container_width=True)
else:
st.info("No numerical columns available for outlier detection.")
def display_smart_suggestions(suggester, analyzer=None):
"""Display smart query suggestions"""
st.subheader("💡 Smart Query Suggestions")
with st.spinner("Generating smart suggestions..."):
suggestions = suggester.generate_suggestions()
# Display suggestions by category
for category, queries in suggestions.items():
if queries: # Only show categories with suggestions
st.subheader(f"{category.title()} Analysis")
for i, query in enumerate(queries):
col1, col2 = st.columns([4, 1])
with col1:
st.write(f"• {query}")
with col2:
if analyzer and st.button(f"Run", key=f"{category}_{i}"):
with st.spinner("Running analysis..."):
if hasattr(analyzer, 'ask_question'):
result = analyzer.ask_question(query)
if 'analysis_history' not in st.session_state:
st.session_state.analysis_history = []
st.session_state.analysis_history.append(result)
st.success("Analysis added to history!")
st.rerun()
def display_analysis_history():
"""Display analysis history"""
st.subheader("📋 Analysis History")
if 'analysis_history' not in st.session_state or not st.session_state.analysis_history:
st.info("No analysis history available.")
return
# Clear history button
if st.button("🗑️ Clear History"):
st.session_state.analysis_history = []
st.rerun()
# Display each analysis
for i, analysis in enumerate(reversed(st.session_state.analysis_history)):
with st.expander(f"Analysis {len(st.session_state.analysis_history) - i}: {analysis['question'][:50]}..."):
st.write(f"**Question:** {analysis['question']}")
if analysis['success']:
st.success("✅ Successful")
# Show code if available (OpenAI method)
if 'generated_code' in analysis:
st.subheader("Generated Code:")
st.code(analysis['generated_code'], language='python')
# Show output
st.subheader("Output:")
if 'execution_result' in analysis:
output = analysis['execution_result']['output']
if output.strip():
st.text(output)
else:
st.info("No text output (visualization may have been generated)")
elif 'response' in analysis:
st.write(analysis['response'])
else:
st.error("❌ Failed")
if 'execution_result' in analysis:
st.error(analysis['execution_result']['error'])
elif 'response' in analysis:
st.error(analysis['response'])
if __name__ == "__main__":
main()
Best Practices & Security
Security Considerations
# src/security.py
import ast
import re
from typing import List, Set
class CodeSecurityAnalyzer:
"""Analyze generated code for security risks"""
DANGEROUS_IMPORTS = {
'os', 'subprocess', 'sys', 'eval', 'exec', 'open',
'file', '__import__', 'compile', 'globals', 'locals'
}
DANGEROUS_FUNCTIONS = {
'eval', 'exec', 'compile', '__import__', 'getattr',
'setattr', 'delattr', 'globals', 'locals', 'vars'
}
DANGEROUS_METHODS = {
'system', 'popen', 'spawn', 'fork', 'kill'
}
def __init__(self):
self.risks = []
def analyze_code(self, code: str) -> dict:
"""Analyze code for security risks"""
self.risks = []
try:
# Parse the code
tree = ast.parse(code)
# Check for dangerous imports
self._check_imports(tree)
# Check for dangerous function calls
self._check_function_calls(tree)
# Check for file operations
self._check_file_operations(code)
# Check for network operations
self._check_network_operations(code)
return {
'is_safe': len(self.risks) == 0,
'risks': self.risks,
'risk_level': self._calculate_risk_level()
}
except SyntaxError as e:
return {
'is_safe': False,
'risks': [f"Syntax error in code: {str(e)}"],
'risk_level': 'HIGH'
}
def _check_imports(self, tree):
"""Check for dangerous imports"""
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
if alias.name in self.DANGEROUS_IMPORTS:
self.risks.append(f"Dangerous import: {alias.name}")
elif isinstance(node, ast.ImportFrom):
if node.module in self.DANGEROUS_IMPORTS:
self.risks.append(f"Dangerous import: {node.module}")
def _check_function_calls(self, tree):
"""Check for dangerous function calls"""
for node in ast.walk(tree):
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in self.DANGEROUS_FUNCTIONS:
self.risks.append(f"Dangerous function call: {node.func.id}")
def _check_file_operations(self, code: str):
"""Check for file operations"""
file_patterns = [
r'open\s*\(',
r'with\s+open\s*\(',
r'\.write\s*\(',
r'\.read\s*\(',
r'os\.remove',
r'os\.unlink'
]
for pattern in file_patterns:
if re.search(pattern, code):
self.risks.append(f"File operation detected: {pattern}")
def _check_network_operations(self, code: str):
"""Check for network operations"""
network_patterns = [
r'requests\.',
r'urllib\.',
r'http\.',
r'socket\.',
r'smtplib\.'
]
for pattern in network_patterns:
if re.search(pattern, code):
self.risks.append(f"Network operation detected: {pattern}")
def _calculate_risk_level(self) -> str:
"""Calculate overall risk level"""
if len(self.risks) == 0:
return 'SAFE'
elif len(self.risks) <= 2:
return 'LOW'
elif len(self.risks) <= 5:
return 'MEDIUM'
else:
return 'HIGH'
Performance Optimization
# src/performance.py
import time
import functools
import pandas as pd
from typing import Callable, Any
def performance_monitor(func: Callable) -> Callable:
"""Decorator to monitor function performance"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
print(f"{func.__name__} executed in {execution_time:.2f} seconds")
return result
return wrapper
class DataFrameOptimizer:
"""Optimize DataFrame operations for better performance"""
@staticmethod
def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
"""Optimize DataFrame data types for memory efficiency"""
optimized_df = df.copy()
# Optimize integer columns
for col in optimized_df.select_dtypes(include=['int64']).columns:
col_min = optimized_df[col].min()
col_max = optimized_df[col].max()
if col_min >= 0:
if col_max < 255:
optimized_df[col] = optimized_df[col].astype('uint8')
elif col_max < 65535:
optimized_df[col] = optimized_df[col].astype('uint16')
elif col_max < 4294967295:
optimized_df[col] = optimized_df[col].astype('uint32')
else:
if col_min > -128 and col_max < 127:
optimized_df[col] = optimized_df[col].astype('int8')
elif col_min > -32768 and col_max < 32767:
optimized_df[col] = optimized_df[col].astype('int16')
elif col_min > -2147483648 and col_max < 2147483647:
optimized_df[col] = optimized_df[col].astype('int32')
# Optimize float columns
for col in optimized_df.select_dtypes(include=['float64']).columns:
optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')
# Optimize object columns to category where appropriate
for col in optimized_df.select_dtypes(include=['object']).columns:
if optimized_df[col].nunique() / len(optimized_df) < 0.5:
optimized_df[col] = optimized_df[col].astype('category')
return optimized_df
@staticmethod
def chunk_large_dataframes(df: pd.DataFrame, chunk_size: int = 10000):
"""Process large DataFrames in chunks"""
for start in range(0, len(df), chunk_size):
yield df[start:start + chunk_size]
Troubleshooting
Common Issues and Solutions
# src/troubleshooting.py
class TroubleshootingGuide:
"""Common issues and their solutions"""
COMMON_ERRORS = {
"KeyError": {
"description": "Column name not found in DataFrame",
"solutions": [
"Check column names with df.columns",
"Use df.columns.tolist() to see all available columns",
"Check for extra spaces or different capitalization",
"Use df.rename() to standardize column names"
],
"example": """
# Check available columns
print("Available columns:", df.columns.tolist())
# Rename columns to remove spaces
df.columns = df.columns.str.strip().str.replace(' ', '_')
"""
},
"ValueError": {
"description": "Invalid value for operation",
"solutions": [
"Check data types with df.dtypes",
"Convert data types if necessary",
"Handle missing values before operations",
"Check for non-numeric data in numeric columns"
],
"example": """
# Convert to numeric, errors='coerce' turns invalid values to NaN
df['numeric_column'] = pd.to_numeric(df['numeric_column'], errors='coerce')
# Fill or drop missing values
df = df.dropna() # or df.fillna(0)
"""
},
"MemoryError": {
"description": "Not enough memory to process the data",
"solutions": [
"Process data in chunks",
"Optimize data types",
"Use generators instead of loading all data",
"Consider using Dask for large datasets"
],
"example": """
# Read CSV in chunks
chunk_list = []
for chunk in pd.read_csv('large_file.csv', chunksize=10000):
# Process each chunk
processed_chunk = chunk.groupby('column').sum()
chunk_list.append(processed_chunk)
# Combine results
result = pd.concat(chunk_list, ignore_index=True)
"""
},
"OpenAI API Error": {
"description": "Issues with OpenAI API calls",
"solutions": [
"Check API key is valid and set correctly",
"Verify you have sufficient API credits",
"Check rate limits and add delays if necessary",
"Handle API timeouts with retry logic"
],
"example": """
import time
from openai import OpenAI
def make_api_call_with_retry(client, messages, max_retries=3):
for attempt in range(max_retries):
try:
response = client.chat.completions.create(
model="gpt-4",
messages=messages
)
return response
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
raise e
"""
}
}
@classmethod
def get_solution(cls, error_type: str) -> dict:
"""Get solutions for a specific error type"""
return cls.COMMON_ERRORS.get(error_type, {
"description": "Unknown error",
"solutions": ["Check the error message for specific details"],
"example": "# No specific example available"
})
@classmethod
def diagnose_dataframe_issues(cls, df: pd.DataFrame) -> dict:
"""Diagnose common DataFrame issues"""
issues = []
# Check for missing values
missing_count = df.isnull().sum().sum()
if missing_count > 0:
issues.append({
"issue": "Missing Values",
"count": missing_count,
"solution": "Use df.dropna() or df.fillna() to handle missing values"
})
# Check for duplicate rows
duplicate_count = df.duplicated().sum()
if duplicate_count > 0:
issues.append({
"issue": "Duplicate Rows",
"count": duplicate_count,
"solution": "Use df.drop_duplicates() to remove duplicates"
})
# Check for mixed data types in object columns
for col in df.select_dtypes(include=['object']).columns:
sample_values = df[col].dropna().head(100)
if len(sample_values) > 0:
types = set(type(val).__name__ for val in sample_values)
if len(types) > 1:
issues.append({
"issue": f"Mixed Data Types in {col}",
"types": list(types),
"solution": f"Clean and standardize data in column {col}"
})
# Check memory usage
memory_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
if memory_mb > 100:
issues.append({
"issue": "High Memory Usage",
"memory_mb": round(memory_mb, 2),
"solution": "Consider optimizing data types or processing in chunks"
})
return {
"total_issues": len(issues),
"issues": issues,
"overall_health": "Good" if len(issues) == 0 else "Needs Attention"
}
Conclusion
This comprehensive tutorial has shown you how to build a powerful LLM-powered CSV analysis system that can:
Key Features Implemented:
- Natural language querying of CSV data
- Automatic code generation and execution
- Data quality assessment
- Smart query suggestions
- Interactive web interface
- Security and performance optimizations
Two Main Approaches Covered:
- OpenAI + Pandas: Direct integration with fine-grained control
- LangChain Agents: Framework-based approach with advanced workflows
Production Considerations:
- Always validate and sanitize generated code
- Implement proper error handling and logging
- Monitor API usage and costs
- Consider caching frequently asked questions
- Add user authentication for web applications
- Implement rate limiting to prevent abuse
Next Steps:
- Add support for multiple file formats (Excel, JSON, etc.)
- Implement natural language report generation
- Add collaborative features for team analysis
- Integrate with databases and data warehouses
- Build custom visualizations based on query context
- Add support for time series analysis and forecasting
Security Reminders:
- Never execute untrusted code in production
- Validate all user inputs
- Use environment variables for API keys
- Implement proper access controls
- Monitor and log all analysis activities
This system provides a solid foundation for building AI-powered data analysis tools. The modular design allows you to extend functionality, add new data sources, and customize the analysis capabilities based on your specific needs.
Remember to test thoroughly with your specific datasets and use cases, and always prioritize security when deploying to production environments.