
Streaming Memory Updates in Go

Build high-performance, real-time memory systems using streaming patterns in Go. Learn channels, backpressure handling, and async updates for responsive AI agents.

CodeMem Team

Why Streaming Matters for AI Memory

Traditional request-response patterns fall short when your AI agent needs to process memory updates in real-time. Whether you're handling user corrections, ingesting new context mid-conversation, or synchronizing memories across distributed agents, blocking operations kill responsiveness.

Go's concurrency primitives—goroutines and channels—make it uniquely suited for building streaming memory systems. You get the performance of compiled code with the simplicity of CSP (Communicating Sequential Processes) patterns. Let's build a production-ready streaming memory pipeline.

The Streaming Architecture

A streaming memory system has three core components: producers (memory sources), a processing pipeline, and consumers (storage/retrieval). Here's the foundational structure:

package memory

// Imports cover all the snippets in this post.
import (
    "context"
    "log/slog"
    "sync"
    "time"

    "github.com/google/uuid"
)

type MemoryUpdate struct {
    ID        string
    ProjectID string
    Content   string
    Action    UpdateAction // Create, Update, Delete
    Priority  int          // 0 = normal, 1 = high (user correction)
    Timestamp time.Time
}

type UpdateAction int

const (
    ActionCreate UpdateAction = iota
    ActionUpdate
    ActionDelete
)

type StreamConfig struct {
    BufferSize      int           // Channel buffer capacity
    Workers         int           // Parallel processors
    FlushInterval   time.Duration // Batch write interval
    MaxBatchSize    int           // Max items per batch
    BackpressureAt  float64       // Buffer fill ratio to slow producers
}
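Sizing these knobs is workload-dependent. As a starting point, here's a sketch of conservative defaults (the numbers are illustrative, not benchmarked values) that keep memory bounded while leaving headroom before backpressure kicks in:

```go
package main

import (
	"fmt"
	"time"
)

// StreamConfig mirrors the struct above.
type StreamConfig struct {
	BufferSize     int
	Workers        int
	FlushInterval  time.Duration
	MaxBatchSize   int
	BackpressureAt float64
}

// DefaultStreamConfig returns illustrative defaults: a 1024-slot buffer,
// four workers, 50ms flush windows, and backpressure at 80% buffer fill.
func DefaultStreamConfig() StreamConfig {
	return StreamConfig{
		BufferSize:     1024,
		Workers:        4,
		FlushInterval:  50 * time.Millisecond,
		MaxBatchSize:   64,
		BackpressureAt: 0.8,
	}
}

func main() {
	cfg := DefaultStreamConfig()
	fmt.Printf("buffer=%d workers=%d flush=%s\n", cfg.BufferSize, cfg.Workers, cfg.FlushInterval)
	// → buffer=1024 workers=4 flush=50ms
}
```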

Building the Memory Stream

The stream processor is the heart of the system. It receives updates, processes them in parallel, and batches writes for efficiency:

type MemoryStream struct {
    updates     chan MemoryUpdate
    highPrio    chan MemoryUpdate // Fast lane for corrections
    done        chan struct{}
    store       MemoryStore
    embedder    Embedder // Used by processImmediate and flushBatch below
    subscribers sync.Map // map[string]*Subscriber, used by Subscribe/broadcast
    config      StreamConfig
    metrics     *StreamMetrics
}

func NewMemoryStream(store MemoryStore, embedder Embedder, cfg StreamConfig) *MemoryStream {
    return &MemoryStream{
        updates:  make(chan MemoryUpdate, cfg.BufferSize),
        highPrio: make(chan MemoryUpdate, cfg.BufferSize/4),
        done:     make(chan struct{}),
        store:    store,
        embedder: embedder,
        config:   cfg,
        metrics:  NewStreamMetrics(),
    }
}

func (s *MemoryStream) Start(ctx context.Context) {
    // Spawn worker pool
    for i := 0; i < s.config.Workers; i++ {
        go s.worker(ctx, i)
    }
    
    // Start batch flusher
    go s.batchFlusher(ctx)
    
    // Monitor backpressure
    go s.monitorBackpressure(ctx)
}

Priority-Aware Processing

Not all memory updates are equal: a user correction shouldn't wait behind routine ingestion. The worker reads from both channels in a select loop, with a dedicated fast lane for corrections:

func (s *MemoryStream) worker(ctx context.Context, id int) {
    batch := make([]MemoryUpdate, 0, s.config.MaxBatchSize)
    ticker := time.NewTicker(s.config.FlushInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            s.flushBatch(batch) // Drain on shutdown
            return
            
        case update := <-s.highPrio:
            // High priority: process immediately, don't batch
            s.processImmediate(ctx, update)
            s.metrics.RecordProcessed(update, true)
            
        case update := <-s.updates:
            batch = append(batch, update)
            if len(batch) >= s.config.MaxBatchSize {
                s.flushBatch(batch)
                batch = batch[:0]
            }
            
        case <-ticker.C:
            if len(batch) > 0 {
                s.flushBatch(batch)
                batch = batch[:0]
            }
        }
    }
}

func (s *MemoryStream) processImmediate(ctx context.Context, update MemoryUpdate) {
    start := time.Now()
    
    switch update.Action {
    case ActionCreate, ActionUpdate:
        // Generate the embedding synchronously for high-priority updates
        embedding, err := s.embedder.Embed(ctx, update.Content)
        if err != nil {
            s.metrics.RecordError(err)
            return
        }
        if err := s.store.Upsert(ctx, update, embedding); err != nil {
            s.metrics.RecordError(err)
            return
        }
    case ActionDelete:
        if err := s.store.Delete(ctx, update.ID); err != nil {
            s.metrics.RecordError(err)
            return
        }
    }
    
    s.metrics.RecordLatency(time.Since(start))
}
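One subtlety worth flagging: Go's select picks uniformly at random among ready cases, so the worker loop above doesn't strictly favor the fast lane when both channels have items waiting. A common remedy is a nested select that attempts a non-blocking receive on the high-priority channel first. A minimal illustration, with string payloads standing in for MemoryUpdate:

```go
package main

import "fmt"

// nextUpdate strictly prefers the high-priority channel: it tries a
// non-blocking receive on the fast lane first, and only then blocks on
// a combined select over both channels.
func nextUpdate(highPrio, updates <-chan string) string {
	// Fast path: drain high-priority work first if any is waiting.
	select {
	case u := <-highPrio:
		return u
	default:
	}
	// Slow path: block until either channel has something.
	select {
	case u := <-highPrio:
		return u
	case u := <-updates:
		return u
	}
}

func main() {
	high := make(chan string, 1)
	normal := make(chan string, 1)
	high <- "correction"
	normal <- "note"
	// With both channels ready, the fast lane always wins.
	fmt.Println(nextUpdate(high, normal)) // → correction
	fmt.Println(nextUpdate(high, normal)) // → note
}
```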

Handling Backpressure

When consumers can't keep up with producers, you need backpressure to prevent memory exhaustion. Here's a non-blocking send with backpressure signaling:

func (s *MemoryStream) Send(update MemoryUpdate) error {
    // Route high-priority updates
    if update.Priority > 0 {
        select {
        case s.highPrio <- update:
            return nil
        default:
            // High-prio full? Force into regular queue
        }
    }
    
    // Check backpressure
    fillRatio := float64(len(s.updates)) / float64(cap(s.updates))
    if fillRatio > s.config.BackpressureAt {
        s.metrics.RecordBackpressure()
        // Under pressure: block briefly so producers feel the slowdown
        select {
        case s.updates <- update:
            return nil
        case <-time.After(100 * time.Millisecond):
            return ErrBackpressure
        }
    }
    
    // Normal path: non-blocking send
    select {
    case s.updates <- update:
        return nil
    default:
        return ErrBufferFull
    }
}

func (s *MemoryStream) monitorBackpressure(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            fill := float64(len(s.updates)) / float64(cap(s.updates))
            s.metrics.RecordBufferFill(fill)
            
            if fill > 0.9 {
                slog.Warn("memory stream near capacity",
                    "fill_ratio", fill,
                    "pending", len(s.updates))
            }
        }
    }
}
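On the producer side, ErrBackpressure and ErrBufferFull are signals to slow down, not fatal errors. Here's a sketch of a retrying sender with exponential backoff (the helper and sentinel errors are illustrative, mirroring the ones Send returns above):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative sentinels matching the errors Send returns.
var (
	ErrBackpressure = errors.New("stream under backpressure")
	ErrBufferFull   = errors.New("stream buffer full")
)

// SendWithRetry retries a send with exponential backoff when the stream
// signals congestion, and fails fast on any other error.
func SendWithRetry(send func() error, maxAttempts int) error {
	delay := 10 * time.Millisecond
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = send()
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrBackpressure) && !errors.Is(err, ErrBufferFull) {
			return err // non-retryable
		}
		time.Sleep(delay)
		delay *= 2 // exponential backoff
	}
	return err
}

func main() {
	attempts := 0
	err := SendWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return ErrBackpressure // simulate a congested stream
		}
		return nil
	}, 5)
	fmt.Println(err == nil, attempts) // → true 3
}
```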

Batch Writes for Throughput

Individual writes are expensive. Batching amortizes network and I/O costs:

func (s *MemoryStream) flushBatch(batch []MemoryUpdate) {
    if len(batch) == 0 {
        return
    }
    
    start := time.Now()
    
    // Generate embeddings for the whole batch in one call
    embeddings, err := s.embedder.EmbedBatch(context.Background(),
        extractContents(batch))
    if err != nil {
        s.metrics.RecordError(err)
        s.handleFailedBatch(batch, err)
        return
    }
    
    // Bulk upsert
    items := make([]BulkItem, len(batch))
    for i, update := range batch {
        items[i] = BulkItem{
            Update:    update,
            Embedding: embeddings[i],
        }
    }
    
    if err := s.store.BulkUpsert(context.Background(), items); err != nil {
        s.metrics.RecordError(err)
        // Retry logic or dead-letter queue
        s.handleFailedBatch(batch, err)
        return
    }
    
    s.metrics.RecordBatch(len(batch), time.Since(start))
}

Real-Time Subscriptions

Allow consumers to subscribe to memory changes for reactive UIs or agent coordination:

type Subscriber struct {
    ID       string
    Filter   func(MemoryUpdate) bool
    Channel  chan MemoryUpdate
}

func (s *MemoryStream) Subscribe(filter func(MemoryUpdate) bool) *Subscriber {
    sub := &Subscriber{
        ID:      uuid.New().String(),
        Filter:  filter,
        Channel: make(chan MemoryUpdate, 100),
    }
    s.subscribers.Store(sub.ID, sub)
    return sub
}

func (s *MemoryStream) broadcast(update MemoryUpdate) {
    s.subscribers.Range(func(key, value any) bool {
        sub := value.(*Subscriber)
        if sub.Filter(update) {
            select {
            case sub.Channel <- update:
            default:
                // Subscriber too slow, skip
            }
        }
        return true
    })
}
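Pulling the subscription pieces together, here's a self-contained sketch of the fan-out: subscribers register a filter, broadcast delivers only matching updates, and a full subscriber buffer means the update is dropped rather than blocking the stream. The Hub type here is a stand-in for the stream's subscriber machinery:

```go
package main

import (
	"fmt"
	"sync"
)

// Trimmed-down update type for the sketch.
type MemoryUpdate struct {
	ProjectID string
	Content   string
}

type Subscriber struct {
	ID      string
	Filter  func(MemoryUpdate) bool
	Channel chan MemoryUpdate
}

// Hub holds subscribers in a sync.Map so Subscribe and Broadcast can run
// concurrently without extra locking.
type Hub struct {
	subs sync.Map // map[string]*Subscriber
}

func (h *Hub) Subscribe(id string, filter func(MemoryUpdate) bool) *Subscriber {
	sub := &Subscriber{ID: id, Filter: filter, Channel: make(chan MemoryUpdate, 8)}
	h.subs.Store(id, sub)
	return sub
}

func (h *Hub) Broadcast(u MemoryUpdate) {
	h.subs.Range(func(_, v any) bool {
		sub := v.(*Subscriber)
		if sub.Filter(u) {
			select {
			case sub.Channel <- u:
			default: // slow subscriber: drop rather than block the stream
			}
		}
		return true
	})
}

func main() {
	var hub Hub
	sub := hub.Subscribe("ui", func(u MemoryUpdate) bool { return u.ProjectID == "p1" })
	hub.Broadcast(MemoryUpdate{ProjectID: "p1", Content: "hello"})
	hub.Broadcast(MemoryUpdate{ProjectID: "p2", Content: "filtered out"})
	fmt.Println(len(sub.Channel)) // → 1
}
```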

Performance Results

With this architecture, you can expect:

  • Throughput: 10,000+ memory updates/second with 4 workers
  • Latency: Sub-millisecond queueing for high-priority updates (end-to-end latency is dominated by the synchronous embedding call)
  • Memory efficiency: Bounded buffer prevents OOM under load
  • Graceful degradation: Backpressure signals let producers adapt

Key Takeaways

Building streaming memory systems in Go requires attention to these principles:

  1. Use buffered channels — They decouple producers from consumers
  2. Implement priority lanes — User corrections can't wait in line
  3. Handle backpressure explicitly — Don't let slow consumers crash the system
  4. Batch for throughput — Individual writes don't scale
  5. Enable subscriptions — Real-time systems need reactive consumers

Stream Memory Updates with CodeMem

CodeMem's API supports real-time memory streaming out of the box—priority updates, batch writes, and webhooks for subscriptions. Focus on your agent logic while we handle the infrastructure at scale.

Start Streaming Free →