
PII Leaked in Logs

The Interview Question

"During a security audit, we discovered that we've been logging credit card numbers, SSNs, and passwords in plain text for 2 years. The logs are in Elasticsearch, S3 backups, and have been shipped to three third-party analytics tools. What do we do?"

Asked at: Any company handling sensitive data, especially fintech, healthcare

Time to solve: 35-40 minutes

Difficulty: ⭐⭐⭐⭐ (Senior, with compliance knowledge)


Clarifying Questions to Ask

  1. "What compliance frameworks apply?" → PCI-DSS, GDPR, HIPAA, SOC 2?
  2. "How much data volume?" → Affects remediation time
  3. "What third parties have the data?" → Legal and contractual obligations
  4. "Do we have data retention policies?" → Older logs might auto-delete
  5. "Has there been unauthorized access?" → Determines if it's a breach
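Question 4 above is worth checking mechanically: if lifecycle rules already expire old log objects, part of the exposure may age out on its own. Below is a sketch that evaluates rules shaped like the output of S3's `get_bucket_lifecycle_configuration`; the helper name and the 2-year window are illustrative:

```python
# Sketch: decide whether an existing lifecycle rule already expires
# log objects. Rule dicts mirror the shape S3 returns from
# get_bucket_lifecycle_configuration; the function name is illustrative.

def logs_auto_expire(rules: list, prefix: str = 'logs/',
                     max_age_days: int = 730) -> bool:
    """True if an enabled rule expires objects under `prefix`
    within `max_age_days` (here, the 2-year exposure window)."""
    for rule in rules:
        if rule.get('Status') != 'Enabled':
            continue
        rule_prefix = rule.get('Filter', {}).get('Prefix', '')
        if not prefix.startswith(rule_prefix):
            continue  # Rule does not cover the log prefix
        days = rule.get('Expiration', {}).get('Days')
        if days is not None and days <= max_age_days:
            return True
    return False

rules = [{'Status': 'Enabled',
          'Filter': {'Prefix': 'logs/'},
          'Expiration': {'Days': 90}}]
print(logs_auto_expire(rules))  # True: a 90-day expiry covers the window
```

Even when this returns True, deletion of the already-exposed objects still has to be explicit; a lifecycle rule only limits how much more accumulates.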

Immediate Actions (First 24 Hours)

Hour 0-1: Containment

```python
# 1. STOP THE BLEEDING - deploy an emergency log filter
import logging
import re

class EmergencyLogFilter(logging.Filter):
    SENSITIVE_PATTERNS = [
        r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
        r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',             # SSN
        r'password["\s:=]+\S+',                         # Password in logs
        r'ssn["\s:=]+\S+',
        r'credit_card["\s:=]+\S+',
    ]

    def filter(self, record):
        message = record.getMessage()
        for pattern in self.SENSITIVE_PATTERNS:
            message = re.sub(pattern, '[REDACTED]', message, flags=re.IGNORECASE)
        record.msg = message
        record.args = ()  # args were already interpolated by getMessage()
        return True

# Apply to ALL loggers immediately
for logger_name in logging.root.manager.loggerDict:
    logging.getLogger(logger_name).addFilter(EmergencyLogFilter())

# Also attach to root handlers so records propagated from
# child loggers are filtered at the point of emission
for handler in logging.getLogger().handlers:
    handler.addFilter(EmergencyLogFilter())
```

```bash
# 2. Disable log shipping to third parties

# Stop Datadog agent
systemctl stop datadog-agent

# Stop Splunk forwarder
systemctl stop SplunkForwarder

# Stop the Logstash pipeline feeding Elasticsearch
systemctl stop logstash
```

Hour 1-4: Assessment

```python
# Scan existing logs to understand scope
import re
from pathlib import Path

def scan_for_pii(log_path: Path):
    """Scan logs to assess exposure scope."""
    patterns = {
        'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
        'ssn': r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        'password': r'(password|passwd|pwd)["\s:=]+\S+',
    }

    findings = {pii_type: [] for pii_type in patterns}

    for log_file in log_path.glob('**/*.log*'):
        with open(log_file, 'r', errors='ignore') as f:
            for line_num, line in enumerate(f, start=1):
                for pii_type, pattern in patterns.items():
                    if re.search(pattern, line, re.IGNORECASE):
                        findings[pii_type].append({
                            'file': str(log_file),
                            'line': line_num,
                            'sample': line[:100] + '...',  # Never record the full PII!
                        })

    return findings

# Generate report
findings = scan_for_pii(Path('/var/log/app'))
for pii_type, instances in findings.items():
    print(f"{pii_type}: {len(instances)} instances found")
```
Notification Checklist

```yaml
notification_checklist:
  internal:
    - [ ] CISO / Security team
    - [ ] Legal / Compliance team
    - [ ] DPO (Data Protection Officer) if GDPR applies
    - [ ] Executive leadership

  external_if_required:
    - [ ] Card brands / acquiring bank (if cardholder data) - 24-hour expectation
    - [ ] Supervisory authority (GDPR: within 72 hours of becoming aware)
    - [ ] Affected individuals (GDPR: without undue delay if high risk to them)
    - [ ] Regulatory bodies (varies by jurisdiction)
    - [ ] Third-party processors (contractual obligations)
```

Data Remediation

Step 1: Delete from Elasticsearch

```python
# Delete documents containing PII
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Query for documents matching PII patterns.
# Note: Elasticsearch regexp queries use Lucene syntax, which has no
# \d or \s shorthand classes, so spell out the character classes.
query = {
    "query": {
        "bool": {
            "should": [
                {"regexp": {"message": ".*[0-9]{4}[- ]?[0-9]{4}[- ]?[0-9]{4}[- ]?[0-9]{4}.*"}},
                {"regexp": {"message": ".*password.*:.*"}},
                # ... more patterns
            ]
        }
    }
}

# Delete by query
result = es.delete_by_query(
    index="application-logs-*",
    body=query,
    wait_for_completion=False,  # Run async for large datasets
    conflicts="proceed",
)

print(f"Task ID: {result['task']}")

# Monitor progress
task_status = es.tasks.get(task_id=result['task'])
```

Step 2: Delete from S3 Backups

```python
import boto3
import re

s3 = boto3.client('s3')
BUCKET = 'logs-backup'

def delete_pii_from_s3():
    """
    For compliance, you might need to:
    1. Delete objects with PII
    2. Or overwrite with redacted versions
    3. Document everything for the audit trail
    """
    paginator = s3.get_paginator('list_objects_v2')

    for page in paginator.paginate(Bucket=BUCKET, Prefix='logs/'):
        for obj in page.get('Contents', []):
            key = obj['Key']

            # Download, scan, redact, re-upload (or delete)
            response = s3.get_object(Bucket=BUCKET, Key=key)
            content = response['Body'].read().decode('utf-8')

            if contains_pii(content):
                # Option 1: Delete entirely
                s3.delete_object(Bucket=BUCKET, Key=key)
                log_deletion(key)  # record for the audit trail

                # Option 2: Redact and re-upload
                # redacted = redact_pii(content)
                # s3.put_object(Bucket=BUCKET, Key=key, Body=redacted)

            print(f"Processed: {key}")

def contains_pii(content):
    patterns = [
        r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
        r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',             # SSN
    ]
    return any(re.search(p, content) for p in patterns)
```
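One caveat with the cleanup above: on a versioned bucket, a plain `delete_object` only adds a delete marker, and the PII-bearing versions remain retrievable. A hedged sketch of removing every version follows; the function name is illustrative, and the client is injected so the logic can be exercised without AWS (in production, pass `boto3.client('s3')`):

```python
# Sketch: on a versioned bucket, remove every version of an object,
# not just the latest. Assumes an S3-compatible client exposing
# get_paginator('list_object_versions') and delete_object.

def delete_all_versions(s3_client, bucket: str, key: str) -> int:
    """Delete every version and delete marker of `key`; return the count."""
    deleted = 0
    paginator = s3_client.get_paginator('list_object_versions')
    for page in paginator.paginate(Bucket=bucket, Prefix=key):
        entries = page.get('Versions', []) + page.get('DeleteMarkers', [])
        for entry in entries:
            if entry['Key'] != key:
                continue  # Prefix listing may include sibling keys
            s3_client.delete_object(Bucket=bucket, Key=key,
                                    VersionId=entry['VersionId'])
            deleted += 1
    return deleted
```

Remember that S3 Glacier archives and cross-region replicas of the bucket need the same treatment.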

Step 3: Request Deletion from Third Parties

```markdown
## Third-Party Data Deletion Request Template

Dear [Vendor],

Under our Data Processing Agreement dated [DATE], we are invoking
Section [X] regarding data deletion.

We have identified that logs shipped to your platform between
[START_DATE] and [END_DATE] contained personal data that was
inadvertently included. Specifically:
- Log indices/streams: [LIST]
- Data types affected: Credit card numbers, SSN
- Approximate volume: [X] GB

We request:
1. Immediate deletion of all affected data
2. Deletion from any backups within [TIMEFRAME]
3. Written confirmation of deletion
4. Confirmation that data was not shared with sub-processors

Please confirm receipt and provide deletion confirmation within
72 hours per our SLA.

Regards,
[Your Name]
[DPO/Security Officer]
```

Prevention: Proper Logging Architecture

Structured Logging with Auto-Redaction

```python
import logging
import json
import re
from dataclasses import dataclass, asdict

@dataclass
class LogContext:
    user_id: str
    request_id: str
    action: str
    # Never add PII fields here!

class SecureJSONFormatter(logging.Formatter):
    """JSON formatter that automatically redacts sensitive data."""

    REDACT_PATTERNS = {
        'credit_card': (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CC REDACTED]'),
        'ssn': (r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b', '[SSN REDACTED]'),
        'password': (r'(password|passwd|pwd)["\s:=]+\S+', r'\1=[REDACTED]'),
        'bearer_token': (r'Bearer\s+\S+', 'Bearer [REDACTED]'),
        'api_key': (r'(api[_-]?key)["\s:=]+\S+', r'\1=[REDACTED]'),
    }

    REDACT_KEYS = {'password', 'ssn', 'credit_card', 'card_number',
                   'cvv', 'secret', 'token', 'api_key'}

    def format(self, record):
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': self._redact_message(record.getMessage()),
        }

        # Add extra fields, redacting sensitive ones
        if hasattr(record, 'context'):
            log_entry['context'] = self._redact_dict(asdict(record.context))

        return json.dumps(log_entry)

    def _redact_message(self, message: str) -> str:
        for pattern, replacement in self.REDACT_PATTERNS.values():
            message = re.sub(pattern, replacement, message, flags=re.IGNORECASE)
        return message

    def _redact_dict(self, data: dict) -> dict:
        result = {}
        for key, value in data.items():
            if key.lower() in self.REDACT_KEYS:
                result[key] = '[REDACTED]'
            elif isinstance(value, dict):
                result[key] = self._redact_dict(value)
            elif isinstance(value, str):
                result[key] = self._redact_message(value)
            else:
                result[key] = value
        return result

# Usage
logger = logging.getLogger('app')
logger.setLevel(logging.INFO)  # Default effective level (WARNING) would suppress INFO
handler = logging.StreamHandler()
handler.setFormatter(SecureJSONFormatter())
logger.addHandler(handler)

# This will automatically redact
logger.info("Processing payment for card 4111-1111-1111-1111")
# "message": "Processing payment for card [CC REDACTED]"
```

Separate Audit Logs with Encryption

```python
import json
import logging
from datetime import datetime, timezone
from cryptography.fernet import Fernet

class EncryptedAuditLogger:
    """
    For audit trails that NEED sensitive data,
    encrypt before storing.
    """
    def __init__(self, key: bytes):
        self.fernet = Fernet(key)
        self.logger = logging.getLogger('audit')

    def log_sensitive_action(self, action: str, user_id: str,
                             sensitive_data: dict):
        """Encrypt sensitive data; store separately from regular logs."""
        encrypted = self.fernet.encrypt(
            json.dumps(sensitive_data).encode()
        )

        # Store the ciphertext, never the plaintext
        audit_entry = {
            'action': action,
            'user_id': user_id,
            'sensitive_data_ref': encrypted.decode(),
            'timestamp': datetime.now(timezone.utc).isoformat(),
        }

        # Write to a separate, access-controlled audit log
        self.logger.info(json.dumps(audit_entry))
```
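The flip side matters too: investigators holding the key must be able to read an entry back. A minimal round-trip sketch (the function name `read_audit_entry` is illustrative, not part of the class above):

```python
import json
from cryptography.fernet import Fernet

def read_audit_entry(key: bytes, audit_entry: dict) -> dict:
    """Decrypt the sensitive payload carried by an audit entry."""
    fernet = Fernet(key)
    plaintext = fernet.decrypt(audit_entry['sensitive_data_ref'].encode())
    return json.loads(plaintext)

# Round trip: encrypt as the logger would, then read back
key = Fernet.generate_key()
fernet = Fernet(key)
entry = {'action': 'refund',
         'sensitive_data_ref': fernet.encrypt(
             json.dumps({'card_last4': '1111'}).encode()).decode()}
print(read_audit_entry(key, entry))  # {'card_last4': '1111'}
```

Keep the Fernet key in a KMS or secrets manager with its own access log; rotating the key also gives you crypto-shredding (destroy the key and the audit payloads become unreadable).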

Log Sampling for High-Volume

```python
import hashlib
import logging

class SampledLogger:
    """
    For high-volume logs, sample based on user ID
    to reduce exposure while maintaining debuggability.
    """
    def __init__(self, sample_rate: float = 0.01):
        self.sample_rate = sample_rate
        self.logger = logging.getLogger('sampled')

    def should_log(self, user_id: str) -> bool:
        # Deterministic sampling: the same user is always sampled
        # (or not), which keeps debugging sessions coherent
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        return (hash_val % 100) < (self.sample_rate * 100)

    def log(self, user_id: str, message: str, **kwargs):
        if self.should_log(user_id):
            self.logger.info(message, extra={'user_id': user_id, **kwargs})
```

Compliance Quick Reference

| Regulation | Notification Deadline | Key Requirements |
|------------|-----------------------|------------------|
| GDPR | 72 hours | Notify DPA if breach affects EU residents |
| PCI-DSS | 24 hours | Immediate containment, forensic investigation |
| HIPAA | 60 days | Notify HHS, affected individuals |
| CCPA | "Expeditiously" | Notify California AG if 500+ residents |
| SOC 2 | Per policy | Document incident, notify auditor |
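As a rough worked example of how those clocks stack up from a single discovery time (the mapping just restates the table; confirm the exact obligations with counsel):

```python
from datetime import datetime, timedelta, timezone

# Deadlines from the table above; frameworks with no fixed clock
# (CCPA, SOC 2) are omitted.
NOTIFICATION_WINDOWS = {
    'PCI-DSS': timedelta(hours=24),
    'GDPR': timedelta(hours=72),
    'HIPAA': timedelta(days=60),
}

discovered_at = datetime(2024, 3, 1, 9, 0, tzinfo=timezone.utc)

# Print in order of urgency
for regulation, window in sorted(NOTIFICATION_WINDOWS.items(),
                                 key=lambda item: item[1]):
    deadline = discovered_at + window
    print(f"{regulation}: notify by {deadline.isoformat()}")
```

The shortest clock drives the incident timeline: in this example the card-brand notification is due a full two days before the GDPR filing.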

Key Takeaways

  1. Stop immediately - Block new PII from being logged
  2. Assess scope - Understand what, where, how long
  3. Notify stakeholders - Legal, compliance, potentially regulators
  4. Remediate everywhere - Primary, backups, third parties
  5. Prevent recurrence - Auto-redaction, code reviews, scanning
  6. Document everything - For audits and legal protection
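Takeaway 5 can be enforced mechanically: a CI regression test that fails the build if known PII shapes ever survive the redaction layer. A sketch against the pattern set used earlier (`redact()` here is a stand-in for whichever formatter or filter you actually deploy):

```python
import re

# Same pattern set used by the emergency filter earlier in this article
PATTERNS = [
    (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[REDACTED]'),  # Credit card
    (r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b', '[REDACTED]'),             # SSN
    (r'(password|passwd|pwd)["\s:=]+\S+', r'\1=[REDACTED]'),
]

def redact(message: str) -> str:
    """Stand-in for the production redaction layer under test."""
    for pattern, replacement in PATTERNS:
        message = re.sub(pattern, replacement, message, flags=re.IGNORECASE)
    return message

def test_no_pii_survives_redaction():
    samples = [
        "card 4111-1111-1111-1111 charged",
        "user ssn 123-45-6789 on file",
        'login failed, password: hunter2',
    ]
    for sample in samples:
        cleaned = redact(sample)
        assert '4111' not in cleaned
        assert '123-45-6789' not in cleaned
        assert 'hunter2' not in cleaned
```

Run it with `pytest` on every commit; when a new PII type leaks, add a failing sample first, then fix the patterns.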

Remember: The cover-up is always worse than the crime. Document thoroughly, notify appropriately, and fix properly.