Performance + Scaling (when your event store grows up)
Event sourcing starts simple. Then you hit 10 million events and things get interesting.
This guide covers the performance challenges you'll face and how to solve them.
The three scaling dimensions
- Read scaling: Loading aggregates, running projections
- Write scaling: Append throughput, concurrency conflicts
- Storage scaling: Total event volume, archival
Problem 1: Slow aggregate loading
Symptom
Loading an aggregate takes seconds because it has 50,000 events.
Solution: Snapshots (recap + advanced)
Basic snapshotting (covered earlier) stores periodic state checkpoints.
Advanced: Inline snapshots
Store snapshots in the same stream as events:
/// <summary>
/// A checkpoint of aggregate state stored inline in the aggregate's own
/// event stream. Modeled as a domain "event" so it can be appended next to
/// real events; replay code recognizes it by type and skips it.
/// </summary>
/// <param name="AggregateType">Type name of the snapshotted aggregate (e.g. nameof(BankAccount)).</param>
/// <param name="AtVersion">Stream version the snapshot state corresponds to; replay resumes after this point.</param>
/// <param name="State">Serialized aggregate state — opaque bytes whose format is owned by the repository.</param>
public sealed record AggregateSnapshot(
    string AggregateType,
    long AtVersion,
    byte[] State
) : IDomainEvent; // Yes, it's an "event"
/// <summary>
/// Loads and saves BankAccount aggregates using inline snapshots: every
/// SnapshotInterval events a serialized state checkpoint is appended to the
/// same stream, so Load only replays the tail recorded after the checkpoint.
/// </summary>
public sealed class SnapshotAwareRepository
{
    private readonly IEventStore _store;

    // Append a snapshot roughly every 100 events.
    private const int SnapshotInterval = 100;

    /// <summary>
    /// Loads an account: restores the newest snapshot (if any), then replays
    /// only the events recorded after it.
    /// </summary>
    public async Task<BankAccount> Load(Guid accountId, CancellationToken ct)
    {
        var streamId = $"bank-account-{accountId}";
        var acc = new BankAccount();
        long fromVersion = 0;
        // Read backwards to find latest snapshot
        var snapshot = await FindLatestSnapshot(streamId, ct);
        if (snapshot is not null)
        {
            acc = DeserializeSnapshot(snapshot);
            // NOTE(review): assumes ReadStream(streamId, fromVersion, ...)
            // yields only events AFTER the snapshotted state — confirm
            // whether fromVersion is inclusive; if it is, the event at
            // AtVersion gets applied twice on top of the snapshot.
            fromVersion = snapshot.AtVersion;
        }
        // Replay events after snapshot
        await foreach (var se in _store.ReadStream(streamId, fromVersion, ct))
        {
            if (se.EventType == nameof(AggregateSnapshot))
                continue; // Skip snapshot events during replay
            acc.LoadFromHistory(new[] { DeserializeDomainEvent(se) });
        }
        return acc;
    }

    /// <summary>
    /// Appends the aggregate's uncommitted events, adding an inline snapshot
    /// whenever this batch crosses a SnapshotInterval boundary.
    /// </summary>
    public async Task Save(BankAccount acc, CancellationToken ct)
    {
        var streamId = $"bank-account-{acc.Id}";
        var newEvents = acc.DequeueUncommittedEvents();
        var stored = new List<StoredEvent>();
        foreach (var e in newEvents)
        {
            stored.Add(ToStoredEvent(e));
        }
        // Add snapshot if we've crossed the interval.
        // BUG FIX: the previous check (newVersion % SnapshotInterval == 0)
        // only fired when the batch landed EXACTLY on a multiple of the
        // interval; a multi-event append jumping over a boundary
        // (e.g. version 99 -> 102) never produced a snapshot, so such
        // streams drifted snapshot-less forever. Comparing the interval
        // bucket before and after the append detects every crossing.
        var newVersion = acc.Version + newEvents.Count;
        if (newVersion / SnapshotInterval > acc.Version / SnapshotInterval)
        {
            stored.Add(CreateSnapshotEvent(acc, newVersion));
        }
        await _store.AppendToStream(streamId, acc.Version, stored, ct);
    }

    // Packages the aggregate's current state as an AggregateSnapshot event
    // so it can be appended to the stream like any other event.
    private StoredEvent CreateSnapshotEvent(BankAccount acc, long version)
    {
        var snapshot = new AggregateSnapshot(
            AggregateType: nameof(BankAccount),
            AtVersion: version,
            State: SerializeState(acc)
        );
        return new StoredEvent(
            EventId: Guid.NewGuid(),
            EventType: nameof(AggregateSnapshot),
            SchemaVersion: 1,
            OccurredAt: DateTimeOffset.UtcNow,
            Data: System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(snapshot),
            Metadata: null
        );
    }

    // Scans the stream tail-first and returns the newest snapshot, or null
    // when the stream has none yet.
    private async Task<AggregateSnapshot?> FindLatestSnapshot(string streamId, CancellationToken ct)
    {
        // Read backwards, find first snapshot
        // Implementation depends on event store capabilities
        throw new NotImplementedException();
    }

    private BankAccount DeserializeSnapshot(AggregateSnapshot snapshot) => /* ... */;
    private byte[] SerializeState(BankAccount acc) => /* ... */;
    private StoredEvent ToStoredEvent(IDomainEvent e) => /* ... */;
    private IDomainEvent DeserializeDomainEvent(StoredEvent e) => /* ... */;
}
Solution: Aggregate splitting
If an aggregate grows too large, maybe it's doing too much.
Before: One Order aggregate with all line items, payments, shipments
After: Separate Order, OrderPayment, OrderShipment aggregates
// Instead of one massive Order aggregate
public sealed class Order : AggregateRoot
{
    // All three collections replay from ONE ever-growing stream, so loading
    // this aggregate gets slower for its entire lifetime — this is the
    // anti-pattern the split below fixes.
    public List<LineItem> Items { get; } = new(); // Could be thousands
    public List<Payment> Payments { get; } = new(); // Grows over time
    public List<Shipment> Shipments { get; } = new(); // Grows over time
}
// Split into focused aggregates
// Small, stable stream: holds only the fields that change rarely, so
// loading the core order stays fast no matter how many items/payments exist.
public sealed class OrderHeader : AggregateRoot
{
    public Guid CustomerId { get; private set; }
    public OrderStatus Status { get; private set; }
    public decimal TotalAmount { get; private set; }
    // Just the core order data
}
// Line items live in their own stream, keyed back to the order by id,
// so a huge item list no longer slows down loading the order itself.
public sealed class OrderLineItems : AggregateRoot
{
    public Guid OrderId { get; private set; }
    public List<LineItem> Items { get; } = new();
    // Separate stream: order-items-{orderId}
}
// Payments also get their own stream; payment appends no longer contend
// with order-status or line-item writes on a shared expected version.
public sealed class OrderPayments : AggregateRoot
{
    public Guid OrderId { get; private set; }
    public List<Payment> Payments { get; } = new();
    // Separate stream: order-payments-{orderId}
}
Problem 2: Projection lag
Symptom
Read models are minutes behind the event store.
Solution: Parallel projection processing
/// <summary>
/// Rebuilds read models by slicing the full event log into fixed-size
/// batches and applying the batches to all projectors in parallel.
/// </summary>
/// <remarks>
/// NOTE(review): RebuildAll buffers the ENTIRE event log in memory before
/// processing a single batch — at tens of millions of events this will
/// exhaust memory; confirm whether batches should be dispatched as they
/// fill instead.
/// NOTE(review): batches run concurrently, so two events from the SAME
/// stream can land in different batches and be applied out of order, and
/// projectors are invoked from multiple threads. Confirm the projectors
/// are order-insensitive and thread-safe before using this rebuild path.
/// </remarks>
public sealed class ParallelProjectionRunner
{
    private readonly IEventStore _store;
    private readonly IProjector[] _projectors;
    private readonly int _batchSize;   // events per batch
    private readonly int _parallelism; // max concurrent batch workers

    public ParallelProjectionRunner(
        IEventStore store,
        IProjector[] projectors,
        int batchSize = 1000,
        int parallelism = 4)
    {
        _store = store;
        _projectors = projectors;
        _batchSize = batchSize;
        _parallelism = parallelism;
    }

    /// <summary>
    /// Reads every event from position 0, groups them into batches of
    /// _batchSize, then fans the batches out to _parallelism workers.
    /// </summary>
    public async Task RebuildAll(CancellationToken ct)
    {
        var batches = new List<List<(string StreamId, StoredEvent Event)>>();
        var currentBatch = new List<(string, StoredEvent)>();
        await foreach (var (streamId, _, @event) in _store.ReadAll(0, ct))
        {
            currentBatch.Add((streamId, @event));
            if (currentBatch.Count >= _batchSize)
            {
                batches.Add(currentBatch);
                currentBatch = new List<(string, StoredEvent)>();
            }
        }
        // Flush the final, partially filled batch.
        if (currentBatch.Count > 0)
            batches.Add(currentBatch);
        // Process batches in parallel
        await Parallel.ForEachAsync(
            batches,
            new ParallelOptions
            {
                MaxDegreeOfParallelism = _parallelism,
                CancellationToken = ct
            },
            async (batch, token) =>
            {
                foreach (var (streamId, @event) in batch)
                {
                    // Deserialize once per event, then fan out to every projector.
                    var domainEvent = Deserialize(@event);
                    foreach (var projector in _projectors)
                    {
                        projector.Apply(streamId, @event.OccurredAt, domainEvent);
                    }
                }
            });
    }

    private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
}
Solution: Partitioned projections
Different projectors process different streams:
/// <summary>
/// Routes streams onto a fixed number of partitions so independent workers
/// can each project a disjoint subset of streams. Events within a stream
/// keep their order because a stream always maps to exactly one partition.
/// </summary>
public sealed class PartitionedProjectionRunner
{
    private readonly int _partitionCount;

    public PartitionedProjectionRunner(int partitionCount)
    {
        _partitionCount = partitionCount;
    }

    /// <summary>
    /// Maps a stream id to a partition deterministically: the same id gives
    /// the same partition on every worker and across restarts.
    /// </summary>
    public int GetPartition(string streamId)
    {
        // BUG FIX: string.GetHashCode() is randomized per process on
        // .NET Core, so two workers (or one worker after a restart) would
        // assign the same stream to DIFFERENT partitions, silently breaking
        // the partitioning scheme. Use a stable FNV-1a hash instead.
        // Unsigned arithmetic also avoids the OverflowException that
        // Math.Abs(int.MinValue) would have thrown.
        uint hash = 2166136261u;
        foreach (var ch in streamId)
        {
            hash ^= ch;
            hash *= 16777619u;
        }
        return (int)(hash % (uint)_partitionCount);
    }

    /// <summary>
    /// Scans the full event log and applies only events whose stream
    /// belongs to <paramref name="partition"/> to the given projector.
    /// </summary>
    public async Task RunPartition(
        int partition,
        IEventStore store,
        IProjector projector,
        CancellationToken ct)
    {
        await foreach (var (streamId, position, @event) in store.ReadAll(0, ct))
        {
            // Every worker reads the whole log but handles only its share.
            if (GetPartition(streamId) != partition)
                continue;
            var domainEvent = Deserialize(@event);
            projector.Apply(streamId, @event.OccurredAt, domainEvent);
        }
    }

    private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
}
// Run multiple partition workers
/// <summary>
/// Background worker that projects exactly one partition of the event log.
/// Deploy one instance per partition (partition number from config/env).
/// </summary>
public sealed class ProjectionWorkerHost : BackgroundService
{
    private readonly int _totalPartitions = 8;
    private readonly int _myPartition; // From config/environment
    // BUG FIX: the original referenced _store and _projector without ever
    // declaring them (and never assigned _myPartition); inject all three.
    private readonly IEventStore _store;
    private readonly IProjector _projector;

    public ProjectionWorkerHost(IEventStore store, IProjector projector, int myPartition)
    {
        _store = store;
        _projector = projector;
        _myPartition = myPartition;
    }

    /// <summary>Runs this worker's partition until shutdown is requested.</summary>
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var runner = new PartitionedProjectionRunner(_totalPartitions);
        await runner.RunPartition(_myPartition, _store, _projector, ct);
    }
}
Problem 3: Write contention
Symptom
High WrongExpectedVersionException rate on hot aggregates.
Solution: Reduce aggregate scope
The smaller the aggregate, the less contention:
// BAD: Global counter aggregate - every write conflicts
public sealed class GlobalStats : AggregateRoot
{
    public long TotalTransactions { get; private set; }

    // A single stream shared by every writer: each append races on the same
    // expected version, so this aggregate is a contention hotspot by design.
    public void RecordTransaction() => Raise(new TransactionRecorded()); // Everyone fights for this
}
// BETTER: Sharded counters
public sealed class ShardedCounter : AggregateRoot
{
    // Which shard stream (counter-shard-{Shard}) this instance represents.
    public int Shard { get; private set; }
    // This shard's partial count; the real total is the sum over all shards.
    public long Count { get; private set; }
}
/// <summary>
/// Spreads counter increments across several shard streams so concurrent
/// writers rarely collide on the same expected version, and sums the
/// shards to read the total.
/// </summary>
public sealed class ShardedCounterService
{
    private const int ShardCount = 16;

    /// <summary>
    /// Picks a shard stream at random for the next write; random placement
    /// spreads write contention evenly across the shard streams.
    /// </summary>
    public string GetStreamId()
    {
        var shard = Random.Shared.Next(ShardCount);
        return $"counter-shard-{shard}";
    }

    /// <summary>
    /// Reads the total by loading every shard and summing its count.
    /// Note: not a point-in-time consistent total — shards are loaded one
    /// after another and may advance while we read.
    /// </summary>
    public async Task<long> GetTotal(IEventStore store, CancellationToken ct)
    {
        long total = 0;
        for (int i = 0; i < ShardCount; i++)
        {
            var counter = await Load(store, i, ct);
            total += counter.Count;
        }
        return total;
    }

    // BUG FIX: GetTotal called Load(...) but the original snippet never
    // declared it; this stub makes the class self-contained (implementation
    // elided in the file's usual style).
    private Task<ShardedCounter> Load(IEventStore store, int shard, CancellationToken ct) => /* ... */;
}
Solution: Optimistic concurrency retry with backoff
/// <summary>
/// Wraps load → mutate → save with optimistic-concurrency retries. On
/// WrongExpectedVersionException it reloads fresh state and re-runs the
/// action, backing off exponentially with jitter between attempts.
/// </summary>
public sealed class RetryingRepository
{
    private readonly IEventStore _store;
    private readonly int _maxRetries;
    private readonly TimeSpan _baseDelay;

    public RetryingRepository(IEventStore store, int maxRetries = 5)
    {
        // ROBUSTNESS FIX: with maxRetries < 1 the retry loop body never ran,
        // so ExecuteWithRetry silently did nothing — fail fast instead.
        if (maxRetries < 1)
            throw new ArgumentOutOfRangeException(nameof(maxRetries), maxRetries, "At least one attempt is required.");
        _store = store;
        _maxRetries = maxRetries;
        _baseDelay = TimeSpan.FromMilliseconds(50);
    }

    /// <summary>
    /// Loads the aggregate, applies <paramref name="action"/>, and saves.
    /// Each conflicted attempt reloads fresh state, so the action must be
    /// safe to execute more than once.
    /// </summary>
    public async Task ExecuteWithRetry<T>(
        Guid aggregateId,
        Func<T, Task> action,
        CancellationToken ct) where T : AggregateRoot, new()
    {
        for (int attempt = 0; attempt < _maxRetries; attempt++)
        {
            try
            {
                var aggregate = await Load<T>(aggregateId, ct);
                await action(aggregate);
                await Save(aggregate, ct);
                return;
            }
            catch (WrongExpectedVersionException)
            {
                // Out of attempts: surface the conflict to the caller.
                if (attempt == _maxRetries - 1)
                    throw;
                // Exponential backoff with jitter so colliding writers
                // don't all retry in lock-step.
                var delay = _baseDelay * Math.Pow(2, attempt);
                var jitter = TimeSpan.FromMilliseconds(Random.Shared.Next(50));
                await Task.Delay(delay + jitter, ct);
            }
        }
    }

    private Task<T> Load<T>(Guid id, CancellationToken ct) where T : AggregateRoot, new()
        => /* ... */;

    private Task Save<T>(T aggregate, CancellationToken ct) where T : AggregateRoot
        => /* ... */;
}
Problem 4: Storage growth
Solution: Event archival
Move old events to cold storage:
/// <summary>
/// Moves events older than a cutoff from the hot event store into a cold
/// store (S3, Azure Blob, ...), then marks them archived in the hot store.
/// </summary>
/// <remarks>
/// NOTE(review): copy-then-mark is not atomic — a crash between the cold
/// append and MarkArchived re-copies the same event on the next run.
/// Confirm the cold store deduplicates (e.g. by EventId) or make this loop
/// idempotent.
/// NOTE(review): every run scans the log from position 0, revisiting
/// already-archived events; consider persisting a checkpoint of the last
/// archived position.
/// </remarks>
public sealed class EventArchiver
{
    private readonly IEventStore _hotStore;
    private readonly IEventStore _coldStore; // S3, Azure Blob, etc.
    private readonly TimeSpan _archiveAfter; // age at which an event goes cold

    public EventArchiver(
        IEventStore hotStore,
        IEventStore coldStore,
        TimeSpan archiveAfter)
    {
        _hotStore = hotStore;
        _coldStore = coldStore;
        _archiveAfter = archiveAfter;
    }

    /// <summary>
    /// Copies every event older than the cutoff to cold storage, then marks
    /// it archived in the hot store.
    /// </summary>
    public async Task ArchiveOldEvents(CancellationToken ct)
    {
        var cutoff = DateTimeOffset.UtcNow - _archiveAfter;
        await foreach (var (streamId, position, @event) in _hotStore.ReadAll(0, ct))
        {
            if (@event.OccurredAt >= cutoff)
                continue; // Still hot
            // Copy to cold storage
            // NOTE(review): passes the hot-store position as the expected
            // version on the cold append — confirm the cold store accepts
            // gap-preserving version numbers.
            await _coldStore.AppendToStream(
                streamId,
                position, // Preserve position
                new[] { @event },
                ct);
            // Mark as archived in hot store (or delete if store supports)
            await MarkArchived(streamId, position, ct);
        }
    }

    private Task MarkArchived(string streamId, long position, CancellationToken ct)
    {
        // Implementation depends on event store
        throw new NotImplementedException();
    }
}
Solution: Stream compaction
For streams where you only need recent state, compact old events:
/// <summary>
/// Collapses the first part of a stream into a single snapshot event,
/// writing the snapshot plus the remaining tail to a new
/// "{streamId}-compacted" stream.
/// </summary>
public sealed class StreamCompactor
{
    private readonly IEventStore _store;

    /// <summary>
    /// Replace N events with a single snapshot event
    /// WARNING: This loses history - only use for specific use cases
    /// </summary>
    /// <remarks>
    /// NOTE(review): the first append uses expected version 0 and the copy
    /// loop uses -1 — presumably "stream must be new" and "any version";
    /// confirm the store's expected-version convention before relying on it.
    /// NOTE(review): nothing here re-points readers at the compacted stream
    /// or retires the original — that cutover must happen elsewhere.
    /// </remarks>
    public async Task CompactStream<T>(
        string streamId,
        long compactUpToVersion,
        CancellationToken ct) where T : AggregateRoot, new()
    {
        // Load aggregate up to version
        var aggregate = new T();
        var events = new List<IDomainEvent>();
        await foreach (var se in _store.ReadStream(streamId, 0, ct))
        {
            // Stop at the compaction point; everything before it is folded
            // into the snapshot.
            if (se.StreamVersion > compactUpToVersion)
                break;
            events.Add(Deserialize(se));
        }
        aggregate.LoadFromHistory(events);
        // Create compacted stream with snapshot + remaining events
        var compactedStreamId = $"{streamId}-compacted";
        var snapshot = CreateSnapshot(aggregate, compactUpToVersion);
        await _store.AppendToStream(compactedStreamId, 0, new[] { snapshot }, ct);
        // Copy events after compaction point
        // NOTE(review): one append per event — consider batching the tail
        // into a single append for long streams.
        await foreach (var se in _store.ReadStream(streamId, compactUpToVersion + 1, ct))
        {
            await _store.AppendToStream(compactedStreamId, -1, new[] { se }, ct);
        }
    }

    private IDomainEvent Deserialize(StoredEvent e) => /* ... */;

    private StoredEvent CreateSnapshot<T>(T aggregate, long version) where T : AggregateRoot
        => /* ... */;
}
Problem 5: Projection rebuild takes forever
Solution: Incremental rebuilds
Don't rebuild everything—track what needs rebuilding:
/// <summary>
/// Tracks rebuild progress per projection via the checkpoint store, so an
/// interrupted rebuild resumes where it stopped instead of starting over.
/// </summary>
public sealed class IncrementalRebuildManager
{
    private readonly ICheckpointStore _checkpoints;

    /// <summary>
    /// Returns the position a rebuild should resume from: the last saved
    /// checkpoint for this projection, or 0 when none exists (fresh build).
    /// </summary>
    public async Task<long> GetRebuildStartPosition(
        string projectionName,
        CancellationToken ct)
    {
        var saved = await _checkpoints.GetCheckpoint(projectionName, ct);
        return saved?.Position ?? 0;
    }

    /// <summary>Persists how far the rebuild has progressed.</summary>
    public Task MarkRebuildProgress(
        string projectionName,
        long position,
        CancellationToken ct)
        => _checkpoints.SaveCheckpoint(projectionName, position, ct);
}
Solution: Blue-green projection rebuilds
Build new projection while old one serves traffic:
/// <summary>
/// Keeps two copies of a projection (blue/green). Reads are served from the
/// active copy while the idle one is rebuilt from scratch; a single flag
/// flip switches which copy is active.
/// </summary>
/// <remarks>
/// NOTE(review): nothing prevents two concurrent RebuildAndSwitch calls
/// from clearing/writing the same target — callers must serialize rebuilds.
/// NOTE(review): events appended after ReadAll completes but before the
/// flip are missing from the new store; confirm a catch-up mechanism
/// covers that gap.
/// </remarks>
public sealed class BlueGreenProjectionManager
{
    private readonly IProjectionStore _blueStore;
    private readonly IProjectionStore _greenStore;
    // volatile so the flip at the end of RebuildAndSwitch is promptly
    // visible to reader threads.
    private volatile bool _useBlue = true;

    // The copy currently serving reads.
    public IProjectionStore ActiveStore => _useBlue ? _blueStore : _greenStore;
    // The idle copy that the next rebuild will overwrite.
    public IProjectionStore RebuildStore => _useBlue ? _greenStore : _blueStore;

    /// <summary>
    /// Clears the idle store, rebuilds it from the full event log, then
    /// flips the flag so it becomes the active store.
    /// </summary>
    public async Task RebuildAndSwitch(
        IEventStore eventStore,
        IProjector projector,
        CancellationToken ct)
    {
        var target = RebuildStore;
        // Clear target
        await target.Clear(ct);
        // Rebuild into target
        await foreach (var (streamId, position, @event) in eventStore.ReadAll(0, ct))
        {
            var domainEvent = Deserialize(@event);
            var result = projector.Project(streamId, @event.OccurredAt, domainEvent);
            await target.Save(result, ct);
        }
        // Atomic switch
        _useBlue = !_useBlue;
    }

    private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
}
Performance monitoring
Key metrics to track
/// <summary>
/// Emits the key event-sourcing health metrics: append/load latency,
/// optimistic-concurrency conflicts, projection lag, and stream length.
/// </summary>
public sealed class EventSourcingMetrics
{
    private readonly IMetricsCollector _metrics;

    /// <summary>Records how long a stream append took, tagged by stream prefix.</summary>
    public void RecordAppendLatency(TimeSpan duration, string streamPrefix)
    {
        _metrics.RecordHistogram(
            "es.append.latency.ms",
            duration.TotalMilliseconds,
            new[] { ("stream_prefix", streamPrefix) });
    }

    /// <summary>Records aggregate-load latency, tagged by a coarse event-count bucket.</summary>
    public void RecordLoadLatency(TimeSpan duration, int eventCount)
    {
        _metrics.RecordHistogram(
            "es.load.latency.ms",
            duration.TotalMilliseconds,
            new[] { ("event_count_bucket", GetBucket(eventCount)) });
    }

    /// <summary>Counts one optimistic-concurrency conflict on the given stream prefix.</summary>
    public void RecordConcurrencyConflict(string streamPrefix)
    {
        _metrics.IncrementCounter(
            "es.concurrency.conflicts",
            new[] { ("stream_prefix", streamPrefix) });
    }

    /// <summary>Records how far a projection trails the head of the event log.</summary>
    public void RecordProjectionLag(string projectionName, TimeSpan lag)
    {
        _metrics.RecordGauge(
            "es.projection.lag.seconds",
            lag.TotalSeconds,
            new[] { ("projection", projectionName) });
    }

    /// <summary>Records a stream's length, tagged by its prefix (not the full id, to cap tag cardinality).</summary>
    public void RecordStreamLength(string streamId, long length)
    {
        _metrics.RecordHistogram(
            "es.stream.length",
            length,
            new[] { ("stream_prefix", GetPrefix(streamId)) });
    }

    // Coarse buckets keep the event-count tag low-cardinality.
    private string GetBucket(int count) => count switch
    {
        < 10 => "0-10",
        < 100 => "10-100",
        < 1000 => "100-1000",
        _ => "1000+"
    };

    // Derives a low-cardinality prefix from "{prefix}-{id}" stream ids.
    private string GetPrefix(string streamId)
    {
        // BUG FIX: ids are often GUIDs, which themselves contain dashes, so
        // trimming at the LAST dash kept most of the GUID (e.g.
        // "bank-account-3f2504e0-4f89-11d3-9a0c") and exploded metric tag
        // cardinality. Strip a trailing GUID when present, then fall back
        // to trimming the final dash-delimited segment for simple ids.
        for (int i = streamId.Length - 1; i > 0; i--)
        {
            if (streamId[i] == '-' && Guid.TryParse(streamId[(i + 1)..], out _))
                return streamId[..i];
        }
        var dashIndex = streamId.LastIndexOf('-');
        return dashIndex > 0 ? streamId[..dashIndex] : streamId;
    }
}
Instrumented repository
/// <summary>
/// BankAccount repository that records load latency, stream length, append
/// latency, and concurrency-conflict counts around every operation.
/// </summary>
public sealed class InstrumentedRepository
{
    private readonly IEventStore _store;
    private readonly EventSourcingMetrics _metrics;

    /// <summary>
    /// Loads a bank account by replaying its full stream, recording load
    /// latency and stream length on success.
    /// NOTE(review): nothing is recorded if ReadStream throws mid-way —
    /// confirm failed loads should be invisible to the latency histogram.
    /// </summary>
    public async Task<BankAccount> Load(Guid accountId, CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        var streamId = $"bank-account-{accountId}";
        var events = new List<IDomainEvent>();
        await foreach (var se in _store.ReadStream(streamId, 0, ct))
        {
            events.Add(Deserialize(se));
        }
        var acc = new BankAccount();
        acc.LoadFromHistory(events);
        sw.Stop();
        _metrics.RecordLoadLatency(sw.Elapsed, events.Count);
        _metrics.RecordStreamLength(streamId, events.Count);
        return acc;
    }

    /// <summary>
    /// Appends uncommitted events, recording append latency on success and
    /// counting optimistic-concurrency conflicts before rethrowing them.
    /// NOTE(review): DequeueUncommittedEvents runs before the append, so on
    /// WrongExpectedVersionException the events have already been drained
    /// from the aggregate — a caller that catches and simply calls Save
    /// again would silently write nothing. Confirm retries re-execute the
    /// whole command instead.
    /// </summary>
    public async Task Save(BankAccount acc, CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        var streamId = $"bank-account-{acc.Id}";
        try
        {
            var newEvents = acc.DequeueUncommittedEvents();
            var stored = newEvents.Select(ToStoredEvent).ToList();
            await _store.AppendToStream(streamId, acc.Version, stored, ct);
            sw.Stop();
            _metrics.RecordAppendLatency(sw.Elapsed, "bank-account");
        }
        catch (WrongExpectedVersionException)
        {
            _metrics.RecordConcurrencyConflict("bank-account");
            throw;
        }
    }

    private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
    private StoredEvent ToStoredEvent(IDomainEvent e) => /* ... */;
}
Scaling checklist
- Snapshot long-lived aggregates (> 100 events)
- Monitor stream lengths and set alerts
- Track projection lag and set SLOs
- Implement retry with backoff for concurrency conflicts
- Consider aggregate splitting for hot streams
- Plan archival strategy before you need it
- Use parallel/partitioned projection rebuilds
- Monitor append and load latencies
- Test rebuild time regularly
Next
Anti-patterns: the mistakes that will bite you.
Sources
https://www.eventstore.com/blog/scaling-event-sourcing
https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing#issues-and-considerations
https://microservices.io/patterns/data/event-sourcing.html