Real-World System Design Problems
TL;DR
These are the "how would you solve this?" problems that appear in senior interviews. Unlike "design X from scratch", these test your ability to evolve existing systems with constraints like zero downtime, backward compatibility, and technical debt.
Why This Matters
In interviews: Senior/Staff roles at Microsoft, Google, and Meta focus heavily on these scenarios. They test judgment, not just knowledge.
At work: You'll spend more time evolving systems than building from scratch.
Problem 1: Shared Database Between Microservices
The Scenario
"We have 5 microservices that all share a single PostgreSQL database. This causes tight coupling - changes to one service break others. How do we fix this with zero downtime?"
Why It's a Problem
- Tight coupling: Schema changes affect all services
- No isolation: One service can corrupt another's data
- Scaling bottleneck: Can't scale databases independently
- Deployment risk: Can't deploy services independently
Solution: Strangler Fig Pattern + Database Per Service
Phase 1: Identify Boundaries (1-2 weeks)
Analyze which tables belong to which service:
- users, auth_tokens → Auth Service
- orders, order_items → Order Service
- products, inventory → Product Service
- payments, refunds → Payment Service
- notifications, templates → Notification Service
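One way to ground the boundary analysis is to mine query logs: tables touched by more than one service are the coupling hot spots that need an API first. A minimal sketch (the log format and service names here are hypothetical):

```python
from collections import defaultdict

def shared_tables(access_log):
    """Given (service, table) pairs from query logs, return tables
    accessed by more than one service -- the coupling hot spots."""
    owners = defaultdict(set)
    for service, table in access_log:
        owners[table].add(service)
    return {t: sorted(s) for t, s in owners.items() if len(s) > 1}

log = [
    ("auth", "users"), ("orders", "users"),        # orders reads users directly
    ("orders", "orders"), ("payments", "orders"),  # payments reads orders directly
    ("auth", "auth_tokens"),
]
print(shared_tables(log))
# {'users': ['auth', 'orders'], 'orders': ['orders', 'payments']}
```

Tables that show up in this output can't simply be moved; every cross-service access has to become an API call first.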
Phase 2: Create Service APIs (2-4 weeks)
```python
# Before: direct database access
def get_user(user_id):
    return db.query("SELECT * FROM users WHERE id = ?", user_id)

# After: API call to the owning service
def get_user(user_id):
    return http.get(f"http://auth-service/api/users/{user_id}")
```
Phase 3: Dual Writes (2-4 weeks)
```python
# Dual-write pattern: write to both databases
def create_user(user_data):
    old_db.insert("users", user_data)
    # A crash between these two writes leaves the copies out of sync --
    # that's exactly why verify_sync and CDC reconciliation exist
    new_db.insert("users", user_data)
    # Compare and alert if the copies drift apart
    verify_sync(user_data["id"])
```
Phase 4: Read from New, Write to Both (2 weeks)
```python
def get_user(user_id):
    return new_db.query("SELECT * FROM users WHERE id = ?", user_id)

def create_user(user_data):
    new_db.insert("users", user_data)
    old_db.insert("users", user_data)  # still write to old for safety
```
Phase 5: Cut Over (1 week)
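The cutover itself is mostly flag work: flip reads to the new database, watch error rates, then stop the dual write. A sketch of the sequence, assuming hypothetical `feature_flags` and `metrics` clients:

```python
def cut_over(feature_flags, metrics):
    """Final cutover, after Phase 4 has validated reads against the new DB."""
    feature_flags.enable("use_new_auth_db")        # reads now hit the new DB
    if metrics.error_rate("auth") > 0.01:          # watch for regressions
        feature_flags.disable("use_new_auth_db")   # rollback = flip the flag back
        return "rolled_back"
    feature_flags.disable("dual_write_auth")       # stop writing to the old DB
    return "cut_over"
```

Keep the old database around (read-only) for a few weeks after this step in case a late-discovered issue forces a rollback.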
Key Techniques
| Technique | Purpose |
|---|---|
| Strangler Fig | Gradually replace old with new |
| Dual Writes | Maintain consistency during migration |
| Feature Flags | Control rollout, easy rollback |
| Shadow Reads | Compare old vs new before cutover |
| CDC (Change Data Capture) | Sync data between databases |
Rollback Plan
```python
# Feature flag controls which database to use
def get_user(user_id):
    if feature_flag("use_new_auth_db"):
        return new_db.get_user(user_id)
    else:
        return old_db.get_user(user_id)

# Rollback = flip the flag
```
Problem 2: Monolith to Microservices Migration
The Scenario
"We have a 10-year-old monolith (500K lines of code) that's becoming impossible to maintain. How do we migrate to microservices without rewriting everything?"
Solution: Incremental Strangler Fig
Step 1: Identify Seams
Analyze code for natural boundaries:
- High change frequency modules → Extract first
- Clear domain boundaries → Easy wins
- External integrations → Natural services
- Performance bottlenecks → Isolate and scale
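The "high change frequency" signal is easy to quantify from version control: modules that change most often pay back extraction fastest. A sketch that ranks modules given changed-file paths parsed from `git log --name-only` (the module-per-top-level-directory mapping is an assumption):

```python
from collections import Counter

def rank_by_churn(changed_files, depth=1):
    """Count changes per top-level module from a list of changed file paths."""
    churn = Counter("/".join(p.split("/")[:depth]) for p in changed_files)
    return churn.most_common()

files = ["billing/invoice.py", "billing/tax.py", "billing/invoice.py",
         "auth/login.py", "search/index.py"]
print(rank_by_churn(files))
# [('billing', 3), ('auth', 1), ('search', 1)]
```

Cross-reference the top of this list with the domain-boundary analysis: high churn plus a clean boundary is the ideal first extraction.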
Step 2: Extract One Service at a Time
Step 3: Use Anti-Corruption Layer
```python
# Anti-corruption layer translates between old and new
class AuthAntiCorruptionLayer:
    def authenticate(self, credentials):
        if feature_flag("use_new_auth"):
            return new_auth_service.authenticate(credentials)
        else:
            return legacy_auth_module.authenticate(credentials)
```
Migration Order Strategy
1. Start with leaf services (no dependencies)
2. Move stateless services first
3. Data-heavy services last
4. Keep database migrations separate from code migrations
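"Leaf services first" is just a reverse topological order over the service dependency graph: extract things nothing else depends on, then work inward. A sketch using the standard library's `graphlib` (the dependency map is hypothetical):

```python
from graphlib import TopologicalSorter

# deps[service] = set of services it calls
deps = {
    "notifications": set(),
    "auth": set(),
    "orders": {"auth", "notifications"},
    "payments": {"orders", "auth"},
}

# static_order() yields dependencies before dependents --
# i.e. leaf services first, which is the order to extract them in
extraction_order = list(TopologicalSorter(deps).static_order())
print(extraction_order)
```

A cycle in this graph raises `CycleError`, which is itself a useful finding: circular dependencies are exactly what produces a distributed monolith.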
Common Pitfalls
| Pitfall | Solution |
|---|---|
| Big Bang rewrite | Never! Incremental migration only |
| Distributed monolith | Define clear boundaries, no circular dependencies |
| Data duplication issues | Use CDC or event sourcing |
| Testing gaps | Contract testing between services |
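The "contract testing" row deserves a concrete shape: the consumer pins down the fields it actually reads, and the provider's CI replays those expectations. A minimal framework-free sketch (real setups typically use a tool like Pact; the contract fields here are illustrative):

```python
# Consumer-side contract: the fields the order service reads from auth
USER_CONTRACT = {"id": int, "email": str, "is_active": bool}

def satisfies(contract, response):
    """Provider CI check: response must contain every contracted field
    with the right type (extra fields are fine -- contracts are minimal)."""
    return all(
        field in response and isinstance(response[field], typ)
        for field, typ in contract.items()
    )

sample = {"id": 7, "email": "a@b.com", "is_active": True, "created": "2024-01-01"}
print(satisfies(USER_CONTRACT, sample))  # True
```

Because the contract only lists fields the consumer uses, the provider stays free to add fields without breaking anyone.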
Problem 3: Zero-Downtime Database Migration
The Scenario
"We need to migrate from MySQL to PostgreSQL (or change schema significantly) with zero downtime. We have 1TB of data and 10K QPS."
Solution: Online Migration Pattern
Phase 1: Set Up Replication
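For MySQL → PostgreSQL, "set up replication" usually means pointing a CDC connector at the MySQL binlog. A sketch of registering a Debezium MySQL connector through Kafka Connect's REST API — hostnames, credentials, and table names are placeholders, and property names vary by Debezium version, so check the connector docs for yours:

```python
import json
import urllib.request

connector = {
    "name": "mysql-users-cdc",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql.internal",
        "database.port": "3306",
        "database.user": "cdc",
        "database.password": "secret",        # use a secret store in practice
        "database.server.id": "5400",         # unique binlog client id
        "topic.prefix": "migration",          # Debezium 2.x; 1.x used database.server.name
        "table.include.list": "app.users",    # only the tables being migrated
    },
}

req = urllib.request.Request(
    "http://kafka-connect:8083/connectors",
    data=json.dumps(connector).encode(),
    headers={"Content-Type": "application/json"},
)
# urllib.request.urlopen(req)  # uncomment to actually register the connector
```

Once the connector is running, every insert/update/delete lands on a Kafka topic, which is what the Phase 3 consumer drains.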
Phase 2: Backfill Historical Data
```python
# Batch-copy historical data (run off-peak). Use keyset pagination rather
# than OFFSET: OFFSET gets slower as it grows and can skip or duplicate
# rows while the table is being written to.
def backfill():
    batch_size = 10_000
    last_id = 0
    while True:
        rows = mysql.query(
            "SELECT * FROM users WHERE id > ? ORDER BY id LIMIT ?",
            last_id, batch_size,
        )
        if not rows:
            break
        postgres.bulk_insert("users", rows)
        last_id = rows[-1]["id"]
        time.sleep(0.1)  # rate-limit to avoid overwhelming the source
```
Phase 3: Catch-Up with CDC
```python
# Debezium captures all changes made during the backfill;
# this consumer applies them to PostgreSQL
def process_change_event(event):
    if event.operation == "INSERT":
        postgres.insert(event.table, event.after)
    elif event.operation == "UPDATE":
        postgres.update(event.table, event.after, event.key)
    elif event.operation == "DELETE":
        postgres.delete(event.table, event.key)
```
Phase 4: Shadow Reads (Validate)
```python
def get_user(user_id):
    mysql_result = mysql.get_user(user_id)
    postgres_result = postgres.get_user(user_id)
    # Log differences without failing the request
    if mysql_result != postgres_result:
        log.warn(f"Mismatch for user {user_id}")
        metrics.increment("migration.mismatch")
    return mysql_result  # MySQL is still the source of truth
```
Phase 5: Cutover
```python
# Feature flag for cutover
def get_user(user_id):
    if feature_flag("use_postgres"):
        return postgres.get_user(user_id)
    else:
        return mysql.get_user(user_id)
```
Tools
| Tool | Purpose |
|---|---|
| Debezium | CDC for MySQL/PostgreSQL |
| AWS DMS | Managed database migration |
| gh-ost | Online MySQL schema migration |
| pgloader | MySQL to PostgreSQL migration |
Problem 4: Handling Hot Partitions / Hot Keys
The Scenario
"Our DynamoDB table is heavily throttled because 90% of traffic goes to a few celebrity users. How do we handle hot partitions?"
Solution: Write Sharding + Read Caching
Approach 1: Add Random Suffix (Write Sharding)
```python
# Problem: all writes for a celebrity hit the same partition
key = f"user:{celebrity_id}"  # hot partition

# Solution: distribute writes across multiple partitions
def write_event(user_id, event):
    shard = random.randint(0, 9)
    key = f"user:{user_id}:shard:{shard}"
    dynamodb.put_item(key, event)

# Reads now require scatter-gather across all shards
def read_events(user_id):
    events = []
    for shard in range(10):
        key = f"user:{user_id}:shard:{shard}"
        events.extend(dynamodb.query(key))
    return sorted(events, key=lambda e: e.timestamp)
```
Approach 2: Caching Layer
```python
def get_celebrity_timeline(user_id):
    # Check cache first (~99% hit rate for celebrities)
    cached = redis.get(f"timeline:{user_id}")
    if cached:
        return cached
    # Cache miss: query DynamoDB
    timeline = dynamodb.query(f"user:{user_id}")
    # Cache for 5 minutes
    redis.set(f"timeline:{user_id}", timeline, ex=300)
    return timeline
```
Approach 3: Pre-Aggregation
```python
# Instead of writing every event to DynamoDB, pre-aggregate in Redis
def update_follower_count(user_id, delta):
    # Don't hit DynamoDB on every follow; accumulate the delta
    redis.incr(f"follower_delta:{user_id}", delta)

# Cron job flushes the accumulated deltas every minute
def flush_follower_counts():
    for key in redis.scan_iter(match="follower_delta:*"):
        user_id = key.split(":")[1]
        delta = int(redis.getdel(key))  # read and clear atomically (Redis >= 6.2)
        dynamodb.update(f"user:{user_id}", "follower_count", delta)
```
Problem 5: API Versioning Migration
The Scenario
"We need to deprecate API v1 and migrate all clients to v2. We have 1000+ clients, some we don't control. How do we do this gracefully?"
Solution: Gradual Deprecation
Phase 1: Announce Deprecation (3-6 months ahead)
```http
HTTP/1.1 200 OK
Deprecation: Sun, 01 Jun 2025 00:00:00 GMT
Sunset: Sun, 01 Dec 2025 00:00:00 GMT
Link: <https://api.example.com/v2/users>; rel="successor-version"
X-API-Warn: "API v1 is deprecated. Please migrate to v2."
```
Phase 2: Track Usage
```python
def api_v1_handler(request):
    # Log who's still using v1
    metrics.increment("api.v1.requests", tags={
        "client_id": request.headers.get("X-Client-ID"),
        "endpoint": request.path,
        "user_agent": request.headers.get("User-Agent"),
    })
    return handle_request(request)
```
Phase 3: Provide Migration Tools
```python
# Offer an adapter that translates between v1 and v2
class V1ToV2Adapter:
    def translate_request(self, v1_request):
        return {
            "user_id": v1_request["userId"],  # camelCase -> snake_case
            "email": v1_request["emailAddress"],
        }

    def translate_response(self, v2_response):
        return {
            "userId": v2_response["user_id"],
            "emailAddress": v2_response["email"],
        }
```
Phase 4: Gradual Throttling
```python
def api_v1_handler(request):
    # Progressively tighten v1 rate limits as sunset approaches.
    # Check the tightest window first -- with the checks in the other
    # order, the < 7 branch is unreachable.
    remaining_days = (sunset_date - today()).days
    if remaining_days < 7:
        rate_limit = 10      # severely limited
    elif remaining_days < 30:
        rate_limit = 100     # reduced from 1000
    else:
        rate_limit = 1000    # normal limit
    if not check_rate_limit(request.client_id, rate_limit):
        return Response(429, "Please migrate to API v2")
```
Phase 5: Final Sunset
```python
def api_v1_handler(request):
    return Response(
        410,  # Gone
        {
            "error": "API v1 has been sunset",
            "migration_guide": "https://docs.example.com/v2-migration",
            "v2_endpoint": "https://api.example.com/v2",
        },
    )
```
Problem 6: Handling Cascading Failures
The Scenario
"Our payment service went down for 5 minutes, but it caused our entire e-commerce site to be unavailable for 2 hours. How do we prevent this?"
Solution: Bulkhead + Circuit Breaker + Graceful Degradation
Root Cause Analysis
Problem: the payment call had a 30-second timeout, so every stuck request held a worker thread for 30s. At normal traffic that exhausted the shared thread pool, and unrelated requests (browsing, cart) queued behind it until the whole site stopped responding.
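Little's law makes the root cause concrete: average threads blocked ≈ request rate × time each request is held. A quick check with hypothetical numbers (200-thread pool, 10 payment calls/sec):

```python
def blocked_threads(request_rate, timeout_s):
    # Little's law: average concurrency = arrival rate x time in system
    return request_rate * timeout_s

pool_size = 200
needed = blocked_threads(request_rate=10, timeout_s=30)
print(needed, needed > pool_size)  # 300 True -> pool exhausted within seconds
# Dropping the timeout to 5s needs only 50 threads -- well inside the pool
```

This is why the fixes below all attack the same quantity: shorter timeouts shrink the multiplier, circuit breakers stop the arrivals, and bulkheads cap how many threads any one dependency can consume.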
Solution 1: Circuit Breaker
```python
from circuitbreaker import circuit, CircuitBreakerError

@circuit(failure_threshold=5, recovery_timeout=60)
def call_payment_service(order):
    return http.post("http://payment-service/charge", order, timeout=5)

def checkout(order):
    try:
        payment_result = call_payment_service(order)
        return {"status": "completed", "payment_id": payment_result.id}
    except CircuitBreakerError:
        # Circuit is open: fail fast instead of tying up a thread
        return {"status": "pending", "message": "Payment processing delayed"}
```
Solution 2: Bulkhead (Separate Thread Pools)
```python
from concurrent.futures import ThreadPoolExecutor

# Separate thread pools isolate critical from non-critical work
critical_pool = ThreadPoolExecutor(max_workers=100)     # cart, checkout
non_critical_pool = ThreadPoolExecutor(max_workers=20)  # recommendations

def checkout(order):
    future = critical_pool.submit(process_checkout, order)
    return future.result(timeout=10)

def get_recommendations(user_id):
    future = non_critical_pool.submit(fetch_recommendations, user_id)
    return future.result(timeout=5)
```
Solution 3: Graceful Degradation
```python
def checkout(order):
    try:
        # Try to charge immediately
        payment = payment_service.charge(order, timeout=5)
        return {"status": "completed", "payment_id": payment.id}
    except (TimeoutError, CircuitBreakerError):
        # Fallback: queue the payment for later processing
        queue.enqueue("pending_payments", order)
        return {
            "status": "pending",
            "message": "Your order is confirmed. Payment will process shortly.",
        }
```
Solution 4: Timeout Hierarchy
```python
# Timeouts should cascade: inner < outer
PAYMENT_TIMEOUT = 3    # service call
CHECKOUT_TIMEOUT = 5   # business logic
REQUEST_TIMEOUT = 10   # HTTP request

def checkout_handler(request):
    with timeout(REQUEST_TIMEOUT):
        with timeout(CHECKOUT_TIMEOUT):
            payment = call_payment(timeout=PAYMENT_TIMEOUT)
```
Problem 7: Data Consistency in Distributed Transactions
The Scenario
"When a user places an order, we need to: 1) Create order, 2) Deduct inventory, 3) Charge payment, 4) Send confirmation email. If any step fails, we need to roll back. How?"
Solution: Saga Pattern with Compensation
Implementation
```python
class OrderSaga:
    def __init__(self, order_data):
        self.order_data = order_data
        self.completed_steps = []

    def execute(self):
        try:
            # Step 1: create order
            order = self.create_order()
            self.completed_steps.append(("order", order.id))
            # Step 2: reserve inventory
            reservation = self.reserve_inventory()
            self.completed_steps.append(("inventory", reservation.id))
            # Step 3: charge payment
            payment = self.charge_payment()
            self.completed_steps.append(("payment", payment.id))
            # Step 4: send email (no compensation needed)
            self.send_confirmation()
            return {"status": "success", "order_id": order.id}
        except Exception as e:
            self.compensate()
            return {"status": "failed", "error": str(e)}

    def compensate(self):
        # Undo completed steps in reverse order
        for step_type, step_id in reversed(self.completed_steps):
            if step_type == "payment":
                payment_service.refund(step_id)
            elif step_type == "inventory":
                inventory_service.release(step_id)
            elif step_type == "order":
                order_service.cancel(step_id)
```
Problem 8: Real-Time Leaderboard at Scale
The Scenario
"We're building a mobile game with 100M players. We need a real-time leaderboard showing top 100 and each player's rank. Updates happen 10K times/second."
Solution: Redis Sorted Set + Caching
```python
import redis

class Leaderboard:
    def __init__(self):
        self.redis = redis.Redis()
        self.key = "game:leaderboard"

    def update_score(self, player_id, score):
        # O(log N): fast even with 100M players
        self.redis.zadd(self.key, {player_id: score})

    def get_top_100(self):
        # O(log N + 100)
        return self.redis.zrevrange(self.key, 0, 99, withscores=True)

    def get_rank(self, player_id):
        # O(log N); zrevrank is 0-based
        rank = self.redis.zrevrank(self.key, player_id)
        return rank + 1 if rank is not None else None

    def get_nearby_players(self, player_id, count=10):
        # Players around the current player's rank
        rank = self.redis.zrevrank(self.key, player_id)
        start = max(0, rank - count // 2)
        # zrevrange's end index is inclusive, hence count - 1
        return self.redis.zrevrange(self.key, start, start + count - 1,
                                    withscores=True)
```
Optimization for Extreme Scale
```python
# Shard by score range for extreme scale
class ShardedLeaderboard:
    def __init__(self):
        self.shards = [
            ("top_100", 0, 100),
            ("rank_101_1000", 101, 1000),
            ("rank_1001_10000", 1001, 10000),
        ]

    def update_score(self, player_id, score):
        # Write to the shard covering the player's rank range;
        # periodically rebalance players between shards
        pass
```
Problem 9: Feature Flags at Scale
The Scenario
"We want to gradually roll out a new checkout flow to 1% → 10% → 50% → 100% of users. How do we implement feature flags that work across all services?"
Solution: Centralized Feature Flag Service
```python
import hashlib

class FeatureFlagService:
    def __init__(self):
        self.redis = redis.Redis()
        self.local_cache = TTLCache()  # any in-process cache with TTL support

    def is_enabled(self, flag_name, user_id=None, default=False):
        flag = self.get_flag(flag_name)
        if flag is None:
            return default
        if flag["type"] == "boolean":
            return flag["enabled"]
        elif flag["type"] == "percentage":
            # Stable hash: the same user always gets the same result.
            # Don't use Python's built-in hash() here -- it is salted
            # per process, so results would differ across services.
            digest = hashlib.md5(f"{flag_name}:{user_id}".encode()).hexdigest()
            return int(digest, 16) % 100 < flag["percentage"]
        elif flag["type"] == "user_list":
            return user_id in flag["users"]
        elif flag["type"] == "segment":
            user = self.get_user(user_id)
            return self.matches_segment(user, flag["segment"])

    def get_flag(self, flag_name):
        # Cache flag config locally; refresh every 30s
        cached = self.local_cache.get(flag_name)
        if cached:
            return cached
        flag = self.redis.hgetall(f"flag:{flag_name}")
        self.local_cache.set(flag_name, flag, ttl=30)
        return flag
```
Usage
```python
def checkout(user_id, cart):
    if feature_flags.is_enabled("new_checkout_flow", user_id):
        return new_checkout_flow(user_id, cart)
    else:
        return legacy_checkout_flow(user_id, cart)
```
Gradual Rollout
```python
# Day 1: 1% of users
feature_flags.update("new_checkout_flow", {"type": "percentage", "percentage": 1})
# Day 3: 10% (if metrics look good)
feature_flags.update("new_checkout_flow", {"type": "percentage", "percentage": 10})
# Day 7: 50%
feature_flags.update("new_checkout_flow", {"type": "percentage", "percentage": 50})
# Day 14: 100% (full rollout)
feature_flags.update("new_checkout_flow", {"type": "boolean", "enabled": True})
```
Problem 10: Multi-Region Active-Active
The Scenario
"We need our application to be active in both US and EU regions. Users should write to the nearest region. How do we handle conflicts?"
Solution: CRDTs + Last-Write-Wins
Conflict Resolution Strategies
```python
# Strategy 1: last-write-wins (simple)
class LWWRegister:
    def __init__(self):
        self.value = None
        self.timestamp = 0

    def update(self, value, timestamp):
        if timestamp > self.timestamp:
            self.value = value
            self.timestamp = timestamp

    def merge(self, other):
        if other.timestamp > self.timestamp:
            self.value = other.value
            self.timestamp = other.timestamp

# Strategy 2: CRDT grow-only counter
class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}  # node_id -> count

    def increment(self):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + 1

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(self.counts.get(node_id, 0), count)
```
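To see why the G-counter converges, let two regions increment independently and then merge in both directions; both ends agree on the total, and merging again changes nothing. A self-contained demo (the class is re-declared so the snippet runs on its own):

```python
class GCounter:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counts = {}
    def increment(self):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + 1
    def value(self):
        return sum(self.counts.values())
    def merge(self, other):
        for node_id, count in other.counts.items():
            self.counts[node_id] = max(self.counts.get(node_id, 0), count)

us, eu = GCounter("us"), GCounter("eu")
for _ in range(3): us.increment()   # 3 likes land in the US region
for _ in range(2): eu.increment()   # 2 likes land in the EU region
us.merge(eu); eu.merge(us)          # anti-entropy sync in both directions
print(us.value(), eu.value())       # 5 5 -- both regions converge
```

The max-per-node merge is what makes this safe to apply repeatedly and in any order (idempotent, commutative, associative), which is the defining property of a CRDT.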
When to Use What
| Data Type | Strategy | Example |
|---|---|---|
| User profile | Last-write-wins | Email, name updates |
| Shopping cart | Set union | Items in cart |
| Like count | CRDT Counter | Post likes |
| Comments | Ordered log | Append-only |
| Inventory | Reservation system | Don't use CRDT (needs coordination) |
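The "set union" row for shopping carts fits in a few lines: merging keeps every item either region saw. Note this naive version cannot represent removals (a deleted item would resurrect on merge); real systems use an OR-Set (observed-remove set) for that:

```python
def merge_carts(*replicas):
    # Set union: an item added in any region survives the merge
    merged = set()
    for cart in replicas:
        merged |= cart
    return merged

us_cart = {"sku-1", "sku-2"}
eu_cart = {"sku-2", "sku-3"}   # same user, writing in the other region
print(sorted(merge_carts(us_cart, eu_cart)))  # ['sku-1', 'sku-2', 'sku-3']
```

This "items reappear rather than vanish" bias is a deliberate product choice: for carts, over-including is a better failure mode than silently dropping an item.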
Quick Reference: Migration Patterns
| Pattern | Use Case | Risk Level |
|---|---|---|
| Strangler Fig | Replace monolith incrementally | Low |
| Blue-Green | Zero-downtime deployment | Medium |
| Canary | Gradual rollout (1% → 100%) | Low |
| Shadow Traffic | Test new system with prod traffic | Low |
| Dual Writes | Database migration | Medium |
| CDC | Real-time data sync | Medium |
| Feature Flags | Control rollout | Low |
Interview Tips for These Problems
- Ask clarifying questions: Constraints, SLAs, timeline
- Start with simplest solution: Then add complexity
- Discuss rollback plan: Always have an escape hatch
- Mention monitoring: How will you know it's working?
- Consider partial failures: What if step 3 of 5 fails?
- Time-box phases: Give realistic estimates
Congratulations! You now have a toolkit of real-world problem-solving patterns for senior interviews. Scenarios like these come up regularly in Senior/Staff interviews at companies like Microsoft, Google, and Meta.
Practice: Take each problem and explain your solution out loud in 10 minutes.
Next: Advanced Scenarios - GDPR deletion, thundering herd, split-brain, debugging mysteries, and incident postmortems.