Introduction
Production AI applications live or die by how they handle provider APIs. This guide covers essential patterns for integrating AI services reliably: a unified client, rate limiting, fallback strategies, caching, and retries with backoff.
Core Integration Patterns
1. Unified AI Client
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
import os

class BaseAIClient(ABC):
    """Common interface that every provider-specific client implements."""

    @abstractmethod
    def generate(self, prompt: str, **kwargs) -> str:
        pass

    def generate_with_functions(self, prompt: str, functions: list, **kwargs) -> Dict:
        # Optional capability: providers that support tool/function calling override this.
        # (Kept non-abstract so clients without it remain instantiable.)
        raise NotImplementedError

class OpenAIClient(BaseAIClient):
    def __init__(self, api_key: str):
        from openai import OpenAI
        self.client = OpenAI(api_key=api_key)

    def generate(self, prompt: str, **kwargs) -> str:
        response = self.client.chat.completions.create(
            model=kwargs.get("model", "gpt-4o"),
            messages=[{"role": "user", "content": prompt}],
            temperature=kwargs.get("temperature", 0.7),
            max_tokens=kwargs.get("max_tokens", 1024),
        )
        return response.choices[0].message.content

class AnthropicClient(BaseAIClient):
    def __init__(self, api_key: str):
        from anthropic import Anthropic
        self.client = Anthropic(api_key=api_key)

    def generate(self, prompt: str, **kwargs) -> str:
        response = self.client.messages.create(
            model=kwargs.get("model", "claude-sonnet-4-20250514"),
            max_tokens=kwargs.get("max_tokens", 1024),
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

class UnifiedAIClient:
    """Facade that hides provider differences behind a single generate() call."""

    def __init__(self, provider: str = "openai", **config):
        self.provider = provider
        if provider == "openai":
            self.client = OpenAIClient(config.get("api_key", os.getenv("OPENAI_API_KEY")))
        elif provider == "anthropic":
            self.client = AnthropicClient(config.get("api_key", os.getenv("ANTHROPIC_API_KEY")))
        else:
            raise ValueError(f"Unknown provider: {provider}")

    def generate(self, prompt: str, **kwargs) -> str:
        return self.client.generate(prompt, **kwargs)
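A brief usage sketch (it assumes OPENAI_API_KEY and ANTHROPIC_API_KEY are set in the environment):

# Assumes the provider API keys are set as environment variables.
client = UnifiedAIClient(provider="openai")
print(client.generate("Summarize the benefits of a unified client.", max_tokens=256))

# Switching providers is a one-line change; the calling code stays the same.
claude = UnifiedAIClient(provider="anthropic")
print(claude.generate("Summarize the benefits of a unified client."))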
2. Rate Limiting
import time
from collections import defaultdict
from threading import Lock
from datetime import datetime

class TokenBucketRateLimiter:
    """Classic token bucket: each key earns `rate` tokens per `per_seconds` window."""

    def __init__(self, rate: int, per_seconds: int):
        self.rate = rate
        self.per_seconds = per_seconds
        self.tokens = defaultdict(lambda: rate)
        self.last_update = defaultdict(datetime.now)
        self.lock = Lock()

    def allow(self, key: str) -> bool:
        with self.lock:
            now = datetime.now()
            elapsed = (now - self.last_update[key]).total_seconds()
            # Refill tokens in proportion to elapsed time, capped at the bucket size.
            self.tokens[key] = min(
                self.rate,
                self.tokens[key] + elapsed * (self.rate / self.per_seconds),
            )
            self.last_update[key] = now
            if self.tokens[key] >= 1:
                self.tokens[key] -= 1
                return True
            return False

    def wait_time(self, key: str) -> float:
        # Seconds until one full token is available for this key.
        with self.lock:
            if self.tokens[key] >= 1:
                return 0.0
            return (1 - self.tokens[key]) * (self.per_seconds / self.rate)

class RateLimitedClient:
    def __init__(self, client, requests_per_minute: int = 60):
        self.client = client
        self.limiter = TokenBucketRateLimiter(requests_per_minute, 60)

    def generate(self, prompt: str, **kwargs):
        key = kwargs.get("user_id", "default")
        # Loop until a token is actually consumed; sleeping without re-checking
        # would let bursts exceed the configured rate.
        while not self.limiter.allow(key):
            time.sleep(self.limiter.wait_time(key))
        return self.client.generate(prompt, **kwargs)
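Wiring it together, reusing the UnifiedAIClient defined above (a sketch; the user ID is illustrative):

base = UnifiedAIClient(provider="openai")
limited = RateLimitedClient(base, requests_per_minute=30)

# Per-user limiting: pass a user_id so each caller gets its own bucket.
reply = limited.generate("Hello!", user_id="user-42")

Because the limiter keys its buckets on user_id, one noisy caller exhausts only their own budget rather than the whole application's.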
3. Fallback Strategies
class FallbackChain:
    """Try each client in order; return the first successful response."""

    def __init__(self, clients: list):
        self.clients = clients

    def generate(self, prompt: str, **kwargs):
        errors = []
        for client in self.clients:
            try:
                return client.generate(prompt, **kwargs)
            except Exception as e:
                # Record the failure and move on to the next provider.
                errors.append((client.__class__.__name__, str(e)))
                continue
        raise RuntimeError(f"All clients failed: {errors}")

# Usage
primary = OpenAIClient(os.getenv("OPENAI_API_KEY"))
fallback = AnthropicClient(os.getenv("ANTHROPIC_API_KEY"))
client = FallbackChain([primary, fallback])
response = client.generate("Hello!")
4. Caching
import hashlib
import json
from typing import Optional

class PromptCache:
    def __init__(self, redis_client, ttl: int = 3600):
        self.redis = redis_client
        self.ttl = ttl

    def _hash(self, prompt: str, **kwargs) -> str:
        # Deterministic key: the same prompt + parameters always hash the same.
        content = json.dumps({"prompt": prompt, "kwargs": kwargs}, sort_keys=True)
        return f"ai_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    def get(self, prompt: str, **kwargs) -> Optional[str]:
        key = self._hash(prompt, **kwargs)
        cached = self.redis.get(key)
        return cached.decode() if cached else None

    def set(self, prompt: str, response: str, **kwargs):
        key = self._hash(prompt, **kwargs)
        self.redis.setex(key, self.ttl, response)

    def generate(self, prompt: str, client, **kwargs):
        # Cache-aside: serve from Redis when possible, otherwise call the model and store.
        cached = self.get(prompt, **kwargs)
        if cached:
            return cached
        response = client.generate(prompt, **kwargs)
        self.set(prompt, response, **kwargs)
        return response
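A usage sketch, assuming a local Redis server and the redis-py package (host and port are illustrative):

import redis

# Assumes a Redis server on localhost:6379.
cache = PromptCache(redis.Redis(host="localhost", port=6379), ttl=3600)
client = UnifiedAIClient(provider="openai")

# First call hits the API; an identical second call is served from Redis.
answer = cache.generate("What is a token bucket?", client, temperature=0.0)

Caching pays off most at temperature 0, where identical prompts yield stable outputs; at higher temperatures a cached response silently freezes what was meant to be varied.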
5. Retry with Backoff
import time
from functools import wraps

def retry_with_backoff(max_retries: int = 3, backoff_factor: float = 2.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # In production, catch only transient errors (rate limits,
                    # timeouts), not permanent ones like authentication failures.
                    if attempt == max_retries - 1:
                        raise
                    wait_time = backoff_factor ** attempt
                    print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
                    time.sleep(wait_time)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def generate_with_retry(client, prompt: str):
    return client.generate(prompt)
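Fixed exponential delays can synchronize many clients into retry "thundering herds"; adding random jitter spreads them out. A minimal variation on the delay calculation above (the 30-second cap is an arbitrary example):

import random

def backoff_with_jitter(attempt: int, backoff_factor: float = 2.0, cap: float = 30.0) -> float:
    # "Full jitter": sleep a random amount between 0 and the exponential delay, capped.
    return random.uniform(0, min(cap, backoff_factor ** attempt))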
Production Architecture
class AIPlatformClient:
    """Composes caching, rate limiting, and provider fallback behind one entry point."""

    def __init__(self, config: Dict):
        self.config = config
        self.clients = self._initialize_clients()
        self.cache = PromptCache(config.get("redis"))
        self.rate_limiter = TokenBucketRateLimiter(
            config.get("rate_limit", 60),
            60,
        )

    def _initialize_clients(self):
        clients = []
        if "openai" in self.config.get("providers", []):
            clients.append(OpenAIClient(self.config["openai_key"]))
        if "anthropic" in self.config.get("providers", []):
            clients.append(AnthropicClient(self.config["anthropic_key"]))
        return FallbackChain(clients)

    def generate(self, prompt: str, use_cache: bool = True, **kwargs):
        # 1. Serve from cache when possible: no API call, no rate-limit token spent.
        if use_cache:
            cached = self.cache.get(prompt, **kwargs)
            if cached:
                return cached
        # 2. Block until the rate limiter actually grants a token.
        while not self.rate_limiter.allow("default"):
            time.sleep(self.rate_limiter.wait_time("default"))
        # 3. Generate through the fallback chain.
        response = self.clients.generate(prompt, **kwargs)
        # 4. Cache the result for identical future requests.
        if use_cache:
            self.cache.set(prompt, response, **kwargs)
        return response
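Putting it together (a sketch; the config keys match what __init__ and _initialize_clients read above, and the Redis connection details are illustrative):

import redis

platform = AIPlatformClient({
    "providers": ["openai", "anthropic"],
    "openai_key": os.getenv("OPENAI_API_KEY"),
    "anthropic_key": os.getenv("ANTHROPIC_API_KEY"),
    "redis": redis.Redis(host="localhost", port=6379),
    "rate_limit": 60,
})
response = platform.generate("Explain token buckets in one paragraph.")

Note the ordering: the cache check comes before the rate limiter, so cache hits never spend a rate-limit token.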
Best Practices
- Always use fallback chains - A second provider keeps you serving traffic when the primary is down or rate-limiting you
- Implement caching - Identical prompts recur in production; serving them from cache cuts both cost and latency
- Add rate limiting - Smoothing your own request rate prevents quota exhaustion and cascading 429 errors
- Log everything - Record model, latency, token counts, and errors so you can monitor usage, cost, and failure rates
- Handle timeouts - Set explicit timeouts so a hung request fails fast and triggers a retry or fallback instead of blocking a worker; see the sketch after this list
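Both official SDKs accept a request timeout at construction time; a minimal sketch (the 30-second value is an arbitrary example):

from openai import OpenAI
from anthropic import Anthropic

# Both SDKs accept a per-request timeout (in seconds) at construction time.
openai_client = OpenAI(timeout=30.0)        # reads OPENAI_API_KEY from the environment
anthropic_client = Anthropic(timeout=30.0)  # reads ANTHROPIC_API_KEY from the environment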
Conclusion
Building resilient AI integrations requires careful attention to reliability, cost, and performance. The patterns in this guide help you build production-ready AI applications that can handle failures gracefully while optimizing costs.