Introduction
MCP, MCP defines a universal interface.
What MCP enabes: base, calls your APIs
- Build a tool once, use it with any MCP-compatible AI ions in the world
How MCP Works
Your App (MCP Client)
ON-RPC over stdio/HTTP)
MCP Server (yours)
↕ Your actual systems
Files, DBs, APIs, etc.
sult.
Python
pip install mcp
# server.py — a simple MCP server with file and database tools
import asyncio
import sqlite3
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types
# Create the MCP server
app = Server("my-tools")
# Tool 1: Read a file
@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="read_file",
description="Read the contents of a file",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file to read"
}
},
"required": ["path"]
}
),
types.Tool(
name="query_database",
local database",
inputSchema={
"type": "object",
"properties": {
"query": {
pe": "string",
"description": "SQL SELECT query to execute"
}
},
"required": ["query"]
}
),
types.Tool(
name="list_directory",
on="List files in a directory",
inputSchema={
bject",
"properties": {
"path": {
"type": "string",
"description": "Directory path to list"
}
},
"required": ["path"]
}
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
== "read_file":
path = Path(arguments["path"])
if not path.exists():
return [types.Te")]
if not path.is_file():
return [types.TextContent(type="text", text=f"Error: Not a file: {path}")]
coding="utf-8")
return [types.TextContent(type="text", text=content)]
elif name == "query_database":
query = arguments["query"]
# Safety: only allow SELECT queries
"SELECT"):
return [types.TextContent(type="text", text="Error: Only SELECT queries are allowed")]
conn = sqlite3.connect("data.db")
conn.row_factory = sqlite3.Row
cursor = conn.execute(query)
w) for row in cursor.fetchall()]
conn.close()
import json
tContent(type="text", text=json.dumps(rows, indent=2))]
list_directory":
path = Path(arguments["path"])
ath.exists():
return [types.TextContent(type="text", text=f"Error: Directory not found: {path}")]
files = []
in sorted(path.iterdir()):
item_type = "dir" if item.is_dir() else "file"
size = item.stat().st_size if item.is_file() else 0
files.append(f"{item_type:4} {size:10} {item.name}")
return [types.TextContent(type="text", text="\n".join(files))]
else:
ame}")]
# Run the server
async def main():
stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
if __name__ == "__main__":
())
Building an MCP Server in TypeScript
ntextprotocol/sdk
// server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
sdk/types.js";
import { readFileSync, readdirSync, statSync } from "fs";
import { join } from "path";
const server = new Server(
{ name: "my-tools", ve1.0.0" },
{ capabilities: { tools: {} } }
);
// Define available tools
sync () => ({
tools: [
{
name: "read_file",
description: "Read the contents of a file",
inputSchema: {
type: "object",
rties: {
path: { type: "string", description: "File path to read" }
},
}
},
{
"fetch_url",
description: "Fetch content from a URL",
inputSchema: {
type: "object",
properties: {
url: { type: "string", description: "URL to fetch" }
contextprotocol/servers)
.com/modelcontextprotocol/typescript-sdk)
- [MCP Official Documentation](https://modelcontextprotocol.io/docs)
- [MCP GitHub](https://github.com/modelcontextprotocol)
- [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk)
- [MCP TypeScript SDK](https://github.com/modelcontextprotocol/typescript-sdk)
- [MCP Server Registry](https://github.com/modelcontextprotocol/servers)
Resources
-
[MCP Official D return [types.TextContent(type=“text”, text=path.rerror: Access denied — sensitive file")] (type=“text”, text=“E return [types.TextContent if any(p in path.name.lower() for p in blocked_patterns): itive files blocked_patterns = [".env”, “id_rsa”, “credentials”, “secrets”] text=“Error: Access denied — path outside allowed directory”)]
## Prevent reading sensartswith(str(allowed_base)): return [types.TextContent(type="text", revent directory traversal allowed_base = Path("/data").resolve() if not str(path).st):if name == “read_file”: path = Path(arguments[“path”]).resolve()
## P
# Always validate and sanitize tool inputs
@app.call_tool()
async def call_tool(name: str, arguments: dict```json
## Security Considerations
"-y", "@modelcontextprotocol/server-postgres", "postgresql://localhost/mydb"]
}
}
}
and": "npx",
"args": [nv": { "GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_..." }
},
"postgres": {
"comm", "@modelcontextprotocol/server-github"],
"e"github": {
"command": "npx",
"args": ["-yl/server-filesystem", "/Users/me"]
},
system": {
"command": "npx",
"args": ["-y", "@modelcontextprotoco
"mcpServers": {
"fileetch URLs
// Claude Desktop config with multiple servers
{lcontextprotocol/server-sqlite /path/to/db.sqlite
npx mcp-server-fetch # fmunity servers
npx @moder-brave-search # web search
npx @modelcontextprotocol/server-slack
# Comcalhost/mydb
npx @modelcontextprotocol/serveol/server-github
npx @modelcontextprotocol/server-postgres postgresql://lo /path/to/allow
npx @modelcontextprotoctextprotocol/server-filesystemal servers from Anthropic
npx @modelconcratch — use existing servers:
Offici
Pre-Built MCP Servers
Don’t build from sme__ == “main”: asyncio.run(main())
):
await app.run(read, write, app.create_initialization_options())
if __na async with stdio_server() as (read, write"Created issue #{issue['number']}: {issue['html_url']}")]
async def main():
ssue = resp.json()
return [types.TextContent(type="text", text=fget("body", "")}
)
iy": arguments.
json={"title": arguments["title"], "bodo']}/issues",
headers=headers,epos/{arguments['owner']}/{arguments['rep f"https://api.github.com/rue":
resp = await client.post(
", text=content)]
elif name == "create_iss return [types.TextContent(type="text )
data = resp.json()
content = base64.b64decode(data["content"]).decode("utf-8")
uments['path']}",
headers=headers
ents['owner']}/{arguments['repo']}/contents/{arg f"https://api.github.com/repos/{argume64
resp = await client.get(
elif name == "get_file":
import bas.TextContent(type="text", text=summary)]
'stargazers_count']} ⭐)"
for r in repos
])
return [types']}: {r['description'] or 'No description'} ({r[y = "\n".join([
f"- {r['name, "sort": "updated"}
)
repos = resp.json()
summaraders=headers,
params={"per_page": 20ts['owner']}/repos",
he f"https://api.github.com/users/{argumenpos":
resp = await client.get(
.AsyncClient() as client:
if name == "list_reUB_TOKEN}",
"Accept": "application/vnd.github.v3+json"
}
async with httpxs = {
"Authorization": f"token {GITH_tool(name: str, arguments: dict):
header ),
]
@app.call_tool()
async def call, "title"]
}
quired": ["owner", "repo"},
"reng"},
"body": {"type": "string"}
"title": {"type": "striowner": {"type": "string"},
"repo": {"type": "string"},
"properties": {
"ame="create_issue",
description="Create a GitHub issue",
inputSchema={
"type": "object",
),
types.Tool(
nn repo"}
},
"required": ["owner", "repo", "path"]
}ype": "string", "description": "File path i "repo": {"type": "string"},
"path": {"t "properties": {
"owner": {"type": "string"},
Hub repository",
inputSchema={
"type": "object",
"owner"]
}
),
types.Tool(
name="get_file",
description="Get file contents from a Git },
"required": [ties": {
"owner": {"type": "string", "description": "GitHub username or org"}
tion",
inputSchema={
"type": "object",
"proper
name="list_repos",
description="List GitHub repositories for a user or organizanc def list_tools():
return [
types.Tool(ron["GITHUB_TOKEN"]
@app.list_tools()
asyp = Server("github-tools")
GITHUB_TOKEN = os.enviort httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types
import os
apitHub Integration
github_mcp_server.py — let AI interact with GitHub
import asyncio imp
Real-World MCP Server: G:", result.content[0].text)
asyncio.run(use_mcp_tools())
arguments={“query”: “SELECT * FROM users LIMIT 10”} ) print(“Query resultstool( “query_database”, ntent[0].text)
# Query database
result = await session.call_ )
print("File contents:", result.co arguments={"path": "/data/report.csv"}
.call_tool(
"read_file",
ol
result = await sessionin tools.tools])
# Call a tols()
print("Available tools:", [t.name for t tools = await session.list_too)
# List available tools
# Initialize
await session.initialize(ync with ClientSession(read, write) as session:,
)
async with stdio_client(server_params) as (read, write):
ason",
args=["server.py"]_tools():
# Connect to your MCP server
server_params = StdioServerParameters(
command="pythmcp.client.stdio import stdio_client
async def use_mcp import ClientSession, StdioServerParameters from tools in your own application import asyncio from mcp
Using MCP Programmatically
# Use MCPmer icon (🔨) indicating tools are available.rt Claude Desktop. You'll see a hamtocol/server-filesystem", "/Users/me/Documents"]
}
}
}
Resta “command”: “npx”, “args”: ["-y”, “@modelcontextproto/server.js”] }, “filesystem”: { “: “node”, “args”: ["/path/ “commandy”] }, “my-ts-tools”: { n”, “args”: ["/path/to/server.p “my-tools”: { “command”: “pythoonfig.json (Windows) { “mcpServers”: { nfig.json (macOS) // %APPDATA%\Claude\claude_desktop_c ~/Library/Application Support/Claude/claude_desktop_cor to Claude Desktop’s config:
//onnect(transport);
Connecting to Claude Desktop
Add your serve const transport = new StdioServerTransport(); await server.cmit size }
throw new Error(`Unknown tool: ${name}`);
});
// Start server = await response.text(); return { content: [{ type: “text”, text: text.slice(0, 5000) }] }; // li (name === “fetch_url”) { const response = await fetch(args.url as string); const text }] }; } }
if{ content: [{ type: "text", text: `Error: ${err}`ent }] };
} catch (err) {
return ent: [{ type: "text", text: contSync(args.path as string, "utf-8");
return { cont try {
const content = readFilerequest.params;
if (name === "read_file") {
ments: args } = est) => { const { name, arguequestHandler(CallToolRequestSchema, async (requ} ] }));
// Handle tool calls server.setR required: [“url”] } },
Comments