Stock Trading APIs: Building Trading Platforms

Introduction

The democratization of stock trading has been dramatically accelerated by APIs. What once required expensive terminal subscriptions, dedicated trading infrastructure, and deep broker relationships is now accessible to any developer with API credentials. This transformation has enabled a new generation of trading platforms, investment apps, and algorithmic trading systems.

Building a trading platform requires understanding multiple complex systems: market data feeds that stream millions of ticks per second, order management systems that must execute with microsecond precision, broker integrations that must satisfy regulatory requirements, and risk management systems that protect both the platform and its users.

This guide walks through the technical and business considerations for building stock trading platforms. You’ll learn about available APIs, architectural patterns, regulatory requirements, and implementation strategies used by successful trading platforms.


Understanding the Trading API Landscape

The trading API ecosystem consists of several distinct layers, each serving different purposes in the trading infrastructure.

API Categories

Market Data APIs: Provide real-time and historical price data. These are typically direct feeds from exchanges or aggregated by data vendors.

| Provider | Data Type | Latency | Pricing |
| --- | --- | --- | --- |
| Polygon.io | Real-time + Historical | <100ms | $49-199/month |
| Alpaca | Real-time + Historical | <100ms | Free-$99/month |
| IEX Cloud | Delayed + Historical | <500ms | $99+/month |
| Tiingo | Historical | N/A | Free-$30/month |
| Nasdaq Data Link | Historical | N/A | Custom |

Brokerage APIs: Enable actual trade execution. These connect to clearing firms and exchanges to place buy and sell orders.

| Provider | Securities | API Style | Regulatory |
| --- | --- | --- | --- |
| Alpaca | US Stocks/ETFs | REST + WebSocket | SEC registered |
| Interactive Brokers | Global | REST + C++ | SEC registered |
| TD Ameritrade | US Stocks/Options | REST | SEC registered |
| E*TRADE | US Stocks/Options | REST | SEC registered |
| Fidelity | US Stocks | REST | SEC registered |

Trading Infrastructure APIs: Specialized services for high-frequency trading, algorithmic strategies, and institutional operations.

Key Trading Concepts

Order Types:

  • Market Order: Execute immediately at best available price
  • Limit Order: Execute only at specified price or better
  • Stop Order: Trigger market order when price reaches stop level
  • Stop-Limit: Trigger limit order when price reaches stop level
  • Trailing Stop: Stop price moves with market, maintaining distance
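
The trigger conditions behind these order types can be sketched in a few lines. The names below (`limit_is_marketable`, `stop_is_triggered`, `TrailingSellStop`) are illustrative and not part of any broker API, and real venues apply extra rules, such as whether the last trade or the quote triggers a stop.

```python
from dataclasses import dataclass


def limit_is_marketable(side: str, limit_price: float, bid: float, ask: float) -> bool:
    """A buy limit can execute when the ask is at or below the limit;
    a sell limit can execute when the bid is at or above the limit."""
    if side == "buy":
        return ask <= limit_price
    return bid >= limit_price


def stop_is_triggered(side: str, stop_price: float, last_price: float) -> bool:
    """A sell stop triggers when the price falls to the stop level;
    a buy stop triggers when the price rises to it."""
    if side == "sell":
        return last_price <= stop_price
    return last_price >= stop_price


@dataclass
class TrailingSellStop:
    """Sell stop that trails the highest price seen by a fixed distance."""
    trail_amount: float
    high_water_mark: float

    @property
    def stop_price(self) -> float:
        return self.high_water_mark - self.trail_amount

    def on_price(self, last_price: float) -> bool:
        """Update the high-water mark and report whether the stop triggered."""
        if last_price > self.high_water_mark:
            self.high_water_mark = last_price  # stop ratchets up, never down
        return last_price <= self.stop_price


trailing = TrailingSellStop(trail_amount=2.0, high_water_mark=100.0)
events = [trailing.on_price(p) for p in (101.0, 104.0, 103.0, 102.0)]
```

In this run the stop follows the price up to 104, settles at 102, and triggers only on the final tick.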

Order Destinations:

  • Exchange: Direct connection to NYSE, NASDAQ, etc.
  • Dark Pool: Private liquidity venues (no public quotes)
  • Internalization: Broker fills order from own inventory
  • ATS (Alternative Trading System): Non-exchange matching venues

Market Data Integration

Real-time market data forms the foundation of any trading platform. Understanding data formats, protocols, and optimization strategies is essential.

Market Data Feeds

# Real-time Market Data Handler
import asyncio
import json
from datetime import datetime
from typing import Awaitable, Callable, Dict, List, Optional
import aiohttp

# Subscribers are async callbacks that receive one parsed message
Callback = Callable[[dict], Awaitable[None]]

class MarketDataClient:
    def __init__(self, api_key: str, feed_type: str = "iex"):
        self.api_key = api_key
        self.feed_type = feed_type
        self.session: Optional[aiohttp.ClientSession] = None
        self.websocket = None
        self.subscriptions: Dict[str, List[Callback]] = {}
        self.reconnect_attempts = 0
        self.max_reconnects = 5
    
    async def connect(self):
        """Establish WebSocket connection to market data feed."""
        if self.feed_type == "iex":
            ws_url = f"wss://ws.iex.cloud/v1?token={self.api_key}"
        elif self.feed_type == "alpaca":
            ws_url = "wss://stream.data.alpaca.markets/v2/iex"
        else:
            raise ValueError(f"Unknown feed type: {self.feed_type}")
        
        # Reuse one session so reconnects do not leak connections
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        self.websocket = await self.session.ws_connect(ws_url)
        self.reconnect_attempts = 0
        print(f"Connected to {self.feed_type} market data feed")
        # Note: feeds such as Alpaca's expect an authentication message
        # before they accept subscriptions; that step is omitted here
        
        # Start message handler; keep a reference so the task is not garbage-collected
        self._reader_task = asyncio.create_task(self._handle_messages())
    
    async def subscribe(self, symbols: List[str], callback: Callback):
        """Subscribe to real-time updates for symbols."""
        for symbol in symbols:
            if symbol not in self.subscriptions:
                self.subscriptions[symbol] = []
            self.subscriptions[symbol].append(callback)
        
        # Send subscription message
        subscribe_msg = {
            "action": "subscribe",
            "trades": symbols,
            "quotes": symbols
        }
        
        await self.websocket.send_json(subscribe_msg)
    
    async def _handle_messages(self):
        """Process incoming market data messages."""
        async for msg in self.websocket:
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                # Some feeds batch several events into one JSON array
                events = data if isinstance(data, list) else [data]
                for event in events:
                    await self._process_message(event)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"WebSocket error: {msg.data}")
                await self._attempt_reconnect()
    
    async def _process_message(self, data: dict):
        """Route market data to appropriate subscribers."""
        # Field names ("type", "S", "p", ...) vary by provider; adjust to your feed's schema
        if data.get("type") == "trade":
            symbol = data.get("S")
            price = data.get("p")
            volume = data.get("v")
            timestamp = data.get("t")
            
            trade_data = {
                "symbol": symbol,
                "price": price,
                "volume": volume,
                "timestamp": timestamp,
                "exchange": data.get("x", "NASDAQ")
            }
            
            # Notify subscribers
            if symbol in self.subscriptions:
                for callback in self.subscriptions[symbol]:
                    await callback(trade_data)
        
        elif data.get("type") == "quote":
            symbol = data.get("S")
            quote_data = {
                "symbol": symbol,
                "bid_price": data.get("bp"),
                "bid_size": data.get("bs"),
                "ask_price": data.get("ap"),
                "ask_size": data.get("as"),
                "timestamp": data.get("t")
            }
            
            if symbol in self.subscriptions:
                for callback in self.subscriptions[symbol]:
                    await callback(quote_data)
    
    async def _attempt_reconnect(self):
        """Attempt to reconnect on connection loss."""
        if self.reconnect_attempts >= self.max_reconnects:
            print("Giving up after repeated reconnect failures")
            return
        self.reconnect_attempts += 1
        delay = 2 ** self.reconnect_attempts  # Exponential backoff
        print(f"Attempting reconnect in {delay}s (attempt {self.reconnect_attempts})")
        await asyncio.sleep(delay)
        await self.connect()
        # Resubscribe on the new connection without re-registering callbacks
        symbols = list(self.subscriptions)
        if symbols:
            await self.websocket.send_json({
                "action": "subscribe",
                "trades": symbols,
                "quotes": symbols
            })
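
Trade messages like the ones routed above are often rolled up into time bars before they reach charts or strategies. Here is a minimal sketch of one-minute OHLCV aggregation, assuming each trade is a dict with `price`, `volume`, and an epoch-seconds `timestamp`. The `BarAggregator` class is hypothetical; real feeds usually deliver ISO 8601 or nanosecond timestamps that need converting first, and late or out-of-order trades need extra handling that is not shown.

```python
from typing import Dict, Optional


class BarAggregator:
    """Rolls individual trades up into fixed-width OHLCV bars for one symbol."""

    def __init__(self, bar_seconds: int = 60):
        self.bar_seconds = bar_seconds
        self.current: Optional[Dict] = None

    def on_trade(self, trade: Dict) -> Optional[Dict]:
        """Add a trade; return the completed bar when a new bar starts."""
        # Floor the timestamp to the start of its bar
        bar_start = int(trade["timestamp"]) // self.bar_seconds * self.bar_seconds
        finished = None
        if self.current is not None and bar_start != self.current["start"]:
            finished = self.current
            self.current = None
        if self.current is None:
            self.current = {
                "start": bar_start,
                "open": trade["price"], "high": trade["price"],
                "low": trade["price"], "close": trade["price"],
                "volume": 0,
            }
        bar = self.current
        bar["high"] = max(bar["high"], trade["price"])
        bar["low"] = min(bar["low"], trade["price"])
        bar["close"] = trade["price"]
        bar["volume"] += trade["volume"]
        return finished


agg = BarAggregator()
trades = [
    {"price": 10.0, "volume": 100, "timestamp": 0},
    {"price": 10.5, "volume": 50, "timestamp": 20},
    {"price": 9.8, "volume": 25, "timestamp": 59},
    {"price": 10.1, "volume": 10, "timestamp": 60},  # starts the next bar
]
completed = [bar for bar in map(agg.on_trade, trades) if bar is not None]
```

The first three trades fall in the same minute, so a single bar completes when the fourth trade arrives.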

Historical Data Management

Trading platforms require substantial historical data for backtesting and analysis:

import pandas as pd
from datetime import datetime

class HistoricalDataManager:
    def __init__(self, data_provider, storage_backend):
        self.provider = data_provider
        self.storage = storage_backend
    
    async def fetch_historical_bars(
        self,
        symbol: str,
        timeframe: str,  # 1Min, 5Min, 1Hour, 1Day
        start: datetime,
        end: datetime
    ) -> pd.DataFrame:
        """Fetch and store historical price data."""
        # Check cache first
        cached = await self.storage.get_bars(symbol, timeframe, start, end)
        if cached is not None and len(cached) > 0:
            return cached
        
        # Fetch from provider
        bars = await self.provider.get_bars(
            symbol=symbol,
            timeframe=timeframe,
            start=start,
            end=end
        )
        
        # Store for future use
        await self.storage.save_bars(bars)
        
        return bars
    
    def calculate_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add technical indicators to price data."""
        df = df.copy()  # avoid mutating the caller's DataFrame
        # Moving averages
        df['sma_20'] = df['close'].rolling(window=20).mean()
        df['sma_50'] = df['close'].rolling(window=50).mean()
        df['sma_200'] = df['close'].rolling(window=200).mean()
        
        # RSI (simple rolling-mean variant; Wilder's smoothing gives slightly different values)
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))
        
        # MACD
        exp1 = df['close'].ewm(span=12, adjust=False).mean()
        exp2 = df['close'].ewm(span=26, adjust=False).mean()
        df['macd'] = exp1 - exp2
        df['signal'] = df['macd'].ewm(span=9, adjust=False).mean()
        
        # Bollinger Bands
        df['bb_middle'] = df['close'].rolling(window=20).mean()
        bb_std = df['close'].rolling(window=20).std()
        df['bb_upper'] = df['bb_middle'] + (bb_std * 2)
        df['bb_lower'] = df['bb_middle'] - (bb_std * 2)
        
        return df
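
The RSI in the listing above averages gains and losses with a simple rolling mean. Many charting packages use Wilder's smoothing instead, so values can differ from what users see elsewhere. For comparison, here is a minimal pure-Python sketch of the Wilder variant. `wilder_rsi` is an illustrative helper, not part of the class above, and returning 50 for a completely flat series is a convention chosen here.

```python
from typing import List, Optional


def wilder_rsi(closes: List[float], period: int = 14) -> List[Optional[float]]:
    """Return RSI values aligned with `closes`; None until enough data exists."""
    rsi: List[Optional[float]] = [None] * len(closes)
    if len(closes) <= period:
        return rsi

    changes = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
    gains = [max(c, 0.0) for c in changes]
    losses = [max(-c, 0.0) for c in changes]

    # Seed with a simple average over the first `period` changes
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period

    def to_rsi(gain: float, loss: float) -> float:
        if loss == 0:
            return 100.0 if gain > 0 else 50.0  # flat series: treat as neutral
        return 100.0 - 100.0 / (1.0 + gain / loss)

    rsi[period] = to_rsi(avg_gain, avg_loss)
    # Wilder's smoothing: each new change carries 1/period of the weight
    for i in range(period, len(changes)):
        avg_gain = (avg_gain * (period - 1) + gains[i]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i]) / period
        rsi[i + 1] = to_rsi(avg_gain, avg_loss)
    return rsi


rising = wilder_rsi([float(p) for p in range(1, 31)])
falling = wilder_rsi([float(p) for p in range(30, 0, -1)])
```

A strictly rising series pins the indicator at 100 and a strictly falling one at 0, which makes a quick sanity check for any RSI implementation.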

Order Execution Systems

Executing orders reliably and efficiently is the most critical component of any trading platform. This section covers the architecture and implementation.

Order Management System (OMS)

from enum import Enum
from dataclasses import dataclass
from typing import Dict, Optional
from datetime import datetime
import uuid

class OrderSide(Enum):
    BUY = "buy"
    SELL = "sell"

class OrderType(Enum):
    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"
    STOP_LIMIT = "stop_limit"

class OrderStatus(Enum):
    PENDING = "pending"
    SUBMITTED = "submitted"
    PARTIALLY_FILLED = "partially_filled"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"

@dataclass
class Order:
    symbol: str
    side: OrderSide
    quantity: int
    order_type: OrderType
    limit_price: Optional[float] = None
    stop_price: Optional[float] = None
    time_in_force: str = "day"  # day, gtc, ioc, fok
    
    # Internal fields
    order_id: Optional[str] = None
    status: OrderStatus = OrderStatus.PENDING
    filled_quantity: int = 0
    average_fill_price: float = 0.0
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    account_id: Optional[str] = None
    
    def __post_init__(self):
        if self.order_id is None:
            self.order_id = str(uuid.uuid4())
        if self.created_at is None:
            self.created_at = datetime.utcnow()


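class BrokerException(Exception):
    """Raised when the broker API returns an error."""

class OrderException(Exception):
    """Raised when an order cannot be processed."""

class OrderNotFoundException(OrderException):
    """Raised when an order ID is unknown to the order book."""

class UnauthorizedException(OrderException):
    """Raised when an account acts on an order it does not own."""


class OrderManagementSystem: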
    def __init__(self, broker_adapter, risk_engine, order_book):
        self.broker = broker_adapter
        self.risk = risk_engine
        self.order_book = order_book
        self.pending_orders: Dict[str, Order] = {}
    
    async def submit_order(self, order: Order, account_id: str) -> Order:
        """Submit new order through the OMS."""
        order.account_id = account_id
        order.created_at = datetime.utcnow()
        
        # Pre-trade risk check
        risk_result = await self.risk.check_order(order)
        if not risk_result.approved:
            order.status = OrderStatus.REJECTED
            order.updated_at = datetime.utcnow()
            return order
        
        # Store in order book
        self.order_book.add_order(order)
        
        # Submit to broker
        try:
            broker_response = await self.broker.submit_order(order)
            order.status = OrderStatus.SUBMITTED
            order.updated_at = datetime.utcnow()
            
            # Handle response (the adapter returns a plain dict)
            if broker_response.get("status") == "filled":
                order.status = OrderStatus.FILLED
                order.filled_quantity = int(float(broker_response["filled_quantity"]))
                order.average_fill_price = float(broker_response["average_price"])
            
            return order
            
        except BrokerException as e:
            order.status = OrderStatus.REJECTED
            order.updated_at = datetime.utcnow()
            raise OrderException(f"Order rejected: {e}") from e
    
    async def cancel_order(self, order_id: str, account_id: str) -> bool:
        """Cancel an existing order."""
        order = self.order_book.get_order(order_id)
        
        if order is None:
            raise OrderNotFoundException(f"Order {order_id} not found")
        
        if order.account_id != account_id:
            raise UnauthorizedException("Order belongs to different account")
        
        if order.status in [OrderStatus.FILLED, OrderStatus.CANCELLED, OrderStatus.REJECTED]:
            raise OrderException(f"Cannot cancel order in {order.status.value} status")
        
        # Submit cancellation to broker
        await self.broker.cancel_order(order_id)
        
        order.status = OrderStatus.CANCELLED
        order.updated_at = datetime.utcnow()
        
        return True
    
    async def modify_order(self, order_id: str, modifications: dict, account_id: str) -> Order:
        """Modify an existing order (cancel/replace)."""
        original_order = self.order_book.get_order(order_id)
        
        if original_order is None:
            raise OrderNotFoundException(f"Order {order_id} not found")
        
        # Cancel original
        await self.cancel_order(order_id, account_id)
        
        # Create new order with modifications
        new_order = Order(
            symbol=modifications.get('symbol', original_order.symbol),
            side=modifications.get('side', original_order.side),
            quantity=modifications.get('quantity', original_order.quantity),
            order_type=modifications.get('order_type', original_order.order_type),
            limit_price=modifications.get('limit_price', original_order.limit_price),
            stop_price=modifications.get('stop_price', original_order.stop_price),
            time_in_force=modifications.get('time_in_force', original_order.time_in_force)
        )
        
        # Submit new order
        return await self.submit_order(new_order, account_id)
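
Brokers typically report fills incrementally, so an OMS has to fold each execution into the order's running totals and reject status changes that make no sense. Below is a minimal sketch, assuming a simplified `TrackedOrder` record and a hand-written transition table. Both are illustrative and not tied to any broker's status model.

```python
from dataclasses import dataclass

# Allowed status transitions; terminal states have no outgoing edges
ALLOWED = {
    "pending": {"submitted", "rejected"},
    "submitted": {"partially_filled", "filled", "cancelled", "rejected"},
    "partially_filled": {"partially_filled", "filled", "cancelled"},
    "filled": set(),
    "cancelled": set(),
    "rejected": set(),
}


@dataclass
class TrackedOrder:
    quantity: int
    status: str = "pending"
    filled_quantity: int = 0
    average_fill_price: float = 0.0

    def transition(self, new_status: str) -> None:
        if new_status not in ALLOWED[self.status]:
            raise ValueError(f"Illegal transition {self.status} -> {new_status}")
        self.status = new_status

    def apply_fill(self, fill_qty: int, fill_price: float) -> None:
        """Fold one execution into the running volume-weighted average price."""
        if fill_qty <= 0 or self.filled_quantity + fill_qty > self.quantity:
            raise ValueError("Fill quantity is out of range")
        total_cost = self.average_fill_price * self.filled_quantity + fill_price * fill_qty
        self.filled_quantity += fill_qty
        self.average_fill_price = total_cost / self.filled_quantity
        done = self.filled_quantity == self.quantity
        self.transition("filled" if done else "partially_filled")


order = TrackedOrder(quantity=100)
order.transition("submitted")
order.apply_fill(40, 10.0)
order.apply_fill(60, 11.0)
```

After the two fills the order is complete at a volume-weighted average of 10.60, and any further transition (a late cancel, for instance) raises instead of silently corrupting state.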

Broker Integration

class BrokerAdapter:
    def __init__(self, broker_config):
        self.api_key = broker_config['api_key']
        self.secret_key = broker_config['secret_key']
        self.base_url = broker_config['base_url']
        self.account_id = None
    
    async def authenticate(self):
        """Authenticate with brokerage API."""
        response = await self._request(
            method="POST",
            endpoint="/v1/auth/token",
            data={
                "api_key": self.api_key,
                "secret_key": self.secret_key
            }
        )
        
        self.account_id = response['account_id']
        self._session_headers = {
            "Authorization": f"Bearer {response['token']}"
        }
        
        return response
    
    async def submit_order(self, order: Order) -> dict:
        """Submit order to broker."""
        order_payload = {
            "symbol": order.symbol,
            "qty": order.quantity,
            "side": order.side.value,
            "type": order.order_type.value,
            "time_in_force": order.time_in_force,
            "client_order_id": order.order_id
        }
        
        if order.limit_price is not None:
            order_payload["limit_price"] = order.limit_price
        
        if order.stop_price is not None:
            order_payload["stop_price"] = order.stop_price
        
        response = await self._request(
            method="POST",
            endpoint="/v2/orders",
            data=order_payload
        )
        
        return {
            "broker_order_id": response['id'],
            "status": response['status'],
            "filled_quantity": response.get('filled_qty', 0),
            # Alpaca reports the average fill price as filled_avg_price (null until filled)
            "average_price": response.get('filled_avg_price') or 0
        }
    
    async def get_account_positions(self) -> List[dict]:
        """Get current positions."""
        response = await self._request(
            method="GET",
            endpoint="/v2/positions"
        )
        
        return response
    
    async def get_account_balance(self) -> dict:
        """Get account cash balances."""
        response = await self._request(
            method="GET",
            endpoint="/v2/account"
        )
        
        return {
            "cash": response['cash'],
            "buying_power": response['buying_power'],
            "portfolio_value": response['portfolio_value']
        }
    
    async def _request(self, method: str, endpoint: str, data: dict = None) -> dict:
        """Make authenticated request to broker API."""
        import aiohttp
        
        url = f"{self.base_url}{endpoint}"
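        # Fall back to Alpaca-style key headers until a bearer token is available
        headers = getattr(self, '_session_headers', {
            "APCA-API-KEY-ID": self.api_key,
            "APCA-API-SECRET-KEY": self.secret_key
        })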
        
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method=method,
                url=url,
                json=data,
                headers=headers
            ) as response:
                if response.status >= 400:
                    error = await response.text()
                    raise BrokerException(f"Broker API error: {response.status} - {error}")
                
                return await response.json()
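
Network calls to a broker can fail transiently, and blindly resending an order risks a duplicate. Because the payload above carries a `client_order_id`, a retry can reuse the same ID so that the broker can reject the duplicate, provided the broker enforces uniqueness of that field (an assumption to verify against your broker's documentation). Here is a minimal retry sketch with exponential backoff and jitter; `retry_async`, `TransientError`, and `flaky_submit` are hypothetical names.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Raised for failures worth retrying (timeouts, HTTP 5xx, rate limits)."""


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 0.5,
) -> T:
    """Call `operation` until it succeeds or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except TransientError:
            if attempt == max_attempts:
                raise
            # Exponential backoff with full jitter to avoid synchronized retries
            delay = random.uniform(0, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(delay)


calls = []


async def flaky_submit() -> dict:
    """Stand-in for a broker call: fails twice, then succeeds."""
    calls.append("order-123")  # same client_order_id on every attempt
    if len(calls) < 3:
        raise TransientError("timeout")
    return {"client_order_id": "order-123", "status": "accepted"}


result = asyncio.run(retry_async(flaky_submit, base_delay=0.0))
```

Only errors classified as transient are retried; a rejection for insufficient buying power, for example, should surface immediately rather than loop.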

Risk Management

Risk management is not optional; it’s a regulatory requirement and essential for platform survival.

Risk Engine Components

class RiskEngine:
    def __init__(self, config):
        self.max_position_size = config.get('max_position_size', 10000)
        self.max_order_value = config.get('max_order_value', 100000)
        self.max_daily_loss = config.get('max_daily_loss', 10000)
        self.max_leverage = config.get('max_leverage', 2.0)
        self.restricted_symbols = config.get('restricted_symbols', [])
    
    async def check_order(self, order: Order) -> "RiskResult":
        """Perform pre-trade risk checks."""
        violations = []
        
        # Check restricted symbols
        if order.symbol in self.restricted_symbols:
            violations.append(f"Trading restricted for {order.symbol}")
        
        # Check order value
        estimated_value = order.quantity * (order.limit_price or self._get_market_price(order.symbol))
        if estimated_value > self.max_order_value:
            violations.append(
                f"Order value ${estimated_value} exceeds limit ${self.max_order_value}"
            )
        
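        # Check position limit using the signed change, so a sell that reduces a long position is not penalized
        current_position = await self._get_position(order.symbol)
        signed_quantity = order.quantity if order.side == OrderSide.BUY else -order.quantity
        projected_position = current_position + signed_quantity
        if abs(projected_position) > self.max_position_size:
            violations.append(
                f"Position would exceed limit: |{projected_position}| > {self.max_position_size}"
            )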
        
        # Check buying power
        account = await self._get_account()
        required_buying_power = estimated_value if order.side == OrderSide.BUY else 0
        if required_buying_power > float(account['buying_power']):
            violations.append("Insufficient buying power")
        
        # Check daily loss limit
        daily_pnl = await self._get_daily_pnl()
        if daily_pnl < -self.max_daily_loss:
            violations.append(f"Daily loss limit exceeded: ${daily_pnl}")
        
        return RiskResult(
            approved=len(violations) == 0,
            violations=violations
        )
    
    async def check_portfolio(self, positions: List[dict], equity: float) -> "RiskResult":
        """Perform portfolio-level risk checks."""
        violations = []
        
        # Check leverage: gross exposure relative to account equity
        total_long = sum(float(p['market_value']) for p in positions if float(p['market_value']) > 0)
        total_short = abs(sum(float(p['market_value']) for p in positions if float(p['market_value']) < 0))
        leverage = (total_long + total_short) / equity if equity > 0 else float('inf')
        
        if leverage > self.max_leverage:
            violations.append(f"Leverage {leverage:.2f}x exceeds limit {self.max_leverage}x")
        
        # Check concentration
        total_value = sum(abs(float(p['market_value'])) for p in positions)
        for position in positions:
            concentration = abs(float(position['market_value'])) / total_value if total_value > 0 else 0
            if concentration > 0.25:
                violations.append(
                    f"Concentration risk: {position['symbol']} is {concentration*100:.1f}% of portfolio"
                )
        
        return RiskResult(
            approved=len(violations) == 0,
            violations=violations
        )


@dataclass
class RiskResult:
    approved: bool
    violations: List[str]
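
To make the portfolio checks concrete, here is a small worked example with made-up positions. It assumes leverage means gross exposure divided by account equity, which is one common convention; the `exposure_report` helper and the sample numbers are illustrative only.

```python
from typing import Dict, List


def exposure_report(positions: List[Dict], equity: float) -> Dict:
    """Summarize long/short exposure, leverage, and per-symbol concentration."""
    values = {p["symbol"]: float(p["market_value"]) for p in positions}
    long_exposure = sum(v for v in values.values() if v > 0)
    short_exposure = sum(-v for v in values.values() if v < 0)
    gross = long_exposure + short_exposure
    return {
        "long": long_exposure,
        "short": short_exposure,
        "gross": gross,
        "net": long_exposure - short_exposure,
        # Leverage here means gross exposure per dollar of equity
        "leverage": gross / equity if equity > 0 else float("inf"),
        "concentration": {
            symbol: abs(v) / gross if gross > 0 else 0.0
            for symbol, v in values.items()
        },
    }


report = exposure_report(
    [
        {"symbol": "AAPL", "market_value": "60000"},
        {"symbol": "MSFT", "market_value": "30000"},
        {"symbol": "TSLA", "market_value": "-30000"},  # short position
    ],
    equity=80000.0,
)
flagged = [s for s, share in report["concentration"].items() if share > 0.25]
```

With $120,000 of gross exposure on $80,000 of equity, leverage is 1.5x, and only AAPL (half of gross exposure) crosses a 25% concentration threshold.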

Building a Trading Platform Architecture

Modern trading platforms use a microservices architecture to handle the scale and reliability requirements:

# Trading Platform Architecture

services:
  # Market Data Layer
  market-data-api:
    description: "REST API for historical data queries"
    scaling: "3-10 instances"
    
  market-data-stream:
    description: "WebSocket handlers for real-time data"
    scaling: "5-20 instances"
    protocol: "WebSocket"
    
  market-data-processor:
    description: "Process and normalize incoming market data"
    scaling: "3-5 instances"
    
  # Trading Layer
  order-api:
    description: "REST API for order management"
    scaling: "3-10 instances"
    auth: "OAuth 2.0"
    
  order-processor:
    description: "Handle order execution logic"
    scaling: "3-5 instances"
    latency: "<10ms"
    
  risk-engine:
    description: "Pre-trade and post-trade risk checks"
    scaling: "2-5 instances"
    critical: true
    
  # Reference Data
  instrument-service:
    description: "Security master, corporate actions"
    scaling: "2 instances"
    
  # User Management
  user-service:
    description: "Account management, KYC"
    scaling: "3-5 instances"
    
  # Analytics
  analytics-pipeline:
    description: "Trade analytics, reporting"
    scaling: "2-5 instances"
    
  # Infrastructure
  message-broker:
    type: "Apache Kafka"
    purpose: "Event streaming between services"
    
  cache:
    type: "Redis Cluster"
    purpose: "Real-time data caching"
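
Services in a layout like this usually communicate through events on the message broker rather than through direct calls. Below is a minimal sketch of an event envelope serialized as JSON. The `OrderEvent` fields and the keying convention are assumptions for illustration, and no Kafka client is used here.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class OrderEvent:
    """Envelope published whenever an order changes state."""
    order_id: str
    account_id: str
    event_type: str            # e.g. "order.submitted", "order.filled"
    payload: dict
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def partition_key(self) -> bytes:
        # Keying by order_id keeps all events for one order in sequence
        # on brokers that preserve ordering per key
        return self.order_id.encode("utf-8")

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self), sort_keys=True).encode("utf-8")

    @classmethod
    def from_bytes(cls, raw: bytes) -> "OrderEvent":
        return cls(**json.loads(raw.decode("utf-8")))


event = OrderEvent(
    order_id="ord-1",
    account_id="acct-9",
    event_type="order.submitted",
    payload={"symbol": "AAPL", "qty": 10, "side": "buy"},
)
restored = OrderEvent.from_bytes(event.to_bytes())
```

Giving every event its own `event_id` lets consumers such as the risk engine or analytics pipeline discard duplicates if the broker redelivers a message.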

Regulatory Requirements

Trading platforms must comply with numerous regulations:

Key Regulatory Bodies

  • SEC (Securities and Exchange Commission): US securities regulation
  • FINRA (Financial Industry Regulatory Authority): Broker-dealer oversight
  • CFTC (Commodity Futures Trading Commission): Derivatives regulation
  • SROs (Self-Regulatory Organizations): Market structure oversight

Compliance Requirements

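Registration: Platforms that execute customer orders themselves generally require broker-dealer registration, while platforms that route orders through a partner broker's API may be able to rely on that broker's registration. Registering as a broker-dealer involves: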

  • SEC registration (Form BD)
  • FINRA membership
  • State registrations (blue sky)
  • Minimum capital requirements
  • Compliance infrastructure

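Order Recording: SEC Rules 17a-3 and 17a-4, along with Consolidated Audit Trail (CAT) reporting, require:

  • Order ticket documentation
  • Timestamps (millisecond precision)
  • Audit trails
  • Record retention (three to six years, depending on the record type)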
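
One way to make an audit trail tamper-evident is to chain records together with hashes, so that altering an earlier record invalidates everything after it. The sketch below only illustrates that idea. The `AuditLog` class is hypothetical, it keeps records in memory rather than in durable storage, and whether such a scheme satisfies any particular recordkeeping rule is a question for compliance counsel.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Dict, List


class AuditLog:
    """Append-only, hash-chained log of order events held in memory."""

    GENESIS = "0" * 64

    def __init__(self) -> None:
        self.records: List[Dict] = []

    @staticmethod
    def _digest(body: Dict) -> str:
        encoded = json.dumps(body, sort_keys=True).encode("utf-8")
        return hashlib.sha256(encoded).hexdigest()

    def append(self, event: Dict) -> Dict:
        body = {
            "event": event,
            # UTC timestamp with millisecond precision
            "recorded_at": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
            "prev_hash": self.records[-1]["hash"] if self.records else self.GENESIS,
        }
        record = {**body, "hash": self._digest(body)}
        self.records.append(record)
        return record

    def verify(self) -> bool:
        """Recompute every hash and check each link to the previous record."""
        prev = self.GENESIS
        for record in self.records:
            body = {k: v for k, v in record.items() if k != "hash"}
            if record["prev_hash"] != prev or record["hash"] != self._digest(body):
                return False
            prev = record["hash"]
        return True


log = AuditLog()
log.append({"order_id": "ord-1", "action": "submitted", "qty": 10})
log.append({"order_id": "ord-1", "action": "filled", "qty": 10})
intact = log.verify()

log.records[0]["event"]["qty"] = 1000   # simulate tampering
tampered_ok = log.verify()
```

The untouched log verifies, while the edited quantity in the first record causes verification to fail.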

Best Execution: Platforms must:

  • Exercise reasonable diligence
  • Consider speed, price, likelihood of execution
  • Monitor execution quality
  • Document rationale
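
Monitoring execution quality usually starts with comparing each fill against the quote that prevailed when the order arrived. Below is a minimal sketch of two common measures, effective spread and price improvement, assuming you capture the best bid and ask at order time. The function name and sample prices are illustrative.

```python
from decimal import Decimal
from typing import Dict


def execution_quality(side: str, fill_price: str, bid: str, ask: str) -> Dict[str, Decimal]:
    """Compare a fill against the bid/ask captured when the order arrived."""
    fill, best_bid, best_ask = Decimal(fill_price), Decimal(bid), Decimal(ask)
    midpoint = (best_bid + best_ask) / 2
    if side == "buy":
        # Buying below the ask is an improvement over the quoted price
        improvement = best_ask - fill
    else:
        # Selling above the bid is an improvement over the quoted price
        improvement = fill - best_bid
    return {
        "quoted_spread": best_ask - best_bid,
        # Effective spread: twice the distance between fill and midpoint
        "effective_spread": 2 * abs(fill - midpoint),
        "price_improvement": improvement,
    }


stats = execution_quality("buy", fill_price="100.03", bid="100.00", ask="100.04")
```

Here a buy filled one cent inside a four-cent quoted spread, giving a two-cent effective spread and one cent of price improvement. Prices are passed as strings and handled as `Decimal` to avoid binary floating-point rounding in cent-level comparisons.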

Implementation Checklist

Planning Phase

  • Define target market and product scope
  • Evaluate broker partnerships
  • Assess regulatory requirements
  • Design system architecture
  • Estimate infrastructure costs

Development Phase

  • Implement market data integration
  • Build order management system
  • Create risk management engine
  • Develop user interfaces
  • Integrate with broker APIs

Compliance Phase

  • Complete regulatory filings
  • Implement compliance monitoring
  • Establish audit procedures
  • Create regulatory reporting
  • Engage compliance counsel

Launch Phase

  • Conduct UAT with test accounts
  • Paper trading period
  • Gradual user rollout
  • Establish support processes
  • Monitor execution quality

Summary

Building a stock trading platform requires navigating complex technical and regulatory challenges:

  1. Market data is foundational: Reliable, low-latency market data enables all trading functionality.

  2. Order execution must be reliable: The OMS is the most critical component; failures here directly impact users.

  3. Risk management is non-negotiable: Pre-trade checks, position limits, and monitoring protect both the platform and users.

  4. Regulatory compliance is complex: Broker-dealer registration, best execution, and record-keeping requirements apply.

  5. Architecture matters: Microservices, real-time processing, and redundancy are essential for production systems.

The trading API ecosystem has matured significantly, enabling developers to build sophisticated platforms without the infrastructure investments previously required.

