MCP + Observability (Python)
Build traceable and auditable MCP servers with structured logging, distributed tracing, and metrics in Python.
Why Observability Matters for MCP
When your AI agent makes decisions based on memory, you need to understand why. What memories were retrieved? How long did the query take? Did any errors occur silently? Without proper observability, debugging MCP-powered agents becomes guesswork.
Observability gives you traceability—the ability to follow a request from start to finish—and auditability—a complete record of what happened and when. In this guide, we'll implement the three pillars of observability (logging, tracing, and metrics) for MCP servers in Python.
The Three Pillars of Observability
Before diving into code, let's understand what each pillar provides:
- Logging: Discrete events with context (what happened)
- Tracing: Request flow across operations (how it happened)
- Metrics: Aggregated measurements over time (how well it's working)
Structured Logging for MCP Operations
Plain text logs are hard to query. Structured logging outputs JSON that can be parsed, filtered, and analyzed. Here's a logging setup tailored for MCP operations:
import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any
class MCPJSONFormatter(logging.Formatter):
    """Structured JSON formatter for MCP operations.

    Emits one JSON object per log line containing a UTC timestamp, the
    standard level/message/logger fields, and any MCP-specific context
    attached to the record (via the ``extra=`` argument of logging calls).
    """
    def format(self, record: logging.LogRecord) -> str:
        log_entry: dict[str, Any] = {
            # Timezone-aware now(); datetime.utcnow() is deprecated (3.12+).
            # Normalize the "+00:00" offset to the conventional trailing "Z".
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            "mcp": {}
        }
        # Include MCP-specific context if present (set via extra= on log calls)
        if hasattr(record, "tool_name"):
            log_entry["mcp"]["tool"] = record.tool_name
        if hasattr(record, "memory_ids"):
            log_entry["mcp"]["memory_ids"] = record.memory_ids
        if hasattr(record, "query"):
            log_entry["mcp"]["query"] = record.query
        if hasattr(record, "duration_ms"):
            log_entry["mcp"]["duration_ms"] = record.duration_ms
        # trace_id is promoted to the top level so log/trace correlation
        # tools can find it without digging into the "mcp" sub-object.
        if hasattr(record, "trace_id"):
            log_entry["trace_id"] = record.trace_id
        return json.dumps(log_entry)
def setup_mcp_logger(name: str = "mcp") -> logging.Logger:
    """Create (or fetch) a logger that emits structured JSON to stdout.

    Safe to call more than once: the handler is attached only when the
    logger has none, so repeated calls (re-imports, multiple tools) do not
    produce duplicate log lines.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Guard against duplicate handlers on repeated calls.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(MCPJSONFormatter())
        logger.addHandler(handler)
    return logger
# Using the Logger in MCP Tools
Wrap your MCP tool handlers with contextual logging:
import time
# Module-level structured logger shared by the tool handlers below.
logger = setup_mcp_logger()
async def handle_memory_search(query: str, limit: int = 10) -> list[dict]:
    """MCP tool handler with structured logging."""
    started = time.perf_counter()
    # Base context repeated on every log line so events can be correlated.
    base_ctx = {"tool_name": "memory_search", "query": query}
    logger.info("Memory search started", extra=base_ctx)
    try:
        hits = await memory_store.search(query, limit=limit)
        elapsed_ms = (time.perf_counter() - started) * 1000
        done_ctx = dict(base_ctx)
        done_ctx["memory_ids"] = [hit["id"] for hit in hits]
        done_ctx["duration_ms"] = round(elapsed_ms, 2)
        logger.info(
            f"Memory search completed: {len(hits)} results",
            extra=done_ctx,
        )
        return hits
    except Exception as e:
        # exc_info=True attaches the full traceback to the log record.
        logger.error(
            f"Memory search failed: {e}",
            extra=base_ctx,
            exc_info=True,
        )
        raise
# Distributed Tracing with OpenTelemetry
When an LLM calls multiple MCP tools in sequence, you need to trace the entire request chain. OpenTelemetry provides vendor-neutral tracing that works with Jaeger, Zipkin, and cloud providers.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from functools import wraps
# Initialize tracing: install a global provider, then get a named tracer
# for this server's spans.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("mcp-server")
# Export to your observability backend (Jaeger, Honeycomb, etc.)
# 4317 is the conventional OTLP/gRPC collector port; BatchSpanProcessor
# batches spans so export happens off the request path.
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(otlp_exporter)
)
def traced_tool(tool_name: str):
    """Decorator to add tracing to MCP tool handlers."""
    def attach(func):
        @wraps(func)
        async def traced(*args, **kwargs):
            # One span per tool invocation, named after the tool.
            span_name = f"mcp.tool.{tool_name}"
            with tracer.start_as_current_span(span_name) as span:
                span.set_attribute("mcp.tool.name", tool_name)
                span.set_attribute("mcp.tool.args", str(kwargs))
                try:
                    outcome = await func(*args, **kwargs)
                except Exception as e:
                    # Record the failure on the span before re-raising.
                    span.set_attribute("mcp.tool.success", False)
                    span.set_attribute("mcp.tool.error", str(e))
                    span.record_exception(e)
                    raise
                span.set_attribute("mcp.tool.success", True)
                return outcome
        return traced
    return attach
# Usage
@traced_tool("memory_store")
async def store_memory(content: str, tags: list[str]) -> str:
    """Store a memory with full tracing."""
    # Each phase gets its own child span so slow steps are visible
    # individually in the trace view.
    with tracer.start_as_current_span("validate_content"):
        clean = validate_memory_content(content)
    with tracer.start_as_current_span("embed_content"):
        vector = await generate_embedding(clean)
    with tracer.start_as_current_span("persist_memory"):
        new_id = await db.insert(clean, vector, tags)
    return new_id
# Metrics for MCP Health Monitoring
Metrics aggregate data over time, enabling dashboards and alerting. Track these key metrics for MCP servers:
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Tool invocation metrics
# Counter labeled by tool and outcome -> error-rate per tool in PromQL.
tool_calls_total = Counter(
    "mcp_tool_calls_total",
    "Total MCP tool invocations",
    ["tool_name", "status"]
)
# Latency histogram; buckets span 10ms to 5s.
tool_duration_seconds = Histogram(
    "mcp_tool_duration_seconds",
    "Tool execution duration in seconds",
    ["tool_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
# Memory-specific metrics
# Distribution of result counts per search (0 bucket catches empty results).
memories_retrieved = Histogram(
    "mcp_memories_retrieved",
    "Number of memories retrieved per search",
    buckets=[0, 1, 5, 10, 20, 50]
)
# Gauge: current total; caller is expected to set/inc/dec this elsewhere.
memory_store_size = Gauge(
    "mcp_memory_store_size",
    "Current number of memories in store"
)
# Start metrics endpoint
start_http_server(8000) # Expose metrics at :8000/metrics
# Instrumenting Tools with Metrics
import time

async def handle_memory_search(query: str, limit: int = 10) -> list[dict]:
    """MCP tool with full observability instrumentation."""
    began = time.perf_counter()
    try:
        found = await memory_store.search(query, limit=limit)
        elapsed = time.perf_counter() - began
        # Record metrics for the success path.
        tool_calls_total.labels(tool_name="memory_search", status="success").inc()
        tool_duration_seconds.labels(tool_name="memory_search").observe(elapsed)
        memories_retrieved.observe(len(found))
        return found
    except Exception:
        # Count the failure and re-raise; no duration is recorded for errors.
        tool_calls_total.labels(tool_name="memory_search", status="error").inc()
        raise
# Building an Audit Trail
For compliance and debugging, maintain a persistent audit log of all MCP operations:
from dataclasses import dataclass, asdict
from datetime import datetime
import json
@dataclass
class AuditEvent:
    """One auditable MCP operation, stored as a single JSON Lines record."""
    timestamp: str            # ISO-8601 event time
    trace_id: str             # hex-encoded trace id for log/trace correlation
    tool_name: str            # MCP tool that was invoked
    operation: str            # logical operation within the tool
    agent_id: str             # which agent made the call
    input_hash: str  # Hash of input for privacy
    memory_ids: list[str]     # memories touched by the operation
    duration_ms: float        # wall-clock duration of the operation
    success: bool             # whether the operation completed without error
    error_message: str | None = None  # populated only on failure
class AuditLogger:
    """Append-only JSON Lines audit log for MCP operations."""

    def __init__(self, output_path: str = "audit.jsonl"):
        # One JSON object per line keeps appends cheap and the file greppable.
        self.output_path = output_path

    def log(self, event: AuditEvent) -> None:
        """Append a single audit event as one JSON line."""
        with open(self.output_path, "a") as f:
            f.write(json.dumps(asdict(event)) + "\n")

    def query(self, trace_id: str) -> list[AuditEvent]:
        """Retrieve all events for a trace.

        Returns an empty list when the audit file does not exist yet
        (nothing has been logged) instead of raising FileNotFoundError.
        """
        events: list[AuditEvent] = []
        try:
            with open(self.output_path) as f:
                for line in f:
                    if not line.strip():
                        continue  # tolerate stray blank lines
                    record = json.loads(line)
                    if record["trace_id"] == trace_id:
                        events.append(AuditEvent(**record))
        except FileNotFoundError:
            pass  # no audit file yet -> no events for any trace
        return events
# Putting It All Together
Here's a complete MCP tool handler with all three observability pillars:
@traced_tool("memory_search")
async def handle_memory_search(query: str, limit: int = 10) -> list[dict]:
    """Fully observable MCP memory search.

    Combines all three pillars: structured logs, an OpenTelemetry span
    (via @traced_tool), Prometheus metrics, and a persistent audit event.
    Failures are audited as well, so the audit trail has no gaps.
    """
    trace_id = trace.get_current_span().get_span_context().trace_id
    trace_id_hex = format(trace_id, "032x")  # render once, reuse everywhere
    start_time = time.perf_counter()
    logger.info("Search started", extra={
        "tool_name": "memory_search",
        "query": query,
        "trace_id": trace_id_hex
    })
    try:
        results = await memory_store.search(query, limit=limit)
        duration_ms = (time.perf_counter() - start_time) * 1000
        # Metrics
        tool_calls_total.labels(tool_name="memory_search", status="success").inc()
        tool_duration_seconds.labels(tool_name="memory_search").observe(duration_ms / 1000)
        # Audit (timezone-aware timestamp; datetime.utcnow() is deprecated)
        audit_logger.log(AuditEvent(
            timestamp=datetime.now(timezone.utc).isoformat(),
            trace_id=trace_id_hex,
            tool_name="memory_search",
            operation="search",
            agent_id=get_current_agent_id(),
            input_hash=hash_input(query),
            memory_ids=[r["id"] for r in results],
            duration_ms=duration_ms,
            success=True
        ))
        return results
    except Exception as e:
        duration_ms = (time.perf_counter() - start_time) * 1000
        tool_calls_total.labels(tool_name="memory_search", status="error").inc()
        # Same context fields as the start log, so errors stay queryable
        # by tool, query, and trace id.
        logger.error(f"Search failed: {e}", exc_info=True, extra={
            "tool_name": "memory_search",
            "query": query,
            "trace_id": trace_id_hex
        })
        # Audit the failure too -- an audit trail with gaps is not auditable.
        audit_logger.log(AuditEvent(
            timestamp=datetime.now(timezone.utc).isoformat(),
            trace_id=trace_id_hex,
            tool_name="memory_search",
            operation="search",
            agent_id=get_current_agent_id(),
            input_hash=hash_input(query),
            memory_ids=[],
            duration_ms=duration_ms,
            success=False,
            error_message=str(e)
        ))
        raise
# Start Observing Your MCP Server
Observability isn't optional for production AI systems. When your agent behaves unexpectedly, you need to trace exactly which memories were retrieved and why. When latency spikes, you need metrics to identify the bottleneck.
CodeMem includes built-in observability with structured logging, OpenTelemetry tracing, and Prometheus metrics—so you can focus on building agents, not instrumentation.