Introduction
Tool use is what transforms a simple language model into a powerful autonomous agent. While LLMs excel at reasoning and text generation, they cannot interact with the outside world without tools. This guide covers how to integrate external APIs and services into your AI agents, enabling them to read emails, manage files, process documents, handle payments, and automate workflows.
Key Statistics:
- Agents with tool integration handle 70% more tasks autonomously
- Average tool integration reduces workflow time by 65%
- API-first agents achieve 3x higher productivity than standalone LLMs
- Tool calling accuracy: 95%+ with proper implementation
Why Tool Use Matters for Agentic AI
The Problem with Standalone LLMs
Large language models are impressive but limited. They cannot:
- Check your calendar for availability
- Send emails on your behalf
- Process customer orders
- Access your files in Google Drive
- Update your CRM with new leads
- Process PDF invoices
Without tools, agents are just sophisticated chatbots. Tool use transforms them into autonomous coworkers that can actually get work done.
How Tool Use Works
+-------------------------------------------------------------+
|                          AI Agent                           |
|  +-------------+    +-------------+    +-------------+      |
|  |     LLM     |--->|    Tool     |--->|   Execute   |      |
|  |  Reasoner   |    |  Selector   |    |  & Respond  |      |
|  +-------------+    +-------------+    +-------------+      |
|                            |                  |             |
|                            v                  v             |
|  +-------------------------------------------------------+  |
|  |               Available Tools Registry                |  |
|  |  * Gmail API    * Google Drive    * Stripe            |  |
|  |  * Calendar     * Slack           * PDF Parser        |  |
|  |  * Dropbox      * Notion          * Custom APIs       |  |
|  +-------------------------------------------------------+  |
+-------------------------------------------------------------+
Core Tool Integration Patterns
Pattern 1: Function Calling
Modern LLMs like GPT-4, Claude, and Gemini support native function calling:
# BAD: Manual API calls without structured tool definition
def get_emails():
    # Hardcoded, no LLM awareness
    service = build_gmail_service()
    return service.users().messages().list(userId='me').execute()

# GOOD: Structured tool with LLM integration
from openai import OpenAI
from pydantic import BaseModel

class EmailTool(BaseModel):
    """Tool for reading emails from Gmail"""
    max_results: int = 10
    query: str = ""

    def execute(self) -> str:
        # build_gmail_service() and format_emails() are application helpers
        service = build_gmail_service()
        messages = service.users().messages().list(
            userId='me',
            q=self.query,
            maxResults=self.max_results
        ).execute()
        return format_emails(messages)

# Register tools with OpenAI
client = OpenAI()
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_emails",
            "description": "Read recent emails from Gmail",
            "parameters": EmailTool.model_json_schema()
        }
    }
]
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Check my recent emails from John"}],
    tools=tools
)
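The request above only advertises the tool to the model; the reply still has to be routed back to real code. Below is a minimal sketch of that dispatch step, assuming each tool call arrives as a function name plus a JSON-encoded argument string. `TOOL_REGISTRY`, `dispatch_tool_call`, and the offline stand-in `get_emails` are hypothetical names used only for illustration.

```python
import json
from typing import Any, Callable, Dict


def get_emails(max_results: int = 10, query: str = "") -> str:
    # Stand-in for a real Gmail lookup so the example runs offline.
    return f"Would fetch up to {max_results} emails matching {query!r}"


# Hypothetical registry mapping tool names to Python callables.
TOOL_REGISTRY: Dict[str, Callable[..., Any]] = {"get_emails": get_emails}


def dispatch_tool_call(name: str, arguments_json: str) -> Dict[str, Any]:
    """Run one model-requested tool call and return a result dict."""
    func = TOOL_REGISTRY.get(name)
    if func is None:
        return {"ok": False, "error": f"Unknown tool: {name}"}
    try:
        args = json.loads(arguments_json or "{}")
    except json.JSONDecodeError as exc:
        return {"ok": False, "error": f"Invalid JSON arguments: {exc}"}
    if not isinstance(args, dict):
        return {"ok": False, "error": "Arguments must be a JSON object"}
    try:
        return {"ok": True, "result": func(**args)}
    except TypeError as exc:
        # Typically raised when the model sends unexpected or missing parameters.
        return {"ok": False, "error": str(exc)}


result = dispatch_tool_call("get_emails", '{"query": "from:john", "max_results": 5}')
print(result)
```

With the OpenAI client, the name and argument string would normally be read from each entry of `response.choices[0].message.tool_calls` (its `function.name` and `function.arguments` fields). Returning errors as data rather than raising lets the agent pass the failure back to the model and try again.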
Pattern 2: ReAct (Reasoning + Acting)
ReAct combines reasoning traces with actions:
# BAD: No reasoning trace
def process_request(user_input):
    # Jump straight to action
    return call_api(user_input)

# GOOD: ReAct pattern with reasoning
from typing import List

class ReActAgent:
    def __init__(self, llm, tools: List[object]):
        # llm: any client exposing generate(prompt) -> str
        # tools: objects exposing .name and .execute(**kwargs)
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}

    def think(self, state: dict) -> str:
        """Generate reasoning trace"""
        history = state.get("thoughts", [])
        context = "\n".join(history[-5:])
        prompt = f"""Task: {state['task']}
Given the context:
{context}
What should I do next? Choose a tool or provide reasoning.
Reply with FINAL ANSWER when the task is complete."""
        return self.llm.generate(prompt)

    def act(self, tool_name: str, args: dict) -> dict:
        """Execute tool and return result"""
        tool = self.tools.get(tool_name)
        if not tool:
            return {"error": f"Unknown tool: {tool_name}"}
        return {"result": tool.execute(**args)}

    def run(self, task: str, max_iterations: int = 10) -> dict:
        state = {"task": task, "thoughts": [], "actions": []}
        for _ in range(max_iterations):
            # Think
            thought = self.think(state)
            state["thoughts"].append(thought)
            # Check if done
            if "FINAL ANSWER" in thought:
                return {"status": "complete", "answer": thought}
            # Act: parse_tool_call() is a helper that extracts the tool name and arguments
            tool_name, args = parse_tool_call(thought)
            result = self.act(tool_name, args)
            state["actions"].append({"tool": tool_name, "result": result})
            # Feed the observation back so the next thought can build on it
            state["thoughts"].append(f"Observation: {result}")
        return {"status": "timeout"}
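The loop above relies on `parse_tool_call`, which the code does not define. One possible sketch follows, assuming the model is prompted to answer with an `Action: <tool>` line followed by an `Action Input: <single-line JSON object>` line. That output format and the regular expressions are assumptions made for this example; ReAct itself does not fix them.

```python
import json
import re
from typing import Dict, Tuple

# Assumed output format:
#   Action: search_emails
#   Action Input: {"query": "from:john"}
ACTION_RE = re.compile(r"^Action:[ \t]*(\w+)[ \t]*$", re.MULTILINE)
INPUT_RE = re.compile(r"^Action Input:[ \t]*(\{.*\})[ \t]*$", re.MULTILINE)


def parse_tool_call(thought: str) -> Tuple[str, Dict]:
    """Extract (tool_name, args) from a thought; ("", {}) if no action is present."""
    action = ACTION_RE.search(thought)
    if action is None:
        return "", {}
    args: Dict = {}
    raw = INPUT_RE.search(thought)
    if raw is not None:
        try:
            parsed = json.loads(raw.group(1))
        except json.JSONDecodeError:
            parsed = {}
        if isinstance(parsed, dict):
            args = parsed
    return action.group(1), args


thought = (
    "Thought: I should look at the inbox.\n"
    "Action: search_emails\n"
    'Action Input: {"query": "from:john", "max_results": 5}'
)
print(parse_tool_call(thought))
```

When no action can be parsed, the empty tool name is not in the registry, so `act` returns its `Unknown tool` error rather than raising an exception.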
Pattern 3: Tool Composition
Build complex workflows by chaining tools:
# BAD: Monolithic handler
def process_invoice(invoice_file):
    # Everything in one function
    raw = read_file(invoice_file)
    data = parse_pdf(raw)
    amount = extract_amount(data)
    send_payment(amount)
    log_transaction(amount)

# GOOD: Composable tools
from typing import Any, Callable, List
from dataclasses import dataclass

@dataclass
class ToolResult:
    tool: str
    output: Any
    success: bool

class ToolPipeline:
    def __init__(self):
        self.tools: List[Callable] = []

    def add(self, tool: Callable) -> 'ToolPipeline':
        self.tools.append(tool)
        return self

    def execute(self, input_data: Any) -> List[ToolResult]:
        results = []
        current_data = input_data
        for tool in self.tools:
            # Plain functions have no .name attribute, so fall back to __name__
            tool_name = getattr(tool, 'name', getattr(tool, '__name__', repr(tool)))
            try:
                output = tool(current_data)
                results.append(ToolResult(
                    tool=tool_name,
                    output=output,
                    success=True
                ))
                current_data = output
            except Exception as e:
                results.append(ToolResult(
                    tool=tool_name,
                    output=str(e),
                    success=False
                ))
                # Stop at the first failure so later steps never run on bad data
                break
        return results

# Usage: Compose invoice processing pipeline
pipeline = (ToolPipeline()
    .add(read_file_tool)
    .add(parse_pdf_tool)
    .add(extract_amount_tool)
    .add(validate_amount_tool)
    .add(send_payment_tool)
    .add(log_transaction_tool))
results = pipeline.execute(invoice_path)
Google Workspace Integration
Gmail API
# Setup Gmail service
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

def get_gmail_service():
    # info must be the authorized-user dict (refresh_token, client_id, client_secret)
    creds = Credentials.from_authorized_user_info(
        info=load_secrets(),
        scopes=['https://www.googleapis.com/auth/gmail.modify']
    )
    return build('gmail', 'v1', credentials=creds)

# Tool: Send email
class SendEmailTool:
    name = "send_email"
    description = "Send an email via Gmail"

    class Params:
        to: str
        subject: str
        body: str
        cc: str = ""

    def execute(self, to: str, subject: str, body: str, cc: str = "") -> str:
        service = get_gmail_service()
        # create_message() is a helper that returns the base64url-encoded message
        message = {
            'raw': create_message(to, subject, body, cc)
        }
        result = service.users().messages().send(
            userId='me',
            body=message
        ).execute()
        return f"Email sent: {result['id']}"

# Tool: Search emails
class SearchEmailsTool:
    name = "search_emails"
    description = "Search emails with query"

    class Params:
        query: str
        max_results: int = 10

    def execute(self, query: str, max_results: int = 10) -> str:
        service = get_gmail_service()
        results = service.users().messages().list(
            userId='me',
            q=query,
            maxResults=max_results
        ).execute()
        messages = results.get('messages', [])
        # format_search_results() is a helper that fetches and summarizes each message
        return format_search_results(messages, service)
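`SendEmailTool` calls a `create_message` helper that is not shown. The Gmail API expects the `raw` field to hold an RFC 2822 message encoded as base64url, so a minimal sketch using only the standard library could look like this. The signature mirrors the call above; the addresses are placeholders.

```python
import base64
from email.message import EmailMessage


def create_message(to: str, subject: str, body: str, cc: str = "") -> str:
    """Build a plain-text email and return it base64url-encoded for the 'raw' field."""
    msg = EmailMessage()
    msg["To"] = to
    msg["Subject"] = subject
    if cc:
        msg["Cc"] = cc
    msg.set_content(body)
    return base64.urlsafe_b64encode(msg.as_bytes()).decode("ascii")


raw = create_message("recipient@example.com", "Status update", "All tasks finished.")
```

The sender is left unset here on the assumption that Gmail fills it in for the authenticated account. HTML bodies or attachments would need additional `EmailMessage` calls.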
Google Calendar API
from googleapiclient.discovery import build
from datetime import datetime, timedelta, timezone

class CalendarTool:
    name = "calendar"
    description = "Manage Google Calendar events"

    def get_service(self):
        # get_google_credentials() is an application helper returning OAuth credentials for the given scopes
        creds = get_google_credentials(['https://www.googleapis.com/auth/calendar'])
        return build('calendar', 'v3', credentials=creds)

    class Params:
        action: str  # "create" or "list"
        # create params
        title: str = ""
        start_time: str = ""
        end_time: str = ""
        attendees: list = None
        # list params
        time_min: str = ""
        time_max: str = ""

    def execute(self, action: str, **kwargs) -> str:
        service = self.get_service()
        if action == "create":
            # start_time and end_time should be RFC 3339 strings that include a UTC offset
            event = {
                'summary': kwargs['title'],
                'start': {'dateTime': kwargs['start_time']},
                'end': {'dateTime': kwargs['end_time']},
                'attendees': [{'email': email} for email in (kwargs.get('attendees') or [])]
            }
            result = service.events().insert(
                calendarId='primary',
                body=event
            ).execute()
            return f"Event created: {result.get('htmlLink')}"
        elif action == "list":
            # timeMin/timeMax need a UTC offset, so use timezone-aware datetimes for the defaults
            now = datetime.now(timezone.utc)
            events = service.events().list(
                calendarId='primary',
                timeMin=kwargs.get('time_min') or now.isoformat(),
                timeMax=kwargs.get('time_max') or (now + timedelta(days=7)).isoformat(),
                singleEvents=True,
                orderBy='startTime'
            ).execute()
            return format_events(events.get('items', []))
        return "Unknown action"
Google Drive API
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

class DriveTool:
    name = "drive"
    description = "Access and manage Google Drive files"

    def execute(self, action: str, **kwargs) -> str:
        # get_google_credentials() is an application helper returning OAuth credentials
        service = build('drive', 'v3', credentials=get_google_credentials())
        if action == "list":
            results = service.files().list(
                q=kwargs.get('query', ''),
                pageSize=kwargs.get('max_results', 10),
                fields="files(id, name, mimeType, modifiedTime)"
            ).execute()
            return format_file_list(results.get('files', []))
        elif action == "read":
            file_id = kwargs['file_id']
            # Request size explicitly; the default response does not include it
            file = service.files().get(
                fileId=file_id,
                fields='name, mimeType, size'
            ).execute()
            if file.get('mimeType', '').startswith('application/vnd.google-apps.'):
                # Google Doc - export as text (other Google file types may need a different export MIME type)
                content = service.files().export_media(
                    fileId=file_id,
                    mimeType='text/plain'
                ).execute()
                return content.decode('utf-8')
            else:
                # Binary file
                return f"Binary file: {file['name']} ({file.get('size', 'unknown size')})"
        elif action == "write":
            file_metadata = {
                'name': kwargs['filename'],
                'parents': [kwargs.get('folder_id', 'root')]
            }
            media = MediaFileUpload(kwargs['filepath'])
            file = service.files().create(
                body=file_metadata,
                media_body=media,
                fields='id, webViewLink'
            ).execute()
            return f"File uploaded: {file.get('webViewLink')}"
        return "Unknown action"
Cloud Storage Integration
Dropbox API
import dropbox

class DropboxTool:
    name = "dropbox"
    description = "Access Dropbox for file storage"

    def __init__(self):
        self.dbx = dropbox.Dropbox(load_secrets()['dropbox_token'])

    def execute(self, action: str, **kwargs):
        if action == "upload":
            with open(kwargs['filepath'], 'rb') as f:
                self.dbx.files_upload(
                    f.read(),
                    kwargs['path'],
                    mode=dropbox.files.WriteMode.overwrite
                )
            return f"Uploaded to {kwargs['path']}"
        elif action == "download":
            metadata, response = self.dbx.files_download(kwargs['path'])
            # Returns raw bytes; decode or save before handing the result to an LLM
            return response.content
        elif action == "list":
            result = self.dbx.files_list_folder(kwargs.get('path', ''))
            return "\n".join([f.name for f in result.entries])
        elif action == "share":
            link = self.dbx.sharing_create_shared_link_with_settings(kwargs['path'])
            return link.url
        return "Unknown action"
AWS S3 Integration
import boto3
from botocore.config import Config

class S3Tool:
    name = "s3"
    description = "Interact with AWS S3 buckets"

    def __init__(self):
        self.s3 = boto3.client('s3',
                               config=Config(signature_version='s3v4'))

    class Params:
        action: str  # "upload", "download", "list", "delete", "presigned_url"
        bucket: str
        key: str = ""
        filepath: str = ""

    def execute(self, action: str, bucket: str, key: str = "", filepath: str = "") -> str:
        if action == "upload":
            self.s3.upload_file(
                filepath,
                bucket,
                key,
                ExtraArgs={'ContentType': 'application/octet-stream'}
            )
            return f"s3://{bucket}/{key}"
        elif action == "download":
            self.s3.download_file(bucket, key, filepath)
            return f"Downloaded to {filepath}"
        elif action == "list":
            # key doubles as the prefix filter; a single call returns at most one page of results
            result = self.s3.list_objects_v2(Bucket=bucket, Prefix=key)
            contents = result.get('Contents', [])
            return "\n".join([obj['Key'] for obj in contents])
        elif action == "delete":
            self.s3.delete_object(Bucket=bucket, Key=key)
            return f"Deleted s3://{bucket}/{key}"
        elif action == "presigned_url":
            url = self.s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': key},
                ExpiresIn=3600  # seconds
            )
            return url
        return "Unknown action"
Document Processing Tools
PDF Processing
import pdfplumber
import PyPDF2
from pydantic import BaseModel

class PDFTool:
    name = "pdf"
    description = "Extract and process PDF documents"

    class ExtractParams(BaseModel):
        filepath: str
        pages: str = "all"  # "all", "1-5", or comma-separated

    def execute(self, action: str, **kwargs):
        if action == "extract_text":
            filepath = kwargs['filepath']
            pages = kwargs.get('pages', 'all')
            with pdfplumber.open(filepath) as pdf:
                if pages == "all":
                    selected = pdf.pages
                else:
                    # parse_page_range() is a helper returning zero-based page indexes
                    page_nums = parse_page_range(pages)
                    selected = [pdf.pages[i] for i in page_nums if i < len(pdf.pages)]
                # Guard against pages that yield no text (e.g. scanned images)
                text = "\n\n".join(page.extract_text() or "" for page in selected)
            return text[:10000]  # Limit output size
        elif action == "extract_tables":
            filepath = kwargs['filepath']
            tables = []
            with pdfplumber.open(filepath) as pdf:
                for page in pdf.pages:
                    page_tables = page.extract_tables()
                    tables.extend(page_tables)
            return format_tables(tables)
        elif action == "extract_metadata":
            with open(kwargs['filepath'], 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                meta = reader.metadata or {}  # metadata can be missing entirely
                return {
                    'title': meta.get('/Title', 'N/A'),
                    'author': meta.get('/Author', 'N/A'),
                    'pages': len(reader.pages)
                }
        return "Unknown action"
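The `extract_text` branch depends on a `parse_page_range` helper that is not shown. Since its result is used to index `pdf.pages` directly, the sketch below assumes users write 1-based page numbers (as in the `"1-5"` example) and converts them to zero-based indexes. The error handling is an illustrative choice.

```python
from typing import List


def parse_page_range(spec: str) -> List[int]:
    """Turn a spec such as '1-3,5' into sorted zero-based indexes [0, 1, 2, 4]."""
    indexes = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        if start < 1 or end < start:
            raise ValueError(f"Invalid page range: {part!r}")
        # Pages are 1-based for users but 0-based when indexing pdf.pages.
        indexes.update(range(start - 1, end))
    return sorted(indexes)


print(parse_page_range("1-3,5"))  # [0, 1, 2, 4]
```

Malformed input raises `ValueError`, which a caller could catch and report back to the model instead of silently extracting the wrong pages.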
Document OCR
import json
from google.cloud import vision

class OCRTool:
    name = "ocr"
    description = "Extract text from images using Google Cloud Vision"

    def __init__(self):
        self.client = vision.ImageAnnotatorClient()

    def _load_image(self, kwargs: dict) -> vision.Image:
        # Support both file path and image bytes
        if 'filepath' in kwargs:
            with open(kwargs['filepath'], 'rb') as f:
                return vision.Image(content=f.read())
        return vision.Image(content=kwargs['image_bytes'])

    def execute(self, action: str, **kwargs) -> str:
        if action == "extract_text":
            response = self.client.text_detection(image=self._load_image(kwargs))
            # The API reports per-request failures in response.error
            if response.error.message:
                raise RuntimeError(response.error.message)
            texts = response.text_annotations
            if texts:
                # The first annotation holds the full detected text
                return texts[0].description
            return "No text found"
        elif action == "detect_labels":
            response = self.client.label_detection(image=self._load_image(kwargs))
            if response.error.message:
                raise RuntimeError(response.error.message)
            labels = [{'description': label.description,
                       'score': label.score}
                      for label in response.label_annotations]
            return json.dumps(labels)
        return "Unknown action"
E-commerce & Payment Integration
Stripe API
import stripe
from pydantic import BaseModel

class StripeTool:
    name = "stripe"
    description = "Process payments and manage Stripe transactions"

    def __init__(self):
        stripe.api_key = load_secrets()['stripe_secret_key']

    class PaymentParams(BaseModel):
        amount: int  # in cents
        currency: str = "usd"
        customer_email: str = ""
        description: str = ""
        metadata: dict = {}

    def execute(self, action: str, **kwargs):
        # Most actions return a dict; unknown actions return an error string
        if action == "create_payment":
            intent = stripe.PaymentIntent.create(
                amount=kwargs['amount'],
                currency=kwargs.get('currency', 'usd'),
                receipt_email=kwargs.get('customer_email'),
                description=kwargs.get('description'),
                metadata=kwargs.get('metadata', {})
            )
            return {
                'client_secret': intent.client_secret,
                'payment_id': intent.id
            }
        elif action == "get_payment":
            intent = stripe.PaymentIntent.retrieve(kwargs['payment_id'])
            return {
                'status': intent.status,
                'amount': intent.amount,
                'currency': intent.currency
            }
        elif action == "create_customer":
            customer = stripe.Customer.create(
                email=kwargs['email'],
                name=kwargs.get('name', ''),
                metadata=kwargs.get('metadata', {})
            )
            return {'customer_id': customer.id}
        elif action == "list_payments":
            payments = stripe.PaymentIntent.list(
                limit=kwargs.get('limit', 10),
                created={'gte': kwargs.get('since', 0)}  # Unix timestamp
            )
            return format_payment_list(payments.data)
        elif action == "refund":
            refund = stripe.Refund.create(
                payment_intent=kwargs['payment_id'],
                amount=kwargs.get('amount')  # Optional: partial refund
            )
            return {'refund_id': refund.id, 'status': refund.status}
        return "Unknown action"
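`PaymentParams.amount` is an integer number of cents, while an agent will often be handed a decimal amount such as `"19.99"` from an invoice. Multiplying a float by 100 can land a cent off because of binary rounding, so one option is to convert with `Decimal`. The helper below is a sketch; `to_minor_units` and its `exponent` parameter are names made up for this example.

```python
from decimal import Decimal, ROUND_HALF_UP


def to_minor_units(amount: str, exponent: int = 2) -> int:
    """Convert a decimal amount string to integer minor units (cents when exponent=2)."""
    value = Decimal(amount)
    if value < 0:
        raise ValueError("Amount must not be negative")
    scaled = (value * (10 ** exponent)).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
    return int(scaled)


print(to_minor_units("19.99"))            # 1999
print(to_minor_units("500", exponent=0))  # 500, for a currency without minor units
```

Passing the amount as a string keeps the exact digits from the source document. The `exponent` argument is there because some currencies are charged in whole units rather than hundredths.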
Shopify API
import shopify

class ShopifyTool:
    name = "shopify"
    description = "Manage Shopify store operations"

    def __init__(self):
        secrets = load_secrets()
        shopify.Session.setup(api_key=secrets['shopify_key'],
                              secret=secrets['shopify_secret'])
        self.shop_url = secrets['shopify_shop']
        # Assumed secret names: the API version string and the shop's access token
        self.api_version = secrets['shopify_api_version']
        self.token = secrets['shopify_token']

    def execute(self, action: str, **kwargs):
        # A session needs the shop domain, an API version, and an access token
        session = shopify.Session(self.shop_url, self.api_version, self.token)
        shopify.ShopifyResource.activate_session(session)
        try:
            if action == "create_order":
                order = shopify.Order({
                    'line_items': kwargs['line_items'],
                    'email': kwargs.get('email'),
                    'financial_status': 'pending',
                    'note': kwargs.get('note', '')
                })
                order.save()
                return {'order_id': order.id, 'order_number': order.order_number}
            elif action == "get_orders":
                orders = shopify.Order.find(
                    status='any',
                    limit=kwargs.get('limit', 10)
                )
                return format_orders(orders)
            elif action == "create_product":
                product = shopify.Product({
                    'title': kwargs['title'],
                    'body_html': kwargs.get('description', ''),
                    'vendor': kwargs.get('vendor', ''),
                    'product_type': kwargs.get('type', ''),
                    'variants': [{
                        'price': kwargs['price'],
                        'sku': kwargs.get('sku', '')
                    }]
                })
                product.save()
                return {'product_id': product.id, 'handle': product.handle}
            elif action == "update_inventory":
                # Stock is adjusted per location via InventoryLevel; 'location_id' is an assumed extra argument
                level = shopify.InventoryLevel.adjust(
                    kwargs['location_id'],
                    kwargs['inventory_id'],
                    kwargs['adjustment']
                )
                return {'inventory_level': level.available}
            return "Unknown action"
        finally:
            shopify.ShopifyResource.clear_session()
Error Handling & Resilience
Retry Patterns
import time
from functools import wraps
import logging

logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: tuple = (Exception,)
):
    """Decorator for retrying tool executions with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        logger.warning(
                            f"Tool {func.__name__} failed (attempt {attempt + 1}/{max_retries}): {e}. "
                            f"Retrying in {delay}s..."
                        )
                        time.sleep(delay)
                        delay *= backoff_factor
                    else:
                        logger.error(
                            f"Tool {func.__name__} failed after {max_retries} attempts"
                        )
                        raise last_exception
        return wrapper
    return decorator

class ResilientTool:
    """Base class for tools with retry and error handling"""

    @retry_with_backoff(max_retries=3, initial_delay=1.0)
    def execute_with_retry(self, *args, **kwargs):
        return self.execute(*args, **kwargs)

    def execute_safe(self, *args, **kwargs) -> dict:
        """Execute with comprehensive error handling"""
        try:
            result = self.execute_with_retry(*args, **kwargs)
            return {
                'success': True,
                'data': result,
                'error': None
            }
        except Exception as e:
            logger.exception(f"Tool execution failed: {e}")
            return {
                'success': False,
                'data': None,
                'error': str(e),
                'error_type': type(e).__name__
            }
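To make the timing concrete: with `max_retries=3`, `initial_delay=1.0`, and `backoff_factor=2.0`, the decorator sleeps for 1 s and then 2 s, and raises after the third failed attempt. The sketch below reproduces that schedule. `backoff_delays` is a hypothetical helper, and the optional random jitter is an addition that is not part of the decorator above. Jitter is often used so that many clients do not retry at the same instant.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the sleep durations used between attempts."""
    rng = rng or random.Random()
    delays = []
    delay = initial_delay
    # There is no sleep after the final attempt, hence max_retries - 1 entries.
    for _ in range(max_retries - 1):
        # With jitter, sleep a random time between 0 and the current ceiling.
        delays.append(rng.uniform(0, delay) if jitter else delay)
        delay *= backoff_factor
    return delays


print(backoff_delays())               # [1.0, 2.0]
print(backoff_delays(max_retries=5))  # [1.0, 2.0, 4.0, 8.0]
```

The worst-case wait is the sum of the list: 3 s for the defaults and 15 s for five attempts. This helps when choosing a tool timeout.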
Rate Limiting
import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Sliding-window rate limiter for API calls"""

    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
        self.lock = Lock()

    def acquire(self) -> bool:
        with self.lock:
            now = time.monotonic()
            # Remove expired timestamps
            while self.calls and self.calls[0] < now - self.time_window:
                self.calls.popleft()
            if len(self.calls) < self.max_calls:
                self.calls.append(now)
                return True
            return False

    def wait_and_acquire(self):
        """Wait until rate limit allows execution"""
        while not self.acquire():
            time.sleep(0.1)

class RateLimitedTool:
    """Base class for tools with rate limiting"""

    def __init__(self):
        self.limiter = RateLimiter(
            max_calls=100,  # Adjust per API
            time_window=60
        )

    def execute(self, *args, **kwargs):
        self.limiter.wait_and_acquire()
        return self._do_execute(*args, **kwargs)

    def _do_execute(self, *args, **kwargs):
        # Subclasses implement the actual API call here
        raise NotImplementedError
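The `RateLimiter` above works by keeping a window of recent call timestamps. A token bucket is a related scheme that refills capacity continuously and allows short bursts. The following is a minimal sketch of that alternative, not a drop-in replacement. The `TokenBucket` class and its injectable `clock` parameter are assumptions made so the example can be run without real waiting.

```python
import time
from threading import Lock
from typing import Callable


class TokenBucket:
    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.clock = clock
        self.tokens = capacity    # start with a full bucket
        self.updated = clock()
        self.lock = Lock()

    def acquire(self, cost: float = 1.0) -> bool:
        """Take `cost` tokens if available; return False without blocking otherwise."""
        with self.lock:
            now = self.clock()
            # Refill in proportion to elapsed time, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= cost:
                self.tokens -= cost
                return True
            return False


# Drive the bucket with a fake clock so the behaviour is deterministic.
fake_now = [0.0]
bucket = TokenBucket(rate=1.0, capacity=2.0, clock=lambda: fake_now[0])
first, second, third = bucket.acquire(), bucket.acquire(), bucket.acquire()
fake_now[0] = 1.0  # one second later, one token has been refilled
fourth = bucket.acquire()
print(first, second, third, fourth)  # True True False True
```

The `cost` argument lets expensive calls consume more than one token, which a timestamp window cannot express as directly.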
Security Best Practices
API Key Management
# BAD: Hardcoded API keys
STRIPE_KEY = "sk_live_xxxxx"

# GOOD: Secure secrets management
import os

def load_secrets() -> dict:
    """Load secrets from environment variables, falling back to the OS keyring"""
    # Key names here are illustrative; use the names your tools actually look up
    # Try environment variables first
    secrets = {
        'stripe': os.getenv('STRIPE_SECRET_KEY'),
        'openai': os.getenv('OPENAI_API_KEY'),
        'google': os.getenv('GOOGLE_API_CREDENTIALS'),
    }
    # Fall back to the system keyring for anything not set in the environment
    missing = [name for name, value in secrets.items() if not value]
    if missing:
        from keyring import get_password
        for name in missing:
            secrets[name] = get_password('myapp', name)
    return secrets

# BETTER: Use a secrets manager
import json

class SecretsManager:
    def __init__(self, provider: str = "aws"):
        self.provider = provider
        if provider == "aws":
            import boto3
            self.client = boto3.client('secretsmanager')
        elif provider == "hashicorp":
            import hvac
            self.client = hvac.Client()
        else:
            raise ValueError(f"Unsupported provider: {provider}")

    def get_secret(self, name: str) -> dict:
        if self.provider == "aws":
            response = self.client.get_secret_value(SecretId=name)
            return json.loads(response['SecretString'])
        # Vault KV v2 nests the secret payload under data.data
        response = self.client.secrets.kv.v2.read_secret_version(path=name)
        return response['data']['data']
Input Validation
from pydantic import BaseModel, Field, ValidationError, field_validator

class EmailParams(BaseModel):
    to: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    subject: str = Field(..., min_length=1, max_length=200)
    body: str = Field(..., min_length=1)

    @field_validator('body')
    @classmethod
    def sanitize_body(cls, v):
        # Heuristic check for template syntax and a common prompt-injection phrase;
        # this reduces risk but is not a complete defense
        dangerous_patterns = ['{{', '}}', '{%', '%}', 'ignore previous']
        for pattern in dangerous_patterns:
            if pattern in v.lower():
                raise ValueError("Potentially dangerous content detected")
        return v

    @field_validator('subject')
    @classmethod
    def sanitize_subject(cls, v):
        # Strip line breaks to prevent header injection
        return v.replace('\n', '').replace('\r', '')[:200]

class SafeToolExecution:
    """Execute tools with validated inputs"""

    def __init__(self, tool_class, param_model):
        self.tool = tool_class()
        self.param_model = param_model

    def execute(self, raw_params: dict) -> dict:
        try:
            # Validate inputs
            params = self.param_model(**raw_params)
            # Execute with validated params
            result = self.tool.execute(**params.model_dump())
            return {'success': True, 'result': result}
        except ValidationError as e:
            return {'success': False, 'error': str(e)}
Testing Tools
import time
import pytest
from unittest.mock import Mock, patch

# SendEmailTool, StripeTool, and RateLimiter are imported from the application modules under test

class TestToolIntegration:
    """Test suite for tool integrations"""

    @patch('myapp.gmail.get_gmail_service')
    def test_send_email_tool(self, mock_service):
        # Setup mock
        mock_messages = Mock()
        mock_messages.send.return_value.execute.return_value = {'id': 'msg123'}
        mock_service.return_value.users.return_value.messages.return_value = mock_messages
        # Execute
        tool = SendEmailTool()
        result = tool.execute(
            to="recipient@example.com",
            subject="Test",
            body="Hello"
        )
        # Verify
        assert 'msg123' in result
        mock_messages.send.assert_called_once()

    @patch('myapp.stripe.stripe.PaymentIntent')
    def test_payment_creation(self, mock_intent):
        # Setup mock
        mock_intent.create.return_value = Mock(
            id='pi_123',
            client_secret='secret_xyz'
        )
        # Execute
        tool = StripeTool()
        result = tool.execute(
            action='create_payment',
            amount=1000,
            currency='usd'
        )
        # Verify
        assert result['payment_id'] == 'pi_123'
        mock_intent.create.assert_called_once()

    def test_rate_limiter(self):
        limiter = RateLimiter(max_calls=2, time_window=1.0)
        # Should allow 2 calls
        assert limiter.acquire() is True
        assert limiter.acquire() is True
        # Should block 3rd call
        assert limiter.acquire() is False
        # After time window, should allow again
        time.sleep(1.1)
        assert limiter.acquire() is True
Best Practices Summary
Do’s
- Use structured tool definitions - Define input schemas with Pydantic
- Implement retry logic - API calls fail; handle gracefully with backoff
- Add rate limiting - Respect API quotas to avoid bans
- Validate all inputs - Prevent injection attacks and bad data
- Log everything - Track tool calls for debugging
- Use async where possible - Parallelize independent API calls
Don’ts
- Don’t hardcode API keys - Use secrets management
- Don’t skip error handling - Every API can fail
- Don’t ignore rate limits - You’ll get blocked
- Don’t trust user input - Always validate and sanitize
- Don’t block on tools - Use timeouts and async patterns
- Don’t forget to test - Mock external services in tests
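The advice on async and timeouts can be sketched with `asyncio`. The example below runs two independent tools concurrently and gives each its own time limit. `call_with_timeout`, `fast_tool`, and `slow_tool` are hypothetical stand-ins, with `asyncio.sleep` simulating API latency.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def call_with_timeout(name: str, tool: Callable[[], Awaitable[Any]],
                            timeout: float) -> Dict[str, Any]:
    """Await one tool, converting a timeout into an error result instead of raising."""
    try:
        data = await asyncio.wait_for(tool(), timeout=timeout)
        return {"tool": name, "success": True, "data": data}
    except asyncio.TimeoutError:
        return {"tool": name, "success": False, "error": "timeout"}


async def fast_tool() -> str:
    await asyncio.sleep(0.01)  # simulated quick API call
    return "inbox checked"


async def slow_tool() -> str:
    await asyncio.sleep(1.0)   # simulated hung API call
    return "never returned"


async def main():
    # gather() runs both calls concurrently and keeps results in argument order.
    return await asyncio.gather(
        call_with_timeout("email", fast_tool, timeout=0.5),
        call_with_timeout("report", slow_tool, timeout=0.05),
    )


results = asyncio.run(main())
print(results)
```

Many SDKs used in this guide are synchronous, so a blocking call would need to be moved off the event loop first, for example with `asyncio.to_thread`.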