
Backpressure & Flow Control

The Interview Question

"Our data pipeline processes events from Kafka. The producer generates 100K events/sec, but our consumer can only handle 50K/sec. The consumer eventually OOMs and crashes. How do you design proper flow control?"

Asked at: LinkedIn, Netflix, Uber, data-intensive companies

Time to solve: 30-35 minutes

Difficulty: ⭐⭐⭐⭐ (Senior)


Clarifying Questions to Ask

  1. "Can we drop events or must we process everything?" → Determines approach
  2. "What's the latency requirement?" → Real-time vs batch
  3. "Is the traffic spike temporary or sustained?" → Scale vs throttle
  4. "Can we slow down the producer?" → True backpressure possible?
  5. "What's the cost of processing delay?" → Prioritization needs

The Problem Visualized

Without flow control (quantified in the sketch after this list):

  1. Queue grows indefinitely
  2. Consumer memory exhausted
  3. Consumer OOMs and crashes
  4. On restart, massive backlog makes it worse
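
A back-of-envelope sketch of how fast this goes wrong, assuming ~200 bytes per buffered event and a 4 GB heap (both numbers are illustrative, not from the problem statement):

produce_rate = 100_000      # events/sec
consume_rate = 50_000       # events/sec
event_size = 200            # bytes per buffered event (assumption)
heap_bytes = 4 * 1024**3    # 4 GB available for buffering (assumption)

growth = (produce_rate - consume_rate) * event_size   # ~10 MB/sec of backlog
seconds_to_oom = heap_bytes / growth                   # ~430 seconds, roughly 7 minutes
print(f"OOM in about {seconds_to_oom / 60:.0f} minutes")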

Solution 1: Consumer-Side Rate Limiting

Don't pull faster than you can process:

import asyncio
from datetime import datetime

class RateLimitedConsumer:
    def __init__(self, kafka_consumer, max_rate: int = 50000):
        self.consumer = kafka_consumer
        self.max_rate = max_rate      # Events per second
        self.window_size = 1.0        # 1-second window
        self.event_count = 0
        self.window_start = datetime.now()

    async def consume(self):
        while True:
            # Check if we're within the rate limit
            elapsed = (datetime.now() - self.window_start).total_seconds()

            if elapsed >= self.window_size:
                # Reset window
                self.event_count = 0
                self.window_start = datetime.now()

            if self.event_count >= self.max_rate:
                # At capacity - wait for the next window
                sleep_time = self.window_size - elapsed
                await asyncio.sleep(sleep_time)
                continue

            # Fetch a limited batch
            remaining_capacity = self.max_rate - self.event_count
            batch_size = min(remaining_capacity, 1000)

            # kafka-python's poll() returns {TopicPartition: [records]}
            messages = self.consumer.poll(
                timeout_ms=100,
                max_records=batch_size
            )

            for _tp, records in messages.items():
                for msg in records:
                    await self.process(msg)
                    self.event_count += 1

    async def process(self, message):
        # Actual processing logic
        pass
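
A minimal usage sketch, assuming a kafka-python KafkaConsumer; the topic name, bootstrap address, and group id are placeholders:

import asyncio
from kafka import KafkaConsumer  # kafka-python

# Hypothetical wiring - adjust topic, servers, and group to your setup
kafka_consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['kafka:9092'],
    group_id='rate-limited-group',
)

consumer = RateLimitedConsumer(kafka_consumer, max_rate=50000)
asyncio.run(consumer.consume())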

Problem: the backlog in Kafka still grows if the producer stays faster than the consumer!


Solution 2: Producer-Side Backpressure

Signal producer to slow down when consumer is overwhelmed:

# Using Redis as the backpressure signal
import time

import redis
from kafka import KafkaProducer

class BackpressureAwareProducer:
    def __init__(self):
        self.redis = redis.Redis()
        self.kafka = KafkaProducer(bootstrap_servers=['kafka:9092'])
        self.base_rate = 100000
        self.min_rate = 1000

    def get_allowed_rate(self) -> int:
        """
        Check consumer lag and adjust the send rate.
        The consumer updates this metric (see report_lag below).
        """
        lag = int(self.redis.get('consumer_lag') or 0)

        if lag > 1000000:                  # 1M events behind
            return self.min_rate           # Crawl
        elif lag > 100000:
            return self.base_rate // 10    # 10% rate
        elif lag > 10000:
            return self.base_rate // 2     # 50% rate
        else:
            return self.base_rate          # Full speed

    def produce(self, events):
        rate = self.get_allowed_rate()

        for event in events:
            self.kafka.send('events', event)

            # Rate limit based on consumer capacity
            time.sleep(1.0 / rate)

# Consumer side: publish lag so the producer can react.
# Sketch: assumes self.consumer is a kafka-python KafkaConsumer and self.redis a Redis client.
class Consumer:
    def report_lag(self):
        # Lag = sum over assigned partitions of (high watermark - current position)
        lag = sum(
            self.consumer.highwater(tp) - self.consumer.position(tp)
            for tp in self.consumer.assignment()
        )
        self.redis.set('consumer_lag', lag)

Solution 3: Bounded Queues with Rejection

When queue is full, reject or drop new events:

from queue import Queue, Full

class BoundedQueueProcessor:
    def __init__(self, max_queue_size: int = 10000):
        self.queue = Queue(maxsize=max_queue_size)
        self.dropped_count = 0

    def enqueue(self, event, block: bool = False, timeout: float = None):
        """
        Try to add an event to the queue.
        If full: drop, block, or return an error.
        """
        try:
            self.queue.put(event, block=block, timeout=timeout)
            return True
        except Full:
            self.dropped_count += 1
            self.handle_overflow(event)
            return False

    def handle_overflow(self, event):
        """
        Options when the queue is full:
        1. Drop silently (for metrics, logs)
        2. Write to overflow storage (S3, disk)
        3. Return an error to the producer
        4. Drop oldest (replace head of queue)
        """
        # Option: write to overflow storage for later processing
        self.write_to_overflow_storage(event)

        # Option: prioritized dropping
        if event.get('priority') == 'low':
            pass  # OK to drop low priority
        else:
            self.dead_letter_queue.put(event)
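
Option 4 (drop oldest) is not shown in the class above; a minimal sketch using collections.deque, where maxlen silently evicts the oldest entry when a new one is appended:

from collections import deque

# A bounded buffer that evicts the oldest event instead of rejecting new ones
buffer = deque(maxlen=10000)

def enqueue_drop_oldest(event):
    # When the deque is full, append() discards the item at the opposite end
    buffer.append(event)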

Kafka consumer with bounded in-memory buffer:

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

class BoundedKafkaConsumer:
    def __init__(self, max_buffer_kbytes: int = 50000):
        self.consumer = Consumer({
            'bootstrap.servers': 'kafka:9092',
            'group.id': 'my-group',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
            # Bound librdkafka's internal prefetch buffer (in KB)
            'queued.max.messages.kbytes': max_buffer_kbytes,
        })

    def consume_with_backpressure(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise KafkaException(msg.error())

            # Process the message
            success = self.process(msg)

            # Only commit if processed successfully
            if success:
                self.consumer.commit(msg)
            else:
                # Pause the partition to apply backpressure
                tp = TopicPartition(msg.topic(), msg.partition())
                self.consumer.pause([tp])
                # Resume after recovery
                self.schedule_resume(tp, delay=30)
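
schedule_resume is left abstract above. One way to implement it without touching the consumer from another thread is to record a resume deadline and check it inside the poll loop; a hypothetical sketch (the _paused dict and resume_due_partitions are assumptions, not part of the original class):

import time

# Methods meant to live on BoundedKafkaConsumer above

def schedule_resume(self, tp, delay: float = 30.0):
    # Record when this partition should be resumed
    self._paused = getattr(self, '_paused', {})
    self._paused[tp] = time.time() + delay

def resume_due_partitions(self):
    # Call this at the top of each poll-loop iteration
    now = time.time()
    due = [tp for tp, when in getattr(self, '_paused', {}).items() if when <= now]
    if due:
        self.consumer.resume(due)
        for tp in due:
            del self._paused[tp]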

Solution 4: Load Shedding with Priority

When overwhelmed, process important events first:

import heapq
import time
from dataclasses import dataclass, field
from typing import Any

class QueueFullError(Exception):
    pass

@dataclass(order=True)
class PrioritizedEvent:
    priority: int
    timestamp: float = field(compare=False)
    event: Any = field(compare=False)

class PriorityLoadShedder:
    """
    Process high-priority events first.
    Drop low-priority events when overwhelmed.
    """

    PRIORITY_CRITICAL = 0
    PRIORITY_HIGH = 1
    PRIORITY_NORMAL = 2
    PRIORITY_LOW = 3

    def __init__(self, max_queue_size: int = 100000):
        self.queue = []  # Min-heap ordered by priority (lower number = higher priority)
        self.max_size = max_queue_size

    def add_event(self, event, priority: int = PRIORITY_NORMAL):
        if len(self.queue) >= self.max_size:
            self.shed_load(priority)

        heapq.heappush(self.queue, PrioritizedEvent(
            priority=priority,
            timestamp=time.time(),
            event=event
        ))

    def shed_load(self, incoming_priority: int):
        """
        Drop the lowest-priority event to make room.
        """
        if self.queue:
            # Find the lowest-priority event in the queue (highest number)
            lowest = max(self.queue, key=lambda x: x.priority)

            # Only drop it if the incoming event is higher priority
            if incoming_priority < lowest.priority:
                self.queue.remove(lowest)
                heapq.heapify(self.queue)
                # metrics: placeholder for your metrics client
                metrics.increment('events_dropped',
                                  tags={'priority': lowest.priority})
            else:
                # Incoming event is equal or lower priority - reject it
                metrics.increment('events_rejected',
                                  tags={'priority': incoming_priority})
                raise QueueFullError("Queue full, event rejected")

    def process_next(self):
        """Always process the highest-priority event first."""
        if self.queue:
            return heapq.heappop(self.queue).event
        return None
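
A quick usage sketch of the shedder (the event payloads are placeholders):

shedder = PriorityLoadShedder(max_queue_size=100000)

shedder.add_event({'type': 'payment'}, priority=PriorityLoadShedder.PRIORITY_CRITICAL)
shedder.add_event({'type': 'click'}, priority=PriorityLoadShedder.PRIORITY_LOW)

event = shedder.process_next()  # Returns the payment event first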

Solution 5: Reactive Streams (Project Reactor / RxJava)

Built-in backpressure handling:

// Java with Project Reactor
import java.time.Duration;

import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;

public class ReactiveKafkaConsumer {

    public void consumeWithBackpressure() {
        KafkaReceiver<String, Event> receiver = KafkaReceiver.create(
            receiverOptions()   // ReceiverOptions factory (not shown)
        );

        receiver.receive()
            // Backpressure: buffer a bounded number of records, dropping on overflow
            .onBackpressureBuffer(
                10000,                              // Buffer size
                event -> handleOverflow(event),     // Overflow handler
                BufferOverflowStrategy.DROP_LATEST
            )
            // Limit how much is requested upstream at a time
            .limitRate(1000)                        // Request 1000 at a time
            // Process in batches of up to 100, flushed every 500 ms
            .bufferTimeout(100, Duration.ofMillis(500))
            // Parallel processing with bounded concurrency
            .flatMap(
                batch -> processBatch(batch),
                8                                   // Max 8 concurrent batch processors
            )
            .subscribe(
                result -> ack(result),
                error -> handleError(error)
            );
    }

    private void handleOverflow(Event event) {
        // Write to a dead letter queue
        deadLetterProducer.send(event);
    }
}

Solution 6: Auto-Scaling Consumers

# Kubernetes HPA based on Kafka lag
# hpa.yaml
"""
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: kafka-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: kafka-consumer
  minReplicas: 2
  maxReplicas: 50
  metrics:
  - type: External
    external:
      metric:
        name: kafka_consumer_lag
        selector:
          matchLabels:
            topic: events
            consumer_group: my-group
      target:
        type: AverageValue
        averageValue: "10000"  # Scale up if lag > 10K per pod
"""

# Custom metrics exporter for Kafka lag
import time

from kafka import KafkaAdminClient, KafkaConsumer
from prometheus_client import Gauge

lag_gauge = Gauge('kafka_consumer_lag', 'Total consumer lag for a group/topic')

class KafkaLagExporter:
    def __init__(self):
        self.admin = KafkaAdminClient(bootstrap_servers=['kafka:9092'])
        self.consumer = KafkaConsumer(bootstrap_servers=['kafka:9092'])

    def get_consumer_lag(self, group_id: str, topic: str) -> int:
        offsets = self.admin.list_consumer_group_offsets(group_id)

        total_lag = 0
        for tp, offset_metadata in offsets.items():
            if tp.topic == topic:
                # Lag = high watermark minus the group's committed offset
                end_offsets = self.consumer.end_offsets([tp])
                lag = end_offsets[tp] - offset_metadata.offset
                total_lag += lag

        return total_lag

    def export_metrics(self):
        while True:
            lag = self.get_consumer_lag('my-group', 'events')
            lag_gauge.set(lag)
            time.sleep(10)

Monitoring Backpressure

# Prometheus alerts
groups:
- name: backpressure_alerts
  rules:
  - alert: HighConsumerLag
    expr: kafka_consumer_lag > 100000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Consumer lag is high"

  - alert: QueueNearCapacity
    expr: bounded_queue_size / bounded_queue_capacity > 0.8
    for: 2m
    labels:
      severity: warning

  - alert: EventsBeingDropped
    expr: rate(events_dropped_total[5m]) > 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Events are being dropped due to backpressure"

Trade-offs Comparison

Approach              | Pros                     | Cons
Rate limiting         | Simple                   | Doesn't handle bursts
Producer backpressure | True flow control        | Requires producer cooperation
Bounded queue + drop  | Protects consumer        | Data loss
Priority shedding     | Preserves important data | Complexity
Auto-scaling          | Elastic capacity         | Cost, cold start latency
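
On the "doesn't handle bursts" point: the fixed-window limiter in Solution 1 rejects any spike above the per-window cap, even when capacity sat idle a moment earlier. A token bucket smooths this out by letting unused capacity accumulate up to a burst limit; a minimal sketch (not tied to any of the classes above):

import time

class TokenBucket:
    def __init__(self, rate: float, burst: int):
        self.rate = rate          # Tokens added per second (steady-state throughput)
        self.capacity = burst     # Max tokens, i.e. the largest burst allowed
        self.tokens = burst
        self.last_refill = time.monotonic()

    def try_acquire(self, n: int = 1) -> bool:
        now = time.monotonic()
        # Refill tokens based on elapsed time, capped at the burst capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now

        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

# Usage: allow 50K events/sec steady state, bursts of up to 10K at once
bucket = TokenBucket(rate=50000, burst=10000)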

Key Takeaways

  1. Never have unbounded queues - OOM is inevitable
  2. Signal upstream - True backpressure stops the source
  3. Graceful degradation - Drop low-priority first
  4. Monitor lag - Alert before it's critical
  5. Auto-scale when possible - But have fallback
  6. Design for failure - What happens when at 100% capacity?

Rule of thumb: If producer > consumer for a sustained period, you MUST either drop, scale, or signal back. There's no other option.