LLM-Powered Data Analysis: Ask Questions About Your CSV Using AI

Table of Contents

  1. Introduction
  2. Prerequisites
  3. Project Setup
  4. Method 1: OpenAI + Pandas Approach
  5. Method 2: LangChain Agents Approach
  6. Advanced Features
  7. Building a Web Interface
  8. Best Practices & Security
  9. Troubleshooting
  10. Conclusion

Introduction

Imagine being able to ask your data questions in plain English and getting intelligent answers backed by actual analysis. Instead of writing complex SQL queries or pandas code, you could simply ask “What are the top 5 products by sales?” or “Show me the correlation between price and customer satisfaction.”

This tutorial will show you how to build an AI-powered data analysis system that:

  • Accepts natural language questions about your CSV data
  • Automatically generates and executes pandas code
  • Returns both code explanations and visualizations
  • Handles complex analytical queries intelligently

We’ll explore two main approaches:

  1. OpenAI + Pandas: Direct integration with OpenAI’s API for code generation
  2. LangChain Agents: Using LangChain’s agent framework for more sophisticated workflows

Prerequisites

Required Knowledge:

  • Basic Python programming
  • Familiarity with pandas for data manipulation
  • Understanding of APIs and environment variables
  • Basic knowledge of data analysis concepts

Required Tools:

  • Python 3.8+
  • OpenAI API key (get one at https://platform.openai.com)
  • Code editor (VS Code recommended)

Project Setup

1. Create Project Directory

mkdir llm-csv-analyzer
cd llm-csv-analyzer

2. Set Up Virtual Environment

# Create virtual environment
python -m venv venv

# Activate virtual environment
# On Windows:
venv\Scripts\activate
# On macOS/Linux:
source venv/bin/activate

3. Install Dependencies

pip install openai pandas numpy matplotlib seaborn plotly langchain langchain-openai langchain-experimental python-dotenv streamlit jupyter

4. Create Environment File

Create a .env file in your project root:

OPENAI_API_KEY=your_openai_api_key_here

5. Project Structure

llm-csv-analyzer/
├── .env
├── requirements.txt
├── data/
│   └── sample_data.csv
├── src/
│   ├── __init__.py
│   ├── openai_analyzer.py
│   ├── langchain_analyzer.py
│   └── utils.py
├── notebooks/
│   └── exploration.ipynb
├── web_app/
│   └── streamlit_app.py
└── README.md
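
The structure above references a requirements.txt whose contents are never shown. An unpinned version mirroring the pip install command might look like this (pin versions as needed for your environment):

openai
pandas
numpy
matplotlib
seaborn
plotly
langchain
langchain-openai
langchain-experimental
python-dotenv
streamlit
jupyter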

Method 1: OpenAI + Pandas Approach

This approach directly uses OpenAI’s API to generate pandas code based on natural language queries.

Core Implementation

Create src/openai_analyzer.py:

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from openai import OpenAI
from dotenv import load_dotenv
import ast
import sys
from io import StringIO
import warnings
warnings.filterwarnings('ignore')

load_dotenv()

class OpenAICSVAnalyzer:
    def __init__(self, csv_path):
        self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
        self.df = pd.read_csv(csv_path)
        self.csv_path = csv_path
        self.analysis_history = []
        
        # Get basic info about the dataset
        self.dataset_info = self._get_dataset_info()
        
    def _get_dataset_info(self):
        """Get comprehensive information about the dataset"""
        info = {
            'shape': self.df.shape,
            'columns': list(self.df.columns),
            'dtypes': self.df.dtypes.to_dict(),
            'sample_data': self.df.head(3).to_dict(),
            'null_counts': self.df.isnull().sum().to_dict(),
            'numeric_columns': self.df.select_dtypes(include=['number']).columns.tolist(),
            'categorical_columns': self.df.select_dtypes(include=['object']).columns.tolist()
        }
        return info
    
    def _create_system_prompt(self):
        """Create a comprehensive system prompt with dataset context"""
        prompt = f"""
You are an expert data analyst. You have access to a pandas DataFrame called 'df' with the following characteristics:

DATASET INFORMATION:
- Shape: {self.dataset_info['shape']} (rows, columns)
- Columns: {', '.join(self.dataset_info['columns'])}
- Data types: {self.dataset_info['dtypes']}
- Numeric columns: {', '.join(self.dataset_info['numeric_columns'])}
- Categorical columns: {', '.join(self.dataset_info['categorical_columns'])}
- Missing values: {self.dataset_info['null_counts']}

SAMPLE DATA:
{pd.DataFrame(self.dataset_info['sample_data']).to_string()}

INSTRUCTIONS:
1. Generate ONLY executable pandas/matplotlib/seaborn/plotly code
2. Always use the variable name 'df' for the DataFrame
3. Handle missing values appropriately
4. For visualizations, use matplotlib, seaborn, or plotly
5. Print results using print() statements
6. Include comments explaining your approach
7. Make sure code is production-ready and handles edge cases

AVAILABLE LIBRARIES:
- pandas as pd
- matplotlib.pyplot as plt
- seaborn as sns
- plotly.express as px
- plotly.graph_objects as go
- numpy as np (if needed)

RESPONSE FORMAT:
Return ONLY the Python code, no explanations outside the code comments.
"""
        return prompt
    
    def ask_question(self, question, include_visualization=True):
        """Process a natural language question about the data"""
        
        # Create the prompt
        system_prompt = self._create_system_prompt()
        
        user_prompt = f"""
Question: {question}

Please generate pandas code to answer this question.
{'Include a relevant visualization if appropriate.' if include_visualization else 'Do not include visualizations.'}
"""
        
        try:
            # Get response from OpenAI
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                max_tokens=1500,
                temperature=0.1
            )
            
            generated_code = response.choices[0].message.content.strip()
            
            # Clean the code (remove markdown formatting if present)
            if generated_code.startswith('```python'):
                generated_code = generated_code[9:-3]
            elif generated_code.startswith('```'):
                generated_code = generated_code[3:-3]
            
            # Execute the code
            result = self._execute_code(generated_code)
            
            # Store in history
            self.analysis_history.append({
                'question': question,
                'code': generated_code,
                'result': result,
                'success': result['success']
            })
            
            return {
                'question': question,
                'generated_code': generated_code,
                'execution_result': result,
                'success': result['success']
            }
            
        except Exception as e:
            error_result = {
                'question': question,
                'generated_code': '',
                'execution_result': {'success': False, 'error': str(e), 'output': ''},
                'success': False
            }
            self.analysis_history.append(error_result)
            return error_result
    
    def _execute_code(self, code):
        """Safely execute the generated pandas code"""
        
        # Create a safe execution environment
        safe_globals = {
            'df': self.df.copy(),
            'pd': pd,
            'np': np,
            'plt': plt,
            'sns': sns,
            'px': px,
            'go': go,
            'print': print
        }
        
        # Capture output
        old_stdout = sys.stdout
        sys.stdout = captured_output = StringIO()
        
        try:
            # Execute the code
            exec(code, safe_globals)
            
            # Get the output
            output = captured_output.getvalue()
            
            return {
                'success': True,
                'output': output,
                'error': None
            }
            
        except Exception as e:
            return {
                'success': False,
                'output': captured_output.getvalue(),
                'error': str(e)
            }
        finally:
            sys.stdout = old_stdout
    
    def get_summary(self):
        """Get a summary of the dataset"""
        summary_question = "Provide a comprehensive summary of this dataset including key statistics, data types, and any interesting patterns you notice."
        return self.ask_question(summary_question, include_visualization=True)
    
    def suggest_questions(self):
        """Generate suggested questions based on the data"""
        prompt = f"""
Based on this dataset with columns {self.dataset_info['columns']} and shape {self.dataset_info['shape']}, 
suggest 5 interesting analytical questions that would provide valuable insights.

Format as a simple list:
1. Question 1
2. Question 2
...
"""
        
        try:
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=300,
                temperature=0.7
            )
            
            return response.choices[0].message.content.strip()
            
        except Exception as e:
            return f"Error generating suggestions: {str(e)}"

Usage Examples

Create a sample script examples/basic_usage.py:

from src.openai_analyzer import OpenAICSVAnalyzer
import pandas as pd

# Create sample data
sample_data = {
    'product': ['Laptop', 'Phone', 'Tablet', 'Watch', 'Headphones'] * 100,
    'price': [800, 600, 400, 300, 150] * 100,
    'sales': [50, 80, 60, 40, 90] * 100,
    'rating': [4.5, 4.2, 4.0, 3.8, 4.6] * 100,
    'category': ['Electronics', 'Electronics', 'Electronics', 'Wearables', 'Audio'] * 100
}
df = pd.DataFrame(sample_data)
df.to_csv('data/sample_sales.csv', index=False)

# Initialize analyzer
analyzer = OpenAICSVAnalyzer('data/sample_sales.csv')

# Ask questions
questions = [
    "What are the top 5 products by total sales revenue?",
    "Show me the correlation between price and rating",
    "What's the average price by category?",
    "Create a visualization showing sales distribution by product",
    "Which product has the highest profit margin if cost is 60% of price?"
]

for question in questions:
    print(f"\n{'='*60}")
    print(f"QUESTION: {question}")
    print('='*60)
    
    result = analyzer.ask_question(question)
    
    if result['success']:
        print("GENERATED CODE:")
        print(result['generated_code'])
        print("\nOUTPUT:")
        print(result['execution_result']['output'])
    else:
        print("ERROR:")
        print(result['execution_result']['error'])

Method 2: LangChain Agents Approach

This approach uses LangChain’s agent framework for more sophisticated analysis workflows.

Core Implementation

Create src/langchain_analyzer.py:

import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from langchain_experimental.agents import create_pandas_dataframe_agent
from langchain_openai import ChatOpenAI
from langchain.agents.agent_types import AgentType
from dotenv import load_dotenv
import warnings
warnings.filterwarnings('ignore')

load_dotenv()

class LangChainCSVAnalyzer:
    def __init__(self, csv_path, model_name="gpt-4"):
        self.csv_path = csv_path
        self.df = pd.read_csv(csv_path)
        
        # Initialize the LLM
        self.llm = ChatOpenAI(
            model_name=model_name,
            temperature=0,
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )
        
        # Create the pandas dataframe agent
        self.agent = create_pandas_dataframe_agent(
            llm=self.llm,
            df=self.df,
            verbose=True,
            agent_type=AgentType.OPENAI_FUNCTIONS,
            allow_dangerous_code=True  # Required for code execution
        )
        
        self.conversation_history = []
    
    def ask_question(self, question):
        """Ask a question about the data using the LangChain agent"""
        try:
            # Add context about available plotting libraries
            enhanced_question = f"""
            {question}
            
            You have access to matplotlib.pyplot as plt and seaborn as sns for visualizations.
            If creating plots, make sure to use plt.show() to display them.
            Provide clear, actionable insights based on the data.
            """
            
            # Get response from agent
            response = self.agent.run(enhanced_question)
            
            # Store in conversation history
            self.conversation_history.append({
                'question': question,
                'response': response,
                'success': True
            })
            
            return {
                'question': question,
                'response': response,
                'success': True
            }
            
        except Exception as e:
            error_response = {
                'question': question,
                'response': f"Error: {str(e)}",
                'success': False
            }
            self.conversation_history.append(error_response)
            return error_response
    
    def get_data_summary(self):
        """Get a comprehensive summary of the dataset"""
        summary_prompt = """
        Provide a comprehensive analysis of this dataset including:
        1. Basic statistics (shape, data types, missing values)
        2. Summary statistics for numerical columns
        3. Unique values for categorical columns
        4. Any data quality issues
        5. Suggestions for potential analyses
        """
        return self.ask_question(summary_prompt)
    
    def perform_multi_step_analysis(self, analysis_goal):
        """Perform a complex, multi-step analysis"""
        planning_prompt = f"""
        I want to perform the following analysis: {analysis_goal}
        
        Please break this down into steps and execute each step:
        1. First, examine the relevant columns and data quality
        2. Perform necessary data cleaning or preprocessing
        3. Execute the main analysis
        4. Create appropriate visualizations
        5. Provide insights and conclusions
        
        Execute each step and provide detailed explanations.
        """
        return self.ask_question(planning_prompt)
    
    def compare_segments(self, segment_column, metric_column, comparison_type="mean"):
        """Compare different segments of data"""
        comparison_prompt = f"""
        Compare different segments in the '{segment_column}' column based on '{metric_column}'.
        
        Please:
        1. Calculate {comparison_type} values for each segment
        2. Create a visualization comparing segments
        3. Identify the top and bottom performing segments
        4. Provide insights about the differences
        """
        return self.ask_question(comparison_prompt)
    
    def find_correlations(self, target_column=None):
        """Find correlations in the data"""
        if target_column:
            correlation_prompt = f"""
            Analyze correlations between '{target_column}' and other numerical columns.
            
            Please:
            1. Calculate correlation coefficients
            2. Create a correlation heatmap
            3. Identify the strongest positive and negative correlations
            4. Explain what these correlations might mean
            """
        else:
            correlation_prompt = """
            Analyze correlations between all numerical columns in the dataset.
            
            Please:
            1. Create a correlation matrix
            2. Generate a heatmap visualization
            3. Identify the strongest correlations
            4. Provide insights about relationships in the data
            """
        
        return self.ask_question(correlation_prompt)
    
    def detect_outliers(self, columns=None):
        """Detect outliers in specified columns or all numerical columns"""
        if columns:
            outlier_prompt = f"""
            Detect outliers in the following columns: {', '.join(columns)}
            
            Please:
            1. Use multiple methods (IQR, Z-score, isolation forest if appropriate)
            2. Create box plots to visualize outliers
            3. Show the actual outlier values
            4. Suggest whether these outliers should be investigated or removed
            """
        else:
            outlier_prompt = """
            Detect outliers in all numerical columns of the dataset.
            
            Please:
            1. Use appropriate statistical methods
            2. Create visualizations showing outliers
            3. Summarize findings for each column
            4. Provide recommendations for handling outliers
            """
        
        return self.ask_question(outlier_prompt)
    
    def get_conversation_summary(self):
        """Get a summary of the conversation history"""
        if not self.conversation_history:
            return "No questions have been asked yet."
        
        summary = "Conversation Summary:\n" + "="*50 + "\n"
        for i, item in enumerate(self.conversation_history, 1):
            summary += f"\nQ{i}: {item['question']}\n"
            if item['success']:
                # Truncate long responses
                response = item['response']
                if len(response) > 200:
                    response = response[:200] + "... [truncated]"
                summary += f"A{i}: {response}\n"
            else:
                summary += f"A{i}: [Error occurred]\n"
        
        return summary

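Before moving on, a quick smoke test of the agent-based analyzer can be run against the sample CSV created earlier. This sketch assumes the data/sample_sales.csv file and columns generated by examples/basic_usage.py:

from src.langchain_analyzer import LangChainCSVAnalyzer

# Assumes data/sample_sales.csv was generated by examples/basic_usage.py
analyzer = LangChainCSVAnalyzer('data/sample_sales.csv')

# Free-form question handled by the pandas dataframe agent
result = analyzer.ask_question("Which category generates the most total sales revenue?")
print(result['response'])

# Built-in helpers wrap common analyses
analyzer.compare_segments('category', 'sales', comparison_type="mean")
print(analyzer.get_conversation_summary())
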
Advanced LangChain Features

Create src/advanced_langchain.py:

from langchain.tools import BaseTool
from langchain.agents import Tool
from langchain.memory import ConversationBufferMemory
from typing import Type
from pydantic import BaseModel, Field
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go

class PlotlyVisualizationTool(BaseTool):
    name = "plotly_visualization"
    description = "Create interactive visualizations using Plotly"
    
    def _run(self, query: str) -> str:
        """Execute the visualization request"""
        try:
            # This would contain logic to parse the query and create appropriate Plotly charts
            return f"Created visualization for: {query}"
        except Exception as e:
            return f"Error creating visualization: {str(e)}"
    
    async def _arun(self, query: str):
        raise NotImplementedError("Async not implemented")

class AdvancedLangChainAnalyzer:
    def __init__(self, csv_path):
        self.df = pd.read_csv(csv_path)
        
        # Initialize memory for conversation context
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
        
        # Create custom tools
        self.custom_tools = [
            Tool(
                name="Data Statistics",
                func=self._get_statistics,
                description="Get comprehensive statistics about the dataset"
            ),
            Tool(
                name="Create Visualization",
                func=self._create_visualization,
                description="Create various types of visualizations"
            ),
            PlotlyVisualizationTool(),
        ]
    
    def _get_statistics(self, column_name: str) -> str:
        """Get detailed statistics for a specific column"""
        if column_name not in self.df.columns:
            return f"Column '{column_name}' not found in dataset"
        
        col_data = self.df[column_name]
        
        if col_data.dtype in ['int64', 'float64']:
            stats = {
                'count': col_data.count(),
                'mean': col_data.mean(),
                'median': col_data.median(),
                'std': col_data.std(),
                'min': col_data.min(),
                'max': col_data.max(),
                'missing': col_data.isnull().sum()
            }
            return f"Statistics for {column_name}: {stats}"
        else:
            stats = {
                'count': col_data.count(),
                'unique': col_data.nunique(),
                'top_value': col_data.mode().iloc[0] if len(col_data.mode()) > 0 else 'N/A',
                'missing': col_data.isnull().sum()
            }
            return f"Statistics for {column_name}: {stats}"
    
    def _create_visualization(self, chart_request: str) -> str:
        """Create visualizations based on natural language requests"""
        # This would contain logic to parse the request and create appropriate charts
        return f"Created visualization: {chart_request}"

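The class above only defines the tools and memory; it never wires them into an agent. One possible way to do that is a small helper built on LangChain's classic initialize_agent (deprecated in newer releases but compatible with the style used here). The build_tool_agent function is illustrative, not part of the original code:

from langchain.agents import initialize_agent
from langchain.agents.agent_types import AgentType
from langchain_openai import ChatOpenAI

def build_tool_agent(analyzer: AdvancedLangChainAnalyzer):
    """Wire the custom tools and conversation memory into an agent."""
    llm = ChatOpenAI(model_name="gpt-4", temperature=0)
    return initialize_agent(
        tools=analyzer.custom_tools,
        llm=llm,
        agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
        memory=analyzer.memory,
        verbose=True
    )

# Usage:
# agent = build_tool_agent(AdvancedLangChainAnalyzer('data/sample_sales.csv'))
# agent.run("Give me statistics for the price column")
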
Advanced Features

1. Data Quality Assessment

Create src/data_quality.py:

import pandas as pd
import numpy as np
from typing import Dict, List, Any

class DataQualityAnalyzer:
    def __init__(self, df: pd.DataFrame):
        self.df = df
    
    def assess_data_quality(self) -> Dict[str, Any]:
        """Comprehensive data quality assessment"""
        assessment = {
            'basic_info': self._get_basic_info(),
            'missing_data': self._analyze_missing_data(),
            'duplicates': self._analyze_duplicates(),
            'data_types': self._analyze_data_types(),
            'outliers': self._detect_outliers(),
            'consistency': self._check_consistency(),
            'completeness_score': self._calculate_completeness_score()
        }
        return assessment
    
    def _get_basic_info(self) -> Dict[str, Any]:
        return {
            'shape': self.df.shape,
            'memory_usage': self.df.memory_usage(deep=True).sum(),
            'columns': list(self.df.columns)
        }
    
    def _analyze_missing_data(self) -> Dict[str, Any]:
        missing_data = self.df.isnull().sum()
        missing_percentage = (missing_data / len(self.df)) * 100
        
        return {
            'missing_counts': missing_data.to_dict(),
            'missing_percentages': missing_percentage.to_dict(),
            'columns_with_missing': missing_data[missing_data > 0].index.tolist()
        }
    
    def _analyze_duplicates(self) -> Dict[str, Any]:
        duplicate_rows = self.df.duplicated().sum()
        duplicate_percentage = (duplicate_rows / len(self.df)) * 100
        
        return {
            'duplicate_rows': int(duplicate_rows),
            'duplicate_percentage': float(duplicate_percentage),
            'unique_rows': len(self.df) - duplicate_rows
        }
    
    def _analyze_data_types(self) -> Dict[str, Any]:
        type_counts = self.df.dtypes.value_counts().to_dict()
        
        return {
            'type_distribution': {str(k): int(v) for k, v in type_counts.items()},
            'column_types': self.df.dtypes.to_dict()
        }
    
    def _detect_outliers(self) -> Dict[str, Any]:
        numeric_columns = self.df.select_dtypes(include=[np.number]).columns
        outliers = {}
        
        for col in numeric_columns:
            Q1 = self.df[col].quantile(0.25)
            Q3 = self.df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            
            column_outliers = self.df[(self.df[col] < lower_bound) | (self.df[col] > upper_bound)]
            outliers[col] = {
                'count': len(column_outliers),
                'percentage': (len(column_outliers) / len(self.df)) * 100,
                'bounds': {'lower': lower_bound, 'upper': upper_bound}
            }
        
        return outliers
    
    def _check_consistency(self) -> Dict[str, Any]:
        consistency_issues = []
        
        # Check for mixed case in string columns
        string_columns = self.df.select_dtypes(include=['object']).columns
        for col in string_columns:
            unique_values = self.df[col].dropna().unique()
            # If case-insensitive deduplication shrinks the set, casing is inconsistent
            if len(set(str(val).upper() for val in unique_values)) != len(unique_values):
                consistency_issues.append(f"Mixed case in column: {col}")
        
        return {'issues': consistency_issues}
    
    def _calculate_completeness_score(self) -> float:
        """Calculate overall data completeness score"""
        total_cells = self.df.size
        missing_cells = self.df.isnull().sum().sum()
        completeness = ((total_cells - missing_cells) / total_cells) * 100
        return round(completeness, 2)

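A short usage sketch shows how the assessment report can be consumed (the CSV path assumes the sample data from earlier examples):

import pandas as pd
from src.data_quality import DataQualityAnalyzer

df = pd.read_csv('data/sample_sales.csv')
report = DataQualityAnalyzer(df).assess_data_quality()

print(f"Completeness: {report['completeness_score']}%")
print(f"Duplicate rows: {report['duplicates']['duplicate_rows']}")
for col, info in report['outliers'].items():
    print(f"{col}: {info['count']} outliers ({info['percentage']:.1f}%)")
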
2. Smart Query Suggestions

Create src/query_suggestions.py:

import pandas as pd
from typing import List, Dict
import re

class SmartQuerySuggester:
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.numeric_columns = df.select_dtypes(include=['number']).columns.tolist()
        self.categorical_columns = df.select_dtypes(include=['object']).columns.tolist()
        self.datetime_columns = df.select_dtypes(include=['datetime64']).columns.tolist()
    
    def generate_suggestions(self) -> Dict[str, List[str]]:
        """Generate smart query suggestions based on data characteristics"""
        suggestions = {
            'descriptive': self._generate_descriptive_queries(),
            'comparative': self._generate_comparative_queries(),
            'correlation': self._generate_correlation_queries(),
            'trend': self._generate_trend_queries(),
            'distribution': self._generate_distribution_queries()
        }
        return suggestions
    
    def _generate_descriptive_queries(self) -> List[str]:
        """Generate descriptive analysis queries"""
        queries = [
            "What are the basic statistics of this dataset?",
            "Show me a summary of all numerical columns",
            "What are the data types and missing values in each column?"
        ]
        
        if self.numeric_columns:
            queries.extend([
                f"What are the top 10 highest values in {self.numeric_columns[0]}?",
                f"What is the distribution of {self.numeric_columns[0]}?"
            ])
        
        if self.categorical_columns:
            queries.extend([
                f"What are the unique values in {self.categorical_columns[0]}?",
                f"Show me the frequency distribution of {self.categorical_columns[0]}"
            ])
        
        return queries
    
    def _generate_comparative_queries(self) -> List[str]:
        """Generate comparative analysis queries"""
        queries = []
        
        if len(self.categorical_columns) >= 1 and len(self.numeric_columns) >= 1:
            cat_col = self.categorical_columns[0]
            num_col = self.numeric_columns[0]
            queries.extend([
                f"Compare the average {num_col} across different {cat_col}",
                f"Which {cat_col} has the highest {num_col}?",
                f"Show me a box plot of {num_col} by {cat_col}"
            ])
        
        if len(self.numeric_columns) >= 2:
            queries.extend([
                f"Compare {self.numeric_columns[0]} vs {self.numeric_columns[1]}",
                "Which numerical columns have the most similar distributions?"
            ])
        
        return queries
    
    def _generate_correlation_queries(self) -> List[str]:
        """Generate correlation analysis queries"""
        queries = []
        
        if len(self.numeric_columns) >= 2:
            queries.extend([
                "Show me the correlation matrix for all numerical columns",
                f"What is the correlation between {self.numeric_columns[0]} and {self.numeric_columns[1]}?",
                "Which variables are most strongly correlated?",
                "Create a heatmap of correlations"
            ])
        
        return queries
    
    def _generate_trend_queries(self) -> List[str]:
        """Generate trend analysis queries"""
        queries = []
        
        if self.datetime_columns and self.numeric_columns:
            date_col = self.datetime_columns[0]
            num_col = self.numeric_columns[0]
            queries.extend([
                f"Show me the trend of {num_col} over {date_col}",
                f"What is the monthly/yearly pattern in {num_col}?",
                "Are there any seasonal patterns in the data?"
            ])
        
        return queries
    
    def _generate_distribution_queries(self) -> List[str]:
        """Generate distribution analysis queries"""
        queries = []
        
        if self.numeric_columns:
            queries.extend([
                f"Show me the distribution of {self.numeric_columns[0]}",
                "Which columns have normal distributions?",
                "Are there any outliers in the numerical data?",
                "Create histograms for all numerical columns"
            ])
        
        return queries

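The suggester pairs naturally with either analyzer: generate suggestions from the DataFrame, then feed a chosen question back into ask_question. A brief sketch, with the CSV path assumed from earlier examples:

import pandas as pd
from src.query_suggestions import SmartQuerySuggester
from src.openai_analyzer import OpenAICSVAnalyzer

df = pd.read_csv('data/sample_sales.csv')
suggestions = SmartQuerySuggester(df).generate_suggestions()

# Pick the first descriptive suggestion and run it through the analyzer
first_question = suggestions['descriptive'][0]
analyzer = OpenAICSVAnalyzer('data/sample_sales.csv')
result = analyzer.ask_question(first_question)
print(result['execution_result']['output'])
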
Building a Web Interface

Create web_app/streamlit_app.py:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from io import StringIO
import sys
import os

# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.openai_analyzer import OpenAICSVAnalyzer
from src.langchain_analyzer import LangChainCSVAnalyzer
from src.data_quality import DataQualityAnalyzer
from src.query_suggestions import SmartQuerySuggester

# Page configuration
st.set_page_config(
    page_title="LLM-Powered CSV Analyzer",
    page_icon="📊",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS
st.markdown("""
<style>
    .main > div {
        padding-top: 2rem;
    }
    .stAlert {
        margin-top: 1rem;
    }
    .metric-card {
        background-color: #f0f2f6;
        padding: 1rem;
        border-radius: 0.5rem;
        margin: 0.5rem 0;
    }
</style>
""", unsafe_allow_html=True)

def main():
    st.title("🤖 LLM-Powered CSV Analyzer")
    st.markdown("Ask questions about your CSV data in plain English!")

    # Sidebar for configuration
    with st.sidebar:
        st.header("Configuration")
        
        # API Key input
        api_key = st.text_input("OpenAI API Key", type="password", 
                               help="Enter your OpenAI API key")
        
        if api_key:
            os.environ['OPENAI_API_KEY'] = api_key
        
        # Method selection
        analysis_method = st.selectbox(
            "Choose Analysis Method",
            ["OpenAI + Pandas", "LangChain Agents"],
            help="Select the AI method for analyzing your data"
        )
        
        # Model selection
        model_choice = st.selectbox("Model", ["gpt-4", "gpt-3.5-turbo"])

    # File upload
    uploaded_file = st.file_uploader(
        "Upload your CSV file",
        type=['csv'],
        help="Upload a CSV file to start analyzing"
    )

    if uploaded_file is not None:
        try:
            # Load the data
            df = pd.read_csv(uploaded_file)
            
            # Save uploaded file temporarily
            temp_path = f"temp_{uploaded_file.name}"
            df.to_csv(temp_path, index=False)
            
            # Display basic info
            st.success(f"✅ File uploaded successfully! Shape: {df.shape}")
            
            # Create tabs for different functionalities
            tab1, tab2, tab3, tab4, tab5 = st.tabs([
                "📊 Data Overview", 
                "🤖 AI Analysis", 
                "📈 Data Quality", 
                "💡 Smart Suggestions",
                "📋 Analysis History"
            ])
            
            # Initialize analyzers
            if api_key:
                if analysis_method == "OpenAI + Pandas":
                    analyzer = OpenAICSVAnalyzer(temp_path)
                else:
                    analyzer = LangChainCSVAnalyzer(temp_path, model_choice)
                
                quality_analyzer = DataQualityAnalyzer(df)
                suggester = SmartQuerySuggester(df)
            
            with tab1:
                display_data_overview(df)
            
            with tab2:
                if api_key:
                    display_ai_analysis(analyzer, analysis_method)
                else:
                    st.warning("Please enter your OpenAI API key in the sidebar to use AI analysis.")
            
            with tab3:
                if api_key:
                    display_data_quality(quality_analyzer)
                else:
                    st.warning("Please enter your OpenAI API key in the sidebar.")
            
            with tab4:
                if api_key:
                    display_smart_suggestions(suggester, analyzer if api_key else None)
                else:
                    st.warning("Please enter your OpenAI API key in the sidebar.")
            
            with tab5:
                if api_key and 'analysis_history' in st.session_state:
                    display_analysis_history()
                else:
                    st.info("Analysis history will appear here after you ask questions.")
            
            # Clean up temp file
            if os.path.exists(temp_path):
                os.remove(temp_path)
                
        except Exception as e:
            st.error(f"Error processing file: {str(e)}")
    else:
        st.info("👆 Upload a CSV file to get started!")
        
        # Show example data
        st.subheader("Example Data Format")
        example_data = pd.DataFrame({
            'product': ['Laptop', 'Phone', 'Tablet'],
            'price': [1000, 800, 600],
            'sales': [50, 120, 80],
            'rating': [4.5, 4.2, 4.0]
        })
        st.dataframe(example_data)

def display_data_overview(df):
    """Display basic data overview"""
    st.subheader("📊 Data Overview")
    
    # Basic metrics
    col1, col2, col3, col4 = st.columns(4)
    
    with col1:
        st.metric("Rows", f"{df.shape[0]:,}")
    with col2:
        st.metric("Columns", df.shape[1])
    with col3:
        st.metric("Missing Values", f"{df.isnull().sum().sum():,}")
    with col4:
        st.metric("Memory Usage", f"{df.memory_usage(deep=True).sum() / 1024:.1f} KB")
    
    # Data preview
    st.subheader("Data Preview")
    st.dataframe(df.head(10), use_container_width=True)
    
    # Column information
    st.subheader("Column Information")
    col_info = pd.DataFrame({
        'Column': df.columns,
        'Data Type': df.dtypes.astype(str),
        'Non-Null Count': df.count(),
        'Null Count': df.isnull().sum(),
        'Unique Values': df.nunique()
    })
    st.dataframe(col_info, use_container_width=True)

def display_ai_analysis(analyzer, method):
    """Display AI analysis interface"""
    st.subheader("🤖 AI-Powered Analysis")
    
    # Initialize session state for history
    if 'analysis_history' not in st.session_state:
        st.session_state.analysis_history = []
    
    # Question input
    question = st.text_area(
        "Ask a question about your data:",
        placeholder="e.g., What are the top 5 products by sales? Show me the correlation between price and rating.",
        height=100
    )
    
    # Analysis options
    col1, col2 = st.columns([3, 1])
    with col1:
        include_viz = st.checkbox("Include visualizations", value=True)
    with col2:
        analyze_button = st.button("🔍 Analyze", type="primary")
    
    if analyze_button and question:
        with st.spinner("🤖 AI is analyzing your data..."):
            try:
                if method == "OpenAI + Pandas":
                    result = analyzer.ask_question(question, include_visualization=include_viz)
                else:
                    result = analyzer.ask_question(question)
                
                # Store in session state
                st.session_state.analysis_history.append(result)
                
                # Display results
                if result['success']:
                    st.success("✅ Analysis completed!")
                    
                    # Show generated code (for OpenAI method)
                    if method == "OpenAI + Pandas":
                        with st.expander("🔧 Generated Code"):
                            st.code(result['generated_code'], language='python')
                    
                    # Show results
                    st.subheader("📊 Results")
                    if method == "OpenAI + Pandas":
                        output = result['execution_result']['output']
                        if output.strip():
                            st.text(output)
                        else:
                            st.info("Analysis completed - check for any generated visualizations above.")
                    else:
                        st.write(result['response'])
                        
                else:
                    st.error("❌ Analysis failed")
                    if method == "OpenAI + Pandas":
                        st.error(result['execution_result']['error'])
                    else:
                        st.error(result['response'])
                        
            except Exception as e:
                st.error(f"Error during analysis: {str(e)}")
    
    # Quick analysis buttons
    st.subheader("🚀 Quick Analysis")
    quick_col1, quick_col2, quick_col3 = st.columns(3)
    
    with quick_col1:
        if st.button("📈 Get Data Summary"):
            with st.spinner("Generating summary..."):
                if method == "OpenAI + Pandas":
                    result = analyzer.get_summary()
                else:
                    result = analyzer.get_data_summary()
                st.session_state.analysis_history.append(result)
                st.rerun()
    
    with quick_col2:
        if st.button("🔍 Find Correlations"):
            question = "Show me correlations between numerical columns with a heatmap"
            with st.spinner("Finding correlations..."):
                if method == "OpenAI + Pandas":
                    result = analyzer.ask_question(question)
                else:
                    result = analyzer.find_correlations()
                st.session_state.analysis_history.append(result)
                st.rerun()
    
    with quick_col3:
        if st.button("⚠️ Detect Outliers"):
            question = "Detect and visualize outliers in numerical columns"
            with st.spinner("Detecting outliers..."):
                if method == "OpenAI + Pandas":
                    result = analyzer.ask_question(question)
                else:
                    result = analyzer.detect_outliers()
                st.session_state.analysis_history.append(result)
                st.rerun()

def display_data_quality(quality_analyzer):
    """Display data quality assessment"""
    st.subheader("📈 Data Quality Assessment")
    
    with st.spinner("Analyzing data quality..."):
        quality_report = quality_analyzer.assess_data_quality()
    
    # Overall completeness score
    completeness = quality_report['completeness_score']
    st.metric("Overall Data Completeness", f"{completeness}%")
    
    # Create quality indicator
    if completeness >= 95:
        st.success("🟢 Excellent data quality!")
    elif completeness >= 80:
        st.warning("🟡 Good data quality with some issues")
    else:
        st.error("🔴 Data quality needs attention")
    
    # Missing data analysis
    st.subheader("Missing Data Analysis")
    missing_data = quality_report['missing_data']
    
    if missing_data['columns_with_missing']:
        missing_df = pd.DataFrame({
            'Column': missing_data['columns_with_missing'],
            'Missing Count': [missing_data['missing_counts'][col] for col in missing_data['columns_with_missing']],
            'Missing %': [f"{missing_data['missing_percentages'][col]:.1f}%" for col in missing_data['columns_with_missing']]
        })
        st.dataframe(missing_df, use_container_width=True)
        
        # Visualization of missing data
        fig = px.bar(missing_df, x='Column', y='Missing Count', 
                     title='Missing Values by Column')
        st.plotly_chart(fig, use_container_width=True)
    else:
        st.success("✅ No missing values detected!")
    
    # Duplicates analysis
    st.subheader("Duplicate Records")
    duplicates = quality_report['duplicates']
    
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Duplicate Rows", duplicates['duplicate_rows'])
    with col2:
        st.metric("Duplicate %", f"{duplicates['duplicate_percentage']:.1f}%")
    with col3:
        st.metric("Unique Rows", duplicates['unique_rows'])
    
    # Outliers analysis
    st.subheader("Outliers Detection")
    outliers = quality_report['outliers']
    
    if outliers:
        outlier_data = []
        for col, info in outliers.items():
            outlier_data.append({
                'Column': col,
                'Outlier Count': info['count'],
                'Outlier %': f"{info['percentage']:.1f}%",
                'Lower Bound': f"{info['bounds']['lower']:.2f}",
                'Upper Bound': f"{info['bounds']['upper']:.2f}"
            })
        
        outlier_df = pd.DataFrame(outlier_data)
        st.dataframe(outlier_df, use_container_width=True)
    else:
        st.info("No numerical columns available for outlier detection.")

def display_smart_suggestions(suggester, analyzer=None):
    """Display smart query suggestions"""
    st.subheader("💡 Smart Query Suggestions")
    
    with st.spinner("Generating smart suggestions..."):
        suggestions = suggester.generate_suggestions()
    
    # Display suggestions by category
    for category, queries in suggestions.items():
        if queries:  # Only show categories with suggestions
            st.subheader(f"{category.title()} Analysis")
            
            for i, query in enumerate(queries):
                col1, col2 = st.columns([4, 1])
                with col1:
                    st.write(f"• {query}")
                with col2:
                    if analyzer and st.button(f"Run", key=f"{category}_{i}"):
                        with st.spinner("Running analysis..."):
                            if hasattr(analyzer, 'ask_question'):
                                result = analyzer.ask_question(query)
                                if 'analysis_history' not in st.session_state:
                                    st.session_state.analysis_history = []
                                st.session_state.analysis_history.append(result)
                                st.success("Analysis added to history!")
                                st.rerun()

def display_analysis_history():
    """Display analysis history"""
    st.subheader("📋 Analysis History")
    
    if 'analysis_history' not in st.session_state or not st.session_state.analysis_history:
        st.info("No analysis history available.")
        return
    
    # Clear history button
    if st.button("🗑️ Clear History"):
        st.session_state.analysis_history = []
        st.rerun()
    
    # Display each analysis
    for i, analysis in enumerate(reversed(st.session_state.analysis_history)):
        with st.expander(f"Analysis {len(st.session_state.analysis_history) - i}: {analysis['question'][:50]}..."):
            st.write(f"**Question:** {analysis['question']}")
            
            if analysis['success']:
                st.success("✅ Successful")
                
                # Show code if available (OpenAI method)
                if 'generated_code' in analysis:
                    st.subheader("Generated Code:")
                    st.code(analysis['generated_code'], language='python')
                
                # Show output
                st.subheader("Output:")
                if 'execution_result' in analysis:
                    output = analysis['execution_result']['output']
                    if output.strip():
                        st.text(output)
                    else:
                        st.info("No text output (visualization may have been generated)")
                elif 'response' in analysis:
                    st.write(analysis['response'])
            else:
                st.error("❌ Failed")
                if 'execution_result' in analysis:
                    st.error(analysis['execution_result']['error'])
                elif 'response' in analysis:
                    st.error(analysis['response'])

if __name__ == "__main__":
    main()

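With the file in place, launch the interface from the project root (the API key can also be entered in the sidebar instead of the .env file):

streamlit run web_app/streamlit_app.py
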
Best Practices & Security

Security Considerations

# src/security.py
import ast
import re
from typing import List, Set

class CodeSecurityAnalyzer:
    """Analyze generated code for security risks"""
    
    DANGEROUS_IMPORTS = {
        'os', 'subprocess', 'sys', 'eval', 'exec', 'open', 
        'file', '__import__', 'compile', 'globals', 'locals'
    }
    
    DANGEROUS_FUNCTIONS = {
        'eval', 'exec', 'compile', '__import__', 'getattr', 
        'setattr', 'delattr', 'globals', 'locals', 'vars'
    }
    
    DANGEROUS_METHODS = {
        'system', 'popen', 'spawn', 'fork', 'kill'
    }
    
    def __init__(self):
        self.risks = []
    
    def analyze_code(self, code: str) -> dict:
        """Analyze code for security risks"""
        self.risks = []
        
        try:
            # Parse the code
            tree = ast.parse(code)
            
            # Check for dangerous imports
            self._check_imports(tree)
            
            # Check for dangerous function calls
            self._check_function_calls(tree)
            
            # Check for file operations
            self._check_file_operations(code)
            
            # Check for network operations
            self._check_network_operations(code)
            
            return {
                'is_safe': len(self.risks) == 0,
                'risks': self.risks,
                'risk_level': self._calculate_risk_level()
            }
            
        except SyntaxError as e:
            return {
                'is_safe': False,
                'risks': [f"Syntax error in code: {str(e)}"],
                'risk_level': 'HIGH'
            }
    
    def _check_imports(self, tree):
        """Check for dangerous imports"""
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if alias.name in self.DANGEROUS_IMPORTS:
                        self.risks.append(f"Dangerous import: {alias.name}")
            
            elif isinstance(node, ast.ImportFrom):
                if node.module in self.DANGEROUS_IMPORTS:
                    self.risks.append(f"Dangerous import: {node.module}")
    
    def _check_function_calls(self, tree):
        """Check for dangerous function calls"""
        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name):
                    if node.func.id in self.DANGEROUS_FUNCTIONS:
                        self.risks.append(f"Dangerous function call: {node.func.id}")
    
    def _check_file_operations(self, code: str):
        """Check for file operations"""
        file_patterns = [
            r'open\s*\(',
            r'with\s+open\s*\(',
            r'\.write\s*\(',
            r'\.read\s*\(',
            r'os\.remove',
            r'os\.unlink'
        ]
        
        for pattern in file_patterns:
            if re.search(pattern, code):
                self.risks.append(f"File operation detected: {pattern}")
    
    def _check_network_operations(self, code: str):
        """Check for network operations"""
        network_patterns = [
            r'requests\.',
            r'urllib\.',
            r'http\.',
            r'socket\.',
            r'smtplib\.'
        ]
        
        for pattern in network_patterns:
            if re.search(pattern, code):
                self.risks.append(f"Network operation detected: {pattern}")
    
    def _calculate_risk_level(self) -> str:
        """Calculate overall risk level"""
        if len(self.risks) == 0:
            return 'SAFE'
        elif len(self.risks) <= 2:
            return 'LOW'
        elif len(self.risks) <= 5:
            return 'MEDIUM'
        else:
            return 'HIGH'

Performance Optimization

# src/performance.py
import time
import functools
import pandas as pd
from typing import Callable, Any

def performance_monitor(func: Callable) -> Callable:
    """Decorator to monitor function performance"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        execution_time = end_time - start_time
        print(f"{func.__name__} executed in {execution_time:.2f} seconds")
        
        return result
    return wrapper

class DataFrameOptimizer:
    """Optimize DataFrame operations for better performance"""
    
    @staticmethod
    def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        """Optimize DataFrame data types for memory efficiency"""
        optimized_df = df.copy()
        
        # Optimize integer columns
        for col in optimized_df.select_dtypes(include=['int64']).columns:
            col_min = optimized_df[col].min()
            col_max = optimized_df[col].max()
            
            if col_min >= 0:
                if col_max < 255:
                    optimized_df[col] = optimized_df[col].astype('uint8')
                elif col_max < 65535:
                    optimized_df[col] = optimized_df[col].astype('uint16')
                elif col_max < 4294967295:
                    optimized_df[col] = optimized_df[col].astype('uint32')
            else:
                if col_min > -128 and col_max < 127:
                    optimized_df[col] = optimized_df[col].astype('int8')
                elif col_min > -32768 and col_max < 32767:
                    optimized_df[col] = optimized_df[col].astype('int16')
                elif col_min > -2147483648 and col_max < 2147483647:
                    optimized_df[col] = optimized_df[col].astype('int32')
        
        # Optimize float columns
        for col in optimized_df.select_dtypes(include=['float64']).columns:
            optimized_df[col] = pd.to_numeric(optimized_df[col], downcast='float')
        
        # Optimize object columns to category where appropriate
        for col in optimized_df.select_dtypes(include=['object']).columns:
            if optimized_df[col].nunique() / len(optimized_df) < 0.5:
                optimized_df[col] = optimized_df[col].astype('category')
        
        return optimized_df
    
    @staticmethod
    def chunk_large_dataframes(df: pd.DataFrame, chunk_size: int = 10000):
        """Process large DataFrames in chunks"""
        for start in range(0, len(df), chunk_size):
            yield df[start:start + chunk_size]

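A quick before/after comparison makes the memory savings visible (sample CSV assumed from earlier):

import pandas as pd
from src.performance import DataFrameOptimizer

df = pd.read_csv('data/sample_sales.csv')
optimized = DataFrameOptimizer.optimize_dtypes(df)

before = df.memory_usage(deep=True).sum() / 1024
after = optimized.memory_usage(deep=True).sum() / 1024
print(f"Memory: {before:.1f} KB -> {after:.1f} KB")
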
Troubleshooting

Common Issues and Solutions

# src/troubleshooting.py
import pandas as pd

class TroubleshootingGuide:
    """Common issues and their solutions"""
    
    COMMON_ERRORS = {
        "KeyError": {
            "description": "Column name not found in DataFrame",
            "solutions": [
                "Check column names with df.columns",
                "Use df.columns.tolist() to see all available columns",
                "Check for extra spaces or different capitalization",
                "Use df.rename() to standardize column names"
            ],
            "example": """
# Check available columns
print("Available columns:", df.columns.tolist())

# Rename columns to remove spaces
df.columns = df.columns.str.strip().str.replace(' ', '_')
"""
        },
        
        "ValueError": {
            "description": "Invalid value for operation",
            "solutions": [
                "Check data types with df.dtypes",
                "Convert data types if necessary",
                "Handle missing values before operations",
                "Check for non-numeric data in numeric columns"
            ],
            "example": """
# Convert to numeric, errors='coerce' turns invalid values to NaN
df['numeric_column'] = pd.to_numeric(df['numeric_column'], errors='coerce')

# Fill or drop missing values
df = df.dropna()  # or df.fillna(0)
"""
        },
        
        "MemoryError": {
            "description": "Not enough memory to process the data",
            "solutions": [
                "Process data in chunks",
                "Optimize data types",
                "Use generators instead of loading all data",
                "Consider using Dask for large datasets"
            ],
            "example": """
# Read CSV in chunks
chunk_list = []
for chunk in pd.read_csv('large_file.csv', chunksize=10000):
    # Process each chunk
    processed_chunk = chunk.groupby('column').sum()
    chunk_list.append(processed_chunk)

# Combine results
result = pd.concat(chunk_list, ignore_index=True)
"""
        },
        
        "OpenAI API Error": {
            "description": "Issues with OpenAI API calls",
            "solutions": [
                "Check API key is valid and set correctly",
                "Verify you have sufficient API credits",
                "Check rate limits and add delays if necessary",
                "Handle API timeouts with retry logic"
            ],
            "example": """
import time
from openai import OpenAI

def make_api_call_with_retry(client, messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4",
                messages=messages
            )
            return response
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise e
"""
        }
    }
    
    @classmethod
    def get_solution(cls, error_type: str) -> dict:
        """Get solutions for a specific error type"""
        return cls.COMMON_ERRORS.get(error_type, {
            "description": "Unknown error",
            "solutions": ["Check the error message for specific details"],
            "example": "# No specific example available"
        })
    
    @classmethod
    def diagnose_dataframe_issues(cls, df: pd.DataFrame) -> dict:
        """Diagnose common DataFrame issues"""
        issues = []
        
        # Check for missing values
        missing_count = df.isnull().sum().sum()
        if missing_count > 0:
            issues.append({
                "issue": "Missing Values",
                "count": missing_count,
                "solution": "Use df.dropna() or df.fillna() to handle missing values"
            })
        
        # Check for duplicate rows
        duplicate_count = df.duplicated().sum()
        if duplicate_count > 0:
            issues.append({
                "issue": "Duplicate Rows",
                "count": duplicate_count,
                "solution": "Use df.drop_duplicates() to remove duplicates"
            })
        
        # Check for mixed data types in object columns
        for col in df.select_dtypes(include=['object']).columns:
            sample_values = df[col].dropna().head(100)
            if len(sample_values) > 0:
                types = set(type(val).__name__ for val in sample_values)
                if len(types) > 1:
                    issues.append({
                        "issue": f"Mixed Data Types in {col}",
                        "types": list(types),
                        "solution": f"Clean and standardize data in column {col}"
                    })
        
        # Check memory usage
        memory_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
        if memory_mb > 100:
            issues.append({
                "issue": "High Memory Usage",
                "memory_mb": round(memory_mb, 2),
                "solution": "Consider optimizing data types or processing in chunks"
            })
        
        return {
            "total_issues": len(issues),
            "issues": issues,
            "overall_health": "Good" if len(issues) == 0 else "Needs Attention"
        }

Conclusion

This tutorial has shown you how to build a powerful LLM-powered CSV analysis system end to end.

Key Features Implemented:

  • Natural language querying of CSV data
  • Automatic code generation and execution
  • Data quality assessment
  • Smart query suggestions
  • Interactive web interface
  • Security and performance optimizations

Two Main Approaches Covered:

  1. OpenAI + Pandas: Direct integration with fine-grained control
  2. LangChain Agents: Framework-based approach with advanced workflows

Production Considerations:

  • Always validate and sanitize generated code
  • Implement proper error handling and logging
  • Monitor API usage and costs
  • Consider caching frequently asked questions (see the sketch after this list)
  • Add user authentication for web applications
  • Implement rate limiting to prevent abuse
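
For the caching point above, even a small in-memory cache keyed on the question avoids repeat API calls within a session. A minimal sketch; the CachedAnalyzer wrapper is illustrative, not part of the analyzer classes:

import hashlib

class CachedAnalyzer:
    """Wrap any analyzer exposing ask_question() with a simple result cache."""
    def __init__(self, analyzer):
        self.analyzer = analyzer
        self.cache = {}

    def ask_question(self, question, **kwargs):
        # Normalize the question so trivial variations hit the same cache entry
        key = hashlib.sha256(question.strip().lower().encode()).hexdigest()
        if key not in self.cache:
            self.cache[key] = self.analyzer.ask_question(question, **kwargs)
        return self.cache[key]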

Next Steps:

  • Add support for multiple file formats (Excel, JSON, etc.)
  • Implement natural language report generation
  • Add collaborative features for team analysis
  • Integrate with databases and data warehouses
  • Build custom visualizations based on query context
  • Add support for time series analysis and forecasting

Security Reminders:

  • Never execute untrusted code in production
  • Validate all user inputs
  • Use environment variables for API keys
  • Implement proper access controls
  • Monitor and log all analysis activities

This system provides a solid foundation for building AI-powered data analysis tools. The modular design allows you to extend functionality, add new data sources, and customize the analysis capabilities based on your specific needs.

Remember to test thoroughly with your specific datasets and use cases, and always prioritize security when deploying to production environments.
