## Introduction
MySQL has transformed from a traditional relational database into a powerful platform for AI applications. With native JSON support, vector similarity search capabilities, and the integrated HeatWave ML engine, MySQL now competes with specialized vector databases while offering enterprise-grade reliability. This guide covers practical implementations for AI workloads in MySQL.
## Vector Embedding Storage

### Efficient Embedding Storage Patterns

```sql
-- Optimized embedding table with vector dimensions
CREATE TABLE document_embeddings (
id INT AUTO_INCREMENT PRIMARY KEY,
document_id INT NOT NULL,
embedding_type VARCHAR(50) DEFAULT 'text-embedding-ada-002',
embedding_vector JSON NOT NULL,
metadata JSON,
embedding_dim INT GENERATED ALWAYS AS (JSON_LENGTH(embedding_vector)) STORED,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_document (document_id),
INDEX idx_embedding_dim (embedding_dim),
INDEX idx_embedding_type (embedding_type)
) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;
-- Store embeddings with metadata
INSERT INTO document_embeddings (document_id, embedding_type, embedding_vector)
VALUES (
1,
'text-embedding-ada-002',
JSON_ARRAY(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)
);
-- Query with vector similarity (dot-product approximation; equals cosine similarity for normalized vectors)
-- The ? placeholder is bound to the query vector as a JSON array
SELECT
de.document_id,
JSON_EXTRACT(de.embedding_vector, '$[0]') as first_dim,
JSON_EXTRACT(de.embedding_vector, '$[1]') as second_dim,
-- Calculate dot product for similarity
(
SELECT SUM(JSON_EXTRACT(de2.embedding_vector, CONCAT('$[', n.n, ']')) *
JSON_EXTRACT(?, CONCAT('$[', n.n, ']')))
FROM (SELECT 0 AS n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
      UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) n
WHERE n.n < de.embedding_dim
) as similarity_score
FROM document_embeddings de
WHERE de.embedding_type = 'text-embedding-ada-002'
ORDER BY similarity_score DESC
LIMIT 10;
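```

The SQL above computes a dot product across JSON array positions, which only matches cosine similarity when the vectors are normalized. A common alternative is to let MySQL pre-filter candidate rows and compute exact cosine similarity in the application; the sketch below (a minimal NumPy example, separate from the schema above) shows that calculation.

```python
# Minimal sketch: exact cosine similarity computed client-side with NumPy.
# Assumes the embedding_vector JSON arrays have been fetched and parsed
# into Python lists (for example with json.loads).
import numpy as np

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length embedding vectors."""
    a_arr = np.asarray(a, dtype=np.float64)
    b_arr = np.asarray(b, dtype=np.float64)
    denom = np.linalg.norm(a_arr) * np.linalg.norm(b_arr)
    return float(a_arr @ b_arr / denom) if denom else 0.0

# Example with the 10-dimensional vector inserted above
query = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
print(cosine_similarity(query, query))  # ~1.0 for identical vectors
```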
### Python: Production-Grade Embedding Storage

```python
import pymysql
import json
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class DocumentEmbedding:
document_id: int
embedding: List[float]
embedding_type: str = "text-embedding-ada-002"
metadata: Optional[Dict] = None
class MySQLVectorStore:
def __init__(self, host: str, user: str, password: str, database: str):
self.conn = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def store_embedding(self, embedding: DocumentEmbedding) -> int:
"""Store a single embedding with transaction safety"""
with self.conn.cursor() as cur:
sql = """
INSERT INTO document_embeddings
(document_id, embedding_type, embedding_vector, metadata)
VALUES (%s, %s, %s, %s)
"""
cur.execute(sql, (
embedding.document_id,
embedding.embedding_type,
json.dumps(embedding.embedding),
json.dumps(embedding.metadata or {})
))
self.conn.commit()
return cur.lastrowid
def batch_store(self, embeddings: List[DocumentEmbedding]) -> List[int]:
"""Batch store embeddings for performance"""
ids = []
with self.conn.cursor() as cur:
for emb in embeddings:
cur.execute("""
INSERT INTO document_embeddings
(document_id, embedding_type, embedding_vector, metadata)
VALUES (%s, %s, %s, %s)
""", (
emb.document_id,
emb.embedding_type,
json.dumps(emb.embedding),
json.dumps(emb.metadata or {})
))
ids.append(cur.lastrowid)
self.conn.commit()
return ids
def find_similar(self, query_vector: List[float],
embedding_type: str = "text-embedding-ada-002",
limit: int = 10) -> List[Dict]:
"""Find similar embeddings using vector similarity"""
with self.conn.cursor() as cur:
            # Dot-product similarity computed in SQL over the stored JSON arrays.
            # The inline numbers table below only covers indices 0-4; extend it to match the embedding dimensionality.
query_json = json.dumps(query_vector)
cur.execute("""
SELECT
de.document_id,
de.embedding_vector,
de.metadata,
-- Calculate dot product similarity
(
SELECT SUM(
JSON_EXTRACT(de.embedding_vector, CONCAT('$[', n.n, ']')) *
JSON_EXTRACT(%s, CONCAT('$[', n.n, ']'))
)
FROM (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) n
WHERE n.n < JSON_LENGTH(de.embedding_vector)
) as similarity
FROM document_embeddings de
WHERE de.embedding_type = %s
ORDER BY similarity DESC
LIMIT %s
""", (query_json, embedding_type, limit))
return cur.fetchall()
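```

A minimal usage sketch of the class above; the connection values are placeholders.

```python
# Hypothetical credentials -- replace with real connection details.
store = MySQLVectorStore("localhost", "ai_user", "secure_password", "ai_analytics")

doc = DocumentEmbedding(
    document_id=1,
    embedding=[0.1, 0.2, 0.3, 0.4, 0.5],
    metadata={"source": "kb_article"},
)
row_id = store.store_embedding(doc)

# Fetch the ten most similar stored embeddings for a query vector
matches = store.find_similar([0.1, 0.2, 0.3, 0.4, 0.5], limit=10)
for match in matches:
    print(match["document_id"], match["similarity"])
```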
## JSON Document Store for AI Metadata

### Schema Design for AI Applications

```sql
-- AI metadata storage with JSON
CREATE TABLE ai_metadata (
id INT AUTO_INCREMENT PRIMARY KEY,
model_name VARCHAR(100) NOT NULL,
model_type ENUM('llm', 'embedding', 'classification', 'regression', 'generation'),
provider VARCHAR(50), -- 'openai', 'anthropic', 'cohere', 'custom'
version VARCHAR(20),
parameters JSON NOT NULL,
training_data JSON,
performance_metrics JSON,
deployment_config JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_model_type (model_type),
INDEX idx_provider (provider),
INDEX idx_created (created_at)
);
-- Store model configuration
INSERT INTO ai_metadata (
model_name, model_type, provider, version, parameters, performance_metrics
) VALUES (
'gpt-4-turbo',
'llm',
'openai',
'2026-01-01',
'{
"max_tokens": 4096,
"temperature": 0.7,
"top_p": 0.9,
"frequency_penalty": 0,
"presence_penalty": 0
}',
'{
"accuracy": 0.92,
"latency_ms": 350,
"cost_per_1k_tokens": 0.03,
"throughput_tps": 45
}'
);
-- Advanced JSON queries for AI metadata
SELECT
model_name,
provider,
JSON_EXTRACT(parameters, '$.max_tokens') as max_tokens,
JSON_EXTRACT(performance_metrics, '$.latency_ms') as latency,
-- Filter by performance threshold
CASE
WHEN JSON_EXTRACT(performance_metrics, '$.accuracy') > 0.9 THEN 'high'
WHEN JSON_EXTRACT(performance_metrics, '$.accuracy') > 0.8 THEN 'medium'
ELSE 'low'
END as accuracy_tier
FROM ai_metadata
WHERE model_type = 'llm'
AND JSON_EXTRACT(performance_metrics, '$.latency_ms') < 500
ORDER BY JSON_EXTRACT(performance_metrics, '$.accuracy') DESC;
-- JSON aggregation for analytics
SELECT
provider,
COUNT(*) as model_count,
AVG(JSON_EXTRACT(performance_metrics, '$.accuracy')) as avg_accuracy,
AVG(JSON_EXTRACT(performance_metrics, '$.latency_ms')) as avg_latency,
JSON_ARRAYAGG(
JSON_OBJECT(
'model', model_name,
'version', version,
'max_tokens', JSON_EXTRACT(parameters, '$.max_tokens')
)
) as model_details
FROM ai_metadata
WHERE model_type = 'llm'
GROUP BY provider;
```

### Python: Dynamic Schema Management

```python
class AIMetadataManager:
def __init__(self, connection):
self.conn = connection
def store_model_metadata(self, model_data: Dict) -> int:
"""Store AI model metadata with validation"""
required_fields = ['model_name', 'model_type', 'parameters']
for field in required_fields:
if field not in model_data:
raise ValueError(f"Missing required field: {field}")
with self.conn.cursor() as cur:
sql = """
INSERT INTO ai_metadata
(model_name, model_type, provider, version, parameters,
training_data, performance_metrics, deployment_config)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cur.execute(sql, (
model_data['model_name'],
model_data.get('model_type', 'custom'),
model_data.get('provider'),
model_data.get('version', '1.0.0'),
json.dumps(model_data.get('parameters', {})),
json.dumps(model_data.get('training_data', {})),
json.dumps(model_data.get('performance_metrics', {})),
json.dumps(model_data.get('deployment_config', {}))
))
self.conn.commit()
return cur.lastrowid
def find_models_by_criteria(self, criteria: Dict) -> List[Dict]:
"""Find models matching specific criteria"""
conditions = []
params = []
for key, value in criteria.items():
if key in ['model_type', 'provider']:
conditions.append(f"{key} = %s")
params.append(value)
elif key.startswith('performance.'):
metric = key.split('.')[1]
conditions.append(f"JSON_EXTRACT(performance_metrics, '$.{metric}') > %s")
params.append(value)
where_clause = " AND ".join(conditions) if conditions else "1=1"
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
model_name,
model_type,
provider,
version,
parameters,
performance_metrics,
created_at
FROM ai_metadata
WHERE {where_clause}
ORDER BY created_at DESC
""", params)
return cur.fetchall()
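```

The criteria dictionary can mix exact-match columns with `performance.`-prefixed JSON thresholds, as the sketch below shows (it assumes an open PyMySQL connection named `connection`).

```python
# Hypothetical usage of AIMetadataManager
manager = AIMetadataManager(connection)

manager.store_model_metadata({
    "model_name": "gpt-4-turbo",
    "model_type": "llm",
    "provider": "openai",
    "parameters": {"max_tokens": 4096, "temperature": 0.7},
    "performance_metrics": {"accuracy": 0.92, "latency_ms": 350},
})

# Find OpenAI LLMs whose stored accuracy exceeds 0.9
accurate_llms = manager.find_models_by_criteria({
    "model_type": "llm",
    "provider": "openai",
    "performance.accuracy": 0.9,
})
```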
## MySQL HeatWave ML Engine

### Production ML with HeatWave
HeatWave is MySQL’s integrated in-memory analytics and machine learning engine that enables ML operations directly in the database.

```sql
-- Enable HeatWave AutoML for predictive analytics
-- Create training dataset
CREATE TABLE customer_churn_data (
customer_id INT PRIMARY KEY,
tenure_months INT,
monthly_charges DECIMAL(10,2),
total_charges DECIMAL(10,2),
contract_type ENUM('Month-to-month', 'One year', 'Two year'),
payment_method VARCHAR(50),
paperless_billing BOOLEAN,
churn_label BOOLEAN
);
-- Train classification model directly in MySQL
CALL sys.ML_TRAIN(
    'customer_churn_data',
    'churn_label',
    JSON_OBJECT(
        'task', 'classification',
        'algorithm', 'xgboost',
        'target_column', 'churn_label',
        'exclude_columns', JSON_ARRAY('customer_id'),
        'train_test_split', 0.8,
        'validation_split', 0.1
    ),
    @model_handle
);
-- Load trained model
CALL sys.ML_MODEL_LOAD(@model_handle, 'churn_prediction_model');
-- Make predictions
SELECT
customer_id,
tenure_months,
monthly_charges,
sys.ML_PREDICT_ROW(
'churn_prediction_model',
JSON_OBJECT(
'tenure_months', tenure_months,
'monthly_charges', monthly_charges,
'contract_type', contract_type
)
) as churn_probability
FROM customer_churn_data
WHERE churn_label IS NULL -- Predict for new customers
LIMIT 10;
-- Batch predictions
CREATE TABLE churn_predictions AS
SELECT
customer_id,
sys.ML_PREDICT_ROW(
'churn_prediction_model',
JSON_OBJECT(
'tenure_months', tenure_months,
'monthly_charges', monthly_charges,
'contract_type', contract_type,
'payment_method', payment_method
)
) as prediction,
CURRENT_TIMESTAMP as predicted_at
FROM customer_churn_data
WHERE churn_label IS NULL;
-- Model evaluation
SELECT
sys.ML_MODEL_EVALUATE(
'churn_prediction_model',
'customer_churn_data',
'churn_label'
) as model_metrics;
-- Time series forecasting
CALL sys.ML_TRAIN(
    'sales_data',
    'revenue',
    JSON_OBJECT(
        'task', 'time_series',
        'time_column', 'sale_date',
        'target_column', 'revenue',
        'horizon', 30,
        'frequency', 'day'
    ),
    @ts_model_handle
);
```

### HeatWave Vector Similarity Search

```sql
-- Enable vector search in HeatWave
ALTER TABLE document_embeddings
ADD VECTOR INDEX idx_embedding_vector (embedding_vector)
COMMENT '{"type": "hnsw", "distance": "cosine"}';
-- Similarity search with HeatWave
SELECT
de1.document_id as query_id,
de2.document_id as similar_id,
    VECTOR_DISTANCE(de1.embedding_vector, de2.embedding_vector, 'cosine') as distance
FROM document_embeddings de1
JOIN document_embeddings de2 ON de1.document_id != de2.document_id
WHERE de1.document_id = 123
ORDER BY distance ASC   -- smaller cosine distance = more similar
LIMIT 10;
-- ANN (Approximate Nearest Neighbor) search
SELECT
    document_id,
    VECTOR_DISTANCE(embedding_vector,
        JSON_ARRAY(0.1, 0.2, 0.3, 0.4, 0.5),
        'cosine') as distance
FROM document_embeddings
WHERE VECTOR_DISTANCE(embedding_vector,
        JSON_ARRAY(0.1, 0.2, 0.3, 0.4, 0.5),
        'cosine') < 0.3
ORDER BY distance
LIMIT 20;
```

### Python Integration with HeatWave

```python
import mysql.connector
import json
from typing import List, Dict
class HeatWaveML:
def __init__(self, host: str, user: str, password: str, database: str):
self.conn = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database
)
def train_model(self, table_name: str, target_column: str,
model_name: str, config: Dict = None) -> str:
"""Train ML model using HeatWave"""
config_json = json.dumps(config or {
'task': 'classification',
'algorithm': 'xgboost',
'train_test_split': 0.8
})
        with self.conn.cursor() as cur:
            # Train the model (training options third, output model handle fourth)
            cur.execute("""
                CALL sys.ML_TRAIN(
                    %s,
                    %s,
                    %s,
                    @model_handle
                )
            """, (table_name, target_column, config_json))
            # Load the trained model under the given name
            cur.execute("""
                CALL sys.ML_MODEL_LOAD(@model_handle, %s)
            """, (model_name,))
            self.conn.commit()
        return model_name
def predict(self, model_name: str, features: Dict) -> Dict:
"""Make prediction using trained model"""
with self.conn.cursor(dictionary=True) as cur:
cur.execute("""
SELECT sys.ML_PREDICT_ROW(%s, %s) as prediction
""", (model_name, json.dumps(features)))
result = cur.fetchone()
return json.loads(result['prediction']) if result else None
def batch_predict(self, model_name: str, table_name: str,
output_table: str) -> int:
"""Batch predict and store results"""
with self.conn.cursor() as cur:
cur.execute(f"""
CREATE TABLE {output_table} AS
SELECT
*,
sys.ML_PREDICT_ROW(%s,
JSON_OBJECT(
'feature1', feature1,
'feature2', feature2,
'feature3', feature3
)
) as prediction
FROM {table_name}
""", (model_name,))
self.conn.commit()
return cur.rowcount
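```

A brief usage sketch of the wrapper above, assuming the `customer_churn_data` table from earlier in this section and placeholder connection values.

```python
# Hypothetical connection details
heatwave = HeatWaveML("heatwave-host", "ml_user", "ml_password", "ml_schema")

# Train and register a churn model
heatwave.train_model(
    table_name="customer_churn_data",
    target_column="churn_label",
    model_name="churn_prediction_model",
)

# Score a single customer
prediction = heatwave.predict("churn_prediction_model", {
    "tenure_months": 12,
    "monthly_charges": 79.90,
    "contract_type": "Month-to-month",
})
print(prediction)
```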
---
## LangChain and LLM Integration
### Production-Ready LangChain Patterns
```python
from langchain_community.utilities import SQLDatabase
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_openai import ChatOpenAI
from langchain.agents import create_sql_agent
from langchain.agents.agent_types import AgentType
from typing import List, Dict, Optional
import json
class MySQLAIAssistant:
def __init__(self, db_config: Dict, llm_model: str = "gpt-4-turbo"):
"""Initialize MySQL-powered AI assistant"""
# Connect to MySQL database
self.db = SQLDatabase.from_uri(
f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
f"@{db_config['host']}/{db_config['database']}"
)
# Initialize LLM
self.llm = ChatOpenAI(
model=llm_model,
temperature=0.1,
max_tokens=2000
)
# Create SQL agent toolkit
self.toolkit = SQLDatabaseToolkit(db=self.db, llm=self.llm)
# Create agent
self.agent = create_sql_agent(
llm=self.llm,
toolkit=self.toolkit,
verbose=True,
agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
handle_parsing_errors=True
)
def query_with_natural_language(self, question: str) -> Dict:
"""Execute natural language queries against MySQL"""
try:
response = self.agent.invoke({"input": question})
return {
"success": True,
"answer": response.get("output", ""),
"sql_query": response.get("intermediate_steps", [{}])[-1].get("action", ""),
"raw_response": response
}
except Exception as e:
return {
"success": False,
"error": str(e),
"answer": f"Error executing query: {e}"
}
def generate_sql_from_natural_language(self, description: str) -> str:
"""Generate SQL from natural language description"""
prompt = f"""
Convert the following natural language description into a valid MySQL SQL query.
Available tables: {self.db.get_usable_table_names()}
Description: {description}
Return ONLY the SQL query without any explanation.
"""
response = self.llm.invoke(prompt)
return response.content.strip()
def explain_query_results(self, sql_query: str, results: List[Dict]) -> str:
"""Generate natural language explanation of query results"""
results_json = json.dumps(results[:10], indent=2) # Limit for context
prompt = f"""
Explain the following MySQL query results in natural language:
Query: {sql_query}
Results (first 10 rows):
{results_json}
Provide a concise summary of what the data shows.
"""
response = self.llm.invoke(prompt)
return response.content
# Example usage
db_config = {
"host": "localhost",
"user": "ai_user",
"password": "secure_password",
"database": "ai_analytics"
}
assistant = MySQLAIAssistant(db_config)
# Natural language query
result = assistant.query_with_natural_language(
"What were the top 5 products by revenue last month?"
)
print(result["answer"])
# Generate SQL from description
sql = assistant.generate_sql_from_natural_language(
"Find customers who made more than 5 purchases in the last 30 days"
)
print(f"Generated SQL: {sql}")
```

### RAG (Retrieval-Augmented Generation) with MySQL

```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
import pymysql
class MySQLRAGSystem:
def __init__(self, db_config: Dict, embedding_model: str = "text-embedding-ada-002"):
self.db_config = db_config
self.embeddings = OpenAIEmbeddings(model=embedding_model)
self.vector_store = None
def load_documents_from_mysql(self, table_name: str,
text_columns: List[str],
metadata_columns: List[str] = None) -> List:
"""Load documents from MySQL table for RAG"""
        # Build a query that concatenates the text columns into one content field
        # (CONCAT_WS is used because MySQL does not treat || as string concatenation by default)
        text_expr = ", ".join(text_columns)
        metadata_select = ", ".join(metadata_columns) if metadata_columns else "NULL as metadata"
        query = f"""
            SELECT
                CONCAT_WS(' ', {text_expr}) as content,
                {metadata_select}
            FROM {table_name}
            WHERE {" AND ".join([f"{col} IS NOT NULL" for col in text_columns])}
        """
# Execute query
conn = pymysql.connect(**self.db_config)
with conn.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
        # Wrap rows as LangChain Document objects so they can be split and indexed
        documents = []
        for row in rows:
            metadata = {}
            if metadata_columns:
                for i, col in enumerate(metadata_columns, 1):
                    metadata[col] = row[i]
            documents.append(Document(page_content=row[0], metadata=metadata))
return documents
def build_vector_index(self, documents: List, chunk_size: int = 1000):
"""Build vector index from documents"""
# Split documents
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
texts = text_splitter.split_documents(documents)
# Create vector store
self.vector_store = FAISS.from_documents(texts, self.embeddings)
return len(texts)
def search_similar(self, query: str, k: int = 5) -> List[Dict]:
"""Search for similar content"""
if not self.vector_store:
raise ValueError("Vector store not initialized. Call build_vector_index first.")
results = self.vector_store.similarity_search_with_score(query, k=k)
return [
{
"content": doc.page_content,
"metadata": doc.metadata,
"score": score
}
for doc, score in results
]
def answer_with_context(self, question: str,
context_documents: List[Dict]) -> str:
"""Generate answer using retrieved context"""
# Combine context
context = "\n\n".join([
f"Source {i+1}:\n{doc['content']}"
for i, doc in enumerate(context_documents)
])
prompt = f"""
Answer the question based on the following context.
Context:
{context}
Question: {question}
Answer:
"""
# Use LLM to generate answer
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.1)
response = llm.invoke(prompt)
return response.content
# Example RAG workflow
rag_system = MySQLRAGSystem(db_config)
# Load documents from MySQL
documents = rag_system.load_documents_from_mysql(
table_name="knowledge_base",
text_columns=["title", "content", "summary"],
metadata_columns=["category", "author", "created_date"]
)
# Build vector index
rag_system.build_vector_index(documents)
# Search and answer
similar_docs = rag_system.search_similar(
"How to optimize MySQL for AI workloads?",
k=3
)
answer = rag_system.answer_with_context(
"What are best practices for MySQL AI applications?",
similar_docs
)
print(answer)
```

## Storing ML Model Metadata

```sql
-- Model registry
CREATE TABLE ml_models (
model_id INT AUTO_INCREMENT PRIMARY KEY,
model_name VARCHAR(100) NOT NULL,
model_type VARCHAR(50), -- 'classification', 'regression', etc.
framework VARCHAR(50), -- 'sklearn', 'tensorflow', etc.
version VARCHAR(20),
parameters JSON,
metrics JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_production BOOLEAN DEFAULT FALSE
);
-- Model versions
CREATE TABLE model_versions (
version_id INT AUTO_INCREMENT PRIMARY KEY,
    model_id INT NOT NULL,
    version VARCHAR(20),
    model_file_path VARCHAR(500),
    training_data_hash VARCHAR(64),
    accuracy DECIMAL(5, 4),
    precision_score DECIMAL(5, 4),
    recall_score DECIMAL(5, 4),
    f1_score DECIMAL(5, 4),
    trained_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (model_id) REFERENCES ml_models(model_id)
);
-- Insert model metadata
INSERT INTO ml_models (model_name, model_type, framework, version, parameters, metrics, is_production)
VALUES (
'customer_churn',
'classification',
'xgboost',
'v2.1.0',
'{"max_depth": 6, "learning_rate": 0.1, "n_estimators": 100}',
'{"accuracy": 0.95, "precision": 0.93, "recall": 0.94}',
TRUE
);
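-- Example lookup (a sketch against the registry above): fetch the current production model
SELECT model_name, version, metrics
FROM ml_models
WHERE is_production = TRUE
ORDER BY created_at DESC
LIMIT 1;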
```

## Feature Store with MySQL

### Storing Precomputed Features

```python
class FeatureStore:
def __init__(self, connection):
self.conn = connection
def store_features(self, entity_type, entity_id, features):
"""Store precomputed ML features"""
import json
with self.conn.cursor() as cur:
sql = """
INSERT INTO ml_features (entity_type, entity_id, features)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE features = %s
"""
cur.execute(sql, (entity_type, entity_id, json.dumps(features), json.dumps(features)))
self.conn.commit()
def get_features(self, entity_type, entity_id):
"""Retrieve features for ML inference"""
import json
with self.conn.cursor() as cur:
cur.execute("""
SELECT features
FROM ml_features
WHERE entity_type = %s AND entity_id = %s
""", (entity_type, entity_id))
result = cur.fetchone()
return json.loads(result[0]) if result else None
def batch_get(self, entity_type, entity_ids):
"""Batch retrieve features"""
import json
placeholders = ','.join(['%s'] * len(entity_ids))
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT entity_id, features
FROM ml_features
WHERE entity_type = %s AND entity_id IN ({placeholders})
""", [entity_type] + entity_ids)
return {
row[0]: json.loads(row[1])
for row in cur.fetchall()
}
# Create table
"""
CREATE TABLE ml_features (
id INT AUTO_INCREMENT PRIMARY KEY,
entity_type VARCHAR(50) NOT NULL,
entity_id BIGINT NOT NULL,
features JSON NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uq_entity (entity_type, entity_id)
);
"""
```

## Session Management for LLM Applications

```python
class LLMSessionManager:
def __init__(self, conn):
self.conn = conn
def create_session(self, user_id):
"""Create new chat session"""
import uuid
session_id = str(uuid.uuid4())
with self.conn.cursor() as cur:
cur.execute("""
INSERT INTO chat_sessions (session_id, user_id, messages)
VALUES (%s, %s, '[]')
""", (session_id, user_id))
self.conn.commit()
return session_id
def add_message(self, session_id, role, content):
"""Add message to session"""
        import json
        from datetime import datetime
        with self.conn.cursor() as cur:
            cur.execute("""
                UPDATE chat_sessions
                SET messages = JSON_ARRAY_APPEND(messages, '$', CAST(%s AS JSON)),
                    updated_at = NOW()
                WHERE session_id = %s
            """, (json.dumps({"role": role, "content": content, "timestamp": str(datetime.now())}), session_id))
self.conn.commit()
def get_context(self, session_id, max_messages=20):
"""Get conversation history"""
import json
with self.conn.cursor() as cur:
cur.execute("""
SELECT messages
FROM chat_sessions
WHERE session_id = %s
""", (session_id,))
result = cur.fetchone()
if not result:
return []
messages = json.loads(result[0])
return messages[-max_messages:]
# Create table
"""
CREATE TABLE chat_sessions (
session_id VARCHAR(36) PRIMARY KEY,
user_id BIGINT,
messages JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_sessions (user_id, updated_at)
);
"""
```

## Semantic Caching

```python
import hashlib
import json
class SemanticCache:
def __init__(self, conn, ttl=3600):
self.conn = conn
self.ttl = ttl
def _make_key(self, prompt):
"""Create cache key from prompt"""
return hashlib.sha256(prompt.encode()).hexdigest()
def get(self, prompt):
"""Get cached response"""
key = self._make_key(prompt)
with self.conn.cursor() as cur:
cur.execute("""
SELECT response, created_at
FROM prompt_cache
WHERE prompt_hash = %s
AND created_at > NOW() - INTERVAL %s SECOND
""", (key, self.ttl))
result = cur.fetchone()
if result:
return result[0]
return None
def set(self, prompt, response):
"""Cache response"""
key = self._make_key(prompt)
with self.conn.cursor() as cur:
cur.execute("""
INSERT INTO prompt_cache (prompt_hash, prompt, response)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE response = %s, created_at = NOW()
""", (key, prompt, response, response))
self.conn.commit()
# Create table
"""
CREATE TABLE prompt_cache (
prompt_hash VARCHAR(64) PRIMARY KEY,
prompt TEXT NOT NULL,
response TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_cache_expiry (created_at)
);
"""
```

## Production Monitoring and Observability

### Comprehensive AI Workload Monitoring

```sql
-- AI workload performance dashboard
CREATE VIEW ai_workload_monitoring AS
SELECT
DATE(created_at) as date,
HOUR(created_at) as hour,
COUNT(*) as total_requests,
COUNT(CASE WHEN JSON_EXTRACT(metadata, '$.model_type') = 'llm' THEN 1 END) as llm_requests,
COUNT(CASE WHEN JSON_EXTRACT(metadata, '$.model_type') = 'embedding' THEN 1 END) as embedding_requests,
AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency_ms,
MAX(JSON_EXTRACT(metrics, '$.latency_ms')) as max_latency_ms,
SUM(JSON_EXTRACT(metrics, '$.tokens_used')) as total_tokens,
SUM(JSON_EXTRACT(metrics, '$.cost_usd')) as total_cost_usd
FROM ai_requests
WHERE created_at > NOW() - INTERVAL 7 DAY
GROUP BY DATE(created_at), HOUR(created_at)
ORDER BY date DESC, hour DESC;
-- Embedding storage analytics
SELECT
embedding_type,
COUNT(*) as document_count,
AVG(JSON_LENGTH(embedding_vector)) as avg_dimensions,
MIN(JSON_LENGTH(embedding_vector)) as min_dimensions,
MAX(JSON_LENGTH(embedding_vector)) as max_dimensions,
SUM(LENGTH(embedding_vector)) as total_storage_bytes,
AVG(LENGTH(embedding_vector)) as avg_storage_bytes
FROM document_embeddings
GROUP BY embedding_type
ORDER BY document_count DESC;
-- Model performance tracking
SELECT
model_name,
model_type,
COUNT(*) as prediction_count,
AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency,
AVG(JSON_EXTRACT(metrics, '$.accuracy')) as avg_accuracy,
AVG(JSON_EXTRACT(metrics, '$.confidence')) as avg_confidence,
MIN(created_at) as first_used,
MAX(created_at) as last_used
FROM model_predictions
WHERE created_at > NOW() - INTERVAL 30 DAY
GROUP BY model_name, model_type
HAVING prediction_count > 100
ORDER BY prediction_count DESC;
-- Cost analysis by AI component
SELECT
component_type,
SUM(cost_usd) as total_cost,
AVG(cost_usd) as avg_cost_per_request,
COUNT(*) as request_count,
MIN(cost_usd) as min_cost,
MAX(cost_usd) as max_cost
FROM ai_costs
WHERE cost_date > DATE_SUB(CURDATE(), INTERVAL 30 DAY)
GROUP BY component_type
ORDER BY total_cost DESC;
-- Error rate monitoring
SELECT
DATE(error_time) as error_date,
error_type,
COUNT(*) as error_count,
GROUP_CONCAT(DISTINCT user_id) as affected_users,
AVG(response_time_ms) as avg_response_time
FROM ai_errors
WHERE error_time > NOW() - INTERVAL 7 DAY
GROUP BY DATE(error_time), error_type
ORDER BY error_date DESC, error_count DESC;
```

### Real-time Alerting System

```sql
-- Create alert triggers for AI workloads
DELIMITER //
CREATE TRIGGER ai_high_latency_alert
AFTER INSERT ON ai_requests
FOR EACH ROW
BEGIN
DECLARE avg_latency DECIMAL(10,2);
DECLARE threshold DECIMAL(10,2) DEFAULT 1000; -- 1 second
-- Get average latency for this model type
SELECT AVG(JSON_EXTRACT(metrics, '$.latency_ms'))
INTO avg_latency
FROM ai_requests
WHERE JSON_EXTRACT(metadata, '$.model_type') =
JSON_EXTRACT(NEW.metadata, '$.model_type')
AND created_at > NOW() - INTERVAL 1 HOUR;
-- Check if latency exceeds threshold
IF JSON_EXTRACT(NEW.metrics, '$.latency_ms') > threshold
AND JSON_EXTRACT(NEW.metrics, '$.latency_ms') > avg_latency * 2 THEN
INSERT INTO ai_alerts (
alert_type,
severity,
message,
metadata,
created_at
) VALUES (
'high_latency',
'warning',
CONCAT('High latency detected: ',
JSON_EXTRACT(NEW.metrics, '$.latency_ms'), 'ms'),
JSON_OBJECT(
'request_id', NEW.request_id,
'model_type', JSON_EXTRACT(NEW.metadata, '$.model_type'),
'latency_ms', JSON_EXTRACT(NEW.metrics, '$.latency_ms'),
'avg_latency_ms', avg_latency
),
NOW()
);
END IF;
END//
DELIMITER ;
-- Cost threshold alert
DELIMITER //
CREATE TRIGGER ai_cost_threshold_alert
AFTER INSERT ON ai_costs
FOR EACH ROW
BEGIN
DECLARE daily_total DECIMAL(10,2);
DECLARE threshold DECIMAL(10,2) DEFAULT 100.00; -- $100 daily threshold
-- Calculate daily total for this component
SELECT SUM(cost_usd)
INTO daily_total
FROM ai_costs
WHERE component_type = NEW.component_type
AND cost_date = NEW.cost_date;
-- Check if exceeds threshold
IF daily_total > threshold THEN
INSERT INTO ai_alerts (
alert_type,
severity,
message,
metadata,
created_at
) VALUES (
'cost_threshold',
'critical',
CONCAT('Cost threshold exceeded: $', daily_total),
JSON_OBJECT(
'component_type', NEW.component_type,
'daily_total', daily_total,
'threshold', threshold,
'date', NEW.cost_date
),
NOW()
);
END IF;
END//
DELIMITER ;
```

### Python Monitoring Dashboard

```python
import pymysql
import json
from datetime import datetime, timedelta
from typing import Dict, List
import pandas as pd
class AIMonitoringDashboard:
def __init__(self, db_config: Dict):
self.conn = pymysql.connect(**db_config)
def get_performance_metrics(self, hours: int = 24) -> pd.DataFrame:
"""Get performance metrics for the last N hours"""
query = """
SELECT
            DATE_FORMAT(created_at, '%%Y-%%m-%%d %%H:00') as hour,
COUNT(*) as request_count,
AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency,
AVG(JSON_EXTRACT(metrics, '$.tokens_used')) as avg_tokens,
SUM(JSON_EXTRACT(metrics, '$.cost_usd')) as total_cost,
COUNT(DISTINCT user_id) as unique_users
FROM ai_requests
WHERE created_at > NOW() - INTERVAL %s HOUR
        GROUP BY DATE_FORMAT(created_at, '%%Y-%%m-%%d %%H:00')
ORDER BY hour
"""
with self.conn.cursor() as cur:
cur.execute(query, (hours,))
columns = [desc[0] for desc in cur.description]
data = cur.fetchall()
return pd.DataFrame(data, columns=columns)
def get_error_analysis(self, days: int = 7) -> Dict:
"""Analyze error patterns"""
query = """
SELECT
error_type,
COUNT(*) as error_count,
AVG(response_time_ms) as avg_response_time,
MIN(error_time) as first_error,
MAX(error_time) as last_error,
GROUP_CONCAT(DISTINCT user_id) as affected_users
FROM ai_errors
WHERE error_time > NOW() - INTERVAL %s DAY
GROUP BY error_type
ORDER BY error_count DESC
"""
with self.conn.cursor() as cur:
cur.execute(query, (days,))
results = cur.fetchall()
return {
"total_errors": sum(row[1] for row in results),
"error_types": [
{
"type": row[0],
"count": row[1],
"avg_response_time": row[2],
"first_occurrence": row[3],
"last_occurrence": row[4],
"affected_users": row[5].split(',') if row[5] else []
}
for row in results
]
}
def get_cost_breakdown(self, start_date: str, end_date: str) -> pd.DataFrame:
"""Get cost breakdown by component and date"""
query = """
SELECT
cost_date,
component_type,
SUM(cost_usd) as daily_cost,
COUNT(*) as request_count,
AVG(cost_usd) as avg_cost_per_request
FROM ai_costs
WHERE cost_date BETWEEN %s AND %s
GROUP BY cost_date, component_type
ORDER BY cost_date DESC, daily_cost DESC
"""
with self.conn.cursor() as cur:
cur.execute(query, (start_date, end_date))
columns = [desc[0] for desc in cur.description]
data = cur.fetchall()
df = pd.DataFrame(data, columns=columns)
df['cost_date'] = pd.to_datetime(df['cost_date'])
return df
def generate_health_report(self) -> Dict:
"""Generate comprehensive health report"""
report = {
"timestamp": datetime.now().isoformat(),
"performance": {},
"costs": {},
"errors": {},
"recommendations": []
}
# Performance metrics
perf_df = self.get_performance_metrics(24)
if not perf_df.empty:
report["performance"] = {
"total_requests": int(perf_df['request_count'].sum()),
"avg_latency": float(perf_df['avg_latency'].mean()),
"total_cost": float(perf_df['total_cost'].sum()),
"peak_hour": perf_df.loc[perf_df['request_count'].idxmax()].to_dict()
}
# Error analysis
error_analysis = self.get_error_analysis(1)
report["errors"] = error_analysis
# Cost analysis
today = datetime.now().date()
yesterday = today - timedelta(days=1)
cost_df = self.get_cost_breakdown(
yesterday.isoformat(),
today.isoformat()
)
if not cost_df.empty:
report["costs"] = {
"yesterday_total": float(cost_df['daily_cost'].sum()),
"by_component": cost_df.groupby('component_type')['daily_cost']
.sum().to_dict(),
"trend": "increasing" if len(cost_df) > 1 and
cost_df['daily_cost'].iloc[0] > cost_df['daily_cost'].iloc[1]
else "stable"
}
# Generate recommendations
if report["performance"].get("avg_latency", 0) > 500:
report["recommendations"].append(
"High latency detected. Consider optimizing model calls or implementing caching."
)
if report["errors"].get("total_errors", 0) > 10:
report["recommendations"].append(
"High error rate. Review error patterns and implement retry logic."
)
if report["costs"].get("yesterday_total", 0) > 50:
report["recommendations"].append(
"High daily cost. Consider implementing cost controls or optimizing model usage."
)
return report
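```

A usage sketch that builds the dashboard and prints the health report; the connection dictionary mirrors the `db_config` shape used earlier.

```python
import json

dashboard = AIMonitoringDashboard({
    "host": "localhost",
    "user": "ai_user",
    "password": "secure_password",
    "database": "ai_analytics",
})

report = dashboard.generate_health_report()
print(json.dumps(report, indent=2, default=str))
for recommendation in report["recommendations"]:
    print("-", recommendation)
```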
---
## Production Best Practices
### Performance Optimization Strategies
```sql
-- 1. Optimized embedding storage
-- Use appropriate data types based on embedding size
CREATE TABLE optimized_embeddings (
id INT AUTO_INCREMENT PRIMARY KEY,
document_id INT NOT NULL,
embedding_type VARCHAR(50),
-- Store as JSON for flexibility, BLOB for large embeddings
embedding_vector JSON, -- For embeddings up to 4096 dimensions
-- embedding_blob BLOB, -- For larger embeddings
embedding_dim INT GENERATED ALWAYS AS (JSON_LENGTH(embedding_vector)) STORED,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_document_embedding (document_id, embedding_type),
INDEX idx_embedding_dim (embedding_dim),
INDEX idx_created (created_at)
) ENGINE=InnoDB
  ROW_FORMAT=DYNAMIC;
-- 2. Generated columns for frequent JSON queries
ALTER TABLE ai_metadata
ADD COLUMN model_family VARCHAR(50)
GENERATED ALWAYS AS (
CASE
WHEN model_name LIKE 'gpt-%' THEN 'openai'
WHEN model_name LIKE 'claude-%' THEN 'anthropic'
WHEN model_name LIKE 'command-%' THEN 'cohere'
ELSE 'custom'
END
) STORED,
ADD COLUMN max_context_tokens INT
GENERATED ALWAYS AS (JSON_EXTRACT(parameters, '$.max_tokens')) STORED,
ADD COLUMN estimated_cost_per_1k DECIMAL(10,4)
GENERATED ALWAYS AS (JSON_EXTRACT(performance_metrics, '$.cost_per_1k_tokens')) STORED;
CREATE INDEX idx_model_family ON ai_metadata(model_family);
CREATE INDEX idx_max_context ON ai_metadata(max_context_tokens);
CREATE INDEX idx_estimated_cost ON ai_metadata(estimated_cost_per_1k);
-- 3. Partitioning for time-series AI data
ALTER TABLE ai_requests
PARTITION BY RANGE (TO_DAYS(created_at)) (
PARTITION p2026_q1 VALUES LESS THAN (TO_DAYS('2026-04-01')),
PARTITION p2026_q2 VALUES LESS THAN (TO_DAYS('2026-07-01')),
PARTITION p2026_q3 VALUES LESS THAN (TO_DAYS('2026-10-01')),
PARTITION p2026_q4 VALUES LESS THAN (TO_DAYS('2027-01-01')),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
-- 4. Materialized views for frequent aggregations
CREATE TABLE ai_daily_stats (
stat_date DATE PRIMARY KEY,
total_requests INT DEFAULT 0,
total_tokens BIGINT DEFAULT 0,
total_cost DECIMAL(10,2) DEFAULT 0,
avg_latency_ms DECIMAL(10,2) DEFAULT 0,
error_rate DECIMAL(5,4) DEFAULT 0,
unique_users INT DEFAULT 0,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Refresh procedure
DELIMITER //
CREATE PROCEDURE refresh_ai_daily_stats()
BEGIN
REPLACE INTO ai_daily_stats
SELECT
DATE(created_at) as stat_date,
COUNT(*) as total_requests,
SUM(JSON_EXTRACT(metrics, '$.tokens_used')) as total_tokens,
SUM(JSON_EXTRACT(metrics, '$.cost_usd')) as total_cost,
AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency_ms,
COUNT(CASE WHEN error_code IS NOT NULL THEN 1 END) / COUNT(*) as error_rate,
COUNT(DISTINCT user_id) as unique_users,
NOW() as updated_at
FROM ai_requests
WHERE created_at >= CURDATE() - INTERVAL 7 DAY
GROUP BY DATE(created_at);
END//
DELIMITER ;
-- Schedule refresh (run via cron or event scheduler)
CREATE EVENT IF NOT EXISTS refresh_ai_stats_hourly
ON SCHEDULE EVERY 1 HOUR
DO
CALL refresh_ai_daily_stats();
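-- Example read from the materialized stats (a sketch): last seven daily rows
SELECT *
FROM ai_daily_stats
ORDER BY stat_date DESC
LIMIT 7;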
```

### Security and Compliance

```sql
-- 1. Role-based access control for AI workloads
CREATE ROLE ai_developer;
CREATE ROLE ai_operator;
CREATE ROLE ai_auditor;
-- Developer permissions (read/write models, test data)
GRANT SELECT, INSERT, UPDATE, DELETE ON ai_metadata TO ai_developer;
GRANT SELECT, INSERT ON model_predictions TO ai_developer;
GRANT EXECUTE ON PROCEDURE train_model TO ai_developer;
-- Operator permissions (production operations)
GRANT SELECT ON ai_metadata TO ai_operator;
GRANT SELECT, INSERT ON model_predictions TO ai_operator;
GRANT EXECUTE ON PROCEDURE make_prediction TO ai_operator;
GRANT SELECT ON ai_daily_stats TO ai_operator;
-- Auditor permissions (read-only monitoring)
GRANT SELECT ON ai_metadata TO ai_auditor;
GRANT SELECT ON model_predictions TO ai_auditor;
GRANT SELECT ON ai_daily_stats TO ai_auditor;
GRANT SELECT ON ai_alerts TO ai_auditor;
GRANT SELECT ON audit_log TO ai_auditor;
-- Create users and assign roles
CREATE USER 'dev_ai'@'%' IDENTIFIED BY 'dev_password';
CREATE USER 'ops_ai'@'%' IDENTIFIED BY 'ops_password';
CREATE USER 'audit_ai'@'%' IDENTIFIED BY 'audit_password';
GRANT ai_developer TO 'dev_ai'@'%';
GRANT ai_operator TO 'ops_ai'@'%';
GRANT ai_auditor TO 'audit_ai'@'%';
-- 2. Data encryption for sensitive AI data
-- MySQL encrypts data at rest per table/tablespace (InnoDB transparent encryption);
-- column-level encryption must be handled in the application or with AES_ENCRYPT().
ALTER TABLE ai_metadata ENCRYPTION='Y';
ALTER TABLE model_predictions ENCRYPTION='Y';
-- 3. Comprehensive audit logging
CREATE TABLE ai_audit_log (
audit_id BIGINT AUTO_INCREMENT PRIMARY KEY,
event_time TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
user_name VARCHAR(100),
event_type VARCHAR(50),
table_name VARCHAR(100),
record_id VARCHAR(100),
old_values JSON,
new_values JSON,
ip_address VARCHAR(45),
user_agent TEXT,
INDEX idx_audit_time (event_time),
INDEX idx_audit_user (user_name, event_time),
INDEX idx_audit_event (event_type, event_time)
) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;
-- Audit trigger for sensitive tables
DELIMITER //
CREATE TRIGGER audit_ai_metadata_changes
AFTER UPDATE ON ai_metadata
FOR EACH ROW
BEGIN
INSERT INTO ai_audit_log (
user_name,
event_type,
table_name,
record_id,
old_values,
new_values
) VALUES (
USER(),
'UPDATE',
'ai_metadata',
NEW.id,
JSON_OBJECT(
'model_name', OLD.model_name,
'parameters', OLD.parameters,
'performance_metrics', OLD.performance_metrics
),
JSON_OBJECT(
'model_name', NEW.model_name,
'parameters', NEW.parameters,
'performance_metrics', NEW.performance_metrics
)
);
END//
DELIMITER ;
-- 4. Data retention and compliance
CREATE TABLE ai_data_retention_policy (
table_name VARCHAR(100) PRIMARY KEY,
retention_days INT NOT NULL,
archive_strategy ENUM('delete', 'archive', 'anonymize'),
last_cleanup_date DATE,
cleanup_status ENUM('pending', 'completed', 'failed'),
cleanup_details JSON
);
INSERT INTO ai_data_retention_policy VALUES
('ai_requests', 90, 'archive', NULL, 'pending', '{}'),
('model_predictions', 365, 'archive', NULL, 'pending', '{}'),
('ai_errors', 30, 'delete', NULL, 'pending', '{}'),
('prompt_cache', 7, 'delete', NULL, 'pending', '{}');
-- Cleanup procedure
DELIMITER //
CREATE PROCEDURE cleanup_ai_data()
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE v_table_name VARCHAR(100);
DECLARE v_retention_days INT;
DECLARE v_archive_strategy VARCHAR(20);
DECLARE cur CURSOR FOR
SELECT table_name, retention_days, archive_strategy
FROM ai_data_retention_policy
WHERE cleanup_status != 'completed';
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN cur;
read_loop: LOOP
FETCH cur INTO v_table_name, v_retention_days, v_archive_strategy;
IF done THEN
LEAVE read_loop;
END IF;
START TRANSACTION;
CASE v_archive_strategy
WHEN 'delete' THEN
SET @sql = CONCAT(
'DELETE FROM ', v_table_name,
' WHERE created_at < DATE_SUB(NOW(), INTERVAL ',
v_retention_days, ' DAY)'
);
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
WHEN 'archive' THEN
-- Archive to separate table
SET @archive_table = CONCAT(v_table_name, '_archive');
SET @sql = CONCAT(
'INSERT INTO ', @archive_table,
' SELECT * FROM ', v_table_name,
' WHERE created_at < DATE_SUB(NOW(), INTERVAL ',
v_retention_days, ' DAY)'
);
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- Delete from main table
SET @sql = CONCAT(
'DELETE FROM ', v_table_name,
' WHERE created_at < DATE_SUB(NOW(), INTERVAL ',
v_retention_days, ' DAY)'
);
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
WHEN 'anonymize' THEN
-- Anonymize sensitive data
SET @sql = CONCAT(
'UPDATE ', v_table_name,
' SET user_id = NULL, ip_address = NULL',
' WHERE created_at < DATE_SUB(NOW(), INTERVAL ',
v_retention_days, ' DAY)'
);
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
END CASE;
-- Update policy status
UPDATE ai_data_retention_policy
SET last_cleanup_date = CURDATE(),
cleanup_status = 'completed',
cleanup_details = JSON_OBJECT(
'rows_affected', ROW_COUNT(),
'execution_time', NOW()
)
WHERE table_name = v_table_name;
COMMIT;
END LOOP;
CLOSE cur;
END//
DELIMITER ;
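-- Schedule the cleanup procedure (a sketch; assumes the event scheduler is enabled,
-- mirroring the hourly stats-refresh event shown earlier)
CREATE EVENT IF NOT EXISTS run_ai_data_cleanup_daily
ON SCHEDULE EVERY 1 DAY
DO
    CALL cleanup_ai_data();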
```

### Cost Optimization Strategies

```sql
-- 1. Cost tracking and optimization
CREATE TABLE ai_cost_optimization (
optimization_id INT AUTO_INCREMENT PRIMARY KEY,
strategy_name VARCHAR(100),
target_component VARCHAR(50),
estimated_savings DECIMAL(10,2),
implementation_status ENUM('planned', 'in_progress', 'completed', 'failed'),
implemented_date DATE,
actual_savings DECIMAL(10,2),
details JSON
);
-- Common optimization strategies
INSERT INTO ai_cost_optimization VALUES
(NULL, 'Prompt caching', 'llm', 500.00, 'completed', '2026-01-15', 450.00,
'{"cache_hit_rate": 0.35, "avg_tokens_saved": 1200}'),
(NULL, 'Embedding batch processing', 'embedding', 300.00, 'in_progress', NULL, NULL,
'{"batch_size": 100, "estimated_reduction": 0.4}'),
(NULL, 'Model selection optimization', 'llm', 800.00, 'planned', NULL, NULL,
'{"target_models": ["gpt-3.5-turbo", "claude-instant"], "accuracy_threshold": 0.85}');
-- 2. Cost-aware query routing
CREATE TABLE model_cost_profile (
model_name VARCHAR(100) PRIMARY KEY,
provider VARCHAR(50),
cost_per_1k_input DECIMAL(10,4),
cost_per_1k_output DECIMAL(10,4),
avg_latency_ms INT,
accuracy_score DECIMAL(5,4),
is_available BOOLEAN DEFAULT TRUE,
rate_limit_per_minute INT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- Cost-aware routing function
DELIMITER //
CREATE FUNCTION get_cost_optimal_model(
required_accuracy DECIMAL(5,4),
max_latency_ms INT,
estimated_tokens INT
) RETURNS VARCHAR(100)
DETERMINISTIC
READS SQL DATA
BEGIN
DECLARE optimal_model VARCHAR(100);
SELECT model_name INTO optimal_model
FROM model_cost_profile
WHERE accuracy_score >= required_accuracy
AND avg_latency_ms <= max_latency_ms
AND is_available = TRUE
ORDER BY (cost_per_1k_input + cost_per_1k_output) * (estimated_tokens / 1000)
LIMIT 1;
RETURN optimal_model;
END//
DELIMITER ;
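-- Example call (hypothetical thresholds): cheapest available model with at least
-- 0.85 accuracy and 500 ms average latency for an estimated 2,000-token request
SELECT get_cost_optimal_model(0.8500, 500, 2000) AS chosen_model;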
-- 3. Usage quotas and limits
CREATE TABLE ai_usage_quotas (
user_id INT PRIMARY KEY,
monthly_token_limit BIGINT DEFAULT 1000000,
monthly_cost_limit DECIMAL(10,2) DEFAULT 100.00,
concurrent_request_limit INT DEFAULT 10,
tokens_used_this_month BIGINT DEFAULT 0,
cost_used_this_month DECIMAL(10,2) DEFAULT 0,
quota_reset_date DATE,
is_active BOOLEAN DEFAULT TRUE,
INDEX idx_quota_reset (quota_reset_date)
);
-- Quota enforcement trigger
DELIMITER //
CREATE TRIGGER enforce_ai_usage_quotas
BEFORE INSERT ON ai_requests
FOR EACH ROW
BEGIN
DECLARE v_tokens_used BIGINT;
DECLARE v_cost_used DECIMAL(10,2);
DECLARE v_token_limit BIGINT;
DECLARE v_cost_limit DECIMAL(10,2);
DECLARE v_is_active BOOLEAN;
-- Get user quotas
SELECT
tokens_used_this_month,
cost_used_this_month,
monthly_token_limit,
monthly_cost_limit,
is_active
INTO
v_tokens_used,
v_cost_used,
v_token_limit,
v_cost_limit,
v_is_active
FROM ai_usage_quotas
WHERE user_id = NEW.user_id;
-- Check if user is active
IF NOT v_is_active THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'User AI access is disabled';
END IF;
-- Check token quota
IF v_tokens_used + NEW.estimated_tokens > v_token_limit THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Monthly token quota exceeded';
END IF;
-- Check cost quota
IF v_cost_used + NEW.estimated_cost > v_cost_limit THEN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Monthly cost quota exceeded';
END IF;
-- Update usage (will be committed with transaction)
UPDATE ai_usage_quotas
SET
tokens_used_this_month = tokens_used_this_month + NEW.estimated_tokens,
cost_used_this_month = cost_used_this_month + NEW.estimated_cost
WHERE user_id = NEW.user_id;
END//
DELIMITER ;
-- Monthly quota reset procedure
DELIMITER //
CREATE PROCEDURE reset_monthly_quotas()
BEGIN
UPDATE ai_usage_quotas
SET
tokens_used_this_month = 0,
cost_used_this_month = 0,
quota_reset_date = LAST_DAY(CURDATE())
WHERE quota_reset_date < CURDATE()
OR quota_reset_date IS NULL;
END//
DELIMITER ;
```

---
## Resources and Further Learning
### Official Documentation
- [MySQL JSON Functions](https://dev.mysql.com/doc/refman/8.0/en/json-functions.html) - Comprehensive guide to JSON operations
- [MySQL HeatWave ML](https://dev.mysql.com/doc/heatwave/en/heatwave-machine-learning.html) - In-database machine learning
- [MySQL Vector Search](https://dev.mysql.com/doc/heatwave/en/heatwave-vector-search.html) - Vector similarity search capabilities
- [MySQL Security Guide](https://dev.mysql.com/doc/refman/8.0/en/security.html) - Security best practices
### AI Integration Libraries
- [LangChain MySQL](https://python.langchain.com/docs/integrations/providers/mysql) - Official MySQL integration
- [SQLAlchemy](https://www.sqlalchemy.org/) - Python SQL toolkit and ORM
- [PyMySQL](https://pymysql.readthedocs.io/) - Pure Python MySQL client
- [MySQL Connector/Python](https://dev.mysql.com/doc/connector-python/en/) - Official MySQL Python connector
### Monitoring and Observability
- [MySQL Performance Schema](https://dev.mysql.com/doc/refman/8.0/en/performance-schema.html) - Built-in performance monitoring
- [MySQL Enterprise Monitor](https://www.mysql.com/products/enterprise/monitor.html) - Advanced monitoring (commercial)
- [Prometheus MySQL Exporter](https://github.com/prometheus/mysqld_exporter) - Metrics collection for Prometheus
- [Grafana MySQL Dashboards](https://grafana.com/grafana/dashboards/?search=mysql) - Visualization templates
### Cost Management Tools
- [AWS Cost Explorer](https://aws.amazon.com/aws-cost-management/aws-cost-explorer/) - Cloud cost analysis
- [Google Cloud Billing](https://cloud.google.com/billing/docs) - Cost management for GCP
- [Azure Cost Management](https://azure.microsoft.com/en-us/products/cost-management) - Azure cost optimization
- [OpenCost](https://www.opencost.io/) - Open-source cost monitoring
### Community Resources
- [MySQL Forums](https://forums.mysql.com/) - Official community forums
- [Stack Overflow MySQL Tag](https://stackoverflow.com/questions/tagged/mysql) - Q&A platform
- [r/mysql](https://www.reddit.com/r/mysql/) - Reddit community
- [MySQL Planet](https://planet.mysql.com/) - Aggregated MySQL blogs
### Books and Courses
- **"High Performance MySQL"** by Baron Schwartz et al. - Performance optimization
- **"MySQL Cookbook"** by Paul DuBois - Practical solutions
- **"Designing Data-Intensive Applications"** by Martin Kleppmann - Database design principles
- [MySQL Tutorial on W3Schools](https://www.w3schools.com/mysql/) - Beginner-friendly tutorials
- [Coursera: Database Management Essentials](https://www.coursera.org/learn/database-management) - University-level course
### Open Source Projects
- [Milvus](https://github.com/milvus-io/milvus) - Open-source vector database (for comparison)
- [PostgreSQL with pgvector](https://github.com/pgvector/pgvector) - PostgreSQL vector extension
- [Chroma](https://github.com/chroma-core/chroma) - AI-native embedding database
- [Weaviate](https://github.com/weaviate/weaviate) - Vector search engine
---
## Conclusion
MySQL has evolved into a comprehensive platform for AI applications, offering capabilities that rival specialized vector databases while maintaining enterprise-grade reliability. The key strengths for AI workloads include:
### Core Advantages
1. **Native JSON Support** - Flexible schema design for AI metadata and embeddings
2. **Vector Search Capabilities** - Built-in similarity search with HeatWave
3. **In-Database ML** - Direct model training and inference with HeatWave ML
4. **Enterprise Reliability** - ACID compliance, replication, and backup
5. **Cost Efficiency** - Lower total cost compared to specialized vector databases
### Production Readiness
MySQL is production-ready for AI workloads when:
- Embedding dimensions are moderate (up to 4096 dimensions)
- Query patterns are well-defined and indexed
- Cost control and compliance are critical requirements
- Existing MySQL infrastructure can be leveraged
### When to Consider Alternatives
Consider specialized vector databases when:
- Working with ultra-high-dimensional embeddings (10K+ dimensions)
- Requiring advanced ANN algorithms with high recall
- Needing specialized vector indexing not available in MySQL
- Operating at extreme scale with billions of vectors
### Future Outlook
MySQL continues to enhance its AI capabilities with:
- Improved vector search performance
- More ML algorithms in HeatWave
- Better integration with AI frameworks
- Enhanced monitoring for AI workloads
### Implementation Checklist
Before deploying MySQL for AI:
- [ ] Design efficient embedding storage schema
- [ ] Implement proper indexing strategies
- [ ] Set up comprehensive monitoring
- [ ] Configure security and access controls
- [ ] Plan for cost management and optimization
- [ ] Establish data retention policies
- [ ] Test at production scale with realistic workloads
MySQL provides a compelling option for organizations seeking to integrate AI capabilities into existing database infrastructure. By leveraging MySQL's strengths in reliability, security, and cost-efficiency, teams can build robust AI applications without the complexity of managing multiple specialized databases.
For organizations already invested in MySQL, the path to AI integration is straightforward and offers significant advantages in terms of operational simplicity, cost control, and enterprise readiness.