Advanced Conflict Resolution (beyond retry)
When two users try to modify the same aggregate simultaneously, you get a conflict.
The basic approach is retry: reload, re-evaluate, try again.
But sometimes you need smarter strategies.
Understanding conflicts
Optimistic concurrency conflict
User A: Load account (version 5)
User B: Load account (version 5)
User A: Withdraw $50 → Append (expect version 5) → Success (now version 6)
User B: Deposit $100 → Append (expect version 5) → FAIL (actual version 6)
User B's append failed because the stream had already moved from version 5 to version 6 by the time B tried to write.
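The failure above comes from an expected-version check on append. A minimal in-memory sketch of that check follows. `InMemoryStream` and its `Append` signature are hypothetical stand-ins for illustration, not the API of any particular event store, and the local `WrongExpectedVersionException` only mirrors the name used in this article.

```csharp
using System;
using System.Collections.Generic;

// Replay the scenario: the stream starts at version 5.
var stream = new InMemoryStream(initialVersion: 5);
long versionSeenByA = stream.Version; // User A loads (version 5)
long versionSeenByB = stream.Version; // User B loads (version 5)

stream.Append(versionSeenByA, "Withdrew 50"); // succeeds
Console.WriteLine(stream.Version);            // prints 6

try
{
    stream.Append(versionSeenByB, "Deposited 100"); // B still expects version 5
}
catch (WrongExpectedVersionException ex)
{
    Console.WriteLine(ex.Message); // prints "Expected version 5, actual version 6"
}

// Hypothetical exception type, named after the one used in this article.
public sealed class WrongExpectedVersionException : Exception
{
    public WrongExpectedVersionException(long expected, long actual)
        : base($"Expected version {expected}, actual version {actual}") { }
}

// Hypothetical single-stream store: version = initial version + events appended here.
public sealed class InMemoryStream
{
    private readonly List<string> _events = new();
    private readonly object _gate = new();
    private readonly long _initialVersion;

    public InMemoryStream(long initialVersion) => _initialVersion = initialVersion;

    public long Version
    {
        get { lock (_gate) return _initialVersion + _events.Count; }
    }

    public void Append(long expectedVersion, string @event)
    {
        lock (_gate)
        {
            // The check and the write happen under one lock, so they are atomic
            var actual = _initialVersion + _events.Count;
            if (actual != expectedVersion)
                throw new WrongExpectedVersionException(expectedVersion, actual);
            _events.Add(@event);
        }
    }
}
```

The important property is that the comparison and the write are a single atomic step; a check followed by a separate write would reintroduce the race.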
The naive retry
public async Task ExecuteWithRetry<T>(
Guid aggregateId,
Func<T, Task> action,
int maxRetries = 3,
CancellationToken ct = default) where T : AggregateRoot, new()
{
for (int attempt = 0; attempt < maxRetries; attempt++)
{
try
{
var aggregate = await Load<T>(aggregateId, ct);
await action(aggregate);
await Save(aggregate, ct);
return;
}
catch (WrongExpectedVersionException)
{
if (attempt == maxRetries - 1)
throw;
await Task.Delay(TimeSpan.FromMilliseconds(50 * Math.Pow(2, attempt)), ct);
}
}
}
This works for most cases. But what about:
- Non-commutative operations?
- User-facing conflicts?
- High-contention scenarios?
Strategy 1: Automatic merge (for commutative operations)
Some operations can be safely merged regardless of order.
Commutative operations
Operations are commutative if order doesn't matter:
- +10 then +20 = +20 then +10 = +30
- SetFlag(true) then SetFlag(true) = same result
Non-commutative operations
Order matters:
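- SetBalance(100) then SetBalance(200) ≠ SetBalance(200) then SetBalance(100)
- Withdraw(50) against a balance of 60 can fail if a concurrent withdrawal lands first, and a withdrawal that would otherwise fail can succeed if a deposit lands first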
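One way to make the distinction concrete is to apply two operations in both orders and compare the results. The sketch below models an operation as a function from balance to balance; `Ops.Commute` and the sample operations are hypothetical helpers for illustration. It tests a single starting balance, so it can show that two operations do not commute but cannot prove that they always do.

```csharp
using System;

Func<long, long> add10 = balance => balance + 10;
Func<long, long> add20 = balance => balance + 20;
Func<long, long> setTo100 = _ => 100;
Func<long, long> setTo200 = _ => 200;
Func<long, long> deposit100 = balance => balance + 100;
// A withdrawal that is rejected (balance unchanged) when funds are insufficient
Func<long, long> withdraw50 = balance => balance >= 50 ? balance - 50 : balance;

Console.WriteLine(Ops.Commute(add10, add20, 0));            // True: both orders give 30
Console.WriteLine(Ops.Commute(setTo100, setTo200, 0));      // False: 200 vs 100
Console.WriteLine(Ops.Commute(withdraw50, deposit100, 40)); // False: 140 vs 90

public static class Ops
{
    // True when applying the two operations in either order gives the same result
    public static bool Commute(Func<long, long> first, Func<long, long> second, long start)
        => second(first(start)) == first(second(start));
}
```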
Auto-merge implementation
public interface IConflictResolver<TAggregate> where TAggregate : AggregateRoot
{
ConflictResolution Resolve(
TAggregate currentState,
IReadOnlyList<IDomainEvent> conflictingEvents,
IReadOnlyList<IDomainEvent> ourPendingEvents);
}
public enum ConflictResolutionType
{
Retry, // Reload and re-execute command
Merge, // Apply our events on top
Reject, // Fail the operation
Custom // Custom resolution
}
public sealed record ConflictResolution(
ConflictResolutionType Type,
IReadOnlyList<IDomainEvent>? MergedEvents = null,
string? RejectionReason = null
);
public sealed class BankAccountConflictResolver : IConflictResolver<BankAccount>
{
public ConflictResolution Resolve(
BankAccount currentState,
IReadOnlyList<IDomainEvent> conflictingEvents,
IReadOnlyList<IDomainEvent> ourPendingEvents)
{
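// Nothing can be merged onto an account that was closed concurrently,
// not even a deposit
if (currentState.IsClosed)
{
return new ConflictResolution(
ConflictResolutionType.Reject,
RejectionReason: "Account already closed");
}
// Validate our events cumulatively: each one is checked against the balance
// left by the concurrent changes plus our own earlier pending events
var projectedBalance = currentState.Balance;
foreach (var @event in ourPendingEvents)
{
switch (@event)
{
case MoneyDeposited e:
// Deposits commute with other balance changes on an open account
projectedBalance += e.Amount;
continue;
case MoneyWithdrawn e:
// Check if withdrawal is still valid
if (projectedBalance < e.Amount)
{
return new ConflictResolution(
ConflictResolutionType.Reject,
RejectionReason: "Insufficient funds after concurrent changes");
}
projectedBalance -= e.Amount;
continue;
case AccountClosed:
// Still valid: the account is open (checked above)
continue;
default:
// Unrecognized event: fall back to reloading and re-executing the command
return new ConflictResolution(ConflictResolutionType.Retry);
}
}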
// All operations still valid - merge them
return new ConflictResolution(
ConflictResolutionType.Merge,
MergedEvents: ourPendingEvents);
}
}
Smart retry with conflict resolution
public sealed class SmartRepository<TAggregate> where TAggregate : AggregateRoot, new()
{
private const int MaxAttempts = 5;
private readonly IEventStore _store;
private readonly IConflictResolver<TAggregate> _resolver;
public SmartRepository(IEventStore store, IConflictResolver<TAggregate> resolver)
{
_store = store;
_resolver = resolver;
}
public async Task Execute(
string streamId,
Func<TAggregate, Task> action,
CancellationToken ct)
{
var aggregate = await Load(streamId, ct);
// Version the stream had when we loaded it: used as the expected version
// on append and as the starting point for reading conflicting events
var expectedVersion = aggregate.Version;
await action(aggregate);
IReadOnlyList<IDomainEvent> pendingEvents = aggregate.DequeueUncommittedEvents();
if (pendingEvents.Count == 0)
return;
for (var attempt = 1; ; attempt++)
{
try
{
await Append(streamId, expectedVersion, pendingEvents, ct);
return;
}
catch (WrongExpectedVersionException) when (attempt < MaxAttempts)
{
// Load current state
var currentAggregate = await Load(streamId, ct);
// Get events that happened since we loaded
var conflictingEvents = await GetEventsSince(streamId, expectedVersion, ct);
// From now on, append against the version we have just seen
expectedVersion = currentAggregate.Version;
// Try to resolve
var resolution = _resolver.Resolve(
currentAggregate,
conflictingEvents,
pendingEvents);
switch (resolution.Type)
{
case ConflictResolutionType.Merge:
// Retry the append with the merged events
pendingEvents = resolution.MergedEvents!;
break;
case ConflictResolutionType.Retry:
// Re-execute the entire action against current state
await action(currentAggregate);
pendingEvents = currentAggregate.DequeueUncommittedEvents();
if (pendingEvents.Count == 0)
return;
break;
case ConflictResolutionType.Reject:
throw new ConflictException(resolution.RejectionReason!);
default:
throw new InvalidOperationException("Unknown resolution type");
}
}
}
}
private async Task<List<IDomainEvent>> GetEventsSince(
string streamId,
long fromVersion,
CancellationToken ct)
{
var events = new List<IDomainEvent>();
await foreach (var se in _store.ReadStream(streamId, fromVersion + 1, ct))
{
events.Add(Deserialize(se));
}
return events;
}
private Task<TAggregate> Load(string streamId, CancellationToken ct) => /* ... */;
private Task Append(string streamId, long version, IReadOnlyList<IDomainEvent> events, CancellationToken ct) => /* ... */;
private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
}
Strategy 2: User-facing conflict resolution
Sometimes the system can't decide—the user must choose.
Conflict detection and presentation
public sealed record ConflictInfo(
Guid AggregateId,
long YourVersion,
long CurrentVersion,
IReadOnlyList<ChangeDescription> YourChanges,
IReadOnlyList<ChangeDescription> ConflictingChanges
);
public sealed record ChangeDescription(
string Field,
string OldValue,
string NewValue,
DateTimeOffset ChangedAt,
string ChangedBy
);
public sealed class ConflictDetector
{
public async Task<ConflictInfo?> DetectConflict<T>(
string streamId,
long expectedVersion,
IReadOnlyList<IDomainEvent> pendingEvents,
CancellationToken ct) where T : AggregateRoot, new()
{
var currentVersion = await GetCurrentVersion(streamId, ct);
if (currentVersion == expectedVersion)
return null; // No conflict
var conflictingEvents = await GetEventsSince(streamId, expectedVersion, ct);
return new ConflictInfo(
AggregateId: ExtractId(streamId),
YourVersion: expectedVersion,
CurrentVersion: currentVersion,
YourChanges: DescribeChanges(pendingEvents),
ConflictingChanges: DescribeChanges(conflictingEvents)
);
}
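// Placeholder values: ChangedAt uses the current time and ChangedBy is empty.
// If your stored events carry a timestamp and author, use those instead.
private IReadOnlyList<ChangeDescription> DescribeChanges(IReadOnlyList<IDomainEvent> events)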
{
return events.Select(e => e switch
{
MoneyDeposited d => new ChangeDescription(
"Balance", "", $"+{d.Amount:C}", DateTimeOffset.UtcNow, ""),
MoneyWithdrawn w => new ChangeDescription(
"Balance", "", $"-{w.Amount:C}", DateTimeOffset.UtcNow, ""),
// ... other event types
_ => new ChangeDescription(e.GetType().Name, "", "", DateTimeOffset.UtcNow, "")
}).ToList();
}
private Task<long> GetCurrentVersion(string streamId, CancellationToken ct) => /* ... */;
private Task<IReadOnlyList<IDomainEvent>> GetEventsSince(string streamId, long version, CancellationToken ct) => /* ... */;
private Guid ExtractId(string streamId) => /* ... */;
}
API response with conflict
public sealed class OrderController : ControllerBase
{
[HttpPut("{orderId}/items")]
public async Task<IActionResult> UpdateItems(
Guid orderId,
[FromBody] UpdateItemsRequest request)
{
try
{
await _orderService.UpdateItems(orderId, request.ExpectedVersion, request.Items);
return Ok();
}
catch (ConflictException ex)
{
return Conflict(new
{
Message = "The order was modified by another user",
YourVersion = ex.ExpectedVersion,
CurrentVersion = ex.ActualVersion,
ConflictingChanges = ex.ConflictingChanges,
Options = new[]
{
new { Action = "overwrite", Description = "Replace with your changes" },
new { Action = "merge", Description = "Merge changes together" },
new { Action = "reload", Description = "Discard your changes and reload" }
}
});
}
}
[HttpPut("{orderId}/items/resolve")]
public async Task<IActionResult> ResolveConflict(
Guid orderId,
[FromBody] ResolveConflictRequest request)
{
switch (request.Resolution)
{
case "overwrite":
// Force our version (use current version as expected)
await _orderService.UpdateItems(
orderId,
request.CurrentVersion,
request.Items);
break;
case "merge":
// Apply merge logic
await _orderService.MergeItems(
orderId,
request.CurrentVersion,
request.Items,
request.MergeStrategy);
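break;
case "reload":
// Nothing to write: the client discards its changes and reloads
break;
default:
// Do not report success for a resolution we did not recognize
return BadRequest($"Unknown resolution '{request.Resolution}'");
}
return Ok();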
}
}
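The "merge" option leaves open what merging means for an order. One possible approach, sketched below under assumptions, is a field-level three-way merge: compare the user's values and the concurrent values against the version both started from, take whichever side changed a field, and report fields changed by both sides as true conflicts. `ThreeWayMerge`, `MergeResult`, and the item-to-quantity dictionary model are hypothetical; this is not the `MergeItems` implementation referenced above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var original = new Dictionary<string, int> { ["apple"] = 1, ["pear"] = 2, ["plum"] = 3 };
var mine     = new Dictionary<string, int> { ["apple"] = 5, ["pear"] = 2, ["plum"] = 4 };
var theirs   = new Dictionary<string, int> { ["apple"] = 1, ["pear"] = 7, ["plum"] = 9 };

var result = ThreeWayMerge.Merge(original, mine, theirs);
// apple: only mine changed; pear: only theirs changed; plum: both changed
Console.WriteLine(string.Join(", ",
    result.Merged.OrderBy(kv => kv.Key).Select(kv => $"{kv.Key}={kv.Value}"))); // prints "apple=5, pear=7, plum=9"
Console.WriteLine(string.Join(", ", result.Conflicts));                        // prints "plum"

public sealed record MergeResult(
    IReadOnlyDictionary<string, int> Merged,
    IReadOnlyList<string> Conflicts);

public static class ThreeWayMerge
{
    public static MergeResult Merge(
        IReadOnlyDictionary<string, int> original,
        IReadOnlyDictionary<string, int> mine,
        IReadOnlyDictionary<string, int> theirs)
    {
        var merged = new Dictionary<string, int>();
        var conflicts = new List<string>();
        var keys = original.Keys.Union(mine.Keys).Union(theirs.Keys)
            .OrderBy(k => k, StringComparer.Ordinal);

        foreach (var key in keys)
        {
            // A missing key is modelled as null so additions and removals are handled uniformly
            int? o = Get(original, key), m = Get(mine, key), t = Get(theirs, key);
            int? chosen;
            if (m == t) chosen = m;       // both sides agree
            else if (m == o) chosen = t;  // only theirs changed
            else if (t == o) chosen = m;  // only mine changed
            else
            {
                // Both changed differently: keep the stored value and flag it for the user
                conflicts.Add(key);
                chosen = t;
            }
            if (chosen.HasValue) merged[key] = chosen.Value;
        }
        return new MergeResult(merged, conflicts);
    }

    private static int? Get(IReadOnlyDictionary<string, int> source, string key)
        => source.TryGetValue(key, out var value) ? (int?)value : null;
}
```

Only the fields listed in `Conflicts` need to go back to the user; everything else can be merged without asking.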
Strategy 3: Last-write-wins
Sometimes conflicts don't matter—latest write wins.
When to use
- Low-stakes data (preferences, drafts)
- UI state that users expect to overwrite
- Idempotent operations
Implementation
public sealed class LastWriteWinsRepository
{
private readonly IEventStore _store;
public async Task SaveWithLastWriteWins<T>(
T aggregate,
CancellationToken ct) where T : AggregateRoot
{
var streamId = GetStreamId(aggregate);
var events = aggregate.DequeueUncommittedEvents();
while (true)
{
try
{
// Get current version
var currentVersion = await GetCurrentVersion(streamId, ct);
// Append with current version (not our expected version)
await _store.AppendToStream(streamId, currentVersion, ToStored(events), ct);
return;
}
catch (WrongExpectedVersionException)
{
// Race condition - try again with new version
await Task.Delay(10, ct);
}
}
}
private Task<long> GetCurrentVersion(string streamId, CancellationToken ct) => /* ... */;
private string GetStreamId<T>(T aggregate) where T : AggregateRoot => /* ... */;
private IReadOnlyList<StoredEvent> ToStored(IReadOnlyList<IDomainEvent> events) => /* ... */;
}
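It helps to see what this strategy gives up. In the sketch below, two writers start from the same snapshot and each saves a full preferences document; both appends are accepted, and the later one overrides the earlier change even though both events remain in the log. `PreferencesSaved` and the list-based log are hypothetical and stand in for the repository above.

```csharp
using System;
using System.Collections.Generic;

var log = new List<PreferencesSaved> { new("light", "en") };

// Both users read the same snapshot before either writes.
var seenByA = log[^1];
var seenByB = log[^1];

// Last-write-wins: each append is accepted regardless of what was read.
log.Add(seenByA with { Theme = "dark" });  // A changes only the theme
log.Add(seenByB with { Language = "fr" }); // B changes only the language, from stale data

var current = log[^1];
Console.WriteLine($"{current.Theme}, {current.Language}"); // prints "light, fr"
Console.WriteLine(log.Count);                              // prints 3

// Hypothetical event that carries the full preferences document.
public sealed record PreferencesSaved(string Theme, string Language);
```

Events that carry only the changed field would likely avoid this particular loss, which is one reason the strategy tends to suit fine-grained, low-stakes updates.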
Strategy 4: Conflict-free Replicated Data Types (CRDTs)
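For eventually consistent systems, CRDTs guarantee that replicas converge to the same state once they have seen the same set of updates, regardless of the order in which those updates arrive.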
G-Counter (grow-only counter)
// Each node maintains its own counter
// Total = sum of all node counters
// Merge = take max of each node's counter
public sealed class GCounter
{
private readonly Dictionary<string, long> _counters = new();
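public void Increment(string nodeId, long amount = 1)
{
// A G-Counter only grows: a negative amount would break the max-based merge
if (amount < 0)
throw new ArgumentOutOfRangeException(nameof(amount));
if (!_counters.ContainsKey(nodeId))
_counters[nodeId] = 0;
_counters[nodeId] += amount;
}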
public long Value => _counters.Values.Sum();
public void Merge(GCounter other)
{
foreach (var (nodeId, count) in other._counters)
{
if (!_counters.ContainsKey(nodeId) || _counters[nodeId] < count)
{
_counters[nodeId] = count;
}
}
}
}
// Event for CRDT state
public sealed record CounterIncremented(
string NodeId,
long NewNodeValue
) : IDomainEvent;
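To see why taking the per-node maximum converges, the sketch below repeats a compact copy of the `GCounter` above and merges two replicas in both directions. The node names and the `replicaA`/`replicaB` variables are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var replicaA = new GCounter();
var replicaB = new GCounter();
replicaA.Increment("node-a", 3); // counted only on replica A so far
replicaB.Increment("node-b", 4); // counted only on replica B so far

replicaA.Merge(replicaB);
replicaB.Merge(replicaA);
Console.WriteLine(replicaA.Value); // prints 7
Console.WriteLine(replicaB.Value); // prints 7

replicaA.Merge(replicaB);          // merging again changes nothing
Console.WriteLine(replicaA.Value); // prints 7

// Compact copy of the G-Counter from this article.
public sealed class GCounter
{
    private readonly Dictionary<string, long> _counters = new();

    public long Value => _counters.Values.Sum();

    public void Increment(string nodeId, long amount = 1)
    {
        if (amount < 0)
            throw new ArgumentOutOfRangeException(nameof(amount));
        _counters.TryGetValue(nodeId, out var current); // current is 0 when the node is new
        _counters[nodeId] = current + amount;
    }

    public void Merge(GCounter other)
    {
        foreach (var (nodeId, count) in other._counters)
        {
            _counters.TryGetValue(nodeId, out var current);
            _counters[nodeId] = Math.Max(current, count);
        }
    }
}
```

Because the maximum is commutative, associative, and idempotent, merges can arrive in any order and any number of times without double counting.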
PN-Counter (positive-negative counter)
// Two G-Counters: one for increments, one for decrements
public sealed class PNCounter
{
private readonly GCounter _positive = new();
private readonly GCounter _negative = new();
public void Increment(string nodeId, long amount = 1)
{
_positive.Increment(nodeId, amount);
}
public void Decrement(string nodeId, long amount = 1)
{
_negative.Increment(nodeId, amount);
}
public long Value => _positive.Value - _negative.Value;
public void Merge(PNCounter other)
{
_positive.Merge(other._positive);
_negative.Merge(other._negative);
}
}
LWW-Register (last-writer-wins register)
public sealed class LWWRegister<T>
{
private T? _value;
private DateTimeOffset _timestamp;
public T? Value => _value;
public void Set(T value, DateTimeOffset timestamp)
{
if (timestamp > _timestamp)
{
_value = value;
_timestamp = timestamp;
}
}
public void Merge(LWWRegister<T> other)
{
if (other._timestamp > _timestamp)
{
_value = other._value;
_timestamp = other._timestamp;
}
}
}
// Event
public sealed record RegisterUpdated<T>(
T Value,
DateTimeOffset Timestamp
) : IDomainEvent;
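One caveat with the register above: when two replicas write different values with the same timestamp, the strict `>` comparison makes each replica keep its own value, so they do not converge. A common remedy is a deterministic tie-breaker such as a node identifier. The sketch below is one way to do that; `TieBreakingRegister` and its `nodeId` parameter are assumptions added for illustration and are not part of the original type.

```csharp
using System;

var sameInstant = DateTimeOffset.UnixEpoch;
var left = new TieBreakingRegister<string>();
var right = new TieBreakingRegister<string>();
left.Set("red", sameInstant, "node-a");
right.Set("blue", sameInstant, "node-b"); // same timestamp, different value

left.Merge(right);
right.Merge(left);
Console.WriteLine(left.Value);  // prints "blue"
Console.WriteLine(right.Value); // prints "blue"

public sealed class TieBreakingRegister<T>
{
    private T? _value;
    private DateTimeOffset _timestamp = DateTimeOffset.MinValue;
    private string _nodeId = "";

    public T? Value => _value;

    public void Set(T value, DateTimeOffset timestamp, string nodeId)
    {
        if (!Wins(timestamp, nodeId)) return;
        _value = value;
        _timestamp = timestamp;
        _nodeId = nodeId;
    }

    public void Merge(TieBreakingRegister<T> other)
    {
        if (!Wins(other._timestamp, other._nodeId)) return;
        _value = other._value;
        _timestamp = other._timestamp;
        _nodeId = other._nodeId;
    }

    // Later timestamp wins; on a tie the ordinally larger node id wins,
    // so every replica picks the same winner
    private bool Wins(DateTimeOffset timestamp, string nodeId) =>
        timestamp > _timestamp ||
        (timestamp == _timestamp && string.CompareOrdinal(nodeId, _nodeId) > 0);
}
```

The tie-breaker is arbitrary but consistent, which is all convergence needs. It does not address clock skew between nodes, which remains a general limitation of timestamp-based registers.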
Using CRDTs in aggregates
public sealed class DistributedCounter : AggregateRoot
{
private readonly PNCounter _counter = new();
private readonly string _nodeId;
public DistributedCounter(string nodeId)
{
_nodeId = nodeId;
}
public long Value => _counter.Value;
public void Increment(long amount = 1)
{
_counter.Increment(_nodeId, amount);
Raise(new CounterIncremented(_nodeId, _counter.Value));
}
public void Decrement(long amount = 1)
{
_counter.Decrement(_nodeId, amount);
Raise(new CounterDecremented(_nodeId, _counter.Value));
}
// Merge from another node's events
public void MergeFrom(PNCounter other)
{
_counter.Merge(other);
Raise(new CounterMerged(other));
}
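protected override void Apply(IDomainEvent @event)
{
// Simplified: the command methods above mutate _counter directly, so replaying
// events through Apply does not rebuild it. A complete version would update
// _counter here from each event. Note also that the events above carry
// _counter.Value, the net total across nodes, where CounterIncremented
// expects this node's own value.
}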
}
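In an event-sourced aggregate, state is normally rebuilt by replaying events, so the CRDT update has to happen where events are applied rather than only in the command method. The sketch below shows one way to arrange that for a grow-only counter. `EventSourcedCounter`, its `Replay` method, and the simplified `CounterIncremented` record are hypothetical and do not use the `AggregateRoot` base class from this article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var counter = new EventSourcedCounter("node-a");
counter.Increment(2);
counter.Increment(3);
Console.WriteLine(counter.Value); // prints 5

// Rebuild a fresh instance purely from the recorded events.
var rebuilt = EventSourcedCounter.Replay("node-a", counter.UncommittedEvents);
Console.WriteLine(rebuilt.Value); // prints 5

// Replaying the same events twice is harmless: Apply takes the per-node maximum.
var duplicated = counter.UncommittedEvents.Concat(counter.UncommittedEvents);
Console.WriteLine(EventSourcedCounter.Replay("node-a", duplicated).Value); // prints 5

// Simplified event: carries the emitting node's own running count.
public sealed record CounterIncremented(string NodeId, long NewNodeValue);

public sealed class EventSourcedCounter
{
    private readonly Dictionary<string, long> _perNode = new();
    private readonly List<CounterIncremented> _uncommitted = new();
    private readonly string _nodeId;

    public EventSourcedCounter(string nodeId) => _nodeId = nodeId;

    public long Value => _perNode.Values.Sum();
    public IReadOnlyList<CounterIncremented> UncommittedEvents => _uncommitted;

    public void Increment(long amount = 1)
    {
        if (amount < 0)
            throw new ArgumentOutOfRangeException(nameof(amount));
        _perNode.TryGetValue(_nodeId, out var current);
        // The command only decides; all state changes go through Apply
        var @event = new CounterIncremented(_nodeId, current + amount);
        Apply(@event);
        _uncommitted.Add(@event);
    }

    public static EventSourcedCounter Replay(string nodeId, IEnumerable<CounterIncremented> events)
    {
        var counter = new EventSourcedCounter(nodeId);
        foreach (var e in events) counter.Apply(e);
        return counter;
    }

    private void Apply(CounterIncremented e)
    {
        _perNode.TryGetValue(e.NodeId, out var current);
        _perNode[e.NodeId] = Math.Max(current, e.NewNodeValue);
    }
}
```

Recording the node's new value rather than the increment is what makes duplicate or reordered delivery safe here.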
Strategy 5: Reservation pattern
Prevent conflicts by reserving resources first.
public sealed class InventoryReservation : AggregateRoot
{
public Guid ProductId { get; private set; }
public int AvailableQuantity { get; private set; }
public Dictionary<Guid, int> Reservations { get; } = new();
public void Reserve(Guid orderId, int quantity)
{
if (Reservations.ContainsKey(orderId))
throw new InvalidOperationException("Already reserved for this order");
if (AvailableQuantity < quantity)
throw new InvalidOperationException("Insufficient inventory");
Raise(new InventoryReserved(ProductId, orderId, quantity));
}
public void ConfirmReservation(Guid orderId)
{
if (!Reservations.ContainsKey(orderId))
throw new InvalidOperationException("No reservation found");
Raise(new ReservationConfirmed(ProductId, orderId));
}
public void CancelReservation(Guid orderId)
{
if (!Reservations.ContainsKey(orderId))
return; // Idempotent
Raise(new ReservationCancelled(ProductId, orderId));
}
protected override void Apply(IDomainEvent @event)
{
switch (@event)
{
case InventoryReserved e:
Reservations[e.OrderId] = e.Quantity;
AvailableQuantity -= e.Quantity;
break;
case ReservationConfirmed e:
Reservations.Remove(e.OrderId);
// Quantity already deducted
break;
case ReservationCancelled e:
if (Reservations.TryGetValue(e.OrderId, out var qty))
{
AvailableQuantity += qty;
Reservations.Remove(e.OrderId);
}
break;
}
}
}
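A short walk-through makes the effect visible: once stock is reserved, a competing order is turned away up front instead of failing later in the process. The sketch below is a stripped-down, non-event-sourced stand-in (`SimpleInventory` is hypothetical) that keeps only the quantity bookkeeping of the aggregate above.

```csharp
using System;
using System.Collections.Generic;

var inventory = new SimpleInventory(availableQuantity: 5);
var orderA = Guid.NewGuid();
var orderB = Guid.NewGuid();

Console.WriteLine(inventory.TryReserve(orderA, 4)); // True: 1 unit left
Console.WriteLine(inventory.TryReserve(orderB, 2)); // False: only 1 unit available
inventory.Cancel(orderA);                           // releases 4 units
Console.WriteLine(inventory.TryReserve(orderB, 2)); // True
Console.WriteLine(inventory.AvailableQuantity);     // prints 3

public sealed class SimpleInventory
{
    private readonly Dictionary<Guid, int> _reservations = new();

    public SimpleInventory(int availableQuantity) => AvailableQuantity = availableQuantity;

    public int AvailableQuantity { get; private set; }

    public bool TryReserve(Guid orderId, int quantity)
    {
        if (quantity <= 0 || _reservations.ContainsKey(orderId) || AvailableQuantity < quantity)
            return false;
        _reservations[orderId] = quantity;
        AvailableQuantity -= quantity;
        return true;
    }

    public void Cancel(Guid orderId)
    {
        // Remove returns false when nothing was reserved, which keeps Cancel idempotent
        if (_reservations.Remove(orderId, out var quantity))
            AvailableQuantity += quantity;
    }
}
```

In the event-sourced version, two simultaneous `Reserve` calls on the same product can still hit a version conflict on append; the pattern narrows the contention to one short step rather than removing it.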
Conflict resolution decision tree
Is the operation commutative?
├── Yes → Auto-merge
└── No
    └── Is it user-facing?
        ├── Yes → Present conflict to user
        └── No
            ├── Is data loss acceptable?
            │   ├── Yes → Last-write-wins
            │   └── No → Retry with validation
            └── Is it distributed/multi-node?
                ├── Yes → Consider CRDTs
                └── No → Standard retry
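The tree can also be written down as a small function, which is handy for documenting the choice per command type. This is one reading of the tree, not the only one: for operations that are neither commutative nor user-facing, the tree asks two separate questions, so the hypothetical `ConflictStrategy.Recommend` below returns an answer for each.

```csharp
using System;
using System.Collections.Generic;

Console.WriteLine(string.Join(" + ", ConflictStrategy.Recommend(
    commutative: true, userFacing: false, dataLossAcceptable: false, distributed: false)));
// prints "Auto-merge"
Console.WriteLine(string.Join(" + ", ConflictStrategy.Recommend(
    commutative: false, userFacing: true, dataLossAcceptable: false, distributed: false)));
// prints "Present conflict to user"
Console.WriteLine(string.Join(" + ", ConflictStrategy.Recommend(
    commutative: false, userFacing: false, dataLossAcceptable: true, distributed: true)));
// prints "Last-write-wins + Consider CRDTs"

public static class ConflictStrategy
{
    public static IReadOnlyList<string> Recommend(
        bool commutative, bool userFacing, bool dataLossAcceptable, bool distributed)
    {
        if (commutative) return new[] { "Auto-merge" };
        if (userFacing) return new[] { "Present conflict to user" };

        // The tree asks two independent questions at this point,
        // so one answer per question is returned
        return new[]
        {
            dataLossAcceptable ? "Last-write-wins" : "Retry with validation",
            distributed ? "Consider CRDTs" : "Standard retry",
        };
    }
}
```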
Testing conflict resolution
public sealed class ConflictResolutionTests
{
[Fact]
public async Task Concurrent_deposits_are_merged()
{
var store = new InMemoryEventStore();
var resolver = new BankAccountConflictResolver();
var repo = new SmartRepository<BankAccount>(store, resolver);
var accountId = Guid.NewGuid();
var streamId = $"bank-account-{accountId}";
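// Create account
await repo.Execute(streamId, acc =>
{
var opened = BankAccount.Open(accountId, "Test");
// Simplified: Open returns a separate instance, so a real test would
// transfer its events to acc (or seed the stream directly) at this point
return Task.CompletedTask;
}, CancellationToken.None);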
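// Simulate concurrent deposits. Each action waits until both have loaded the
// account, so the second append is guaranteed to hit a version conflict
var loaded = 0;
var bothLoaded = new TaskCompletionSource();
async Task WaitUntilBothLoaded()
{
if (Interlocked.Increment(ref loaded) == 2)
bothLoaded.SetResult();
await bothLoaded.Task;
}
var task1 = repo.Execute(streamId, async acc =>
{
acc.Deposit(100);
await WaitUntilBothLoaded();
}, CancellationToken.None);
var task2 = repo.Execute(streamId, async acc =>
{
acc.Deposit(200);
await WaitUntilBothLoaded();
}, CancellationToken.None);
await Task.WhenAll(task1, task2);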
// Both deposits should succeed
var final = await LoadAccount(store, streamId);
Assert.Equal(300, final.Balance);
}
[Fact]
public async Task Concurrent_withdrawal_fails_if_insufficient_after_conflict()
{
var store = new InMemoryEventStore();
var resolver = new BankAccountConflictResolver();
var repo = new SmartRepository<BankAccount>(store, resolver);
var accountId = Guid.NewGuid();
var streamId = $"bank-account-{accountId}";
// Create account with $100
await repo.Execute(streamId, acc =>
{
BankAccount.Open(accountId, "Test");
acc.Deposit(100);
return Task.CompletedTask;
}, CancellationToken.None);
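// User B loads the account (balance $100) and withdraws $50 locally,
// then waits for User A to commit before its own append is attempted
var userBLoaded = new TaskCompletionSource();
var userACommitted = new TaskCompletionSource();
var task2 = repo.Execute(streamId, async acc =>
{
acc.Withdraw(50);
userBLoaded.TrySetResult();
await userACommitted.Task;
}, CancellationToken.None);
await userBLoaded.Task;
// User A: Withdraw $80 (should succeed)
await repo.Execute(streamId, acc =>
{
acc.Withdraw(80);
return Task.CompletedTask;
}, CancellationToken.None);
userACommitted.SetResult();
// User B's append now conflicts, and only $20 remains, so it is rejected
await Assert.ThrowsAsync<ConflictException>(() => task2);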
}
private Task<BankAccount> LoadAccount(IEventStore store, string streamId) => /* ... */;
}
Next
Debugging and observability for event-sourced systems.
Sources
- https://martinfowler.com/articles/patterns-of-distributed-systems/
- https://crdt.tech/
- https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing#issues-and-considerations
- https://www.eventstore.com/blog/optimistic-concurrency-in-event-sourcing