Event store + optimistic concurrency (append correctly)
Everything breaks if you get appends wrong.
The core requirement:
Append events atomically and in order, and prevent lost updates.
The minimal event store API

We'll define an event store API that supports:
- read by stream
- append to stream with expected version
```csharp
public sealed record StoredEvent(
    Guid EventId,
    string EventType,
    int SchemaVersion,
    DateTimeOffset OccurredAt,
    byte[] Data,
    byte[]? Metadata
);

public sealed record AppendResult(long NextExpectedVersion);

public sealed class WrongExpectedVersionException : Exception
{
    public WrongExpectedVersionException(string streamId, long expected, long actual)
        : base($"WrongExpectedVersion stream={streamId} expected={expected} actual={actual}") { }
}

public interface IEventStore
{
    IAsyncEnumerable<StoredEvent> ReadStream(string streamId, long fromVersionInclusive, CancellationToken ct);

    Task<AppendResult> AppendToStream(
        string streamId,
        long expectedVersion,
        IReadOnlyList<StoredEvent> events,
        CancellationToken ct
    );
}
```
Expected version rules (important)

We'll use this convention:
- empty stream has version 0
- after you append N events, next expected version is N
- when appending, you pass the current version you observed
Example:
- read stream => it has 3 events => expected version = 3
- append 2 new events => next expected version = 5
(Real event stores may use "last revision number" instead of "count". Same idea.)
In-memory implementation (correct concurrency semantics)
```csharp
public sealed class InMemoryEventStore : IEventStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, List<StoredEvent>> _streams = new();

    public async IAsyncEnumerable<StoredEvent> ReadStream(
        string streamId,
        long fromVersionInclusive,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct
    )
    {
        // Copy under the lock so readers never observe a list mid-mutation.
        List<StoredEvent> snapshot;
        lock (_gate)
        {
            snapshot = _streams.TryGetValue(streamId, out var list)
                ? list.ToList()
                : new List<StoredEvent>();
        }

        for (var i = (int)fromVersionInclusive; i < snapshot.Count; i++)
        {
            ct.ThrowIfCancellationRequested();
            yield return snapshot[i];
            await Task.Yield();
        }
    }

    public Task<AppendResult> AppendToStream(
        string streamId,
        long expectedVersion,
        IReadOnlyList<StoredEvent> events,
        CancellationToken ct
    )
    {
        if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentException("streamId is required.");
        if (expectedVersion < 0) throw new ArgumentOutOfRangeException(nameof(expectedVersion));
        if (events.Count == 0) return Task.FromResult(new AppendResult(expectedVersion));

        lock (_gate)
        {
            if (!_streams.TryGetValue(streamId, out var list))
            {
                list = new List<StoredEvent>();
                _streams[streamId] = list;
            }

            // The version check and the append happen under the same lock:
            // that is what makes the append atomic and prevents lost updates.
            var actualVersion = list.Count;
            if (actualVersion != expectedVersion)
                throw new WrongExpectedVersionException(streamId, expectedVersion, actualVersion);

            // idempotency guard (optional but recommended):
            // never write the same EventId twice to a stream.
            var existingIds = list.Select(e => e.EventId).ToHashSet();
            foreach (var e in events)
            {
                if (existingIds.Contains(e.EventId))
                    continue;

                list.Add(e);
                existingIds.Add(e.EventId);
            }

            return Task.FromResult(new AppendResult(list.Count));
        }
    }
}
```
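To see the version rules in action, here is a quick usage sketch of the store above (top-level statements; the stream name and empty payload bytes are placeholders):

```csharp
var store = new InMemoryEventStore();

// Helper: a StoredEvent with a placeholder (empty) payload.
StoredEvent Make(string type) => new(
    EventId: Guid.NewGuid(),
    EventType: type,
    SchemaVersion: 1,
    OccurredAt: DateTimeOffset.UtcNow,
    Data: Array.Empty<byte>(),
    Metadata: null);

// Empty stream => expected version 0. Appending 2 events => next expected version 2.
var r1 = await store.AppendToStream("acct-1", expectedVersion: 0,
    new[] { Make("AccountOpened"), Make("MoneyDeposited") }, CancellationToken.None);
Console.WriteLine(r1.NextExpectedVersion); // 2

// A stale writer that still believes the version is 0 is rejected.
try
{
    await store.AppendToStream("acct-1", expectedVersion: 0,
        new[] { Make("MoneyWithdrawn") }, CancellationToken.None);
}
catch (WrongExpectedVersionException ex)
{
    Console.WriteLine(ex.Message); // ... expected=0 actual=2
}
```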
Loading an aggregate and saving its new events

You'll do this pattern constantly:
- Read history
- Rebuild aggregate
- Execute a command method (emits new events)
- Append new events with expected version
```csharp
public static class BankAccountRepository
{
    public static async Task<BankAccount> Load(IEventStore store, Guid accountId, CancellationToken ct)
    {
        var streamId = $"bank-account-{accountId}";
        var domainEvents = new List<IDomainEvent>();

        await foreach (var se in store.ReadStream(streamId, fromVersionInclusive: 0, ct))
        {
            domainEvents.Add(DeserializeDomainEvent(se));
        }

        var acc = new BankAccount();
        acc.LoadFromHistory(domainEvents);
        return acc;
    }

    public static async Task Save(IEventStore store, BankAccount acc, CancellationToken ct)
    {
        var streamId = $"bank-account-{acc.Id}";
        var newDomainEvents = acc.DequeueUncommittedEvents();

        var stored = newDomainEvents.Select(e => new StoredEvent(
            EventId: Guid.NewGuid(),
            EventType: e.GetType().Name,
            SchemaVersion: 1,
            OccurredAt: DateTimeOffset.UtcNow,
            Data: SerializeDomainEvent(e),
            Metadata: null
        )).ToList();

        // acc.Version must be the version observed at load time (the count of
        // committed events), not counting the uncommitted events just dequeued.
        await store.AppendToStream(streamId, expectedVersion: acc.Version, stored, ct);
    }

    // Keep serialization explicit in docs (avoid magic). Replace with a real registry.
    private static byte[] SerializeDomainEvent(IDomainEvent e) =>
        System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(e, e.GetType());

    private static IDomainEvent DeserializeDomainEvent(StoredEvent e)
    {
        var type = e.EventType switch
        {
            nameof(AccountOpened) => typeof(AccountOpened),
            nameof(MoneyDeposited) => typeof(MoneyDeposited),
            nameof(MoneyWithdrawn) => typeof(MoneyWithdrawn),
            nameof(AccountClosed) => typeof(AccountClosed),
            _ => throw new NotSupportedException($"Unknown event type: {e.EventType}")
        };

        return (IDomainEvent)(System.Text.Json.JsonSerializer.Deserialize(e.Data, type)
            ?? throw new InvalidOperationException("Failed to deserialize event."));
    }
}
```
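The hand-written switch is fine for a handful of types; the "real registry" suggested above could look like this minimal sketch (the class and method names here are illustrative, not from the text):

```csharp
// Minimal event-type registry: maps stored EventType strings to CLR types,
// replacing the switch expression in DeserializeDomainEvent.
public sealed class EventTypeRegistry
{
    private readonly Dictionary<string, Type> _byName = new();

    public EventTypeRegistry Register<T>() where T : IDomainEvent
    {
        _byName[typeof(T).Name] = typeof(T);
        return this;
    }

    public Type Resolve(string eventType) =>
        _byName.TryGetValue(eventType, out var t)
            ? t
            : throw new NotSupportedException($"Unknown event type: {eventType}");
}

// Usage:
// var registry = new EventTypeRegistry()
//     .Register<AccountOpened>()
//     .Register<MoneyDeposited>();
// var type = registry.Resolve(storedEvent.EventType);
```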
What happens on concurrency conflicts?

If two writers load version 3 and both try to append:
- first append succeeds (stream becomes version 4)
- second append fails with `WrongExpectedVersionException`
Your handler typically:
- reloads
- re-evaluates the command against the new state
- retries if still valid
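That reload/re-evaluate/retry loop can be sketched as a small generic helper (the class name, delegate shape, and retry policy here are illustrative, not prescribed by the store):

```csharp
// Generic optimistic-retry wrapper: reload state, re-run the command against
// the fresh state, and retry the save when another writer won the race.
public static class OptimisticRetry
{
    public static async Task<T> Execute<T>(
        Func<Task<T>> load,    // e.g. () => BankAccountRepository.Load(store, id, ct)
        Action<T> command,     // re-evaluated against the freshly loaded state
        Func<T, Task> save,    // e.g. acc => BankAccountRepository.Save(store, acc, ct)
        int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            var state = await load();
            command(state);
            try
            {
                await save(state);
                return state;
            }
            catch (WrongExpectedVersionException) when (attempt < maxAttempts)
            {
                // Another writer appended first; loop and retry on the new state.
            }
        }
    }
}
```

Note the `when` filter: once `maxAttempts` is exhausted, the conflict propagates to the caller, which must decide whether the command still makes sense.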
Next
Now we can write events. To be useful, we must read efficiently (projections).