Real-World System Design Problems

TL;DR

These are the "how would you solve this?" problems that appear in senior interviews. Unlike "design X from scratch", these test your ability to evolve existing systems with constraints like zero downtime, backward compatibility, and technical debt.

Why This Matters

In interviews: Senior/Staff roles at Microsoft, Google, and Meta focus heavily on these scenarios. They test judgment, not just knowledge.

At work: You'll spend more time evolving systems than building from scratch.


Problem 1: Shared Database Between Microservices

The Scenario

"We have 5 microservices that all share a single PostgreSQL database. This causes tight coupling - changes to one service break others. How do we fix this with zero downtime?"

Why It's a Problem

  • Tight coupling: Schema changes affect all services
  • No isolation: One service can corrupt another's data
  • Scaling bottleneck: Can't scale databases independently
  • Deployment risk: Can't deploy services independently

Solution: Strangler Fig Pattern + Database Per Service

Phase 1: Identify Boundaries (1-2 weeks)

Analyze which tables belong to which service:
- users, auth_tokens → Auth Service
- orders, order_items → Order Service
- products, inventory → Product Service
- payments, refunds → Payment Service
- notifications, templates → Notification Service
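One way to sanity-check a proposed boundary is to look for foreign keys that would cross service lines, since each one becomes an API call or event after the split. A minimal sketch, where the table-to-service map and the `foreign_keys` list are hypothetical inputs (in practice you would pull them from `information_schema`):

```python
# Hypothetical inputs: a table -> owning-service map and a list of FK edges.
TABLE_OWNER = {
    "users": "auth", "auth_tokens": "auth",
    "orders": "order", "order_items": "order",
    "products": "product", "inventory": "product",
    "payments": "payment", "refunds": "payment",
}

def cross_service_fks(foreign_keys, owner=TABLE_OWNER):
    """Return FK edges whose two tables belong to different services.

    Every such edge must become an API call or an event after the split.
    """
    return [
        (src, dst) for src, dst in foreign_keys
        if owner.get(src) != owner.get(dst)
    ]

fks = [("order_items", "orders"), ("orders", "users"), ("payments", "orders")]
print(cross_service_fks(fks))  # [('orders', 'users'), ('payments', 'orders')]
```

The fewer cross-service edges a boundary produces, the cleaner the extraction.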

Phase 2: Create Service APIs (2-4 weeks)

```python
# Before: Direct database access
def get_user(user_id):
    return db.query("SELECT * FROM users WHERE id = ?", user_id)

# After: API call
def get_user(user_id):
    return http.get(f"http://auth-service/api/users/{user_id}")
```

Phase 3: Dual Writes (2-4 weeks)

```python
# Dual write pattern
def create_user(user_data):
    # Write to both databases
    old_db.insert("users", user_data)
    new_db.insert("users", user_data)

    # Compare and alert if mismatch
    verify_sync(user_data.id)
```

Phase 4: Read from New, Write to Both (2 weeks)

```python
def get_user(user_id):
    return new_db.query("SELECT * FROM users WHERE id = ?", user_id)

def create_user(user_data):
    new_db.insert("users", user_data)
    old_db.insert("users", user_data)  # Still write to old for safety
```

Phase 5: Cut Over (1 week)
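Cutover is mostly operational: flip reads and writes to the new database, watch error and mismatch rates, then stop the dual writes and decommission the old path. One gate you might automate is "only cut over after dual-write verification has been clean for a full window" — a hedged sketch, where `mismatches_per_day` is a hypothetical metric series:

```python
def safe_to_cut_over(mismatches_per_day, window=7):
    """True if dual-write verification reported zero mismatches
    for the last `window` consecutive days."""
    if len(mismatches_per_day) < window:
        return False
    return all(count == 0 for count in mismatches_per_day[-window:])

print(safe_to_cut_over([3, 1, 0, 0, 0, 0, 0, 0, 0]))  # True
print(safe_to_cut_over([0, 0, 0, 1, 0, 0, 0]))        # False
```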

Key Techniques

| Technique | Purpose |
|---|---|
| Strangler Fig | Gradually replace old with new |
| Dual Writes | Maintain consistency during migration |
| Feature Flags | Control rollout, easy rollback |
| Shadow Reads | Compare old vs new before cutover |
| CDC (Change Data Capture) | Sync data between databases |

Rollback Plan

```python
# Feature flag controls which database to use
def get_user(user_id):
    if feature_flag("use_new_auth_db"):
        return new_db.get_user(user_id)
    else:
        return old_db.get_user(user_id)

# Rollback = flip the flag
```

Problem 2: Monolith to Microservices Migration

The Scenario

"We have a 10-year-old monolith (500K lines of code) that's becoming impossible to maintain. How do we migrate to microservices without rewriting everything?"

Solution: Incremental Strangler Fig

Step 1: Identify Seams

Analyze code for natural boundaries:
- High change frequency modules → Extract first
- Clear domain boundaries → Easy wins
- External integrations → Natural services
- Performance bottlenecks → Isolate and scale
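"High change frequency" can be measured rather than guessed. A minimal sketch that counts per-module churn from the file-path lines of `git log --name-only` output (the module layout shown is hypothetical; in practice you would feed it real log text):

```python
from collections import Counter

def module_churn(git_log_names):
    """Count file-change events per top-level module, given the
    file-path lines of `git log --name-only` output."""
    churn = Counter()
    for line in git_log_names.splitlines():
        line = line.strip()
        if "/" in line:
            churn[line.split("/", 1)[0]] += 1
    return churn

log = "billing/invoice.py\nbilling/tax.py\nauth/login.py\nbilling/invoice.py\n"
print(module_churn(log).most_common(1))  # [('billing', 3)]
```

High-churn modules are where a service boundary pays for itself fastest.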

Step 2: Extract One Service at a Time
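A common way to make each extraction swappable is to put an interface in front of the module first, so the monolith can call either the in-process code or the new remote service through the same seam. A hedged sketch — all names here are illustrative, not a prescribed design:

```python
from abc import ABC, abstractmethod

class BillingPort(ABC):
    """Seam: the rest of the monolith depends only on this interface."""
    @abstractmethod
    def invoice(self, order_id): ...

class InProcessBilling(BillingPort):
    """Existing monolith code, wrapped behind the interface."""
    def invoice(self, order_id):
        return {"order_id": order_id, "source": "monolith"}

class RemoteBilling(BillingPort):
    """Calls the extracted billing service instead."""
    def __init__(self, client):
        self.client = client  # e.g. an HTTP client for billing-service

    def invoice(self, order_id):
        return self.client.post("/invoices", {"order_id": order_id})

def make_billing(use_remote, client=None) -> BillingPort:
    return RemoteBilling(client) if use_remote else InProcessBilling()

print(make_billing(False).invoice(42))  # {'order_id': 42, 'source': 'monolith'}
```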

Step 3: Use Anti-Corruption Layer

```python
# Anti-corruption layer translates between old and new
class AuthAntiCorruptionLayer:
    def authenticate(self, credentials):
        if feature_flag("use_new_auth"):
            return new_auth_service.authenticate(credentials)
        else:
            return legacy_auth_module.authenticate(credentials)
```

Migration Order Strategy

1. Start with leaf services (no dependencies)
2. Move stateless services first
3. Data-heavy services last
4. Keep database migrations separate from code migrations
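The "leaf services first" rule can be computed from a dependency graph rather than eyeballed. A minimal sketch using Python 3.9+'s `graphlib`, with a hypothetical dependency graph — dependency-free modules come out first, so nothing you extract next depends on something still trapped in the monolith:

```python
from graphlib import TopologicalSorter

# module -> set of modules it depends on (hypothetical graph)
deps = {
    "notifications": set(),
    "auth": set(),
    "products": set(),
    "orders": {"auth", "products"},
    "payments": {"orders", "auth"},
}

# static_order() yields dependencies before dependents,
# i.e. leaf modules first
extraction_order = list(TopologicalSorter(deps).static_order())
print(extraction_order)
```

A cycle here (graphlib raises `CycleError`) is an early warning that you are about to build a distributed monolith.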

Common Pitfalls

| Pitfall | Solution |
|---|---|
| Big Bang rewrite | Never! Incremental migration only |
| Distributed monolith | Define clear boundaries, no circular dependencies |
| Data duplication issues | Use CDC or event sourcing |
| Testing gaps | Contract testing between services |

Problem 3: Zero-Downtime Database Migration

The Scenario

"We need to migrate from MySQL to PostgreSQL (or change schema significantly) with zero downtime. We have 1TB of data and 10K QPS."

Solution: Online Migration Pattern

Phase 1: Set Up Replication
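In practice this phase usually means standing up CDC before any data moves. A hedged sketch of registering a Debezium MySQL connector via the Kafka Connect REST API — hostnames, credentials, and the table list are placeholders, and property names vary by Debezium version, so check the connector reference for yours:

```python
import json
import urllib.request

connector = {
    "name": "users-migration",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql.internal",  # placeholder
        "database.port": "3306",
        "database.user": "debezium",            # placeholder
        "database.password": "changeme",        # placeholder
        "database.server.id": "184054",
        "topic.prefix": "migration",            # older versions use database.server.name
        "table.include.list": "app.users",
    },
}

def register(connect_url="http://kafka-connect:8083"):  # placeholder URL
    """POST the connector config to Kafka Connect."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(connector).encode(),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)
```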

Phase 2: Backfill Historical Data

```python
import time

# Batch-copy historical data (off-peak hours). Keyset pagination
# (WHERE id > last_id) stays fast on large tables where LIMIT/OFFSET
# degrades; rows inserted mid-backfill are picked up by CDC anyway.
def backfill():
    batch_size = 10000
    last_id = 0

    while True:
        rows = mysql.query(
            "SELECT * FROM users WHERE id > ? ORDER BY id LIMIT ?",
            last_id, batch_size,
        )
        if not rows:
            break

        postgres.bulk_insert("users", rows)
        last_id = rows[-1].id

        # Rate limit to avoid overwhelming the source
        time.sleep(0.1)
```

Phase 3: Catch-Up with CDC

```python
# Debezium captures all changes during backfill;
# a consumer applies the changes to PostgreSQL
def process_change_event(event):
    if event.operation == "INSERT":
        postgres.insert(event.table, event.after)
    elif event.operation == "UPDATE":
        postgres.update(event.table, event.after, event.key)
    elif event.operation == "DELETE":
        postgres.delete(event.table, event.key)
```

Phase 4: Shadow Reads (Validate)

```python
def get_user(user_id):
    mysql_result = mysql.get_user(user_id)
    postgres_result = postgres.get_user(user_id)

    # Log differences (don't fail the request)
    if mysql_result != postgres_result:
        log.warn(f"Mismatch for user {user_id}")
        metrics.increment("migration.mismatch")

    return mysql_result  # Still return the MySQL result
```

Phase 5: Cutover

```python
# Feature flag for cutover
def get_user(user_id):
    if feature_flag("use_postgres"):
        return postgres.get_user(user_id)
    else:
        return mysql.get_user(user_id)
```

Tools

| Tool | Purpose |
|---|---|
| Debezium | CDC for MySQL/PostgreSQL |
| AWS DMS | Managed database migration |
| gh-ost | Online MySQL schema migration |
| pgloader | MySQL to PostgreSQL migration |

Problem 4: Handling Hot Partitions / Hot Keys

The Scenario

"Our DynamoDB table is heavily throttled because 90% of traffic goes to a few celebrity users. How do we handle hot partitions?"

Solution: Write Sharding + Read Caching

Approach 1: Add Random Suffix (Write Sharding)

```python
import random

# Problem: all writes for a celebrity go to the same partition
key = f"user:{celebrity_id}"  # Hot partition

# Solution: distribute writes across multiple partitions
def write_event(user_id, event):
    shard = random.randint(0, 9)
    key = f"user:{user_id}:shard:{shard}"
    dynamodb.put_item(key, event)

# Reads now require scatter-gather across all shards
def read_events(user_id):
    events = []
    for shard in range(10):
        key = f"user:{user_id}:shard:{shard}"
        events.extend(dynamodb.query(key))
    return sorted(events, key=lambda e: e.timestamp)
```

Approach 2: Caching Layer

```python
def get_celebrity_timeline(user_id):
    # Check cache first (99% hit rate for celebrities)
    cached = redis.get(f"timeline:{user_id}")
    if cached:
        return cached

    # Cache miss - query DynamoDB
    timeline = dynamodb.query(f"user:{user_id}")

    # Cache for 5 minutes
    redis.set(f"timeline:{user_id}", timeline, ex=300)
    return timeline
```

Approach 3: Pre-Aggregation

```python
# Instead of storing individual events, pre-aggregate
def update_follower_count(user_id, delta):
    # Don't hit DynamoDB on every follow -
    # accumulate deltas and flush once a minute
    redis.incrby(f"follower_delta:{user_id}", delta)

# Cron job every minute
def flush_follower_counts():
    for key in redis.scan_iter(match="follower_delta:*"):
        user_id = key.decode().split(":")[1]
        delta = int(redis.getdel(key))
        # Apply as an atomic ADD, not an overwrite
        dynamodb.update(f"user:{user_id}", "follower_count", delta)
```

Problem 5: API Versioning Migration

The Scenario

"We need to deprecate API v1 and migrate all clients to v2. We have 1000+ clients, some we don't control. How do we do this gracefully?"

Solution: Gradual Deprecation

Phase 1: Announce Deprecation (3-6 months ahead)

```http
HTTP/1.1 200 OK
Deprecation: Sun, 01 Jun 2025 00:00:00 GMT
Sunset: Sun, 01 Dec 2025 00:00:00 GMT
Link: <https://api.example.com/v2/users>; rel="successor-version"
X-API-Warn: "API v1 is deprecated. Please migrate to v2."
```

Phase 2: Track Usage

```python
def api_v1_handler(request):
    # Log who's still using v1
    metrics.increment("api.v1.requests", tags={
        "client_id": request.headers.get("X-Client-ID"),
        "endpoint": request.path,
        "user_agent": request.headers.get("User-Agent"),
    })

    return handle_request(request)
```

Phase 3: Provide Migration Tools

```python
# Offer an adapter that translates v1 → v2
class V1ToV2Adapter:
    def translate_request(self, v1_request):
        return {
            "user_id": v1_request["userId"],  # camelCase → snake_case
            "email": v1_request["emailAddress"],
        }

    def translate_response(self, v2_response):
        return {
            "userId": v2_response["user_id"],
            "emailAddress": v2_response["email"],
        }
```

Phase 4: Gradual Throttling

```python
def api_v1_handler(request):
    # Progressively reduce v1 rate limits as the sunset date nears.
    # Check the tightest window first so the stricter limit can apply.
    remaining_days = (sunset_date - today()).days

    if remaining_days < 7:
        rate_limit = 10    # Severely limited
    elif remaining_days < 30:
        rate_limit = 100   # Reduced from 1000
    else:
        rate_limit = 1000  # Normal limit

    if not check_rate_limit(request.client_id, rate_limit):
        return Response(429, "Please migrate to API v2")
```

Phase 5: Final Sunset

```python
def api_v1_handler(request):
    return Response(
        410,  # Gone
        {
            "error": "API v1 has been sunset",
            "migration_guide": "https://docs.example.com/v2-migration",
            "v2_endpoint": "https://api.example.com/v2",
        },
    )
```

Problem 6: Handling Cascading Failures

The Scenario

"Our payment service went down for 5 minutes, but it caused our entire e-commerce site to be unavailable for 2 hours. How do we prevent this?"

Solution: Bulkhead + Circuit Breaker + Graceful Degradation

Root Cause Analysis

Problem: Payment timeout (30s) × concurrent requests = thread pool exhaustion
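That arithmetic is worth doing explicitly in an interview. A quick sketch using Little's law (concurrency = arrival rate × latency); the request rate and pool size are illustrative numbers, not from the scenario:

```python
# Little's law: threads held = arrival_rate * time each thread is blocked
arrival_rate = 50      # checkout requests/second (illustrative)
payment_timeout = 30   # seconds each thread waits on the dead service

threads_needed = arrival_rate * payment_timeout
pool_size = 200        # typical app-server thread pool (illustrative)

print(threads_needed)              # 1500 threads blocked on payment alone
print(threads_needed > pool_size)  # True: pool exhausted, whole site stalls
```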

Solution 1: Circuit Breaker

```python
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=60)
def call_payment_service(order):
    return http.post("http://payment-service/charge", order, timeout=5)

def checkout(order):
    try:
        payment_result = call_payment_service(order)
    except CircuitBreakerError:
        # Circuit is open - fail fast instead of tying up threads
        return {"status": "pending", "message": "Payment processing delayed"}
    return {"status": "completed", "payment_id": payment_result.id}
```

Solution 2: Bulkhead (Separate Thread Pools)

```python
from concurrent.futures import ThreadPoolExecutor

# Separate thread pools for critical vs non-critical work
critical_pool = ThreadPoolExecutor(max_workers=100)     # Cart, checkout
non_critical_pool = ThreadPoolExecutor(max_workers=20)  # Recommendations

def checkout(order):
    future = critical_pool.submit(process_checkout, order)
    return future.result(timeout=10)

def get_recommendations(user_id):
    future = non_critical_pool.submit(fetch_recommendations, user_id)
    return future.result(timeout=5)
```

Solution 3: Graceful Degradation

```python
def checkout(order):
    try:
        # Try to charge immediately
        payment = payment_service.charge(order, timeout=5)
        return {"status": "completed", "payment_id": payment.id}
    except (TimeoutError, CircuitBreakerError):
        # Fallback: queue for later processing
        queue.enqueue("pending_payments", order)
        return {
            "status": "pending",
            "message": "Your order is confirmed. Payment will process shortly.",
        }
```

Solution 4: Timeout Hierarchy

```python
# Timeouts should cascade: inner < outer, so callers never give up
# before their callees do. (`timeout` here is an illustrative
# context manager, not a stdlib one.)
PAYMENT_TIMEOUT = 3   # Service call
CHECKOUT_TIMEOUT = 5  # Business logic
REQUEST_TIMEOUT = 10  # HTTP request

def checkout_handler(request):
    with timeout(REQUEST_TIMEOUT):
        with timeout(CHECKOUT_TIMEOUT):
            payment = call_payment(timeout=PAYMENT_TIMEOUT)
```

Problem 7: Data Consistency in Distributed Transactions

The Scenario

"When a user places an order, we need to: 1) Create order, 2) Deduct inventory, 3) Charge payment, 4) Send confirmation email. If any step fails, we need to roll back. How?"

Solution: Saga Pattern with Compensation

Implementation

```python
class OrderSaga:
    def __init__(self, order_data):
        self.order_data = order_data
        self.completed_steps = []

    def execute(self):
        try:
            # Step 1: Create order
            order = self.create_order()
            self.completed_steps.append(("order", order.id))

            # Step 2: Reserve inventory
            reservation = self.reserve_inventory()
            self.completed_steps.append(("inventory", reservation.id))

            # Step 3: Charge payment
            payment = self.charge_payment()
            self.completed_steps.append(("payment", payment.id))

            # Step 4: Send email (no compensation needed)
            self.send_confirmation()

            return {"status": "success", "order_id": order.id}

        except Exception as e:
            self.compensate()
            return {"status": "failed", "error": str(e)}

    def compensate(self):
        # Undo completed steps in reverse order
        for step_type, step_id in reversed(self.completed_steps):
            if step_type == "payment":
                payment_service.refund(step_id)
            elif step_type == "inventory":
                inventory_service.release(step_id)
            elif step_type == "order":
                order_service.cancel(step_id)
```

Problem 8: Real-Time Leaderboard at Scale

The Scenario

"We're building a mobile game with 100M players. We need a real-time leaderboard showing top 100 and each player's rank. Updates happen 10K times/second."

Solution: Redis Sorted Set + Caching

```python
import redis

class Leaderboard:
    def __init__(self):
        self.redis = redis.Redis()
        self.key = "game:leaderboard"

    def update_score(self, player_id, score):
        # O(log N) - very fast even with 100M players
        self.redis.zadd(self.key, {player_id: score})

    def get_top_100(self):
        # O(log N + 100)
        return self.redis.zrevrange(self.key, 0, 99, withscores=True)

    def get_rank(self, player_id):
        # O(log N); zrevrank is 0-based
        rank = self.redis.zrevrank(self.key, player_id)
        return rank + 1 if rank is not None else None

    def get_nearby_players(self, player_id, count=10):
        # Get players around the current player's rank
        rank = self.redis.zrevrank(self.key, player_id)
        if rank is None:
            return []
        start = max(0, rank - count // 2)
        # end index is inclusive, so this returns `count` entries
        return self.redis.zrevrange(self.key, start, start + count - 1,
                                    withscores=True)
```

Optimization for Extreme Scale

```python
# Shard by score range for extreme scale
class ShardedLeaderboard:
    def __init__(self):
        self.shards = [
            ("top_100", 0, 100),
            ("rank_101_1000", 101, 1000),
            ("rank_1001_10000", 1001, 10000),
        ]

    def update_score(self, player_id, score):
        # Write to the appropriate shard;
        # periodically rebalance players between shards
        pass
```

Problem 9: Feature Flags at Scale

The Scenario

"We want to gradually roll out a new checkout flow to 1% → 10% → 50% → 100% of users. How do we implement feature flags that work across all services?"

Solution: Centralized Feature Flag Service

```python
import hashlib
import redis

class FeatureFlagService:
    def __init__(self):
        self.redis = redis.Redis()
        # Any in-process cache exposing get() and set(key, value, ttl=...)
        self.local_cache = LocalTTLCache()

    def is_enabled(self, flag_name, user_id=None, default=False):
        flag = self.get_flag(flag_name)

        if flag is None:
            return default

        if flag["type"] == "boolean":
            return flag["enabled"]

        elif flag["type"] == "percentage":
            # Stable hashing - the same user always gets the same bucket.
            # (Don't use built-in hash(): it's randomized per process.)
            digest = hashlib.md5(f"{flag_name}:{user_id}".encode()).hexdigest()
            return int(digest, 16) % 100 < flag["percentage"]

        elif flag["type"] == "user_list":
            return user_id in flag["users"]

        elif flag["type"] == "segment":
            user = self.get_user(user_id)
            return self.matches_segment(user, flag["segment"])

    def get_flag(self, flag_name):
        # Cache flag config locally (refresh every 30s)
        cached = self.local_cache.get(flag_name)
        if cached:
            return cached

        flag = self.redis.hgetall(f"flag:{flag_name}")
        self.local_cache.set(flag_name, flag, ttl=30)
        return flag
```

Usage

```python
def checkout(user_id, cart):
    if feature_flags.is_enabled("new_checkout_flow", user_id):
        return new_checkout_flow(user_id, cart)
    else:
        return legacy_checkout_flow(user_id, cart)
```

Gradual Rollout

```python
# Day 1: 1% of users
feature_flags.update("new_checkout_flow", {"type": "percentage", "percentage": 1})

# Day 3: 10% (if metrics look good)
feature_flags.update("new_checkout_flow", {"type": "percentage", "percentage": 10})

# Day 7: 50%
feature_flags.update("new_checkout_flow", {"type": "percentage", "percentage": 50})

# Day 14: 100% (full rollout)
feature_flags.update("new_checkout_flow", {"type": "boolean", "enabled": True})
```
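"If metrics look good" can itself be automated as a guardrail that must pass before each ramp-up step. A hedged sketch comparing the flagged cohort's error rate against control — the 10% threshold is illustrative, and a real system would also require a minimum sample size and a significance test:

```python
def safe_to_ramp(control_errors, control_total,
                 treatment_errors, treatment_total,
                 max_relative_increase=0.10):
    """True if the flagged cohort's error rate is no more than
    `max_relative_increase` (relative) worse than control."""
    if control_total == 0 or treatment_total == 0:
        return False  # not enough data to judge
    control_rate = control_errors / control_total
    treatment_rate = treatment_errors / treatment_total
    return treatment_rate <= control_rate * (1 + max_relative_increase)

print(safe_to_ramp(100, 10000, 12, 1000))  # False: 1.2% vs 1.0% exceeds +10%
print(safe_to_ramp(100, 10000, 10, 1000))  # True: equal error rates
```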

Problem 10: Multi-Region Active-Active

The Scenario

"We need our application to be active in both US and EU regions. Users should write to the nearest region. How do we handle conflicts?"

Solution: CRDTs + Last-Write-Wins

Conflict Resolution Strategies

```python
# Strategy 1: Last-Write-Wins (simple)
class LWWRegister:
    def __init__(self):
        self.value = None
        self.timestamp = 0

    def update(self, value, timestamp):
        if timestamp > self.timestamp:
            self.value = value
            self.timestamp = timestamp

    def merge(self, other):
        if other.timestamp > self.timestamp:
            self.value = other.value
            self.timestamp = other.timestamp

# Strategy 2: CRDT counter (grow-only)
class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}  # node_id → count

    def increment(self):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + 1

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(self.counts.get(node_id, 0), count)
```
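The table below recommends set union for shopping carts; the simplest CRDT for that is a grow-only set (G-Set), sketched here in the same style as the strategies above. Note that supporting removals needs the more elaborate OR-Set, which this sketch omits:

```python
# Strategy 3: G-Set (grow-only set) - set union for add-only data
class GSet:
    def __init__(self):
        self.items = set()

    def add(self, item):
        self.items.add(item)

    def merge(self, other):
        # Union is commutative, associative, and idempotent,
        # so replicas converge regardless of merge order
        self.items |= other.items

us_cart, eu_cart = GSet(), GSet()
us_cart.add("book")   # added in the US region
eu_cart.add("mug")    # added in the EU region
us_cart.merge(eu_cart)
print(sorted(us_cart.items))  # ['book', 'mug']
```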

When to Use What

| Data Type | Strategy | Example |
|---|---|---|
| User profile | Last-write-wins | Email, name updates |
| Shopping cart | Set union | Items in cart |
| Like count | CRDT counter | Post likes |
| Comments | Ordered log | Append-only |
| Inventory | Reservation system | Don't use CRDTs (needs coordination) |

Quick Reference: Migration Patterns

| Pattern | Use Case | Risk Level |
|---|---|---|
| Strangler Fig | Replace monolith incrementally | Low |
| Blue-Green | Zero-downtime deployment | Medium |
| Canary | Gradual rollout (1% → 100%) | Low |
| Shadow Traffic | Test new system with prod traffic | Low |
| Dual Writes | Database migration | Medium |
| CDC | Real-time data sync | Medium |
| Feature Flags | Control rollout | Low |

Interview Tips for These Problems

  1. Ask clarifying questions: Constraints, SLAs, timeline
  2. Start with simplest solution: Then add complexity
  3. Discuss rollback plan: Always have an escape hatch
  4. Mention monitoring: How will you know it's working?
  5. Consider partial failures: What if step 3 of 5 fails?
  6. Time-box phases: Give realistic estimates

Congratulations! You now have real-world problem-solving skills for senior interviews. These are exactly the kinds of scenarios Microsoft, Google, and Meta ask about in Staff+ interviews.

Practice: Take each problem and explain your solution out loud in 10 minutes.


Next: Advanced Scenarios - GDPR deletion, thundering herd, split-brain, debugging mysteries, and incident postmortems.