PII Leaked in Logs
The Interview Question
"During a security audit, we discovered that we've been logging credit card numbers, SSNs, and passwords in plain text for 2 years. The logs are in Elasticsearch, S3 backups, and have been shipped to three third-party analytics tools. What do we do?"
Asked at: Any company handling sensitive data, especially fintech, healthcare
Time to solve: 35-40 minutes
Difficulty: ⭐⭐⭐⭐ (Senior, with compliance knowledge)
Clarifying Questions to Ask
- "What compliance frameworks apply?" → PCI-DSS, GDPR, HIPAA, SOC 2?
- "How much data volume?" → Affects remediation time
- "What third parties have the data?" → Legal and contractual obligations
- "Do we have data retention policies?" → Older logs might auto-delete
- "Has there been unauthorized access?" → Determines if it's a breach
Immediate Actions (First 24 Hours)
Hour 0-1: Containment
```python
# 1. STOP THE BLEEDING - Deploy emergency log filter
import logging
import re

class EmergencyLogFilter(logging.Filter):
    SENSITIVE_PATTERNS = [
        r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
        r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',             # SSN
        r'password["\s:=]+\S+',                          # Password in logs
        r'ssn["\s:=]+\S+',
        r'credit_card["\s:=]+\S+',
    ]

    def filter(self, record):
        message = record.getMessage()
        for pattern in self.SENSITIVE_PATTERNS:
            message = re.sub(pattern, '[REDACTED]', message, flags=re.IGNORECASE)
        record.msg = message
        record.args = ()  # Args are already folded into the message
        return True

# Apply to ALL loggers immediately, including the root logger
for logger_name in logging.root.manager.loggerDict:
    logging.getLogger(logger_name).addFilter(EmergencyLogFilter())
logging.getLogger().addFilter(EmergencyLogFilter())
```
```bash
# 2. Disable log shipping to third parties

# Stop Datadog agent
systemctl stop datadog-agent

# Stop Splunk forwarder
systemctl stop SplunkForwarder

# Stop the Logstash shipper (Elasticsearch exposes no API to pause an
# external Logstash instance; stop the service itself)
systemctl stop logstash
```
Hour 1-4: Assessment
```python
# Scan existing logs to understand scope
import re
from pathlib import Path

def scan_for_pii(log_path: Path):
    """Scan logs to assess exposure scope."""
    patterns = {
        'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
        'ssn': r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',
        'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        'password': r'(password|passwd|pwd)["\s:=]+\S+',
    }
    findings = {pii_type: [] for pii_type in patterns}
    # Note: compressed archives (.log.gz) must be decompressed before scanning
    for log_file in log_path.glob('**/*.log*'):
        with open(log_file, 'r', errors='ignore') as f:
            for line_num, line in enumerate(f, start=1):
                for pii_type, pattern in patterns.items():
                    if re.search(pattern, line, re.IGNORECASE):
                        findings[pii_type].append({
                            'file': str(log_file),
                            'line': line_num,
                            # Record the location only -- copying even a
                            # truncated sample would put PII in the report
                        })
    return findings

# Generate report
findings = scan_for_pii(Path('/var/log/app'))
for pii_type, instances in findings.items():
    print(f"{pii_type}: {len(instances)} instances found")
```
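The 16-digit regex above will also flag order IDs, timestamps, and other digit runs. Before trusting the counts, a Luhn checksum filter (the standard validity check for card numbers) cuts those false positives; `luhn_valid` here is an illustrative helper, not part of the scanner above:

```python
def luhn_valid(number: str) -> bool:
    """Luhn checksum: True only for digit strings that could be real card numbers."""
    digits = [int(d) for d in number if d.isdigit()]
    if len(digits) < 13:  # Card numbers are 13-19 digits
        return False
    total = 0
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:    # Double every second digit from the right
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0

print(luhn_valid("4111-1111-1111-1111"))  # True  (standard test card)
print(luhn_valid("1234-5678-9012-3456"))  # False (regex match, but not a card)
```

Running matches through this check before counting them keeps the assessment focused on genuine cardholder data.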
Hour 4-24: Legal and Compliance Notification
```yaml
notification_checklist:
  internal:
    - [ ] CISO / Security team
    - [ ] Legal / Compliance team
    - [ ] DPO (Data Protection Officer) if GDPR applies
    - [ ] Executive leadership
  external_if_required:
    - [ ] Acquiring bank and card brands (if cardholder data; brands require prompt notification)
    - [ ] Supervisory authority (GDPR: within 72 hours of becoming aware of a breach)
    - [ ] Affected individuals (GDPR: without undue delay if the breach poses high risk)
    - [ ] Regulatory bodies (varies by jurisdiction)
    - [ ] Third-party processors (contractual obligations)
```
Data Remediation
Step 1: Delete from Elasticsearch
```python
# Delete documents containing PII
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')

# Query for documents with PII patterns.
# NOTE: Elasticsearch regexp queries use Lucene syntax, which does not
# support \d or \s -- use explicit character classes.
query = {
    "query": {
        "bool": {
            "should": [
                {"regexp": {"message": ".*[0-9]{4}[- ]?[0-9]{4}[- ]?[0-9]{4}[- ]?[0-9]{4}.*"}},
                {"regexp": {"message": ".*password.*:.*"}},
                # ... more patterns
            ]
        }
    }
}

# Delete by query
result = es.delete_by_query(
    index="application-logs-*",
    body=query,
    wait_for_completion=False,  # Async for large datasets
    conflicts="proceed",
)
print(f"Task ID: {result['task']}")

# Monitor progress
task_status = es.tasks.get(task_id=result['task'])

# delete_by_query only marks documents as deleted; force-merge to
# expunge them from the segments so the data is actually gone from disk
es.indices.forcemerge(index="application-logs-*", only_expunge_deletes=True)
```
Step 2: Delete from S3 Backups
```python
import boto3
import re

s3 = boto3.client('s3')
BUCKET = 'logs-backup'

def delete_pii_from_s3():
    """
    For compliance, you might need to:
    1. Delete objects with PII
    2. Or overwrite with redacted versions
    3. Document everything for audit trail

    NOTE: on a versioned bucket, a plain delete only adds a delete
    marker -- delete every object version. Compressed archives (.gz)
    must be decompressed before scanning.
    """
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=BUCKET, Prefix='logs/'):
        for obj in page.get('Contents', []):
            key = obj['Key']
            # Download, scan, redact, re-upload (or delete)
            response = s3.get_object(Bucket=BUCKET, Key=key)
            content = response['Body'].read().decode('utf-8', errors='ignore')
            if contains_pii(content):
                # Option 1: Delete entirely
                s3.delete_object(Bucket=BUCKET, Key=key)
                log_deletion(key)  # Record key + timestamp for the audit trail
                # Option 2: Redact and re-upload
                # redacted = redact_pii(content)
                # s3.put_object(Bucket=BUCKET, Key=key, Body=redacted)
            print(f"Processed: {key}")

def contains_pii(content):
    patterns = [
        r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
        r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',             # SSN
    ]
    return any(re.search(p, content) for p in patterns)
```
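Option 2 above assumes a `redact_pii` helper. A minimal sketch using the same two patterns as `contains_pii` (the helper name and placeholder token are illustrative):

```python
import re

PII_PATTERNS = [
    r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',  # Credit card
    r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b',             # SSN
]

def redact_pii(content: str) -> str:
    """Replace PII matches so the object can be re-uploaded safely."""
    for pattern in PII_PATTERNS:
        content = re.sub(pattern, '[REDACTED]', content)
    return content

print(redact_pii("paid with 4111-1111-1111-1111 today"))
# paid with [REDACTED] today
```

Redact-and-re-upload preserves the surrounding log context for debugging, at the cost of rewriting every affected object; deletion is faster but loses the logs entirely.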
Step 3: Request Deletion from Third Parties
```markdown
## Third-Party Data Deletion Request Template

Dear [Vendor],

Under our Data Processing Agreement dated [DATE], we are invoking
Section [X] regarding data deletion.

We have identified that logs shipped to your platform between
[START_DATE] and [END_DATE] contained personal data that was
inadvertently included. Specifically:

- Log indices/streams: [LIST]
- Data types affected: Credit card numbers, SSNs
- Approximate volume: [X] GB

We request:
1. Immediate deletion of all affected data
2. Deletion from any backups within [TIMEFRAME]
3. Written confirmation of deletion
4. Confirmation that data was not shared with sub-processors

Please confirm receipt and provide deletion confirmation within
72 hours per our SLA.

Regards,
[Your Name]
[DPO/Security Officer]
```
Prevention: Proper Logging Architecture
Structured Logging with Auto-Redaction
```python
import json
import logging
import re
from dataclasses import dataclass, asdict

@dataclass
class LogContext:
    user_id: str
    request_id: str
    action: str
    # Never add PII fields here!

class SecureJSONFormatter(logging.Formatter):
    """JSON formatter that automatically redacts sensitive data."""

    REDACT_PATTERNS = {
        'credit_card': (r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CC REDACTED]'),
        'ssn': (r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b', '[SSN REDACTED]'),
        'password': (r'(password|passwd|pwd)["\s:=]+\S+', r'\1=[REDACTED]'),
        'bearer_token': (r'Bearer\s+\S+', 'Bearer [REDACTED]'),
        'api_key': (r'(api[_-]?key)["\s:=]+\S+', r'\1=[REDACTED]'),
    }
    REDACT_KEYS = {'password', 'ssn', 'credit_card', 'card_number',
                   'cvv', 'secret', 'token', 'api_key'}

    def format(self, record):
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': self._redact_message(record.getMessage()),
        }
        # Add extra fields, redacting sensitive ones
        if hasattr(record, 'context'):
            log_entry['context'] = self._redact_dict(asdict(record.context))
        return json.dumps(log_entry)

    def _redact_message(self, message: str) -> str:
        for pattern, replacement in self.REDACT_PATTERNS.values():
            message = re.sub(pattern, replacement, message, flags=re.IGNORECASE)
        return message

    def _redact_dict(self, data: dict) -> dict:
        result = {}
        for key, value in data.items():
            if key.lower() in self.REDACT_KEYS:
                result[key] = '[REDACTED]'
            elif isinstance(value, dict):
                result[key] = self._redact_dict(value)
            elif isinstance(value, str):
                result[key] = self._redact_message(value)
            else:
                result[key] = value
        return result

# Usage
logger = logging.getLogger('app')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(SecureJSONFormatter())
logger.addHandler(handler)

# This will automatically redact:
logger.info("Processing payment for card 4111-1111-1111-1111")
# "message": "Processing payment for card [CC REDACTED]"
```
Separate Audit Logs with Encryption
```python
import json
import logging
from datetime import datetime, timezone

from cryptography.fernet import Fernet

class EncryptedAuditLogger:
    """
    For audit trails that NEED sensitive data,
    encrypt before storing.
    """
    def __init__(self, key: bytes):
        self.fernet = Fernet(key)
        self.logger = logging.getLogger('audit')

    def log_sensitive_action(self, action: str, user_id: str,
                             sensitive_data: dict):
        """Encrypt sensitive data, store separately from regular logs."""
        encrypted = self.fernet.encrypt(
            json.dumps(sensitive_data).encode()
        )
        # Store only the ciphertext, never the plaintext
        audit_entry = {
            'action': action,
            'user_id': user_id,
            'sensitive_data_ref': encrypted.decode(),
            'timestamp': datetime.now(timezone.utc).isoformat(),
        }
        # Store in a separate, access-controlled audit log
        self.logger.info(json.dumps(audit_entry))
```
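Retrieval is the mirror operation: only holders of the key can read the payload back. A minimal sketch (the `decrypt_audit_entry` helper is an assumption matching the field name used above):

```python
import json
from cryptography.fernet import Fernet

def decrypt_audit_entry(key: bytes, audit_entry: dict) -> dict:
    """Recover the sensitive payload from an audit entry (key holders only)."""
    fernet = Fernet(key)
    plaintext = fernet.decrypt(audit_entry['sensitive_data_ref'].encode())
    return json.loads(plaintext)

# Round trip: encrypt as the logger does, then decrypt
key = Fernet.generate_key()
payload = {'card_last4': '1111', 'amount': 49.99}
entry = {
    'sensitive_data_ref': Fernet(key).encrypt(json.dumps(payload).encode()).decode()
}
print(decrypt_audit_entry(key, entry))
```

Keep the Fernet key in a secrets manager with its own access controls, so losing the general log store never exposes the audit payloads.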
Log Sampling for High-Volume
```python
import hashlib
import logging

class SampledLogger:
    """
    For high-volume logs, sample based on user ID
    to reduce exposure while maintaining debuggability.
    """
    def __init__(self, sample_rate: float = 0.01):
        self.sample_rate = sample_rate
        self.logger = logging.getLogger('sampled')

    def should_log(self, user_id: str) -> bool:
        # Deterministic sampling based on user ID: the same user is
        # always sampled (or not), so a debugging session sees either
        # all of a user's logs or none of them
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        return (hash_val % 100) < (self.sample_rate * 100)

    def log(self, user_id: str, message: str, **kwargs):
        if self.should_log(user_id):
            self.logger.info(message, extra={'user_id': user_id, **kwargs})
```
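The bucketing is easy to sanity-check in isolation. This standalone version of `should_log` (same hash-mod-100 scheme as the class above) demonstrates the deterministic behavior:

```python
import hashlib

def should_log(user_id: str, sample_rate: float) -> bool:
    # Same hashing scheme as SampledLogger.should_log
    hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return (hash_val % 100) < (sample_rate * 100)

# Deterministic: repeated calls for the same user always agree
assert should_log("user-42", 0.5) == should_log("user-42", 0.5)

# Boundary rates behave as expected
assert all(should_log(f"user-{i}", 1.0) for i in range(100))      # everyone logged
assert not any(should_log(f"user-{i}", 0.0) for i in range(100))  # no one logged
```

One caveat worth mentioning in an interview: hashing buckets into only 100 slots means the effective rate is quantized to whole percents, which is fine for 1% sampling but too coarse for rates like 0.1%.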
Compliance Quick Reference
| Regulation | Notification Deadline | Key Requirements |
|---|---|---|
| GDPR | 72 hours | Notify the supervisory authority if EU residents are affected |
| PCI-DSS | Immediately (per card brand rules) | Immediate containment, forensic investigation |
| HIPAA | 60 days | Notify HHS, affected individuals |
| CCPA | "Expeditiously" | Notify California AG if 500+ residents |
| SOC 2 | Per policy | Document incident, notify auditor |
Key Takeaways
- **Stop immediately** - Block new PII from being logged
- **Assess scope** - Understand what leaked, where it lives, and for how long
- **Notify stakeholders** - Legal, compliance, potentially regulators
- **Remediate everywhere** - Primary stores, backups, third parties
- **Prevent recurrence** - Auto-redaction, code reviews, automated scanning
- **Document everything** - For audits and legal protection
Remember: The cover-up is always worse than the crime. Document thoroughly, notify appropriately, and fix properly.