Skip to main content

Advanced Conflict Resolution (beyond retry)

When two users try to modify the same aggregate simultaneously, you get a conflict.

The basic approach is retry: reload, re-evaluate, try again.

But sometimes you need smarter strategies.

Understanding conflicts

Optimistic concurrency conflict

User A: Load account (version 5)
User B: Load account (version 5)
User A: Withdraw $50 → Append (expect version 5) → Success (now version 6)
User B: Deposit $100 → Append (expect version 5) → FAIL (actual version 6)

User B's operation failed because the stream changed.

The naive retry

/// <summary>
/// Runs a load / act / save cycle and repeats the whole cycle whenever the save
/// fails with a <see cref="WrongExpectedVersionException"/>.
/// </summary>
/// <typeparam name="T">Aggregate type handed to <paramref name="action"/>.</typeparam>
/// <param name="aggregateId">Identifier passed to <c>Load</c>.</param>
/// <param name="action">Command logic; executed again from scratch on every attempt.</param>
/// <param name="maxRetries">Total number of attempts, including the first one. Must be at least 1.</param>
/// <param name="ct">Token passed to <c>Load</c>, <c>Save</c> and the back-off delay.</param>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxRetries"/> is less than 1.</exception>
/// <exception cref="WrongExpectedVersionException">The last permitted attempt still conflicted.</exception>
public async Task ExecuteWithRetry<T>(
    Guid aggregateId,
    Func<T, Task> action,
    int maxRetries = 3,
    CancellationToken ct = default) where T : AggregateRoot, new()
{
    if (action is null)
    {
        throw new ArgumentNullException(nameof(action));
    }

    // With zero (or negative) attempts the loop body would never run and the
    // method would report success without having executed the action at all.
    if (maxRetries < 1)
    {
        throw new ArgumentOutOfRangeException(
            nameof(maxRetries), maxRetries, "At least one attempt is required.");
    }

    for (var attempt = 0; ; attempt++)
    {
        try
        {
            var aggregate = await Load<T>(aggregateId, ct);
            await action(aggregate);
            await Save(aggregate, ct);
            return;
        }
        // The filter lets the exception from the final attempt propagate
        // untouched instead of being caught and rethrown.
        catch (WrongExpectedVersionException) when (attempt < maxRetries - 1)
        {
            // Exponential back-off: 50 ms, 100 ms, 200 ms, ...
            await Task.Delay(TimeSpan.FromMilliseconds(50 * Math.Pow(2, attempt)), ct);
        }
    }
}

This works for most cases. But what about:

  • Non-commutative operations?
  • User-facing conflicts?
  • High-contention scenarios?

Strategy 1: Automatic merge (for commutative operations)

Some operations can be safely merged regardless of order.

Commutative operations

Operations are commutative if order doesn't matter:

  • +10 then +20 = +20 then +10 = +30
  • SetFlag(true) then SetFlag(true) = same result

Non-commutative operations

Order matters:

  • SetBalance(100) then SetBalance(200) ≠ SetBalance(200) then SetBalance(100)
  • Withdraw(50) when balance is 60 might fail, but not if deposit happened first

Auto-merge implementation

/// <summary>
/// Decides how to proceed when appending pending events hit a concurrency conflict.
/// </summary>
/// <typeparam name="TAggregate">Aggregate type the resolver understands.</typeparam>
public interface IConflictResolver<TAggregate> where TAggregate : AggregateRoot
{
/// <summary>
/// Produces a <see cref="ConflictResolution"/> for one detected conflict.
/// </summary>
/// <param name="currentState">The aggregate as reloaded after the conflict was detected.</param>
/// <param name="conflictingEvents">Events read from the stream after the version the caller originally loaded.</param>
/// <param name="ourPendingEvents">The caller's own events that failed to append.</param>
/// <returns>The chosen resolution.</returns>
ConflictResolution Resolve(
TAggregate currentState,
IReadOnlyList<IDomainEvent> conflictingEvents,
IReadOnlyList<IDomainEvent> ourPendingEvents);
}

/// <summary>
/// The kinds of outcome a conflict resolver can ask for.
/// </summary>
public enum ConflictResolutionType
{
Retry, // Reload and re-execute command
Merge, // Apply our events on top
Reject, // Fail the operation
Custom // Custom resolution
}

/// <summary>
/// Result of a conflict resolver: the chosen outcome plus optional data for that outcome.
/// </summary>
/// <param name="Type">Which outcome was chosen.</param>
/// <param name="MergedEvents">Events to append; defaults to null when not supplied.</param>
/// <param name="RejectionReason">Explanation for a rejection; defaults to null when not supplied.</param>
public sealed record ConflictResolution(
ConflictResolutionType Type,
IReadOnlyList<IDomainEvent>? MergedEvents = null,
string? RejectionReason = null
);

/// <summary>
/// Conflict resolver for <see cref="BankAccount"/>: merges pending events when they are
/// still valid against the reloaded account, rejects them when they are not.
/// </summary>
public sealed class BankAccountConflictResolver : IConflictResolver<BankAccount>
{
    /// <summary>
    /// Replays <paramref name="ourPendingEvents"/> against a running projection of the
    /// reloaded account and decides whether they can be appended as-is.
    /// </summary>
    /// <param name="currentState">Account state after the concurrent changes.</param>
    /// <param name="conflictingEvents">Not inspected by this resolver.</param>
    /// <param name="ourPendingEvents">Events that failed to append.</param>
    /// <returns>
    /// <c>Merge</c> with the pending events when every event is still valid,
    /// <c>Reject</c> with a reason when a withdrawal or close is no longer valid,
    /// <c>Retry</c> when an event type this resolver does not know is present.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="currentState"/> or <paramref name="ourPendingEvents"/> is null.
    /// </exception>
    public ConflictResolution Resolve(
        BankAccount currentState,
        IReadOnlyList<IDomainEvent> conflictingEvents,
        IReadOnlyList<IDomainEvent> ourPendingEvents)
    {
        if (currentState is null)
        {
            throw new ArgumentNullException(nameof(currentState));
        }

        if (ourPendingEvents is null)
        {
            throw new ArgumentNullException(nameof(ourPendingEvents));
        }

        // Track the effect of each pending event. Validating every event against the
        // same unchanged balance would let two withdrawals that are individually
        // affordable, but not affordable together, slip through.
        var projectedBalance = currentState.Balance;
        var projectedClosed = currentState.IsClosed;

        foreach (var pending in ourPendingEvents)
        {
            switch (pending)
            {
                case MoneyDeposited deposit:
                    // Deposits commute with each other; just account for the new funds.
                    projectedBalance += deposit.Amount;
                    break;

                case MoneyWithdrawn withdrawal:
                    if (projectedBalance < withdrawal.Amount)
                    {
                        return new ConflictResolution(
                            ConflictResolutionType.Reject,
                            RejectionReason: "Insufficient funds after concurrent changes");
                    }

                    projectedBalance -= withdrawal.Amount;
                    break;

                case AccountClosed:
                    if (projectedClosed)
                    {
                        return new ConflictResolution(
                            ConflictResolutionType.Reject,
                            RejectionReason: "Account already closed");
                    }

                    projectedClosed = true;
                    break;

                default:
                    // An event this resolver cannot validate must not be merged blindly;
                    // ask the caller to reload and re-execute the command instead.
                    return new ConflictResolution(ConflictResolutionType.Retry);
            }
        }

        // All operations still valid - merge them
        return new ConflictResolution(
            ConflictResolutionType.Merge,
            MergedEvents: ourPendingEvents);
    }
}

Smart retry with conflict resolution

/// <summary>
/// Repository that executes a command against an aggregate and, when the append hits a
/// concurrency conflict, asks an <see cref="IConflictResolver{TAggregate}"/> how to continue.
/// </summary>
/// <typeparam name="TAggregate">Aggregate type being loaded and saved.</typeparam>
public sealed class SmartRepository<TAggregate> where TAggregate : AggregateRoot, new()
{
    // Upper bound on append attempts so sustained contention surfaces as an
    // exception instead of an endless loop.
    private const int MaxAttempts = 10;

    private readonly IEventStore _store;
    private readonly IConflictResolver<TAggregate> _resolver;

    /// <summary>Creates a repository over the given store and resolver.</summary>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public SmartRepository(IEventStore store, IConflictResolver<TAggregate> resolver)
    {
        _store = store ?? throw new ArgumentNullException(nameof(store));
        _resolver = resolver ?? throw new ArgumentNullException(nameof(resolver));
    }

    /// <summary>
    /// Loads the aggregate, runs <paramref name="action"/>, and appends the resulting events,
    /// resolving concurrency conflicts through the configured resolver.
    /// </summary>
    /// <param name="streamId">Stream to load from and append to.</param>
    /// <param name="action">Command logic; may run again when the resolver asks for a retry.</param>
    /// <param name="ct">Token passed to every load, read and append.</param>
    /// <exception cref="ConflictException">The resolver rejected the operation.</exception>
    /// <exception cref="InvalidOperationException">
    /// The resolver returned an unsupported type, or a merge without merged events.
    /// </exception>
    /// <exception cref="WrongExpectedVersionException">
    /// The append still conflicted on the last permitted attempt.
    /// </exception>
    public async Task Execute(
        string streamId,
        Func<TAggregate, Task> action,
        CancellationToken ct)
    {
        if (action is null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        var aggregate = await Load(streamId, ct);

        // Capture the version BEFORE running the action: this is the version the
        // stream must still be at for our events to be appended safely.
        var expectedVersion = aggregate.Version;

        await action(aggregate);
        IReadOnlyList<IDomainEvent> pendingEvents = aggregate.DequeueUncommittedEvents();

        for (var attempt = 1; ; attempt++)
        {
            // Also covers a re-executed action that produced no events.
            if (pendingEvents.Count == 0)
            {
                return;
            }

            try
            {
                await Append(streamId, expectedVersion, pendingEvents, ct);
                return;
            }
            catch (WrongExpectedVersionException) when (attempt < MaxAttempts)
            {
                // Load current state
                var currentAggregate = await Load(streamId, ct);

                // Read the version now; re-running the action below may change it.
                var reloadedVersion = currentAggregate.Version;

                // Get events that happened since we loaded
                var conflictingEvents = await GetEventsSince(streamId, expectedVersion, ct);

                var resolution = _resolver.Resolve(
                    currentAggregate,
                    conflictingEvents,
                    pendingEvents);

                switch (resolution.Type)
                {
                    case ConflictResolutionType.Merge:
                        pendingEvents = resolution.MergedEvents
                            ?? throw new InvalidOperationException(
                                "Merge resolution did not supply merged events");
                        break;

                    case ConflictResolutionType.Retry:
                        // Re-execute the entire action against the fresh state
                        await action(currentAggregate);
                        pendingEvents = currentAggregate.DequeueUncommittedEvents();
                        break;

                    case ConflictResolutionType.Reject:
                        throw new ConflictException(resolution.RejectionReason!);

                    default:
                        throw new InvalidOperationException("Unknown resolution type");
                }

                expectedVersion = reloadedVersion;
            }
        }
    }

    /// <summary>
    /// Reads and deserializes the events of <paramref name="streamId"/> starting at
    /// <paramref name="fromVersion"/> + 1.
    /// </summary>
    private async Task<List<IDomainEvent>> GetEventsSince(
        string streamId,
        long fromVersion,
        CancellationToken ct)
    {
        var events = new List<IDomainEvent>();
        await foreach (var se in _store.ReadStream(streamId, fromVersion + 1, ct))
        {
            events.Add(Deserialize(se));
        }
        return events;
    }

    private Task<TAggregate> Load(string streamId, CancellationToken ct) => /* ... */;
    private Task Append(string streamId, long version, IReadOnlyList<IDomainEvent> events, CancellationToken ct) => /* ... */;
    private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
}

Strategy 2: User-facing conflict resolution

Sometimes the system can't decide—the user must choose.

Conflict detection and presentation

/// <summary>
/// Describes a detected version conflict: the two versions involved and both sets of changes.
/// </summary>
/// <param name="AggregateId">Identifier of the aggregate in conflict.</param>
/// <param name="YourVersion">The version the caller expected.</param>
/// <param name="CurrentVersion">The version the stream is actually at.</param>
/// <param name="YourChanges">Descriptions of the caller's pending changes.</param>
/// <param name="ConflictingChanges">Descriptions of the changes made after the expected version.</param>
public sealed record ConflictInfo(
Guid AggregateId,
long YourVersion,
long CurrentVersion,
IReadOnlyList<ChangeDescription> YourChanges,
IReadOnlyList<ChangeDescription> ConflictingChanges
);

/// <summary>
/// A displayable description of one change.
/// </summary>
/// <param name="Field">Name of what changed.</param>
/// <param name="OldValue">Textual previous value.</param>
/// <param name="NewValue">Textual new value or delta.</param>
/// <param name="ChangedAt">Timestamp attached to the change.</param>
/// <param name="ChangedBy">Who made the change.</param>
public sealed record ChangeDescription(
string Field,
string OldValue,
string NewValue,
DateTimeOffset ChangedAt,
string ChangedBy
);

/// <summary>
/// Compares a stream's current version with the version a caller expected and, when they
/// differ, packages both sides' changes into a <see cref="ConflictInfo"/>.
/// </summary>
public sealed class ConflictDetector
{
    /// <summary>
    /// Checks <paramref name="streamId"/> for changes made after <paramref name="expectedVersion"/>.
    /// </summary>
    /// <param name="streamId">Stream to inspect.</param>
    /// <param name="expectedVersion">Version the caller based its changes on.</param>
    /// <param name="pendingEvents">The caller's not-yet-appended events.</param>
    /// <param name="ct">Token passed to the version lookup and the event read.</param>
    /// <returns><c>null</c> when the versions match; otherwise the conflict details.</returns>
    public async Task<ConflictInfo?> DetectConflict<T>(
        string streamId,
        long expectedVersion,
        IReadOnlyList<IDomainEvent> pendingEvents,
        CancellationToken ct) where T : AggregateRoot, new()
    {
        var actualVersion = await GetCurrentVersion(streamId, ct);

        if (actualVersion != expectedVersion)
        {
            var theirEvents = await GetEventsSince(streamId, expectedVersion, ct);

            var aggregateId = ExtractId(streamId);
            var ours = DescribeChanges(pendingEvents);
            var theirs = DescribeChanges(theirEvents);

            return new ConflictInfo(aggregateId, expectedVersion, actualVersion, ours, theirs);
        }

        // Versions agree: nothing was appended behind the caller's back.
        return null;
    }

    /// <summary>Maps each event to its displayable description, preserving order.</summary>
    private IReadOnlyList<ChangeDescription> DescribeChanges(IReadOnlyList<IDomainEvent> events)
    {
        return events.Select(Describe).ToList();
    }

    /// <summary>Builds the description for a single event.</summary>
    private static ChangeDescription Describe(IDomainEvent domainEvent)
    {
        // NOTE(review): ChangedAt is the moment the description is built, not a time
        // taken from the event, and ChangedBy is always empty — confirm this is intended.
        if (domainEvent is MoneyDeposited deposit)
        {
            return new ChangeDescription(
                "Balance", "", $"+{deposit.Amount:C}", DateTimeOffset.UtcNow, "");
        }

        if (domainEvent is MoneyWithdrawn withdrawal)
        {
            return new ChangeDescription(
                "Balance", "", $"-{withdrawal.Amount:C}", DateTimeOffset.UtcNow, "");
        }

        // ... other event types

        // Fallback: only the event's type name is known.
        return new ChangeDescription(domainEvent.GetType().Name, "", "", DateTimeOffset.UtcNow, "");
    }

    private Task<long> GetCurrentVersion(string streamId, CancellationToken ct) => /* ... */;
    private Task<IReadOnlyList<IDomainEvent>> GetEventsSince(string streamId, long version, CancellationToken ct) => /* ... */;
    private Guid ExtractId(string streamId) => /* ... */;
}

API response with conflict

/// <summary>
/// HTTP endpoints for updating order items, surfacing concurrency conflicts to the
/// client as 409 responses with the available resolution options.
/// </summary>
/// <remarks>
/// <c>_orderService</c> is referenced but not declared in this snippet.
/// </remarks>
public sealed class OrderController : ControllerBase
{
    /// <summary>
    /// Updates the order's items, expecting the order to be at the version the client saw.
    /// </summary>
    /// <returns>200 on success; 409 with conflict details when the order changed meanwhile.</returns>
    [HttpPut("{orderId}/items")]
    public async Task<IActionResult> UpdateItems(
        Guid orderId,
        [FromBody] UpdateItemsRequest request)
    {
        try
        {
            await _orderService.UpdateItems(orderId, request.ExpectedVersion, request.Items);
            return Ok();
        }
        catch (ConflictException ex)
        {
            return ConflictResponse(ex);
        }
    }

    /// <summary>
    /// Applies the client's chosen resolution for a previously reported conflict.
    /// </summary>
    /// <returns>
    /// 200 when the resolution was applied, 400 for an unrecognised resolution,
    /// 409 when the order changed again while resolving.
    /// </returns>
    [HttpPut("{orderId}/items/resolve")]
    public async Task<IActionResult> ResolveConflict(
        Guid orderId,
        [FromBody] ResolveConflictRequest request)
    {
        try
        {
            switch (request.Resolution)
            {
                case "overwrite":
                    // Force our version (use current version as expected)
                    await _orderService.UpdateItems(
                        orderId,
                        request.CurrentVersion,
                        request.Items);
                    break;

                case "merge":
                    // Apply merge logic
                    await _orderService.MergeItems(
                        orderId,
                        request.CurrentVersion,
                        request.Items,
                        request.MergeStrategy);
                    break;

                case "reload":
                    // Client will reload
                    break;

                default:
                    // Reporting success for a resolution that did nothing would
                    // leave the client believing its changes were saved.
                    return BadRequest(new
                    {
                        Message = $"Unknown resolution '{request.Resolution}'"
                    });
            }

            return Ok();
        }
        catch (ConflictException ex)
        {
            // The order can change again between the first 409 and this call.
            return ConflictResponse(ex);
        }
    }

    /// <summary>Builds the 409 payload describing the conflict and the client's options.</summary>
    private IActionResult ConflictResponse(ConflictException ex)
    {
        return Conflict(new
        {
            Message = "The order was modified by another user",
            YourVersion = ex.ExpectedVersion,
            CurrentVersion = ex.ActualVersion,
            ConflictingChanges = ex.ConflictingChanges,
            Options = new[]
            {
                new { Action = "overwrite", Description = "Replace with your changes" },
                new { Action = "merge", Description = "Merge changes together" },
                new { Action = "reload", Description = "Discard your changes and reload" }
            }
        });
    }
}

Strategy 3: Last-write-wins

Sometimes conflicts don't matter—latest write wins.

When to use

  • Low-stakes data (preferences, drafts)
  • UI state that users expect to overwrite
  • Idempotent operations

Implementation

/// <summary>
/// Repository that appends an aggregate's events at whatever version the stream currently
/// has, ignoring the version the aggregate was loaded at.
/// </summary>
public sealed class LastWriteWinsRepository
{
    // Upper bound on attempts so a permanently contended stream surfaces as an
    // exception instead of an endless loop.
    private const int MaxAttempts = 20;

    private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(10);

    private readonly IEventStore _store;

    /// <summary>Creates a repository over the given event store.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="store"/> is null.</exception>
    public LastWriteWinsRepository(IEventStore store)
    {
        _store = store ?? throw new ArgumentNullException(nameof(store));
    }

    /// <summary>
    /// Appends the aggregate's uncommitted events on top of the stream's current version,
    /// retrying when another writer gets in between the version read and the append.
    /// </summary>
    /// <param name="aggregate">Aggregate whose uncommitted events are dequeued and written.</param>
    /// <param name="ct">Token passed to the version lookup, the append and the retry delay.</param>
    /// <exception cref="WrongExpectedVersionException">
    /// The append still raced with another writer on the last permitted attempt.
    /// </exception>
    public async Task SaveWithLastWriteWins<T>(
        T aggregate,
        CancellationToken ct) where T : AggregateRoot
    {
        var streamId = GetStreamId(aggregate);
        var events = aggregate.DequeueUncommittedEvents();

        // Nothing to write: skip the version lookup and the append altogether.
        if (events.Count == 0)
        {
            return;
        }

        // Convert once so every retry appends the same stored events.
        var stored = ToStored(events);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                // Append with current version (not our expected version)
                var currentVersion = await GetCurrentVersion(streamId, ct);
                await _store.AppendToStream(streamId, currentVersion, stored, ct);
                return;
            }
            catch (WrongExpectedVersionException) when (attempt < MaxAttempts)
            {
                // Race condition - try again with new version
                await Task.Delay(RetryDelay, ct);
            }
        }
    }

    private Task<long> GetCurrentVersion(string streamId, CancellationToken ct) => /* ... */;
    private string GetStreamId<T>(T aggregate) where T : AggregateRoot => /* ... */;
    private IReadOnlyList<StoredEvent> ToStored(IReadOnlyList<IDomainEvent> events) => /* ... */;
}

Strategy 4: Conflict-free Replicated Data Types (CRDTs)

For eventually consistent systems, CRDTs guarantee convergence.

G-Counter (grow-only counter)

// Each node maintains its own counter
// Total = sum of all node counters
// Merge = take max of each node's counter

/// <summary>
/// Grow-only counter: one non-decreasing count per node, with the total being the sum
/// of all node counts and merging taking the per-node maximum.
/// </summary>
public sealed class GCounter
{
    private readonly Dictionary<string, long> _counters = new();

    /// <summary>Adds <paramref name="amount"/> to the count kept for <paramref name="nodeId"/>.</summary>
    /// <param name="nodeId">Node whose count is increased.</param>
    /// <param name="amount">Non-negative amount to add.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="amount"/> is negative.</exception>
    public void Increment(string nodeId, long amount = 1)
    {
        // Merging takes the per-node maximum, which is only correct while every
        // node's count never decreases. A negative amount would break that.
        if (amount < 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(amount), amount, "A grow-only counter cannot be decremented.");
        }

        // A missing node yields 0, so the first increment starts from zero.
        _counters.TryGetValue(nodeId, out var current);
        _counters[nodeId] = current + amount;
    }

    /// <summary>Sum of all node counts.</summary>
    public long Value => _counters.Values.Sum();

    /// <summary>
    /// Folds <paramref name="other"/> into this counter by keeping, for every node,
    /// the larger of the two counts.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void Merge(GCounter other)
    {
        if (other is null)
        {
            throw new ArgumentNullException(nameof(other));
        }

        foreach (var (nodeId, count) in other._counters)
        {
            if (!_counters.TryGetValue(nodeId, out var mine) || mine < count)
            {
                _counters[nodeId] = count;
            }
        }
    }
}

// Event for CRDT state
/// <summary>
/// Domain event raised when a counter is incremented.
/// </summary>
/// <param name="NodeId">Node that performed the increment.</param>
/// <param name="NewNodeValue">
/// Value recorded with the event. NOTE(review): the name suggests the node's own count,
/// but the visible call site passes the counter's overall value — confirm which is intended.
/// </param>
public sealed record CounterIncremented(
string NodeId,
long NewNodeValue
) : IDomainEvent;

PN-Counter (positive-negative counter)

// Two G-Counters: one for increments, one for decrements
/// <summary>
/// Counter supporting increments and decrements, built from two grow-only counters:
/// one accumulating increments, one accumulating decrements.
/// </summary>
public sealed class PNCounter
{
    private readonly GCounter _positive = new();
    private readonly GCounter _negative = new();

    /// <summary>Records an increment of <paramref name="amount"/> for <paramref name="nodeId"/>.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="amount"/> is negative.</exception>
    public void Increment(string nodeId, long amount = 1)
    {
        EnsureNonNegative(amount);
        _positive.Increment(nodeId, amount);
    }

    /// <summary>Records a decrement of <paramref name="amount"/> for <paramref name="nodeId"/>.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="amount"/> is negative.</exception>
    public void Decrement(string nodeId, long amount = 1)
    {
        EnsureNonNegative(amount);
        _negative.Increment(nodeId, amount);
    }

    /// <summary>Total increments minus total decrements.</summary>
    public long Value => _positive.Value - _negative.Value;

    /// <summary>Merges both halves of <paramref name="other"/> into this counter.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void Merge(PNCounter other)
    {
        if (other is null)
        {
            throw new ArgumentNullException(nameof(other));
        }

        _positive.Merge(other._positive);
        _negative.Merge(other._negative);
    }

    // Both halves must only ever grow; a negative amount belongs in the
    // opposite operation, not in the same half with a minus sign.
    private static void EnsureNonNegative(long amount)
    {
        if (amount < 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(amount), amount, "Amount must not be negative.");
        }
    }
}

LWW-Register (last-writer-wins register)

/// <summary>
/// Register that keeps the value carrying the latest timestamp it has seen.
/// </summary>
/// <typeparam name="T">Type of the stored value.</typeparam>
public sealed class LWWRegister<T>
{
    private T? _value;
    private DateTimeOffset _timestamp;

    /// <summary>The value currently held; the type's default until a write is accepted.</summary>
    public T? Value => _value;

    /// <summary>
    /// Stores <paramref name="value"/> when <paramref name="timestamp"/> is strictly later
    /// than the timestamp of the value currently held; otherwise does nothing.
    /// </summary>
    public void Set(T value, DateTimeOffset timestamp) => Adopt(value, timestamp);

    /// <summary>
    /// Takes over <paramref name="other"/>'s value when its timestamp is strictly later
    /// than this register's; otherwise does nothing.
    /// </summary>
    public void Merge(LWWRegister<T> other) => Adopt(other._value, other._timestamp);

    // NOTE(review): equal timestamps keep the existing value on both sides, so two
    // registers holding different values with the same timestamp stay different after
    // merging in either direction. A deterministic tie-breaker would be needed to change that.
    private void Adopt(T? candidate, DateTimeOffset candidateTimestamp)
    {
        if (candidateTimestamp <= _timestamp)
        {
            return;
        }

        _value = candidate;
        _timestamp = candidateTimestamp;
    }
}

// Event
/// <summary>
/// Domain event carrying a value together with a timestamp.
/// </summary>
/// <typeparam name="T">Type of the carried value.</typeparam>
/// <param name="Value">The value written.</param>
/// <param name="Timestamp">
/// Timestamp associated with the write — presumably the one a last-writer-wins
/// register would compare; no consumer is visible here, so confirm.
/// </param>
public sealed record RegisterUpdated<T>(
T Value,
DateTimeOffset Timestamp
) : IDomainEvent;

Using CRDTs in aggregates

/// <summary>
/// Aggregate wrapping a <see cref="PNCounter"/> on behalf of a single node, raising a
/// domain event for every increment, decrement and merge.
/// </summary>
public sealed class DistributedCounter : AggregateRoot
{
    private readonly PNCounter _counter = new();
    private readonly string _nodeId;

    /// <summary>Creates a counter whose local operations are attributed to <paramref name="nodeId"/>.</summary>
    public DistributedCounter(string nodeId) => _nodeId = nodeId;

    /// <summary>Current value of the underlying counter.</summary>
    public long Value => _counter.Value;

    /// <summary>Increments this node's share of the counter and raises <c>CounterIncremented</c>.</summary>
    public void Increment(long amount = 1)
    {
        _counter.Increment(_nodeId, amount);

        // NOTE(review): the event receives the counter's overall value after the
        // change, not a per-node count — confirm this matches the event's contract.
        var valueAfter = _counter.Value;
        Raise(new CounterIncremented(_nodeId, valueAfter));
    }

    /// <summary>Decrements this node's share of the counter and raises <c>CounterDecremented</c>.</summary>
    public void Decrement(long amount = 1)
    {
        _counter.Decrement(_nodeId, amount);

        var valueAfter = _counter.Value;
        Raise(new CounterDecremented(_nodeId, valueAfter));
    }

    /// <summary>Merges another counter's state into this one and raises <c>CounterMerged</c>.</summary>
    public void MergeFrom(PNCounter other)
    {
        _counter.Merge(other);
        Raise(new CounterMerged(other));
    }

    /// <summary>
    /// Intentionally empty: state is changed directly by the command methods
    /// before the event is raised, so there is nothing left to apply.
    /// </summary>
    protected override void Apply(IDomainEvent @event)
    {
        // NOTE(review): with an empty Apply, replaying stored events does not
        // rebuild the counter — confirm how state is restored on load.
    }
}

Strategy 5: Reservation pattern

Prevent conflicts by reserving resources first.

/// <summary>
/// Aggregate tracking a product's available quantity and the quantities currently
/// reserved per order.
/// </summary>
public sealed class InventoryReservation : AggregateRoot
{
    /// <summary>Product this reservation ledger belongs to.</summary>
    public Guid ProductId { get; private set; }

    /// <summary>Quantity not yet reserved.</summary>
    public int AvailableQuantity { get; private set; }

    /// <summary>Outstanding reservations: order id to reserved quantity.</summary>
    public Dictionary<Guid, int> Reservations { get; } = new();

    /// <summary>Reserves <paramref name="quantity"/> units for <paramref name="orderId"/>.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="quantity"/> is zero or negative.</exception>
    /// <exception cref="InvalidOperationException">
    /// The order already holds a reservation, or not enough inventory is available.
    /// </exception>
    public void Reserve(Guid orderId, int quantity)
    {
        // A negative quantity would pass the availability check and then
        // INCREASE the available stock when the event is applied.
        if (quantity <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(quantity), quantity, "Quantity must be positive.");
        }

        if (Reservations.ContainsKey(orderId))
        {
            throw new InvalidOperationException("Already reserved for this order");
        }

        if (AvailableQuantity < quantity)
        {
            throw new InvalidOperationException("Insufficient inventory");
        }

        Raise(new InventoryReserved(ProductId, orderId, quantity));
    }

    /// <summary>Confirms the reservation held by <paramref name="orderId"/>.</summary>
    /// <exception cref="InvalidOperationException">The order holds no reservation.</exception>
    public void ConfirmReservation(Guid orderId)
    {
        if (!Reservations.ContainsKey(orderId))
        {
            throw new InvalidOperationException("No reservation found");
        }

        Raise(new ReservationConfirmed(ProductId, orderId));
    }

    /// <summary>
    /// Cancels the reservation held by <paramref name="orderId"/>; does nothing when
    /// the order holds no reservation.
    /// </summary>
    public void CancelReservation(Guid orderId)
    {
        if (!Reservations.ContainsKey(orderId))
        {
            return; // Idempotent
        }

        Raise(new ReservationCancelled(ProductId, orderId));
    }

    /// <summary>Updates the reservation ledger and available quantity from an event.</summary>
    protected override void Apply(IDomainEvent @event)
    {
        switch (@event)
        {
            case InventoryReserved e:
                Reservations[e.OrderId] = e.Quantity;
                AvailableQuantity -= e.Quantity;
                break;

            case ReservationConfirmed e:
                // Quantity was already deducted when the reservation was made.
                Reservations.Remove(e.OrderId);
                break;

            case ReservationCancelled e:
                // Give the reserved units back only if the reservation still exists.
                if (Reservations.TryGetValue(e.OrderId, out var qty))
                {
                    AvailableQuantity += qty;
                    Reservations.Remove(e.OrderId);
                }
                break;
        }
    }
}

Conflict resolution decision tree

Is the operation commutative?
├── Yes → Auto-merge
└── No
    ├── Is it user-facing?
    │   ├── Yes → Present conflict to user
    │   └── No
    │       ├── Is data loss acceptable?
    │       │   ├── Yes → Last-write-wins
    │       │   └── No → Retry with validation
    │       └── Is it distributed/multi-node?
    │           ├── Yes → Consider CRDTs
    │           └── No → Standard retry

Testing conflict resolution

/// <summary>
/// Tests exercising SmartRepository together with BankAccountConflictResolver
/// against an in-memory event store.
/// </summary>
public sealed class ConflictResolutionTests
{
/// <summary>
/// Starts two deposits (100 and 200) against the same stream and expects the
/// final balance to be 300.
/// </summary>
[Fact]
public async Task Concurrent_deposits_are_merged()
{
var store = new InMemoryEventStore();
var resolver = new BankAccountConflictResolver();
var repo = new SmartRepository<BankAccount>(store, resolver);

var accountId = Guid.NewGuid();
var streamId = $"bank-account-{accountId}";

// Create account
// NOTE(review): the lambda assigns `opened` but never uses it and never touches
// `acc`; whether any event is persisted by this step cannot be told from this snippet.
await repo.Execute(streamId, async acc =>
{
var opened = BankAccount.Open(accountId, "Test");
// Copy state (simplified)
}, CancellationToken.None);

// Simulate concurrent deposits
// NOTE(review): both calls are started back-to-back with no explicit interleaving
// control; whether they actually conflict depends on how InMemoryEventStore
// completes its awaits — confirm.
var task1 = repo.Execute(streamId, acc =>
{
acc.Deposit(100);
return Task.CompletedTask;
}, CancellationToken.None);

var task2 = repo.Execute(streamId, acc =>
{
acc.Deposit(200);
return Task.CompletedTask;
}, CancellationToken.None);

await Task.WhenAll(task1, task2);

// Both deposits should succeed
var final = await LoadAccount(store, streamId);
Assert.Equal(300, final.Balance);
}

/// <summary>
/// Starts withdrawals of 80 and 50 against an account funded with 100 and expects
/// the second one to fail with a ConflictException.
/// </summary>
[Fact]
public async Task Concurrent_withdrawal_fails_if_insufficient_after_conflict()
{
var store = new InMemoryEventStore();
var resolver = new BankAccountConflictResolver();
var repo = new SmartRepository<BankAccount>(store, resolver);

var accountId = Guid.NewGuid();
var streamId = $"bank-account-{accountId}";

// Create account with $100
// NOTE(review): the result of BankAccount.Open is discarded here; only the
// deposit is made on `acc`.
await repo.Execute(streamId, acc =>
{
BankAccount.Open(accountId, "Test");
acc.Deposit(100);
return Task.CompletedTask;
}, CancellationToken.None);

// User A: Withdraw $80 (should succeed)
var task1 = repo.Execute(streamId, acc =>
{
acc.Withdraw(80);
return Task.CompletedTask;
}, CancellationToken.None);

// User B: Withdraw $50 (should fail after A's withdrawal)
// NOTE(review): if task1 has already completed before this call loads the account,
// the failure would come from Withdraw itself rather than from conflict resolution —
// confirm that the expected exception type still holds in that ordering.
var task2 = repo.Execute(streamId, acc =>
{
acc.Withdraw(50);
return Task.CompletedTask;
}, CancellationToken.None);

await task1;

await Assert.ThrowsAsync<ConflictException>(() => task2);
}

// Helper that loads the account from the store; body elided in this snippet.
private Task<BankAccount> LoadAccount(IEventStore store, string streamId) => /* ... */;
}

Next

Debugging and observability for event-sourced systems.

Next: Debugging and observability

Sources

  • https://martinfowler.com/articles/patterns-of-distributed-systems/
  • https://crdt.tech/
  • https://learn.microsoft.com/en-us/azure/architecture/patterns/event-sourcing#issues-and-considerations
  • https://www.eventstore.com/blog/optimistic-concurrency-in-event-sourcing