Skip to main content

Partial Batch Job Failure

The Interview Question

"Our nightly data pipeline processed 9 million of 10 million records, then crashed. We can't easily tell which records were processed. How do we recover without duplicating or missing data?"

Asked at: Data engineering roles at Amazon, Netflix, Airbnb, any company with batch processing

Time to solve: 30-35 minutes

Difficulty: ⭐⭐⭐⭐ (Senior Data Engineer)


Clarifying Questions to Ask

  1. "What kind of processing?" → ETL, aggregations, external API calls?
  2. "Is the process idempotent?" → Can we safely re-run?
  3. "Where's the checkpoint/state?" → Any progress tracking?
  4. "What's the downstream impact?" → Reports wrong? Payments failed?
  5. "Time pressure?" → Need to fix before business opens?

The Problem Visualized

The challenges:

  • Which 1M records weren't processed?
  • Did any of the 9M have errors?
  • Will re-running create duplicates?

Solution 1: Idempotent Processing

Make processing repeatable without side effects:

from contextlib import contextmanager
from datetime import date, datetime
from hashlib import md5

class IdempotentBatchProcessor:
    """Process records idempotently using an upsert pattern.

    Safe to re-run at any time: records that were already processed are
    overwritten with identical data (a no-op), and records that were
    missed are inserted for the first time.

    NOTE(review): assumes ``self.target_db`` is attached elsewhere (no
    ``__init__`` is shown here) -- confirm against the caller.
    """

    def process_batch(self, records):
        """Upsert every record; re-runs collapse onto the same rows."""
        for record in records:
            # Deterministic ID: the same record always maps to the same row.
            record_id = self.create_idempotency_key(record)

            # Upsert: insert, or update in place on key conflict.
            self.target_db.execute("""
                INSERT INTO processed_records
                    (id, data, processed_at, batch_date)
                VALUES (?, ?, NOW(), ?)
                ON CONFLICT (id, batch_date) DO UPDATE SET
                    data = EXCLUDED.data,
                    processed_at = NOW()
            """, record_id, record.data, date.today())

    def create_idempotency_key(self, record):
        """Return a deterministic key derived from record content.

        The same record always yields the same key, so re-processing a
        record hits the same target row. md5 is used for keying only,
        not security, so collision resistance is not a concern here.
        """
        key_parts = f"{record.source_id}:{record.date}:{record.type}"
        return md5(key_parts.encode()).hexdigest()

Recovery:

# Recovery: just re-run the entire batch!
# Already-processed records are upserted with identical data (a no-op);
# missed records are inserted for the first time.
processor.process_batch(all_10_million_records)

Solution 2: Checkpointing

Track progress and resume from failure point:

class CheckpointedBatchProcessor:
    """Batch processor that records its progress so a crashed run can
    resume from the last checkpoint instead of starting over.

    NOTE(review): ``RedisCheckpointStore``, ``CheckPoint`` and
    ``process_record`` are defined elsewhere in the project; the store
    must support ``get(job_id)`` and ``save(job_id, CheckPoint)``.
    """

    def __init__(self, job_id: str):
        self.job_id = job_id
        self.checkpoint_store = RedisCheckpointStore()

    def process_batch(self, records):
        """Process ``records``, checkpointing every 1000 records.

        On failure the checkpoint records the failing offset so the next
        run resumes there; on success the job is marked complete.
        """
        # Resume from the last saved checkpoint, if any.
        last_checkpoint = self.checkpoint_store.get(self.job_id)
        start_offset = last_checkpoint.offset if last_checkpoint else 0

        for i, record in enumerate(records[start_offset:], start=start_offset):
            try:
                self.process_record(record)

                # Checkpoint every 1000 records. Save i + 1 -- the NEXT
                # offset to process: record i has already succeeded, so
                # resuming at i would process it a second time.
                if i % 1000 == 0:
                    self.checkpoint_store.save(self.job_id, CheckPoint(
                        offset=i + 1,
                        timestamp=datetime.now(),
                        last_record_id=record.id
                    ))

            except Exception as e:
                # Record i failed and was NOT processed: resume at i.
                self.checkpoint_store.save(self.job_id, CheckPoint(
                    offset=i,
                    error=str(e),
                    status='failed'
                ))
                raise

        # All records done -- mark the job complete.
        self.checkpoint_store.save(self.job_id, CheckPoint(
            offset=len(records),
            status='complete'
        ))

Recovery:

# Recovery: re-instantiate the processor for the same job_id and re-run;
# process_batch reads the last saved checkpoint and continues from there.
processor = CheckpointedBatchProcessor(job_id="nightly-2024-01-15")
processor.process_batch(records) # Automatically resumes from failure point

Solution 3: Outbox Pattern with Status Tracking

Track each record's processing status:

class OutboxBatchProcessor:
    """Track each record's processing status in an outbox table so that
    failed or missed records can be identified and retried precisely.

    NOTE(review): ``self.db``, ``fetch_record`` and ``process_record``
    are provided elsewhere -- confirm against the surrounding project.
    """

    def prepare_batch(self, batch_id: str, records):
        """Step 1: create one 'pending' outbox entry per record.

        ON CONFLICT DO NOTHING makes this step itself idempotent:
        re-preparing the same batch never duplicates entries.
        """
        for record in records:
            self.db.execute("""
                INSERT INTO processing_outbox
                    (batch_id, record_id, status, created_at)
                VALUES (?, ?, 'pending', NOW())
                ON CONFLICT DO NOTHING
            """, batch_id, record.id)

    def process_batch(self, batch_id: str):
        """Step 2: drain pending records one at a time.

        FOR UPDATE SKIP LOCKED lets several workers drain the same batch
        concurrently without blocking on each other's rows.
        """
        while True:
            # Claim the next pending record (if any).
            pending = self.db.execute("""
                SELECT record_id FROM processing_outbox
                WHERE batch_id = ? AND status = 'pending'
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            """, batch_id).fetchone()

            if not pending:
                break  # No pending records left -- batch is drained.

            try:
                record = self.fetch_record(pending.record_id)
                self.process_record(record)

                self.db.execute("""
                    UPDATE processing_outbox
                    SET status = 'complete', completed_at = NOW()
                    WHERE batch_id = ? AND record_id = ?
                """, batch_id, pending.record_id)

            except Exception as e:
                # Mark failed (not pending) so the loop moves on instead
                # of retrying the same broken record forever.
                self.db.execute("""
                    UPDATE processing_outbox
                    SET status = 'failed', error = ?, failed_at = NOW()
                    WHERE batch_id = ? AND record_id = ?
                """, str(e), batch_id, pending.record_id)

    def retry_failed(self, batch_id: str):
        """Step 3: flip failed entries back to 'pending' and re-process."""
        self.db.execute("""
            UPDATE processing_outbox
            SET status = 'pending', error = NULL
            WHERE batch_id = ? AND status = 'failed'
        """, batch_id)

        self.process_batch(batch_id)

Recovery:

# Recovery: inspect which records failed and why.
failed = db.query("""
SELECT record_id, error FROM processing_outbox
WHERE batch_id = 'nightly-2024-01-15' AND status = 'failed'
""")
print(f"Failed records: {len(failed)}")

# Re-queue and process only the failed records -- complete ones are untouched.
processor.retry_failed("nightly-2024-01-15")

Solution 4: Incremental Processing with Watermarks

For streaming-style batch processing:

class WatermarkProcessor:
    """Process records by event-time window, tracking a watermark (the
    end of the last successfully processed window).

    Re-processing a window is safe only if per-record writes are
    idempotent -- ``is_reprocessing_safe`` gates that case.

    NOTE(review): ``self.source``, ``self.state_store``, ``logger``,
    ``is_reprocessing_safe`` and ``process_record`` are provided
    elsewhere in the project.
    """

    def process_window(self, start_time, end_time):
        """Process events with start_time <= event_time < end_time, then
        advance the watermark to end_time.

        Raises ValueError if the window was already processed and
        re-processing is not safe.
        """
        # Watermark = end of the last successfully processed window.
        watermark = self.get_watermark()

        if start_time < watermark:
            # Window overlaps already-processed data: re-process or refuse.
            if self.is_reprocessing_safe():
                logger.info(f"Re-processing window {start_time} to {end_time}")
            else:
                raise ValueError(f"Window {start_time} already processed")

        # Fetch the records in this half-open time window, in order.
        records = self.source.query("""
            SELECT * FROM events
            WHERE event_time >= ? AND event_time < ?
            ORDER BY event_time
        """, start_time, end_time)

        # Per-record writes are assumed idempotent (see class docstring).
        for record in records:
            self.process_record(record)

        # Advance the watermark only after the whole window succeeds, so
        # a crash mid-window leaves it pointing at the last good window.
        self.set_watermark(end_time)

    def get_watermark(self):
        """Return the stored watermark, or datetime.min if none exists."""
        return self.state_store.get('watermark') or datetime.min

    def set_watermark(self, time):
        """Persist ``time`` as the new watermark."""
        self.state_store.set('watermark', time)

Recovery:

# Recovery: the watermark is at 3:45 AM (where the job crashed).
# Re-run from the last watermark to the end of the intended window.
processor.process_window(
start_time=processor.get_watermark(), # 3:45 AM
end_time=datetime(2024, 1, 16, 0, 0) # End of day
)

Solution 5: Two-Phase Processing

Separate identification from processing:

class TwoPhaseProcessor:
    """Separate identifying the work (fast) from doing it (slow), so the
    work list survives a crash and progress can be resumed per record.

    NOTE(review): ``self.source``, ``store_manifest``, ``get_manifest``,
    ``update_manifest``, ``fetch_record`` and ``process_record`` are
    provided elsewhere in the project.
    """

    def phase1_identify(self, batch_id: str, source_query):
        """Phase 1: build and persist a manifest of record IDs to process.

        Returns the manifest dict that was stored.
        """
        record_ids = self.source.query(source_query)

        manifest = {
            'batch_id': batch_id,
            'record_count': len(record_ids),
            'record_ids': record_ids,
            'created_at': datetime.now(),
            'status': 'identified'
        }

        self.store_manifest(manifest)
        return manifest

    def phase2_process(self, batch_id: str):
        """Phase 2: process every record in the manifest, tracking progress.

        Records already listed in the manifest's 'processed_ids' are
        skipped, so re-running after a crash resumes where it stopped.
        Re-raises the first processing error after recording it.
        """
        manifest = self.get_manifest(batch_id)
        processed = set(manifest.get('processed_ids', []))

        for record_id in manifest['record_ids']:
            if record_id in processed:
                continue  # Already done in a previous run.

            try:
                record = self.fetch_record(record_id)
                self.process_record(record)

                processed.add(record_id)
                # NOTE: rewriting the full processed list per record is
                # O(n^2) in manifest size; fine for modest batches, but
                # consider appending deltas for very large ones.
                self.update_manifest(batch_id, {'processed_ids': list(processed)})

            except Exception as e:
                self.update_manifest(batch_id, {
                    'last_error': str(e),
                    'failed_at': datetime.now()
                })
                raise

        self.update_manifest(batch_id, {'status': 'complete'})

Recovery:

# Recovery: the manifest already holds partial progress -- phase 2 skips
# records whose IDs are in 'processed_ids' and resumes with the rest.
processor.phase2_process("batch-2024-01-15")

Comparison of Approaches

| Approach      | Recovery Ease | Performance | Complexity | Use When |
|---------------|---------------|-------------|------------|----------|
| Idempotent    | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ | Can upsert, performance OK |
| Checkpointing | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | Large datasets, fast processing |
| Outbox        | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | Need per-record tracking |
| Watermark     | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | Time-based data |
| Two-Phase     | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | Complex dependencies |

Production-Ready Batch Job Template

class ProductionBatchJob:
    """Template for reliable batch processing: distributed lock against
    concurrent runs, idempotent per-record processing, periodic
    checkpoints, metrics, and on-call alerting on failure.

    NOTE(review): relies on collaborators defined elsewhere --
    ``MetricsClient``, ``self.redis``, ``tqdm``, ``logger``, and the
    ``is_already_complete`` / ``fetch_records`` /
    ``process_record_idempotent`` / ``handle_failure`` / ``checkpoint`` /
    ``mark_complete`` / ``alert_on_call`` methods.
    """

    def __init__(self, job_name: str):
        self.job_name = job_name
        # One batch per job per day; re-running the same day resumes it.
        self.batch_id = f"{job_name}-{date.today().isoformat()}"
        self.metrics = MetricsClient()

    def run(self):
        """Run the batch end to end; alerts and re-raises on failure."""
        try:
            # Gauge 1 = running; reset to 0 in `finally` whatever happens.
            self.metrics.gauge('batch_job_status', 1, tags={'job': self.job_name})

            with self.distributed_lock(self.batch_id):
                # Prevent concurrent runs; skip already-finished batches.
                if self.is_already_complete():
                    logger.info(f"Batch {self.batch_id} already complete")
                    return

                records = self.fetch_records()
                self.metrics.gauge('batch_job_records', len(records))

                processed = 0
                failed = 0

                for record in tqdm(records):
                    try:
                        self.process_record_idempotent(record)
                        processed += 1
                    except Exception as e:
                        failed += 1
                        self.handle_failure(record, e)

                    # Checkpoint every 10k successes. The `processed and`
                    # guard avoids the 0 % 10000 == 0 trap, which would
                    # checkpoint on every failing record before the
                    # first success.
                    if processed and processed % 10000 == 0:
                        self.checkpoint(processed)

                self.metrics.counter('batch_records_processed', processed)
                self.metrics.counter('batch_records_failed', failed)
                self.mark_complete()

        except Exception as e:
            # Status gauge is reset by `finally`; just page the on-call.
            self.alert_on_call(f"Batch job {self.job_name} failed: {e}")
            raise
        finally:
            # Always mark the job as no longer running.
            self.metrics.gauge('batch_job_status', 0)

    @contextmanager
    def distributed_lock(self, key):
        """Redis-based lock: only one worker runs a given batch at once.

        Raises RuntimeError immediately (no blocking) if the lock is held.
        """
        lock = self.redis.lock(f"batch_lock:{key}", timeout=3600)
        if not lock.acquire(blocking=False):
            raise RuntimeError(f"Batch {key} is already running")
        try:
            yield
        finally:
            lock.release()

Key Takeaways

  1. Design for failure - Assume the job will crash mid-way
  2. Idempotency is king - Safe re-runs solve most problems
  3. Track progress explicitly - Know exactly where you stopped
  4. Test recovery - Actually kill jobs mid-run and verify recovery
  5. Alert quickly - Know about failures before business does
  6. Document runbooks - 3 AM you needs clear instructions

Golden rule: Every batch job should be safe to kill with kill -9 at any point and restart without data loss or duplication.