Cache de Memória sem Vazamentos (Python)
Aprenda boas práticas e técnicas de segurança para implementar cache de memória em Python sem vazamentos de dados.
Por Que Cache de Memória Pode Ser um Risco
Cache de memória é uma técnica essencial para melhorar a performance de aplicações. No contexto de agentes de IA, armazenar memórias, preferências e contextos em cache acelera significativamente as respostas. Porém, um cache mal implementado pode se tornar uma porta de entrada para vazamentos de dados sensíveis.
Neste artigo, vamos explorar as principais vulnerabilidades de segurança em sistemas de cache e como implementar soluções robustas em Python que protegem informações confidenciais.
Vulnerabilidades Comuns em Sistemas de Cache
1. Cache Poisoning
Ocorre quando um atacante consegue inserir dados maliciosos no cache, que são então servidos para outros usuários. Em sistemas de memória de IA, isso pode significar injetar preferências falsas ou contextos manipulados.
2. Memory Leaks em Dados Sensíveis
Quando dados sensíveis permanecem na memória após seu uso, podem ser acessados por processos não autorizados ou expostos em dumps de memória.
3. Isolation Failures
Em sistemas multi-tenant, memórias de um usuário podem vazar para outro se não houver isolamento adequado de chaves de cache.
Implementando Cache Seguro em Python
Pattern 1: Chaves de Cache com Namespace Isolado
Sempre inclua identificadores únicos do tenant/usuário nas chaves de cache para garantir isolamento completo.
import hashlib
from typing import Any
class SecureCacheKeyBuilder:
"""Gera chaves de cache seguras com isolamento por tenant."""
def __init__(self, tenant_id: str, salt: str):
self.tenant_id = tenant_id
self.salt = salt
def build_key(self, resource_type: str, resource_id: str) -> str:
"""Cria chave com hash para evitar enumeração."""
raw_key = f"{self.tenant_id}:{resource_type}:{resource_id}"
hashed = hashlib.sha256(f"{raw_key}{self.salt}".encode()).hexdigest()[:16]
return f"mem:{self.tenant_id}:{hashed}"
def validate_access(self, key: str) -> bool:
"""Verifica se a chave pertence ao tenant atual."""
return key.startswith(f"mem:{self.tenant_id}:") Pattern 2: TTL Automático para Dados Sensíveis
Implemente expiração automática para garantir que dados sensíveis não permaneçam no cache indefinidamente.
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class SensitivityLevel(Enum):
PUBLIC = "public" # TTL: 1 hora
INTERNAL = "internal" # TTL: 15 minutos
SENSITIVE = "sensitive" # TTL: 5 minutos
SECRET = "secret" # TTL: 1 minuto
TTL_MAP = {
SensitivityLevel.PUBLIC: timedelta(hours=1),
SensitivityLevel.INTERNAL: timedelta(minutes=15),
SensitivityLevel.SENSITIVE: timedelta(minutes=5),
SensitivityLevel.SECRET: timedelta(minutes=1),
}
@dataclass
class CachedMemory:
key: str
value: Any
sensitivity: SensitivityLevel
created_at: datetime
@property
def expires_at(self) -> datetime:
return self.created_at + TTL_MAP[self.sensitivity]
@property
def is_expired(self) -> bool:
return datetime.utcnow() > self.expires_at Pattern 3: Sanitização e Validação
Nunca confie em dados vindos do cache sem validação. Implemente sanitização tanto na escrita quanto na leitura.
from pydantic import BaseModel, validator
import re
class MemoryEntry(BaseModel):
content: str
tags: list[str]
@validator('content')
def sanitize_content(cls, v):
# Remove potenciais payloads de injeção
v = re.sub(r'<script[^>]*>.*?</script>', '', v, flags=re.IGNORECASE)
# Limita tamanho para evitar DoS
return v[:10000]
@validator('tags', each_item=True)
def sanitize_tags(cls, v):
# Tags devem ser alfanuméricas
if not re.match(r'^[a-zA-Z0-9_-]+$', v):
raise ValueError('Tag contém caracteres inválidos')
return v.lower()
def safe_cache_set(cache, key: str, data: dict) -> bool:
"""Valida dados antes de inserir no cache."""
try:
validated = MemoryEntry(**data)
cache.set(key, validated.dict())
return True
except Exception:
return False Pattern 4: Limpeza Segura de Memória
Quando dados sensíveis são removidos do cache, garanta que a memória seja sobrescrita para evitar recuperação forense.
import secrets
import gc
from typing import Optional
class SecureCache:
def __init__(self):
self._store: dict[str, CachedMemory] = {}
def secure_delete(self, key: str) -> bool:
"""Remove item e sobrescreve memória."""
if key not in self._store:
return False
# Sobrescreve dados com bytes aleatórios antes de deletar
item = self._store[key]
if isinstance(item.value, str):
# Sobrescreve string na memória
overwrite = secrets.token_hex(len(item.value))
self._store[key] = CachedMemory(
key=key,
value=overwrite,
sensitivity=item.sensitivity,
created_at=item.created_at
)
del self._store[key]
gc.collect() # Força garbage collection
return True
def purge_expired(self) -> int:
"""Remove todas as entradas expiradas de forma segura."""
expired_keys = [
k for k, v in self._store.items()
if v.is_expired
]
for key in expired_keys:
self.secure_delete(key)
return len(expired_keys) Checklist de Segurança para Cache de Memória
- Isolamento de tenant: Cada usuário/organização deve ter namespace separado
- TTL obrigatório: Nunca armazene dados sensíveis sem expiração definida
- Validação bidirecional: Sanitize na escrita E na leitura
- Audit logging: Registre acessos ao cache para detectar anomalias
- Criptografia at-rest: Para dados classificados como SECRET, criptografe antes de cachear
- Rate limiting: Limite requisições ao cache por tenant para evitar abuse
- Monitoramento de tamanho: Alerte quando o cache crescer anormalmente
Exemplo Completo: Secure Memory Cache
from functools import wraps
import logging
logger = logging.getLogger(__name__)
def audit_cache_access(func):
"""Decorator para logging de auditoria."""
@wraps(func)
def wrapper(self, key: str, *args, **kwargs):
result = func(self, key, *args, **kwargs)
logger.info(f"Cache {func.__name__}: key={key[:20]}..., tenant={self.tenant_id}")
return result
return wrapper
class SecureMemoryCache:
def __init__(self, tenant_id: str, max_size: int = 1000):
self.tenant_id = tenant_id
self.max_size = max_size
self._cache = SecureCache()
self._key_builder = SecureCacheKeyBuilder(tenant_id, salt=secrets.token_hex(16))
@audit_cache_access
def get(self, key: str) -> Optional[Any]:
if not self._key_builder.validate_access(key):
logger.warning(f"Unauthorized access attempt: {key}")
return None
item = self._cache._store.get(key)
if item and item.is_expired:
self._cache.secure_delete(key)
return None
return item.value if item else None
@audit_cache_access
def set(self, resource_type: str, resource_id: str,
value: Any, sensitivity: SensitivityLevel) -> str:
if len(self._cache._store) >= self.max_size:
self._cache.purge_expired()
key = self._key_builder.build_key(resource_type, resource_id)
self._cache._store[key] = CachedMemory(
key=key,
value=value,
sensitivity=sensitivity,
created_at=datetime.utcnow()
)
return key Conclusão
Implementar cache de memória seguro requer atenção em múltiplas camadas: isolamento de dados, expiração automática, validação rigorosa e limpeza segura. Em sistemas de IA onde memórias persistentes contêm preferências, decisões e contextos sensíveis dos usuários, essas práticas não são opcionais—são fundamentais.
Lembre-se: a performance nunca deve vir às custas da segurança. Um cache rápido mas vulnerável é um passivo, não um ativo.
Simplifique com CodeMem
Construir infraestrutura de cache segura é complexo. O CodeMem oferece armazenamento de memória com isolamento multi-tenant, criptografia, TTL automático e audit logging—tudo pronto para uso.