# Best Practices
This guide covers production-ready patterns, optimization techniques, and security best practices for building robust agents on AgentHub.
## Agent Development

### Code Organization

**Structured Project Layout:**

```
my-agent/
├── src/
│   ├── agent/
│   │   ├── __init__.py
│   │   ├── core.py           # Main agent logic
│   │   ├── handlers.py       # Request handlers
│   │   └── utils.py          # Helper functions
│   ├── config/
│   │   ├── __init__.py
│   │   └── settings.py       # Configuration management
│   └── tests/
│       ├── test_agent.py
│       └── test_handlers.py
├── requirements.txt          # Dependencies
├── Dockerfile                # Custom container (optional)
├── .dockerignore             # Exclude unnecessary files
├── README.md                 # Usage instructions
└── main.py                   # Entry point
```
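The `.dockerignore` above keeps the build context small. A minimal version for this layout might look like the following (entries are suggestions; adjust for your project):

```
venv/
__pycache__/
*.pyc
.pytest_cache/
.git/
src/tests/
```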
**Configuration Management:**

```python
# config/settings.py
import os
from dataclasses import dataclass

@dataclass
class AgentConfig:
    api_key: str
    log_level: str = "INFO"
    timeout_seconds: int = 30
    max_retries: int = 3
    cache_ttl: int = 3600

    @classmethod
    def from_env(cls) -> 'AgentConfig':
        return cls(
            api_key=os.environ['API_KEY'],
            log_level=os.environ.get('LOG_LEVEL', 'INFO'),
            timeout_seconds=int(os.environ.get('TIMEOUT_SECONDS', '30')),
            max_retries=int(os.environ.get('MAX_RETRIES', '3')),
            cache_ttl=int(os.environ.get('CACHE_TTL', '3600'))
        )
```
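Loading the configuration once at startup keeps all environment access in one place. A minimal sketch, assuming `src/` is on the import path so `AgentConfig` resolves from `config.settings`:

```python
# main.py (illustrative) - fail fast if required variables are missing
import logging

from config.settings import AgentConfig  # import path depends on how src/ is packaged

config = AgentConfig.from_env()  # raises KeyError if API_KEY is unset
logging.basicConfig(level=config.log_level)
```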
**Error Handling Patterns:**

```python
import logging
from functools import wraps
from typing import Any, Callable

logger = logging.getLogger(__name__)

def with_error_handling(func: Callable) -> Callable:
    @wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Error in {func.__name__}: {e}", exc_info=True)
            # Return a graceful fallback instead of crashing
            return {"error": "Service temporarily unavailable"}
    return wrapper

@with_error_handling
def process_request(data: dict) -> dict:
    # Your processing logic here
    return {"result": "processed"}
```
### Dependency Management

**Requirements Specification:**

```text
# requirements.txt - pin major versions, allow minor updates
requests>=2.31.0,<3.0.0
flask>=2.3.0,<3.0.0
sqlalchemy>=2.0.0,<3.0.0

# For security-critical dependencies, pin exact versions
cryptography==41.0.4

# Development dependencies (consider keeping these in a separate requirements-dev.txt)
pytest>=7.0.0
black>=23.0.0
mypy>=1.0.0
```
**Virtual Environment Setup:**

```bash
# Use a specific Python version
python3.11 -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Generate a lock file for reproducible builds
pip freeze > requirements-lock.txt
```
**Docker Multi-stage Builds:**

```dockerfile
# Build stage - includes dev tools
FROM python:3.11-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt

# Production stage - minimal runtime
FROM python:3.11-slim AS runtime

WORKDIR /app

# Create the non-root user first so copied files can be owned by it
RUN useradd --create-home --shell /bin/bash agent

# Copy only installed packages, into the non-root user's home
# (a non-root user cannot read /root/.local)
COPY --from=builder --chown=agent:agent /root/.local /home/agent/.local

# Copy application code
COPY --chown=agent:agent src/ ./src/
COPY --chown=agent:agent main.py .

USER agent

# Set up environment
ENV PATH=/home/agent/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1

CMD ["python", "main.py"]
```
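A local smoke test of the image might look like this (the image name, API key, and port are illustrative; use whatever your agent actually listens on):

```bash
docker build -t my-agent .
docker run --rm -e API_KEY=dev-key -p 8080:8080 my-agent
```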
## Performance Optimization

### Resource Efficiency

**Memory Management:**

```python
import gc
import logging

import psutil

logger = logging.getLogger(__name__)

class MemoryMonitor:
    def __init__(self, threshold_percent: float = 80.0):
        self.threshold = threshold_percent

    def check_memory(self):
        memory = psutil.virtual_memory()
        if memory.percent > self.threshold:
            logger.warning(f"High memory usage: {memory.percent}%")
            # Force garbage collection
            gc.collect()

    def get_memory_stats(self) -> dict:
        memory = psutil.virtual_memory()
        return {
            "total_gb": round(memory.total / (1024**3), 2),
            "available_gb": round(memory.available / (1024**3), 2),
            "percent_used": memory.percent
        }

# Use memory monitoring in your agent
monitor = MemoryMonitor()

def process_large_dataset(data):
    monitor.check_memory()

    # Process data in chunks to avoid memory spikes
    chunk_size = 1000
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        process_chunk(chunk)  # your per-chunk logic

        # Periodic cleanup
        if i % (chunk_size * 10) == 0:
            gc.collect()
```
**CPU Optimization:**

```python
import asyncio
import concurrent.futures
from typing import Awaitable, Callable, List

class TaskProcessor:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers

    async def process_async_tasks(self, tasks: List[Callable[[], Awaitable]]):
        """Process I/O-bound tasks concurrently (TaskGroup requires Python 3.11+)"""
        async with asyncio.TaskGroup() as group:
            results = [group.create_task(task()) for task in tasks]
        # All tasks have completed once the TaskGroup context exits
        return [result.result() for result in results]

    def process_cpu_tasks(self, tasks: List[Callable]):
        """Process CPU-bound tasks in parallel (tasks must be picklable top-level functions)"""
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = [executor.submit(task) for task in tasks]
            return [future.result() for future in futures]
```
**Caching Strategies:**

```python
import hashlib
import pickle
from functools import wraps

import redis

class CacheManager:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)

    def cache_key(self, func_name: str, *args, **kwargs) -> str:
        """Generate a consistent cache key"""
        key_data = f"{func_name}:{args}:{sorted(kwargs.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def cached_function(self, ttl: int = 3600):
        """Decorator for caching function results"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                key = self.cache_key(func.__name__, *args, **kwargs)

                # Try to get from cache
                # Note: only cache data you trust; pickle.loads on tampered data is unsafe
                cached_result = self.redis_client.get(key)
                if cached_result is not None:
                    return pickle.loads(cached_result)

                # Execute the function and cache its result
                result = func(*args, **kwargs)
                self.redis_client.setex(key, ttl, pickle.dumps(result))
                return result
            return wrapper
        return decorator

# Usage
cache_manager = CacheManager()

@cache_manager.cached_function(ttl=1800)  # Cache for 30 minutes
def expensive_api_call(query: str) -> dict:
    # Expensive operation here
    return fetch_data_from_api(query)
```
### Database Optimization

**Connection Pooling:**

```python
import os
from contextlib import contextmanager

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

class DatabaseManager:
    def __init__(self, database_url: str):
        self.engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,  # Validate connections before use
            pool_recycle=3600    # Recycle connections hourly
        )

    @contextmanager
    def get_connection(self):
        """Context manager for database connections"""
        conn = self.engine.connect()
        try:
            yield conn
        finally:
            conn.close()

# Usage
db_manager = DatabaseManager(os.environ['DATABASE_URL'])

def get_user_data(user_id: int) -> dict:
    with db_manager.get_connection() as conn:
        # SQLAlchemy 2.0 requires text() and named bind parameters for raw SQL
        result = conn.execute(
            text("SELECT * FROM users WHERE id = :user_id"),
            {"user_id": user_id}
        )
        row = result.mappings().first()
        return dict(row) if row else {}
```
## Security Best Practices

### Environment Variables and Secrets

**Secure Configuration:**

```python
import logging
import os
from typing import Optional

logger = logging.getLogger(__name__)

class SecureConfig:
    """Secure configuration management"""

    def __init__(self):
        self._validate_required_vars()

    def _validate_required_vars(self):
        """Validate that all required environment variables are present"""
        required_vars = ['API_KEY', 'DATABASE_URL']
        missing_vars = [var for var in required_vars if not os.environ.get(var)]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {missing_vars}")

    @property
    def api_key(self) -> str:
        return os.environ['API_KEY']

    @property
    def database_url(self) -> str:
        # Never log database URLs
        return os.environ['DATABASE_URL']

    @property
    def debug_mode(self) -> bool:
        return os.environ.get('DEBUG', 'false').lower() == 'true'

    def get_optional_setting(self, key: str, default: Optional[str] = None) -> Optional[str]:
        value = os.environ.get(key, default)
        if value != default:
            logger.info(f"Using custom setting for {key}")
        return value
```
**Logging Security:**

```python
import logging
import re

class SecureFormatter(logging.Formatter):
    """Formatter that redacts sensitive information from logs"""

    # Each pattern captures the key/prefix in group 1 and the secret value in group 2
    SENSITIVE_PATTERNS = [
        r'(api[_-]?key["\s]*[:=]["\s]*)([a-zA-Z0-9]+)',
        r'(password["\s]*[:=]["\s]*)([^\s"]+)',
        r'(token["\s]*[:=]["\s]*)([a-zA-Z0-9\-._]+)',
        r'(secret["\s]*[:=]["\s]*)([^\s"]+)'
    ]

    def format(self, record):
        message = super().format(record)
        # Replace the captured secret while keeping the key name
        for pattern in self.SENSITIVE_PATTERNS:
            message = re.sub(pattern, r'\1[REDACTED]', message, flags=re.IGNORECASE)
        return message

# Configure secure logging
handler = logging.StreamHandler()
handler.setFormatter(SecureFormatter())
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```
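A quick way to sanity-check the formatter (the token value is made up):

```python
# The secret after "token=" is redacted before the record reaches stdout
logger.info("Refreshing session with token=abc123XYZ")
# Output resembles: Refreshing session with token=[REDACTED]
```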
### Input Validation

**Request Validation:**

```python
import logging
import re

logger = logging.getLogger(__name__)

class ValidatedInput:
    """Input validation with sanitization"""

    @staticmethod
    def validate_email(email: str) -> str:
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, email):
            raise ValueError("Invalid email format")
        return email.lower().strip()

    @staticmethod
    def validate_query(query: str, max_length: int = 1000) -> str:
        if not query or len(query.strip()) == 0:
            raise ValueError("Query cannot be empty")

        query = query.strip()
        if len(query) > max_length:
            raise ValueError(f"Query too long (max {max_length} characters)")

        # Remove potentially dangerous characters
        sanitized = re.sub(r'[<>"\']', '', query)
        return sanitized

    @staticmethod
    def validate_api_key(api_key: str) -> str:
        if not api_key or len(api_key) < 32:
            raise ValueError("Invalid API key format")

        # Check format (example for OpenAI-style keys)
        if not api_key.startswith(('sk-', 'pk-')):
            raise ValueError("Unrecognized API key format")
        return api_key

# Usage in request handlers
def process_user_request(request_data: dict) -> dict:
    try:
        email = ValidatedInput.validate_email(request_data.get('email', ''))
        query = ValidatedInput.validate_query(request_data.get('query', ''))
        return process_validated_request(email, query)
    except ValueError as e:
        logger.warning(f"Invalid input: {e}")
        return {"error": "Invalid input provided"}
```
### Rate Limiting

**Request Rate Limiting:**

```python
import threading
import time
from collections import defaultdict, deque
from typing import Dict

class RateLimiter:
    """Thread-safe sliding-window rate limiter"""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, deque] = defaultdict(deque)
        self.lock = threading.Lock()

    def _prune(self, identifier: str, current_time: float):
        """Drop requests that have fallen outside the window (call with lock held)"""
        user_requests = self.requests[identifier]
        while user_requests and user_requests[0] < current_time - self.window_seconds:
            user_requests.popleft()

    def is_allowed(self, identifier: str) -> bool:
        """Check whether a request is allowed for an identifier (IP, user_id, etc.)"""
        current_time = time.time()
        with self.lock:
            self._prune(identifier, current_time)

            # Check if under the limit
            if len(self.requests[identifier]) >= self.max_requests:
                return False

            # Record the current request
            self.requests[identifier].append(current_time)
            return True

    def get_remaining_requests(self, identifier: str) -> int:
        """Get the remaining request budget for an identifier"""
        with self.lock:
            self._prune(identifier, time.time())
            return max(0, self.max_requests - len(self.requests[identifier]))

# Usage in web handlers
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

def api_endpoint(request):
    client_ip = request.remote_addr
    if not rate_limiter.is_allowed(client_ip):
        return {
            "error": "Rate limit exceeded",
            "retry_after": 60
        }, 429

    # Process the request normally
    return process_request(request)
```
## Monitoring and Observability

### Structured Logging

**Comprehensive Logging Setup:**

```python
import json
import logging
import sys
from datetime import datetime
from typing import Optional

class StructuredLogger:
    """Structured JSON logging for better observability"""

    def __init__(self, service_name: str, version: str):
        self.service_name = service_name
        self.version = version
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        logger = logging.getLogger(self.service_name)
        logger.setLevel(logging.INFO)

        # JSON formatter for structured logs
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(self._json_formatter())
        logger.addHandler(handler)
        return logger

    def _json_formatter(self):
        # Capture identifying fields from the outer instance; inside
        # JSONFormatter.format, `self` refers to the formatter, not this logger
        service_name = self.service_name
        version = self.version

        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_entry = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "level": record.levelname,
                    "service": service_name,
                    "version": version,
                    "message": record.getMessage()
                }
                # Add extra fields if present
                if hasattr(record, 'extra'):
                    log_entry.update(record.extra)
                return json.dumps(log_entry)

        return JSONFormatter()

    def info(self, message: str, **kwargs):
        extra = {"extra": kwargs} if kwargs else {}
        self.logger.info(message, extra=extra)

    def error(self, message: str, error: Optional[Exception] = None, **kwargs):
        extra = {"extra": kwargs}
        if error:
            extra["extra"]["error_type"] = type(error).__name__
            extra["extra"]["error_message"] = str(error)
        self.logger.error(message, extra=extra)

# Usage (fetch_market_data and generate_insights are your own functions)
logger = StructuredLogger("market-agent", "1.2.0")

def process_market_data(symbol: str):
    logger.info("Processing market data", symbol=symbol, action="process_start")
    try:
        data = fetch_market_data(symbol)
        logger.info("Market data fetched successfully",
                    symbol=symbol,
                    data_points=len(data),
                    action="fetch_complete")

        insights = generate_insights(data)
        logger.info("Insights generated",
                    symbol=symbol,
                    insight_count=len(insights),
                    action="analysis_complete")
        return insights
    except Exception as e:
        logger.error("Failed to process market data",
                     error=e,
                     symbol=symbol,
                     action="process_error")
        raise
```
### Health Checks and Metrics

**Comprehensive Health Monitoring:**

```python
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import psutil
import requests
from sqlalchemy import text  # used by check_database below

@dataclass
class HealthCheck:
    name: str
    status: str  # "healthy", "degraded", "unhealthy"
    message: str
    response_time_ms: Optional[int] = None
    metadata: Optional[Dict] = None

class HealthMonitor:
    """Comprehensive health monitoring system"""

    def __init__(self):
        self.checks: List[Callable] = []

    def add_check(self, check_func: Callable):
        """Register a health check function"""
        self.checks.append(check_func)

    async def run_all_checks(self) -> Dict[str, Any]:
        """Run all registered health checks"""
        results = []

        for check_func in self.checks:
            start_time = time.time()
            try:
                result = await check_func()
                result.response_time_ms = int((time.time() - start_time) * 1000)
                results.append(result)
            except Exception as e:
                results.append(HealthCheck(
                    name=check_func.__name__,
                    status="unhealthy",
                    message=f"Check failed: {str(e)}",
                    response_time_ms=int((time.time() - start_time) * 1000)
                ))

        # Overall health status
        statuses = [check.status for check in results]
        if "unhealthy" in statuses:
            overall_status = "unhealthy"
        elif "degraded" in statuses:
            overall_status = "degraded"
        else:
            overall_status = "healthy"

        return {
            "status": overall_status,
            "timestamp": datetime.utcnow().isoformat(),
            "checks": {check.name: check.__dict__ for check in results},
            "summary": {
                "total_checks": len(results),
                "healthy": len([c for c in results if c.status == "healthy"]),
                "degraded": len([c for c in results if c.status == "degraded"]),
                "unhealthy": len([c for c in results if c.status == "unhealthy"])
            }
        }
# Health check implementations (db_manager comes from the DatabaseManager example above)
async def check_database() -> HealthCheck:
    try:
        with db_manager.get_connection() as conn:
            conn.execute(text("SELECT 1"))
        return HealthCheck(
            name="database",
            status="healthy",
            message="Database connection successful"
        )
    except Exception as e:
        return HealthCheck(
            name="database",
            status="unhealthy",
            message=f"Database connection failed: {str(e)}"
        )

async def check_external_api() -> HealthCheck:
    try:
        # Note: requests is blocking; use an async HTTP client for high-traffic agents
        response = requests.get("https://api.example.com/health", timeout=5)
        if response.status_code == 200:
            return HealthCheck(
                name="external_api",
                status="healthy",
                message="External API responsive"
            )
        else:
            return HealthCheck(
                name="external_api",
                status="degraded",
                message=f"External API returned {response.status_code}"
            )
    except requests.Timeout:
        return HealthCheck(
            name="external_api",
            status="degraded",
            message="External API timeout (degraded service)"
        )
    except Exception as e:
        return HealthCheck(
            name="external_api",
            status="unhealthy",
            message=f"External API error: {str(e)}"
        )

async def check_system_resources() -> HealthCheck:
    memory = psutil.virtual_memory()
    cpu_percent = psutil.cpu_percent(interval=1)

    if memory.percent > 90 or cpu_percent > 90:
        status = "unhealthy"
        message = f"High resource usage: CPU {cpu_percent}%, Memory {memory.percent}%"
    elif memory.percent > 80 or cpu_percent > 80:
        status = "degraded"
        message = f"Elevated resource usage: CPU {cpu_percent}%, Memory {memory.percent}%"
    else:
        status = "healthy"
        message = f"Normal resource usage: CPU {cpu_percent}%, Memory {memory.percent}%"

    return HealthCheck(
        name="system_resources",
        status=status,
        message=message,
        metadata={
            "cpu_percent": cpu_percent,
            "memory_percent": memory.percent,
            "memory_available_gb": round(memory.available / (1024**3), 2)
        }
    )
# Set up health monitoring
health_monitor = HealthMonitor()
health_monitor.add_check(check_database)
health_monitor.add_check(check_external_api)
health_monitor.add_check(check_system_resources)

# Health endpoint for AgentHub (async views require `pip install flask[async]`)
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/health')
async def health_endpoint():
    health_status = await health_monitor.run_all_checks()
    status_code = 200 if health_status['status'] == 'healthy' else 503
    return jsonify(health_status), status_code
```
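For reference, a healthy response from this endpoint has roughly the following shape (values are illustrative; the structure mirrors `run_all_checks` above):

```json
{
  "status": "healthy",
  "timestamp": "2024-01-15T10:30:00",
  "checks": {
    "database": {
      "name": "database",
      "status": "healthy",
      "message": "Database connection successful",
      "response_time_ms": 12,
      "metadata": null
    }
  },
  "summary": {"total_checks": 3, "healthy": 3, "degraded": 0, "unhealthy": 0}
}
```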
## Deployment Strategies

### Blue-Green Deployments

**Safe Deployment Pattern:**

```yaml
# Agent configuration for blue-green deployment
agent_config:
  name: "market-agent"
  environments:
    production:
      blue:
        build_id: "build-v1.2.0"
        replicas: 3
        traffic_weight: 100
      green:
        build_id: "build-v1.3.0"
        replicas: 1
        traffic_weight: 0
  deployment_strategy:
    type: "blue-green"
    health_check_path: "/health"
    readiness_timeout: 300
    rollback_on_failure: true
```
**Deployment Script:**

```python
import time

import requests

class BlueGreenDeployment:
    def __init__(self, agent_id: str, api_key: str):
        self.agent_id = agent_id
        self.api_key = api_key
        self.base_url = "https://prod-agent-hosting-api.useagenthub.com"

    def deploy_green_environment(self, new_build_id: str) -> str:
        """Deploy the new version to the green environment"""
        response = requests.post(
            f"{self.base_url}/instances",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "agent_id": self.agent_id,
                "build_id": new_build_id,
                "name": f"{self.agent_id}-green",
                "config": {"replicas": 1}
            }
        )
        return response.json()["id"]

    def health_check(self, instance_id: str) -> bool:
        """Check whether an instance is healthy"""
        response = requests.get(
            f"{self.base_url}/instances/{instance_id}",
            headers={"Authorization": f"Bearer {self.api_key}"}
        )
        instance = response.json()
        return instance["status"] == "running"

    def switch_traffic(self, green_instance_id: str):
        """Switch traffic from blue to green"""
        # Implementation depends on your load balancer configuration
        pass

    def safe_deployment(self, new_build_id: str) -> bool:
        """Execute a safe blue-green deployment"""
        try:
            # Deploy the green environment
            green_instance_id = self.deploy_green_environment(new_build_id)

            # Wait for green to become healthy (30 checks x 10s = 5-minute timeout)
            for _ in range(30):
                if self.health_check(green_instance_id):
                    break
                time.sleep(10)
            else:
                raise Exception("Green environment failed health checks")

            # Switch traffic to green
            self.switch_traffic(green_instance_id)

            # Monitor for issues
            time.sleep(300)  # 5-minute monitoring period
            if self.health_check(green_instance_id):
                return True
            else:
                raise Exception("Green environment became unhealthy after traffic switch")

        except Exception as e:
            print(f"Deployment failed: {e}")
            # Rollback logic would go here
            return False
```
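Wired into CI, a deployment then reduces to a few lines (the environment variable name and build ID here are illustrative):

```python
import os

# Illustrative values; in practice these come from your CI pipeline
deployer = BlueGreenDeployment(
    agent_id="market-agent",
    api_key=os.environ["AGENTHUB_API_KEY"]  # hypothetical variable name
)
if not deployer.safe_deployment("build-v1.3.0"):
    raise SystemExit("Deployment failed and was rolled back")
```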
### Configuration Management

**Environment-Specific Configuration:**

```python
import os
from dataclasses import dataclass
from enum import Enum
from typing import Dict

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

@dataclass
class EnvironmentConfig:
    """Environment-specific configuration"""
    # Base configuration
    debug: bool
    log_level: str

    # Resource limits
    cpu_limit: str
    memory_limit: str

    # External service endpoints
    api_endpoints: Dict[str, str]

    # Feature flags
    features: Dict[str, bool]

class ConfigManager:
    """Centralized configuration management"""

    # STAGING omitted for brevity; add an entry before enabling that environment
    CONFIGS = {
        Environment.DEVELOPMENT: EnvironmentConfig(
            debug=True,
            log_level="DEBUG",
            cpu_limit="0.5",
            memory_limit="1Gi",
            api_endpoints={
                "market_data": "https://api-dev.example.com",
                "news_service": "https://news-dev.example.com"
            },
            features={
                "experimental_analysis": True,
                "advanced_caching": False
            }
        ),
        Environment.PRODUCTION: EnvironmentConfig(
            debug=False,
            log_level="INFO",
            cpu_limit="2.0",
            memory_limit="4Gi",
            api_endpoints={
                "market_data": "https://api.example.com",
                "news_service": "https://news.example.com"
            },
            features={
                "experimental_analysis": False,
                "advanced_caching": True
            }
        )
    }

    @classmethod
    def get_config(cls) -> EnvironmentConfig:
        env_name = os.environ.get('ENVIRONMENT', 'development')
        try:
            environment = Environment(env_name)
            return cls.CONFIGS[environment]
        except (ValueError, KeyError):
            raise ValueError(f"Unknown or unconfigured environment: {env_name}")

    @classmethod
    def is_feature_enabled(cls, feature_name: str) -> bool:
        config = cls.get_config()
        return config.features.get(feature_name, False)

# Usage
config = ConfigManager.get_config()

if ConfigManager.is_feature_enabled('experimental_analysis'):
    # Use experimental features
    pass
```
This guide has covered the essentials of building production-ready agents on AgentHub, from code organization and performance tuning to security, observability, and deployment. Following these patterns will help keep your agents reliable, secure, and performant.
Ready to implement these practices? Start with the 5-Minute Quickstart →