Feature Flag Explosion
The Interview Question
"We have 2,000 feature flags in our system. Nobody knows which ones are still needed. Some control critical functionality, others are from experiments 3 years ago. Deployments are becoming risky because of flag interactions. How do we clean this up?"
Asked at: Any company with extensive feature flag usage
Time to solve: 25-30 minutes
Difficulty: ⭐⭐⭐ (Senior)
Clarifying Questions to Ask
- "What's the flag management system?" → LaunchDarkly, homegrown, config files?
- "How are flags currently organized?" → Naming conventions? Ownership?
- "What's the business impact of a wrong removal?" → Risk tolerance
- "Are there dependencies between flags?" → Compound conditions
- "What's the deployment process?" → How are flags changed?
The Problem Visualized
# Actual code found in production:
def get_price(product, user):
    """
    Return the price to charge for ``product``.

    Starts from ``product.base_price`` and only switches to
    ``calculate_dynamic_price_v3(product, user)`` when every nested flag and
    user-segment condition below holds; otherwise the base price is returned.

    NOTE(review): the nesting shown here is reconstructed as one pyramid of
    ``if`` statements (each condition inside the previous one) - confirm
    against the real production code.
    """
    price = product.base_price
    # Each level adds one more flag that must be in the right state.
    if feature_flags.is_enabled('new_pricing_engine'):
        if feature_flags.is_enabled('new_pricing_engine_v2'):
            # Inverted check: the dynamic path is taken only while this flag is OFF.
            if not feature_flags.is_enabled('rollback_pricing_2021'):
                if feature_flags.is_enabled('experiment_dynamic_pricing'):
                    if user.segment in ['premium', 'test_group_a']:
                        if feature_flags.is_enabled('black_friday_2022'):  # It's 2024...
                            price = calculate_dynamic_price_v3(product, user)
    # Nobody knows what this code actually does anymore
    return price
Classification System
Step 1: Categorize All Flags
class FeatureFlagClassifier:
    """
    Label each feature flag with a type and a lifecycle stage.

    ``FLAG_TYPES`` and ``LIFECYCLE_STAGES`` map every label to a short
    human-readable description.
    """

    FLAG_TYPES = dict(
        release='Controls feature rollout',
        experiment='A/B test or experiment',
        ops='Operational toggle (kill switch)',
        permission='User/account feature access',
        technical='Technical configuration',
    )

    LIFECYCLE_STAGES = dict(
        active='Currently in use, varying values',
        fully_rolled='Enabled for 100%, candidate for removal',
        fully_disabled='Disabled for 100%, candidate for removal',
        unknown='No recent evaluations or mixed state',
    )

    def classify(self, flag_data):
        """
        Build the classification record for one flag.

        Combines the raw usage attributes of ``flag_data`` with the inferred
        type, the lifecycle stage, the code references and the removal risk.
        A missing (falsy) owner is reported as ``'UNKNOWN'``.
        """
        summary = {'flag_key': flag_data.key}
        summary['type'] = self.infer_type(flag_data)
        summary['lifecycle'] = self.determine_lifecycle(flag_data)
        summary['last_modified'] = flag_data.last_modified
        summary['last_evaluated'] = flag_data.last_evaluated
        summary['evaluation_count_30d'] = flag_data.eval_count
        summary['percentage_true'] = flag_data.true_percentage
        summary['owner'] = flag_data.owner or 'UNKNOWN'
        summary['code_references'] = self.find_code_refs(flag_data.key)
        summary['removal_risk'] = self.assess_risk(flag_data)
        return summary

    def determine_lifecycle(self, flag_data):
        """
        Map evaluation statistics to a lifecycle stage name.

        A flag with no evaluations is ``'unknown'`` regardless of its
        percentage; otherwise 100% true is ``'fully_rolled'``, 0% true is
        ``'fully_disabled'`` and anything in between is ``'active'``.
        """
        if flag_data.eval_count == 0:
            return 'unknown'
        if flag_data.true_percentage == 100:
            return 'fully_rolled'
        if flag_data.true_percentage == 0:
            return 'fully_disabled'
        return 'active'
Step 2: Generate Cleanup Report
def generate_cleanup_report(all_flags, protected_types=('ops',)):
    """
    Generate prioritized cleanup report.

    Every flag is classified via ``classifier.classify`` and placed in exactly
    one bucket. Entries in the three removal-oriented buckets carry an extra
    ``'removal_action'`` hint; entries in ``'keep'`` are the classification
    records unchanged.

    Args:
        all_flags: Iterable of raw flag objects accepted by
            ``classifier.classify``.
        protected_types: Flag types that are always kept, whatever their
            rollout state. Defaults to ops flags: a kill switch normally sits
            at 0% or 100% by design, so it must not be proposed for removal
            just because it looks "fully rolled" or "fully disabled".

    Returns:
        dict with the keys ``'safe_to_remove'``, ``'probably_safe'``,
        ``'needs_investigation'`` and ``'keep'``, each mapping to a list.
    """
    classified = [classifier.classify(f) for f in all_flags]
    report = {
        'safe_to_remove': [],       # Fully rolled/disabled while still being evaluated
        'probably_safe': [],        # Experiments untouched for 90+ days
        'needs_investigation': [],  # No evaluations in the last 30 days
        'keep': [],                 # Protected (ops) flags and everything still active
    }

    def propose(bucket, flag, action):
        # Copy the record so the classifier output itself is never mutated.
        report[bucket].append({**flag, 'removal_action': action})

    for flag in classified:
        if flag['type'] in protected_types:
            # Long-lived by design - never a cleanup candidate.
            report['keep'].append(flag)
        elif flag['lifecycle'] == 'fully_rolled' and flag['evaluation_count_30d'] > 0:
            # Enabled everywhere - code can assume true
            propose('safe_to_remove', flag, 'Replace with `true` in code, then delete')
        elif flag['lifecycle'] == 'fully_disabled' and flag['evaluation_count_30d'] > 0:
            # Disabled everywhere - code is dead
            propose('safe_to_remove', flag, 'Delete flag and dead code branch')
        elif (
            flag['type'] == 'experiment'
            # A missing timestamp cannot be compared; let such flags fall
            # through to the remaining branches instead of raising TypeError.
            and flag['last_modified'] is not None
            and flag['last_modified'] < days_ago(90)
        ):
            # Experiment hasn't changed in 90 days
            propose('probably_safe', flag, 'Verify experiment concluded, then remove')
        elif flag['evaluation_count_30d'] == 0:
            # Not being evaluated - orphaned?
            propose('needs_investigation', flag, 'Check if code still exists')
        else:
            report['keep'].append(flag)
    return report
Safe Removal Process
Step 1: Verify Flag is Removable
class FlagRemovalVerifier:
    """Collects the evidence needed to decide whether a flag can be removed."""

    def verify_safe_to_remove(self, flag_key: str) -> dict:
        """
        Run the pre-removal checks for ``flag_key`` and summarize them.

        Four checks are recorded under ``'checks'``: code references (at most
        the first 10 locations are listed), evaluations over the last 30
        days, flag dependencies, and tests that use the flag.

        The flag is reported as safe to remove only when it has at least one
        code reference, at least one recent evaluation, and no other flag
        depends on it. The test check is informational and does not affect
        the verdict.
        """
        report = {'flag_key': flag_key, 'checks': {}, 'safe_to_remove': False}
        checks = report['checks']

        # Check 1: where the flag appears in the codebase
        references = self.search_codebase(flag_key)
        checks['code_references'] = dict(
            count=len(references),
            locations=references[:10],
        )

        # Check 2: how much it was evaluated recently
        usage = self.get_recent_evaluations(flag_key, days=30)
        checks['recent_evaluations'] = dict(
            count=usage.total,
            unique_users=usage.unique_users,
            services=usage.services,
        )

        # Check 3: relationships to other flags
        links = self.find_flag_dependencies(flag_key)
        checks['dependencies'] = dict(
            depends_on=links.depends_on,
            depended_by=links.depended_by,
        )

        # Check 4: tests that mention the flag
        flag_tests = self.find_tests_using_flag(flag_key)
        checks['tests'] = dict(count=len(flag_tests), test_files=flag_tests)

        # Verdict: located in code, still evaluated, and nothing depends on it
        report['safe_to_remove'] = (
            len(references) > 0
            and usage.total > 0
            and len(links.depended_by) == 0
        )
        return report
Step 2: Remove Flag from Code
# Automated code transformation for fully-rolled (100% on) or fully-disabled (100% off) flags
import ast
import astor
class FlagRemover(ast.NodeTransformer):
    """
    Remove feature flag checks from code.

    For flags whose value is now constant: ``flag_value=True`` for a flag
    that is 100% enabled, ``flag_value=False`` for one that is 100% disabled.

    Every ``if`` statement whose test is exactly
    ``<anything>.is_enabled('<flag_key>')`` or its negation
    ``not <anything>.is_enabled('<flag_key>')`` is replaced by the branch
    that would run for ``flag_value``. The surviving branch is itself
    transformed, so nested checks of the same flag are removed too.

    Compound tests (``flag and other``), conditional expressions and flag
    values stored in variables are left untouched.
    """

    def __init__(self, flag_key: str, flag_value: bool):
        super().__init__()
        self.flag_key = flag_key
        self.flag_value = flag_value

    def visit_If(self, node):
        """Replace a flag-controlled ``if`` by the statements of its live branch."""
        polarity = self._flag_polarity(node.test)
        if polarity is None:
            # Not our flag - keep the statement but still transform its children.
            return self.generic_visit(node)
        # `if flag:` runs its body when the flag is true; `if not flag:` when false.
        body_is_live = self.flag_value if polarity else not self.flag_value
        live_branch = node.body if body_is_live else node.orelse
        return self._visit_statements(live_branch)

    def generic_visit(self, node):
        """Visit children, then make sure no compound statement is left empty."""
        node = super().generic_visit(node)
        body = getattr(node, 'body', None)
        # Removing a dead branch can empty the body of the enclosing
        # def/class/for/if/..., which is not valid Python. Modules may be empty.
        if isinstance(body, list) and not body and not isinstance(node, ast.mod):
            node.body = [ast.Pass()]
        return node

    def is_flag_check(self, node):
        """Return True if ``node`` is the call ``<anything>.is_enabled('<flag_key>')``."""
        # Match: feature_flags.is_enabled('flag_name')
        if isinstance(node, ast.Call):
            if hasattr(node.func, 'attr') and node.func.attr == 'is_enabled':
                if node.args and isinstance(node.args[0], ast.Constant):
                    return node.args[0].value == self.flag_key
        return False

    def _flag_polarity(self, test):
        """Return True for a plain flag check, False for a negated one, else None."""
        if self.is_flag_check(test):
            return True
        if (
            isinstance(test, ast.UnaryOp)
            and isinstance(test.op, ast.Not)
            and self.is_flag_check(test.operand)
        ):
            return False
        return None

    def _visit_statements(self, statements):
        """Transform each statement and flatten the results into one list."""
        kept = []
        for statement in statements:
            result = self.visit(statement)
            if result is None:
                continue
            if isinstance(result, list):
                kept.extend(result)
            else:
                kept.append(result)
        return kept
# Usage
def remove_flag_from_file(file_path, flag_key, flag_value):
    """
    Rewrite ``file_path`` in place with all checks of ``flag_key`` removed.

    Args:
        file_path: Path of the Python source file to transform.
        flag_key: Key of the flag whose checks should be removed.
        flag_value: The constant value the flag now has (True = fully
            enabled, False = fully disabled).

    The file is only written when the transformation actually changed the
    syntax tree. Regenerating source from an AST discards comments and the
    original formatting, so files that never mention the flag must be left
    byte-for-byte untouched.
    """
    # Python source is UTF-8 by default; do not depend on the platform locale.
    with open(file_path, encoding='utf-8') as f:
        tree = ast.parse(f.read(), filename=str(file_path))

    # The transformer mutates the tree in place, so snapshot it first.
    before = ast.dump(tree)
    remover = FlagRemover(flag_key, flag_value)
    new_tree = remover.visit(tree)
    if ast.dump(new_tree) == before:
        return

    new_code = astor.to_source(new_tree)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(new_code)
Step 3: Gradual Removal with Safety
class SafeFlagRemoval:
    """
    Staged flag removal: start with one low-risk service, watch the error
    rate, then either continue or mark the removal as failed.
    """

    def remove_flag(self, flag_key: str):
        """Begin removing ``flag_key`` and schedule the follow-up health check."""
        # Step 1: record that removal is in progress
        pending_marker = {
            'status': 'pending_removal',
            'removal_started': datetime.now(),
        }
        self.flag_service.update(flag_key, pending_marker)

        # Step 2: ship the flag-free code to the lowest-risk service only
        canary_service = self.get_lowest_risk_service(flag_key)
        self.deploy_without_flag(canary_service, flag_key)

        # Step 3: come back after a 24-hour observation window
        observation_window = timedelta(hours=24)
        self.schedule_check(flag_key, delay=observation_window)

    def check_removal_health(self, flag_key: str):
        """Continue the removal if errors stayed flat, otherwise mark it failed."""
        increase = self.get_error_rate_increase(flag_key)

        if not increase > 0.01:  # At most a 1% increase
            # Healthy - roll the removal out to more services
            self.continue_removal(flag_key)
            return

        # Unhealthy - put the flag back to active and record why
        failure_record = {
            'status': 'active',
            'removal_failed': datetime.now(),
            'failure_reason': 'Error rate increased',
        }
        self.flag_service.update(flag_key, failure_record)
        alert(f"Flag removal failed for {flag_key}")
Prevention: Flag Lifecycle Management
class FeatureFlag:
    """
    Feature flag with built-in lifecycle management.

    Release and experiment flags always receive an expiration date so they
    cannot linger forever. Other types (ops, permission, ...) do not expire
    unless the caller passes an explicit ``expires_at``.
    """

    # Default lifetime of an experiment flag when no expires_at is given.
    EXPERIMENT_MAX_AGE_DAYS = 30

    def __init__(
        self,
        key: str,
        flag_type: str,
        owner: str,
        expires_at: datetime = None,
        max_age_days: int = 90,
    ):
        """
        Args:
            key: Unique flag key.
            flag_type: Flag category, e.g. 'release', 'experiment' or 'ops'.
            owner: Who is responsible for the flag.
            expires_at: Explicit expiration time. When given it is honored
                for every flag type; None selects the per-type default.
            max_age_days: Default lifetime in days of a release flag.
        """
        self.key = key
        self.flag_type = flag_type
        self.owner = owner

        # One timestamp for both fields, so expires_at - created_at is exactly
        # the configured lifetime rather than off by the time between two
        # separate now() calls.
        now = datetime.now()
        self.created_at = now

        if expires_at is not None:
            # An explicit deadline always wins, also for ops/permission flags.
            self.expires_at = expires_at
        elif flag_type == 'release':
            # Force expiration for release flags
            self.expires_at = now + timedelta(days=max_age_days)
        elif flag_type == 'experiment':
            self.expires_at = now + timedelta(days=self.EXPERIMENT_MAX_AGE_DAYS)
        else:
            self.expires_at = None  # Ops flags don't expire by default

    def is_expired(self) -> bool:
        """Return True once the current time is past ``expires_at``; never for flags without one."""
        if self.expires_at is None:
            return False
        return datetime.now() > self.expires_at
# Automated expiration enforcement
class FlagExpirationEnforcer:
    """Periodic job that nags owners (and later their managers) about expired flags."""

    @scheduled(cron="0 9 * * MON")  # Every Monday 9 AM
    def check_expired_flags(self):
        """
        Notify the owner of every expired flag.

        Flags that are more than 30 days past their expiration additionally
        trigger a notification to the owner's manager.
        """
        for stale in self.flag_service.get_expired_flags():
            # Always tell the owner first
            send_notification(
                to=stale.owner,
                subject=f"Feature flag '{stale.key}' has expired",
                body=f"Please remove or extend. Created: {stale.created_at}",
            )

            overdue = stale.days_past_expiration > 30
            if overdue:
                # Ignored for over 30 days - escalate one level up
                send_notification(
                    to=stale.owner.manager,
                    subject="Stale feature flag requires attention",
                    body=f"Flag '{stale.key}' expired {stale.days_past_expiration} days ago",
                )
Flag Naming Convention
naming_convention:
pattern: "{type}_{team}_{feature}_{date}"
examples:
- "release_checkout_new_payment_flow_2024q1"
- "experiment_growth_signup_redesign_2024w12"
- "ops_platform_disable_recommendations"
- "permission_enterprise_advanced_analytics"
rules:
- type: One of [release, experiment, ops, permission, technical]
- team: Owning team (checkout, growth, platform)
- feature: Brief description (no spaces, use underscores)
- date: When expected to be removed (YYYYqN or YYYYwWW); omitted for long-lived ops and permission flags
Key Takeaways
- Classify everything - Know what each flag is for
- Enforce expiration - Release and experiment flags must have end dates
- Track ownership - No orphan flags
- Automate cleanup - Generate removal reports weekly
- Safe removal process - Test in low-risk areas first
- Code transformation - Automate flag removal from code
Golden rule: Every release flag should have a ticket for its removal created at the same time as the flag itself.