Event store + optimistic concurrency (append correctly)

Everything breaks if you get appends wrong.

The core requirement:

Append events atomically and in order, and prevent lost updates.

The minimal event store API

We’ll define an event store API that supports:

  • read by stream
  • append to stream with expected version

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// One persisted event: an opaque payload plus the metadata needed to deserialize it later.
public sealed record StoredEvent(
    Guid EventId,
    string EventType,
    int SchemaVersion,
    DateTimeOffset OccurredAt,
    byte[] Data,
    byte[]? Metadata
);

public sealed record AppendResult(long NextExpectedVersion);

// Thrown when the stream's actual version differs from the version the writer expected.
public sealed class WrongExpectedVersionException : Exception
{
    public WrongExpectedVersionException(string streamId, long expected, long actual)
        : base($"WrongExpectedVersion stream={streamId} expected={expected} actual={actual}") { }
}

public interface IEventStore
{
    IAsyncEnumerable<StoredEvent> ReadStream(string streamId, long fromVersionInclusive, CancellationToken ct);

    Task<AppendResult> AppendToStream(
        string streamId,
        long expectedVersion,
        IReadOnlyList<StoredEvent> events,
        CancellationToken ct
    );
}

Expected version rules (important)

We’ll use this convention:

  • an empty stream has version 0
  • a stream holding N events has version N, so appending K events moves the next expected version to N + K
  • when appending, you pass the version you observed when you read the stream

Example:

  • read stream => it has 3 events => expected version = 3
  • append 2 new events => next expected version = 5

(Real event stores may use “last revision number” instead of “count”. Same idea.)
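To make the arithmetic concrete, here is a minimal sketch of the count-based convention and one possible mapping to a "last revision" convention. The class and method names are hypothetical, and the mapping assumes a store that numbers events from 0 and reports no revision for an empty stream; check your store's documentation before relying on it.

```csharp
using System;

public static class StreamVersions
{
    // Count-based convention from this section: version == number of events in the stream.
    public static long NextExpectedVersion(long observedVersion, int appendedCount)
    {
        if (observedVersion < 0) throw new ArgumentOutOfRangeException(nameof(observedVersion));
        if (appendedCount < 0) throw new ArgumentOutOfRangeException(nameof(appendedCount));
        return observedVersion + appendedCount;
    }

    // Assumed alternative convention: events are numbered from 0 and the store
    // reports the number of the last event; an empty stream has no revision (null).
    public static long? ToLastRevision(long countVersion) =>
        countVersion == 0 ? (long?)null : countVersion - 1;

    public static long FromLastRevision(long? lastRevision) =>
        lastRevision is null ? 0 : lastRevision.Value + 1;
}
```

With these helpers, `NextExpectedVersion(3, 2)` gives 5, matching the example above, and a count-based version of 3 corresponds to a last revision of 2.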

In-memory implementation (correct concurrency semantics)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public sealed class InMemoryEventStore : IEventStore
{
    // A single lock guards all streams: simple, and enough for tests and demos.
    private readonly object _gate = new();
    private readonly Dictionary<string, List<StoredEvent>> _streams = new();

    public async IAsyncEnumerable<StoredEvent> ReadStream(
        string streamId,
        long fromVersionInclusive,
        [EnumeratorCancellation] CancellationToken ct
    )
    {
        if (fromVersionInclusive < 0) throw new ArgumentOutOfRangeException(nameof(fromVersionInclusive));

        List<StoredEvent> snapshot;

        // Copy under the lock so enumeration never observes a concurrent append.
        lock (_gate)
        {
            snapshot = _streams.TryGetValue(streamId, out var list)
                ? list.ToList()
                : new List<StoredEvent>();
        }

        // Clamp before casting so a start position past the end (or beyond int range) yields nothing.
        var start = (int)Math.Min(fromVersionInclusive, snapshot.Count);
        for (var i = start; i < snapshot.Count; i++)
        {
            ct.ThrowIfCancellationRequested();
            yield return snapshot[i];
            await Task.Yield();
        }
    }

    public Task<AppendResult> AppendToStream(
        string streamId,
        long expectedVersion,
        IReadOnlyList<StoredEvent> events,
        CancellationToken ct
    )
    {
        if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentException("streamId is required.", nameof(streamId));
        if (expectedVersion < 0) throw new ArgumentOutOfRangeException(nameof(expectedVersion));
        if (events is null) throw new ArgumentNullException(nameof(events));
        if (events.Count == 0) return Task.FromResult(new AppendResult(expectedVersion));

        // The version check and the write happen under the same lock, so they are atomic.
        lock (_gate)
        {
            if (!_streams.TryGetValue(streamId, out var list))
            {
                list = new List<StoredEvent>();
                _streams[streamId] = list;
            }

            var actualVersion = list.Count;
            if (actualVersion != expectedVersion)
                throw new WrongExpectedVersionException(streamId, expectedVersion, actualVersion);

            // idempotency guard (optional but recommended):
            // never write the same EventId twice to a stream.
            var existingIds = list.Select(e => e.EventId).ToHashSet();
            foreach (var e in events)
            {
                if (existingIds.Contains(e.EventId))
                    continue;

                list.Add(e);
                existingIds.Add(e.EventId);
            }

            return Task.FromResult(new AppendResult(list.Count));
        }
    }
}

Loading an aggregate and saving its new events

You’ll do this pattern constantly:

  1. Read history
  2. Rebuild aggregate
  3. Execute a command method (emits new events)
  4. Append new events with expected version

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BankAccountRepository
{
    public static async Task<BankAccount> Load(IEventStore store, Guid accountId, CancellationToken ct)
    {
        var streamId = $"bank-account-{accountId}";
        var domainEvents = new List<IDomainEvent>();

        // Steps 1 and 2: read the full history, then replay it into a fresh aggregate.
        await foreach (var se in store.ReadStream(streamId, fromVersionInclusive: 0, ct))
        {
            domainEvents.Add(DeserializeDomainEvent(se));
        }

        var acc = new BankAccount();
        acc.LoadFromHistory(domainEvents);
        return acc;
    }

    public static async Task Save(IEventStore store, BankAccount acc, CancellationToken ct)
    {
        var streamId = $"bank-account-{acc.Id}";
        var newDomainEvents = acc.DequeueUncommittedEvents();

        var stored = newDomainEvents.Select(e => new StoredEvent(
            EventId: Guid.NewGuid(),
            EventType: e.GetType().Name,
            SchemaVersion: 1,
            OccurredAt: DateTimeOffset.UtcNow,
            Data: SerializeDomainEvent(e),
            Metadata: null
        )).ToList();

        // Step 4. This assumes acc.Version is the version observed at load time,
        // i.e. it does not count the uncommitted events being appended here.
        await store.AppendToStream(streamId, expectedVersion: acc.Version, stored, ct);
    }

    // Keep serialization explicit in docs (avoid magic). Replace with a real registry.
    private static byte[] SerializeDomainEvent(IDomainEvent e) =>
        System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(e, e.GetType());

    private static IDomainEvent DeserializeDomainEvent(StoredEvent e)
    {
        var type = e.EventType switch
        {
            nameof(AccountOpened) => typeof(AccountOpened),
            nameof(MoneyDeposited) => typeof(MoneyDeposited),
            nameof(MoneyWithdrawn) => typeof(MoneyWithdrawn),
            nameof(AccountClosed) => typeof(AccountClosed),
            _ => throw new NotSupportedException($"Unknown event type: {e.EventType}")
        };

        return (IDomainEvent)(System.Text.Json.JsonSerializer.Deserialize(e.Data, type)
            ?? throw new InvalidOperationException("Failed to deserialize event."));
    }
}
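The repository comment suggests replacing the hard-coded switch with a real registry. One possible shape is sketched below, assuming System.Text.Json and explicit, stable event names. `EventTypeRegistry` and `SampleDeposited` are hypothetical names introduced only for illustration; they are not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical sample event, defined here only so the sketch compiles on its own.
public sealed record SampleDeposited(Guid AccountId, decimal Amount);

public sealed class EventTypeRegistry
{
    private readonly Dictionary<string, Type> _typeByName = new(StringComparer.Ordinal);
    private readonly Dictionary<Type, string> _nameByType = new();

    // Registers an explicit name so that renaming the CLR type does not break stored events.
    public EventTypeRegistry Register<TEvent>(string name)
    {
        if (string.IsNullOrWhiteSpace(name)) throw new ArgumentException("name is required.", nameof(name));
        _typeByName.Add(name, typeof(TEvent)); // throws if the name is already registered
        _nameByType.Add(typeof(TEvent), name); // throws if the type is already registered
        return this;
    }

    // Looks up the stored name for an event instance.
    public string NameOf(object domainEvent) =>
        _nameByType.TryGetValue(domainEvent.GetType(), out var name)
            ? name
            : throw new NotSupportedException($"Unregistered event type: {domainEvent.GetType().FullName}");

    public byte[] Serialize(object domainEvent) =>
        JsonSerializer.SerializeToUtf8Bytes(domainEvent, domainEvent.GetType());

    public object Deserialize(string name, byte[] data)
    {
        if (!_typeByName.TryGetValue(name, out var type))
            throw new NotSupportedException($"Unknown event type: {name}");

        return JsonSerializer.Deserialize(data, type)
            ?? throw new InvalidOperationException("Failed to deserialize event.");
    }
}
```

The design choice here is that the stored `EventType` string comes from the registry rather than from `GetType().Name`, so a class rename does not silently orphan old events.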

What happens on concurrency conflicts?

If two writers both load the stream at version 3 and each tries to append one event:

  • the first append succeeds (the stream moves to version 4)
  • the second append fails with WrongExpectedVersionException, because it still expects version 3
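The race can be reproduced in a few lines. The sketch below uses a hypothetical, stripped-down `TinyStream` that returns false on a version mismatch instead of throwing, purely to keep the example self-contained; the version check itself is the same idea as in the in-memory store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a single stream: a list of event names guarded by a lock.
public sealed class TinyStream
{
    private readonly object _gate = new();
    private readonly List<string> _events = new();

    public long Version { get { lock (_gate) return _events.Count; } }

    // Returns true if the event was appended, false if expectedVersion is stale.
    public bool TryAppend(long expectedVersion, string evt)
    {
        lock (_gate)
        {
            if (_events.Count != expectedVersion) return false;
            _events.Add(evt);
            return true;
        }
    }
}

public static class TwoWritersDemo
{
    public static (bool First, bool Second, long FinalVersion) Run()
    {
        var stream = new TinyStream();
        stream.TryAppend(0, "AccountOpened");
        stream.TryAppend(1, "MoneyDeposited");
        stream.TryAppend(2, "MoneyDeposited");

        // Both writers read the stream before either one appends.
        var seenByA = stream.Version;
        var seenByB = stream.Version;

        var first = stream.TryAppend(seenByA, "MoneyWithdrawn");  // accepted
        var second = stream.TryAppend(seenByB, "MoneyWithdrawn"); // rejected: B's version is stale
        return (first, second, stream.Version);
    }
}
```

Without the version check, both withdrawals would be written even though the second writer decided based on a balance that no longer exists. That is the lost update the expected version prevents.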

Your handler typically:

  • reloads
  • re-evaluates the command against the new state
  • retries if still valid
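The reload-and-retry loop can be factored into a small helper. This is a sketch under assumptions: `ConflictRetry` and its parameters are hypothetical names, it retries immediately with no backoff, and it expects the delegate to reload the aggregate on every attempt.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConflictRetry
{
    // Runs `attempt` until it completes without throwing TConflict.
    // Returns the number of attempts used; once maxAttempts is reached,
    // the last conflict exception propagates to the caller.
    public static async Task<int> Execute<TConflict>(
        Func<CancellationToken, Task> attempt,
        int maxAttempts,
        CancellationToken ct) where TConflict : Exception
    {
        if (attempt is null) throw new ArgumentNullException(nameof(attempt));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attemptNo = 1; ; attemptNo++)
        {
            ct.ThrowIfCancellationRequested();
            try
            {
                await attempt(ct);
                return attemptNo;
            }
            catch (TConflict) when (attemptNo < maxAttempts)
            {
                // Another writer won the race. Loop so the next attempt
                // reloads and re-evaluates the command against the new state.
            }
        }
    }
}

// Usage with the types from this section (Withdraw is a hypothetical command method):
//
// await ConflictRetry.Execute<WrongExpectedVersionException>(async token =>
// {
//     var acc = await BankAccountRepository.Load(store, accountId, token);
//     acc.Withdraw(amount); // throws a domain error if no longer valid, which is not retried
//     await BankAccountRepository.Save(store, acc, token);
// }, maxAttempts: 3, ct);
```

Only the conflict exception triggers a retry. A domain rule violation on the reloaded state surfaces immediately, which matches "retries if still valid".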

Next

Now we can write events. To make them useful, we also need to read them efficiently, which is the job of projections.

Next: Projections + CQRS