Streaming Memory Updates in Go
Build high-performance, real-time memory systems using streaming patterns in Go. Learn channels, backpressure handling, and async updates for responsive AI agents.
Why Streaming Matters for AI Memory
Traditional request-response patterns fall short when your AI agent needs to process memory updates in real-time. Whether you're handling user corrections, ingesting new context mid-conversation, or synchronizing memories across distributed agents, blocking operations kill responsiveness.
Go's concurrency primitives—goroutines and channels—make it uniquely suited for building streaming memory systems. You get the performance of compiled code with the simplicity of CSP (Communicating Sequential Processes) patterns. Let's build a production-ready streaming memory pipeline.
The Streaming Architecture
A streaming memory system has three core components: producers (memory sources), a processing pipeline, and consumers (storage/retrieval). Here's the foundational structure:
package memory

import (
    "context"
    "log/slog"
    "sync"
    "time"

    "github.com/google/uuid"
)
type MemoryUpdate struct {
    ID        string
    ProjectID string
    Content   string
    Action    UpdateAction // Create, Update, Delete
    Priority  int          // 0 = normal, 1 = high (user correction)
    Timestamp time.Time
}

type UpdateAction int

const (
    ActionCreate UpdateAction = iota
    ActionUpdate
    ActionDelete
)
type StreamConfig struct {
    BufferSize     int           // Channel buffer capacity
    Workers        int           // Parallel processors
    FlushInterval  time.Duration // Batch write interval
    MaxBatchSize   int           // Max items per batch
    BackpressureAt float64       // Buffer fill ratio to slow producers
}

Building the Memory Stream
The stream processor is the heart of the system. It receives updates, processes them in parallel, and batches writes for efficiency:
type MemoryStream struct {
    updates     chan MemoryUpdate
    highPrio    chan MemoryUpdate // Fast lane for corrections
    done        chan struct{}
    store       MemoryStore
    embedder    Embedder // Generates vector embeddings for workers
    subscribers sync.Map // Subscriber ID -> *Subscriber
    config      StreamConfig
    metrics     *StreamMetrics
}

func NewMemoryStream(store MemoryStore, embedder Embedder, cfg StreamConfig) *MemoryStream {
    return &MemoryStream{
        updates:  make(chan MemoryUpdate, cfg.BufferSize),
        highPrio: make(chan MemoryUpdate, cfg.BufferSize/4),
        done:     make(chan struct{}),
        store:    store,
        embedder: embedder,
        config:   cfg,
        metrics:  NewStreamMetrics(),
    }
}
func (s *MemoryStream) Start(ctx context.Context) {
    // Spawn worker pool; each worker batches and flushes its own updates
    for i := 0; i < s.config.Workers; i++ {
        go s.worker(ctx, i)
    }
    // Monitor backpressure
    go s.monitorBackpressure(ctx)
}

Priority-Aware Processing
Not all memory updates are equal. User corrections should bypass the queue. The worker implements priority selection using Go's select statement:
func (s *MemoryStream) worker(ctx context.Context, id int) {
    batch := make([]MemoryUpdate, 0, s.config.MaxBatchSize)
    ticker := time.NewTicker(s.config.FlushInterval)
    defer ticker.Stop()
    for {
        // Drain the fast lane first. A bare select picks randomly among
        // ready cases, so without this non-blocking check, high-priority
        // updates would get no preference over the regular queue.
        select {
        case update := <-s.highPrio:
            s.processImmediate(ctx, update)
            s.metrics.RecordProcessed(update, true)
            continue
        default:
        }
        select {
        case <-ctx.Done():
            s.flushBatch(batch) // Drain on shutdown
            return
        case update := <-s.highPrio:
            // High priority: process immediately, don't batch
            s.processImmediate(ctx, update)
            s.metrics.RecordProcessed(update, true)
        case update := <-s.updates:
            batch = append(batch, update)
            if len(batch) >= s.config.MaxBatchSize {
                s.flushBatch(batch)
                batch = batch[:0]
            }
        case <-ticker.C:
            if len(batch) > 0 {
                s.flushBatch(batch)
                batch = batch[:0]
            }
        }
    }
}
func (s *MemoryStream) processImmediate(ctx context.Context, update MemoryUpdate) {
    start := time.Now()
    switch update.Action {
    case ActionCreate, ActionUpdate:
        // Generate embedding synchronously for high-priority
        embedding := s.embedder.Embed(ctx, update.Content)
        s.store.Upsert(ctx, update, embedding)
    case ActionDelete:
        s.store.Delete(ctx, update.ID)
    }
    s.metrics.RecordLatency(time.Since(start))
}

Handling Backpressure
When consumers can't keep up with producers, you need backpressure to prevent memory exhaustion. Here's a non-blocking send with backpressure signaling:
func (s *MemoryStream) Send(update MemoryUpdate) error {
    // Route high-priority updates
    if update.Priority > 0 {
        select {
        case s.highPrio <- update:
            return nil
        default:
            // High-prio lane full? Fall through to the regular queue
        }
    }
    // Check backpressure
    fillRatio := float64(len(s.updates)) / float64(cap(s.updates))
    if fillRatio > s.config.BackpressureAt {
        s.metrics.RecordBackpressure()
        // Option 1: Block with timeout
        select {
        case s.updates <- update:
            return nil
        case <-time.After(100 * time.Millisecond):
            return ErrBackpressure
        }
    }
    // Normal path: non-blocking send
    select {
    case s.updates <- update:
        return nil
    default:
        return ErrBufferFull
    }
}
func (s *MemoryStream) monitorBackpressure(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            fill := float64(len(s.updates)) / float64(cap(s.updates))
            s.metrics.RecordBufferFill(fill)
            if fill > 0.9 {
                slog.Warn("memory stream near capacity",
                    "fill_ratio", fill,
                    "pending", len(s.updates))
            }
        }
    }
}

Batch Writes for Throughput
Individual writes are expensive. Batching amortizes network and I/O costs:
func (s *MemoryStream) flushBatch(batch []MemoryUpdate) {
    if len(batch) == 0 {
        return
    }
    start := time.Now()
    // Generate embeddings in parallel
    embeddings := s.embedder.EmbedBatch(context.Background(),
        extractContents(batch))
    // Bulk upsert
    items := make([]BulkItem, len(batch))
    for i, update := range batch {
        items[i] = BulkItem{
            Update:    update,
            Embedding: embeddings[i],
        }
    }
    if err := s.store.BulkUpsert(context.Background(), items); err != nil {
        s.metrics.RecordError(err)
        // Retry logic or dead-letter queue
        s.handleFailedBatch(batch, err)
        return
    }
    s.metrics.RecordBatch(len(batch), time.Since(start))
}

Real-Time Subscriptions
Allow consumers to subscribe to memory changes for reactive UIs or agent coordination:
type Subscriber struct {
    ID      string
    Filter  func(MemoryUpdate) bool
    Channel chan MemoryUpdate
}

func (s *MemoryStream) Subscribe(filter func(MemoryUpdate) bool) *Subscriber {
    sub := &Subscriber{
        ID:      uuid.New().String(),
        Filter:  filter,
        Channel: make(chan MemoryUpdate, 100),
    }
    s.subscribers.Store(sub.ID, sub)
    return sub
}
func (s *MemoryStream) broadcast(update MemoryUpdate) {
    s.subscribers.Range(func(key, value any) bool {
        sub := value.(*Subscriber)
        if sub.Filter(update) {
            select {
            case sub.Channel <- update:
            default:
                // Subscriber too slow, skip
            }
        }
        return true
    })
}

Performance Results
With this architecture, you can expect:
- Throughput: 10,000+ memory updates/second with 4 workers
- Latency: sub-millisecond handoff to a worker for high-priority updates (end-to-end latency also includes the synchronous embedding call and store write)
- Memory efficiency: Bounded buffer prevents OOM under load
- Graceful degradation: Backpressure signals let producers adapt
Key Takeaways
Building streaming memory systems in Go requires attention to these principles:
- Use buffered channels — They decouple producers from consumers
- Implement priority lanes — User corrections can't wait in line
- Handle backpressure explicitly — Don't let slow consumers crash the system
- Batch for throughput — Individual writes don't scale
- Enable subscriptions — Real-time systems need reactive consumers
Stream Memory Updates with CodeMem
CodeMem's API supports real-time memory streaming out of the box—priority updates, batch writes, and webhooks for subscriptions. Focus on your agent logic while we handle the infrastructure at scale.
Start Streaming Free →