Integrating Large Language Models into production applications requires careful consideration of API design, error handling, cost management, and performance. This guide covers practical patterns for building robust LLM-powered features. See the Python Guide for more context.
Basic LLM Integration
Simple API Calls
from openai import OpenAI
# NOTE(review): avoid hardcoding API keys in source. Prefer setting the
# OPENAI_API_KEY environment variable (the client reads it automatically
# when api_key is omitted) -- confirm deployment convention.
client = OpenAI(api_key="your-api-key")
def generate_response(prompt, model="gpt-4", temperature=0.7):
    """Send a single prompt to the model and return its reply text.

    Uses the module-level `client`, a fixed "helpful assistant" system
    prompt, and a 500-token cap on the reply.
    """
    chat_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt},
    ]
    completion = client.chat.completions.create(
        model=model,
        messages=chat_messages,
        temperature=temperature,
        max_tokens=500,
    )
    return completion.choices[0].message.content
# Usage
# Single-shot call: prints one assistant reply for one prompt.
response = generate_response("Explain quantum computing in simple terms")
print(response)
Conversation Management
class ConversationManager:
    """Manage multi-turn conversations with LLMs.

    Keeps a running message history so each request carries prior
    context. Optionally caps the history length to bound token usage
    (the original version grew the history without limit, which the
    "Common Pitfalls" section below explicitly warns against).
    """

    def __init__(self, system_prompt="You are a helpful assistant.",
                 max_history=None):
        """
        Args:
            system_prompt: System message prepended to every request.
            max_history: If set, keep at most this many history messages,
                dropping the oldest first. None (the default) means
                unbounded, matching the previous behavior.
        """
        self.client = OpenAI()
        self.system_prompt = system_prompt
        self.messages = []
        self.max_history = max_history

    def add_message(self, role, content):
        """Append a message, then trim to max_history if a limit is set."""
        self.messages.append({"role": role, "content": content})
        if self.max_history is not None and len(self.messages) > self.max_history:
            # Trim in place; the system prompt is stored separately,
            # so it can never be trimmed away.
            del self.messages[:-self.max_history]

    def get_response(self, user_input, model="gpt-4"):
        """Send user_input with accumulated context and record the reply."""
        self.add_message("user", user_input)
        response = self.client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                *self.messages
            ],
            temperature=0.7
        )
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        return assistant_message

    def clear_history(self):
        """Clear conversation history (system prompt is kept)."""
        self.messages = []
# Usage
# The second question relies on history from the first ("it" = Python).
manager = ConversationManager()
response1 = manager.get_response("What is Python?")
response2 = manager.get_response("How is it different from Java?")
Streaming Responses
Streaming is essential for real-time user feedback and reducing perceived latency.
Basic Streaming
def stream_response(prompt):
    """Stream a completion, echoing each token to stdout as it arrives.

    Returns the fully assembled response text.
    """
    stream = OpenAI().chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    pieces = []
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)
            pieces.append(delta)
    print()  # Newline
    return "".join(pieces)
# Usage
# Tokens print incrementally; the full poem is also returned.
response = stream_response("Write a short poem about Python")
Streaming with Callbacks
from typing import Callable
class StreamingCallback:
"""Handle streaming responses with callbacks."""
def __init__(self, on_token: Callable[[str], None] = None):
self.on_token = on_token or print
self.full_response = ""
def stream(self, prompt, model="gpt-4"):
"""Stream response with callback."""
client = OpenAI()
stream = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
self.on_token(token)
self.full_response += token
return self.full_response
# Usage with custom callback
# Wraps each token in brackets to make token boundaries visible.
def my_callback(token):
    print(f"[{token}]", end="", flush=True)

callback = StreamingCallback(on_token=my_callback)
response = callback.stream("Explain machine learning")
Error Handling and Resilience
Robust Error Handling
from openai import OpenAI, RateLimitError, APIError
import time
from typing import Optional
def call_llm_with_retry(
    prompt: str,
    max_retries: int = 3,
    backoff_factor: float = 2.0
) -> Optional[str]:
    """Call the LLM, retrying transient failures with exponential backoff.

    Retries on rate limits and HTTP 500 server errors; other API errors
    propagate immediately.

    Args:
        prompt: User prompt to send.
        max_retries: Maximum number of attempts.
        backoff_factor: Base of the backoff (wait = factor ** attempt).

    Returns:
        The model's reply text.

    Raises:
        RuntimeError: If every attempt failed on a retryable error
            (the original retryable exception is chained as the cause).
        APIError: For non-retryable API errors.
    """
    client = OpenAI()
    last_error: Optional[Exception] = None
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                timeout=30
            )
            return response.choices[0].message.content
        except RateLimitError as e:
            print("Rate limited.")
            last_error = e
        except APIError as e:
            if e.status_code != 500:
                raise  # non-retryable API error
            print("Server error.")
            last_error = e
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
        # Fix: don't sleep after the final attempt -- we're about to
        # give up, so the wait would be pure dead time.
        if attempt < max_retries - 1:
            wait_time = backoff_factor ** attempt
            print(f"Waiting {wait_time}s before retry...")
            time.sleep(wait_time)
    # RuntimeError (an Exception subclass) keeps existing `except
    # Exception` callers working while chaining the real cause.
    raise RuntimeError(f"Failed after {max_retries} retries") from last_error
# Usage
# Catches both API failures and the final give-up exception.
try:
    response = call_llm_with_retry("What is AI?")
    print(response)
except Exception as e:
    print(f"Error: {e}")
Timeout Handling
import asyncio
from openai import AsyncOpenAI
async def call_with_timeout(prompt, timeout_seconds=30):
    """Call the LLM asynchronously; return None if it times out.

    asyncio.wait_for cancels the underlying request when the deadline
    passes.
    """
    client = AsyncOpenAI()
    request = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    try:
        response = await asyncio.wait_for(request, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print(f"Request timed out after {timeout_seconds}s")
        return None
    return response.choices[0].message.content
# Usage
# Returns None (rather than raising) when the 30s default elapses.
response = asyncio.run(call_with_timeout("Explain quantum computing"))
Cost Optimization
Token Counting
import tiktoken
def count_tokens(text, model="gpt-4"):
    """Return the number of tokens `text` encodes to for `model`."""
    # encoding_for_model picks the tokenizer matching the model family.
    return len(tiktoken.encoding_for_model(model).encode(text))
def estimate_cost(prompt, response, model="gpt-4"):
    """Estimate the dollar cost of one prompt/response pair.

    Rates are USD per 1K tokens. Unknown models fall back to gpt-4
    pricing, which over-estimates for cheaper models.
    """
    # Pricing as of 2025 (update as needed)
    pricing = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    }
    rates = pricing.get(model, pricing["gpt-4"])
    n_in = count_tokens(prompt, model)
    n_out = count_tokens(response, model)
    cost_in = (n_in / 1000) * rates["input"]
    cost_out = (n_out / 1000) * rates["output"]
    return {
        "input_tokens": n_in,
        "output_tokens": n_out,
        "input_cost": cost_in,
        "output_cost": cost_out,
        "total_cost": cost_in + cost_out,
    }
# Usage
# Estimate cost offline from the texts -- no API call is made here.
prompt = "Explain machine learning"
response = "Machine learning is..."
cost = estimate_cost(prompt, response)
print(f"Total cost: ${cost['total_cost']:.6f}")
Caching Responses
import hashlib
import json
from pathlib import Path
class LLMCache:
    """Cache LLM responses on disk to reduce API calls.

    Responses are stored as one JSON file per (model, prompt) pair,
    keyed by an MD5 hash. MD5 is acceptable here: it is a cache key,
    not a security boundary.
    """

    def __init__(self, cache_dir=".llm_cache"):
        """Create the cache directory (and any parents) if missing."""
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested cache_dir like "var/cache/llm" works.
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, prompt, model):
        """Derive a stable, filename-safe key from prompt and model."""
        key = f"{model}:{prompt}"
        return hashlib.md5(key.encode()).hexdigest()

    def _cache_path(self, prompt, model):
        """Return the cache file path for this prompt/model pair."""
        return self.cache_dir / f"{self._get_cache_key(prompt, model)}.json"

    def get(self, prompt, model):
        """Return the cached response, or None on a cache miss."""
        cache_file = self._cache_path(prompt, model)
        if cache_file.exists():
            with open(cache_file) as f:
                return json.load(f)
        return None

    def set(self, prompt, model, response):
        """Store a JSON-serializable response for this prompt/model pair."""
        with open(self._cache_path(prompt, model), 'w') as f:
            json.dump(response, f)

    def call_with_cache(self, prompt, model="gpt-4"):
        """Return a cached response if present, otherwise call the API.

        Fix: compares against None instead of truthiness, so a cached
        falsy response (e.g. the empty string) still counts as a hit
        instead of triggering a redundant paid API call.
        """
        cached = self.get(prompt, model)
        if cached is not None:
            print("Using cached response")
            return cached
        client = OpenAI()
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        result = response.choices[0].message.content
        self.set(prompt, model, result)
        return result
# Usage
# The first call hits the API; identical repeat prompts are served
# from the on-disk cache.
cache = LLMCache()
response = cache.call_with_cache("What is Python?")
Advanced Integration Patterns
Function Calling
import json
def process_with_function_calling(user_query):
    """Ask the model with a weather tool available; print any tool calls.

    Returns the raw completion so callers can inspect tool_calls and
    execute the requested functions themselves.
    """
    weather_tool = {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["location"],
            },
        },
    }
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": user_query}],
        tools=[weather_tool],
        tool_choice="auto",  # model decides whether to call the tool
    )
    message = response.choices[0].message
    if message.tool_calls:
        for call in message.tool_calls:
            print(f"Function: {call.function.name}")
            print(f"Args: {call.function.arguments}")
    return response
# Usage
# The model should emit a get_weather tool call for this query.
response = process_with_function_calling("What's the weather in New York?")
Prompt Templates
from string import Template
class PromptTemplate:
    """Thin wrapper over string.Template for $variable prompt filling."""

    def __init__(self, template_str):
        # Compile once; Template parses the $placeholders up front.
        self.template = Template(template_str)

    def format(self, **kwargs):
        """Fill every $placeholder; raises KeyError if any is missing."""
        return self.template.substitute(kwargs)
# Usage
# Template variables keep prompt structure separate from its content.
template = PromptTemplate("""
You are a $role.
Task: $task
Context: $context
""")
prompt = template.format(
    role="Python expert",
    task="Explain decorators",
    context="For beginners"
)
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}]
)
Batch Processing
from typing import List
def batch_process_with_llm(items: List[str], batch_size: int = 10):
    """Send each item to the LLM sequentially, reporting progress.

    NOTE(review): despite the name, items are still sent one request
    at a time; batch_size only controls how often progress is printed.
    """
    client = OpenAI()
    results = []
    total = len(items)
    for start in range(0, total, batch_size):
        for item in items[start:start + batch_size]:
            reply = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": f"Process: {item}"}]
            )
            results.append(reply.choices[0].message.content)
        print(f"Processed {min(start + batch_size, total)}/{total}")
    return results
# Usage
# 100 items -> 100 sequential API calls with progress every 10.
items = [f"Item {i}" for i in range(100)]
results = batch_process_with_llm(items)
Common Pitfalls and Best Practices
❌ Bad: No Error Handling
# DON'T: Assume API calls always succeed
# (no try/except and no timeout -- any transient failure crashes the
# caller, and a hung connection blocks indefinitely)
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}]
)
✅ Good: Comprehensive Error Handling
# DO: Handle errors gracefully
try:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        timeout=30  # bound how long a hung request can block
    )
except RateLimitError:
    # Handle rate limiting (e.g. back off and retry)
    pass
except APIError as e:
    # Handle API errors (e.g. retry 5xx, surface 4xx)
    pass
❌ Bad: Unbounded Conversation History
# DON'T: Keep growing conversation history indefinitely
# (every request re-sends the whole history, so cost grows per turn
# until the model's context window is exceeded)
for user_input in user_inputs:
    messages.append({"role": "user", "content": user_input})
    # Messages keep growing!
✅ Good: Manage Context Window
# DO: Limit conversation history
MAX_MESSAGES = 20

def add_message_with_limit(messages, role, content, max_messages=MAX_MESSAGES):
    """Append a message and trim the list in place to max_messages.

    Fix: the original `messages = messages[-MAX_MESSAGES:]` rebound a
    local name to a trimmed *copy*, so any aliased reference to the
    caller's list was never trimmed. Trimming with `del` mutates the
    caller's list directly; the list is still returned for backward
    compatibility with callers using the return value.

    Args:
        messages: Conversation list of {"role", "content"} dicts.
        role: Role of the new message ("user"/"assistant"/"system").
        content: Message text.
        max_messages: History cap; defaults to the module constant.

    Returns:
        The same (now possibly trimmed) list object.
    """
    messages.append({"role": role, "content": content})
    if len(messages) > max_messages:
        # Keep only the newest max_messages entries, in place.
        del messages[:-max_messages]
    return messages
Production Deployment
Monitoring and Logging
import logging
from datetime import datetime

# Module-level logger per stdlib convention. NOTE(review): basicConfig
# here is fine for a script, but library code should leave logging
# configuration to the application.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def call_llm_with_logging(prompt, model="gpt-4"):
    """Call the LLM, logging start, duration, and any failure.

    Re-raises on error after logging so callers still see the failure.
    """
    client = OpenAI()
    started = datetime.now()
    logger.info(f"Starting LLM call with model: {model}")
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        elapsed = (datetime.now() - started).total_seconds()
        logger.info(f"LLM call completed in {elapsed:.2f}s")
        return response.choices[0].message.content
    except Exception as e:
        logger.error(f"LLM call failed: {e}")
        raise
Summary
Integrating LLMs into production applications requires:
- Robust error handling with retry logic and timeouts
- Cost optimization through token counting and caching
- Streaming for better user experience
- Conversation management with context window limits
- Monitoring and logging for production visibility
- Function calling for structured outputs
- Batch processing for efficiency
These patterns ensure reliable, cost-effective, and performant LLM-powered applications.
Comments