Debugging + Observability (finding problems in production)
Event sourcing gives you a perfect audit trail—if you know how to use it.
This guide covers how to debug, monitor, and operate event-sourced systems.
The observability advantage
Traditional systems: "The balance is wrong. Why?" Event-sourced systems: "Let me replay the events and show you exactly how we got here."
Essential metrics
1. Stream metrics
/// <summary>
/// Records event-store metrics (stream length, append/load latency, concurrency conflicts),
/// tagged by stream category.
/// </summary>
/// <remarks>
/// Tags use the stream <em>category</em> rather than the stream id so that metric label
/// cardinality stays bounded regardless of how many aggregates exist.
/// </remarks>
public sealed class StreamMetrics
{
    // Length of a Guid in the default "D" format, e.g. "3f2504e0-4f89-11d3-9a0c-0305e82c3301".
    private const int GuidLength = 36;

    private readonly IMetricsCollector _metrics;

    /// <summary>Creates a recorder that writes observations to <paramref name="metrics"/>.</summary>
    /// <param name="metrics">Sink that receives histogram and counter observations.</param>
    /// <exception cref="ArgumentNullException"><paramref name="metrics"/> is null.</exception>
    public StreamMetrics(IMetricsCollector metrics)
    {
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <summary>Records an observed stream length (number of events).</summary>
    public void RecordStreamLength(string streamId, long length)
    {
        var category = GetCategory(streamId);
        _metrics.RecordHistogram("es.stream.length", length,
            new[] { ("category", category) });
    }

    /// <summary>Records how long an append took and how many events it wrote.</summary>
    public void RecordAppendLatency(string streamId, TimeSpan duration, int eventCount)
    {
        var category = GetCategory(streamId);
        _metrics.RecordHistogram("es.append.latency_ms", duration.TotalMilliseconds,
            new[] { ("category", category) });
        _metrics.RecordHistogram("es.append.event_count", eventCount,
            new[] { ("category", category) });
    }

    /// <summary>
    /// Records how long a stream load took, bucketed by event count so that long and short loads
    /// are not averaged together.
    /// </summary>
    public void RecordLoadLatency(string streamId, TimeSpan duration, int eventCount)
    {
        var category = GetCategory(streamId);
        _metrics.RecordHistogram("es.load.latency_ms", duration.TotalMilliseconds,
            new[] { ("category", category), ("event_count_bucket", GetBucket(eventCount)) });
    }

    /// <summary>Counts an optimistic-concurrency conflict on an append.</summary>
    public void RecordConcurrencyConflict(string streamId)
    {
        var category = GetCategory(streamId);
        _metrics.IncrementCounter("es.concurrency.conflicts_total",
            new[] { ("category", category) });
    }

    /// <summary>
    /// Derives a low-cardinality category from a stream id, e.g.
    /// "bank-account-3f2504e0-4f89-11d3-9a0c-0305e82c3301" becomes "bank-account".
    /// </summary>
    private static string GetCategory(string streamId)
    {
        // A trailing Guid id contains dashes itself. Splitting on the last dash would keep most
        // of the Guid and produce one label value per aggregate, so strip the whole Guid first.
        if (streamId.Length > GuidLength + 1
            && streamId[streamId.Length - GuidLength - 1] == '-'
            && Guid.TryParse(streamId.AsSpan(streamId.Length - GuidLength), out _))
        {
            return streamId[..(streamId.Length - GuidLength - 1)];
        }

        var lastDash = streamId.LastIndexOf('-');
        return lastDash > 0 ? streamId[..lastDash] : streamId;
    }

    /// <summary>Maps an event count onto a fixed set of label values.</summary>
    private static string GetBucket(int count) => count switch
    {
        < 10 => "0-10",
        < 50 => "10-50",
        < 100 => "50-100",
        < 500 => "100-500",
        _ => "500+"
    };
}
2. Projection metrics
/// <summary>
/// Records projection health metrics (lag, position, errors, throughput, per-event latency),
/// tagged by projection name.
/// </summary>
public sealed class ProjectionMetrics
{
    private readonly IMetricsCollector _metrics;

    /// <summary>Creates a recorder that writes observations to <paramref name="metrics"/>.</summary>
    /// <param name="metrics">Sink that receives gauge, counter and histogram observations.</param>
    /// <exception cref="ArgumentNullException"><paramref name="metrics"/> is null.</exception>
    public ProjectionMetrics(IMetricsCollector metrics)
    {
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <summary>Sets the current lag of <paramref name="projectionName"/>, in seconds.</summary>
    public void RecordProjectionLag(string projectionName, TimeSpan lag)
    {
        _metrics.RecordGauge("es.projection.lag_seconds", lag.TotalSeconds,
            new[] { ("projection", projectionName) });
    }

    /// <summary>Sets the position the projection has processed up to.</summary>
    public void RecordProjectionPosition(string projectionName, long position)
    {
        _metrics.RecordGauge("es.projection.position", position,
            new[] { ("projection", projectionName) });
    }

    /// <summary>Counts a projection failure, tagged with an error type.</summary>
    public void RecordProjectionError(string projectionName, string errorType)
    {
        _metrics.IncrementCounter("es.projection.errors_total",
            new[] { ("projection", projectionName), ("error_type", errorType) });
    }

    /// <summary>Counts one processed event, tagged with its event type.</summary>
    public void RecordEventProcessed(string projectionName, string eventType)
    {
        _metrics.IncrementCounter("es.projection.events_processed_total",
            new[] { ("projection", projectionName), ("event_type", eventType) });
    }

    /// <summary>Records how long processing one event took.</summary>
    public void RecordProcessingLatency(string projectionName, TimeSpan duration)
    {
        _metrics.RecordHistogram("es.projection.processing_latency_ms",
            duration.TotalMilliseconds,
            new[] { ("projection", projectionName) });
    }
}
3. Outbox metrics
/// <summary>
/// Records transactional-outbox metrics: queue depth, publish throughput, latency, errors,
/// and message age.
/// </summary>
public sealed class OutboxMetrics
{
    private readonly IMetricsCollector _metrics;

    /// <summary>Creates a recorder that writes observations to <paramref name="metrics"/>.</summary>
    /// <param name="metrics">Sink that receives gauge, counter and histogram observations.</param>
    /// <exception cref="ArgumentNullException"><paramref name="metrics"/> is null.</exception>
    public OutboxMetrics(IMetricsCollector metrics)
    {
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
    }

    /// <summary>Sets the number of messages waiting to be published.</summary>
    public void RecordQueueDepth(int depth)
    {
        _metrics.RecordGauge("es.outbox.queue_depth", depth);
    }

    /// <summary>Counts one successfully published message for <paramref name="topic"/>.</summary>
    public void RecordMessagePublished(string topic)
    {
        _metrics.IncrementCounter("es.outbox.messages_published_total",
            new[] { ("topic", topic) });
    }

    /// <summary>Records how long a publish took.</summary>
    public void RecordPublishLatency(TimeSpan duration)
    {
        _metrics.RecordHistogram("es.outbox.publish_latency_ms", duration.TotalMilliseconds);
    }

    /// <summary>Counts a failed publish, tagged with topic and error type.</summary>
    public void RecordPublishError(string topic, string errorType)
    {
        _metrics.IncrementCounter("es.outbox.publish_errors_total",
            new[] { ("topic", topic), ("error_type", errorType) });
    }

    /// <summary>Records how old a message was, in seconds.</summary>
    public void RecordMessageAge(TimeSpan age)
    {
        _metrics.RecordHistogram("es.outbox.message_age_seconds", age.TotalSeconds);
    }
}
Distributed tracing
Correlation and causation IDs
Every event should carry:
- CorrelationId: Links all events from a single user request
- CausationId: The event that directly caused this event
/// <summary>
/// Tracing metadata stored alongside an event.
/// </summary>
/// <param name="EventId">Unique id of the event this metadata belongs to.</param>
/// <param name="CorrelationId">Links all events that originate from a single user request.</param>
/// <param name="CausationId">Id of the event (or request) that directly caused this event.</param>
/// <param name="UserId">Identity of the user that initiated the request, if known.</param>
/// <param name="RequestId">Transport-level request identifier, if known.</param>
/// <param name="Timestamp">When the metadata was created.</param>
public sealed record EventMetadata(
Guid EventId,
Guid CorrelationId,
Guid CausationId,
string? UserId,
string? RequestId,
DateTimeOffset Timestamp
);
/// <summary>
/// Ambient correlation/causation context for the current logical operation.
/// </summary>
/// <remarks>
/// The context is stored in an <see cref="AsyncLocal{T}"/>, so it flows into awaited calls and
/// into tasks started from the current flow. Those flows share the same instance, so a call to
/// <see cref="SetCausation"/> is visible to all of them.
/// </remarks>
public sealed class CorrelationContext
{
    private static readonly AsyncLocal<CorrelationContext?> _current = new();

    /// <summary>The active context.</summary>
    /// <exception cref="InvalidOperationException">No context has been started.</exception>
    public static CorrelationContext Current =>
        _current.Value ?? throw new InvalidOperationException("No correlation context");

    /// <summary>The active context, or <see langword="null"/> when none has been started.</summary>
    public static CorrelationContext? CurrentOrDefault => _current.Value;

    /// <summary>Id shared by every event produced for the same originating request.</summary>
    public Guid CorrelationId { get; }

    /// <summary>Id of whatever caused the next event; updated via <see cref="SetCausation"/>.</summary>
    public Guid CausationId { get; private set; }

    public string? UserId { get; }

    public string? RequestId { get; }

    private CorrelationContext(Guid correlationId, Guid causationId, string? userId, string? requestId)
    {
        CorrelationId = correlationId;
        CausationId = causationId;
        UserId = userId;
        RequestId = requestId;
    }

    /// <summary>
    /// Starts a context with a fresh correlation id. The causation id equals the correlation id
    /// because the request itself is the cause.
    /// </summary>
    /// <returns>A scope that restores the previous context when disposed.</returns>
    public static IDisposable StartNew(string? userId = null, string? requestId = null)
    {
        var correlationId = Guid.NewGuid();
        return Start(correlationId, correlationId, userId, requestId);
    }

    /// <summary>Makes a new context ambient for the current async flow.</summary>
    /// <returns>A scope that restores the previous context when disposed.</returns>
    public static IDisposable Start(
        Guid correlationId,
        Guid causationId,
        string? userId = null,
        string? requestId = null)
    {
        var previous = _current.Value;
        _current.Value = new CorrelationContext(correlationId, causationId, userId, requestId);
        // Remember the enclosing context so that disposing a nested scope does not clear the outer one.
        return new ContextScope(previous);
    }

    /// <summary>Marks <paramref name="eventId"/> as the cause of subsequently created metadata.</summary>
    public void SetCausation(Guid eventId)
    {
        CausationId = eventId;
    }

    /// <summary>Creates metadata for a new event using this context's ids and the current UTC time.</summary>
    public EventMetadata CreateMetadata()
    {
        return new EventMetadata(
            EventId: Guid.NewGuid(),
            CorrelationId: CorrelationId,
            CausationId: CausationId,
            UserId: UserId,
            RequestId: RequestId,
            Timestamp: DateTimeOffset.UtcNow
        );
    }

    /// <summary>Restores the context that was ambient before the scope started.</summary>
    private sealed class ContextScope : IDisposable
    {
        private readonly CorrelationContext? _previous;
        private bool _disposed;

        public ContextScope(CorrelationContext? previous)
        {
            _previous = previous;
        }

        public void Dispose()
        {
            // A second Dispose must not clobber a context that was started after the first one.
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            _current.Value = _previous;
        }
    }
}
ASP.NET Core middleware
/// <summary>
/// Establishes a <see cref="CorrelationContext"/> for each HTTP request from the
/// X-Correlation-Id / X-Causation-Id headers, and echoes the correlation id in the response.
/// </summary>
public sealed class CorrelationMiddleware
{
    private const string CorrelationHeader = "X-Correlation-Id";
    private const string CausationHeader = "X-Causation-Id";

    private readonly RequestDelegate _next;

    public CorrelationMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task InvokeAsync(HttpContext context)
    {
        // Headers are client-controlled: ignore malformed values instead of failing the request.
        var correlationId = TryReadGuid(context, CorrelationHeader) ?? Guid.NewGuid();
        // With no upstream cause, the request itself is the cause (same convention as StartNew).
        var causationId = TryReadGuid(context, CausationHeader) ?? correlationId;

        using var scope = CorrelationContext.Start(
            correlationId: correlationId,
            causationId: causationId,
            userId: context.User?.Identity?.Name,
            requestId: context.TraceIdentifier
        );

        // Add to response headers for debugging. Capture the id rather than reading the ambient
        // context: OnStarting can run after this method returns, i.e. outside the scope.
        context.Response.OnStarting(() =>
        {
            context.Response.Headers[CorrelationHeader] = correlationId.ToString();
            return Task.CompletedTask;
        });

        await _next(context);
    }

    /// <summary>Returns the header's first value as a Guid, or null when it is absent or malformed.</summary>
    private static Guid? TryReadGuid(HttpContext context, string headerName)
    {
        var raw = context.Request.Headers[headerName].FirstOrDefault();
        return Guid.TryParse(raw, out var value) ? value : null;
    }
}
Traced event store
/// <summary>
/// <see cref="IEventStore"/> decorator that records <see cref="StreamMetrics"/> and structured
/// logs (including the ambient correlation id) around appends and reads.
/// </summary>
/// <remarks>
/// NOTE(review): other members of <see cref="IEventStore"/> used elsewhere (e.g. ReadAll,
/// GetCurrentPosition) are not forwarded here. Confirm the interface shape before registering
/// this decorator.
/// </remarks>
public sealed class TracedEventStore : IEventStore
{
    private readonly IEventStore _inner;
    private readonly StreamMetrics _metrics;
    private readonly ILogger<TracedEventStore> _logger;

    public TracedEventStore(
        IEventStore inner,
        StreamMetrics metrics,
        ILogger<TracedEventStore> logger)
    {
        _inner = inner;
        _metrics = metrics;
        _logger = logger;
    }

    /// <summary>
    /// Appends via the inner store, recording latency and event count on success and counting
    /// concurrency conflicts. All exceptions are rethrown unchanged.
    /// </summary>
    public async Task<AppendResult> AppendToStream(
        string streamId,
        long expectedVersion,
        IReadOnlyList<StoredEvent> events,
        CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        var correlationId = CorrelationContext.CurrentOrDefault?.CorrelationId;

        try
        {
            var result = await _inner.AppendToStream(streamId, expectedVersion, events, ct);
            sw.Stop();

            _metrics.RecordAppendLatency(streamId, sw.Elapsed, events.Count);
            _logger.LogInformation(
                "Appended {EventCount} events to {StreamId} (version {Version} -> {NewVersion}) " +
                "in {Duration}ms [CorrelationId={CorrelationId}]",
                events.Count, streamId, expectedVersion, result.NextExpectedVersion,
                sw.ElapsedMilliseconds, correlationId);

            return result;
        }
        catch (WrongExpectedVersionException ex)
        {
            _metrics.RecordConcurrencyConflict(streamId);
            // The exception message is the only detail available here; log it as the reason
            // rather than mislabeling it as the actual stream version.
            _logger.LogWarning(
                "Concurrency conflict on {StreamId}: expected {Expected} ({Reason}) " +
                "[CorrelationId={CorrelationId}]",
                streamId, expectedVersion, ex.Message, correlationId);
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to append to {StreamId} [CorrelationId={CorrelationId}]",
                streamId, correlationId);
            throw;
        }
    }

    /// <summary>
    /// Streams events from the inner store. When enumeration completes, it records load latency
    /// and stream length (<paramref name="fromVersionInclusive"/> + events read).
    /// </summary>
    /// <remarks>
    /// If the caller stops enumerating early, no metrics are recorded, because the count would
    /// not reflect the stream.
    /// </remarks>
    public async IAsyncEnumerable<StoredEvent> ReadStream(
        string streamId,
        long fromVersionInclusive,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        var count = 0;

        await foreach (var @event in _inner.ReadStream(streamId, fromVersionInclusive, ct))
        {
            count++;
            // Pause while the consumer handles the event so the latency reflects store reads only.
            sw.Stop();
            yield return @event;
            sw.Start();
        }

        sw.Stop();
        _metrics.RecordLoadLatency(streamId, sw.Elapsed, count);
        _metrics.RecordStreamLength(streamId, count + fromVersionInclusive);
    }
}
Debugging tools
Stream inspector
/// <summary>
/// Read-only diagnostics for a single stream: summary statistics, a human-readable dump,
/// and the aggregate state after a given number of events.
/// </summary>
public sealed class StreamInspector
{
    // Reused across calls: JsonSerializerOptions caches serialization metadata and is costly to recreate.
    private static readonly JsonSerializerOptions IndentedJson = new() { WriteIndented = true };

    private readonly IEventStore _store;

    public StreamInspector(IEventStore store)
    {
        _store = store;
    }

    /// <summary>
    /// Reads the whole stream and summarizes it: event count, first/last OccurredAt,
    /// per-type counts, and per-event info.
    /// </summary>
    public async Task<StreamInfo> GetStreamInfo(string streamId, CancellationToken ct)
    {
        var events = new List<EventInfo>();
        var eventTypes = new Dictionary<string, int>();
        DateTimeOffset? firstEvent = null;
        DateTimeOffset? lastEvent = null;

        await foreach (var se in _store.ReadStream(streamId, 0, ct))
        {
            firstEvent ??= se.OccurredAt;
            lastEvent = se.OccurredAt;

            eventTypes.TryGetValue(se.EventType, out var count);
            eventTypes[se.EventType] = count + 1;

            events.Add(new EventInfo(
                se.EventId,
                se.EventType,
                se.SchemaVersion,
                se.OccurredAt,
                se.Data.Length
            ));
        }

        return new StreamInfo(
            StreamId: streamId,
            EventCount: events.Count,
            FirstEventAt: firstEvent,
            LastEventAt: lastEvent,
            EventTypeCounts: eventTypes,
            Events: events
        );
    }

    /// <summary>
    /// Renders every event in the stream as a header line followed by its pretty-printed JSON payload.
    /// </summary>
    public async Task<string> DumpStream(string streamId, CancellationToken ct)
    {
        var sb = new StringBuilder();
        sb.AppendLine($"Stream: {streamId}");
        sb.AppendLine(new string('=', 60));

        await foreach (var se in _store.ReadStream(streamId, 0, ct))
        {
            sb.AppendLine($"[{se.OccurredAt:O}] {se.EventType} v{se.SchemaVersion}");
            sb.AppendLine(FormatPayload(se));
            sb.AppendLine(new string('-', 40));
        }

        return sb.ToString();
    }

    /// <summary>
    /// Rebuilds the aggregate from the first <paramref name="version"/> events of the stream.
    /// </summary>
    /// <returns>The state, the number of events actually applied, and the last applied event (if any).</returns>
    public async Task<AggregateStateAtVersion<T>> GetStateAtVersion<T>(
        string streamId,
        long version,
        Func<T> createAggregate,
        Action<T, IDomainEvent> applyEvent,
        Func<StoredEvent, IDomainEvent> deserialize,
        CancellationToken ct) where T : class
    {
        var aggregate = createAggregate();
        var applied = 0L;
        IDomainEvent? lastEvent = null;

        // Track only the count and the last event; there is no need to retain the whole history.
        await foreach (var se in _store.ReadStream(streamId, 0, ct))
        {
            if (applied >= version)
            {
                break;
            }

            var @event = deserialize(se);
            applyEvent(aggregate, @event);
            lastEvent = @event;
            applied++;
        }

        return new AggregateStateAtVersion<T>(
            State: aggregate,
            Version: applied,
            LastEvent: lastEvent
        );
    }

    /// <summary>Pretty-prints an event payload, falling back to a placeholder if it is not valid JSON.</summary>
    private static string FormatPayload(StoredEvent se)
    {
        try
        {
            var json = System.Text.Json.JsonSerializer.Deserialize<JsonElement>(se.Data);
            return System.Text.Json.JsonSerializer.Serialize(json, IndentedJson);
        }
        catch (System.Text.Json.JsonException)
        {
            // A single corrupt payload must not abort the dump; corrupt data is often why you are dumping.
            return $"<payload is not valid JSON ({se.Data.Length} bytes)>";
        }
    }
}
/// <summary>
/// Summary of one stream, as produced by StreamInspector.GetStreamInfo.
/// </summary>
/// <param name="StreamId">The inspected stream.</param>
/// <param name="EventCount">Number of events read from the stream.</param>
/// <param name="FirstEventAt">OccurredAt of the first event, or null for an empty stream.</param>
/// <param name="LastEventAt">OccurredAt of the last event, or null for an empty stream.</param>
/// <param name="EventTypeCounts">Number of events per event type name.</param>
/// <param name="Events">Per-event details in stream order.</param>
public sealed record StreamInfo(
string StreamId,
int EventCount,
DateTimeOffset? FirstEventAt,
DateTimeOffset? LastEventAt,
Dictionary<string, int> EventTypeCounts,
List<EventInfo> Events
);
/// <summary>
/// Lightweight description of a stored event, without its payload.
/// </summary>
/// <param name="EventId">Id of the stored event.</param>
/// <param name="EventType">Stored event type name.</param>
/// <param name="SchemaVersion">Schema version the event was stored with.</param>
/// <param name="OccurredAt">When the event occurred.</param>
/// <param name="DataSizeBytes">Length of the stored payload (StoredEvent.Data.Length).</param>
public sealed record EventInfo(
Guid EventId,
string EventType,
int SchemaVersion,
DateTimeOffset OccurredAt,
int DataSizeBytes
);
/// <summary>
/// Aggregate state rebuilt from the first N events of a stream.
/// </summary>
/// <param name="State">The rebuilt aggregate.</param>
/// <param name="Version">Number of events actually applied (may be less than requested if the stream is shorter).</param>
/// <param name="LastEvent">The last applied event, or null if none were applied.</param>
public sealed record AggregateStateAtVersion<T>(
T State,
long Version,
IDomainEvent? LastEvent
);
Correlation tracer
/// <summary>
/// Finds every event that shares a correlation id and arranges them into a timeline
/// and a causation tree.
/// </summary>
public sealed class CorrelationTracer
{
    private readonly IEventStore _store;

    public CorrelationTracer(IEventStore store)
    {
        _store = store;
    }

    /// <summary>
    /// Scans the global event log for events whose metadata carries <paramref name="correlationId"/>.
    /// </summary>
    /// <returns>The matching events ordered by OccurredAt, plus their causation tree.</returns>
    public async Task<CorrelationTrace> TraceCorrelation(
        Guid correlationId,
        CancellationToken ct)
    {
        var events = new List<TracedEvent>();

        // This requires a global event index or $all stream. It reads from position 0, i.e. a
        // full scan, so it is meant for ad-hoc investigation rather than hot paths.
        await foreach (var (streamId, position, se) in _store.ReadAll(0, ct))
        {
            var metadata = TryGetMetadata(se);
            if (metadata is null || metadata.CorrelationId != correlationId)
            {
                continue;
            }

            events.Add(new TracedEvent(
                StreamId: streamId,
                GlobalPosition: position,
                EventId: se.EventId,
                EventType: se.EventType,
                OccurredAt: se.OccurredAt,
                CausationId: metadata.CausationId,
                UserId: metadata.UserId
            ));
        }

        var root = BuildCausationTree(events);

        return new CorrelationTrace(
            CorrelationId: correlationId,
            Events: events.OrderBy(e => e.OccurredAt).ToList(),
            CausationTree: root
        );
    }

    /// <summary>Parses event metadata; returns null if it is missing or not valid metadata JSON.</summary>
    private static EventMetadata? TryGetMetadata(StoredEvent se)
    {
        if (se.Metadata is null)
        {
            return null;
        }

        try
        {
            return System.Text.Json.JsonSerializer.Deserialize<EventMetadata>(se.Metadata);
        }
        catch (System.Text.Json.JsonException)
        {
            // Events written without this metadata format simply are not part of any trace.
            // Other exceptions (e.g. serializer misconfiguration) surface instead of yielding an empty trace.
            return null;
        }
    }

    /// <summary>
    /// Builds the causation tree. Events whose cause is not among <paramref name="events"/> are roots.
    /// When there is not exactly one root, they hang under a synthetic "Root" node.
    /// </summary>
    private static CausationNode BuildCausationTree(List<TracedEvent> events)
    {
        // HashSet rather than ToDictionary: a duplicated event id must not throw.
        var knownIds = new HashSet<Guid>(events.Select(e => e.EventId));
        // Precomputed child lookup keeps the build O(n) instead of rescanning all events per node.
        var childrenByCause = events.ToLookup(e => e.CausationId);
        var expanded = new HashSet<Guid>();

        var roots = events
            .Where(e => !knownIds.Contains(e.CausationId)) // caused by an external trigger
            .Select(e => BuildNode(e, childrenByCause, expanded))
            .ToList();

        return roots.Count == 1
            ? roots[0]
            : new CausationNode(
                EventId: Guid.Empty,
                EventType: "Root",
                StreamId: "",
                OccurredAt: DateTimeOffset.MinValue,
                Children: roots
            );
    }

    /// <summary>Recursively builds the node for <paramref name="event"/> and everything it caused.</summary>
    private static CausationNode BuildNode(
        TracedEvent @event,
        ILookup<Guid, TracedEvent> childrenByCause,
        HashSet<Guid> expanded)
    {
        // Expand each event id once. With duplicated ids, malformed metadata could otherwise form
        // a loop (an event listed as its own descendant) and recurse forever.
        var children = expanded.Add(@event.EventId)
            ? childrenByCause[@event.EventId]
                .Select(e => BuildNode(e, childrenByCause, expanded))
                .ToList()
            : new List<CausationNode>();

        return new CausationNode(
            EventId: @event.EventId,
            EventType: @event.EventType,
            StreamId: @event.StreamId,
            OccurredAt: @event.OccurredAt,
            Children: children
        );
    }
}
/// <summary>
/// Result of tracing one correlation id.
/// </summary>
/// <param name="CorrelationId">The traced correlation id.</param>
/// <param name="Events">Matching events ordered by OccurredAt.</param>
/// <param name="CausationTree">Root of the causation tree (a synthetic "Root" node when there is not exactly one root).</param>
public sealed record CorrelationTrace(
Guid CorrelationId,
List<TracedEvent> Events,
CausationNode CausationTree
);
/// <summary>
/// One event found while tracing a correlation id.
/// </summary>
/// <param name="StreamId">Stream the event belongs to.</param>
/// <param name="GlobalPosition">Position of the event in the global ($all) log.</param>
/// <param name="EventId">Id of the event.</param>
/// <param name="EventType">Stored event type name.</param>
/// <param name="OccurredAt">When the event occurred.</param>
/// <param name="CausationId">Causation id taken from the event's metadata.</param>
/// <param name="UserId">User id taken from the event's metadata, if any.</param>
public sealed record TracedEvent(
string StreamId,
long GlobalPosition,
Guid EventId,
string EventType,
DateTimeOffset OccurredAt,
Guid CausationId,
string? UserId
);
/// <summary>
/// Node in a causation tree: an event and the events it directly caused.
/// </summary>
/// <remarks>
/// The synthetic root built by CorrelationTracer uses Guid.Empty, "Root", an empty stream id
/// and DateTimeOffset.MinValue.
/// </remarks>
public sealed record CausationNode(
Guid EventId,
string EventType,
string StreamId,
DateTimeOffset OccurredAt,
List<CausationNode> Children
);
Replay debugger
/// <summary>
/// Replays a stream event by event to pinpoint when an aggregate reached a given state.
/// </summary>
public sealed class ReplayDebugger
{
    private readonly IEventStore _store;

    public ReplayDebugger(IEventStore store)
    {
        _store = store;
    }

    /// <summary>
    /// Replays <paramref name="streamId"/> from the start, recording before/after snapshots of the
    /// aggregate for every event. Stops at the first event for which
    /// <paramref name="breakCondition"/> returns true.
    /// </summary>
    /// <param name="breakCondition">Called after each event with (state, event, 1-based version).</param>
    /// <param name="ct">Cancels the stream read.</param>
    /// <param name="cloneState">
    /// Optional snapshot function. The default is a System.Text.Json round-trip. By default that
    /// only preserves state the serializer can read and write back (for example, not properties
    /// with private setters). Supply a real copy function for aggregates with encapsulated state.
    /// </param>
    /// <remarks>History keeps two snapshots per event, so memory grows with stream length.</remarks>
    public async Task<ReplayResult<T>> ReplayWithBreakpoints<T>(
        string streamId,
        Func<T> createAggregate,
        Action<T, IDomainEvent> applyEvent,
        Func<StoredEvent, IDomainEvent> deserialize,
        Func<T, IDomainEvent, int, bool> breakCondition,
        CancellationToken ct,
        Func<T, T>? cloneState = null) where T : class
    {
        Func<T, T> clone = cloneState ?? (state => CloneState(state));
        var aggregate = createAggregate();
        var history = new List<ReplayStep<T>>();
        var version = 0;

        await foreach (var se in _store.ReadStream(streamId, 0, ct))
        {
            var @event = deserialize(se);
            var stateBefore = clone(aggregate);

            applyEvent(aggregate, @event);
            version++;

            history.Add(new ReplayStep<T>(
                Version: version,
                Event: @event,
                EventType: se.EventType,
                OccurredAt: se.OccurredAt,
                StateBefore: stateBefore,
                StateAfter: clone(aggregate)
            ));

            if (breakCondition(aggregate, @event, version))
            {
                return new ReplayResult<T>(
                    FinalState: aggregate,
                    FinalVersion: version,
                    History: history,
                    StoppedAtBreakpoint: true,
                    BreakpointVersion: version
                );
            }
        }

        return new ReplayResult<T>(
            FinalState: aggregate,
            FinalVersion: version,
            History: history,
            StoppedAtBreakpoint: false,
            BreakpointVersion: null
        );
    }

    /// <summary>
    /// Rebuilds the aggregate from the first <paramref name="targetVersion"/> events
    /// (or all events, if the stream is shorter).
    /// </summary>
    public async Task<T> ReplayToVersion<T>(
        string streamId,
        long targetVersion,
        Func<T> createAggregate,
        Action<T, IDomainEvent> applyEvent,
        Func<StoredEvent, IDomainEvent> deserialize,
        CancellationToken ct) where T : class
    {
        var aggregate = createAggregate();
        var version = 0L;

        await foreach (var se in _store.ReadStream(streamId, 0, ct))
        {
            if (version >= targetVersion)
            {
                break;
            }

            var @event = deserialize(se);
            applyEvent(aggregate, @event);
            version++;
        }

        return aggregate;
    }

    /// <summary>
    /// Default snapshot: a JSON round-trip. It is lossy for state that System.Text.Json cannot set.
    /// </summary>
    private static T CloneState<T>(T state) where T : class
    {
        var json = System.Text.Json.JsonSerializer.Serialize(state);
        return System.Text.Json.JsonSerializer.Deserialize<T>(json)!;
    }
}
public sealed record ReplayResult<T>(
T FinalState,
long FinalVersion,
List<ReplayStep<T>> History,
bool StoppedAtBreakpoint,
long? BreakpointVersion
);
public sealed record ReplayStep<T>(
long Version,
IDomainEvent Event,
string EventType,
DateTimeOffset OccurredAt,
T StateBefore,
T StateAfter
);
Alerting rules
Prometheus/Grafana alerts
# prometheus-alerts.yml
# Assumes the metrics exporter maps "es.x.y" names to "es_x_y". Histograms appear as
# <name>_bucket / <name>_sum / <name>_count series, so quantiles must be computed from the
# per-second rate of the _bucket series, summed by "le".
groups:
  - name: event-sourcing
    rules:
      # Projection lag alert
      - alert: ProjectionLagHigh
        expr: es_projection_lag_seconds > 60
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Projection {{ $labels.projection }} is lagging"
          description: "Projection lag is {{ $value }}s (threshold: 60s)"

      # Projection stopped. Sum across event_type so a projection only counts as stalled when it
      # processed nothing at all. NOTE: this also fires when the store is idle (no new events).
      - alert: ProjectionStalled
        expr: sum by (projection) (increase(es_projection_events_processed_total[5m])) == 0
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Projection {{ $labels.projection }} has stopped processing"

      # High concurrency conflicts
      - alert: HighConcurrencyConflicts
        expr: sum by (category) (rate(es_concurrency_conflicts_total[5m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High concurrency conflict rate on {{ $labels.category }}"
          description: "{{ $value }} conflicts/sec"

      # Outbox queue growing
      - alert: OutboxQueueGrowing
        expr: es_outbox_queue_depth > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Outbox queue depth is {{ $value }}"

      # Stream too long. es.stream.length is a histogram tagged only by category (stream ids
      # are not labels, to keep cardinality bounded), so alert on the per-category p99.
      - alert: StreamTooLong
        expr: histogram_quantile(0.99, sum by (le, category) (rate(es_stream_length_bucket[15m]))) > 10000
        labels:
          severity: warning
        annotations:
          summary: "Streams in category {{ $labels.category }} have ~{{ $value }} events (p99)"
          description: "Consider snapshotting or splitting"

      # Append latency high
      - alert: AppendLatencyHigh
        expr: histogram_quantile(0.99, sum by (le, category) (rate(es_append_latency_ms_bucket[5m]))) > 500
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P99 append latency is {{ $value }}ms"
Runbook snippets
Investigate a specific aggregate
// CLI tool or admin endpoint
/// <summary>
/// Runbook commands that print stream, correlation, and replay diagnostics to the console.
/// </summary>
public sealed class AdminCommands
{
    private readonly StreamInspector _inspector;
    private readonly CorrelationTracer _tracer;
    private readonly ReplayDebugger _debugger;

    /// <exception cref="ArgumentNullException">Any dependency is null.</exception>
    public AdminCommands(StreamInspector inspector, CorrelationTracer tracer, ReplayDebugger debugger)
    {
        _inspector = inspector ?? throw new ArgumentNullException(nameof(inspector));
        _tracer = tracer ?? throw new ArgumentNullException(nameof(tracer));
        _debugger = debugger ?? throw new ArgumentNullException(nameof(debugger));
    }

    /// <summary>Prints summary statistics and a full dump of <paramref name="streamId"/>.</summary>
    public async Task InvestigateAggregate(string streamId)
    {
        Console.WriteLine("=== Stream Info ===");
        var info = await _inspector.GetStreamInfo(streamId, CancellationToken.None);
        Console.WriteLine($"Events: {info.EventCount}");
        Console.WriteLine($"First: {info.FirstEventAt}");
        Console.WriteLine($"Last: {info.LastEventAt}");

        Console.WriteLine("\nEvent types:");
        foreach (var (type, count) in info.EventTypeCounts)
        {
            Console.WriteLine($" {type}: {count}");
        }

        Console.WriteLine("\n=== Full Dump ===");
        var dump = await _inspector.DumpStream(streamId, CancellationToken.None);
        Console.WriteLine(dump);
    }

    /// <summary>Prints the timeline and causation tree for one correlation id.</summary>
    public async Task TraceRequest(Guid correlationId)
    {
        var trace = await _tracer.TraceCorrelation(correlationId, CancellationToken.None);
        Console.WriteLine($"=== Correlation {correlationId} ===");
        Console.WriteLine($"Total events: {trace.Events.Count}");

        Console.WriteLine("\nTimeline:");
        foreach (var @event in trace.Events)
        {
            Console.WriteLine($" [{@event.OccurredAt:HH:mm:ss.fff}] {@event.StreamId} -> {@event.EventType}");
        }

        Console.WriteLine("\nCausation tree:");
        PrintCausationTree(trace.CausationTree, 0);
    }

    /// <summary>Prints the tree depth-first, indenting two spaces per level.</summary>
    private void PrintCausationTree(CausationNode node, int indent)
    {
        var prefix = new string(' ', indent * 2);
        Console.WriteLine($"{prefix}{node.EventType} ({node.StreamId})");
        foreach (var child in node.Children)
        {
            PrintCausationTree(child, indent + 1);
        }
    }

    /// <summary>Replays a bank account until its balance first drops below zero and reports that event.</summary>
    public async Task FindWhenBalanceWentNegative(Guid accountId)
    {
        var streamId = $"bank-account-{accountId}";
        var result = await _debugger.ReplayWithBreakpoints<BankAccount>(
            streamId,
            () => new BankAccount(),
            (acc, e) => acc.LoadFromHistory(new[] { e }),
            DeserializeEvent,
            (acc, _, _) => acc.Balance < 0, // Break when balance goes negative
            CancellationToken.None
        );

        if (result.StoppedAtBreakpoint)
        {
            Console.WriteLine($"Balance went negative at version {result.BreakpointVersion}");
            var step = result.History[^1];
            Console.WriteLine($"Event: {step.EventType} at {step.OccurredAt}");
            // NOTE(review): StateBefore/StateAfter are JSON clones made by ReplayDebugger. If Balance
            // is not round-tripped by System.Text.Json (e.g. a private setter), these may print defaults.
            Console.WriteLine($"Balance before: {step.StateBefore.Balance}");
            Console.WriteLine($"Balance after: {step.StateAfter.Balance}");
        }
        else
        {
            Console.WriteLine("Balance never went negative");
        }
    }

    /// <summary>Maps a stored event to its domain event. Must be wired to the application's deserializer.</summary>
    private IDomainEvent DeserializeEvent(StoredEvent se) =>
        throw new NotImplementedException("Wire this to the application's event deserializer.");
}
Health checks
/// <summary>
/// Liveness probe: the event store counts as healthy if it can start a read of a
/// dedicated probe stream and fetch at most one event from it.
/// </summary>
public sealed class EventStoreHealthCheck : IHealthCheck
{
    private readonly IEventStore _store;
    private readonly string _testStreamId = "$health-check";

    public EventStoreHealthCheck(IEventStore store) => _store = store;

    /// <summary>
    /// Attempts a one-event read. Any exception (including cancellation) is reported as Unhealthy.
    /// </summary>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        try
        {
            // Advance the enumerator once at most; disposal closes the read either way.
            await using var probe = _store.ReadStream(_testStreamId, 0, ct).GetAsyncEnumerator();
            await probe.MoveNextAsync();
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("Event store is not responding", ex);
        }

        return HealthCheckResult.Healthy("Event store is responsive");
    }
}
/// <summary>
/// Reports a projection as Degraded when it is missing a checkpoint, is too many events behind
/// the store, or has pending work but has not advanced its checkpoint recently.
/// </summary>
public sealed class ProjectionHealthCheck : IHealthCheck
{
    private readonly ICheckpointStore _checkpoints;
    private readonly IEventStore _store;
    private readonly string _projectionName;
    private readonly TimeSpan _maxLag;
    private readonly long _maxEventLag;

    /// <param name="checkpoints">Source of the projection's stored checkpoint.</param>
    /// <param name="store">Event store used to read the current global position.</param>
    /// <param name="projectionName">Name the checkpoint is stored under.</param>
    /// <param name="maxLag">Maximum time since the last checkpoint update while events are pending.</param>
    /// <param name="maxEventLag">Maximum number of events the projection may be behind (default 1000).</param>
    public ProjectionHealthCheck(
        ICheckpointStore checkpoints,
        IEventStore store,
        string projectionName,
        TimeSpan maxLag,
        long maxEventLag = 1000)
    {
        _checkpoints = checkpoints;
        _store = store;
        _projectionName = projectionName;
        _maxLag = maxLag;
        _maxEventLag = maxEventLag;
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var checkpoint = await _checkpoints.GetCheckpoint(_projectionName, ct);
        if (checkpoint is null)
        {
            return HealthCheckResult.Degraded("Projection has no checkpoint");
        }

        var currentPosition = await _store.GetCurrentPosition(ct);
        // NOTE(review): assumes the checkpoint equals the current position once fully caught up.
        var lag = currentPosition - checkpoint.Position;
        if (lag > _maxEventLag)
        {
            return HealthCheckResult.Degraded(
                $"Projection is {lag} events behind");
        }

        // Only judge staleness when there is pending work. A caught-up projection on an idle
        // store legitimately stops updating its checkpoint, and that is not a fault.
        var timeSinceUpdate = DateTimeOffset.UtcNow - checkpoint.UpdatedAt;
        if (lag > 0 && timeSinceUpdate > _maxLag)
        {
            return HealthCheckResult.Degraded(
                $"Projection hasn't updated in {timeSinceUpdate.TotalSeconds:F0}s");
        }

        return HealthCheckResult.Healthy(
            $"Projection is healthy (lag: {lag} events)");
    }
}
Observability checklist
- Stream metrics (length, append latency, load latency)
- Projection metrics (lag, position, errors)
- Outbox metrics (queue depth, publish latency)
- Concurrency conflict rate
- Correlation IDs on all events
- Causation IDs for event chains
- Distributed tracing integration
- Health checks for event store and projections
- Alerting rules for lag, conflicts, errors
- Admin tools for stream inspection
- Replay debugging capability
- Runbooks for common issues
You're now an expert
Congratulations! You've completed the advanced event sourcing guide.
You now understand:
- Core ES concepts (aggregates, events, streams, projections)
- Advanced patterns (sagas, temporal queries, multi-stream projections)
- Operational concerns (GDPR, performance, debugging)
- Common pitfalls and how to avoid them
Go build something great.
Sources
- https://opentelemetry.io/docs/
- https://prometheus.io/docs/alerting/latest/alerting_rules/
- https://learn.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks
- https://www.eventstore.com/blog/monitoring-event-store