Claude API Complete Guide 2026: Opus 4.7, Sonnet 4.6, Haiku 4.5 — Integration and Best Practices

Created: March 2, 2026 Larry Qu 12 min read

Introduction

Anthropic’s Claude API provides access to the Claude model family across three tiers: Haiku (fastest, cheapest), Sonnet (default production tier), and Opus (maximum reasoning capability). As of May 2026, the current generation includes Opus 4.7 ($5/$25 per million input/output tokens), Sonnet 4.6 ($3/$15), and Haiku 4.5 ($1/$5). Opus and Sonnet offer 1M-token context windows and Haiku offers 200K; all three support vision, function calling, and extended thinking.

This guide covers Python integration for all model tiers, streaming, vision with image analysis, prompt caching for up to 90% cost reduction on repeated context, batch API at 50% discount, extended thinking for complex reasoning tasks, and production deployment patterns.

Model Overview and Pricing

Model | Release | Input / 1M | Output / 1M | Context | Cache read / 1M | Best for
Opus 4.7 | May 2026 | $5.00 | $25.00 | 1M | $0.50 | Complex reasoning, agents, vision
Opus 4.6 | Feb 2026 | $5.00 | $25.00 | 1M | $0.50 | Production code, analysis
Sonnet 4.6 | Feb 2026 | $3.00 | $15.00 | 1M | $0.30 | Default production tier
Haiku 4.5 | Late 2025 | $1.00 | $5.00 | 200K | $0.10 | High-volume, low-latency

Prompt caching cuts the cost of repeated input tokens by up to 90% on cache reads. The Batch API gives a 50% discount on asynchronous workloads.

All pricing per Anthropic’s official rates as of May 2026. No changes since the 4.6 family launch.

Basic Setup

pip install anthropic

import os
from anthropic import Anthropic

# Read the key from the environment rather than hardcoding it in source
client = Anthropic(
    api_key=os.environ["ANTHROPIC_API_KEY"]
)

Chat Completion with Streaming

For interactive applications, enable streaming to show tokens as they arrive:

with client.messages.stream(
    model="claude-sonnet-4-20260514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain how attention works in transformers."}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Vision: Image Analysis

Claude can analyze images passed as base64-encoded data:

import base64

with open("architecture-diagram.png", "rb") as f:
    image_data = base64.b64encode(f.read()).decode("utf-8")

response = client.messages.create(
    model="claude-sonnet-4-20260514",
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Describe the data flow in this architecture diagram."
                },
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/png",
                        "data": image_data
                    }
                }
            ]
        }
    ]
)
print(response.content[0].text)

Opus 4.7 supports up to 3.75 MP images (triple the pixel budget of Opus 4.6), making it significantly better at UI screenshots, dense diagrams, and small text in images.
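
The request above hardcodes `image/png`. As a minimal sketch, a small helper can infer the media type from the file name and build the same content block. The helper name `image_block` and the `SUPPORTED_MEDIA_TYPES` set are illustrative assumptions, not part of the SDK; check the current documentation for the formats your model accepts.

```python
import base64
import mimetypes
from pathlib import Path

# Assumed set of accepted image formats; verify against the current API docs.
SUPPORTED_MEDIA_TYPES = {"image/png", "image/jpeg", "image/gif", "image/webp"}


def image_block(path: str) -> dict:
    """Build a base64 image content block, inferring media_type from the file name."""
    media_type, _ = mimetypes.guess_type(path)
    if media_type not in SUPPORTED_MEDIA_TYPES:
        raise ValueError(f"Unsupported or unknown image type for {path!r}: {media_type}")
    data = base64.b64encode(Path(path).read_bytes()).decode("utf-8")
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": media_type, "data": data},
    }
```

The returned dictionary can be placed in the `content` list next to a text block, in the same position as the inline image block shown earlier.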

Prompt Caching

Prompt caching stores recently processed context, reducing cost on repeated prefixes such as system prompts or document corpora:

response = client.messages.create(
    model="claude-sonnet-4-20260514",
    max_tokens=256,
    system=[
        {
            "type": "text",
            "text": "You are a legal document analyst. Review the attached contract.",
            # Cache the system prompt — it's reused across many requests
            "cache_control": {"type": "ephemeral"}
        }
    ],
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": contract_text,
                    # Cache the contract text across multiple questions
                    "cache_control": {"type": "ephemeral"}
                },
                {
                    "type": "text",
                    "text": "What are the termination clauses?"
                }
            ]
        }
    ]
)

# Cache hits and misses are reported in the response's usage object
usage = response.usage
print(f"Input tokens: {usage.input_tokens}")
print(f"Cache creation: {usage.cache_creation_input_tokens}")
print(f"Cache read: {usage.cache_read_input_tokens}")

Cost comparison for a typical legal analysis workflow (100-page contract, 10 questions):

Without caching: 10 × $0.15 = $1.50 (full contract sent each time)
With caching:    $0.19 (first request writes the cache at 1.25x) + 9 × $0.015 (cache reads at 0.1x) ≈ $0.32
Savings: ~78%

The cache has a 5-minute TTL (configurable to 1 hour with 2x write cost). Each cache hit costs 0.1x base input rate — a 90% discount on cached tokens.
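
The arithmetic behind a comparison like this can be sketched in a few lines. The function below is an illustration, not an official calculator: it assumes a 1.25x cache-write multiplier and a 0.1x cache-read multiplier, ignores output tokens and the short uncached question, and assumes every request arrives within the cache TTL. The 50,000-token contract size is an assumption chosen so that one uncached request costs $0.15 at Sonnet's $3 rate.

```python
def caching_cost(prefix_tokens: int, requests: int, input_rate: float,
                 write_multiplier: float = 1.25, read_multiplier: float = 0.10) -> dict:
    """Compare input cost for a repeated prefix with and without caching.

    input_rate is USD per million input tokens.
    """
    per_request = prefix_tokens / 1_000_000 * input_rate
    without_cache = requests * per_request
    # First request pays the write premium; the rest pay the read rate.
    with_cache = (per_request * write_multiplier
                  + (requests - 1) * per_request * read_multiplier)
    return {
        "without_cache": round(without_cache, 4),
        "with_cache": round(with_cache, 4),
        "savings_pct": round(100 * (1 - with_cache / without_cache), 1),
    }


# Assumed workload: 50,000-token contract, 10 questions, $3 per million input tokens
print(caching_cost(50_000, 10, 3.00))
```

Unrounded figures are used throughout, so results can differ by a few cents from estimates that round each term by hand. Under these assumptions caching pays for itself from the second request onward, because one write plus one read (1.35x) costs less than two uncached sends (2x).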

Batch API

For high-volume, non-urgent workloads, the batch API provides 50% discount:

# Submit a batch of requests (up to 100K requests per batch)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"req-{i:04d}",  # must be unique within the batch
            "params": {
                "model": "claude-sonnet-4-20260514",
                "max_tokens": 256,
                "messages": [
                    {"role": "user", "content": "Summarize: " + doc}
                ]
            }
        }
        for i, doc in enumerate(documents)  # list of 1000 documents
    ]
)

# Poll until complete
import time
while True:
    batch_status = client.messages.batches.retrieve(batch.id)
    if batch_status.processing_status == "ended":
        break
    time.sleep(10)

# Retrieve results; an entry may have succeeded, errored, been canceled, or expired
for entry in client.messages.batches.results(batch.id):
    if entry.result.type == "succeeded":
        print(f"{entry.custom_id}: {entry.result.message.content[0].text}")
    else:
        print(f"{entry.custom_id}: {entry.result.type}")
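
For corpora larger than a single batch allows, the request list has to be split before submission. The sketch below builds requests with unique IDs and slices them into batch-sized chunks. `build_batch_requests` and `chunked` are hypothetical helper names, and the 100,000 limit is simply the figure quoted in the comment above; confirm the limit that applies to your account.

```python
from typing import Iterator

MAX_REQUESTS_PER_BATCH = 100_000  # figure quoted in this guide; confirm before relying on it


def build_batch_requests(documents: list[str], model: str, max_tokens: int = 256) -> list[dict]:
    """Create one request per document with a unique, order-preserving custom_id."""
    return [
        {
            "custom_id": f"doc-{i:06d}",
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "messages": [{"role": "user", "content": "Summarize: " + doc}],
            },
        }
        for i, doc in enumerate(documents)
    ]


def chunked(requests: list[dict], size: int = MAX_REQUESTS_PER_BATCH) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most `size` requests."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(requests), size):
        yield requests[start:start + size]
```

Each chunk can then be passed as the `requests` argument of one batch submission, and the zero-padded `custom_id` makes it straightforward to map results back to the source documents.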

Extended Thinking

For complex reasoning tasks, Claude’s extended thinking mode lets the model show its reasoning process before producing the final answer:

response = client.messages.create(
    model="claude-sonnet-4-20260514",
    max_tokens=4096,
    thinking={
        "type": "enabled",
        "budget_tokens": 2048  # How many tokens the model can use for thinking
    },
    messages=[
        {
            "role": "user",
            "content": "Solve this: A train leaves station A at 60 mph. "
                       "Another train leaves station B at 90 mph. "
                       "The stations are 300 miles apart. "
                       "When and where do they meet?"
        }
    ]
)

# Content blocks arrive in order: thinking block(s) first, then the text answer
for block in response.content:
    if block.type == "thinking":
        print(f"Reasoning: {block.thinking}")
    elif block.type == "text":
        print(f"Answer: {block.text}")

Set budget_tokens to allocate how much processing the model should spend on reasoning before generating the response. Higher budgets produce more thorough analysis for complex problems (math proofs, legal analysis, multi-step coding).
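
Because the thinking budget has to fit inside `max_tokens`, it can help to derive one from the other instead of setting both by hand. The helper below is a sketch under two assumptions that should be checked against the current API reference: that the budget must be smaller than `max_tokens`, and that the minimum budget is 1,024 tokens. `thinking_config` is a hypothetical name.

```python
MIN_THINKING_BUDGET = 1024  # assumed minimum; check the current API reference


def thinking_config(max_tokens: int, share: float = 0.5) -> dict:
    """Return a `thinking` parameter that leaves room for the final answer."""
    if not 0 < share < 1:
        raise ValueError("share must be strictly between 0 and 1")
    budget = int(max_tokens * share)
    if budget < MIN_THINKING_BUDGET:
        raise ValueError(
            f"max_tokens={max_tokens} leaves a thinking budget of {budget}, "
            f"below the assumed minimum of {MIN_THINKING_BUDGET}"
        )
    return {"type": "enabled", "budget_tokens": budget}
```

With the defaults, `thinking_config(4096)` yields the same 2,048-token budget used in the request above.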

Function Calling

import json

response = client.messages.create(
    model="claude-sonnet-4-20260514",
    max_tokens=512,
    tools=[
        {
            "name": "get_weather",
            "description": "Get the weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
                },
                "required": ["location"]
            }
        }
    ],
    messages=[
        {"role": "user", "content": "What's the weather in Tokyo?"}
    ]
)

# Check if Claude wants to call a tool
for block in response.content:
    if block.type == "tool_use":
        tool_name = block.name
        tool_input = block.input
        print(f"Calling {tool_name} with: {json.dumps(tool_input)}")

Production Routing Strategy

Route requests across model tiers to optimize cost/quality:

def route_request(prompt: str, complexity: str):
    """Route to the appropriate Claude model based on complexity."""
    if complexity == "simple":
        model = "claude-haiku-4-20251022"       # $1/$5 — fast, cheap
    elif complexity == "standard":
        model = "claude-sonnet-4-20260514"      # $3/$15 — default tier
    elif complexity == "complex":
        model = "claude-opus-4-20260515"        # $5/$25 — maximum quality
    else:
        raise ValueError(f"Unknown complexity: {complexity}")

    return client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )

# Routing can cut costs by 30-40% compared with running everything on Sonnet.
# Use Haiku for classification/summarization,
# Sonnet for standard generation,
# Opus only for tasks that measurably benefit.

Error Handling and Retry Patterns

Production API calls inevitably face transient errors. Implement exponential backoff with jitter:

import time
import random
from anthropic import (
    Anthropic, APIError, APIConnectionError, RateLimitError,
    APIStatusError
)

def claude_with_retry(client: Anthropic, max_retries: int = 5, **kwargs):
    """Call Claude with exponential backoff and jitter."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)
        except RateLimitError as e:
            wait = (2 ** attempt) + random.random()
            print(f"Rate limited. Retrying in {wait:.1f}s (attempt {attempt + 1})")
            time.sleep(wait)
            last_error = e
        except APIConnectionError as e:
            wait = (2 ** attempt) + random.random()
            print(f"Connection error. Retrying in {wait:.1f}s")
            time.sleep(wait)
            last_error = e
        except APIStatusError as e:
            if e.status_code >= 500:
                wait = (2 ** attempt) + random.random()
                time.sleep(wait)
                last_error = e
            else:
                raise  # Don't retry 4xx errors (except 429 handled above)
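    # All attempts failed: re-raise the last transient error
    if last_error is not None:
        raise last_error
    raise RuntimeError("max_retries must be at least 1")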

# Usage
response = claude_with_retry(
    client,
    model="claude-sonnet-4-20260514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a Python decorator."}]
)
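
The retry function above adds up to one second of jitter to an uncapped exponential delay. A common variant is capped "full jitter", where each delay is drawn uniformly between zero and a capped exponential ceiling. The sketch below computes such a schedule without sleeping, which makes it easy to inspect or test; `backoff_delays` and its parameters are illustrative, not part of the SDK.

```python
import random
from typing import Optional


def backoff_delays(max_retries: int = 5, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> list[float]:
    """Return one delay per retry using capped exponential backoff with full jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: pick any delay between 0 and the current ceiling
        delays.append(rng.uniform(0, ceiling))
    return delays


# Example: inspect a reproducible schedule
for attempt, delay in enumerate(backoff_delays(rng=random.Random(42)), start=1):
    print(f"attempt {attempt}: wait {delay:.2f}s")
```

The cap keeps worst-case waits bounded when `max_retries` is large, and spreading delays across the whole interval reduces the chance that many clients retry at the same moment.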

Streaming with Error Recovery

For long-running streaming operations, handle mid-stream disconnections:

def stream_with_recovery(client: Anthropic, max_retries: int = 3, **kwargs):
    """Stream Claude responses with automatic reconnection on failure."""
    for attempt in range(max_retries):
        try:
            collected = []
            with client.messages.stream(**kwargs) as stream:
                for text in stream.text_stream:
                    collected.append(text)
                    yield text
            return  # Success — exit retry loop
        except (APIConnectionError, TimeoutError) as e:
            if attempt < max_retries - 1:
                wait = (2 ** attempt) + random.random()
                print(f"Stream interrupted. Reconnecting in {wait:.1f}s")
                time.sleep(wait)
                # Caution: the retry restarts generation from the beginning, so text
                # already yielded will be repeated (and may differ). A real
                # implementation must tell the consumer to discard the partial output.
                continue
            raise

# Usage
for token in stream_with_recovery(
    client,
    model="claude-sonnet-4-20260514",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a comprehensive guide..."}]
):
    print(token, end="", flush=True)

Tool Use Chains (Multi-Turn Function Calling)

Claude can make multiple tool calls in sequence, enabling complex agentic workflows. The pattern is: call → execute → return results → Claude uses them for the next step:

import json

def get_weather(location: str, unit: str = "celsius") -> str:
    """Mock weather API call."""
    return json.dumps({"location": location, "temperature": 22, "unit": unit})

def search_web(query: str) -> str:
    """Mock web search."""
    return json.dumps({"results": [{"title": f"Result for {query}", "url": "https://example.com"}]})

AVAILABLE_TOOLS = {
    "get_weather": get_weather,
    "search_web": search_web,
}

TOOLS = [
    {
        "name": "get_weather",
        "description": "Get current weather for a location",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City name"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["location"]
        }
    },
    {
        "name": "search_web",
        "description": "Search the web for information",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"}
            },
            "required": ["query"]
        }
    }
]

def run_agentic_loop(client: Anthropic, user_message: str, max_turns: int = 5):
    """Run Claude with tool calling until it produces a final answer."""
    messages = [{"role": "user", "content": user_message}]

    for turn in range(max_turns):
        response = client.messages.create(
            model="claude-sonnet-4-20260514",
            max_tokens=4096,
            tools=TOOLS,
            messages=messages
        )

        # No tool calls requested: this is the final answer
        if response.stop_reason != "tool_use":
            return "".join(b.text for b in response.content if b.type == "text")

        # Echo the assistant turn back in full (text and every tool_use block)
        messages.append({"role": "assistant", "content": response.content})

        # Execute each requested tool and gather the results
        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                print(f"\n🔧 Calling {block.name}({json.dumps(block.input)})")
                result = AVAILABLE_TOOLS[block.name](**block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result
                })

        # All results for one assistant turn go back in a single user message
        messages.append({"role": "user", "content": tool_results})

    return "Max turns reached without final answer."

result = run_agentic_loop(client, "What's the weather in Tokyo and find recent news about Japan?")
print(result)
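
In a real loop, a tool can fail or the model can request a tool that is not registered. One option, sketched below, is to catch the failure and report it back in the `tool_result` block with `is_error` set, so the model can recover instead of the loop crashing. `execute_tool_call` is a hypothetical helper, and the registry argument plays the role of a name-to-function mapping such as `AVAILABLE_TOOLS`.

```python
from typing import Callable


def execute_tool_call(registry: dict[str, Callable[..., str]], name: str,
                      tool_input: dict, tool_use_id: str) -> dict:
    """Run one tool and wrap the outcome as a tool_result content block."""
    try:
        if name not in registry:
            raise ValueError(f"Unknown tool: {name}")
        content = registry[name](**tool_input)
        is_error = False
    except Exception as exc:
        # Report the failure to the model rather than aborting the conversation
        content = f"{type(exc).__name__}: {exc}"
        is_error = True
    return {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": content,
        "is_error": is_error,
    }
```

Catching every exception is a deliberate simplification for the sketch; production code would usually log the traceback and may prefer to let programming errors surface.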

Cost Optimization Calculator

Understanding your actual per-task cost helps choose the right model and strategy:

def calculate_cost(
    model: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    cache_write_tokens: int = 0,
    cache_read_tokens: int = 0,
    batch: bool = False
) -> dict:
    """Calculate total cost for a Claude API request."""
    rates = {
        "opus-4.7": {"input": 5.00, "output": 25.00, "cache_write": 6.25, "cache_read": 0.50},
        "opus-4.6": {"input": 5.00, "output": 25.00, "cache_write": 6.25, "cache_read": 0.50},
        "sonnet-4.6": {"input": 3.00, "output": 15.00, "cache_write": 3.75, "cache_read": 0.30},
        "haiku-4.5": {"input": 1.00, "output": 5.00, "cache_write": 1.25, "cache_read": 0.10},
    }

    r = rates.get(model, rates["sonnet-4.6"])
    discount = 0.5 if batch else 1.0

    input_cost = (input_tokens / 1_000_000) * r["input"] * discount
    output_cost = (output_tokens / 1_000_000) * r["output"] * discount
    # Assumption: the batch discount stacks with cache pricing
    cache_write_cost = (cache_write_tokens / 1_000_000) * r["cache_write"] * discount
    cache_read_cost = (cache_read_tokens / 1_000_000) * r["cache_read"] * discount

    total = input_cost + output_cost + cache_write_cost + cache_read_cost

    return {
        "input_cost": round(input_cost, 6),
        "output_cost": round(output_cost, 6),
        "cache_write_cost": round(cache_write_cost, 6),
        "cache_read_cost": round(cache_read_cost, 6),
        "total_cost": round(total, 6),
        "total_cents": round(total * 100, 2)
    }

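# Example: cost of processing 1000 documents with caching
# (2,000 prompt tokens per document, 1,500 of them served from cache)
per_doc = calculate_cost(
    model="sonnet-4.6",
    input_tokens=500,        # uncached remainder of the prompt
    output_tokens=500,
    cache_read_tokens=1500,  # 75% cache hit rate
    batch=True
)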
print(f"Per document: ${per_doc['total_cost']:.6f}")
print(f"1000 documents: ${per_doc['total_cost'] * 1000:.2f}")

Strategy Comparison

Strategy | Cost / 1K requests (2K in, 500 out; cache writes excluded) | Quality | Latency
All Haiku | $4.50 | Acceptable for simple tasks | Fastest
All Sonnet | $13.50 | High | Moderate
All Opus | $22.50 | Maximum | Slowest
Routing (80% Haiku, 15% Sonnet, 5% Opus) | $6.75 | Good balance | Fast
Sonnet + caching (75% cache hit) | $9.45 | High | Fast
Sonnet + batch (50% off) | $6.75 | High | Delayed
Sonnet + caching + batch | $4.73 | High | Delayed

The optimal strategy for most production workloads: use Haiku for classification/triggers, Sonnet for generation with prompt caching, and batch API for async preprocessing.
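
Per-1K figures for a single tier or a routing mix can be recomputed directly from the list prices, which is useful when your token counts or traffic split differ from the 2K-in, 500-out scenario. The sketch below uses the per-million rates quoted in this guide; `cost_per_1k_requests` and `blended_cost` are hypothetical helper names, and caching and batch discounts are deliberately left out.

```python
# USD per million tokens as (input, output), taken from the pricing quoted in this guide
RATES = {"haiku": (1.00, 5.00), "sonnet": (3.00, 15.00), "opus": (5.00, 25.00)}


def cost_per_1k_requests(tier: str, input_tokens: int = 2000, output_tokens: int = 500) -> float:
    """List-price cost in USD of 1,000 identical requests on one tier."""
    in_rate, out_rate = RATES[tier]
    per_request = (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000
    return per_request * 1000


def blended_cost(mix: dict[str, float], input_tokens: int = 2000, output_tokens: int = 500) -> float:
    """Weighted cost per 1,000 requests for a routing mix whose shares sum to 1."""
    if abs(sum(mix.values()) - 1.0) > 1e-9:
        raise ValueError("mix shares must sum to 1")
    return sum(
        share * cost_per_1k_requests(tier, input_tokens, output_tokens)
        for tier, share in mix.items()
    )


for tier in RATES:
    print(f"All {tier}: ${cost_per_1k_requests(tier):.2f}")
print(f"80/15/5 routing: ${blended_cost({'haiku': 0.80, 'sonnet': 0.15, 'opus': 0.05}):.2f}")
```

Because output tokens cost five times as much as input tokens on every tier, the result is more sensitive to response length than to prompt length.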

Multi-Turn Conversation Management

For chat applications, manage conversation history within context limits:

class ConversationManager:
    """Manages conversation history within Claude's context window."""

    def __init__(self, system_prompt: str, max_context_tokens: int = 100000):
        self.system_prompt = system_prompt
        self.max_context_tokens = max_context_tokens
        self.history: list[dict] = []
        self.total_tokens = 0

    @staticmethod
    def _estimate_tokens(content: str | list) -> int:
        """Rough token estimate (about 4 characters per token)."""
        if isinstance(content, str):
            return len(content) // 4
        return sum(
            len(item["text"]) // 4
            for item in content
            if isinstance(item, dict) and "text" in item
        )

    def add_message(self, role: str, content: str | list):
        """Add a message and track estimated token count."""
        self.history.append({"role": role, "content": content})
        self.total_tokens += self._estimate_tokens(content)

        # Trim history if approaching context limit
        self._trim_if_needed()

    def _trim_if_needed(self):
        """Remove oldest messages until the estimate fits (always keeps the last 4)."""
        while self.total_tokens > self.max_context_tokens and len(self.history) > 4:
            removed = self.history.pop(0)
            # Subtract exactly what add_message added so the running total stays in sync
            self.total_tokens -= self._estimate_tokens(removed["content"])

    def get_compact_history(self) -> list[dict]:
        """Return the current history (already trimmed by add_message)."""
        return self.history

    def summarize_and_compress(self, client: Anthropic):
        """Summarize older history to fit within context window."""
        if self.total_tokens <= self.max_context_tokens:
            return

        # Summarize oldest half of history
        mid = len(self.history) // 2
        early_part = self.history[:mid]
        recent_part = self.history[mid:]

        summary_prompt = "Summarize this conversation so far in 2-3 sentences:\n" + \
            "\n".join([f"{m['role']}: {str(m['content'])[:200]}" for m in early_part])

        summary = client.messages.create(
            model="claude-haiku-4-20251022",
            max_tokens=200,
            messages=[{"role": "user", "content": summary_prompt}]
        )

        self.history = [
            {"role": "user", "content": f"[Previous conversation summary: {summary.content[0].text}]"}
        ] + recent_part
        self.total_tokens = sum(len(str(m['content'])) // 4 for m in self.history)

# Usage
conv = ConversationManager("You are a helpful assistant.")
conv.add_message("user", "Hello, I need help with Python.")
conv.add_message("assistant", "Sure! What specifically?")
conv.add_message("user", "I'm building a web scraper...")
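
Dropping the oldest message can leave the history starting with an assistant turn. The standalone sketch below trims to a token budget and then removes any leading non-user messages, on the assumption that the conversation sent to the API should open with a user turn. `estimate_tokens` and `trim_history` are hypothetical names, and the 4-characters-per-token rule is the same rough heuristic used above, not a real tokenizer.

```python
def estimate_tokens(content) -> int:
    """Rough estimate at about 4 characters per token; not a real tokenizer."""
    if isinstance(content, str):
        return len(content) // 4
    return sum(len(item.get("text", "")) // 4 for item in content if isinstance(item, dict))


def trim_history(history: list[dict], max_tokens: int) -> list[dict]:
    """Drop the oldest messages until the estimate fits, keeping a user message first."""
    trimmed = list(history)  # work on a copy so the caller's list is untouched
    total = sum(estimate_tokens(m["content"]) for m in trimmed)
    while trimmed and total > max_tokens:
        total -= estimate_tokens(trimmed.pop(0)["content"])
    # Assumed requirement: the conversation should open with a user turn
    while trimmed and trimmed[0]["role"] != "user":
        trimmed.pop(0)
    return trimmed
```

For precise budgeting, replace the character heuristic with token counts reported by the API, since the 4:1 ratio varies considerably with language and content type.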

Claude API vs. Alternatives

Feature | Claude API | OpenAI API | Google Gemini API
Best model | Opus 4.7 ($5/$25) | GPT-5.4 ($6/$30) | Gemini 3.1 Pro ($1.25/$5)
Context window | 1M tokens (200K Haiku) | 128K-200K | 1M tokens
Vision | Up to 3.75MP images | Up to 20MB images | Native multimodal
Prompt caching | Yes (90% discount) | Yes (~50% discount) | Yes (context caching)
Batch discount | 50% off | 50% off | 50% off (batch mode)
Extended thinking | Yes (budget_tokens) | Yes (o-series) | Yes (thinking mode)
Function calling | JSON schema | JSON schema | Function declarations
Streaming | Server-sent events | Server-sent events | Server-sent events
Rate limits | Tiered (based on usage) | Tiered (based on usage) | Per-minute quotas
Moderation | Content filtering API | Built-in moderation | Safety filters
Data privacy | Enterprise terms | Enterprise terms | Enterprise terms
SDK languages | Python, TypeScript, Java, Go | Python, JS, Go, Java, .NET | Python, JS, Go, Java, Swift

Claude’s competitive advantages are its large context window (1M tokens), aggressive prompt caching (90% discount), and extended thinking capabilities. For most production workloads, Claude Sonnet 4.6 offers the best price-to-performance ratio.

Max Output Extension (300K Tokens)

For long-form generation (reports, codebases, analysis), Claude supports extended output tokens via a beta header:

response = client.messages.create(
    model="claude-sonnet-4-20260514",
    max_tokens=300000,  # Standard API supports 128K; beta extends to 300K
    extra_headers={
        "anthropic-beta": "output-300k-2026-03-24"
    },
    messages=[
        {"role": "user", "content": "Write a comprehensive technical report on..."}
    ]
)
# Uses batch API output limits — up to 300K tokens per response

This is ideal for generating long documents, entire codebases, or detailed analysis in a single pass. Available for Opus 4.7, Opus 4.6, and Sonnet 4.6 on the Message Batches API.

Content Filtering and Moderation

Claude includes built-in safety mechanisms, but production applications should add their own content filtering layer:

import re

class ContentFilter:
    """Multi-layer content filter for Claude API responses."""

    def __init__(self):
        self.blocked_categories = {
            "pii": self._check_pii,
            "code_injection": self._check_code_injection,
            "hallucination_markers": self._check_hallucination,
        }

    def _check_pii(self, text: str) -> list[str]:
        patterns = {
            "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
            "email": r'\b[\w\.-]+@[\w\.-]+\.\w{2,}\b',
            "phone": r'\b\+?\d[\d\s\-\(\)]{7,}\d\b',
            "credit_card": r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
        }
        found = []
        for name, pattern in patterns.items():
            if re.search(pattern, text):
                found.append(name)
        return found

    def _check_code_injection(self, text: str) -> list[str]:
        dangerous = [
            r'rm\s+-rf\s+/', r'eval\(', r'exec\(', r'system\(',
            r'os\.system\(', r'subprocess\.', r'__import__\('
        ]
        return [p for p in dangerous if re.search(p, text)]

    def _check_hallucination(self, text: str) -> list[str]:
        markers = [
            r'as of my last update', r'as of my knowledge cutoff',
            r'I don\'t have access to', r'according to my training data'
        ]
        # Match case-insensitively; lowercasing the text would miss the capital "I" pattern
        return [m for m in markers if re.search(m, text, re.IGNORECASE)]

    def filter_response(self, text: str) -> dict:
        issues = {}
        for category, checker in self.blocked_categories.items():
            found = checker(text)
            if found:
                issues[category] = found
        return {
            "safe": len(issues) == 0,
            "issues": issues
        }

content_filter = ContentFilter()  # avoid shadowing the built-in filter()
response = client.messages.create(
    model="claude-sonnet-4-20260514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a privacy policy"}]
)

result = content_filter.filter_response(response.content[0].text)
if not result["safe"]:
    print(f"⚠️ Issues detected: {result['issues']}")
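
Detection alone leaves the flagged text in place. Where a response must still be shown, one option is to redact matches before display. The sketch below reuses three of the regular expressions from the filter above; `redact_pii` and the `[REDACTED:<kind>]` tag format are illustrative choices, and regex-based matching of this kind is only a first line of defense, not a complete PII detector.

```python
import re

# Illustrative patterns only; they will miss many real-world formats
PII_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "email": re.compile(r"\b[\w.-]+@[\w.-]+\.\w{2,}\b"),
    "credit_card": re.compile(r"\b(?:\d{4}[-\s]?){3}\d{4}\b"),
}


def redact_pii(text: str) -> tuple[str, dict[str, int]]:
    """Replace each match with a [REDACTED:<kind>] tag and count replacements per kind."""
    counts: dict[str, int] = {}
    for kind, pattern in PII_PATTERNS.items():
        text, replaced = pattern.subn(f"[REDACTED:{kind}]", text)
        if replaced:
            counts[kind] = replaced
    return text, counts


clean, found = redact_pii("Mail bob@example.com, SSN 123-45-6789.")
print(clean)
print(found)
```

Returning the counts alongside the cleaned text lets the caller log how often redaction fires without storing the sensitive values themselves.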
