Skip to main content

Feature Flag Explosion

The Interview Question

"We have 2,000 feature flags in our system. Nobody knows which ones are still needed. Some control critical functionality, others are from experiments 3 years ago. Deployments are becoming risky because of flag interactions. How do we clean this up?"

Asked at: Any company with extensive feature flag usage

Time to solve: 25-30 minutes

Difficulty: ⭐⭐⭐ (Senior)


Clarifying Questions to Ask

  1. "What's the flag management system?" → LaunchDarkly, homegrown, config files?
  2. "How are flags currently organized?" → Naming conventions? Ownership?
  3. "What's the business impact of a wrong removal?" → Risk tolerance
  4. "Are there dependencies between flags?" → Compound conditions
  5. "What's the deployment process?" → How are flags changed?

The Problem Visualized

# Actual code found in production:
def get_price(product, user):
    """Return the price of ``product`` for ``user``.

    Illustrative anti-example: the nesting is kept exactly as found so the
    flag pile-up stays visible. The dynamic price only applies when five
    flag checks and a user-segment check all line up; otherwise the base
    price is returned unchanged.
    """
    price = product.base_price

    if feature_flags.is_enabled('new_pricing_engine'):
        if feature_flags.is_enabled('new_pricing_engine_v2'):
            # Note the inverted check: the rollback flag must be OFF.
            if not feature_flags.is_enabled('rollback_pricing_2021'):
                if feature_flags.is_enabled('experiment_dynamic_pricing'):
                    if user.segment in ['premium', 'test_group_a']:
                        if feature_flags.is_enabled('black_friday_2022'):  # It's 2024...
                            price = calculate_dynamic_price_v3(product, user)

    # Nobody knows what this code actually does anymore
    return price

Classification System

Step 1: Categorize All Flags

class FeatureFlagClassifier:
    """Label a flag with its type and its lifecycle stage."""

    # Flag type -> human-readable description.
    FLAG_TYPES = dict(
        release='Controls feature rollout',
        experiment='A/B test or experiment',
        ops='Operational toggle (kill switch)',
        permission='User/account feature access',
        technical='Technical configuration',
    )

    # Lifecycle stage -> human-readable description.
    LIFECYCLE_STAGES = dict(
        active='Currently in use, varying values',
        fully_rolled='Enabled for 100%, candidate for removal',
        fully_disabled='Disabled for 100%, candidate for removal',
        unknown='No recent evaluations or mixed state',
    )

    def classify(self, flag_data):
        """Build the classification record for one flag.

        Combines fields read straight from ``flag_data`` with values computed
        by this classifier (type, lifecycle, code references, removal risk).
        A missing owner is reported as ``'UNKNOWN'``.
        """
        summary = {'flag_key': flag_data.key}
        summary['type'] = self.infer_type(flag_data)
        summary['lifecycle'] = self.determine_lifecycle(flag_data)
        summary['last_modified'] = flag_data.last_modified
        summary['last_evaluated'] = flag_data.last_evaluated
        summary['evaluation_count_30d'] = flag_data.eval_count
        summary['percentage_true'] = flag_data.true_percentage
        summary['owner'] = flag_data.owner or 'UNKNOWN'
        summary['code_references'] = self.find_code_refs(flag_data.key)
        summary['removal_risk'] = self.assess_risk(flag_data)
        return summary

    def determine_lifecycle(self, flag_data):
        """Return the lifecycle stage key for ``flag_data``.

        A flag with no evaluations is ``'unknown'`` regardless of its
        percentage; otherwise 100% true is ``'fully_rolled'``, 0% true is
        ``'fully_disabled'`` and anything in between is ``'active'``.
        """
        if flag_data.eval_count == 0:
            return 'unknown'

        share_true = flag_data.true_percentage
        if share_true == 100:
            return 'fully_rolled'
        if share_true == 0:
            return 'fully_disabled'
        return 'active'

Step 2: Generate Cleanup Report

def generate_cleanup_report(all_flags):
    """Sort every flag into one prioritized cleanup bucket.

    Each flag is classified with the module-level ``classifier`` and then
    placed in exactly one of ``safe_to_remove``, ``probably_safe``,
    ``needs_investigation`` or ``keep``. Entries in the first three buckets
    carry an extra ``removal_action`` hint.

    Returns:
        dict mapping bucket name to a list of classification records.
    """
    # Ops kill switches and permission flags sit at 0% or 100% by design, so
    # a pinned value says nothing about whether they are still needed. While
    # they are being evaluated they must never be proposed for removal.
    long_lived_types = ('ops', 'permission')

    classified = [classifier.classify(f) for f in all_flags]

    report = {
        'safe_to_remove': [],  # Pinned at 100% or 0% and still evaluated
        'probably_safe': [],  # Experiments untouched for 90+ days
        'needs_investigation': [],  # No evaluations in the last 30 days
        'keep': [],  # Active flags and live ops/permission flags
    }

    for flag in classified:
        evaluated = flag['evaluation_count_30d'] > 0

        if flag['type'] in long_lived_types and evaluated:
            # Long-lived flag that is still live - not a cleanup candidate
            report['keep'].append(flag)

        elif flag['lifecycle'] == 'fully_rolled' and evaluated:
            # Enabled everywhere - code can assume true
            report['safe_to_remove'].append({
                **flag,
                'removal_action': 'Replace with `true` in code, then delete',
            })

        elif flag['lifecycle'] == 'fully_disabled' and evaluated:
            # Disabled everywhere - code is dead
            report['safe_to_remove'].append({
                **flag,
                'removal_action': 'Delete flag and dead code branch',
            })

        elif flag['type'] == 'experiment' and flag['last_modified'] < days_ago(90):
            # Experiment hasn't changed in 90 days
            report['probably_safe'].append({
                **flag,
                'removal_action': 'Verify experiment concluded, then remove',
            })

        elif flag['evaluation_count_30d'] == 0:
            # Not being evaluated - orphaned? (includes unevaluated ops flags)
            report['needs_investigation'].append({
                **flag,
                'removal_action': 'Check if code still exists',
            })

        else:
            report['keep'].append(flag)

    return report

Safe Removal Process

Step 1: Verify Flag is Removable

class FlagRemovalVerifier:
    """Collects the evidence needed before a flag is deleted."""

    def verify_safe_to_remove(self, flag_key: str) -> dict:
        """Run every pre-removal check for ``flag_key`` and report the result.

        Returns a dict with the flag key, a ``checks`` dict (code references,
        recent evaluations, dependencies, tests) and a ``safe_to_remove``
        verdict. The test check is reported but does not affect the verdict.
        """
        checks = {}

        # Check 1: where the flag appears in the codebase (first 10 shown)
        hits = self.search_codebase(flag_key)
        checks['code_references'] = dict(count=len(hits), locations=hits[:10])

        # Check 2: evaluation traffic over the last 30 days
        recent = self.get_recent_evaluations(flag_key, days=30)
        checks['recent_evaluations'] = dict(
            count=recent.total,
            unique_users=recent.unique_users,
            services=recent.services,
        )

        # Check 3: relationships to other flags
        links = self.find_flag_dependencies(flag_key)
        checks['dependencies'] = dict(
            depends_on=links.depends_on,
            depended_by=links.depended_by,
        )

        # Check 4: tests that mention the flag
        covering = self.find_tests_using_flag(flag_key)
        checks['tests'] = dict(count=len(covering), test_files=covering)

        # Verdict: we know where it is, it's being evaluated, and nothing
        # depends on it.
        verdict = (
            len(hits) > 0
            and recent.total > 0
            and len(links.depended_by) == 0
        )

        return {
            'flag_key': flag_key,
            'checks': checks,
            'safe_to_remove': verdict,
        }

Step 2: Remove Flag from Code

# Automated code transformation for flags pinned at 100% (fully rolled out) or 0% (fully disabled)
import ast
import astor

class FlagRemover(ast.NodeTransformer):
    """Strip checks of one feature flag out of a parsed module.

    ``flag_value`` is the value the flag is pinned to: ``True`` for a flag
    that is 100% enabled, ``False`` for one that is 100% disabled. Every
    ``if <obj>.is_enabled('<flag_key>'):`` and ``if not <obj>.is_enabled(...)``
    statement is replaced by whichever branch would run for that value.

    Checks buried in compound conditions (``and`` / ``or``), in conditional
    expressions, or stored in a variable are left untouched.
    """

    def __init__(self, flag_key: str, flag_value: bool):
        super().__init__()
        self.flag_key = flag_key
        self.flag_value = flag_value

    def visit_If(self, node):
        """Replace a flag-gated ``if`` with the branch that stays reachable.

        Returns the (possibly empty) list of surviving statements for a flag
        check, or the visited node itself for any other ``if``.
        """
        outcome = self._resolve_test(node.test)
        if outcome is None:
            return self.generic_visit(node)

        surviving = node.body if outcome else node.orelse
        # Visit the surviving branch too, so nested checks of the same flag
        # are removed in the same pass.
        return self._visit_statements(surviving)

    def generic_visit(self, node):
        """Visit children, then repair any block that removal left empty."""
        node = super().generic_visit(node)
        body = getattr(node, 'body', None)
        # A def/class/loop/if whose only statement was a removed flag check
        # would otherwise have no body, which is not valid Python.
        if isinstance(body, list) and not body and not isinstance(node, ast.Module):
            body.append(ast.copy_location(ast.Pass(), node))
        return node

    def is_flag_check(self, node):
        """Return True if ``node`` is ``<obj>.is_enabled('<flag_key>')``."""
        # Match: feature_flags.is_enabled('flag_name')
        if isinstance(node, ast.Call):
            if hasattr(node.func, 'attr') and node.func.attr == 'is_enabled':
                if node.args and isinstance(node.args[0], ast.Constant):
                    return node.args[0].value == self.flag_key
        return False

    def _resolve_test(self, test):
        """Return the constant truth value of ``test``, or None if unknown."""
        if self.is_flag_check(test):
            return bool(self.flag_value)
        if (
            isinstance(test, ast.UnaryOp)
            and isinstance(test.op, ast.Not)
            and self.is_flag_check(test.operand)
        ):
            return not self.flag_value
        return None

    def _visit_statements(self, statements):
        """Visit each statement and flatten the results into one list."""
        kept = []
        for statement in statements:
            result = self.visit(statement)
            if result is None:
                continue
            if isinstance(result, list):
                kept.extend(result)
            else:
                kept.append(result)
        return kept

# Usage
def remove_flag_from_file(file_path, flag_key, flag_value):
    """Rewrite ``file_path`` with every check of ``flag_key`` resolved.

    ``flag_value`` is the value the flag is pinned to (True for fully
    enabled, False for fully disabled). The file is only written back when
    the transformation actually changed something.
    """
    with open(file_path, encoding='utf-8') as f:
        tree = ast.parse(f.read())

    # The transformer mutates the tree in place, so snapshot it first.
    before = ast.dump(tree)

    remover = FlagRemover(flag_key, flag_value)
    new_tree = remover.visit(tree)

    if ast.dump(new_tree) == before:
        # Nothing referenced the flag. Regenerating source from the AST drops
        # comments and original formatting, so leave the file untouched.
        return

    new_code = astor.to_source(new_tree)

    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(new_code)

Step 3: Gradual Removal with Safety

class SafeFlagRemoval:
    """Staged flag removal: one service first, then a health check."""

    def remove_flag(self, flag_key: str):
        """Start removing ``flag_key`` from the lowest-risk service."""
        # Step 1: record that removal is under way
        pending = {
            'status': 'pending_removal',
            'removal_started': datetime.now(),
        }
        self.flag_service.update(flag_key, pending)

        # Step 2: ship a single low-risk service without the flag
        canary = self.get_lowest_risk_service(flag_key)
        self.deploy_without_flag(canary, flag_key)

        # Step 3: come back in 24 hours to look at the result
        self.schedule_check(flag_key, delay=timedelta(hours=24))

    def check_removal_health(self, flag_key: str):
        """Continue or abort the removal based on the error-rate increase."""
        increase = self.get_error_rate_increase(flag_key)

        if not increase > 0.01:  # At most a 1% increase
            # Healthy - carry on with the remaining services
            self.continue_removal(flag_key)
            return

        # Unhealthy - mark the flag active again and raise an alert
        failure = {
            'status': 'active',
            'removal_failed': datetime.now(),
            'failure_reason': 'Error rate increased',
        }
        self.flag_service.update(flag_key, failure)
        alert(f"Flag removal failed for {flag_key}")

Prevention: Flag Lifecycle Management

class FeatureFlag:
    """
    Feature flag with built-in lifecycle management.

    Release and experiment flags always get an expiration date. Other flag
    types (ops, permission, ...) do not expire unless the caller passes an
    explicit ``expires_at``.
    """

    # Default lifetime of an experiment flag, in days.
    EXPERIMENT_MAX_AGE_DAYS = 30

    def __init__(
        self,
        key: str,
        flag_type: str,
        owner: str,
        expires_at: datetime = None,
        max_age_days: int = 90,
    ):
        """Create a flag.

        Args:
            key: Flag identifier.
            flag_type: Flag category, e.g. ``'release'`` or ``'experiment'``.
            owner: Who is responsible for the flag.
            expires_at: Explicit expiration; honored for every flag type.
            max_age_days: Default lifetime of a release flag when no
                ``expires_at`` is given.
        """
        self.key = key
        self.flag_type = flag_type
        self.owner = owner
        self.created_at = datetime.now()

        if expires_at is not None:
            # An explicit deadline always wins, whatever the flag type.
            self.expires_at = expires_at
        elif flag_type == 'release':
            # Force expiration for release flags, measured from creation.
            self.expires_at = self.created_at + timedelta(days=max_age_days)
        elif flag_type == 'experiment':
            self.expires_at = self.created_at + timedelta(
                days=self.EXPERIMENT_MAX_AGE_DAYS
            )
        else:
            self.expires_at = None  # Ops flags don't expire

    def is_expired(self) -> bool:
        """Return True once the current time is past ``expires_at``."""
        if self.expires_at is None:
            return False
        return datetime.now() > self.expires_at

# Automated expiration enforcement
class FlagExpirationEnforcer:
    """Nags owners (and then their managers) about expired flags."""

    @scheduled(cron="0 9 * * MON")  # Every Monday 9 AM
    def check_expired_flags(self):
        """Notify the owner of every expired flag, escalating stale ones.

        Each expired flag triggers a notification to its owner. Flags more
        than 30 days past expiration also trigger one to the owner's manager.
        """
        for stale in self.flag_service.get_expired_flags():
            # First line of contact: the flag's owner
            send_notification(
                to=stale.owner,
                subject=f"Feature flag '{stale.key}' has expired",
                body=f"Please remove or extend. Created: {stale.created_at}"
            )

            if not stale.days_past_expiration > 30:
                continue

            # More than 30 days overdue: escalate to the owner's manager
            send_notification(
                to=stale.owner.manager,
                subject=f"Stale feature flag requires attention",
                body=f"Flag '{stale.key}' expired {stale.days_past_expiration} days ago"
            )

Flag Naming Convention

naming_convention:
  # The trailing {date} applies to short-lived flags (release, experiment).
  # Long-lived ops and permission flags omit it, as the examples show.
  pattern: "{type}_{team}_{feature}_{date}"

  examples:
    - "release_checkout_new_payment_flow_2024q1"
    - "experiment_growth_signup_redesign_2024w12"
    - "ops_platform_disable_recommendations"
    - "permission_enterprise_advanced_analytics"

  rules:
    - type: One of [release, experiment, ops, permission]
    - team: Owning team (checkout, growth, platform)
    - feature: Brief description (no spaces, use underscores)
    # Lowercase separator, matching the examples: 2024q1, 2024w12
    - date: When expected to be removed (YYYYqN or YYYYwWW)

Key Takeaways

  1. Classify everything - Know what each flag is for
  2. Enforce expiration - Release and experiment flags must have end dates
  3. Track ownership - No orphan flags
  4. Automate cleanup - Generate removal reports weekly
  5. Safe removal process - Test in low-risk areas first
  6. Code transformation - Automate flag removal from code

Golden rule: Every release flag should have a ticket for its removal created at the same time as the flag itself.