python observability logging monitoring

MCP + Observability (Python)

Build traceable and auditable MCP servers with structured logging, distributed tracing, and metrics in Python.

CodeMem Team

Why Observability Matters for MCP

When your AI agent makes decisions based on memory, you need to understand why. What memories were retrieved? How long did the query take? Did any errors occur silently? Without proper observability, debugging MCP-powered agents becomes guesswork.

Observability gives you traceability—the ability to follow a request from start to finish—and auditability—a complete record of what happened and when. In this guide, we'll implement the three pillars of observability (logging, tracing, and metrics) for MCP servers in Python.

The Three Pillars of Observability

Before diving into code, let's understand what each pillar provides:

  • Logging: Discrete events with context (what happened)
  • Tracing: Request flow across operations (how it happened)
  • Metrics: Aggregated measurements over time (how well it's working)

Structured Logging for MCP Operations

Plain text logs are hard to query. Structured logging outputs JSON that can be parsed, filtered, and analyzed. Here's a logging setup tailored for MCP operations:

import logging
import json
import sys
from datetime import datetime
from typing import Any

class MCPJSONFormatter(logging.Formatter):
    """Structured JSON formatter for MCP operations."""
    
    def format(self, record: logging.LogRecord) -> str:
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            "mcp": {}
        }
        
        # Include MCP-specific context if present
        if hasattr(record, "tool_name"):
            log_entry["mcp"]["tool"] = record.tool_name
        if hasattr(record, "memory_ids"):
            log_entry["mcp"]["memory_ids"] = record.memory_ids
        if hasattr(record, "query"):
            log_entry["mcp"]["query"] = record.query
        if hasattr(record, "duration_ms"):
            log_entry["mcp"]["duration_ms"] = record.duration_ms
        if hasattr(record, "trace_id"):
            log_entry["trace_id"] = record.trace_id
            
        return json.dumps(log_entry)

def setup_mcp_logger(name: str = "mcp") -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(MCPJSONFormatter())
    logger.addHandler(handler)
    
    return logger

Using the Logger in MCP Tools

Wrap your MCP tool handlers with contextual logging:

import time

logger = setup_mcp_logger()

async def handle_memory_search(query: str, limit: int = 10) -> list[dict]:
    """MCP tool handler with structured logging."""
    start_time = time.perf_counter()
    
    logger.info(
        "Memory search started",
        extra={"tool_name": "memory_search", "query": query}
    )
    
    try:
        results = await memory_store.search(query, limit=limit)
        duration_ms = (time.perf_counter() - start_time) * 1000
        
        logger.info(
            f"Memory search completed: {len(results)} results",
            extra={
                "tool_name": "memory_search",
                "query": query,
                "memory_ids": [r["id"] for r in results],
                "duration_ms": round(duration_ms, 2)
            }
        )
        return results
        
    except Exception as e:
        logger.error(
            f"Memory search failed: {e}",
            extra={"tool_name": "memory_search", "query": query},
            exc_info=True
        )
        raise

Distributed Tracing with OpenTelemetry

When an LLM calls multiple MCP tools in sequence, you need to trace the entire request chain. OpenTelemetry provides vendor-neutral tracing that works with Jaeger, Zipkin, and cloud providers.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from functools import wraps

# Initialize tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("mcp-server")

# Export to your observability backend (Jaeger, Honeycomb, etc.)
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(otlp_exporter)
)

def traced_tool(tool_name: str):
    """Decorator to add tracing to MCP tool handlers."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(f"mcp.tool.{tool_name}") as span:
                span.set_attribute("mcp.tool.name", tool_name)
                span.set_attribute("mcp.tool.args", str(kwargs))
                
                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("mcp.tool.success", True)
                    return result
                except Exception as e:
                    span.set_attribute("mcp.tool.success", False)
                    span.set_attribute("mcp.tool.error", str(e))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage
@traced_tool("memory_store")
async def store_memory(content: str, tags: list[str]) -> str:
    """Store a memory with full tracing."""
    with tracer.start_as_current_span("validate_content"):
        validated = validate_memory_content(content)
    
    with tracer.start_as_current_span("embed_content"):
        embedding = await generate_embedding(validated)
    
    with tracer.start_as_current_span("persist_memory"):
        memory_id = await db.insert(validated, embedding, tags)
    
    return memory_id

Metrics for MCP Health Monitoring

Metrics aggregate data over time, enabling dashboards and alerting. Track these key metrics for MCP servers:

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Tool invocation metrics
tool_calls_total = Counter(
    "mcp_tool_calls_total",
    "Total MCP tool invocations",
    ["tool_name", "status"]
)

tool_duration_seconds = Histogram(
    "mcp_tool_duration_seconds",
    "Tool execution duration in seconds",
    ["tool_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)

# Memory-specific metrics
memories_retrieved = Histogram(
    "mcp_memories_retrieved",
    "Number of memories retrieved per search",
    buckets=[0, 1, 5, 10, 20, 50]
)

memory_store_size = Gauge(
    "mcp_memory_store_size",
    "Current number of memories in store"
)

# Start metrics endpoint
start_http_server(8000)  # Expose metrics at :8000/metrics

Instrumenting Tools with Metrics

import time

async def handle_memory_search(query: str, limit: int = 10) -> list[dict]:
    """MCP tool with full observability instrumentation."""
    start_time = time.perf_counter()
    
    try:
        results = await memory_store.search(query, limit=limit)
        
        # Record metrics
        duration = time.perf_counter() - start_time
        tool_calls_total.labels(tool_name="memory_search", status="success").inc()
        tool_duration_seconds.labels(tool_name="memory_search").observe(duration)
        memories_retrieved.observe(len(results))
        
        return results
        
    except Exception as e:
        tool_calls_total.labels(tool_name="memory_search", status="error").inc()
        raise

Building an Audit Trail

For compliance and debugging, maintain a persistent audit log of all MCP operations:

from dataclasses import dataclass, asdict
from datetime import datetime
import json

@dataclass
class AuditEvent:
    timestamp: str
    trace_id: str
    tool_name: str
    operation: str
    agent_id: str
    input_hash: str  # Hash of input for privacy
    memory_ids: list[str]
    duration_ms: float
    success: bool
    error_message: str | None = None

class AuditLogger:
    def __init__(self, output_path: str = "audit.jsonl"):
        self.output_path = output_path
    
    def log(self, event: AuditEvent):
        with open(self.output_path, "a") as f:
            f.write(json.dumps(asdict(event)) + "\n")
    
    def query(self, trace_id: str) -> list[AuditEvent]:
        """Retrieve all events for a trace."""
        events = []
        with open(self.output_path) as f:
            for line in f:
                event = json.loads(line)
                if event["trace_id"] == trace_id:
                    events.append(AuditEvent(**event))
        return events

Putting It All Together

Here's a complete MCP tool handler with all three observability pillars:

@traced_tool("memory_search")
async def handle_memory_search(query: str, limit: int = 10) -> list[dict]:
    """Fully observable MCP memory search."""
    trace_id = trace.get_current_span().get_span_context().trace_id
    start_time = time.perf_counter()
    
    logger.info("Search started", extra={
        "tool_name": "memory_search",
        "query": query,
        "trace_id": format(trace_id, "032x")
    })
    
    try:
        results = await memory_store.search(query, limit=limit)
        duration_ms = (time.perf_counter() - start_time) * 1000
        
        # Metrics
        tool_calls_total.labels(tool_name="memory_search", status="success").inc()
        tool_duration_seconds.labels(tool_name="memory_search").observe(duration_ms / 1000)
        
        # Audit
        audit_logger.log(AuditEvent(
            timestamp=datetime.utcnow().isoformat(),
            trace_id=format(trace_id, "032x"),
            tool_name="memory_search",
            operation="search",
            agent_id=get_current_agent_id(),
            input_hash=hash_input(query),
            memory_ids=[r["id"] for r in results],
            duration_ms=duration_ms,
            success=True
        ))
        
        return results
    except Exception as e:
        tool_calls_total.labels(tool_name="memory_search", status="error").inc()
        logger.error(f"Search failed: {e}", exc_info=True)
        raise

Start Observing Your MCP Server

Observability isn't optional for production AI systems. When your agent behaves unexpectedly, you need to trace exactly which memories were retrieved and why. When latency spikes, you need metrics to identify the bottleneck.

CodeMem includes built-in observability with structured logging, OpenTelemetry tracing, and Prometheus metrics—so you can focus on building agents, not instrumentation.

Get full observability for your MCP server with CodeMem →