Introduction
MCP, MCP defines a universal interface.
What MCP enabes: base, calls your APIs
- Build a tool once, use it with any MCP-compatible AI ions in the world
How MCP Works
Your App (MCP Client)
ON-RPC over stdio/HTTP)
MCP Server (yours)
โ Your actual systems
Files, DBs, APIs, etc.
sult.
Python
pip install mcp
# server.py โ a simple MCP server with file and database tools
import asyncio
import sqlite3
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types
# Create the MCP server
app = Server("my-tools")
# Tool 1: Read a file
@app.list_tools()
async def list_tools() -> list[types.Tool]:
return [
types.Tool(
name="read_file",
description="Read the contents of a file",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file to read"
}
},
"required": ["path"]
}
),
types.Tool(
name="query_database",
local database",
inputSchema={
"type": "object",
"properties": {
"query": {
pe": "string",
"description": "SQL SELECT query to execute"
}
},
"required": ["query"]
}
),
types.Tool(
name="list_directory",
on="List files in a directory",
inputSchema={
bject",
"properties": {
"path": {
"type": "string",
"description": "Directory path to list"
}
},
"required": ["path"]
}
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
== "read_file":
path = Path(arguments["path"])
if not path.exists():
return [types.Te")]
if not path.is_file():
return [types.TextContent(type="text", text=f"Error: Not a file: {path}")]
coding="utf-8")
return [types.TextContent(type="text", text=content)]
elif name == "query_database":
query = arguments["query"]
# Safety: only allow SELECT queries
"SELECT"):
return [types.TextContent(type="text", text="Error: Only SELECT queries are allowed")]
conn = sqlite3.connect("data.db")
conn.row_factory = sqlite3.Row
cursor = conn.execute(query)
w) for row in cursor.fetchall()]
conn.close()
import json
tContent(type="text", text=json.dumps(rows, indent=2))]
list_directory":
path = Path(arguments["path"])
ath.exists():
return [types.TextContent(type="text", text=f"Error: Directory not found: {path}")]
files = []
in sorted(path.iterdir()):
item_type = "dir" if item.is_dir() else "file"
size = item.stat().st_size if item.is_file() else 0
files.append(f"{item_type:4} {size:10} {item.name}")
return [types.TextContent(type="text", text="\n".join(files))]
else:
ame}")]
# Run the server
async def main():
stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
if __name__ == "__main__":
())
Building an MCP Server in TypeScript
ntextprotocol/sdk
// server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
sdk/types.js";
import { readFileSync, readdirSync, statSync } from "fs";
import { join } from "path";
const server = new Server(
{ name: "my-tools", ve1.0.0" },
{ capabilities: { tools: {} } }
);
// Define available tools
sync () => ({
tools: [
{
name: "read_file",
description: "Read the contents of a file",
inputSchema: {
type: "object",
rties: {
path: { type: "string", description: "File path to read" }
},
}
},
{
"fetch_url",
description: "Fetch content from a URL",
inputSchema: {
type: "object",
properties: {
url: { type: "string", description: "URL to fetch" }
contextprotocol/servers)
.com/modelcontextprotocol/typescript-sdk)
- [MCP Server Registry](https://github.com/modelextprotocol/python-sdk)
- [MCP TypeScript SDK](https://githubdocs/)
- [MCP GitHub](https://github.com/modelcontextprotocol)
- [MCP Python SDK](https://github.com/modelcontocumentation](https://modelcontextprotocol.io/ad_text())]
Resources
-
[MCP Official D return [types.TextContent(type=“text”, text=path.rerror: Access denied โ sensitive file")] (type=“text”, text=“E return [types.TextContent if any(p in path.name.lower() for p in blocked_patterns): itive files blocked_patterns = [".env”, “id_rsa”, “credentials”, “secrets”] text=“Error: Access denied โ path outside allowed directory”)]
# Prevent reading sensartswith(str(allowed_base)): return [types.TextContent(type="text", revent directory traversal allowed_base = Path("/data").resolve() if not str(path).st):if name == “read_file”: path = Path(arguments[“path”]).resolve()
# P
# Always validate and sanitize tool inputs
@app.call_tool()
async def call_tool(name: str, arguments: dict```
## Security Considerations
"-y", "@modelcontextprotocol/server-postgres", "postgresql://localhost/mydb"]
}
}
}
and": "npx",
"args": [nv": { "GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_..." }
},
"postgres": {
"comm", "@modelcontextprotocol/server-github"],
"e"github": {
"command": "npx",
"args": ["-yl/server-filesystem", "/Users/me"]
},
system": {
"command": "npx",
"args": ["-y", "@modelcontextprotoco
"mcpServers": {
"fileetch URLs
// Claude Desktop config with multiple servers
{lcontextprotocol/server-sqlite /path/to/db.sqlite
npx mcp-server-fetch # fmunity servers
npx @moder-brave-search # web search
npx @modelcontextprotocol/server-slack
# Comcalhost/mydb
npx @modelcontextprotocol/serveol/server-github
npx @modelcontextprotocol/server-postgres postgresql://lo /path/to/allow
npx @modelcontextprotoctextprotocol/server-filesystemal servers from Anthropic
npx @modelconcratch โ use existing servers:
```bash
# Offici
## Pre-Built MCP Servers
Don't build from sme__ == "__main__":
asyncio.run(main())
): await app.run(read, write, app.create_initialization_options())
if __na async with stdio_server() as (read, write"Created issue #{issue[’number’]}: {issue[‘html_url’]}")]
async def main(): ssue = resp.json() return [types.TextContent(type=“text”, text=fget(“body”, “”)} ) iy": arguments. json={“title”: arguments[“title”], “bodo’]}/issues”, headers=headers,epos/{arguments[‘owner’]}/{arguments[‘rep f"https://api.github.com/rue": resp = await client.post( “, text=content)]
elif name == "create_iss return [types.TextContent(type="text )
data = resp.json()
content = base64.b64decode(data["content"]).decode("utf-8")
uments[‘path’]}”, headers=headers ents[‘owner’]}/{arguments[‘repo’]}/contents/{arg f"https://api.github.com/repos/{argume64 resp = await client.get( elif name == “get_file”: import bas.TextContent(type=“text”, text=summary)]
'stargazers_count']} โญ)"
for r in repos
])
return [types']}: {r['description'] or 'No description'} ({r[y = "\n".join([
f"- {r['name, "sort": "updated"}
)
repos = resp.json()
summaraders=headers,
params={"per_page": 20ts['owner']}/repos",
he f"https://api.github.com/users/{argumenpos":
resp = await client.get(
.AsyncClient() as client:
if name == "list_reUB_TOKEN}",
"Accept": "application/vnd.github.v3+json"
}
async with httpxs = {
"Authorization": f"token {GITH_tool(name: str, arguments: dict):
header ),
]
@app.call_tool() async def call, “title”] } quired": [“owner”, “repo”}, “reng”}, “body”: {“type”: “string”} “title”: {“type”: “striowner”: {“type”: “string”}, “repo”: {“type”: “string”}, “properties”: { “ame=“create_issue”, description=“Create a GitHub issue”, inputSchema={ “type”: “object”,
),
types.Tool(
nn repo"}
},
"required": ["owner", "repo", "path"]
}ype": "string", "description": "File path i "repo": {"type": "string"},
"path": {"t "properties": {
"owner": {"type": "string"},
Hub repository”, inputSchema={ “type”: “object”, “owner”] } ), types.Tool( name=“get_file”, description=“Get file contents from a Git }, “required”: [ties”: { “owner”: {“type”: “string”, “description”: “GitHub username or org”} tion", inputSchema={ “type”: “object”, “proper name=“list_repos”, description=“List GitHub repositories for a user or organizanc def list_tools(): return [ types.Tool(ron[“GITHUB_TOKEN”]
@app.list_tools() asyp = Server(“github-tools”) GITHUB_TOKEN = os.enviort httpx from mcp.server import Server from mcp.server.stdio import stdio_server from mcp import types import os
apitHub Integration
# github_mcp_server.py โ let AI interact with GitHub
import asyncio
imp
## Real-World MCP Server: G:", result.content[0].text)
asyncio.run(use_mcp_tools())
``` arguments={"query": "SELECT * FROM users LIMIT 10"}
)
print("Query resultstool(
"query_database",
ntent[0].text)
# Query database
result = await session.call_ )
print("File contents:", result.co arguments={"path": "/data/report.csv"}
.call_tool(
"read_file",
ol
result = await sessionin tools.tools])
# Call a tols()
print("Available tools:", [t.name for t tools = await session.list_too)
# List available tools
# Initialize
await session.initialize(ync with ClientSession(read, write) as session:,
)
async with stdio_client(server_params) as (read, write):
ason",
args=["server.py"]_tools():
# Connect to your MCP server
server_params = StdioServerParameters(
command="pythmcp.client.stdio import stdio_client
async def use_mcp import ClientSession, StdioServerParameters
from tools in your own application
import asyncio
from mcp
## Using MCP Programmatically
```python
# Use MCPmer icon (๐จ) indicating tools are available.rt Claude Desktop. You'll see a hamtocol/server-filesystem", "/Users/me/Documents"]
}
}
}
Resta “command”: “npx”, “args”: ["-y”, “@modelcontextproto/server.js”] }, “filesystem”: { “: “node”, “args”: ["/path/ “commandy”] }, “my-ts-tools”: { n”, “args”: ["/path/to/server.p “my-tools”: { “command”: “pythoonfig.json (Windows) { “mcpServers”: { nfig.json (macOS) // %APPDATA%\Claude\claude_desktop_c ~/Library/Application Support/Claude/claude_desktop_cor to Claude Desktop’s config:
//onnect(transport);
Connecting to Claude Desktop
Add your serve const transport = new StdioServerTransport(); await server.cmit size }
throw new Error(`Unknown tool: ${name}`);
});
// Start server = await response.text(); return { content: [{ type: “text”, text: text.slice(0, 5000) }] }; // li (name === “fetch_url”) { const response = await fetch(args.url as string); const text }] }; } }
if{ content: [{ type: "text", text: `Error: ${err}`ent }] };
} catch (err) {
return ent: [{ type: "text", text: contSync(args.path as string, "utf-8");
return { cont try {
const content = readFilerequest.params;
if (name === "read_file") {
ments: args } = est) => { const { name, arguequestHandler(CallToolRequestSchema, async (requ} ] }));
// Handle tool calls server.setR required: [“url”] } },
Comments