PRP Template for Pydantic AI Agents

This commit is contained in:
Cole Medin
2025-07-20 08:01:14 -05:00
parent 84d49cf30a
commit 1bcba59231
30 changed files with 6134 additions and 88 deletions

View File

@@ -0,0 +1,191 @@
"""
Basic Chat Agent with Memory and Context
A simple conversational agent that demonstrates core PydanticAI patterns:
- Environment-based model configuration
- System prompts for personality and behavior
- Basic conversation handling with memory
- String output (default, no result_type needed)
"""
import logging
from dataclasses import dataclass
from typing import Optional
from pydantic_settings import BaseSettings
from pydantic import Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.models.openai import OpenAIModel
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
class Settings(BaseSettings):
"""Configuration settings for the chat agent."""
# LLM Configuration
llm_provider: str = Field(default="openai")
llm_api_key: str = Field(...)
llm_model: str = Field(default="gpt-4")
llm_base_url: str = Field(default="https://api.openai.com/v1")
class Config:
env_file = ".env"
case_sensitive = False
def get_llm_model() -> OpenAIModel:
"""Get configured LLM model from environment settings."""
try:
settings = Settings()
provider = OpenAIProvider(
base_url=settings.llm_base_url,
api_key=settings.llm_api_key
)
return OpenAIModel(settings.llm_model, provider=provider)
except Exception:
# For testing without env vars
import os
os.environ.setdefault("LLM_API_KEY", "test-key")
settings = Settings()
provider = OpenAIProvider(
base_url=settings.llm_base_url,
api_key="test-key"
)
return OpenAIModel(settings.llm_model, provider=provider)
@dataclass
class ConversationContext:
"""Simple context for conversation state management."""
user_name: Optional[str] = None
conversation_count: int = 0
preferred_language: str = "English"
session_id: Optional[str] = None
SYSTEM_PROMPT = """
You are a friendly and helpful AI assistant.
Your personality:
- Warm and approachable
- Knowledgeable but humble
- Patient and understanding
- Encouraging and supportive
Guidelines:
- Keep responses conversational and natural
- Be helpful without being overwhelming
- Ask follow-up questions when appropriate
- Remember context from the conversation
- Adapt your tone to match the user's needs
"""
# Create the basic chat agent - note: no result_type, defaults to string
chat_agent = Agent(
get_llm_model(),
deps_type=ConversationContext,
system_prompt=SYSTEM_PROMPT
)
@chat_agent.system_prompt
def dynamic_context_prompt(ctx) -> str:
"""Dynamic system prompt that includes conversation context."""
prompt_parts = []
if ctx.deps.user_name:
prompt_parts.append(f"The user's name is {ctx.deps.user_name}.")
if ctx.deps.conversation_count > 0:
prompt_parts.append(f"This is message #{ctx.deps.conversation_count + 1} in your conversation.")
if ctx.deps.preferred_language != "English":
prompt_parts.append(f"The user prefers to communicate in {ctx.deps.preferred_language}.")
return " ".join(prompt_parts) if prompt_parts else ""
async def chat_with_agent(message: str, context: Optional[ConversationContext] = None) -> str:
"""
Main function to chat with the agent.
Args:
message: User's message to the agent
context: Optional conversation context for memory
Returns:
String response from the agent
"""
if context is None:
context = ConversationContext()
# Increment conversation count
context.conversation_count += 1
# Run the agent with the message and context
result = await chat_agent.run(message, deps=context)
return result.data
def chat_with_agent_sync(message: str, context: Optional[ConversationContext] = None) -> str:
"""
Synchronous version of chat_with_agent for simple use cases.
Args:
message: User's message to the agent
context: Optional conversation context for memory
Returns:
String response from the agent
"""
if context is None:
context = ConversationContext()
# Increment conversation count
context.conversation_count += 1
# Run the agent synchronously
result = chat_agent.run_sync(message, deps=context)
return result.data
# Example usage and demonstration
if __name__ == "__main__":
import asyncio
async def demo_conversation():
"""Demonstrate the basic chat agent with a simple conversation."""
print("=== Basic Chat Agent Demo ===\n")
# Create conversation context
context = ConversationContext(
user_name="Alex",
preferred_language="English"
)
# Sample conversation
messages = [
"Hello! My name is Alex, nice to meet you.",
"Can you help me understand what PydanticAI is?",
"That's interesting! What makes it different from other AI frameworks?",
"Thanks for the explanation. Can you recommend some good resources to learn more?"
]
for message in messages:
print(f"User: {message}")
response = await chat_with_agent(message, context)
print(f"Agent: {response}")
print("-" * 50)
# Run the demo
asyncio.run(demo_conversation())

View File

@@ -0,0 +1,214 @@
#!/usr/bin/env python3
"""Conversational CLI with real-time streaming and tool call visibility for Pydantic AI agents."""
import asyncio
import sys
import os
from typing import List
# Add parent directory to Python path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Prompt
from rich.live import Live
from rich.text import Text
from pydantic_ai import Agent
from agents.research_agent import research_agent
from agents.dependencies import ResearchAgentDependencies
from agents.settings import settings
console = Console()
async def stream_agent_interaction(user_input: str, conversation_history: List[str]) -> tuple[str, str]:
"""Stream agent interaction with real-time tool call display."""
try:
# Set up dependencies
research_deps = ResearchAgentDependencies(brave_api_key=settings.brave_api_key)
# Build context with conversation history
context = "\n".join(conversation_history[-6:]) if conversation_history else ""
prompt = f"""Previous conversation:
{context}
User: {user_input}
Respond naturally and helpfully."""
# Stream the agent execution
async with research_agent.iter(prompt, deps=research_deps) as run:
async for node in run:
# Handle user prompt node
if Agent.is_user_prompt_node(node):
pass # Clean start - no processing messages
# Handle model request node - stream the thinking process
elif Agent.is_model_request_node(node):
# Show assistant prefix at the start
console.print("[bold blue]Assistant:[/bold blue] ", end="")
# Stream model request events for real-time text
response_text = ""
async with node.stream(run.ctx) as request_stream:
async for event in request_stream:
# Handle different event types based on their type
event_type = type(event).__name__
if event_type == "PartDeltaEvent":
# Extract content from delta
if hasattr(event, 'delta') and hasattr(event.delta, 'content_delta'):
delta_text = event.delta.content_delta
if delta_text:
console.print(delta_text, end="")
response_text += delta_text
elif event_type == "FinalResultEvent":
console.print() # New line after streaming
# Handle tool calls - this is the key part
elif Agent.is_call_tools_node(node):
# Stream tool execution events
async with node.stream(run.ctx) as tool_stream:
async for event in tool_stream:
event_type = type(event).__name__
if event_type == "FunctionToolCallEvent":
# Extract tool name from the part attribute
tool_name = "Unknown Tool"
args = None
# Check if the part attribute contains the tool call
if hasattr(event, 'part'):
part = event.part
# Check if part has tool_name directly
if hasattr(part, 'tool_name'):
tool_name = part.tool_name
elif hasattr(part, 'function_name'):
tool_name = part.function_name
elif hasattr(part, 'name'):
tool_name = part.name
# Check for arguments in part
if hasattr(part, 'args'):
args = part.args
elif hasattr(part, 'arguments'):
args = part.arguments
# Debug: print part attributes to understand structure
if tool_name == "Unknown Tool" and hasattr(event, 'part'):
part_attrs = [attr for attr in dir(event.part) if not attr.startswith('_')]
console.print(f" [dim red]Debug - Part attributes: {part_attrs}[/dim red]")
# Try to get more details about the part
if hasattr(event.part, '__dict__'):
console.print(f" [dim red]Part dict: {event.part.__dict__}[/dim red]")
console.print(f" 🔹 [cyan]Calling tool:[/cyan] [bold]{tool_name}[/bold]")
# Show tool args if available
if args and isinstance(args, dict):
# Show first few characters of each arg
arg_preview = []
for key, value in list(args.items())[:3]:
val_str = str(value)
if len(val_str) > 50:
val_str = val_str[:47] + "..."
arg_preview.append(f"{key}={val_str}")
console.print(f" [dim]Args: {', '.join(arg_preview)}[/dim]")
elif args:
args_str = str(args)
if len(args_str) > 100:
args_str = args_str[:97] + "..."
console.print(f" [dim]Args: {args_str}[/dim]")
elif event_type == "FunctionToolResultEvent":
# Display tool result
result = str(event.tool_return) if hasattr(event, 'tool_return') else "No result"
if len(result) > 100:
result = result[:97] + "..."
console.print(f" ✅ [green]Tool result:[/green] [dim]{result}[/dim]")
# Handle end node
elif Agent.is_end_node(node):
# Don't show "Processing complete" - keep it clean
pass
# Get final result
final_result = run.result
final_output = final_result.output if hasattr(final_result, 'output') else str(final_result)
# Return both streamed and final content
return (response_text.strip(), final_output)
except Exception as e:
console.print(f"[red]❌ Error: {e}[/red]")
return ("", f"Error: {e}")
async def main():
"""Main conversation loop."""
# Show welcome
welcome = Panel(
"[bold blue]🤖 Pydantic AI Research Assistant[/bold blue]\n\n"
"[green]Real-time tool execution visibility[/green]\n"
"[dim]Type 'exit' to quit[/dim]",
style="blue",
padding=(1, 2)
)
console.print(welcome)
console.print()
conversation_history = []
while True:
try:
# Get user input
user_input = Prompt.ask("[bold green]You").strip()
# Handle exit
if user_input.lower() in ['exit', 'quit']:
console.print("\n[yellow]👋 Goodbye![/yellow]")
break
if not user_input:
continue
# Add to history
conversation_history.append(f"User: {user_input}")
# Stream the interaction and get response
streamed_text, final_response = await stream_agent_interaction(user_input, conversation_history)
# Handle the response display
if streamed_text:
# Response was streamed, just add spacing
console.print()
conversation_history.append(f"Assistant: {streamed_text}")
elif final_response and final_response.strip():
# Response wasn't streamed, display with proper formatting
console.print(f"[bold blue]Assistant:[/bold blue] {final_response}")
console.print()
conversation_history.append(f"Assistant: {final_response}")
else:
# No response
console.print()
except KeyboardInterrupt:
console.print("\n[yellow]Use 'exit' to quit[/yellow]")
continue
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
continue
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,9 @@
# ===== LLM Configuration =====
# Provider: openai, anthropic, gemini, ollama, etc.
LLM_PROVIDER=openai
# Your LLM API key
LLM_API_KEY=sk-your-openai-api-key-here
# LLM to use for the agents (e.g., gpt-4.1-mini, gpt-4.1, claude-4-sonnet)
LLM_CHOICE=gpt-4.1-mini
# Base URL for the LLM API (change for Ollama or other providers)
LLM_BASE_URL=https://api.openai.com/v1

View File

@@ -0,0 +1,103 @@
"""
Core data models for the multi-agent system.
"""
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
class ResearchQuery(BaseModel):
"""Model for research query requests."""
query: str = Field(..., description="Research topic to investigate")
max_results: int = Field(10, ge=1, le=50, description="Maximum number of results to return")
include_summary: bool = Field(True, description="Whether to include AI-generated summary")
class BraveSearchResult(BaseModel):
"""Model for individual Brave search results."""
title: str = Field(..., description="Title of the search result")
url: str = Field(..., description="URL of the search result")
description: str = Field(..., description="Description/snippet from the search result")
score: float = Field(0.0, ge=0.0, le=1.0, description="Relevance score")
class Config:
"""Pydantic configuration."""
json_schema_extra = {
"example": {
"title": "Understanding AI Safety",
"url": "https://example.com/ai-safety",
"description": "A comprehensive guide to AI safety principles...",
"score": 0.95
}
}
class EmailDraft(BaseModel):
"""Model for email draft creation."""
to: List[str] = Field(..., min_length=1, description="List of recipient email addresses")
subject: str = Field(..., min_length=1, description="Email subject line")
body: str = Field(..., min_length=1, description="Email body content")
cc: Optional[List[str]] = Field(None, description="List of CC recipients")
bcc: Optional[List[str]] = Field(None, description="List of BCC recipients")
class Config:
"""Pydantic configuration."""
json_schema_extra = {
"example": {
"to": ["john@example.com"],
"subject": "AI Research Summary",
"body": "Dear John,\n\nHere's the latest research on AI safety...",
"cc": ["team@example.com"]
}
}
class EmailDraftResponse(BaseModel):
"""Response model for email draft creation."""
draft_id: str = Field(..., description="Gmail draft ID")
message_id: str = Field(..., description="Message ID")
thread_id: Optional[str] = Field(None, description="Thread ID if part of a thread")
created_at: datetime = Field(default_factory=datetime.now, description="Draft creation timestamp")
class ResearchEmailRequest(BaseModel):
"""Model for research + email draft request."""
research_query: str = Field(..., description="Topic to research")
email_context: str = Field(..., description="Context for email generation")
recipient_email: str = Field(..., description="Email recipient")
email_subject: Optional[str] = Field(None, description="Optional email subject")
class ResearchResponse(BaseModel):
"""Response model for research queries."""
query: str = Field(..., description="Original research query")
results: List[BraveSearchResult] = Field(..., description="Search results")
summary: Optional[str] = Field(None, description="AI-generated summary of results")
total_results: int = Field(..., description="Total number of results found")
timestamp: datetime = Field(default_factory=datetime.now, description="Query timestamp")
class AgentResponse(BaseModel):
"""Generic agent response model."""
success: bool = Field(..., description="Whether the operation was successful")
data: Optional[Dict[str, Any]] = Field(None, description="Response data")
error: Optional[str] = Field(None, description="Error message if failed")
tools_used: List[str] = Field(default_factory=list, description="List of tools used")
class ChatMessage(BaseModel):
"""Model for chat messages in the CLI."""
role: str = Field(..., description="Message role (user/assistant)")
content: str = Field(..., description="Message content")
timestamp: datetime = Field(default_factory=datetime.now, description="Message timestamp")
tools_used: Optional[List[Dict[str, Any]]] = Field(None, description="Tools used in response")
class SessionState(BaseModel):
"""Model for maintaining session state."""
session_id: str = Field(..., description="Unique session identifier")
user_id: Optional[str] = Field(None, description="User identifier")
messages: List[ChatMessage] = Field(default_factory=list, description="Conversation history")
created_at: datetime = Field(default_factory=datetime.now, description="Session creation time")
last_activity: datetime = Field(default_factory=datetime.now, description="Last activity timestamp")

View File

@@ -0,0 +1,61 @@
"""
Flexible provider configuration for LLM models.
Based on examples/agent/providers.py pattern.
"""
from typing import Optional
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.models.openai import OpenAIModel
from .settings import settings
def get_llm_model(model_choice: Optional[str] = None) -> OpenAIModel:
"""
Get LLM model configuration based on environment variables.
Args:
model_choice: Optional override for model choice
Returns:
Configured OpenAI-compatible model
"""
llm_choice = model_choice or settings.llm_model
base_url = settings.llm_base_url
api_key = settings.llm_api_key
# Create provider based on configuration
provider = OpenAIProvider(base_url=base_url, api_key=api_key)
return OpenAIModel(llm_choice, provider=provider)
def get_model_info() -> dict:
"""
Get information about current model configuration.
Returns:
Dictionary with model configuration info
"""
return {
"llm_provider": settings.llm_provider,
"llm_model": settings.llm_model,
"llm_base_url": settings.llm_base_url,
"app_env": settings.app_env,
"debug": settings.debug,
}
def validate_llm_configuration() -> bool:
"""
Validate that LLM configuration is properly set.
Returns:
True if configuration is valid
"""
try:
# Check if we can create a model instance
get_llm_model()
return True
except Exception as e:
print(f"LLM configuration validation failed: {e}")
return False

View File

@@ -0,0 +1,263 @@
"""
Research Agent that uses Brave Search and can invoke Email Agent.
"""
import logging
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
from .providers import get_llm_model
from .email_agent import email_agent, EmailAgentDependencies
from .tools import search_web_tool
logger = logging.getLogger(__name__)
SYSTEM_PROMPT = """
You are an expert research assistant with the ability to search the web and create email drafts. Your primary goal is to help users find relevant information and communicate findings effectively.
Your capabilities:
1. **Web Search**: Use Brave Search to find current, relevant information on any topic
2. **Email Creation**: Create professional email drafts through Gmail when requested
When conducting research:
- Use specific, targeted search queries
- Analyze search results for relevance and credibility
- Synthesize information from multiple sources
- Provide clear, well-organized summaries
- Include source URLs for reference
When creating emails:
- Use research findings to create informed, professional content
- Adapt tone and detail level to the intended recipient
- Include relevant sources and citations when appropriate
- Ensure emails are clear, concise, and actionable
Always strive to provide accurate, helpful, and actionable information.
"""
@dataclass
class ResearchAgentDependencies:
"""Dependencies for the research agent - only configuration, no tool instances."""
brave_api_key: str
gmail_credentials_path: str
gmail_token_path: str
session_id: Optional[str] = None
# Initialize the research agent
research_agent = Agent(
get_llm_model(),
deps_type=ResearchAgentDependencies,
system_prompt=SYSTEM_PROMPT
)
@research_agent.tool
async def search_web(
ctx: RunContext[ResearchAgentDependencies],
query: str,
max_results: int = 10
) -> List[Dict[str, Any]]:
"""
Search the web using Brave Search API.
Args:
query: Search query
max_results: Maximum number of results to return (1-20)
Returns:
List of search results with title, URL, description, and score
"""
try:
# Ensure max_results is within valid range
max_results = min(max(max_results, 1), 20)
results = await search_web_tool(
api_key=ctx.deps.brave_api_key,
query=query,
count=max_results
)
logger.info(f"Found {len(results)} results for query: {query}")
return results
except Exception as e:
logger.error(f"Web search failed: {e}")
return [{"error": f"Search failed: {str(e)}"}]
@research_agent.tool
async def create_email_draft(
ctx: RunContext[ResearchAgentDependencies],
recipient_email: str,
subject: str,
context: str,
research_summary: Optional[str] = None
) -> Dict[str, Any]:
"""
Create an email draft based on research context using the Email Agent.
Args:
recipient_email: Email address of the recipient
subject: Email subject line
context: Context or purpose for the email
research_summary: Optional research findings to include
Returns:
Dictionary with draft creation results
"""
try:
# Prepare the email content prompt
if research_summary:
email_prompt = f"""
Create a professional email to {recipient_email} with the subject "{subject}".
Context: {context}
Research Summary:
{research_summary}
Please create a well-structured email that:
1. Has an appropriate greeting
2. Provides clear context
3. Summarizes the key research findings professionally
4. Includes actionable next steps if appropriate
5. Ends with a professional closing
The email should be informative but concise, and maintain a professional yet friendly tone.
"""
else:
email_prompt = f"""
Create a professional email to {recipient_email} with the subject "{subject}".
Context: {context}
Please create a well-structured email that addresses the context provided.
"""
# Create dependencies for email agent
email_deps = EmailAgentDependencies(
gmail_credentials_path=ctx.deps.gmail_credentials_path,
gmail_token_path=ctx.deps.gmail_token_path,
session_id=ctx.deps.session_id
)
# Run the email agent
result = await email_agent.run(
email_prompt,
deps=email_deps,
usage=ctx.usage # Pass usage for token tracking
)
logger.info(f"Email agent invoked for recipient: {recipient_email}")
return {
"success": True,
"agent_response": result.data,
"recipient": recipient_email,
"subject": subject,
"context": context
}
except Exception as e:
logger.error(f"Failed to create email draft via Email Agent: {e}")
return {
"success": False,
"error": str(e),
"recipient": recipient_email,
"subject": subject
}
@research_agent.tool
async def summarize_research(
ctx: RunContext[ResearchAgentDependencies],
search_results: List[Dict[str, Any]],
topic: str,
focus_areas: Optional[str] = None
) -> Dict[str, Any]:
"""
Create a comprehensive summary of research findings.
Args:
search_results: List of search result dictionaries
topic: Main research topic
focus_areas: Optional specific areas to focus on
Returns:
Dictionary with research summary
"""
try:
if not search_results:
return {
"summary": "No search results provided for summarization.",
"key_points": [],
"sources": []
}
# Extract key information
sources = []
descriptions = []
for result in search_results:
if "title" in result and "url" in result:
sources.append(f"- {result['title']}: {result['url']}")
if "description" in result:
descriptions.append(result["description"])
# Create summary content
content_summary = "\n".join(descriptions[:5]) # Limit to top 5 descriptions
sources_list = "\n".join(sources[:10]) # Limit to top 10 sources
focus_text = f"\nSpecific focus areas: {focus_areas}" if focus_areas else ""
summary = f"""
Research Summary: {topic}{focus_text}
Key Findings:
{content_summary}
Sources:
{sources_list}
"""
return {
"summary": summary,
"topic": topic,
"sources_count": len(sources),
"key_points": descriptions[:5]
}
except Exception as e:
logger.error(f"Failed to summarize research: {e}")
return {
"summary": f"Failed to summarize research: {str(e)}",
"key_points": [],
"sources": []
}
# Convenience function to create research agent with dependencies
def create_research_agent(
brave_api_key: str,
gmail_credentials_path: str,
gmail_token_path: str,
session_id: Optional[str] = None
) -> Agent:
"""
Create a research agent with specified dependencies.
Args:
brave_api_key: Brave Search API key
gmail_credentials_path: Path to Gmail credentials.json
gmail_token_path: Path to Gmail token.json
session_id: Optional session identifier
Returns:
Configured research agent
"""
return research_agent

View File

@@ -0,0 +1,58 @@
"""
Configuration management using pydantic-settings.
"""
import os
from typing import Optional
from pydantic_settings import BaseSettings
from pydantic import Field, field_validator, ConfigDict
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
class Settings(BaseSettings):
"""Application settings with environment variable support."""
model_config = ConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False
)
# LLM Configuration
llm_provider: str = Field(default="openai")
llm_api_key: str = Field(...)
llm_model: str = Field(default="gpt-4")
llm_base_url: Optional[str] = Field(default="https://api.openai.com/v1")
# Brave Search Configuration
brave_api_key: str = Field(...)
brave_search_url: str = Field(
default="https://api.search.brave.com/res/v1/web/search"
)
# Application Configuration
app_env: str = Field(default="development")
log_level: str = Field(default="INFO")
debug: bool = Field(default=False)
@field_validator("llm_api_key", "brave_api_key")
@classmethod
def validate_api_keys(cls, v):
"""Ensure API keys are not empty."""
if not v or v.strip() == "":
raise ValueError("API key cannot be empty")
return v
# Global settings instance
try:
settings = Settings()
except Exception:
# For testing, create settings with dummy values
import os
os.environ.setdefault("LLM_API_KEY", "test_key")
os.environ.setdefault("BRAVE_API_KEY", "test_key")
settings = Settings()

View File

@@ -0,0 +1,120 @@
"""
Pure tool functions for multi-agent system.
These are standalone functions that can be imported and used by any agent.
"""
import os
import base64
import logging
import httpx
from typing import List, Dict, Any, Optional
from datetime import datetime
from agents.models import BraveSearchResult
logger = logging.getLogger(__name__)
# Brave Search Tool Function
async def search_web_tool(
api_key: str,
query: str,
count: int = 10,
offset: int = 0,
country: Optional[str] = None,
lang: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Pure function to search the web using Brave Search API.
Args:
api_key: Brave Search API key
query: Search query
count: Number of results to return (1-20)
offset: Offset for pagination
country: Country code for localized results
lang: Language code for results
Returns:
List of search results as dictionaries
Raises:
ValueError: If query is empty or API key missing
Exception: If API request fails
"""
if not api_key or not api_key.strip():
raise ValueError("Brave API key is required")
if not query or not query.strip():
raise ValueError("Query cannot be empty")
# Ensure count is within valid range
count = min(max(count, 1), 20)
headers = {
"X-Subscription-Token": api_key,
"Accept": "application/json"
}
params = {
"q": query,
"count": count,
"offset": offset
}
if country:
params["country"] = country
if lang:
params["lang"] = lang
logger.info(f"Searching Brave for: {query}")
async with httpx.AsyncClient() as client:
try:
response = await client.get(
"https://api.search.brave.com/res/v1/web/search",
headers=headers,
params=params,
timeout=30.0
)
# Handle rate limiting
if response.status_code == 429:
raise Exception("Rate limit exceeded. Check your Brave API quota.")
# Handle authentication errors
if response.status_code == 401:
raise Exception("Invalid Brave API key")
# Handle other errors
if response.status_code != 200:
raise Exception(f"Brave API returned {response.status_code}: {response.text}")
data = response.json()
# Extract web results
web_results = data.get("web", {}).get("results", [])
# Convert to our format
results = []
for idx, result in enumerate(web_results):
# Calculate a simple relevance score based on position
score = 1.0 - (idx * 0.05) # Decrease by 0.05 for each position
score = max(score, 0.1) # Minimum score of 0.1
results.append({
"title": result.get("title", ""),
"url": result.get("url", ""),
"description": result.get("description", ""),
"score": score
})
logger.info(f"Found {len(results)} results for query: {query}")
return results
except httpx.RequestError as e:
logger.error(f"Request error during Brave search: {e}")
raise Exception(f"Request failed: {str(e)}")
except Exception as e:
logger.error(f"Error during Brave search: {e}")
raise

View File

@@ -0,0 +1,303 @@
"""
Structured Output Agent for Data Validation
Demonstrates when to use structured outputs with PydanticAI:
- Environment-based model configuration (following main_agent_reference)
- Structured output validation with Pydantic models (result_type specified)
- Data extraction and validation use case
- Professional report generation with consistent formatting
"""
import logging
from dataclasses import dataclass
from typing import Optional, List
from pydantic_settings import BaseSettings
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.models.openai import OpenAIModel
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
class Settings(BaseSettings):
"""Configuration settings for the structured output agent."""
# LLM Configuration
llm_provider: str = Field(default="openai")
llm_api_key: str = Field(...)
llm_model: str = Field(default="gpt-4")
llm_base_url: str = Field(default="https://api.openai.com/v1")
class Config:
env_file = ".env"
case_sensitive = False
def get_llm_model() -> OpenAIModel:
"""Get configured LLM model from environment settings."""
try:
settings = Settings()
provider = OpenAIProvider(
base_url=settings.llm_base_url,
api_key=settings.llm_api_key
)
return OpenAIModel(settings.llm_model, provider=provider)
except Exception:
# For testing without env vars
import os
os.environ.setdefault("LLM_API_KEY", "test-key")
settings = Settings()
provider = OpenAIProvider(
base_url=settings.llm_base_url,
api_key="test-key"
)
return OpenAIModel(settings.llm_model, provider=provider)
@dataclass
class AnalysisDependencies:
"""Dependencies for the analysis agent."""
report_format: str = "business" # business, technical, academic
include_recommendations: bool = True
session_id: Optional[str] = None
class DataInsight(BaseModel):
"""Individual insight extracted from data."""
insight: str = Field(description="The key insight or finding")
confidence: float = Field(ge=0.0, le=1.0, description="Confidence level in this insight")
data_points: List[str] = Field(description="Supporting data points")
class DataAnalysisReport(BaseModel):
"""Structured output for data analysis with validation."""
# Required fields
summary: str = Field(description="Executive summary of the analysis")
key_insights: List[DataInsight] = Field(
min_items=1,
max_items=10,
description="Key insights discovered in the data"
)
# Validated fields
confidence_score: float = Field(
ge=0.0, le=1.0,
description="Overall confidence in the analysis"
)
data_quality: str = Field(
pattern="^(excellent|good|fair|poor)$",
description="Assessment of data quality"
)
# Optional structured fields
recommendations: Optional[List[str]] = Field(
default=None,
description="Actionable recommendations based on findings"
)
limitations: Optional[List[str]] = Field(
default=None,
description="Limitations or caveats in the analysis"
)
# Metadata
analysis_type: str = Field(description="Type of analysis performed")
data_sources: List[str] = Field(description="Sources of data analyzed")
SYSTEM_PROMPT = """
You are an expert data analyst specializing in extracting structured insights from various data sources.
Your role:
- Analyze provided data with statistical rigor
- Extract meaningful insights and patterns
- Assess data quality and reliability
- Provide actionable recommendations
- Structure findings in a consistent, professional format
Guidelines:
- Be objective and evidence-based in your analysis
- Clearly distinguish between facts and interpretations
- Provide confidence levels for your insights
- Highlight both strengths and limitations of the data
- Ensure all outputs follow the required structured format
"""
# Create structured output agent - NOTE: result_type specified for data validation
structured_agent = Agent(
get_llm_model(),
deps_type=AnalysisDependencies,
result_type=DataAnalysisReport, # This is when we DO want structured output
system_prompt=SYSTEM_PROMPT
)
@structured_agent.tool
def analyze_numerical_data(
ctx: RunContext[AnalysisDependencies],
data_description: str,
numbers: List[float]
) -> str:
"""
Analyze numerical data and provide statistical insights.
Args:
data_description: Description of what the numbers represent
numbers: List of numerical values to analyze
Returns:
Statistical analysis summary
"""
try:
if not numbers:
return "No numerical data provided for analysis."
# Basic statistical calculations
count = len(numbers)
total = sum(numbers)
average = total / count
minimum = min(numbers)
maximum = max(numbers)
# Calculate variance and standard deviation
variance = sum((x - average) ** 2 for x in numbers) / count
std_dev = variance ** 0.5
# Simple trend analysis
if count > 1:
trend = "increasing" if numbers[-1] > numbers[0] else "decreasing"
else:
trend = "insufficient data"
analysis = f"""
Statistical Analysis of {data_description}:
- Count: {count} data points
- Average: {average:.2f}
- Range: {minimum:.2f} to {maximum:.2f}
- Standard Deviation: {std_dev:.2f}
- Overall Trend: {trend}
- Data Quality: {'good' if std_dev < average * 0.5 else 'variable'}
"""
logger.info(f"Analyzed {count} data points for: {data_description}")
return analysis.strip()
except Exception as e:
logger.error(f"Error in numerical analysis: {e}")
return f"Error analyzing numerical data: {str(e)}"
async def analyze_data(
data_input: str,
dependencies: Optional[AnalysisDependencies] = None
) -> DataAnalysisReport:
"""
Analyze data and return structured report.
Args:
data_input: Raw data or description to analyze
dependencies: Optional analysis configuration
Returns:
Structured DataAnalysisReport with validation
"""
if dependencies is None:
dependencies = AnalysisDependencies()
result = await structured_agent.run(data_input, deps=dependencies)
return result.data
def analyze_data_sync(
data_input: str,
dependencies: Optional[AnalysisDependencies] = None
) -> DataAnalysisReport:
"""
Synchronous version of analyze_data.
Args:
data_input: Raw data or description to analyze
dependencies: Optional analysis configuration
Returns:
Structured DataAnalysisReport with validation
"""
import asyncio
return asyncio.run(analyze_data(data_input, dependencies))
# Example usage and demonstration
if __name__ == "__main__":
import asyncio
async def demo_structured_output():
"""Demonstrate structured output validation."""
print("=== Structured Output Agent Demo ===\n")
# Sample data scenarios
scenarios = [
{
"title": "Sales Performance Data",
"data": """
Monthly sales data for Q4 2024:
October: $125,000
November: $142,000
December: $158,000
Customer satisfaction scores: 4.2, 4.5, 4.1, 4.6, 4.3
Return rate: 3.2%
"""
},
{
"title": "Website Analytics",
"data": """
Website traffic analysis:
- Daily visitors: 5,200 average
- Bounce rate: 35%
- Page load time: 2.1 seconds
- Conversion rate: 3.8%
- Mobile traffic: 68%
"""
}
]
for scenario in scenarios:
print(f"Analysis: {scenario['title']}")
print(f"Input Data: {scenario['data'][:100]}...")
# Configure for business report
deps = AnalysisDependencies(
report_format="business",
include_recommendations=True
)
try:
report = await analyze_data(scenario['data'], deps)
print(f"Summary: {report.summary}")
print(f"Confidence: {report.confidence_score}")
print(f"Data Quality: {report.data_quality}")
print(f"Key Insights: {len(report.key_insights)} found")
for i, insight in enumerate(report.key_insights, 1):
print(f" {i}. {insight.insight} (confidence: {insight.confidence})")
if report.recommendations:
print(f"Recommendations: {len(report.recommendations)}")
for i, rec in enumerate(report.recommendations, 1):
print(f" {i}. {rec}")
print("=" * 60)
except Exception as e:
print(f"Analysis failed: {e}")
print("=" * 60)
# Run the demo
asyncio.run(demo_structured_output())

View File

@@ -0,0 +1,18 @@
[tool:pytest]
testpaths = .
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
markers =
integration: Integration tests
slow: Slow running tests
asyncio: Async tests
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
asyncio_mode = auto

View File

@@ -0,0 +1,399 @@
"""
Comprehensive PydanticAI Testing Examples
Demonstrates testing patterns and best practices for PydanticAI agents:
- TestModel for fast development validation
- FunctionModel for custom behavior testing
- Agent.override() for test isolation
- Pytest fixtures and async testing
- Tool validation and error handling tests
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock
from dataclasses import dataclass
from typing import Optional, List
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.test import TestModel, FunctionModel
@dataclass
class TestDependencies:
"""Test dependencies for agent testing."""
database: Mock
api_client: Mock
user_id: str = "test_user_123"
class TestResponse(BaseModel):
"""Test response model for validation."""
message: str
confidence: float = 0.8
actions: List[str] = []
# Create test agent for demonstrations
test_agent = Agent(
model="openai:gpt-4o-mini", # Will be overridden in tests
deps_type=TestDependencies,
result_type=TestResponse,
system_prompt="You are a helpful test assistant."
)
@test_agent.tool
async def mock_database_query(
ctx: RunContext[TestDependencies],
query: str
) -> str:
"""Mock database query tool for testing."""
try:
# Simulate database call
result = await ctx.deps.database.execute_query(query)
return f"Database result: {result}"
except Exception as e:
return f"Database error: {str(e)}"
@test_agent.tool
def mock_api_call(
ctx: RunContext[TestDependencies],
endpoint: str,
data: Optional[dict] = None
) -> str:
"""Mock API call tool for testing."""
try:
# Simulate API call
response = ctx.deps.api_client.post(endpoint, json=data)
return f"API response: {response}"
except Exception as e:
return f"API error: {str(e)}"
class TestAgentBasics:
"""Test basic agent functionality with TestModel."""
@pytest.fixture
def test_dependencies(self):
"""Create mock dependencies for testing."""
return TestDependencies(
database=AsyncMock(),
api_client=Mock(),
user_id="test_user_123"
)
def test_agent_with_test_model(self, test_dependencies):
"""Test agent behavior with TestModel."""
test_model = TestModel()
with test_agent.override(model=test_model):
result = test_agent.run_sync(
"Hello, please help me with a simple task.",
deps=test_dependencies
)
# TestModel returns a JSON summary by default
assert result.data.message is not None
assert isinstance(result.data.confidence, float)
assert isinstance(result.data.actions, list)
def test_agent_custom_test_model_output(self, test_dependencies):
"""Test agent with custom TestModel output."""
test_model = TestModel(
custom_output_text='{"message": "Custom test response", "confidence": 0.9, "actions": ["test_action"]}'
)
with test_agent.override(model=test_model):
result = test_agent.run_sync(
"Test message",
deps=test_dependencies
)
assert result.data.message == "Custom test response"
assert result.data.confidence == 0.9
assert result.data.actions == ["test_action"]
@pytest.mark.asyncio
async def test_agent_async_with_test_model(self, test_dependencies):
"""Test async agent behavior with TestModel."""
test_model = TestModel()
with test_agent.override(model=test_model):
result = await test_agent.run(
"Async test message",
deps=test_dependencies
)
assert result.data.message is not None
assert result.data.confidence >= 0.0
class TestAgentTools:
"""Test agent tool functionality."""
@pytest.fixture
def mock_dependencies(self):
"""Create mock dependencies with configured responses."""
database_mock = AsyncMock()
database_mock.execute_query.return_value = "Test data from database"
api_mock = Mock()
api_mock.post.return_value = {"status": "success", "data": "test_data"}
return TestDependencies(
database=database_mock,
api_client=api_mock,
user_id="test_user_456"
)
@pytest.mark.asyncio
async def test_database_tool_success(self, mock_dependencies):
"""Test database tool with successful response."""
test_model = TestModel(call_tools=['mock_database_query'])
with test_agent.override(model=test_model):
result = await test_agent.run(
"Please query the database for user data",
deps=mock_dependencies
)
# Verify database was called
mock_dependencies.database.execute_query.assert_called()
# TestModel should include tool results
assert "mock_database_query" in result.data.message
@pytest.mark.asyncio
async def test_database_tool_error(self, mock_dependencies):
"""Test database tool with error handling."""
# Configure mock to raise exception
mock_dependencies.database.execute_query.side_effect = Exception("Connection failed")
test_model = TestModel(call_tools=['mock_database_query'])
with test_agent.override(model=test_model):
result = await test_agent.run(
"Query the database",
deps=mock_dependencies
)
# Tool should handle the error gracefully
assert "mock_database_query" in result.data.message
def test_api_tool_with_data(self, mock_dependencies):
"""Test API tool with POST data."""
test_model = TestModel(call_tools=['mock_api_call'])
with test_agent.override(model=test_model):
result = test_agent.run_sync(
"Make an API call to create a new record",
deps=mock_dependencies
)
# Verify API was called
mock_dependencies.api_client.post.assert_called()
# Check tool execution in response
assert "mock_api_call" in result.data.message
class TestAgentWithFunctionModel:
"""Test agent behavior with FunctionModel for custom responses."""
@pytest.fixture
def test_dependencies(self):
"""Create basic test dependencies."""
return TestDependencies(
database=AsyncMock(),
api_client=Mock()
)
def test_function_model_custom_behavior(self, test_dependencies):
"""Test agent with FunctionModel for custom behavior."""
def custom_response_func(messages, tools):
"""Custom function to generate specific responses."""
last_message = messages[-1].content if messages else ""
if "error" in last_message.lower():
return '{"message": "Error detected and handled", "confidence": 0.6, "actions": ["error_handling"]}'
else:
return '{"message": "Normal operation", "confidence": 0.9, "actions": ["standard_response"]}'
function_model = FunctionModel(function=custom_response_func)
with test_agent.override(model=function_model):
# Test normal case
result1 = test_agent.run_sync(
"Please help me with a normal request",
deps=test_dependencies
)
assert result1.data.message == "Normal operation"
assert result1.data.confidence == 0.9
# Test error case
result2 = test_agent.run_sync(
"There's an error in the system",
deps=test_dependencies
)
assert result2.data.message == "Error detected and handled"
assert result2.data.confidence == 0.6
assert "error_handling" in result2.data.actions
class TestAgentValidation:
"""Test agent output validation and error scenarios."""
@pytest.fixture
def test_dependencies(self):
"""Create test dependencies."""
return TestDependencies(
database=AsyncMock(),
api_client=Mock()
)
def test_invalid_output_handling(self, test_dependencies):
"""Test how agent handles invalid output format."""
# TestModel with invalid JSON output
test_model = TestModel(
custom_output_text='{"message": "test", "invalid_field": "should_not_exist"}'
)
with test_agent.override(model=test_model):
# This should either succeed with validation or raise appropriate error
try:
result = test_agent.run_sync(
"Test invalid output",
deps=test_dependencies
)
# If it succeeds, Pydantic should filter out invalid fields
assert hasattr(result.data, 'message')
assert not hasattr(result.data, 'invalid_field')
except Exception as e:
# Or it might raise a validation error, which is also acceptable
assert "validation" in str(e).lower() or "error" in str(e).lower()
def test_missing_required_fields(self, test_dependencies):
"""Test handling of missing required fields in output."""
# TestModel with missing required message field
test_model = TestModel(
custom_output_text='{"confidence": 0.8}'
)
with test_agent.override(model=test_model):
try:
result = test_agent.run_sync(
"Test missing fields",
deps=test_dependencies
)
# Should either provide default or raise validation error
if hasattr(result.data, 'message'):
assert result.data.message is not None
except Exception as e:
# Validation error is expected for missing required fields
assert any(keyword in str(e).lower() for keyword in ['validation', 'required', 'missing'])
class TestAgentIntegration:
"""Integration tests for complete agent workflows."""
@pytest.fixture
def full_mock_dependencies(self):
"""Create fully configured mock dependencies."""
database_mock = AsyncMock()
database_mock.execute_query.return_value = {
"user_id": "123",
"name": "Test User",
"status": "active"
}
api_mock = Mock()
api_mock.post.return_value = {
"status": "success",
"transaction_id": "txn_123456"
}
return TestDependencies(
database=database_mock,
api_client=api_mock,
user_id="test_integration_user"
)
@pytest.mark.asyncio
async def test_complete_workflow(self, full_mock_dependencies):
"""Test complete agent workflow with multiple tools."""
test_model = TestModel(call_tools='all') # Call all available tools
with test_agent.override(model=test_model):
result = await test_agent.run(
"Please look up user information and create a new transaction",
deps=full_mock_dependencies
)
# Verify both tools were potentially called
assert result.data.message is not None
assert isinstance(result.data.actions, list)
# Verify mocks were called
full_mock_dependencies.database.execute_query.assert_called()
full_mock_dependencies.api_client.post.assert_called()
class TestAgentErrorRecovery:
"""Test agent error handling and recovery patterns."""
@pytest.fixture
def failing_dependencies(self):
"""Create dependencies that will fail for testing error handling."""
database_mock = AsyncMock()
database_mock.execute_query.side_effect = Exception("Database connection failed")
api_mock = Mock()
api_mock.post.side_effect = Exception("API service unavailable")
return TestDependencies(
database=database_mock,
api_client=api_mock,
user_id="failing_test_user"
)
@pytest.mark.asyncio
async def test_tool_error_recovery(self, failing_dependencies):
"""Test agent behavior when tools fail."""
test_model = TestModel(call_tools='all')
with test_agent.override(model=test_model):
# Agent should handle tool failures gracefully
result = await test_agent.run(
"Try to access database and API",
deps=failing_dependencies
)
# Even with tool failures, agent should return a valid response
assert result.data.message is not None
assert isinstance(result.data.confidence, float)
# Pytest configuration and utilities
@pytest.fixture(scope="session")
def event_loop():
"""Create an instance of the default event loop for the test session."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
def pytest_configure(config):
"""Configure pytest with custom markers."""
config.addinivalue_line(
"markers", "integration: mark test as integration test"
)
config.addinivalue_line(
"markers", "slow: mark test as slow running"
)
if __name__ == "__main__":
# Run tests directly
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,374 @@
"""
Tool-Enabled Agent with Web Search and Calculator
Demonstrates PydanticAI tool integration patterns:
- Environment-based model configuration
- Tool registration with @agent.tool decorator
- RunContext for dependency injection
- Parameter validation with type hints
- Error handling and retry mechanisms
- String output (default, no result_type needed)
"""
import logging
import math
import json
import asyncio
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
from datetime import datetime
import aiohttp
from pydantic_settings import BaseSettings
from pydantic import Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.providers.openai import OpenAIProvider
from pydantic_ai.models.openai import OpenAIModel
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
class Settings(BaseSettings):
"""Configuration settings for the tool-enabled agent."""
# LLM Configuration
llm_provider: str = Field(default="openai")
llm_api_key: str = Field(...)
llm_model: str = Field(default="gpt-4")
llm_base_url: str = Field(default="https://api.openai.com/v1")
class Config:
env_file = ".env"
case_sensitive = False
def get_llm_model() -> OpenAIModel:
"""Get configured LLM model from environment settings."""
try:
settings = Settings()
provider = OpenAIProvider(
base_url=settings.llm_base_url,
api_key=settings.llm_api_key
)
return OpenAIModel(settings.llm_model, provider=provider)
except Exception:
# For testing without env vars
import os
os.environ.setdefault("LLM_API_KEY", "test-key")
settings = Settings()
provider = OpenAIProvider(
base_url=settings.llm_base_url,
api_key="test-key"
)
return OpenAIModel(settings.llm_model, provider=provider)
@dataclass
class ToolDependencies:
"""Dependencies for tool-enabled agent."""
session: Optional[aiohttp.ClientSession] = None
api_timeout: int = 10
max_search_results: int = 5
calculation_precision: int = 6
session_id: Optional[str] = None
SYSTEM_PROMPT = """
You are a helpful research assistant with access to web search and calculation tools.
Your capabilities:
- Web search for current information and facts
- Mathematical calculations and data analysis
- Data processing and formatting
- Source verification and citation
Guidelines:
- Always use tools when you need current information or calculations
- Cite sources when providing factual information
- Show your work for mathematical calculations
- Be precise and accurate in your responses
- If tools fail, explain the limitation and provide what you can
"""
# Create the tool-enabled agent - note: no result_type, defaults to string
tool_agent = Agent(
get_llm_model(),
deps_type=ToolDependencies,
system_prompt=SYSTEM_PROMPT
)
@tool_agent.tool
async def web_search(
ctx: RunContext[ToolDependencies],
query: str,
max_results: Optional[int] = None
) -> str:
"""
Search the web for current information.
Args:
query: Search query string
max_results: Maximum number of results to return (default: 5)
Returns:
Formatted search results with titles, snippets, and URLs
"""
if not ctx.deps.session:
return "Web search unavailable: No HTTP session configured"
max_results = max_results or ctx.deps.max_search_results
try:
# Using DuckDuckGo Instant Answer API as a simple example
# In production, use proper search APIs like Google, Bing, or DuckDuckGo
search_url = "https://api.duckduckgo.com/"
params = {
"q": query,
"format": "json",
"pretty": "1",
"no_redirect": "1"
}
async with ctx.deps.session.get(
search_url,
params=params,
timeout=ctx.deps.api_timeout
) as response:
if response.status == 200:
data = await response.json()
results = []
# Process instant answer if available
if data.get("AbstractText"):
results.append({
"title": "Instant Answer",
"snippet": data["AbstractText"],
"url": data.get("AbstractURL", "")
})
# Process related topics
for topic in data.get("RelatedTopics", [])[:max_results-len(results)]:
if isinstance(topic, dict) and "Text" in topic:
results.append({
"title": topic.get("FirstURL", "").split("/")[-1].replace("_", " "),
"snippet": topic["Text"],
"url": topic.get("FirstURL", "")
})
if not results:
return f"No results found for query: {query}"
# Format results
formatted_results = []
for i, result in enumerate(results, 1):
formatted_results.append(
f"{i}. **{result['title']}**\n"
f" {result['snippet']}\n"
f" Source: {result['url']}"
)
return "\n\n".join(formatted_results)
else:
return f"Search failed with status: {response.status}"
except asyncio.TimeoutError:
return f"Search timed out after {ctx.deps.api_timeout} seconds"
except Exception as e:
return f"Search error: {str(e)}"
@tool_agent.tool
def calculate(
ctx: RunContext[ToolDependencies],
expression: str,
description: Optional[str] = None
) -> str:
"""
Perform mathematical calculations safely.
Args:
expression: Mathematical expression to evaluate
description: Optional description of what's being calculated
Returns:
Calculation result with formatted output
"""
try:
# Safe evaluation - only allow mathematical operations
allowed_names = {
"abs": abs, "round": round, "min": min, "max": max,
"sum": sum, "pow": pow, "sqrt": math.sqrt,
"sin": math.sin, "cos": math.cos, "tan": math.tan,
"log": math.log, "log10": math.log10, "exp": math.exp,
"pi": math.pi, "e": math.e
}
# Remove any potentially dangerous operations
safe_expression = expression.replace("__", "").replace("import", "")
# Evaluate the expression
result = eval(safe_expression, {"__builtins__": {}}, allowed_names)
# Format result with appropriate precision
if isinstance(result, float):
result = round(result, ctx.deps.calculation_precision)
output = f"Calculation: {expression} = {result}"
if description:
output = f"{description}\n{output}"
return output
except Exception as e:
return f"Calculation error: {str(e)}\nExpression: {expression}"
@tool_agent.tool
def format_data(
ctx: RunContext[ToolDependencies],
data: str,
format_type: str = "table"
) -> str:
"""
Format data into structured output.
Args:
data: Raw data to format
format_type: Type of formatting (table, list, json)
Returns:
Formatted data string
"""
try:
lines = data.strip().split('\n')
if format_type == "table":
# Simple table formatting
if len(lines) > 1:
header = lines[0]
rows = lines[1:]
# Basic table formatting
formatted = f"| {header} |\n"
formatted += f"|{'-' * (len(header) + 2)}|\n"
for row in rows[:10]: # Limit to 10 rows
formatted += f"| {row} |\n"
return formatted
else:
return data
elif format_type == "list":
# Bullet point list
formatted_lines = [f"{line.strip()}" for line in lines if line.strip()]
return "\n".join(formatted_lines)
elif format_type == "json":
# Try to parse and format as JSON
try:
parsed = json.loads(data)
return json.dumps(parsed, indent=2)
except json.JSONDecodeError:
# If not valid JSON, create simple key-value structure
items = {}
for i, line in enumerate(lines):
items[f"item_{i+1}"] = line.strip()
return json.dumps(items, indent=2)
return data
except Exception as e:
return f"Formatting error: {str(e)}"
@tool_agent.tool
def get_current_time(ctx: RunContext[ToolDependencies]) -> str:
"""
Get the current date and time.
Returns:
Current timestamp in a readable format
"""
now = datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S UTC")
async def ask_agent(
question: str,
dependencies: Optional[ToolDependencies] = None
) -> str:
"""
Ask the tool-enabled agent a question.
Args:
question: Question or request for the agent
dependencies: Optional tool dependencies
Returns:
String response from the agent
"""
if dependencies is None:
# Create HTTP session for web search
session = aiohttp.ClientSession()
dependencies = ToolDependencies(session=session)
try:
result = await tool_agent.run(question, deps=dependencies)
return result.data
finally:
# Clean up session if we created it
if dependencies.session and not dependencies.session.closed:
await dependencies.session.close()
def ask_agent_sync(question: str) -> str:
"""
Synchronous version of ask_agent.
Args:
question: Question or request for the agent
Returns:
String response from the agent
"""
return asyncio.run(ask_agent(question))
# Example usage and demonstration
if __name__ == "__main__":
async def demo_tools():
"""Demonstrate the tool-enabled agent capabilities."""
print("=== Tool-Enabled Agent Demo ===\n")
# Create dependencies with HTTP session
session = aiohttp.ClientSession()
dependencies = ToolDependencies(session=session)
try:
# Sample questions that exercise different tools
questions = [
"What's the current time?",
"Calculate the square root of 144 plus 25% of 200",
"Search for recent news about artificial intelligence",
"Format this data as a table: Name,Age\nAlice,25\nBob,30\nCharlie,35"
]
for question in questions:
print(f"Question: {question}")
response = await ask_agent(question, dependencies)
print(f"Answer: {response}")
print("-" * 60)
finally:
await session.close()
# Run the demo
asyncio.run(demo_tools())