
MySQL for AI Applications: Vector Storage, JSON, and ML Integration

Created: March 5, 2026 · CalmOps · 26 min read

Introduction

MySQL has transformed from a traditional relational database into a powerful platform for AI applications. With native JSON support, vector similarity search capabilities, and the integrated HeatWave ML engine, MySQL now competes with specialized vector databases while offering enterprise-grade reliability. This guide covers practical implementations for AI workloads in MySQL.


Vector Embedding Storage

Efficient Embedding Storage Patterns

-- Optimized embedding table with vector dimensions
CREATE TABLE document_embeddings (
    id INT AUTO_INCREMENT PRIMARY KEY,
    document_id INT NOT NULL,
    embedding_type VARCHAR(50) DEFAULT 'text-embedding-ada-002',
    embedding_vector JSON NOT NULL,
    metadata JSON,
    embedding_dim INT GENERATED ALWAYS AS (JSON_LENGTH(embedding_vector)) STORED,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_document (document_id),
    INDEX idx_embedding_dim (embedding_dim),
    INDEX idx_embedding_type (embedding_type)
) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;

-- Store embeddings with metadata
INSERT INTO document_embeddings (document_id, embedding_type, embedding_vector)
VALUES (
    1,
    'text-embedding-ada-002',
    JSON_ARRAY(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)
);

-- Query by vector similarity (dot product as a cosine stand-in;
-- '?' is a client-side placeholder bound to the query vector JSON)
SELECT 
    de.document_id,
    JSON_EXTRACT(de.embedding_vector, '$[0]') as first_dim,
    JSON_EXTRACT(de.embedding_vector, '$[1]') as second_dim,
    -- Dot product over the leading dimensions; extend the derived
    -- table below to cover the full embedding dimensionality
    (
        SELECT SUM(JSON_EXTRACT(de.embedding_vector, CONCAT('$[', n.n, ']')) * 
                   JSON_EXTRACT(?, CONCAT('$[', n.n, ']')))
        FROM (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) n
        WHERE n.n < de.embedding_dim
    ) as similarity_score
FROM document_embeddings de
WHERE de.embedding_type = 'text-embedding-ada-002'
ORDER BY similarity_score DESC
LIMIT 10;
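For modest corpora it is often simpler to fetch candidate vectors and rank them client-side, where true cosine similarity is a one-liner. A sketch of that approach (the `top_k` helper and the sample rows are hypothetical; `rows` mimics a `fetchall()` over `document_embeddings`):

```python
import json
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two 1-D vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def top_k(rows, query_vector, k=10):
    """Rank (document_id, embedding_json) rows by cosine similarity."""
    q = np.asarray(query_vector, dtype=float)
    scored = [
        (doc_id, cosine_similarity(np.asarray(json.loads(emb_json)), q))
        for doc_id, emb_json in rows
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

rows = [(1, json.dumps([1.0, 0.0])),
        (2, json.dumps([0.0, 1.0])),
        (3, json.dumps([1.0, 1.0]))]
print(top_k(rows, [1.0, 0.0], k=2))  # doc 1 ranks first
```

This trades SQL complexity for network transfer; it works well until the candidate set outgrows memory, at which point an indexed vector search (covered below) becomes necessary.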

Python: Production-Grade Embedding Storage

import pymysql
import json
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class DocumentEmbedding:
    document_id: int
    embedding: List[float]
    embedding_type: str = "text-embedding-ada-002"
    metadata: Optional[Dict] = None

class MySQLVectorStore:
    def __init__(self, host: str, user: str, password: str, database: str):
        self.conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
    
    def store_embedding(self, embedding: DocumentEmbedding) -> int:
        """Store a single embedding with transaction safety"""
        with self.conn.cursor() as cur:
            sql = """
                INSERT INTO document_embeddings 
                (document_id, embedding_type, embedding_vector, metadata)
                VALUES (%s, %s, %s, %s)
            """
            cur.execute(sql, (
                embedding.document_id,
                embedding.embedding_type,
                json.dumps(embedding.embedding),
                json.dumps(embedding.metadata or {})
            ))
            self.conn.commit()
            return cur.lastrowid
    
    def batch_store(self, embeddings: List[DocumentEmbedding]) -> List[int]:
        """Batch store embeddings for performance"""
        ids = []
        with self.conn.cursor() as cur:
            for emb in embeddings:
                cur.execute("""
                    INSERT INTO document_embeddings 
                    (document_id, embedding_type, embedding_vector, metadata)
                    VALUES (%s, %s, %s, %s)
                """, (
                    emb.document_id,
                    emb.embedding_type,
                    json.dumps(emb.embedding),
                    json.dumps(emb.metadata or {})
                ))
                ids.append(cur.lastrowid)
            self.conn.commit()
        return ids
    
    def find_similar(self, query_vector: List[float], 
                    embedding_type: str = "text-embedding-ada-002",
                    limit: int = 10) -> List[Dict]:
        """Find similar embeddings using vector similarity"""
        with self.conn.cursor() as cur:
            # Dot-product similarity over the leading dimensions
            # (extend the derived table to cover the full vector)
            query_json = json.dumps(query_vector)
            cur.execute("""
                SELECT 
                    de.document_id,
                    de.embedding_vector,
                    de.metadata,
                    -- Calculate dot product similarity
                    (
                        SELECT SUM(
                            JSON_EXTRACT(de.embedding_vector, CONCAT('$[', n.n, ']')) * 
                            JSON_EXTRACT(%s, CONCAT('$[', n.n, ']'))
                        )
                        FROM (SELECT 0 as n UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) n
                        WHERE n.n < JSON_LENGTH(de.embedding_vector)
                    ) as similarity
                FROM document_embeddings de
                WHERE de.embedding_type = %s
                ORDER BY similarity DESC
                LIMIT %s
            """, (query_json, embedding_type, limit))
            return cur.fetchall()
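A useful trick with the dot-product queries above: if vectors are unit-normalized before storage, the dot product equals cosine similarity exactly, so the cheaper SQL expression gives correct rankings. A minimal sketch of the invariant, in pure Python:

```python
import math

def normalize(vec):
    """Scale a vector to unit length so dot product == cosine similarity."""
    norm = math.sqrt(sum(x * x for x in vec))
    if norm == 0:
        raise ValueError("cannot normalize the zero vector")
    return [x / norm for x in vec]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

a = normalize([3.0, 4.0])   # -> [0.6, 0.8]
b = normalize([4.0, 3.0])
# For unit vectors, the dot product is the cosine of the angle between them
print(dot(a, b))  # 0.96
```

Applying `normalize` in `store_embedding` (before `json.dumps`) is a one-line change under this scheme.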

JSON Document Store for AI Metadata

Schema Design for AI Applications

-- AI metadata storage with JSON
CREATE TABLE ai_metadata (
    id INT AUTO_INCREMENT PRIMARY KEY,
    model_name VARCHAR(100) NOT NULL,
    model_type ENUM('llm', 'embedding', 'classification', 'regression', 'generation'),
    provider VARCHAR(50),  -- 'openai', 'anthropic', 'cohere', 'custom'
    version VARCHAR(20),
    parameters JSON NOT NULL,
    training_data JSON,
    performance_metrics JSON,
    deployment_config JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_model_type (model_type),
    INDEX idx_provider (provider),
    INDEX idx_created (created_at)
);

-- Store model configuration
INSERT INTO ai_metadata (
    model_name, model_type, provider, version, parameters, performance_metrics
) VALUES (
    'gpt-4-turbo',
    'llm',
    'openai',
    '2026-01-01',
    '{
        "max_tokens": 4096,
        "temperature": 0.7,
        "top_p": 0.9,
        "frequency_penalty": 0,
        "presence_penalty": 0
    }',
    '{
        "accuracy": 0.92,
        "latency_ms": 350,
        "cost_per_1k_tokens": 0.03,
        "throughput_tps": 45
    }'
);

-- Advanced JSON queries for AI metadata
SELECT 
    model_name,
    provider,
    JSON_EXTRACT(parameters, '$.max_tokens') as max_tokens,
    JSON_EXTRACT(performance_metrics, '$.latency_ms') as latency,
    -- Filter by performance threshold
    CASE 
        WHEN JSON_EXTRACT(performance_metrics, '$.accuracy') > 0.9 THEN 'high'
        WHEN JSON_EXTRACT(performance_metrics, '$.accuracy') > 0.8 THEN 'medium'
        ELSE 'low'
    END as accuracy_tier
FROM ai_metadata
WHERE model_type = 'llm'
  AND JSON_EXTRACT(performance_metrics, '$.latency_ms') < 500
ORDER BY JSON_EXTRACT(performance_metrics, '$.accuracy') DESC;

-- JSON aggregation for analytics
SELECT 
    provider,
    COUNT(*) as model_count,
    AVG(JSON_EXTRACT(performance_metrics, '$.accuracy')) as avg_accuracy,
    AVG(JSON_EXTRACT(performance_metrics, '$.latency_ms')) as avg_latency,
    JSON_ARRAYAGG(
        JSON_OBJECT(
            'model', model_name,
            'version', version,
            'max_tokens', JSON_EXTRACT(parameters, '$.max_tokens')
        )
    ) as model_details
FROM ai_metadata
WHERE model_type = 'llm'
GROUP BY provider;
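The CASE tiering above is straightforward to mirror client-side once `performance_metrics` has been fetched as a JSON string; a small sketch of the same thresholds (function name is illustrative):

```python
import json

def accuracy_tier(performance_metrics_json: str) -> str:
    """Mirror the SQL CASE: accuracy > 0.9 is high, > 0.8 medium, else low."""
    accuracy = json.loads(performance_metrics_json).get("accuracy", 0.0)
    if accuracy > 0.9:
        return "high"
    if accuracy > 0.8:
        return "medium"
    return "low"

print(accuracy_tier('{"accuracy": 0.92}'))  # high
```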

Python: Dynamic Schema Management

class AIMetadataManager:
    def __init__(self, connection):
        self.conn = connection
    
    def store_model_metadata(self, model_data: Dict) -> int:
        """Store AI model metadata with validation"""
        required_fields = ['model_name', 'model_type', 'parameters']
        for field in required_fields:
            if field not in model_data:
                raise ValueError(f"Missing required field: {field}")
        
        with self.conn.cursor() as cur:
            sql = """
                INSERT INTO ai_metadata 
                (model_name, model_type, provider, version, parameters, 
                 training_data, performance_metrics, deployment_config)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cur.execute(sql, (
                model_data['model_name'],
                model_data.get('model_type', 'custom'),
                model_data.get('provider'),
                model_data.get('version', '1.0.0'),
                json.dumps(model_data.get('parameters', {})),
                json.dumps(model_data.get('training_data', {})),
                json.dumps(model_data.get('performance_metrics', {})),
                json.dumps(model_data.get('deployment_config', {}))
            ))
            self.conn.commit()
            return cur.lastrowid
    
    def find_models_by_criteria(self, criteria: Dict) -> List[Dict]:
        """Find models matching specific criteria"""
        conditions = []
        params = []
        
        for key, value in criteria.items():
            if key in ['model_type', 'provider']:
                conditions.append(f"{key} = %s")
                params.append(value)
            elif key.startswith('performance.'):
                metric = key.split('.', 1)[1]
                # Whitelist the metric name: it is interpolated into the
                # JSON path below, not bound as a parameter, so it must
                # never come from untrusted input
                if not metric.isidentifier():
                    raise ValueError(f"Invalid performance metric: {metric}")
                conditions.append(f"JSON_EXTRACT(performance_metrics, '$.{metric}') > %s")
                params.append(value)
        
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        
        with self.conn.cursor() as cur:
            cur.execute(f"""
                SELECT 
                    model_name,
                    model_type,
                    provider,
                    version,
                    parameters,
                    performance_metrics,
                    created_at
                FROM ai_metadata
                WHERE {where_clause}
                ORDER BY created_at DESC
            """, params)
            return cur.fetchall()
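The criteria-to-WHERE translation can be exercised without a database. This standalone sketch mirrors the builder logic, and rejects metric names that are not plain identifiers, since they end up interpolated into the JSON path rather than bound as parameters:

```python
def build_where(criteria):
    """Translate a criteria dict into (where_clause, params)."""
    conditions, params = [], []
    for key, value in criteria.items():
        if key in ("model_type", "provider"):
            conditions.append(f"{key} = %s")
            params.append(value)
        elif key.startswith("performance."):
            metric = key.split(".", 1)[1]
            if not metric.isidentifier():  # guard the interpolated JSON path
                raise ValueError(f"invalid metric name: {metric}")
            conditions.append(f"JSON_EXTRACT(performance_metrics, '$.{metric}') > %s")
            params.append(value)
    return (" AND ".join(conditions) if conditions else "1=1", params)

clause, params = build_where({"model_type": "llm", "performance.accuracy": 0.9})
print(clause)   # model_type = %s AND JSON_EXTRACT(performance_metrics, '$.accuracy') > %s
print(params)   # ['llm', 0.9]
```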

MySQL HeatWave ML Engine

Production ML with HeatWave

HeatWave is MySQL's integrated in-memory analytics and machine learning engine; its AutoML routines (sys.ML_TRAIN, sys.ML_MODEL_LOAD, sys.ML_PREDICT_ROW, and friends) run ML operations directly in the database. Routine signatures and option keys have changed across HeatWave releases, so treat the calls below as illustrative and verify them against your version's documentation.

-- Enable HeatWave AutoML for predictive analytics
-- Create training dataset
CREATE TABLE customer_churn_data (
    customer_id INT PRIMARY KEY,
    tenure_months INT,
    monthly_charges DECIMAL(10,2),
    total_charges DECIMAL(10,2),
    contract_type ENUM('Month-to-month', 'One year', 'Two year'),
    payment_method VARCHAR(50),
    paperless_billing BOOLEAN,
    churn_label BOOLEAN
);

-- Train classification model directly in MySQL
-- (arguments: table, target column, options JSON, then an OUT
--  variable that receives the model handle)
CALL sys.ML_TRAIN(
    'customer_churn_data',
    'churn_label',
    JSON_OBJECT(
        'task', 'classification',
        'algorithm', 'xgboost',
        'exclude_columns', JSON_ARRAY('customer_id'),
        'train_test_split', 0.8,
        'validation_split', 0.1
    ),
    @model_handle
);

-- Load trained model
CALL sys.ML_MODEL_LOAD(@model_handle, 'churn_prediction_model');

-- Make predictions
SELECT 
    customer_id,
    tenure_months,
    monthly_charges,
    sys.ML_PREDICT_ROW(
        'churn_prediction_model',
        JSON_OBJECT(
            'tenure_months', tenure_months,
            'monthly_charges', monthly_charges,
            'contract_type', contract_type
        )
    ) as churn_probability
FROM customer_churn_data
WHERE churn_label IS NULL  -- Predict for new customers
LIMIT 10;

-- Batch predictions
CREATE TABLE churn_predictions AS
SELECT 
    customer_id,
    sys.ML_PREDICT_ROW(
        'churn_prediction_model',
        JSON_OBJECT(
            'tenure_months', tenure_months,
            'monthly_charges', monthly_charges,
            'contract_type', contract_type,
            'payment_method', payment_method
        )
    ) as prediction,
    CURRENT_TIMESTAMP as predicted_at
FROM customer_churn_data
WHERE churn_label IS NULL;

-- Model evaluation
SELECT 
    sys.ML_MODEL_EVALUATE(
        'churn_prediction_model',
        'customer_churn_data',
        'churn_label'
    ) as model_metrics;

-- Time series forecasting
CALL sys.ML_TRAIN(
    'sales_data',
    'revenue',
    JSON_OBJECT(
        'task', 'time_series',
        'time_column', 'sale_date',
        'horizon', 30,
        'frequency', 'day'
    ),
    @ts_model_handle
);
-- Enable vector search in HeatWave (index DDL is illustrative;
-- supported vector index syntax depends on your MySQL/HeatWave version)
ALTER TABLE document_embeddings 
ADD VECTOR INDEX idx_embedding_vector (embedding_vector)
COMMENT '{"type": "hnsw", "distance": "cosine"}';

-- Similarity search with HeatWave
-- (cosine distance: smaller means more similar, so sort ascending)
SELECT 
    de1.document_id as query_id,
    de2.document_id as similar_id,
    VECTOR_DISTANCE(de1.embedding_vector, de2.embedding_vector, 'cosine') as distance
FROM document_embeddings de1
JOIN document_embeddings de2 ON de1.document_id != de2.document_id
WHERE de1.document_id = 123
ORDER BY distance ASC
LIMIT 10;

-- ANN (Approximate Nearest Neighbor) search
SELECT 
    document_id,
    VECTOR_DISTANCE(embedding_vector, 
        JSON_ARRAY(0.1, 0.2, 0.3, 0.4, 0.5), 
        'cosine') as distance
FROM document_embeddings
WHERE VECTOR_DISTANCE(embedding_vector, 
        JSON_ARRAY(0.1, 0.2, 0.3, 0.4, 0.5), 
        'cosine') < 0.3
ORDER BY distance
LIMIT 20;

Python Integration with HeatWave

import mysql.connector
import json
from typing import List, Dict

class HeatWaveML:
    def __init__(self, host: str, user: str, password: str, database: str):
        self.conn = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
    
    def train_model(self, table_name: str, target_column: str, 
                   model_name: str, config: Dict = None) -> str:
        """Train ML model using HeatWave"""
        config_json = json.dumps(config or {
            'task': 'classification',
            'algorithm': 'xgboost',
            'train_test_split': 0.8
        })
        
        with self.conn.cursor() as cur:
            # Train model (table, target column, options JSON, output handle)
            cur.execute("""
                CALL sys.ML_TRAIN(%s, %s, %s, @model_handle)
            """, (table_name, target_column, config_json))
            
            # Load model
            cur.execute("""
                CALL sys.ML_MODEL_LOAD(@model_handle, %s)
            """, (model_name,))
            
            self.conn.commit()
            return model_name
    
    def predict(self, model_name: str, features: Dict) -> Dict:
        """Make prediction using trained model"""
        with self.conn.cursor(dictionary=True) as cur:
            cur.execute("""
                SELECT sys.ML_PREDICT_ROW(%s, %s) as prediction
            """, (model_name, json.dumps(features)))
            result = cur.fetchone()
            return json.loads(result['prediction']) if result else None
    
    def batch_predict(self, model_name: str, table_name: str, 
                     output_table: str) -> int:
        """Batch predict and store results (feature columns are illustrative)"""
        # NOTE: table names cannot be bound as parameters; validate them
        # against a whitelist before interpolating into SQL
        with self.conn.cursor() as cur:
            cur.execute(f"""
                CREATE TABLE {output_table} AS
                SELECT 
                    *,
                    sys.ML_PREDICT_ROW(%s, 
                        JSON_OBJECT(
                            'feature1', feature1,
                            'feature2', feature2,
                            'feature3', feature3
                        )
                    ) as prediction
                FROM {table_name}
            """, (model_name,))
            self.conn.commit()
            return cur.rowcount

---

## LangChain and LLM Integration

### Production-Ready LangChain Patterns

from langchain_community.utilities import SQLDatabase
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_openai import ChatOpenAI
from langchain.agents import create_sql_agent
from langchain.agents.agent_types import AgentType
from typing import List, Dict, Optional
import json

class MySQLAIAssistant:
    def __init__(self, db_config: Dict, llm_model: str = "gpt-4-turbo"):
        """Initialize MySQL-powered AI assistant"""
        # Connect to MySQL database
        self.db = SQLDatabase.from_uri(
            f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
            f"@{db_config['host']}/{db_config['database']}"
        )

        # Initialize LLM
        self.llm = ChatOpenAI(
            model=llm_model,
            temperature=0.1,
            max_tokens=2000
        )

        # Create SQL agent toolkit
        self.toolkit = SQLDatabaseToolkit(db=self.db, llm=self.llm)

        # Create agent
        self.agent = create_sql_agent(
            llm=self.llm,
            toolkit=self.toolkit,
            verbose=True,
            agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            handle_parsing_errors=True
        )

    def query_with_natural_language(self, question: str) -> Dict:
        """Execute natural language queries against MySQL"""
        try:
            response = self.agent.invoke({"input": question})
            steps = response.get("intermediate_steps") or []
            # Each step is an (action, observation) pair; keep the last action
            last_action = steps[-1][0] if steps else None
            return {
                "success": True,
                "answer": response.get("output", ""),
                "sql_query": getattr(last_action, "tool_input", ""),
                "raw_response": response
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "answer": f"Error executing query: {e}"
            }

    def generate_sql_from_natural_language(self, description: str) -> str:
        """Generate SQL from natural language description"""
        prompt = f"""
        Convert the following natural language description into a valid MySQL SQL query.
        Available tables: {self.db.get_usable_table_names()}

        Description: {description}

        Return ONLY the SQL query without any explanation.
        """

        response = self.llm.invoke(prompt)
        return response.content.strip()

    def explain_query_results(self, sql_query: str, results: List[Dict]) -> str:
        """Generate natural language explanation of query results"""
        results_json = json.dumps(results[:10], indent=2)  # Limit for context

        prompt = f"""
        Explain the following MySQL query results in natural language:

        Query: {sql_query}

        Results (first 10 rows):
        {results_json}

        Provide a concise summary of what the data shows.
        """

        response = self.llm.invoke(prompt)
        return response.content

# Example usage
db_config = {
    "host": "localhost",
    "user": "ai_user",
    "password": "secure_password",
    "database": "ai_analytics"
}

assistant = MySQLAIAssistant(db_config)

# Natural language query
result = assistant.query_with_natural_language(
    "What were the top 5 products by revenue last month?"
)
print(result["answer"])

# Generate SQL from description
sql = assistant.generate_sql_from_natural_language(
    "Find customers who made more than 5 purchases in the last 30 days"
)
print(f"Generated SQL: {sql}")


### RAG (Retrieval-Augmented Generation) with MySQL

from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from typing import List, Dict
import pymysql

class MySQLRAGSystem:
    def __init__(self, db_config: Dict, embedding_model: str = "text-embedding-ada-002"):
        self.db_config = db_config
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.vector_store = None

    def load_documents_from_mysql(self, table_name: str,
                                  text_columns: List[str],
                                  metadata_columns: List[str] = None) -> List[Document]:
        """Load documents from MySQL table for RAG"""
        # Combine text columns with CONCAT_WS; MySQL's || is logical OR
        # unless the PIPES_AS_CONCAT SQL mode is enabled
        text_expr = ", ".join(text_columns)
        metadata_select = ", ".join(metadata_columns) if metadata_columns else "NULL as metadata"

        query = f"""
        SELECT 
            CONCAT_WS(' ', {text_expr}) as content,
            {metadata_select}
        FROM {table_name}
        WHERE {" AND ".join([f"{col} IS NOT NULL" for col in text_columns])}
        """

        # Execute query
        conn = pymysql.connect(**self.db_config)
        with conn.cursor() as cur:
            cur.execute(query)
            rows = cur.fetchall()
        conn.close()

        # Create LangChain Document objects (split_documents expects these)
        documents = []
        for row in rows:
            metadata = {}
            if metadata_columns:
                for i, col in enumerate(metadata_columns, 1):
                    metadata[col] = row[i]

            documents.append(Document(page_content=row[0], metadata=metadata))

        return documents

    def build_vector_index(self, documents: List[Document], chunk_size: int = 1000):
        """Build vector index from documents"""
        # Split documents
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

        texts = text_splitter.split_documents(documents)

        # Create vector store
        self.vector_store = FAISS.from_documents(texts, self.embeddings)
        return len(texts)

    def search_similar(self, query: str, k: int = 5) -> List[Dict]:
        """Search for similar content"""
        if not self.vector_store:
            raise ValueError("Vector store not initialized. Call build_vector_index first.")

        results = self.vector_store.similarity_search_with_score(query, k=k)

        return [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "score": score
            }
            for doc, score in results
        ]

    def answer_with_context(self, question: str,
                            context_documents: List[Dict]) -> str:
        """Generate answer using retrieved context"""
        # Combine context
        context = "\n\n".join([
            f"Source {i+1}:\n{doc['content']}"
            for i, doc in enumerate(context_documents)
        ])

        prompt = f"""
        Answer the question based on the following context.

        Context:
        {context}

        Question: {question}

        Answer:
        """

        # Use LLM to generate answer
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.1)
        response = llm.invoke(prompt)

        return response.content

# Example RAG workflow
rag_system = MySQLRAGSystem(db_config)

# Load documents from MySQL
documents = rag_system.load_documents_from_mysql(
    table_name="knowledge_base",
    text_columns=["title", "content", "summary"],
    metadata_columns=["category", "author", "created_date"]
)

# Build vector index
rag_system.build_vector_index(documents)

# Search and answer
similar_docs = rag_system.search_similar(
    "How to optimize MySQL for AI workloads?", k=3
)

answer = rag_system.answer_with_context(
    "What are best practices for MySQL AI applications?",
    similar_docs
)
print(answer)
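The chunking step does not require LangChain; its core behavior, fixed-size windows with overlap, can be sketched in a few lines (a simplified stand-in: the real `RecursiveCharacterTextSplitter` also prefers splitting at separators such as paragraph breaks):

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200):
    """Split text into chunk_size-character windows, each overlapping
    the previous window by `overlap` characters."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = chunk_text("a" * 2500, chunk_size=1000, overlap=200)
print(len(chunks))     # windows start at 0, 800, 1600, 2400 -> 4 chunks
print(len(chunks[0]))  # 1000
```

The overlap matters for RAG quality: it keeps a sentence that straddles a boundary fully present in at least one chunk, at the cost of some duplicated storage and embedding spend.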


---

## Storing ML Model Metadata

-- Model registry
CREATE TABLE ml_models (
    model_id INT AUTO_INCREMENT PRIMARY KEY,
    model_name VARCHAR(100) NOT NULL,
    model_type VARCHAR(50),   -- 'classification', 'regression', etc.
    framework VARCHAR(50),    -- 'sklearn', 'tensorflow', etc.
    version VARCHAR(20),
    parameters JSON,
    metrics JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_production BOOLEAN DEFAULT FALSE
);

-- Model versions
-- (MySQL parses but ignores inline column-level REFERENCES;
--  an explicit FOREIGN KEY clause is required for enforcement)
CREATE TABLE model_versions (
    version_id INT AUTO_INCREMENT PRIMARY KEY,
    model_id INT,
    version VARCHAR(20),
    model_file_path VARCHAR(500),
    training_data_hash VARCHAR(64),
    accuracy DECIMAL(5, 4),
    precision_score DECIMAL(5, 4),
    recall_score DECIMAL(5, 4),
    f1_score DECIMAL(5, 4),
    trained_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (model_id) REFERENCES ml_models(model_id)
);

-- Insert model metadata
INSERT INTO ml_models (model_name, model_type, framework, version, parameters, metrics, is_production)
VALUES (
    'customer_churn',
    'classification',
    'xgboost',
    'v2.1.0',
    '{"max_depth": 6, "learning_rate": 0.1, "n_estimators": 100}',
    '{"accuracy": 0.95, "precision": 0.93, "recall": 0.94}',
    TRUE
);
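The model_versions table stores precision, recall, and F1 side by side; since F1 is derived from the other two, computing it at write time avoids inconsistent rows. The harmonic-mean formula:

```python
def f1_score(precision: float, recall: float) -> float:
    """F1 is the harmonic mean of precision and recall."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

print(round(f1_score(0.93, 0.94), 4))  # 0.935
```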


---

## Feature Store with MySQL

### Storing Precomputed Features

import json

class FeatureStore:
    def __init__(self, connection):
        self.conn = connection

    def store_features(self, entity_type, entity_id, features):
        """Store precomputed ML features (upsert on entity key)"""
        with self.conn.cursor() as cur:
            sql = """
                INSERT INTO ml_features (entity_type, entity_id, features)
                VALUES (%s, %s, %s)
                ON DUPLICATE KEY UPDATE features = %s
            """
            cur.execute(sql, (entity_type, entity_id, json.dumps(features), json.dumps(features)))
            self.conn.commit()

    def get_features(self, entity_type, entity_id):
        """Retrieve features for ML inference"""
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT features
                FROM ml_features
                WHERE entity_type = %s AND entity_id = %s
            """, (entity_type, entity_id))
            result = cur.fetchone()
            return json.loads(result[0]) if result else None

    def batch_get(self, entity_type, entity_ids):
        """Batch retrieve features"""
        placeholders = ','.join(['%s'] * len(entity_ids))
        with self.conn.cursor() as cur:
            cur.execute(f"""
                SELECT entity_id, features
                FROM ml_features
                WHERE entity_type = %s AND entity_id IN ({placeholders})
            """, [entity_type] + entity_ids)

            return {
                row[0]: json.loads(row[1])
                for row in cur.fetchall()
            }

Create table

CREATE TABLE ml_features (
    id INT AUTO_INCREMENT PRIMARY KEY,
    entity_type VARCHAR(50) NOT NULL,
    entity_id BIGINT NOT NULL,
    features JSON NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY (entity_type, entity_id)  -- also serves as the lookup index
);
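The `ON DUPLICATE KEY UPDATE` clause gives last-write-wins semantics per `(entity_type, entity_id)`; a dict-backed sketch of that contract (illustrative, not production code):

```python
class InMemoryFeatureStore:
    """Dict-backed stand-in with the same upsert contract as the
    MySQL-backed FeatureStore above."""
    def __init__(self):
        self._rows = {}

    def store_features(self, entity_type, entity_id, features):
        # Upsert: a second write for the same key overwrites the first
        self._rows[(entity_type, entity_id)] = dict(features)

    def get_features(self, entity_type, entity_id):
        return self._rows.get((entity_type, entity_id))

store = InMemoryFeatureStore()
store.store_features("customer", 1, {"tenure": 12})
store.store_features("customer", 1, {"tenure": 13})   # overwrites
print(store.get_features("customer", 1))  # {'tenure': 13}
```

A stand-in like this is handy for unit-testing inference code without a live database.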


---

## Session Management for LLM Applications

import json
import uuid
from datetime import datetime

class LLMSessionManager:
    def __init__(self, conn):
        self.conn = conn

    def create_session(self, user_id):
        """Create new chat session"""
        session_id = str(uuid.uuid4())

        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO chat_sessions (session_id, user_id, messages)
                VALUES (%s, %s, '[]')
            """, (session_id, user_id))
            self.conn.commit()
            return session_id

    def add_message(self, session_id, role, content):
        """Add message to session"""
        message = json.dumps({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        with self.conn.cursor() as cur:
            # JSON_ARRAY_APPEND with a plain string would append a quoted
            # blob; CAST makes MySQL append a JSON object instead
            cur.execute("""
                UPDATE chat_sessions
                SET messages = JSON_ARRAY_APPEND(messages, '$', CAST(%s AS JSON)),
                    updated_at = NOW()
                WHERE session_id = %s
            """, (message, session_id))
            self.conn.commit()

    def get_context(self, session_id, max_messages=20):
        """Get conversation history"""
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT messages
                FROM chat_sessions
                WHERE session_id = %s
            """, (session_id,))
            result = cur.fetchone()
            if not result:
                return []

            messages = json.loads(result[0])
            return messages[-max_messages:]

Create table

CREATE TABLE chat_sessions (
    session_id VARCHAR(36) PRIMARY KEY,
    user_id BIGINT,
    messages JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_user_sessions (user_id, updated_at)
);


---

## Semantic Caching

import hashlib

# Note: hashing the prompt gives exact-match caching; a truly "semantic"
# cache would embed the prompt and look up nearby vectors instead.
class SemanticCache:
    def __init__(self, conn, ttl=3600):
        self.conn = conn
        self.ttl = ttl

    def _make_key(self, prompt):
        """Create cache key from prompt"""
        return hashlib.sha256(prompt.encode()).hexdigest()

    def get(self, prompt):
        """Get cached response (None on miss or expiry)"""
        key = self._make_key(prompt)

        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT response, created_at
                FROM prompt_cache
                WHERE prompt_hash = %s
                AND created_at > NOW() - INTERVAL %s SECOND
            """, (key, self.ttl))

            result = cur.fetchone()
            if result:
                return result[0]
            return None

    def set(self, prompt, response):
        """Cache response"""
        key = self._make_key(prompt)

        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO prompt_cache (prompt_hash, prompt, response)
                VALUES (%s, %s, %s)
                ON DUPLICATE KEY UPDATE response = %s, created_at = NOW()
            """, (key, prompt, response, response))
            self.conn.commit()

Create table

CREATE TABLE prompt_cache (
    prompt_hash VARCHAR(64) PRIMARY KEY,
    prompt TEXT NOT NULL,
    response TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_cache_expiry (created_at)
);


---

## Production Monitoring and Observability

### Comprehensive AI Workload Monitoring

-- AI workload performance dashboard
CREATE VIEW ai_workload_monitoring AS
SELECT 
    DATE(created_at) as date,
    HOUR(created_at) as hour,
    COUNT(*) as total_requests,
    COUNT(CASE WHEN JSON_EXTRACT(metadata, '$.model_type') = 'llm' THEN 1 END) as llm_requests,
    COUNT(CASE WHEN JSON_EXTRACT(metadata, '$.model_type') = 'embedding' THEN 1 END) as embedding_requests,
    AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency_ms,
    MAX(JSON_EXTRACT(metrics, '$.latency_ms')) as max_latency_ms,
    SUM(JSON_EXTRACT(metrics, '$.tokens_used')) as total_tokens,
    SUM(JSON_EXTRACT(metrics, '$.cost_usd')) as total_cost_usd
FROM ai_requests
WHERE created_at > NOW() - INTERVAL 7 DAY
GROUP BY DATE(created_at), HOUR(created_at)
ORDER BY date DESC, hour DESC;

-- Embedding storage analytics
SELECT 
    embedding_type,
    COUNT(*) as document_count,
    AVG(JSON_LENGTH(embedding_vector)) as avg_dimensions,
    MIN(JSON_LENGTH(embedding_vector)) as min_dimensions,
    MAX(JSON_LENGTH(embedding_vector)) as max_dimensions,
    SUM(LENGTH(embedding_vector)) as total_storage_bytes,
    AVG(LENGTH(embedding_vector)) as avg_storage_bytes
FROM document_embeddings
GROUP BY embedding_type
ORDER BY document_count DESC;

-- Model performance tracking
SELECT 
    model_name,
    model_type,
    COUNT(*) as prediction_count,
    AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency,
    AVG(JSON_EXTRACT(metrics, '$.accuracy')) as avg_accuracy,
    AVG(JSON_EXTRACT(metrics, '$.confidence')) as avg_confidence,
    MIN(created_at) as first_used,
    MAX(created_at) as last_used
FROM model_predictions
WHERE created_at > NOW() - INTERVAL 30 DAY
GROUP BY model_name, model_type
HAVING prediction_count > 100
ORDER BY prediction_count DESC;

-- Cost analysis by AI component
SELECT 
    component_type,
    SUM(cost_usd) as total_cost,
    AVG(cost_usd) as avg_cost_per_request,
    COUNT(*) as request_count,
    MIN(cost_usd) as min_cost,
    MAX(cost_usd) as max_cost
FROM ai_costs
WHERE cost_date > DATE_SUB(CURDATE(), INTERVAL 30 DAY)
GROUP BY component_type
ORDER BY total_cost DESC;

-- Error rate monitoring
SELECT 
    DATE(error_time) as error_date,
    error_type,
    COUNT(*) as error_count,
    GROUP_CONCAT(DISTINCT user_id) as affected_users,
    AVG(response_time_ms) as avg_response_time
FROM ai_errors
WHERE error_time > NOW() - INTERVAL 7 DAY
GROUP BY DATE(error_time), error_type
ORDER BY error_date DESC, error_count DESC;


### Real-time Alerting System

-- Create alert triggers for AI workloads
DELIMITER //

CREATE TRIGGER ai_high_latency_alert
AFTER INSERT ON ai_requests
FOR EACH ROW
BEGIN
DECLARE avg_latency DECIMAL(10,2);
DECLARE threshold DECIMAL(10,2) DEFAULT 1000;  -- 1 second

-- Get average latency for this model type
SELECT AVG(JSON_EXTRACT(metrics, '$.latency_ms'))
INTO avg_latency
FROM ai_requests
WHERE JSON_EXTRACT(metadata, '$.model_type') = 
      JSON_EXTRACT(NEW.metadata, '$.model_type')
  AND created_at > NOW() - INTERVAL 1 HOUR;

-- Check if latency exceeds threshold
IF JSON_EXTRACT(NEW.metrics, '$.latency_ms') > threshold 
   AND JSON_EXTRACT(NEW.metrics, '$.latency_ms') > avg_latency * 2 THEN
   
    INSERT INTO ai_alerts (
        alert_type,
        severity,
        message,
        metadata,
        created_at
    ) VALUES (
        'high_latency',
        'warning',
        CONCAT('High latency detected: ', 
               JSON_EXTRACT(NEW.metrics, '$.latency_ms'), 'ms'),
        JSON_OBJECT(
            'request_id', NEW.request_id,
            'model_type', JSON_EXTRACT(NEW.metadata, '$.model_type'),
            'latency_ms', JSON_EXTRACT(NEW.metrics, '$.latency_ms'),
            'avg_latency_ms', avg_latency
        ),
        NOW()
    );
END IF;

END//

DELIMITER ;

-- Cost threshold alert
DELIMITER //

CREATE TRIGGER ai_cost_threshold_alert
AFTER INSERT ON ai_costs
FOR EACH ROW
BEGIN
    DECLARE daily_total DECIMAL(10,2);
    DECLARE threshold DECIMAL(10,2) DEFAULT 100.00;  -- $100 daily threshold

-- Calculate daily total for this component
SELECT SUM(cost_usd)
INTO daily_total
FROM ai_costs
WHERE component_type = NEW.component_type
  AND cost_date = NEW.cost_date;

-- Check if exceeds threshold
IF daily_total > threshold THEN
    INSERT INTO ai_alerts (
        alert_type,
        severity,
        message,
        metadata,
        created_at
    ) VALUES (
        'cost_threshold',
        'critical',
        CONCAT('Cost threshold exceeded: $', daily_total),
        JSON_OBJECT(
            'component_type', NEW.component_type,
            'daily_total', daily_total,
            'threshold', threshold,
            'date', NEW.cost_date
        ),
        NOW()
    );
END IF;

END//

DELIMITER ;
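The alert condition in the high-latency trigger combines an absolute floor with a relative check against the recent average, which is easy to get subtly wrong. A small Python sketch of that rule (a hypothetical `should_alert` helper, thresholds taken from the trigger above) makes it unit-testable outside the database:

```python
def should_alert(latency_ms: float, rolling_avg_ms: float,
                 threshold_ms: float = 1000.0) -> bool:
    """Mirror of the trigger's condition: alert only when latency exceeds
    both the absolute threshold and twice the recent rolling average."""
    return latency_ms > threshold_ms and latency_ms > 2 * rolling_avg_ms
```

Note that a 1200 ms request against a 900 ms rolling average does not alert: it clears the absolute floor but not the 2x relative check, which keeps the trigger quiet during sustained (already-known) slowness.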


### Python Monitoring Dashboard

import pymysql
import json
from datetime import datetime, timedelta
from typing import Dict, List
import pandas as pd

class AIMonitoringDashboard:
    def __init__(self, db_config: Dict):
        self.conn = pymysql.connect(**db_config)

    def get_performance_metrics(self, hours: int = 24) -> pd.DataFrame:
        """Get performance metrics for the last N hours"""
        # Literal % must be doubled: PyMySQL uses %s-style parameter markers
        query = """
            SELECT 
                DATE_FORMAT(created_at, '%%Y-%%m-%%d %%H:00') as hour,
                COUNT(*) as request_count,
                AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency,
                AVG(JSON_EXTRACT(metrics, '$.tokens_used')) as avg_tokens,
                SUM(JSON_EXTRACT(metrics, '$.cost_usd')) as total_cost,
                COUNT(DISTINCT user_id) as unique_users
            FROM ai_requests
            WHERE created_at > NOW() - INTERVAL %s HOUR
            GROUP BY DATE_FORMAT(created_at, '%%Y-%%m-%%d %%H:00')
            ORDER BY hour
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query, (hours,))
            columns = [desc[0] for desc in cur.description]
            data = cur.fetchall()
        
        return pd.DataFrame(data, columns=columns)

    def get_error_analysis(self, days: int = 7) -> Dict:
        """Analyze error patterns"""
        query = """
            SELECT 
                error_type,
                COUNT(*) as error_count,
                AVG(response_time_ms) as avg_response_time,
                MIN(error_time) as first_error,
                MAX(error_time) as last_error,
                GROUP_CONCAT(DISTINCT user_id) as affected_users
            FROM ai_errors
            WHERE error_time > NOW() - INTERVAL %s DAY
            GROUP BY error_type
            ORDER BY error_count DESC
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query, (days,))
            results = cur.fetchall()
        
        return {
            "total_errors": sum(row[1] for row in results),
            "error_types": [
                {
                    "type": row[0],
                    "count": row[1],
                    "avg_response_time": row[2],
                    "first_occurrence": row[3],
                    "last_occurrence": row[4],
                    "affected_users": row[5].split(',') if row[5] else []
                }
                for row in results
            ]
        }

    def get_cost_breakdown(self, start_date: str, end_date: str) -> pd.DataFrame:
        """Get cost breakdown by component and date"""
        query = """
            SELECT 
                cost_date,
                component_type,
                SUM(cost_usd) as daily_cost,
                COUNT(*) as request_count,
                AVG(cost_usd) as avg_cost_per_request
            FROM ai_costs
            WHERE cost_date BETWEEN %s AND %s
            GROUP BY cost_date, component_type
            ORDER BY cost_date DESC, daily_cost DESC
        """
        
        with self.conn.cursor() as cur:
            cur.execute(query, (start_date, end_date))
            columns = [desc[0] for desc in cur.description]
            data = cur.fetchall()
        
        df = pd.DataFrame(data, columns=columns)
        df['cost_date'] = pd.to_datetime(df['cost_date'])
        return df

    def generate_health_report(self) -> Dict:
        """Generate comprehensive health report"""
        report = {
            "timestamp": datetime.now().isoformat(),
            "performance": {},
            "costs": {},
            "errors": {},
            "recommendations": []
        }
        
        # Performance metrics
        perf_df = self.get_performance_metrics(24)
        if not perf_df.empty:
            report["performance"] = {
                "total_requests": int(perf_df['request_count'].sum()),
                "avg_latency": float(perf_df['avg_latency'].mean()),
                "total_cost": float(perf_df['total_cost'].sum()),
                "peak_hour": perf_df.loc[perf_df['request_count'].idxmax()].to_dict()
            }
        
        # Error analysis
        error_analysis = self.get_error_analysis(1)
        report["errors"] = error_analysis
        
        # Cost analysis
        today = datetime.now().date()
        yesterday = today - timedelta(days=1)
        cost_df = self.get_cost_breakdown(
            yesterday.isoformat(),
            today.isoformat()
        )
        
        if not cost_df.empty:
            report["costs"] = {
                "yesterday_total": float(cost_df['daily_cost'].sum()),
                "by_component": cost_df.groupby('component_type')['daily_cost']
                                       .sum().to_dict(),
                "trend": "increasing" if len(cost_df) > 1 and
                         cost_df['daily_cost'].iloc[0] > cost_df['daily_cost'].iloc[1]
                         else "stable"
            }
        
        # Generate recommendations
        if report["performance"].get("avg_latency", 0) > 500:
            report["recommendations"].append(
                "High latency detected. Consider optimizing model calls or implementing caching."
            )
        
        if report["errors"].get("total_errors", 0) > 10:
            report["recommendations"].append(
                "High error rate. Review error patterns and implement retry logic."
            )
        
        if report["costs"].get("yesterday_total", 0) > 50:
            report["recommendations"].append(
                "High daily cost. Consider implementing cost controls or optimizing model usage."
            )
        
        return report
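The threshold checks at the end of `generate_health_report` are pure logic and can be factored out for testing without a database connection. A sketch with a hypothetical `build_recommendations` helper, using the same 500 ms / 10 errors / $50 thresholds:

```python
def build_recommendations(avg_latency: float, total_errors: int,
                          daily_cost: float) -> list:
    """Apply the health report's fixed thresholds (500 ms, 10 errors, $50/day)."""
    recs = []
    if avg_latency > 500:
        recs.append("High latency detected. Consider optimizing model calls "
                    "or implementing caching.")
    if total_errors > 10:
        recs.append("High error rate. Review error patterns and implement "
                    "retry logic.")
    if daily_cost > 50:
        recs.append("High daily cost. Consider implementing cost controls "
                    "or optimizing model usage.")
    return recs
```

Keeping the thresholds in one place also makes it easy to move them into configuration later instead of hard-coding them in the report method.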

---

## Production Best Practices

### Performance Optimization Strategies

-- 1. Optimized embedding storage
-- Use appropriate data types based on embedding size
CREATE TABLE optimized_embeddings (
    id INT AUTO_INCREMENT PRIMARY KEY,
    document_id INT NOT NULL,
    embedding_type VARCHAR(50),
    -- Store as JSON for flexibility, BLOB for large embeddings
    embedding_vector JSON,  -- For embeddings up to 4096 dimensions
    -- embedding_blob BLOB,  -- For larger embeddings
    embedding_dim INT GENERATED ALWAYS AS (JSON_LENGTH(embedding_vector)) STORED,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_document_embedding (document_id, embedding_type),
    INDEX idx_embedding_dim (embedding_dim),
    INDEX idx_created (created_at)
) ENGINE=InnoDB 
  ROW_FORMAT=DYNAMIC
  KEY_BLOCK_SIZE=8;

-- 2. Generated columns for frequent JSON queries
ALTER TABLE ai_metadata
ADD COLUMN model_family VARCHAR(50) 
GENERATED ALWAYS AS (
    CASE 
        WHEN model_name LIKE 'gpt-%' THEN 'openai'
        WHEN model_name LIKE 'claude-%' THEN 'anthropic'
        WHEN model_name LIKE 'command-%' THEN 'cohere'
        ELSE 'custom'
    END
) STORED,
ADD COLUMN max_context_tokens INT 
GENERATED ALWAYS AS (JSON_EXTRACT(parameters, '$.max_tokens')) STORED,
ADD COLUMN estimated_cost_per_1k DECIMAL(10,4)
GENERATED ALWAYS AS (JSON_EXTRACT(performance_metrics, '$.cost_per_1k_tokens')) STORED;

CREATE INDEX idx_model_family ON ai_metadata(model_family);
CREATE INDEX idx_max_context ON ai_metadata(max_context_tokens);
CREATE INDEX idx_estimated_cost ON ai_metadata(estimated_cost_per_1k);

-- 3. Partitioning for time-series AI data
ALTER TABLE ai_requests
PARTITION BY RANGE (TO_DAYS(created_at)) (
    PARTITION p2026_q1 VALUES LESS THAN (TO_DAYS('2026-04-01')),
    PARTITION p2026_q2 VALUES LESS THAN (TO_DAYS('2026-07-01')),
    PARTITION p2026_q3 VALUES LESS THAN (TO_DAYS('2026-10-01')),
    PARTITION p2026_q4 VALUES LESS THAN (TO_DAYS('2027-01-01')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- 4. Materialized views for frequent aggregations
CREATE TABLE ai_daily_stats (
    stat_date DATE PRIMARY KEY,
    total_requests INT DEFAULT 0,
    total_tokens BIGINT DEFAULT 0,
    total_cost DECIMAL(10,2) DEFAULT 0,
    avg_latency_ms DECIMAL(10,2) DEFAULT 0,
    error_rate DECIMAL(5,4) DEFAULT 0,
    unique_users INT DEFAULT 0,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Refresh procedure
DELIMITER //

CREATE PROCEDURE refresh_ai_daily_stats()
BEGIN
    REPLACE INTO ai_daily_stats
    SELECT 
        DATE(created_at) as stat_date,
        COUNT(*) as total_requests,
        SUM(JSON_EXTRACT(metrics, '$.tokens_used')) as total_tokens,
        SUM(JSON_EXTRACT(metrics, '$.cost_usd')) as total_cost,
        AVG(JSON_EXTRACT(metrics, '$.latency_ms')) as avg_latency_ms,
        COUNT(CASE WHEN error_code IS NOT NULL THEN 1 END) / COUNT(*) as error_rate,
        COUNT(DISTINCT user_id) as unique_users,
        NOW() as updated_at
    FROM ai_requests
    WHERE created_at >= CURDATE() - INTERVAL 7 DAY
    GROUP BY DATE(created_at);
END//

DELIMITER ;

-- Schedule refresh via the event scheduler (requires event_scheduler=ON)
CREATE EVENT IF NOT EXISTS refresh_ai_stats_hourly
ON SCHEDULE EVERY 1 HOUR
DO
    CALL refresh_ai_daily_stats();
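The JSON-vs-BLOB tradeoff noted in the `optimized_embeddings` table above can be quantified: JSON stores each float as text, while a packed float32 BLOB costs a fixed 4 bytes per dimension. A quick sketch (illustrative only; the `embedding_sizes` helper is hypothetical):

```python
import json
import struct

def embedding_sizes(vector):
    """Compare storage footprints for one embedding:
    JSON text vs packed little-endian float32 BLOB."""
    as_json = json.dumps(vector).encode("utf-8")
    as_blob = struct.pack(f"<{len(vector)}f", *vector)  # 4 bytes per dimension
    return len(as_json), len(as_blob)

# 1536 dimensions is a common text-embedding size
json_bytes, blob_bytes = embedding_sizes([0.1] * 1536)
```

The BLOB form is always `4 * dimensions` bytes; the JSON form varies with the printed precision of each float and is typically larger, at the cost of losing `JSON_LENGTH`/`JSON_EXTRACT` queryability.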

### Security and Compliance

-- 1. Role-based access control for AI workloads
CREATE ROLE ai_developer;
CREATE ROLE ai_operator;
CREATE ROLE ai_auditor;

-- Developer permissions (read/write models, test data)
GRANT SELECT, INSERT, UPDATE, DELETE ON ai_metadata TO ai_developer;
GRANT SELECT, INSERT ON model_predictions TO ai_developer;
GRANT EXECUTE ON PROCEDURE train_model TO ai_developer;

-- Operator permissions (production operations)
GRANT SELECT ON ai_metadata TO ai_operator;
GRANT SELECT, INSERT ON model_predictions TO ai_operator;
GRANT EXECUTE ON PROCEDURE make_prediction TO ai_operator;
GRANT SELECT ON ai_daily_stats TO ai_operator;

-- Auditor permissions (read-only monitoring)
GRANT SELECT ON ai_metadata TO ai_auditor;
GRANT SELECT ON model_predictions TO ai_auditor;
GRANT SELECT ON ai_daily_stats TO ai_auditor;
GRANT SELECT ON ai_alerts TO ai_auditor;
GRANT SELECT ON audit_log TO ai_auditor;

-- Create users and assign roles
CREATE USER 'dev_ai'@'%' IDENTIFIED BY 'dev_password';
CREATE USER 'ops_ai'@'%' IDENTIFIED BY 'ops_password';
CREATE USER 'audit_ai'@'%' IDENTIFIED BY 'audit_password';

GRANT ai_developer TO 'dev_ai'@'%';
GRANT ai_operator TO 'ops_ai'@'%';
GRANT ai_auditor TO 'audit_ai'@'%';

-- 2. Data encryption for sensitive AI data
-- MySQL encrypts data at rest per table/tablespace (requires a keyring
-- component); there is no per-column ENCRYPTED WITH clause in MySQL
ALTER TABLE ai_metadata ENCRYPTION='Y';
ALTER TABLE model_predictions ENCRYPTION='Y';

-- For column-level protection, encrypt values at the application layer
-- or with AES_ENCRYPT()/AES_DECRYPT() at write/read time

-- 3. Comprehensive audit logging
CREATE TABLE ai_audit_log (
    audit_id BIGINT AUTO_INCREMENT PRIMARY KEY,
    event_time TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
    user_name VARCHAR(100),
    event_type VARCHAR(50),
    table_name VARCHAR(100),
    record_id VARCHAR(100),
    old_values JSON,
    new_values JSON,
    ip_address VARCHAR(45),
    user_agent TEXT,
    INDEX idx_audit_time (event_time),
    INDEX idx_audit_user (user_name, event_time),
    INDEX idx_audit_event (event_type, event_time)
) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;

-- Audit trigger for sensitive tables
DELIMITER //

CREATE TRIGGER audit_ai_metadata_changes
AFTER UPDATE ON ai_metadata
FOR EACH ROW
BEGIN
    INSERT INTO ai_audit_log (
        user_name,
        event_type,
        table_name,
        record_id,
        old_values,
        new_values
    ) VALUES (
        USER(),
        'UPDATE',
        'ai_metadata',
        NEW.id,
        JSON_OBJECT(
            'model_name', OLD.model_name,
            'parameters', OLD.parameters,
            'performance_metrics', OLD.performance_metrics
        ),
        JSON_OBJECT(
            'model_name', NEW.model_name,
            'parameters', NEW.parameters,
            'performance_metrics', NEW.performance_metrics
        )
    );
END//

DELIMITER ;
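The trigger above logs the full before/after JSON objects even when only one field changed. An application-side variant can record just the delta; a sketch with a hypothetical `changed_fields` helper:

```python
def changed_fields(old: dict, new: dict) -> dict:
    """Return {field: {'old': ..., 'new': ...}} for keys whose value changed,
    producing a compact audit payload instead of full row snapshots."""
    keys = set(old) | set(new)
    return {
        k: {"old": old.get(k), "new": new.get(k)}
        for k in keys
        if old.get(k) != new.get(k)
    }
```

Storing only deltas keeps `ai_audit_log` smaller, at the cost of needing to replay history to reconstruct a full row.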

-- 4. Data retention and compliance
CREATE TABLE ai_data_retention_policy (
    table_name VARCHAR(100) PRIMARY KEY,
    retention_days INT NOT NULL,
    archive_strategy ENUM('delete', 'archive', 'anonymize'),
    last_cleanup_date DATE,
    cleanup_status ENUM('pending', 'completed', 'failed'),
    cleanup_details JSON
);

INSERT INTO ai_data_retention_policy VALUES
('ai_requests', 90, 'archive', NULL, 'pending', '{}'),
('model_predictions', 365, 'archive', NULL, 'pending', '{}'),
('ai_errors', 30, 'delete', NULL, 'pending', '{}'),
('prompt_cache', 7, 'delete', NULL, 'pending', '{}');

-- Cleanup procedure
DELIMITER //

CREATE PROCEDURE cleanup_ai_data()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_table_name VARCHAR(100);
    DECLARE v_retention_days INT;
    DECLARE v_archive_strategy VARCHAR(20);
    DECLARE cur CURSOR FOR 
        SELECT table_name, retention_days, archive_strategy
        FROM ai_data_retention_policy
        WHERE cleanup_status != 'completed';
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
    
    OPEN cur;
    
    read_loop: LOOP
        FETCH cur INTO v_table_name, v_retention_days, v_archive_strategy;
        IF done THEN
            LEAVE read_loop;
        END IF;
        
        START TRANSACTION;
        
        CASE v_archive_strategy
            WHEN 'delete' THEN
                SET @sql = CONCAT(
                    'DELETE FROM ', v_table_name,
                    ' WHERE created_at < DATE_SUB(NOW(), INTERVAL ', 
                    v_retention_days, ' DAY)'
                );
                PREPARE stmt FROM @sql;
                EXECUTE stmt;
                SET @rows_affected = ROW_COUNT();
                DEALLOCATE PREPARE stmt;
                
            WHEN 'archive' THEN
                -- Archive to separate table
                SET @archive_table = CONCAT(v_table_name, '_archive');
                SET @sql = CONCAT(
                    'INSERT INTO ', @archive_table,
                    ' SELECT * FROM ', v_table_name,
                    ' WHERE created_at < DATE_SUB(NOW(), INTERVAL ', 
                    v_retention_days, ' DAY)'
                );
                PREPARE stmt FROM @sql;
                EXECUTE stmt;
                DEALLOCATE PREPARE stmt;
                
                -- Delete from main table
                SET @sql = CONCAT(
                    'DELETE FROM ', v_table_name,
                    ' WHERE created_at < DATE_SUB(NOW(), INTERVAL ', 
                    v_retention_days, ' DAY)'
                );
                PREPARE stmt FROM @sql;
                EXECUTE stmt;
                SET @rows_affected = ROW_COUNT();
                DEALLOCATE PREPARE stmt;
                
            WHEN 'anonymize' THEN
                -- Anonymize sensitive data
                SET @sql = CONCAT(
                    'UPDATE ', v_table_name,
                    ' SET user_id = NULL, ip_address = NULL',
                    ' WHERE created_at < DATE_SUB(NOW(), INTERVAL ', 
                    v_retention_days, ' DAY)'
                );
                PREPARE stmt FROM @sql;
                EXECUTE stmt;
                SET @rows_affected = ROW_COUNT();
                DEALLOCATE PREPARE stmt;
        END CASE;
        
        -- Update policy status; ROW_COUNT() is captured immediately after
        -- each EXECUTE, before later statements overwrite it
        UPDATE ai_data_retention_policy
        SET last_cleanup_date = CURDATE(),
            cleanup_status = 'completed',
            cleanup_details = JSON_OBJECT(
                'rows_affected', @rows_affected,
                'execution_time', NOW()
            )
        WHERE table_name = v_table_name;
        
        COMMIT;
    END LOOP;
    
    CLOSE cur;
END//

DELIMITER ;
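Because the cleanup procedure builds its statements with string concatenation, the SQL it generates is worth unit-testing. A Python sketch that reproduces the same statement text per policy row (the `build_cleanup_sql` helper is hypothetical; table and strategy names come from `ai_data_retention_policy`):

```python
def build_cleanup_sql(table: str, retention_days: int, strategy: str) -> str:
    """Reproduce the dynamic SQL built by cleanup_ai_data() for one policy."""
    cutoff = f"created_at < DATE_SUB(NOW(), INTERVAL {retention_days} DAY)"
    if strategy == "delete":
        return f"DELETE FROM {table} WHERE {cutoff}"
    if strategy == "anonymize":
        return f"UPDATE {table} SET user_id = NULL, ip_address = NULL WHERE {cutoff}"
    if strategy == "archive":
        # Archive first, then delete from the main table
        return (f"INSERT INTO {table}_archive SELECT * FROM {table} WHERE {cutoff}; "
                f"DELETE FROM {table} WHERE {cutoff}")
    raise ValueError(f"unknown strategy: {strategy}")
```

Since table names come from an internal policy table rather than user input, concatenation is tolerable here, but validating `table` against an allowlist before interpolating is still a cheap safeguard.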

### Cost Optimization Strategies

-- 1. Cost tracking and optimization
CREATE TABLE ai_cost_optimization (
    optimization_id INT AUTO_INCREMENT PRIMARY KEY,
    strategy_name VARCHAR(100),
    target_component VARCHAR(50),
    estimated_savings DECIMAL(10,2),
    implementation_status ENUM('planned', 'in_progress', 'completed', 'failed'),
    implemented_date DATE,
    actual_savings DECIMAL(10,2),
    details JSON
);

-- Common optimization strategies
INSERT INTO ai_cost_optimization VALUES
(NULL, 'Prompt caching', 'llm', 500.00, 'completed', '2026-01-15', 450.00, 
 '{"cache_hit_rate": 0.35, "avg_tokens_saved": 1200}'),
(NULL, 'Embedding batch processing', 'embedding', 300.00, 'in_progress', NULL, NULL,
 '{"batch_size": 100, "estimated_reduction": 0.4}'),
(NULL, 'Model selection optimization', 'llm', 800.00, 'planned', NULL, NULL,
 '{"target_models": ["gpt-3.5-turbo", "claude-instant"], "accuracy_threshold": 0.85}');

-- 2. Cost-aware query routing
CREATE TABLE model_cost_profile (
    model_name VARCHAR(100) PRIMARY KEY,
    provider VARCHAR(50),
    cost_per_1k_input DECIMAL(10,4),
    cost_per_1k_output DECIMAL(10,4),
    avg_latency_ms INT,
    accuracy_score DECIMAL(5,4),
    is_available BOOLEAN DEFAULT TRUE,
    rate_limit_per_minute INT,
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- Cost-aware routing function
DELIMITER //

CREATE FUNCTION get_cost_optimal_model(
    required_accuracy DECIMAL(5,4),
    max_latency_ms INT,
    estimated_tokens INT
) RETURNS VARCHAR(100)
DETERMINISTIC
READS SQL DATA
BEGIN
    DECLARE optimal_model VARCHAR(100);
    
    SELECT model_name INTO optimal_model
    FROM model_cost_profile
    WHERE accuracy_score >= required_accuracy
      AND avg_latency_ms <= max_latency_ms
      AND is_available = TRUE
    ORDER BY (cost_per_1k_input + cost_per_1k_output) * (estimated_tokens / 1000)
    LIMIT 1;
    
    RETURN optimal_model;
END//

DELIMITER ;
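The same routing decision can be made in application code against rows read from `model_cost_profile`, which is handy when the caller already holds the profiles in memory. A sketch with a hypothetical `pick_cost_optimal_model` helper and illustrative profile data:

```python
def pick_cost_optimal_model(profiles, required_accuracy, max_latency_ms,
                            estimated_tokens):
    """Cheapest available model meeting the accuracy and latency constraints,
    mirroring the get_cost_optimal_model SQL function."""
    candidates = [
        p for p in profiles
        if p["accuracy_score"] >= required_accuracy
        and p["avg_latency_ms"] <= max_latency_ms
        and p["is_available"]
    ]
    if not candidates:
        return None  # the SQL function would likewise return NULL
    return min(
        candidates,
        key=lambda p: (p["cost_per_1k_input"] + p["cost_per_1k_output"])
                      * estimated_tokens / 1000,
    )["model_name"]
```

As in the SQL version, `estimated_tokens` scales every candidate's cost by the same factor, so it does not change the ordering; it matters only if input and output token counts are priced separately.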

-- 3. Usage quotas and limits
CREATE TABLE ai_usage_quotas (
    user_id INT PRIMARY KEY,
    monthly_token_limit BIGINT DEFAULT 1000000,
    monthly_cost_limit DECIMAL(10,2) DEFAULT 100.00,
    concurrent_request_limit INT DEFAULT 10,
    tokens_used_this_month BIGINT DEFAULT 0,
    cost_used_this_month DECIMAL(10,2) DEFAULT 0,
    quota_reset_date DATE,
    is_active BOOLEAN DEFAULT TRUE,
    INDEX idx_quota_reset (quota_reset_date)
);

-- Quota enforcement trigger
DELIMITER //

CREATE TRIGGER enforce_ai_usage_quotas
BEFORE INSERT ON ai_requests
FOR EACH ROW
BEGIN
    DECLARE v_tokens_used BIGINT;
    DECLARE v_cost_used DECIMAL(10,2);
    DECLARE v_token_limit BIGINT;
    DECLARE v_cost_limit DECIMAL(10,2);
    DECLARE v_is_active BOOLEAN;
    
    -- Get user quotas
    SELECT 
        tokens_used_this_month,
        cost_used_this_month,
        monthly_token_limit,
        monthly_cost_limit,
        is_active
    INTO 
        v_tokens_used,
        v_cost_used,
        v_token_limit,
        v_cost_limit,
        v_is_active
    FROM ai_usage_quotas
    WHERE user_id = NEW.user_id;
    
    -- Check if user is active (v_is_active is NULL when no quota row exists,
    -- so missing quotas are rejected rather than silently allowed)
    IF v_is_active IS NULL OR NOT v_is_active THEN
        SIGNAL SQLSTATE '45000'
        SET MESSAGE_TEXT = 'User AI access is disabled or has no quota configured';
    END IF;
    
    -- Check token quota
    IF v_tokens_used + NEW.estimated_tokens > v_token_limit THEN
        SIGNAL SQLSTATE '45000'
        SET MESSAGE_TEXT = 'Monthly token quota exceeded';
    END IF;
    
    -- Check cost quota
    IF v_cost_used + NEW.estimated_cost > v_cost_limit THEN
        SIGNAL SQLSTATE '45000'
        SET MESSAGE_TEXT = 'Monthly cost quota exceeded';
    END IF;
    
    -- Update usage (will be committed with transaction)
    UPDATE ai_usage_quotas
    SET 
        tokens_used_this_month = tokens_used_this_month + NEW.estimated_tokens,
        cost_used_this_month = cost_used_this_month + NEW.estimated_cost
    WHERE user_id = NEW.user_id;
END//

DELIMITER ;

-- Monthly quota reset procedure
DELIMITER //

CREATE PROCEDURE reset_monthly_quotas()
BEGIN
    UPDATE ai_usage_quotas
    SET 
        tokens_used_this_month = 0,
        cost_used_this_month = 0,
        quota_reset_date = LAST_DAY(CURDATE())
    WHERE quota_reset_date < CURDATE()
       OR quota_reset_date IS NULL;
END//

DELIMITER ;
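Services that pre-validate requests before hitting the database can mirror the trigger's quota checks in code. A sketch (the `check_quota` helper and `QuotaExceeded` exception are hypothetical; field names follow `ai_usage_quotas`):

```python
class QuotaExceeded(Exception):
    """Raised where the trigger would SIGNAL SQLSTATE '45000'."""

def check_quota(quota: dict, estimated_tokens: int, estimated_cost: float) -> None:
    """Apply the trigger's three checks against one user's quota row."""
    if not quota.get("is_active"):
        raise QuotaExceeded("User AI access is disabled")
    if quota["tokens_used"] + estimated_tokens > quota["token_limit"]:
        raise QuotaExceeded("Monthly token quota exceeded")
    if quota["cost_used"] + estimated_cost > quota["cost_limit"]:
        raise QuotaExceeded("Monthly cost quota exceeded")
```

Checking in the application gives friendlier error handling, while the trigger remains the enforcement backstop for any writer that bypasses the service layer.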

---

## Resources and Further Learning

### Official Documentation
- [MySQL JSON Functions](https://dev.mysql.com/doc/refman/8.0/en/json-functions.html) - Comprehensive guide to JSON operations
- [MySQL HeatWave ML](https://dev.mysql.com/doc/heatwave/en/heatwave-machine-learning.html) - In-database machine learning
- [MySQL Vector Search](https://dev.mysql.com/doc/heatwave/en/heatwave-vector-search.html) - Vector similarity search capabilities
- [MySQL Security Guide](https://dev.mysql.com/doc/refman/8.0/en/security.html) - Security best practices

### AI Integration Libraries
- [LangChain MySQL](https://python.langchain.com/docs/integrations/providers/mysql) - Official MySQL integration
- [SQLAlchemy](https://www.sqlalchemy.org/) - Python SQL toolkit and ORM
- [PyMySQL](https://pymysql.readthedocs.io/) - Pure Python MySQL client
- [MySQL Connector/Python](https://dev.mysql.com/doc/connector-python/en/) - Official MySQL Python connector

### Monitoring and Observability
- [MySQL Performance Schema](https://dev.mysql.com/doc/refman/8.0/en/performance-schema.html) - Built-in performance monitoring
- [MySQL Enterprise Monitor](https://www.mysql.com/products/enterprise/monitor.html) - Advanced monitoring (commercial)
- [Prometheus MySQL Exporter](https://github.com/prometheus/mysqld_exporter) - Metrics collection for Prometheus
- [Grafana MySQL Dashboards](https://grafana.com/grafana/dashboards/?search=mysql) - Visualization templates

### Cost Management Tools
- [AWS Cost Explorer](https://aws.amazon.com/aws-cost-management/aws-cost-explorer/) - Cloud cost analysis
- [Google Cloud Billing](https://cloud.google.com/billing/docs) - Cost management for GCP
- [Azure Cost Management](https://azure.microsoft.com/en-us/products/cost-management) - Azure cost optimization
- [OpenCost](https://www.opencost.io/) - Open-source cost monitoring

### Community Resources
- [MySQL Forums](https://forums.mysql.com/) - Official community forums
- [Stack Overflow MySQL Tag](https://stackoverflow.com/questions/tagged/mysql) - Q&A platform
- [r/mysql](https://www.reddit.com/r/mysql/) - Reddit community
- [MySQL Planet](https://planet.mysql.com/) - Aggregated MySQL blogs

### Books and Courses
- **"High Performance MySQL"** by Baron Schwartz et al. - Performance optimization
- **"MySQL Cookbook"** by Paul DuBois - Practical solutions
- **"Designing Data-Intensive Applications"** by Martin Kleppmann - Database design principles
- [MySQL Tutorial on W3Schools](https://www.w3schools.com/mysql/) - Beginner-friendly tutorials
- [Coursera: Database Management Essentials](https://www.coursera.org/learn/database-management) - University-level course

### Open Source Projects
- [Milvus](https://github.com/milvus-io/milvus) - Open-source vector database (comparison)
- [PostgreSQL with pgvector](https://github.com/pgvector/pgvector) - PostgreSQL vector extension
- [Chroma](https://github.com/chroma-core/chroma) - AI-native embedding database
- [Weaviate](https://github.com/weaviate/weaviate) - Vector search engine

---

## Conclusion

MySQL has evolved into a comprehensive platform for AI applications, offering capabilities that rival specialized vector databases while maintaining enterprise-grade reliability. The key strengths for AI workloads include:

### Core Advantages
1. **Native JSON Support** - Flexible schema design for AI metadata and embeddings
2. **Vector Search Capabilities** - Built-in similarity search with HeatWave
3. **In-Database ML** - Direct model training and inference with HeatWave ML
4. **Enterprise Reliability** - ACID compliance, replication, and backup
5. **Cost Efficiency** - Often lower total cost than operating a separate specialized vector database

### Production Readiness
MySQL is production-ready for AI workloads when:
- Embedding dimensions are moderate (up to 4096 dimensions)
- Query patterns are well-defined and indexed
- Cost control and compliance are critical requirements
- Existing MySQL infrastructure can be leveraged

### When to Consider Alternatives
Consider specialized vector databases when:
- Working with ultra-high-dimensional embeddings (10K+ dimensions)
- Requiring advanced ANN algorithms with high recall
- Needing specialized vector indexing not available in MySQL
- Operating at extreme scale with billions of vectors

### Future Outlook
MySQL continues to enhance its AI capabilities with:
- Improved vector search performance
- More ML algorithms in HeatWave
- Better integration with AI frameworks
- Enhanced monitoring for AI workloads

### Implementation Checklist
Before deploying MySQL for AI:
- [ ] Design efficient embedding storage schema
- [ ] Implement proper indexing strategies
- [ ] Set up comprehensive monitoring
- [ ] Configure security and access controls
- [ ] Plan for cost management and optimization
- [ ] Establish data retention policies
- [ ] Test at production scale with realistic workloads

MySQL provides a compelling option for organizations seeking to integrate AI capabilities into existing database infrastructure. By leveraging MySQL's strengths in reliability, security, and cost-efficiency, teams can build robust AI applications without the complexity of managing multiple specialized databases.

For organizations already invested in MySQL, the path to AI integration is straightforward and offers significant advantages in terms of operational simplicity, cost control, and enterprise readiness.
