Tool Use APIs for Agentic AI Development: Complete Guide

Introduction

Tool use is what transforms a simple language model into a powerful autonomous agent. While LLMs excel at reasoning and text generation, they cannot interact with the outside world without tools. This guide covers how to integrate external APIs and services into your AI agents, enabling them to read emails, manage files, process documents, handle payments, and automate workflows.

Key Statistics:

  • Agents with tool integration handle 70% more tasks autonomously
  • Average tool integration reduces workflow time by 65%
  • API-first agents achieve 3x higher productivity than standalone LLMs
  • Tool calling accuracy: 95%+ with proper implementation

Why Tool Use Matters for Agentic AI

The Problem with Standalone LLMs

Large language models are impressive but limited. They cannot:

  • Check your calendar for availability
  • Send emails on your behalf
  • Process customer orders
  • Access your files in Google Drive
  • Update your CRM with new leads
  • Process PDF invoices

Without tools, agents are just sophisticated chatbots. Tool use transforms them into autonomous coworkers that can actually get work done.

How Tool Use Works

┌─────────────────────────────────────────────────────────────┐
│                          AI Agent                           │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐      │
│  │   LLM       │───▶│   Tool      │───▶│   Execute   │      │
│  │   Reasoner  │    │   Selector  │    │   & Respond │      │
│  └─────────────┘    └─────────────┘    └─────────────┘      │
│         │                                     │             │
│         ▼                                     ▼             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              Available Tools Registry               │    │
│  │  • Gmail API    • Google Drive    • Stripe          │    │
│  │  • Calendar     • Slack           • PDF Parser      │    │
│  │  • Dropbox      • Notion          • Custom APIs     │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

Core Tool Integration Patterns

Pattern 1: Function Calling

Modern LLMs like GPT-4, Claude, and Gemini support native function calling:

# BAD: Manual API calls without structured tool definition
def get_emails():
    # Hardcoded, no LLM awareness
    service = build_gmail_service()
    return service.users().messages().list().execute()

# GOOD: Structured tool with LLM integration
from openai import OpenAI
from typing import List
from pydantic import BaseModel

class EmailTool(BaseModel):
    """Tool for reading emails from Gmail"""
    max_results: int = 10
    query: str = ""
    
    def execute(self) -> str:
        service = build_gmail_service()
        messages = service.users().messages().list(
            userId='me',
            q=self.query,
            maxResults=self.max_results
        ).execute()
        return format_emails(messages)

# Register tools with OpenAI
client = OpenAI()
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_emails",
            "description": "Read recent emails from Gmail",
            "parameters": EmailTool.model_json_schema()
        }
    }
]

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Check my recent emails from John"}],
    tools=tools
)
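
The request above only advertises the tool to the model. The response then names a tool and supplies its arguments as a JSON string, and your own code has to run it. With the OpenAI chat completions API, each entry in `response.choices[0].message.tool_calls` carries `function.name` and `function.arguments`. The sketch below shows one way to route such a call. `dispatch_tool_call`, `registry`, and `fake_get_emails` are hypothetical names introduced for illustration, and the Gmail call is replaced by a stand-in so the example runs offline.

```python
import json
from typing import Any, Callable, Dict


def dispatch_tool_call(
    name: str,
    arguments_json: str,
    registry: Dict[str, Callable[..., Any]],
) -> Dict[str, Any]:
    """Run the handler registered under `name` with JSON-decoded arguments."""
    handler = registry.get(name)
    if handler is None:
        return {"ok": False, "error": f"Unknown tool: {name}"}
    try:
        # The model sends arguments as a JSON string, and it may be malformed.
        arguments = json.loads(arguments_json or "{}")
    except json.JSONDecodeError as exc:
        return {"ok": False, "error": f"Invalid JSON arguments: {exc}"}
    if not isinstance(arguments, dict):
        return {"ok": False, "error": "Arguments must be a JSON object"}
    try:
        return {"ok": True, "result": handler(**arguments)}
    except Exception as exc:
        # Report the failure back to the model instead of crashing the agent.
        return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}


# Hypothetical stand-in for the Gmail-backed tool.
def fake_get_emails(max_results: int = 10, query: str = "") -> str:
    return f"{max_results} results for '{query}'"


registry = {"get_emails": fake_get_emails}
outcome = dispatch_tool_call(
    "get_emails", '{"query": "from:john", "max_results": 3}', registry
)
print(outcome)
```

Returning errors as data rather than raising lets the agent pass the failure back to the model as a tool result, so it can correct its arguments on the next turn.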

Pattern 2: ReAct (Reasoning + Acting)

ReAct combines reasoning traces with actions:

# BAD: No reasoning trace
def process_request(user_input):
    # Jump straight to action
    return call_api(user_input)

# GOOD: ReAct pattern with reasoning
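from typing import Any, List

# `llm` and `parse_tool_call` are assumed helpers: `llm.generate(prompt)` returns the
# model's text, and `parse_tool_call(text)` returns a (tool_name, args) pair.

class ReActAgent:
    def __init__(self, tools: List[Any]):
        # Each tool object is expected to expose `.name` and `.execute(**kwargs)`
        self.tools = {tool.name: tool for tool in tools}
    
    def think(self, state: dict) -> str:
        """Generate the next reasoning step from the task and recent history"""
        thoughts = "\n".join(state.get("thoughts", [])[-5:])
        # Feed tool results back in; otherwise the model never sees what its actions returned
        observations = "\n".join(str(a) for a in state.get("actions", [])[-5:])
        prompt = f"""Task: {state['task']}

Previous thoughts:
{thoughts}

Tool results so far:
{observations}

What should I do next? Choose a tool, or reply with FINAL ANSWER when done."""
        return llm.generate(prompt)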
    
    def act(self, tool_name: str, args: dict) -> dict:
        """Execute tool and return result"""
        tool = self.tools.get(tool_name)
        if not tool:
            return {"error": f"Unknown tool: {tool_name}"}
        return {"result": tool.execute(**args)}
    
    def run(self, task: str, max_iterations: int = 10) -> dict:
        state = {"task": task, "thoughts": [], "actions": []}
        
        for _ in range(max_iterations):
            # Think
            thought = self.think(state)
            state["thoughts"].append(thought)
            
            # Check if done
            if "FINAL ANSWER" in thought:
                return {"status": "complete", "answer": thought}
            
            # Act
            tool_name, args = parse_tool_call(thought)
            result = self.act(tool_name, args)
            state["actions"].append({"tool": tool_name, "result": result})
            
        return {"status": "timeout"}
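
The agent loop relies on a `parse_tool_call` helper that the code does not define. A minimal sketch follows, assuming the model has been prompted to emit an `Action:` line with the tool name and an `Action Input:` line with single-line JSON arguments. That output format is an assumption made for this example, not something any particular model guarantees.

```python
import json
import re
from typing import Any, Dict, Tuple

# Assumed output format (enforced by the prompt, not by the model):
#   Action: <tool_name>
#   Action Input: {"arg": "value"}
ACTION_RE = re.compile(r"^Action:[ \t]*(?P<name>[\w-]+)[ \t]*$", re.MULTILINE)
INPUT_RE = re.compile(r"^Action Input:[ \t]*(?P<args>\{.*\})[ \t]*$", re.MULTILINE)


def parse_tool_call(thought: str) -> Tuple[str, Dict[str, Any]]:
    """Extract (tool_name, args) from a ReAct-style model response.

    Raises ValueError when no action line is present or the JSON is invalid.
    """
    action = ACTION_RE.search(thought)
    if action is None:
        raise ValueError("No 'Action:' line found in model output")
    raw_args = INPUT_RE.search(thought)
    # A tool without parameters may omit the input line entirely.
    args = json.loads(raw_args.group("args")) if raw_args else {}
    return action.group("name"), args


sample = (
    "Thought: I should look for mail from John.\n"
    "Action: search_emails\n"
    'Action Input: {"query": "from:john", "max_results": 5}'
)
print(parse_tool_call(sample))
```

Because the parser raises on malformed output, the `run` loop would need to catch `ValueError` and ask the model to retry rather than let the exception end the run.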

Pattern 3: Tool Composition

Build complex workflows by chaining tools:

# BAD: Monolithic handler
def process_invoice(invoice_file):
    # Everything in one function
    raw = read_file(invoice_file)
    data = parse_pdf(raw)
    amount = extract_amount(data)
    send_payment(amount)
    log_transaction(amount)

# GOOD: Composable tools
from dataclasses import dataclass
from typing import Any, Callable, List

@dataclass
class ToolResult:
    tool: str
    output: Any
    success: bool

class ToolPipeline:
    def __init__(self):
        self.tools: List[Callable] = []
    
    def add(self, tool: Callable) -> 'ToolPipeline':
        self.tools.append(tool)
        return self
    
    def execute(self, input_data: Any) -> List[ToolResult]:
        results = []
        current_data = input_data
        
        for tool in self.tools:
            # Plain functions have no `.name`, so fall back to `__name__`
            tool_name = getattr(tool, "name", getattr(tool, "__name__", repr(tool)))
            try:
                output = tool(current_data)
                results.append(ToolResult(
                    tool=tool_name,
                    output=output,
                    success=True
                ))
                # Each tool's output becomes the next tool's input
                current_data = output
            except Exception as e:
                results.append(ToolResult(
                    tool=tool_name,
                    output=str(e),
                    success=False
                ))
                # Stop at the first failure so later steps (e.g. payment) never run
                break
        
        return results

# Usage: Compose invoice processing pipeline
pipeline = (ToolPipeline()
    .add(read_file_tool)
    .add(parse_pdf_tool)
    .add(extract_amount_tool)
    .add(validate_amount_tool)
    .add(send_payment_tool)
    .add(log_transaction_tool))

results = pipeline.execute(invoice_path)

Google Workspace Integration

Gmail API

# Setup Gmail service
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

def get_gmail_service():
    creds = Credentials.from_authorized_user_info(
        info=load_secrets(),
        scopes=['https://www.googleapis.com/auth/gmail.modify']
    )
    return build('gmail', 'v1', credentials=creds)

# Tool: Send email
class SendEmailTool:
    name = "send_email"
    description = "Send an email via Gmail"
    
    class Params:
        to: str
        subject: str
        body: str
        cc: str = ""
    
    def execute(self, to: str, subject: str, body: str, cc: str = "") -> str:
        service = get_gmail_service()
        
        message = {
            'raw': create_message(to, subject, body, cc)
        }
        
        result = service.users().messages().send(
            userId='me',
            body=message
        ).execute()
        
        return f"Email sent: {result['id']}"

# Tool: Search emails
class SearchEmailsTool:
    name = "search_emails"
    description = "Search emails with query"
    
    class Params:
        query: str
        max_results: int = 10
    
    def execute(self, query: str, max_results: int = 10) -> str:
        service = get_gmail_service()
        
        results = service.users().messages().list(
            userId='me',
            q=query,
            maxResults=max_results
        ).execute()
        
        messages = results.get('messages', [])
        return format_search_results(messages, service)
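
`SendEmailTool` calls a `create_message` helper that is not shown. The Gmail API expects the `raw` field to hold the complete message, base64url-encoded. A minimal sketch using only the standard library could look like this; the signature simply mirrors the call in `SendEmailTool.execute` and is otherwise an assumption.

```python
import base64
from email.message import EmailMessage


def create_message(to: str, subject: str, body: str, cc: str = "") -> str:
    """Build a plain-text email and return it base64url-encoded for Gmail."""
    message = EmailMessage()
    message["To"] = to
    message["Subject"] = subject
    if cc:
        message["Cc"] = cc
    message.set_content(body)
    # Gmail's `raw` field takes URL-safe base64 of the full message bytes.
    return base64.urlsafe_b64encode(message.as_bytes()).decode("ascii")


raw = create_message("user@example.com", "Test", "Hello")
decoded = base64.urlsafe_b64decode(raw).decode("utf-8")
print(decoded)
```

No `From` header is set here, on the assumption that Gmail fills in the authenticated account's address when sending as `userId='me'`.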

Google Calendar API

from googleapiclient.discovery import build
from datetime import datetime, timedelta

class CalendarTool:
    name = "calendar"
    description = "Manage Google Calendar events"
    
    def get_service(self):
        creds = get_google_credentials(['https://www.googleapis.com/auth/calendar'])
        return build('calendar', 'v3', credentials=creds)
    
    class Params:
        action: str  # "create" or "list"
        # create params
        title: str = ""
        start_time: str = ""
        end_time: str = ""
        attendees: list = None
        # list params
        time_min: str = ""
        time_max: str = ""
    
    def execute(self, action: str, **kwargs) -> str:
        service = self.get_service()
        
        if action == "create":
            event = {
                'summary': kwargs['title'],
                'start': {'dateTime': kwargs['start_time']},
                'end': {'dateTime': kwargs['end_time']},
                'attendees': [{'email': email} for email in (kwargs.get('attendees') or [])]
            }
            result = service.events().insert(
                calendarId='primary',
                body=event
            ).execute()
            return f"Event created: {result.get('htmlLink')}"
        
        elif action == "list":
            # timeMin/timeMax must be RFC 3339 timestamps that include a UTC offset,
            # so build the defaults from a timezone-aware datetime
            now = datetime.now().astimezone()
            events = service.events().list(
                calendarId='primary',
                timeMin=kwargs.get('time_min') or now.isoformat(),
                timeMax=kwargs.get('time_max') or (now + timedelta(days=7)).isoformat(),
                singleEvents=True,
                orderBy='startTime'
            ).execute()
            return format_events(events.get('items', []))
        
        return "Unknown action"
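
The Calendar API requires `timeMin` and `timeMax` to be RFC 3339 timestamps with an explicit time zone offset, which a naive `datetime` does not provide. The helper below is a small sketch of how the listing window could be built. `upcoming_window` is a hypothetical name, and it uses UTC for simplicity.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def upcoming_window(days: int = 7, now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (time_min, time_max) as RFC 3339 strings that carry a UTC offset."""
    start = now or datetime.now(timezone.utc)
    if start.tzinfo is None:
        # A naive datetime would serialize without an offset.
        raise ValueError("`now` must be timezone-aware")
    end = start + timedelta(days=days)
    return start.isoformat(), end.isoformat()


fixed = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc)
time_min, time_max = upcoming_window(7, now=fixed)
print(time_min, time_max)
```

Accepting `now` as a parameter keeps the helper deterministic in tests.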

Google Drive API

from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

class DriveTool:
    name = "drive"
    description = "Access and manage Google Drive files"
    
    def execute(self, action: str, **kwargs) -> str:
        service = build('drive', 'v3', credentials=get_google_credentials())
        
        if action == "list":
            results = service.files().list(
                q=kwargs.get('query', ''),
                pageSize=kwargs.get('max_results', 10),
                fields="files(id, name, mimeType, modifiedTime)"
            ).execute()
            return format_file_list(results.get('files', []))
        
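        elif action == "read":
            file_id = kwargs['file_id']
            # Request `size` explicitly; it is not part of the default response fields
            file = service.files().get(
                fileId=file_id,
                fields='name, mimeType, size'
            ).execute()
            
            if file.get('mimeType') == 'application/vnd.google-apps.document':
                # Google Doc - export as text
                content = service.files().export_media(
                    fileId=file_id,
                    mimeType='text/plain'
                ).execute()
                return content.decode('utf-8')
            else:
                # Anything else, including Sheets and Slides, which need other export formats
                return f"Binary file: {file['name']} ({file.get('size', 'unknown size')})"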
        
        elif action == "write":
            file_metadata = {
                'name': kwargs['filename'],
                'parents': [kwargs.get('folder_id', 'root')]
            }
            media = MediaFileUpload(kwargs['filepath'])
            file = service.files().create(
                body=file_metadata,
                media_body=media,
                fields='id, webViewLink'
            ).execute()
            return f"File uploaded: {file.get('webViewLink')}"
        
        return "Unknown action"

Cloud Storage Integration

Dropbox API

import dropbox

class DropboxTool:
    name = "dropbox"
    description = "Access Dropbox for file storage"
    
    def __init__(self):
        self.dbx = dropbox.Dropbox(load_secrets()['dropbox_token'])
    
    def execute(self, action: str, **kwargs) -> str:
        if action == "upload":
            with open(kwargs['filepath'], 'rb') as f:
                self.dbx.files_upload(
                    f.read(),
                    kwargs['path'],
                    mode=dropbox.files.WriteMode.overwrite
                )
            return f"Uploaded to {kwargs['path']}"
        
        elif action == "download":
            metadata, response = self.dbx.files_download(kwargs['path'])
            return response.content
        
        elif action == "list":
            result = self.dbx.files_list_folder(kwargs.get('path', ''))
            return "\n".join([f.name for f in result.entries])
        
        elif action == "share":
            link = self.dbx.sharing_create_shared_link_with_settings(kwargs['path'])
            return link.url
        
        return "Unknown action"

AWS S3 Integration

import boto3
from botocore.config import Config

class S3Tool:
    name = "s3"
    description = "Interact with AWS S3 buckets"
    
    def __init__(self):
        self.s3 = boto3.client('s3',
            config=Config(signature_version='s3v4'))
    
    class Params:
        action: str  # "upload", "download", "list", "delete"
        bucket: str
        key: str = ""
        filepath: str = ""
    
    def execute(self, action: str, bucket: str, key: str = "", filepath: str = "") -> str:
        if action == "upload":
            self.s3.upload_file(
                filepath,
                bucket,
                key,
                ExtraArgs={'ContentType': 'application/octet-stream'}
            )
            return f"s3://{bucket}/{key}"
        
        elif action == "download":
            self.s3.download_file(bucket, key, filepath)
            return f"Downloaded to {filepath}"
        
        elif action == "list":
            result = self.s3.list_objects_v2(Bucket=bucket, Prefix=key)
            contents = result.get('Contents', [])
            return "\n".join([obj['Key'] for obj in contents])
        
        elif action == "delete":
            self.s3.delete_object(Bucket=bucket, Key=key)
            return f"Deleted s3://{bucket}/{key}"
        
        elif action == "presigned_url":
            url = self.s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': key},
                ExpiresIn=3600
            )
            return url
        
        return "Unknown action"

Document Processing Tools

PDF Processing

import pdfplumber
import PyPDF2
from pydantic import BaseModel

class PDFTool:
    name = "pdf"
    description = "Extract and process PDF documents"
    
    class ExtractParams(BaseModel):
        filepath: str
        pages: str = "all"  # "all", "1-5", or comma-separated
    
    def execute(self, action: str, **kwargs) -> str:
        if action == "extract_text":
            filepath = kwargs['filepath']
            pages = kwargs.get('pages', 'all')
            
            if pages == "all":
                with pdfplumber.open(filepath) as pdf:
                    # extract_text() can return None or "" for pages without a text layer
                    text = "\n\n".join([page.extract_text() or "" for page in pdf.pages])
            else:
                # Parse page ranges
                page_nums = parse_page_range(pages)
                with pdfplumber.open(filepath) as pdf:
                    text = "\n\n".join([pdf.pages[i].extract_text() or ""
                                       for i in page_nums if i < len(pdf.pages)])
            
            return text[:10000]  # Limit output size
        
        elif action == "extract_tables":
            filepath = kwargs['filepath']
            tables = []
            
            with pdfplumber.open(filepath) as pdf:
                for page in pdf.pages:
                    page_tables = page.extract_tables()
                    tables.extend(page_tables)
            
            return format_tables(tables)
        
        elif action == "extract_metadata":
            with open(kwargs['filepath'], 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                # metadata is None when the PDF has no document info dictionary
                meta = reader.metadata or {}
                return {
                    'title': meta.get('/Title', 'N/A'),
                    'author': meta.get('/Author', 'N/A'),
                    'pages': len(reader.pages)
                }
        
        return "Unknown action"
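
`PDFTool` depends on a `parse_page_range` helper for specs such as `"1-5"` or a comma-separated list. One possible implementation is sketched below. It assumes users count pages from 1 and converts to the zero-based indices used by the `pdf.pages` list.

```python
from typing import List


def parse_page_range(spec: str) -> List[int]:
    """Turn a spec such as '1-3,5' into zero-based page indices."""
    indices: List[int] = []
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        if start < 1 or end < start:
            raise ValueError(f"Invalid page range: {part!r}")
        # Convert the 1-based inclusive range to 0-based indices.
        indices.extend(range(start - 1, end))
    # Drop duplicates while keeping the requested order.
    return list(dict.fromkeys(indices))


print(parse_page_range("1-3,5"))
```

Out-of-range indices are not rejected here because `PDFTool` already filters them with `i < len(pdf.pages)`.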

Document OCR

import io
from google.cloud import vision

class OCRTool:
    name = "ocr"
    description = "Extract text from images using Google Cloud Vision"
    
    def __init__(self):
        self.client = vision.ImageAnnotatorClient()
    
    def execute(self, action: str, **kwargs) -> str:
        if action == "extract_text":
            # Support both file path and image bytes
            if 'filepath' in kwargs:
                with io.open(kwargs['filepath'], 'rb') as f:
                    image = vision.Image(content=f.read())
            else:
                image = vision.Image(content=kwargs['image_bytes'])
            
            response = self.client.text_detection(image=image)
            texts = response.text_annotations
            
            if texts:
                return texts[0].description
            return "No text found"
        
        elif action == "detect_labels":
            if 'filepath' in kwargs:
                with io.open(kwargs['filepath'], 'rb') as f:
                    image = vision.Image(content=f.read())
            else:
                image = vision.Image(content=kwargs['image_bytes'])
            
            response = self.client.label_detection(image=image)
            labels = [{'description': label.description, 
                       'score': label.score} 
                      for label in response.label_annotations]
            
            return format_json(labels)
        
        return "Unknown action"

E-commerce & Payment Integration

Stripe API

import stripe
from pydantic import BaseModel

class StripeTool:
    name = "stripe"
    description = "Process payments and manage Stripe transactions"
    
    def __init__(self):
        stripe.api_key = load_secrets()['stripe_secret_key']
    
    class PaymentParams(BaseModel):
        amount: int  # in cents
        currency: str = "usd"
        customer_email: str = ""
        description: str = ""
        metadata: dict = {}
    
    def execute(self, action: str, **kwargs) -> str:
        if action == "create_payment":
            intent = stripe.PaymentIntent.create(
                amount=kwargs['amount'],
                currency=kwargs.get('currency', 'usd'),
                receipt_email=kwargs.get('customer_email'),
                description=kwargs.get('description'),
                metadata=kwargs.get('metadata', {})
            )
            return {
                'client_secret': intent.client_secret,
                'payment_id': intent.id
            }
        
        elif action == "get_payment":
            intent = stripe.PaymentIntent.retrieve(kwargs['payment_id'])
            return {
                'status': intent.status,
                'amount': intent.amount,
                'currency': intent.currency
            }
        
        elif action == "create_customer":
            customer = stripe.Customer.create(
                email=kwargs['email'],
                name=kwargs.get('name', ''),
                metadata=kwargs.get('metadata', {})
            )
            return {'customer_id': customer.id}
        
        elif action == "list_payments":
            payments = stripe.PaymentIntent.list(
                limit=kwargs.get('limit', 10),
                created={'gte': kwargs.get('since', 0)}
            )
            return format_payment_list(payments.data)
        
        elif action == "refund":
            refund = stripe.Refund.create(
                payment_intent=kwargs['payment_id'],
                amount=kwargs.get('amount')  # Optional: partial refund
            )
            return {'refund_id': refund.id, 'status': refund.status}
        
        return "Unknown action"
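
Because `amount` is an integer in the smallest currency unit (cents for USD), converting from a human-readable price is a common source of off-by-one-cent errors. For example, `int(19.99 * 100)` evaluates to `1998` in binary floating point. The sketch below does the conversion with `Decimal` instead. `to_minor_units` is a hypothetical helper, and the `exponent` parameter is an assumption that lets callers pass `0` for currencies without a minor unit.

```python
from decimal import Decimal, ROUND_HALF_UP


def to_minor_units(amount: str, exponent: int = 2) -> int:
    """Convert a decimal amount such as '19.99' to integer minor units."""
    value = Decimal(amount)
    if value < 0:
        raise ValueError("Amount must not be negative")
    scaled = value * (Decimal(10) ** exponent)
    # Round half up to a whole number of minor units.
    return int(scaled.quantize(Decimal("1"), rounding=ROUND_HALF_UP))


print(to_minor_units("19.99"))
print(to_minor_units("500", exponent=0))
```

Passing the price as a string avoids constructing a float in the first place.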

Shopify API

import shopify

class ShopifyTool:
    name = "shopify"
    description = "Manage Shopify store operations"
    
    def __init__(self):
        shopify.ShopifyResource.set_url(load_secrets()['shopify_url'])
        shopify.Session.setup(api_key=load_secrets()['shopify_key'],
                              secret=load_secrets()['shopify_secret'])
    
    def execute(self, action: str, **kwargs) -> str:
        session = shopify.Session(load_secrets()['shopify_shop'])
        shopify.ShopifyResource.activate_session(session)
        
        try:
            if action == "create_order":
                # Resource constructors take a single attributes dict, not keyword arguments
                order = shopify.Order({
                    'line_items': kwargs['line_items'],
                    'email': kwargs.get('email'),
                    'financial_status': 'pending',
                    'note': kwargs.get('note', '')
                })
                order.save()
                return {'order_id': order.id, 'order_number': order.order_number}
            
            elif action == "get_orders":
                orders = shopify.Order.find(
                    status='any',
                    limit=kwargs.get('limit', 10)
                )
                return format_orders(orders)
            
            elif action == "create_product":
                product = shopify.Product({
                    'title': kwargs['title'],
                    'body_html': kwargs.get('description', ''),
                    'vendor': kwargs.get('vendor', ''),
                    'product_type': kwargs.get('type', ''),
                    'variants': [{
                        'price': kwargs['price'],
                        'sku': kwargs.get('sku', '')
                    }]
                })
                product.save()
                return {'product_id': product.id, 'handle': product.handle}
            
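            elif action == "update_inventory":
                # Quantities are tracked per location, so adjust the InventoryLevel
                # (requires a location_id in addition to the inventory item id)
                level = shopify.InventoryLevel.adjust(
                    kwargs['location_id'],
                    kwargs['inventory_id'],
                    kwargs['adjustment']
                )
                return {'inventory_level': level.available}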
            
            return "Unknown action"
        finally:
            shopify.ShopifyResource.clear_session()

Error Handling & Resilience

Retry Patterns

import time
from functools import wraps
import logging

logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    exceptions: tuple = (Exception,)
):
    """Decorator for retrying tool executions with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        logger.warning(
                            f"Tool {func.__name__} failed (attempt {attempt + 1}/{max_retries}): {e}. "
                            f"Retrying in {delay}s..."
                        )
                        time.sleep(delay)
                        delay *= backoff_factor
                    else:
                        logger.error(
                            f"Tool {func.__name__} failed after {max_retries} attempts"
                        )
            
            raise last_exception
        return wrapper
    return decorator

class ResilientTool:
    """Base class for tools with retry and error handling"""
    
    @retry_with_backoff(max_retries=3, initial_delay=1.0)
    def execute_with_retry(self, *args, **kwargs):
        return self.execute(*args, **kwargs)
    
    def execute_safe(self, *args, **kwargs) -> dict:
        """Execute with comprehensive error handling"""
        try:
            result = self.execute_with_retry(*args, **kwargs)
            return {
                'success': True,
                'data': result,
                'error': None
            }
        except Exception as e:
            logger.exception(f"Tool execution failed: {e}")
            return {
                'success': False,
                'data': None,
                'error': str(e),
                'error_type': type(e).__name__
            }
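
To make the decorator's timing concrete, the sketch below computes the sleep schedule it produces: one delay before each retry and none after the final attempt. The optional `jitter` flag is an addition that the decorator above does not have. It randomizes each delay so that many clients failing at once do not all retry at the same moment. `backoff_delays` is a hypothetical helper name.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the sleep before each retry; no sleep follows the last attempt."""
    rng = rng or random.Random()
    delays: List[float] = []
    delay = initial_delay
    for _ in range(max_retries - 1):
        # With jitter, pick uniformly in [0, delay] instead of the fixed value.
        delays.append(rng.uniform(0, delay) if jitter else delay)
        delay *= backoff_factor
    return delays


print(backoff_delays())
print(backoff_delays(max_retries=5))
```

With the defaults used by `ResilientTool`, a call that keeps failing waits 1 s and then 2 s, so the caller sees the final exception after roughly 3 s plus the time spent in the three attempts.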

Rate Limiting

import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Sliding-window rate limiter for API calls (keeps a log of recent call timestamps)"""
    
    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
        self.lock = Lock()
    
    def acquire(self) -> bool:
        with self.lock:
            now = time.time()
            
            # Remove expired timestamps
            while self.calls and self.calls[0] < now - self.time_window:
                self.calls.popleft()
            
            if len(self.calls) < self.max_calls:
                self.calls.append(now)
                return True
            
            return False
    
    def wait_and_acquire(self):
        """Wait until rate limit allows execution"""
        while not self.acquire():
            time.sleep(0.1)

class RateLimitedTool:
    """Base class for tools with rate limiting"""
    
    def __init__(self):
        self.limiter = RateLimiter(
            max_calls=100,  # Adjust per API
            time_window=60
        )
    
    def execute(self, *args, **kwargs):
        self.limiter.wait_and_acquire()
        return self._do_execute(*args, **kwargs)
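
The `RateLimiter` class works by keeping a timestamp for every recent call and counting those inside the window. A token bucket is a different algorithm: tokens refill at a steady rate up to a fixed capacity, and each call spends one. This allows short bursts while still enforcing an average rate. A minimal sketch for comparison follows; the class name and the injectable `clock` parameter are assumptions made for illustration and testing.

```python
import time
from threading import Lock
from typing import Callable


class TokenBucket:
    """Token bucket: refills `rate` tokens per second, up to `capacity`."""

    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full so an initial burst is allowed
        self.clock = clock
        self.updated = clock()
        self.lock = Lock()

    def acquire(self, cost: float = 1.0) -> bool:
        with self.lock:
            now = self.clock()
            elapsed = now - self.updated
            self.updated = now
            # Add the tokens earned since the last call, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            if self.tokens >= cost:
                self.tokens -= cost
                return True
            return False


# Drive the bucket with a fake clock so the behavior is deterministic.
fake_time = [0.0]
bucket = TokenBucket(rate=1.0, capacity=2.0, clock=lambda: fake_time[0])
burst = [bucket.acquire(), bucket.acquire(), bucket.acquire()]
fake_time[0] = 1.0
after_refill = bucket.acquire()
print(burst, after_refill)
```

Unlike the timestamp log, the bucket uses constant memory regardless of the call rate.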

Security Best Practices

API Key Management

# BAD: Hardcoded API keys
STRIPE_KEY = "sk_live_xxxxx"

# GOOD: Secure secrets management
import os
from pathlib import Path

def load_secrets() -> dict:
    """Load secrets from environment or secure vault"""
    # Try environment variables first
    secrets = {
        'stripe': os.getenv('STRIPE_SECRET_KEY'),
        'openai': os.getenv('OPENAI_API_KEY'),
        'google': os.getenv('GOOGLE_API_CREDENTIALS'),
    }
    
    # Fall back to the operating system keyring
    if not all(secrets.values()):
        from keyring import get_password
        secrets = {
            'stripe': get_password('myapp', 'stripe'),
            'openai': get_password('myapp', 'openai'),
            'google': get_password('myapp', 'google'),
        }
    
    return secrets

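# BETTER: Use a secrets manager
import json

class SecretsManager:
    def __init__(self, provider: str = "aws"):
        # Remember the provider; isinstance() cannot be used with boto3.client, which is a function
        self.provider = provider
        if provider == "aws":
            import boto3
            self.client = boto3.client('secretsmanager')
        elif provider == "hashicorp":
            import hvac
            self.client = hvac.Client()
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    def get_secret(self, name: str) -> dict:
        if self.provider == "aws":
            response = self.client.get_secret_value(SecretId=name)
            return json.loads(response['SecretString'])
        else:
            # KV v2 nests the secret payload under data -> data
            response = self.client.secrets.kv.v2.read_secret_version(path=name)
            return response['data']['data']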

Input Validation

# Pydantic v2 API, consistent with model_json_schema() used earlier
from pydantic import BaseModel, Field, ValidationError, field_validator

class EmailParams(BaseModel):
    to: str = Field(..., pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
    subject: str = Field(..., min_length=1, max_length=200)
    body: str = Field(..., min_length=1)
    
    @field_validator('body')
    @classmethod
    def sanitize_body(cls, v: str) -> str:
        # Naive blocklist: catches template markers and one common injection phrase.
        # Treat it as a first filter, not as complete prompt-injection protection.
        dangerous_patterns = ['{{', '}}', '{%', '%}', 'ignore previous']
        for pattern in dangerous_patterns:
            if pattern in v.lower():
                raise ValueError("Potentially dangerous content detected")
        return v
    
    @field_validator('subject')
    @classmethod
    def sanitize_subject(cls, v: str) -> str:
        # Strip CR/LF so the subject cannot inject extra mail headers
        return v.replace('\n', '').replace('\r', '')

class SafeToolExecution:
    """Execute tools with validated inputs"""
    
    def __init__(self, tool_class, param_model):
        self.tool = tool_class()
        self.param_model = param_model
    
    def execute(self, raw_params: dict) -> dict:
        try:
            # Validate inputs
            params = self.param_model(**raw_params)
            
            # Execute with validated params
            result = self.tool.execute(**params.model_dump())
            
            return {'success': True, 'result': result}
        except ValidationError as e:
            return {'success': False, 'error': str(e)}

Testing Tools

import time

import pytest
from unittest.mock import Mock, patch

class TestToolIntegration:
    """Test suite for tool integrations"""
    
    @patch('myapp.gmail.get_gmail_service')
    def test_send_email_tool(self, mock_service):
        # Setup mock
        mock_messages = Mock()
        mock_messages.send.return_value.execute.return_value = {'id': 'msg123'}
        mock_service.return_value.users.return_value.messages.return_value = mock_messages
        
        # Execute
        tool = SendEmailTool()
        result = tool.execute(
            to="user@example.com",
            subject="Test",
            body="Hello"
        )
        
        # Verify
        assert 'msg123' in result
        mock_messages.send.assert_called_once()
    
    @patch('myapp.stripe.stripe.PaymentIntent')
    def test_payment_creation(self, mock_intent):
        # Setup mock
        mock_intent.create.return_value = Mock(
            id='pi_123',
            client_secret='secret_xyz'
        )
        
        # Execute
        tool = StripeTool()
        result = tool.execute(
            action='create_payment',
            amount=1000,
            currency='usd'
        )
        
        # Verify
        assert result['payment_id'] == 'pi_123'
        mock_intent.create.assert_called_once()
    
    def test_rate_limiter(self):
        limiter = RateLimiter(max_calls=2, time_window=1.0)
        
        # Should allow 2 calls
        assert limiter.acquire() is True
        assert limiter.acquire() is True
        
        # Should block 3rd call
        assert limiter.acquire() is False
        
        # After time window, should allow again
        time.sleep(1.1)
        assert limiter.acquire() is True

Best Practices Summary

Do's

  1. Use structured tool definitions - Define input schemas with Pydantic
  2. Implement retry logic - API calls fail; handle gracefully with backoff
  3. Add rate limiting - Respect API quotas to avoid bans
  4. Validate all inputs - Prevent injection attacks and bad data
  5. Log everything - Track tool calls for debugging
  6. Use async where possible - Parallelize independent API calls

Don'ts

  1. Don't hardcode API keys - Use secrets management
  2. Don't skip error handling - Every API can fail
  3. Don't ignore rate limits - You'll get blocked
  4. Don't trust user input - Always validate and sanitize
  5. Don't block on tools - Use timeouts and async patterns
  6. Don't forget to test - Mock external services in tests
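
Two of the points above, parallelizing independent calls and not blocking on tools, can be combined in a few lines of `asyncio`. The sketch below runs blocking tool calls in worker threads with a per-call timeout. It assumes Python 3.9+ for `asyncio.to_thread`, and `run_tools_concurrently`, `fast`, and `slow` are hypothetical names standing in for real tool calls.

```python
import asyncio
import time
from typing import Any, Callable, Dict


async def run_tools_concurrently(
    calls: Dict[str, Callable[[], Any]],
    timeout: float = 5.0,
) -> Dict[str, Dict[str, Any]]:
    """Run blocking tool calls in worker threads, each with its own timeout."""

    async def run_one(func: Callable[[], Any]) -> Dict[str, Any]:
        try:
            # to_thread keeps the event loop free while the blocking call runs.
            result = await asyncio.wait_for(asyncio.to_thread(func), timeout)
            return {"success": True, "data": result}
        except asyncio.TimeoutError:
            return {"success": False, "error": "timeout"}
        except Exception as exc:
            return {"success": False, "error": str(exc)}

    outcomes = await asyncio.gather(*(run_one(f) for f in calls.values()))
    return dict(zip(calls.keys(), outcomes))


def fast() -> str:
    return "ok"


def slow() -> str:
    time.sleep(1.0)  # simulates an API that hangs
    return "late"


results = asyncio.run(
    run_tools_concurrently({"fast": fast, "slow": slow}, timeout=0.25)
)
print(results)
```

The timeout only stops the agent from waiting. The worker thread keeps running until the blocking call returns, so the underlying HTTP client should be given its own timeout as well.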
