Partial Batch Job Failure
The Interview Question
"Our nightly data pipeline processed 9 million of 10 million records, then crashed. We can't easily tell which records were processed. How do we recover without duplicating or missing data?"
Asked at: Data engineering roles at Amazon, Netflix, Airbnb, any company with batch processing
Time to solve: 30-35 minutes
Difficulty: ⭐⭐⭐⭐ (Senior Data Engineer)
Clarifying Questions to Ask
- "What kind of processing?" → ETL, aggregations, external API calls?
- "Is the process idempotent?" → Can we safely re-run?
- "Where's the checkpoint/state?" → Any progress tracking?
- "What's the downstream impact?" → Reports wrong? Payments failed?
- "Time pressure?" → Need to fix before business opens?
The Problem Visualized
The challenges:
- Which 1M records weren't processed?
- Did any of the 9M have errors?
- Will re-running create duplicates?
Solution 1: Idempotent Processing
Make processing repeatable without side effects:
from contextlib import contextmanager
from datetime import date, datetime
from hashlib import md5
class IdempotentBatchProcessor:
    """
    Re-runnable batch processor.

    Every record is written with an upsert keyed on a content-derived ID,
    so repeating a batch (or replaying part of one) is harmless: existing
    rows are overwritten in place, missing rows are inserted.
    """

    def process_batch(self, records):
        """Upsert each record into processed_records; safe to call repeatedly."""
        for item in records:
            # Deterministic ID: the same source record always maps to the
            # same row, so a re-run overwrites instead of duplicating.
            key = self.create_idempotency_key(item)
            self.target_db.execute("""
                INSERT INTO processed_records
                    (id, data, processed_at, batch_date)
                VALUES (?, ?, NOW(), ?)
                ON CONFLICT (id, batch_date) DO UPDATE SET
                    data = EXCLUDED.data,
                    processed_at = NOW()
            """, key, item.data, date.today())

    def create_idempotency_key(self, record):
        """Return a stable hex digest of the record's identity fields.

        The same record content always yields the same key, which is what
        makes the upsert above idempotent. (md5 is fine here: the digest is
        a deduplication key, not a security boundary.)
        """
        identity = f"{record.source_id}:{record.date}:{record.type}"
        return md5(identity.encode()).hexdigest()
Recovery:
# Recovery: simply re-run the whole batch.
# Already-processed records are upserted again (a harmless overwrite);
# records missed by the crashed run are inserted for the first time.
processor.process_batch(all_10_million_records)
Solution 2: Checkpointing
Track progress and resume from failure point:
class CheckpointedBatchProcessor:
    """
    Batch processor that persists progress in an external checkpoint store
    so a crashed run can resume where it stopped instead of restarting.

    NOTE(review): CheckPoint and RedisCheckpointStore are assumed to be
    project-provided; their contracts are inferred from usage here.
    """

    def __init__(self, job_id: str):
        self.job_id = job_id
        self.checkpoint_store = RedisCheckpointStore()

    def process_batch(self, records):
        """Process ``records`` sequentially, resuming from the last checkpoint.

        The stored offset is always the index of the NEXT record to process.
        On failure a checkpoint with the failing index is saved and the
        original exception is re-raised, so the failed record is retried on
        the next run.
        """
        # Resume point: index of the next unprocessed record (0 on a fresh run).
        last_checkpoint = self.checkpoint_store.get(self.job_id)
        start_offset = last_checkpoint.offset if last_checkpoint else 0
        for i, record in enumerate(records[start_offset:], start=start_offset):
            try:
                self.process_record(record)
            except Exception as e:
                # Persist the failure point. offset=i (not i+1) because
                # record i did NOT complete and must be retried on resume.
                self.checkpoint_store.save(self.job_id, CheckPoint(
                    offset=i,
                    error=str(e),
                    status='failed'
                ))
                raise
            # Checkpoint every 1000 successfully processed records.
            # BUGFIX: save offset=i+1, not i — record i is done, so the
            # resume point is AFTER it. Saving i would re-process record i
            # on restart and create a duplicate. Also use (i + 1) % 1000 so
            # the first checkpoint lands after 1000 records, not at i == 0.
            if (i + 1) % 1000 == 0:
                self.checkpoint_store.save(self.job_id, CheckPoint(
                    offset=i + 1,
                    timestamp=datetime.now(),
                    last_record_id=record.id
                ))
        # All records done: mark the job complete.
        self.checkpoint_store.save(self.job_id, CheckPoint(
            offset=len(records),
            status='complete'
        ))
Recovery:
# Recovery: construct the processor with the SAME job_id as the crashed
# run so it finds that run's stored checkpoint.
processor = CheckpointedBatchProcessor(job_id="nightly-2024-01-15")
processor.process_batch(records)  # Resumes from the last saved offset
Solution 3: Outbox Pattern with Status Tracking
Track each record's processing status:
class OutboxBatchProcessor:
    """
    Tracks per-record processing status in an outbox table, so a crashed
    batch can be inspected (what succeeded, what failed, why) and only
    the unfinished work re-driven.
    """

    def prepare_batch(self, batch_id: str, records):
        """Step 1: register every record as 'pending' (idempotent insert)."""
        for rec in records:
            self.db.execute("""
                INSERT INTO processing_outbox
                    (batch_id, record_id, status, created_at)
                VALUES (?, ?, 'pending', NOW())
                ON CONFLICT DO NOTHING
            """, batch_id, rec.id)

    def process_batch(self, batch_id: str):
        """Step 2: drain 'pending' rows one at a time until none remain.

        FOR UPDATE SKIP LOCKED lets several workers drain the same batch
        without contending for rows. A failure is recorded on the row and
        the loop continues, so one bad record cannot stall the batch.
        """
        while True:
            row = self.db.execute("""
                SELECT record_id FROM processing_outbox
                WHERE batch_id = ? AND status = 'pending'
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            """, batch_id).fetchone()
            if not row:
                break  # no pending work left
            try:
                self.process_record(self.fetch_record(row.record_id))
                self.db.execute("""
                    UPDATE processing_outbox
                    SET status = 'complete', completed_at = NOW()
                    WHERE batch_id = ? AND record_id = ?
                """, batch_id, row.record_id)
            except Exception as exc:
                # Mark the row failed with the error message for later triage.
                self.db.execute("""
                    UPDATE processing_outbox
                    SET status = 'failed', error = ?, failed_at = NOW()
                    WHERE batch_id = ? AND record_id = ?
                """, str(exc), batch_id, row.record_id)

    def retry_failed(self, batch_id: str):
        """Step 3: flip 'failed' rows back to 'pending' and re-drain them."""
        self.db.execute("""
            UPDATE processing_outbox
            SET status = 'pending', error = NULL
            WHERE batch_id = ? AND status = 'failed'
        """, batch_id)
        self.process_batch(batch_id)
Recovery:
# Recovery step 1: inspect which records failed and why.
failed = db.query("""
SELECT record_id, error FROM processing_outbox
WHERE batch_id = 'nightly-2024-01-15' AND status = 'failed'
""")
print(f"Failed records: {len(failed)}")
# Recovery step 2: re-queue only the failed records and re-process them.
processor.retry_failed("nightly-2024-01-15")
Solution 4: Incremental Processing with Watermarks
For streaming-style batch processing:
class WatermarkProcessor:
    """
    Window-at-a-time processor driven by an event-time watermark.

    The watermark records the end of the last fully processed window, so
    after a crash the same window can be replayed from the watermark —
    provided the per-record writes are idempotent.
    """

    def process_window(self, start_time, end_time):
        """Process events with event_time in [start_time, end_time).

        Raises ValueError if the window overlaps already-processed data
        and re-processing has not been declared safe.
        """
        watermark = self.get_watermark()
        if start_time < watermark:
            # This window overlaps data we have already covered.
            if not self.is_reprocessing_safe():
                raise ValueError(f"Window {start_time} already processed")
            logger.info(f"Re-processing window {start_time} to {end_time}")
        # Half-open time range keeps adjacent windows from double-counting.
        records = self.source.query("""
            SELECT * FROM events
            WHERE event_time >= ? AND event_time < ?
            ORDER BY event_time
        """, start_time, end_time)
        for record in records:
            self.process_record(record)
        # Advance the watermark only after the whole window succeeded.
        self.set_watermark(end_time)

    def get_watermark(self):
        """Return the stored watermark, or datetime.min when none exists yet."""
        return self.state_store.get('watermark') or datetime.min

    def set_watermark(self, time):
        """Persist ``time`` as the new high-water mark."""
        self.state_store.set('watermark', time)
Recovery:
# Watermark sits at the end of the last fully processed window (3:45 AM,
# just before the crash). Re-run the remainder of the day from there.
processor.process_window(
start_time=processor.get_watermark(), # 3:45 AM
end_time=datetime(2024, 1, 16, 0, 0) # exclusive upper bound: midnight of Jan 16
)
Solution 5: Two-Phase Processing
Separate identification from processing:
class TwoPhaseProcessor:
    """
    Splits a batch into a cheap identification phase and a resumable
    processing phase. The stored manifest is the durable record of what
    work exists and how far it has progressed.
    """

    def phase1_identify(self, batch_id: str, source_query):
        """Phase 1: snapshot the IDs to process into a stored manifest."""
        ids = self.source.query(source_query)
        manifest = {
            'batch_id': batch_id,
            'record_count': len(ids),
            'record_ids': ids,
            'created_at': datetime.now(),
            'status': 'identified',
        }
        self.store_manifest(manifest)
        return manifest

    def phase2_process(self, batch_id: str):
        """Phase 2: process every manifest record not already marked done.

        Progress is written back to the manifest after each record, so a
        crashed run resumes by simply calling this method again; finished
        IDs are skipped. Re-raises the first processing error after
        recording it in the manifest.
        """
        manifest = self.get_manifest(batch_id)
        done = set(manifest.get('processed_ids', []))
        for rid in manifest['record_ids']:
            if rid in done:
                continue  # finished on a previous run
            try:
                self.process_record(self.fetch_record(rid))
                done.add(rid)
                self.update_manifest(batch_id, {'processed_ids': list(done)})
            except Exception as exc:
                # Leave a trail for the operator, then propagate.
                self.update_manifest(batch_id, {
                    'last_error': str(exc),
                    'failed_at': datetime.now(),
                })
                raise
        self.update_manifest(batch_id, {'status': 'complete'})
Recovery:
# The manifest already records partial progress from the crashed run,
# so recovery is just re-invoking phase 2; completed IDs are skipped.
processor.phase2_process("batch-2024-01-15")
Comparison of Approaches
| Approach | Recovery Ease | Performance | Complexity | Use When |
|---|---|---|---|---|
| Idempotent | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ | Can upsert, performance OK |
| Checkpointing | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | Large datasets, fast processing |
| Outbox | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | Need per-record tracking |
| Watermark | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | Time-based data |
| Two-Phase | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | Complex dependencies |
Production-Ready Batch Job Template
class ProductionBatchJob:
    """
    Template for reliable batch processing: a distributed lock against
    concurrent runs, idempotent per-record processing, periodic
    checkpoints, metrics, and on-call alerting on failure.

    NOTE(review): MetricsClient, tqdm, logger, self.redis and the helper
    methods (is_already_complete, fetch_records, process_record_idempotent,
    handle_failure, checkpoint, mark_complete, alert_on_call) are assumed
    to be provided elsewhere in the project.
    """

    def __init__(self, job_name: str):
        self.job_name = job_name
        # One batch per job per calendar day.
        self.batch_id = f"{job_name}-{date.today().isoformat()}"
        self.metrics = MetricsClient()

    def run(self):
        """Run the batch once; safe to re-invoke after a crash."""
        try:
            # Status gauge: 1 while the job runs, 0 otherwise (see finally).
            self.metrics.gauge('batch_job_status', 1, tags={'job': self.job_name})
            with self.distributed_lock(self.batch_id):
                # Prevent concurrent runs; skip batches already finished.
                if self.is_already_complete():
                    logger.info(f"Batch {self.batch_id} already complete")
                    return
                records = self.fetch_records()
                self.metrics.gauge('batch_job_records', len(records))
                processed = 0
                failed = 0
                for record in tqdm(records):
                    try:
                        self.process_record_idempotent(record)
                    except Exception as e:
                        failed += 1
                        self.handle_failure(record, e)
                    else:
                        processed += 1
                        # BUGFIX: checkpoint only on the success path.
                        # The original checked `processed % 10000 == 0` in
                        # the loop body, which fired for every failed record
                        # while processed was still 0 (and re-fired on
                        # failures at each 10k boundary).
                        if processed % 10000 == 0:
                            self.checkpoint(processed)
                self.metrics.counter('batch_records_processed', processed)
                self.metrics.counter('batch_records_failed', failed)
                self.mark_complete()
        except Exception as e:
            # The finally block zeroes the status gauge (the original also
            # zeroed it here, redundantly); just page the on-call and
            # propagate so the scheduler sees the failure.
            self.alert_on_call(f"Batch job {self.job_name} failed: {e}")
            raise
        finally:
            self.metrics.gauge('batch_job_status', 0)

    @contextmanager
    def distributed_lock(self, key):
        """Hold a Redis lock for the duration of the ``with`` block.

        Raises RuntimeError immediately (no blocking) if another run holds
        the lock. The 1-hour timeout is a safety net so a crashed holder
        cannot wedge the job forever.
        """
        lock = self.redis.lock(f"batch_lock:{key}", timeout=3600)
        if not lock.acquire(blocking=False):
            raise RuntimeError(f"Batch {key} is already running")
        try:
            yield
        finally:
            lock.release()
Key Takeaways
- Design for failure - Assume the job will crash mid-way
- Idempotency is king - Safe re-runs solve most problems
- Track progress explicitly - Know exactly where you stopped
- Test recovery - Actually kill jobs mid-run and verify recovery
- Alert quickly - Know about failures before business does
- Document runbooks - 3 AM you needs clear instructions
Golden rule: Every batch job should be safe to kill with kill -9 at any point and restart without data loss or duplication.