LLM Integration in Applications: Building AI-Powered Features
Integrating Large Language Models into production applications requires careful consideration of API design, error handling, cost management, and performance. This guide covers practical patterns for building robust LLM-powered features.
Basic LLM Integration
Simple API Calls
from openai import OpenAI

# Module-level client shared by the examples below.
# NOTE(review): never commit a real key; prefer setting the OPENAI_API_KEY
# environment variable, which the client reads automatically.
client = OpenAI(api_key="your-api-key")
def generate_response(prompt, model="gpt-4", temperature=0.7,
                      system_prompt="You are a helpful assistant.",
                      max_tokens=500):
    """Generate a single-turn response from an LLM.

    Args:
        prompt: User message to send.
        model: Chat-completion model name.
        temperature: Sampling temperature (higher = more random).
        system_prompt: System instruction; the default preserves the
            previously hard-coded assistant persona.
        max_tokens: Cap on generated tokens (previously hard-coded to 500).

    Returns:
        The assistant's reply text.
    """
    # Uses the module-level `client` configured above.
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ],
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return response.choices[0].message.content
# Usage
response = generate_response("Explain quantum computing in simple terms")
print(response)
Conversation Management
class ConversationManager:
    """Manage multi-turn conversations with LLMs.

    Replays the accumulated history (prefixed with the system prompt) on
    every request so the model keeps full conversational context.
    """

    def __init__(self, system_prompt="You are a helpful assistant.",
                 max_messages=None):
        """
        Args:
            system_prompt: Instruction sent as the leading "system" message.
            max_messages: Optional cap on retained history entries; the
                oldest messages are dropped past this limit. None (the
                default) keeps the original unbounded behavior.
        """
        self.client = OpenAI()
        self.system_prompt = system_prompt
        self.max_messages = max_messages
        self.messages = []

    def add_message(self, role, content):
        """Add message to conversation history, enforcing max_messages."""
        self.messages.append({"role": role, "content": content})
        # Bound the context window: unbounded history eventually overflows
        # the model's context limit and inflates per-request token costs.
        if self.max_messages is not None and len(self.messages) > self.max_messages:
            del self.messages[:-self.max_messages]

    def get_response(self, user_input, model="gpt-4"):
        """Get a response while maintaining conversation context."""
        self.add_message("user", user_input)
        response = self.client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                *self.messages
            ],
            temperature=0.7
        )
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        return assistant_message

    def clear_history(self):
        """Clear conversation history."""
        self.messages = []
# Usage
manager = ConversationManager()
response1 = manager.get_response("What is Python?")
response2 = manager.get_response("How is it different from Java?")
Streaming Responses
Streaming is essential for real-time user feedback and reducing perceived latency.
Basic Streaming
def stream_response(prompt):
    """Stream a completion, echoing tokens to stdout as they arrive.

    Args:
        prompt: User message to send.

    Returns:
        The full concatenated response text.
    """
    client = OpenAI()
    stream = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    full_response = ""
    for chunk in stream:
        # Guard: stream chunks can arrive with an empty `choices` list
        # (e.g. trailing usage/metadata chunks), so check before indexing;
        # `delta.content` may also be None on role/terminator deltas.
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            print(token, end="", flush=True)
            full_response += token
    print()  # Newline after the streamed output
    return full_response
# Usage
response = stream_response("Write a short poem about Python")
Streaming with Callbacks
from typing import Callable
class StreamingCallback:
    """Stream a completion, invoking a callback for each token.

    Accumulates the full text in `full_response` while forwarding every
    token to `on_token` (defaults to builtin print) as it arrives.
    """

    def __init__(self, on_token: Optional[Callable[[str], None]] = None):
        """
        Args:
            on_token: Called with each token as it arrives. Defaults to
                print. (Annotation fixed: the parameter accepts None, so
                its type is Optional[...], not a bare Callable.)
        """
        self.on_token = on_token or print
        self.full_response = ""

    def stream(self, prompt, model="gpt-4"):
        """Stream a response for `prompt`; returns the accumulated text."""
        client = OpenAI()
        stream = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        for chunk in stream:
            # Guard against chunks with an empty `choices` list before
            # indexing; `delta.content` may also be None.
            if chunk.choices and chunk.choices[0].delta.content:
                token = chunk.choices[0].delta.content
                self.on_token(token)
                self.full_response += token
        return self.full_response
# Usage with custom callback
def my_callback(token):
print(f"[{token}]", end="", flush=True)
callback = StreamingCallback(on_token=my_callback)
response = callback.stream("Explain machine learning")
Error Handling and Resilience
Robust Error Handling
from openai import OpenAI, RateLimitError, APIError
import time
from typing import Optional
def call_llm_with_retry(
    prompt: str,
    max_retries: int = 3,
    backoff_factor: float = 2.0
) -> Optional[str]:
    """Call the LLM, retrying transient failures with exponential backoff.

    Retries on rate limiting and HTTP 500 server errors; any other error
    propagates immediately.

    Args:
        prompt: User message to send.
        max_retries: Total number of attempts before giving up.
        backoff_factor: Base of the exponential wait (backoff_factor ** attempt).

    Returns:
        The assistant's reply text.

    Raises:
        Exception: After max_retries failed attempts, chained to the last
            underlying error; non-retryable errors are re-raised as-is.
    """
    client = OpenAI()
    last_error = None
    for attempt in range(max_retries):
        # Only sleep when another attempt will follow; sleeping after the
        # final failure would just delay the raise.
        is_last_attempt = attempt == max_retries - 1
        try:
            response = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                timeout=30
            )
            return response.choices[0].message.content
        except RateLimitError as e:
            last_error = e
            if not is_last_attempt:
                wait_time = backoff_factor ** attempt
                print(f"Rate limited. Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
        except APIError as e:
            # Only 500s are considered transient; everything else re-raises.
            if e.status_code != 500:
                raise
            last_error = e
            if not is_last_attempt:
                wait_time = backoff_factor ** attempt
                print(f"Server error. Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
        except Exception as e:
            print(f"Unexpected error: {e}")
            raise
    # Chain the last underlying error so callers can inspect the real cause.
    raise Exception(f"Failed after {max_retries} retries") from last_error
# Usage
try:
response = call_llm_with_retry("What is AI?")
print(response)
except Exception as e:
print(f"Error: {e}")
Timeout Handling
import asyncio
from openai import AsyncOpenAI
async def call_with_timeout(prompt, timeout_seconds=30):
    """Request a completion, abandoning the call after `timeout_seconds`.

    Returns the reply text, or None when the request times out.
    """
    client = AsyncOpenAI()
    request = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    try:
        completion = await asyncio.wait_for(request, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print(f"Request timed out after {timeout_seconds}s")
        return None
    return completion.choices[0].message.content
# Usage
response = asyncio.run(call_with_timeout("Explain quantum computing"))
Cost Optimization
Token Counting
import tiktoken
def count_tokens(text, model="gpt-4"):
    """Count the tokens `text` occupies for `model`.

    Args:
        text: The string to tokenize.
        model: Model name used to pick the tokenizer.

    Returns:
        Number of tokens.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # encoding_for_model raises KeyError for model names tiktoken does
        # not recognize (e.g. newly released models); fall back to the
        # cl100k_base encoding used by the GPT-4 family.
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))
def estimate_cost(prompt, response, model="gpt-4"):
    """Estimate the API cost (USD) of one prompt/response exchange.

    Uses hard-coded per-1K-token rates; unknown models fall back to
    gpt-4 pricing. Returns a dict with the token counts and the
    input/output/total cost.
    """
    # Pricing as of 2025 (update as needed) -- USD per 1K tokens.
    per_1k = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    }
    rates = per_1k.get(model, per_1k["gpt-4"])
    n_in = count_tokens(prompt, model)
    n_out = count_tokens(response, model)
    cost_in = (n_in / 1000) * rates["input"]
    cost_out = (n_out / 1000) * rates["output"]
    return {
        "input_tokens": n_in,
        "output_tokens": n_out,
        "input_cost": cost_in,
        "output_cost": cost_out,
        "total_cost": cost_in + cost_out,
    }
# Usage
prompt = "Explain machine learning"
response = "Machine learning is..."
cost = estimate_cost(prompt, response)
print(f"Total cost: ${cost['total_cost']:.6f}")
Caching Responses
import hashlib
import json
from pathlib import Path
class LLMCache:
    """File-backed cache of LLM responses to reduce duplicate API calls."""

    def __init__(self, cache_dir=".llm_cache"):
        """Create the cache directory (including parents) if missing."""
        self.cache_dir = Path(cache_dir)
        # parents=True lets callers pass nested paths like "var/cache/llm".
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, prompt, model):
        """Derive a stable filename from (model, prompt).

        MD5 is acceptable here: the digest is only a cache key, not a
        security boundary.
        """
        key = f"{model}:{prompt}"
        return hashlib.md5(key.encode()).hexdigest()

    def get(self, prompt, model):
        """Return the cached response, or None on a cache miss."""
        cache_key = self._get_cache_key(prompt, model)
        cache_file = self.cache_dir / f"{cache_key}.json"
        if cache_file.exists():
            with open(cache_file) as f:
                return json.load(f)
        return None

    def set(self, prompt, model, response):
        """Persist `response` as JSON under the key for (prompt, model)."""
        cache_key = self._get_cache_key(prompt, model)
        cache_file = self.cache_dir / f"{cache_key}.json"
        with open(cache_file, 'w') as f:
            json.dump(response, f)

    def call_with_cache(self, prompt, model="gpt-4"):
        """Return the cached answer for `prompt`, calling the API only on a miss."""
        cached = self.get(prompt, model)
        # BUG FIX: compare against None. A legitimately cached empty-string
        # response is falsy and previously triggered a fresh (billed) API
        # call on every lookup.
        if cached is not None:
            print("Using cached response")
            return cached
        client = OpenAI()
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        result = response.choices[0].message.content
        self.set(prompt, model, result)
        return result
# Usage
cache = LLMCache()
response = cache.call_with_cache("What is Python?")
Advanced Integration Patterns
Function Calling
import json
def process_with_function_calling(user_query):
    """Use LLM function calling for structured outputs.

    Declares a single `get_weather` tool, lets the model decide whether
    to invoke it (tool_choice="auto"), prints any tool calls the model
    makes, and returns the raw API response.
    """
    client = OpenAI()
    weather_schema = {
        "type": "object",
        "properties": {
            "location": {"type": "string"},
            "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
        },
        "required": ["location"]
    }
    tools = [{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather for a location",
            "parameters": weather_schema
        }
    }]
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": user_query}],
        tools=tools,
        tool_choice="auto"
    )
    message = response.choices[0].message
    if message.tool_calls:
        for call in message.tool_calls:
            print(f"Function: {call.function.name}")
            print(f"Args: {call.function.arguments}")
    return response
# Usage
response = process_with_function_calling("What's the weather in New York?")
Prompt Templates
from string import Template
class PromptTemplate:
    """Thin wrapper around string.Template for building prompts.

    Placeholders use the $name syntax; substitution is strict, so a
    missing variable raises KeyError rather than leaving a hole in the
    prompt.
    """

    def __init__(self, template_str):
        """Compile `template_str` (with $-placeholders) once up front."""
        self.template = Template(template_str)

    def format(self, **kwargs):
        """Return the prompt with every $-placeholder filled from kwargs."""
        return self.template.substitute(kwargs)
# Usage
template = PromptTemplate("""
You are a $role.
Task: $task
Context: $context
""")
prompt = template.format(
role="Python expert",
task="Explain decorators",
context="For beginners"
)
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
Batch Processing
from typing import List
def batch_process_with_llm(items: List[str], batch_size: int = 10):
    """Process multiple items with LLM.

    NOTE(review): despite the name, each item is still sent as its own
    chat-completion request; `batch_size` only controls how often the
    progress line is printed, not how many items share one API call.

    Args:
        items: Strings to process, one request each.
        batch_size: Granularity of progress reporting.

    Returns:
        List of response texts, in the same order as `items`.
    """
    client = OpenAI()
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # One API call per item -- the inner loop is sequential.
        for item in batch:
            response = client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": f"Process: {item}"}]
            )
            results.append(response.choices[0].message.content)
        # min() keeps the count accurate on the final partial batch.
        print(f"Processed {min(i + batch_size, len(items))}/{len(items)}")
    return results
# Usage
items = [f"Item {i}" for i in range(100)]
results = batch_process_with_llm(items)
Common Pitfalls and Best Practices
❌ Bad: No Error Handling
# DON'T: Assume API calls always succeed
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
✅ Good: Comprehensive Error Handling
# DO: Handle errors gracefully
try:
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
timeout=30
)
except RateLimitError:
# Handle rate limiting
pass
except APIError as e:
# Handle API errors
pass
❌ Bad: Unbounded Conversation History
# DON'T: Keep growing conversation history indefinitely
for user_input in user_inputs:
messages.append({"role": "user", "content": user_input})
# Messages keep growing!
✅ Good: Manage Context Window
# DO: Limit conversation history
# Maximum number of history entries to retain.
MAX_MESSAGES = 20

def add_message_with_limit(messages, role, content):
    """Append a message and keep only the newest MAX_MESSAGES entries.

    Trims in place with `del` so the caller's list object is itself
    bounded -- the original rebound a local slice copy, which left the
    caller's list growing unless the return value was reassigned.
    The list is still returned for convenient reassignment.
    """
    messages.append({"role": role, "content": content})
    if len(messages) > MAX_MESSAGES:
        # In-place deletion mutates the shared list, unlike slicing.
        del messages[:-MAX_MESSAGES]
    return messages
Production Deployment
Monitoring and Logging
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def call_llm_with_logging(prompt, model="gpt-4"):
    """Call the LLM, logging the start, duration, and any failure.

    Uses the module-level `logger`. Failures are logged with a full
    traceback and re-raised.

    Args:
        prompt: User message to send.
        model: Chat-completion model name.

    Returns:
        The assistant's reply text.
    """
    client = OpenAI()
    start_time = datetime.now()
    # Lazy %-style args defer string formatting until the record is emitted.
    logger.info("Starting LLM call with model: %s", model)
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
    except Exception:
        # logger.exception records the traceback at ERROR level.
        logger.exception("LLM call failed")
        raise
    duration = (datetime.now() - start_time).total_seconds()
    logger.info("LLM call completed in %.2fs", duration)
    return response.choices[0].message.content
Summary
Integrating LLMs into production applications requires:
- Robust error handling with retry logic and timeouts
- Cost optimization through token counting and caching
- Streaming for better user experience
- Conversation management with context window limits
- Monitoring and logging for production visibility
- Function calling for structured outputs
- Batch processing for efficiency
These patterns ensure reliable, cost-effective, and performant LLM-powered applications.
Comments