Event Ordering Problem
The Interview Question
"We're using event-driven architecture with Kafka. Events from a user session are arriving out of order - we see 'order_completed' before 'order_created'. This is causing data corruption. How do you solve this?"
Asked at: Uber, LinkedIn, Netflix, Confluent
Time to solve: 30-35 minutes
Difficulty: ⭐⭐⭐⭐ (Senior)
Clarifying Questions to Ask
- "Is ordering needed globally or per-entity?" → Per-user? Per-order?
- "What's the acceptable latency for ordering?" → Real-time or batch OK?
- "How many events per second?" → Scale affects solution
- "Are events from one producer or many?" → Distributed producers = harder
- "What happens if we process out of order?" → Severity of impact
Why Events Arrive Out of Order
Root causes:
- Multiple partitions - No ordering guarantee across partitions
- Multiple producers - Different network latencies
- Consumer parallelism - Consumers process at different speeds
- Retries - Failed message retries arrive later
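The first cause is easy to see with a toy partitioner. This sketch (the 6-partition topic and `partition_for` helper are illustrative, not Kafka's real murmur2 partitioner) shows that un-keyed events are spread round-robin across partitions, while keyed events always land on the same one:

```python
NUM_PARTITIONS = 6  # hypothetical topic size

def partition_for(key, round_robin_counter):
    """Simplified stand-in for a Kafka partitioner (not the real hash)."""
    if key is None:
        round_robin_counter[0] += 1           # no key -> round-robin
        return round_robin_counter[0] % NUM_PARTITIONS
    return hash(key) % NUM_PARTITIONS         # key -> stable partition

counter = [0]
# Two events for the same order, published without a key,
# land on different partitions -> no relative ordering guarantee:
p1 = partition_for(None, counter)
p2 = partition_for(None, counter)

# The same two events keyed by order_id always hash to one partition:
k1 = partition_for("ORD-123", counter)
k2 = partition_for("ORD-123", counter)
assert k1 == k2
```

Kafka only guarantees ordering within a partition, which is exactly what Solution 1 below exploits.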
Solution 1: Single Partition Per Entity
Force all events for same entity to same partition:
```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def publish_order_event(event):
    # Use order_id as the partition key:
    # all events for the same order go to the same partition
    producer.send(
        'order-events',
        key=event['order_id'].encode('utf-8'),  # partition key
        value=event
    )

# Events
publish_order_event({'order_id': 'ORD-123', 'type': 'created', 'ts': 1000})
publish_order_event({'order_id': 'ORD-123', 'type': 'completed', 'ts': 1001})
# Both go to the same partition → ordered within the partition
```
Limitation: a partition preserves broker arrival order, not business order. If multiple producers publish events for the same entity, differing network latencies can still interleave them; within a single producer, ordering holds.
Solution 2: Event Versioning / Sequence Numbers
Add explicit sequence numbers to events:
```python
# Producer side
import json
import time

import redis
from kafka import KafkaProducer

kafka_producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

class OrderEventProducer:
    def __init__(self):
        self.sequence_store = redis.Redis()  # or a database

    def publish(self, order_id, event_type, data):
        # Atomically increment the per-order sequence number
        seq_num = self.sequence_store.incr(f"seq:{order_id}")
        event = {
            'order_id': order_id,
            'sequence': seq_num,  # explicit ordering
            'type': event_type,
            'data': data,
            'timestamp': time.time()
        }
        kafka_producer.send('order-events',
                            key=order_id.encode('utf-8'), value=event)
```
```python
# Consumer side - reorder before processing
import heapq
import logging
from collections import defaultdict

log = logging.getLogger(__name__)

class OrderedEventConsumer:
    def __init__(self):
        self.buffer = defaultdict(list)  # {order_id: heap of (seq, event)}
        self.expected_seq = defaultdict(lambda: 1)  # {order_id: next expected seq}

    def handle_event(self, event):
        order_id = event['order_id']
        seq = event['sequence']
        expected = self.expected_seq[order_id]
        if seq == expected:
            # In order - process immediately
            self.process_event(event)
            self.expected_seq[order_id] += 1
            # Check the buffer for events that are now in order
            self.process_buffered(order_id)
        elif seq > expected:
            # Future event - buffer it
            heapq.heappush(self.buffer[order_id], (seq, event))
            # Fallback: process anyway after N seconds, so a lost
            # event cannot stall the stream forever
            self.schedule_timeout(order_id, seq, timeout=30)
        else:
            # Old event (duplicate or already processed) - drop it
            log.warning(f"Received old event: {seq} < {expected}")

    def process_buffered(self, order_id):
        """Process any buffered events that are now in order"""
        while self.buffer[order_id]:
            next_seq, next_event = self.buffer[order_id][0]
            if next_seq == self.expected_seq[order_id]:
                heapq.heappop(self.buffer[order_id])
                self.process_event(next_event)
                self.expected_seq[order_id] += 1
            else:
                break  # gap in the sequence - wait
```
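The consumer above calls `self.schedule_timeout`, which is not shown. One possible shape for it, sketched here as a standalone class using `threading.Timer` (the names `expected_seq` and the skip-ahead policy mirror the consumer, but this is an illustration, not production code):

```python
import threading

class TimeoutReorderBuffer:
    """Timeout fallback for sequence gaps: if a gap is not filled
    within `timeout` seconds, skip past it so one lost event cannot
    stall the whole per-entity stream."""

    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self.expected_seq = {}           # {order_id: next expected seq}
        self.lock = threading.Lock()

    def schedule_timeout(self, order_id: str, seq: int) -> threading.Timer:
        timer = threading.Timer(self.timeout, self._on_timeout,
                                args=(order_id, seq))
        timer.daemon = True
        timer.start()
        return timer

    def _on_timeout(self, order_id: str, seq: int):
        with self.lock:
            expected = self.expected_seq.get(order_id, 1)
            if seq > expected:
                # Gap still open after the deadline: declare the
                # missing events lost and advance past them
                self.expected_seq[order_id] = seq
```

In a real consumer the timeout handler would also drain the buffer (via `process_buffered`) and emit a metric or alert for the skipped sequence numbers.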
Solution 3: Timestamp-Based Ordering
Use logical timestamps (Lamport or Vector Clocks):
```python
# Lamport timestamp implementation
import threading

class LamportClock:
    def __init__(self):
        self.time = 0
        self.lock = threading.Lock()

    def tick(self) -> int:
        """Increment and return the timestamp (local event)"""
        with self.lock:
            self.time += 1
            return self.time

    def update(self, received_time: int) -> int:
        """Advance past a received timestamp (message receipt)"""
        with self.lock:
            self.time = max(self.time, received_time) + 1
            return self.time

# Producer
clock = LamportClock()

def publish_event(event):
    event['lamport_ts'] = clock.tick()
    kafka_producer.send('events', value=event)

# Consumer - sort by Lamport timestamp, breaking ties
# deterministically with the producer id
def consume_events(events):
    sorted_events = sorted(events, key=lambda e: (e['lamport_ts'], e['producer_id']))
    for event in sorted_events:
        process(event)
```
For true causality, use Vector Clocks:
```python
class VectorClock:
    def __init__(self, node_id, num_nodes):
        self.node_id = node_id
        self.clock = [0] * num_nodes

    def tick(self):
        """Local event: increment our own slot"""
        self.clock[self.node_id] += 1
        return self.clock.copy()

    def update(self, received_clock):
        """Message receipt: element-wise max, then tick our own slot"""
        for i in range(len(self.clock)):
            self.clock[i] = max(self.clock[i], received_clock[i])
        self.clock[self.node_id] += 1

    @staticmethod
    def compare(clock_a, clock_b) -> str:
        """Returns: 'before', 'after', or 'concurrent'"""
        a_before_b = all(a <= b for a, b in zip(clock_a, clock_b))
        b_before_a = all(b <= a for a, b in zip(clock_a, clock_b))
        if a_before_b and not b_before_a:
            return 'before'
        elif b_before_a and not a_before_b:
            return 'after'
        else:
            return 'concurrent'
```
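A quick usage sketch of the comparison. The helper `happened` below restates the `VectorClock.compare` rule so the demo runs standalone; the clock values are what a two-node history would produce:

```python
def happened(a, b):
    """Restates VectorClock.compare for raw clock lists."""
    le = all(x <= y for x, y in zip(a, b))
    ge = all(x >= y for x, y in zip(a, b))
    if le and not ge:
        return 'before'
    if ge and not le:
        return 'after'
    return 'concurrent'

# Node 0 emits e1; node 1 merges it and then emits e2:
e1 = [1, 0]                               # node 0 ticked once
e2 = [1, 2]                               # node 1 merged e1, then ticked
assert happened(e1, e2) == 'before'       # e1 causally precedes e2

# An event from an independent node-0 history is concurrent with e2 —
# neither clock dominates the other, so no order can be assigned:
e3 = [2, 0]                               # node 0 ticked again, never saw node 1
assert happened(e2, e3) == 'concurrent'
```

The 'concurrent' result is the whole point over Lamport timestamps: Lamport clocks always produce a total order, silently assigning an order even where none exists causally.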
Solution 4: Event Sourcing with Optimistic Concurrency
Use expected version for ordering:
```python
class EventStore:
    def append(self, stream_id: str, events: list, expected_version: int):
        """
        Append events only if the stream is at the expected version.
        Prevents out-of-order writes at the source.
        """
        with self.db.transaction():
            current_version = self.get_stream_version(stream_id)
            if current_version != expected_version:
                raise ConcurrencyError(
                    f"Expected version {expected_version}, "
                    f"but stream is at {current_version}"
                )
            for i, event in enumerate(events):
                event['version'] = current_version + i + 1
                self.db.insert('events', event)
```
```python
# Usage
store = EventStore()

# First event
store.append('order-123', [{'type': 'created'}], expected_version=0)

# Second event - must specify the expected version
store.append('order-123', [{'type': 'completed'}], expected_version=1)

# Out-of-order attempt fails
store.append('order-123', [{'type': 'shipped'}], expected_version=1)
# ConcurrencyError: Expected version 1, but stream is at 2
```
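The standard reaction to a `ConcurrencyError` is read-reconcile-retry: reload the stream version and append again. A runnable sketch, using a toy in-memory store in place of the database-backed `EventStore` above (the `InMemoryEventStore` and `append_with_retry` names are illustrative):

```python
class ConcurrencyError(Exception):
    pass

class InMemoryEventStore:
    """Toy stand-in for the EventStore above - just enough state
    to demonstrate the optimistic-concurrency retry loop."""
    def __init__(self):
        self.streams = {}  # stream_id -> [events]

    def get_stream_version(self, stream_id):
        return len(self.streams.get(stream_id, []))

    def append(self, stream_id, events, expected_version):
        if self.get_stream_version(stream_id) != expected_version:
            raise ConcurrencyError("version conflict")
        self.streams.setdefault(stream_id, []).extend(events)

def append_with_retry(store, stream_id, events, max_attempts=3):
    """Read-reconcile-retry: reload the version and try again on conflict."""
    for _ in range(max_attempts):
        version = store.get_stream_version(stream_id)
        try:
            store.append(stream_id, events, expected_version=version)
            return store.get_stream_version(stream_id)
        except ConcurrencyError:
            continue  # another writer got in first; re-read and retry
    raise ConcurrencyError(f"gave up after {max_attempts} attempts")

store = InMemoryEventStore()
append_with_retry(store, 'order-123', [{'type': 'created'}])
append_with_retry(store, 'order-123', [{'type': 'completed'}])
```

In a real system the retry step should also re-validate the business rule (is 'completed' still legal given the reloaded stream?) before re-appending.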
Solution 5: Idempotent Event Processing
Make processing order-independent where possible:
```python
class OrderProjection:
    """
    Idempotent projection that handles events in any order.
    Uses event timestamps, not arrival order.
    """
    def apply_event(self, event):
        order = self.get_order(event['order_id'])
        # Use the event timestamp, not processing time
        event_time = event['timestamp']
        if event['type'] == 'created':
            if order is None:
                self.create_order(event['order_id'], event_time)
            elif order.created_at is None:
                # Fill in the placeholder left by an early 'completed'
                self.set_created(event['order_id'], event_time)
        elif event['type'] == 'completed':
            if order is None:
                # 'completed' arrived before 'created': create a
                # placeholder, filled in when 'created' arrives
                self.create_order(event['order_id'], created_at=None)
                order = self.get_order(event['order_id'])
            # Only update if this event is newer
            if order.completed_at is None or event_time > order.completed_at:
                self.set_completed(event['order_id'], event_time)

    def get_order_state(self, order_id):
        """
        Derive the state from timestamps, not event order.
        """
        order = self.get_order(order_id)
        if order is None:
            return 'unknown'  # no events yet
        if order.completed_at:
            return 'completed'
        elif order.created_at:
            return 'created'
        else:
            return 'unknown'  # waiting for events
```
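The claim behind this pattern is that both arrival orders converge to the same state. A runnable check, using a dict-backed version of the projection above (the `InMemoryOrderProjection` name and storage are illustrative):

```python
class InMemoryOrderProjection:
    """Dict-backed sketch of the idempotent projection above."""
    def __init__(self):
        self.orders = {}  # order_id -> {'created_at': ..., 'completed_at': ...}

    def apply_event(self, event):
        oid = event['order_id']
        order = self.orders.setdefault(
            oid, {'created_at': None, 'completed_at': None})
        if event['type'] == 'created':
            if order['created_at'] is None:
                order['created_at'] = event['timestamp']
        elif event['type'] == 'completed':
            if (order['completed_at'] is None
                    or event['timestamp'] > order['completed_at']):
                order['completed_at'] = event['timestamp']

    def state(self, oid):
        order = self.orders.get(oid)
        if order is None:
            return 'unknown'
        if order['completed_at'] is not None:
            return 'completed'
        if order['created_at'] is not None:
            return 'created'
        return 'unknown'

created = {'order_id': 'ORD-1', 'type': 'created', 'timestamp': 1000}
completed = {'order_id': 'ORD-1', 'type': 'completed', 'timestamp': 1001}

in_order = InMemoryOrderProjection()
for e in (created, completed):
    in_order.apply_event(e)

out_of_order = InMemoryOrderProjection()
for e in (completed, created):          # reversed arrival order
    out_of_order.apply_event(e)

assert in_order.orders == out_of_order.orders  # same final state
```

This "apply in any order, converge to the same state" property is what makes the approach robust without any buffering or sequencing infrastructure.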
Kafka-Specific Configuration
```properties
# Producer config for ordering
enable.idempotence=true     # dedup on broker retry; required for safe retries
acks=all                    # wait for all in-sync replicas
max.in.flight.requests.per.connection=1
                            # prevents reordering on retry; with idempotence
                            # enabled, values up to 5 also preserve order

# Consumer config
max.poll.records=500        # batch available for consumer-side reordering
enable.auto.commit=false    # commit manually, only after processing
```
Trade-offs Comparison
| Approach | Ordering Guarantee | Latency | Complexity |
|---|---|---|---|
| Single partition | Per-key | Low | Low |
| Sequence numbers | Per-entity | Medium | Medium |
| Vector clocks | Causal | High | High |
| Event sourcing | Per-stream | Medium | High |
| Idempotent processing | Eventually consistent | Low | Medium |
Interview Follow-up Questions
"What if an event is lost?"
Use sequence numbers with gap detection. If event 5 is missing after timeout, either:
- Request replay from source
- Mark as "incomplete" and alert
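Gap detection itself is straightforward once events carry sequence numbers. A minimal sketch (the `find_gaps` helper is illustrative):

```python
def find_gaps(received_seqs, last_processed=0):
    """Given the sequence numbers seen for one entity, return the
    missing ones that should be replayed or alerted on."""
    if not received_seqs:
        return []
    expected = set(range(last_processed + 1, max(received_seqs) + 1))
    return sorted(expected - set(received_seqs))

# Events 1-6 expected; 5 never arrived:
find_gaps([1, 2, 3, 4, 6])   # -> [5]
```

A background job can run this per entity on a timer and trigger the replay or the alert when the gap outlives the timeout.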
"How do you handle events from different systems?"
Use correlation IDs plus per-system sequences: each system publishes with its own local sequence number, and an aggregator merges the streams using timestamps (or a logical clock).
"What about events that are hours late?"
Time window + dead letter queue. Process within window, send late events to DLQ for manual review.
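A sketch of that routing decision (the one-hour `ALLOWED_LATENESS` window and the `route_event` helper are assumptions for illustration; `process` and `send_to_dlq` stand in for the real handler and DLQ producer):

```python
import time

ALLOWED_LATENESS = 3600  # hypothetical 1-hour window, in seconds

def route_event(event, process, send_to_dlq, now=None):
    """Events inside the lateness window are processed normally;
    older ones go to a dead letter queue for manual review."""
    now = time.time() if now is None else now
    if now - event['timestamp'] <= ALLOWED_LATENESS:
        process(event)
    else:
        send_to_dlq(event)
```

The window size is a business decision: it bounds how long downstream state stays provisional, traded against how many events end up needing manual handling.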
Key Takeaways
- Partition by entity - Same key = same partition = ordered
- Explicit sequence numbers - Don't rely on timestamps alone
- Buffer and reorder - Consumer-side reordering for out-of-order events
- Idempotent processing - Make order not matter where possible
- Timeout + fallback - Don't wait forever for missing events
Rule of thumb: If ordering matters, design your system so events for the same entity always go through the same path (single partition, single producer).