
Multi-Stream Projections (cross-aggregate views)

Single-stream projections are straightforward: one aggregate, one read model.

Real applications need views that span multiple aggregates:

  • "All accounts for customer X"
  • "Total deposits across all accounts today"
  • "Leaderboard of highest-balance accounts"

This is where event sourcing gets interesting.

The challenge

Events are partitioned by stream (aggregate). But queries often need data from multiple streams.

You have two main approaches:

  1. Subscribe to all events and filter/join in your projector
  2. Category streams (if your event store supports them)

Approach 1: Subscribe to $all

Most event stores support an "all events" subscription:

public interface IAllEventsSubscription
{
IAsyncEnumerable<(string StreamId, long GlobalPosition, StoredEvent Event)>
SubscribeToAll(long fromPosition, CancellationToken ct);
}
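
To make the interface concrete, here is a minimal in-memory sketch of a global log that implements it. The class name InMemoryEventLog and the fields of StoredEvent are assumptions for illustration; a real event store would persist events and keep the subscription open for new ones instead of completing at the end of the log.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical shape, for illustration only; the tutorial's StoredEvent may differ.
public sealed record StoredEvent(string EventType, string Json, DateTimeOffset OccurredAt);

// Repeated from above so the sketch compiles on its own.
public interface IAllEventsSubscription
{
    IAsyncEnumerable<(string StreamId, long GlobalPosition, StoredEvent Event)>
        SubscribeToAll(long fromPosition, CancellationToken ct);
}

public sealed class InMemoryEventLog : IAllEventsSubscription
{
    // One list for every stream: the index of an entry is its global position.
    private readonly List<(string StreamId, StoredEvent Event)> _log = new();

    public long Append(string streamId, StoredEvent e)
    {
        _log.Add((streamId, e));
        return _log.Count - 1;
    }

    // Replays everything at or after fromPosition, then completes.
    public async IAsyncEnumerable<(string StreamId, long GlobalPosition, StoredEvent Event)>
        SubscribeToAll(long fromPosition, [EnumeratorCancellation] CancellationToken ct)
    {
        for (var i = (int)Math.Max(fromPosition, 0L); i < _log.Count; i++)
        {
            ct.ThrowIfCancellationRequested();
            yield return (_log[i].StreamId, i, _log[i].Event);
            await Task.Yield();
        }
    }
}
```

Events from different streams come back interleaved in append order, which is the property the projectors below rely on.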

Customer accounts summary projection

public sealed record CustomerAccountsSummary(
Guid CustomerId,
int TotalAccounts,
int OpenAccounts,
int ClosedAccounts,
decimal TotalBalance,
DateTimeOffset LastActivityAt,
long LastProcessedPosition
);

public sealed class CustomerAccountsSummaryProjector
{
private readonly Dictionary<Guid, CustomerAccountsSummary> _summaries = new();
private readonly Dictionary<Guid, Guid> _accountToCustomer = new(); // account -> customer mapping

public CustomerAccountsSummary? Get(Guid customerId) =>
_summaries.TryGetValue(customerId, out var s) ? s : null;

public void Apply(string streamId, long globalPosition, StoredEvent storedEvent)
{
// Only process bank account events
if (!streamId.StartsWith("bank-account-"))
return;

var accountId = Guid.Parse(streamId.Replace("bank-account-", ""));
var @event = Deserialize(storedEvent);

// Use the event's own timestamp rather than the wall clock, so that
// replaying the log rebuilds exactly the same read model.
var occurredAt = storedEvent.OccurredAt;

switch (@event)
{
case AccountOpenedWithCustomer e:
HandleAccountOpened(accountId, e, occurredAt, globalPosition);
break;

case MoneyDeposited e:
HandleBalanceChange(accountId, e.Amount, occurredAt, globalPosition);
break;

case MoneyWithdrawn e:
HandleBalanceChange(accountId, -e.Amount, occurredAt, globalPosition);
break;

case AccountClosed:
HandleAccountClosed(accountId, occurredAt, globalPosition);
break;
}
}

private void HandleAccountOpened(Guid accountId, AccountOpenedWithCustomer e, DateTimeOffset occurredAt, long position)
{
_accountToCustomer[accountId] = e.CustomerId;

var summary = GetOrCreate(e.CustomerId);
_summaries[e.CustomerId] = summary with
{
TotalAccounts = summary.TotalAccounts + 1,
OpenAccounts = summary.OpenAccounts + 1,
LastActivityAt = occurredAt,
LastProcessedPosition = position
};
}

private void HandleBalanceChange(Guid accountId, decimal delta, DateTimeOffset occurredAt, long position)
{
// Events for accounts we never saw opened cannot be attributed to a customer
if (!_accountToCustomer.TryGetValue(accountId, out var customerId))
return;

var summary = GetOrCreate(customerId);
_summaries[customerId] = summary with
{
TotalBalance = summary.TotalBalance + delta,
LastActivityAt = occurredAt,
LastProcessedPosition = position
};
}

private void HandleAccountClosed(Guid accountId, DateTimeOffset occurredAt, long position)
{
if (!_accountToCustomer.TryGetValue(accountId, out var customerId))
return;

var summary = GetOrCreate(customerId);
_summaries[customerId] = summary with
{
OpenAccounts = summary.OpenAccounts - 1,
ClosedAccounts = summary.ClosedAccounts + 1,
LastActivityAt = occurredAt,
LastProcessedPosition = position
};
}

private CustomerAccountsSummary GetOrCreate(Guid customerId)
{
return _summaries.TryGetValue(customerId, out var s)
? s
: new CustomerAccountsSummary(customerId, 0, 0, 0, 0, DateTimeOffset.MinValue, 0);
}

private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
}

// Extended event with customer reference
public sealed record AccountOpenedWithCustomer(
Guid AccountId,
Guid CustomerId,
string OwnerName
) : IDomainEvent;

Approach 2: Category streams (KurrentDB/EventStoreDB)

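KurrentDB can maintain category streams for you through its $by_category system projection, which must be enabled and running:

  • Individual streams: bank-account-{id}
  • Category stream: $ce-bank-account (all bank account events)

With the default projection settings, the category is the part of the stream name before the first hyphen, so bank-account-{id} would be linked into $ce-bank rather than $ce-bank-account. The category name used here therefore assumes a non-default configuration; a hyphen-free prefix such as bankAccount-{id} avoids the problem.

// Subscribe to category stream instead of $all.
// Category streams hold link events, so resolveLinkTos must be true
// for evnt.Event to be the original event from bank-account-{id}.
await using var sub = client.SubscribeToStream(
"$ce-bank-account",
FromStream.Start,
resolveLinkTos: true);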

await foreach (var message in sub.Messages)
{
if (message is StreamMessage.Event(var evnt))
{
var originalStreamId = evnt.Event.EventStreamId;
var eventType = evnt.Event.EventType;
// Process...
}
}
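
The default category rule is easy to get wrong, so here is a small sketch of it. StreamNames is a hypothetical helper, not part of any client library; it mirrors the default $by_category setting (category = text before the first hyphen). Names with no hyphen are returned unchanged here, which does not model what the server does with such streams.

```csharp
using System;

public static class StreamNames
{
    // Hypothetical helper: category = everything before the first '-'.
    public static string CategoryOf(string streamId)
    {
        var i = streamId.IndexOf('-');
        return i < 0 ? streamId : streamId[..i];
    }

    // Name of the category stream the event would be linked into under that rule.
    public static string CategoryStreamOf(string streamId) => "$ce-" + CategoryOf(streamId);
}
```

Under this rule bank-account-{id} maps to $ce-bank, while a prefix without a hyphen such as bankAccount-{id} maps to $ce-bankAccount.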

Join projections (multiple aggregate types)

Sometimes you need to join data from different aggregate types:

// Customer aggregate events
public sealed record CustomerRegistered(Guid CustomerId, string Name, string Email) : IDomainEvent;
public sealed record CustomerTierChanged(Guid CustomerId, string NewTier) : IDomainEvent;

// Account aggregate events (already defined)
public sealed record AccountOpened(Guid AccountId, string OwnerName) : IDomainEvent;

// Join projection: rich account view with customer details
public sealed record RichAccountView(
Guid AccountId,
Guid CustomerId,
string CustomerName,
string CustomerEmail,
string CustomerTier,
decimal Balance,
bool IsOpen,
long LastProcessedPosition
);

public sealed class RichAccountViewProjector
{
private readonly Dictionary<Guid, RichAccountView> _accounts = new();
private readonly Dictionary<Guid, CustomerData> _customers = new();
private readonly Dictionary<Guid, Guid> _accountToCustomer = new();

private sealed record CustomerData(string Name, string Email, string Tier);

public RichAccountView? GetAccount(Guid accountId) =>
_accounts.TryGetValue(accountId, out var v) ? v : null;

public IEnumerable<RichAccountView> GetAccountsForCustomer(Guid customerId) =>
_accounts.Values.Where(a => a.CustomerId == customerId);

public void Apply(string streamId, long position, IDomainEvent @event)
{
switch (@event)
{
// Customer events
case CustomerRegistered e:
_customers[e.CustomerId] = new CustomerData(e.Name, e.Email, "Standard");
RefreshAccountsForCustomer(e.CustomerId, position);
break;

case CustomerTierChanged e:
if (_customers.TryGetValue(e.CustomerId, out var existing))
{
_customers[e.CustomerId] = existing with { Tier = e.NewTier };
RefreshAccountsForCustomer(e.CustomerId, position);
}
break;

// Account events
case AccountOpenedWithCustomer e:
_accountToCustomer[e.AccountId] = e.CustomerId;
var customer = _customers.GetValueOrDefault(e.CustomerId);
_accounts[e.AccountId] = new RichAccountView(
AccountId: e.AccountId,
CustomerId: e.CustomerId,
CustomerName: customer?.Name ?? "Unknown",
CustomerEmail: customer?.Email ?? "",
CustomerTier: customer?.Tier ?? "Standard",
Balance: 0,
IsOpen: true,
LastProcessedPosition: position
);
break;

case MoneyDeposited e when TryGetAccountFromStream(streamId, out var accountId):
UpdateAccountBalance(accountId, e.Amount, position);
break;

case MoneyWithdrawn e when TryGetAccountFromStream(streamId, out var accountId):
UpdateAccountBalance(accountId, -e.Amount, position);
break;

case AccountClosed when TryGetAccountFromStream(streamId, out var accountId):
if (_accounts.TryGetValue(accountId, out var acc))
{
_accounts[accountId] = acc with { IsOpen = false, LastProcessedPosition = position };
}
break;
}
}

private void UpdateAccountBalance(Guid accountId, decimal delta, long position)
{
if (_accounts.TryGetValue(accountId, out var acc))
{
_accounts[accountId] = acc with
{
Balance = acc.Balance + delta,
LastProcessedPosition = position
};
}
}

private void RefreshAccountsForCustomer(Guid customerId, long position)
{
if (!_customers.TryGetValue(customerId, out var customer))
return;

foreach (var (accountId, custId) in _accountToCustomer)
{
if (custId == customerId && _accounts.TryGetValue(accountId, out var acc))
{
_accounts[accountId] = acc with
{
CustomerName = customer.Name,
CustomerEmail = customer.Email,
CustomerTier = customer.Tier,
LastProcessedPosition = position
};
}
}
}

private bool TryGetAccountFromStream(string streamId, out Guid accountId)
{
if (streamId.StartsWith("bank-account-"))
{
accountId = Guid.Parse(streamId.Replace("bank-account-", ""));
return true;
}
accountId = Guid.Empty;
return false;
}
}

Aggregation projections

Build aggregate statistics across all streams:

public sealed record DailyTransactionStats(
DateOnly Date,
int TotalDeposits,
decimal TotalDepositAmount,
int TotalWithdrawals,
decimal TotalWithdrawalAmount,
int AccountsOpened,
int AccountsClosed,
long LastProcessedPosition
);

public sealed class DailyStatsProjector
{
private readonly Dictionary<DateOnly, DailyTransactionStats> _stats = new();

public DailyTransactionStats? GetStats(DateOnly date) =>
_stats.TryGetValue(date, out var s) ? s : null;

public IEnumerable<DailyTransactionStats> GetStatsRange(DateOnly from, DateOnly to) =>
_stats.Where(kvp => kvp.Key >= from && kvp.Key <= to)
.OrderBy(kvp => kvp.Key)
.Select(kvp => kvp.Value);

public void Apply(DateTimeOffset occurredAt, long position, IDomainEvent @event)
{
// Bucket by UTC calendar day so the result does not depend on each event's offset
var date = DateOnly.FromDateTime(occurredAt.UtcDateTime);
var stats = GetOrCreate(date);

_stats[date] = @event switch
{
MoneyDeposited e => stats with
{
TotalDeposits = stats.TotalDeposits + 1,
TotalDepositAmount = stats.TotalDepositAmount + e.Amount,
LastProcessedPosition = position
},
MoneyWithdrawn e => stats with
{
TotalWithdrawals = stats.TotalWithdrawals + 1,
TotalWithdrawalAmount = stats.TotalWithdrawalAmount + e.Amount,
LastProcessedPosition = position
},
AccountOpened => stats with
{
AccountsOpened = stats.AccountsOpened + 1,
LastProcessedPosition = position
},
AccountClosed => stats with
{
AccountsClosed = stats.AccountsClosed + 1,
LastProcessedPosition = position
},
_ => stats with { LastProcessedPosition = position }
};
}

private DailyTransactionStats GetOrCreate(DateOnly date)
{
return _stats.TryGetValue(date, out var s)
? s
: new DailyTransactionStats(date, 0, 0, 0, 0, 0, 0, 0);
}
}
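
Which calendar day an event falls into depends on whether the timestamp is read in UTC or in its own offset, and daily totals shift accordingly. The sketch below contrasts the two; DayBuckets and its method names are hypothetical and only illustrate the difference.

```csharp
using System;

public static class DayBuckets
{
    // Day in UTC: stable no matter which offset the event was recorded with.
    public static DateOnly UtcDay(DateTimeOffset occurredAt) =>
        DateOnly.FromDateTime(occurredAt.UtcDateTime);

    // Day in the timestamp's own offset (what occurredAt.Date yields).
    public static DateOnly OffsetDay(DateTimeOffset occurredAt) =>
        DateOnly.FromDateTime(occurredAt.Date);
}
```

For an event recorded at 23:30 on 1 January with a -05:00 offset, OffsetDay gives 1 January while UtcDay gives 2 January. Either choice can be valid for a report, as long as every event is bucketed the same way.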

Leaderboard projection

public sealed record AccountLeaderboardEntry(
Guid AccountId,
string OwnerName,
decimal Balance,
int Rank
);

public sealed class BalanceLeaderboardProjector
{
private readonly Dictionary<Guid, (string OwnerName, decimal Balance)> _balances = new();

public IEnumerable<AccountLeaderboardEntry> GetTopN(int n)
{
return _balances
.OrderByDescending(kvp => kvp.Value.Balance)
.Take(n)
.Select((kvp, index) => new AccountLeaderboardEntry(
kvp.Key,
kvp.Value.OwnerName,
kvp.Value.Balance,
index + 1
));
}

public AccountLeaderboardEntry? GetRank(Guid accountId)
{
if (!_balances.TryGetValue(accountId, out var data))
return null;

var rank = _balances.Count(kvp => kvp.Value.Balance > data.Balance) + 1;
return new AccountLeaderboardEntry(accountId, data.OwnerName, data.Balance, rank);
}

public void Apply(string streamId, IDomainEvent @event)
{
if (!streamId.StartsWith("bank-account-"))
return;

var accountId = Guid.Parse(streamId.Replace("bank-account-", ""));

switch (@event)
{
case AccountOpened e:
_balances[accountId] = (e.OwnerName, 0);
break;

case MoneyDeposited e:
if (_balances.TryGetValue(accountId, out var d))
_balances[accountId] = (d.OwnerName, d.Balance + e.Amount);
break;

case MoneyWithdrawn e:
if (_balances.TryGetValue(accountId, out var w))
_balances[accountId] = (w.OwnerName, w.Balance - e.Amount);
break;

case AccountClosed:
_balances.Remove(accountId);
break;
}
}
}
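
One detail to watch: GetTopN numbers entries by list position, while GetRank counts strictly higher balances, so two accounts with the same balance can be shown with different ranks depending on which method is called. A minimal sketch of a top-N that uses the same tie rule as GetRank ("1, 1, 3" competition ranking) follows; the Ranking class is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Ranking
{
    // Competition ranking: tied balances share a rank, and the next rank skips ahead.
    public static IReadOnlyList<(Guid AccountId, decimal Balance, int Rank)> TopN(
        IEnumerable<KeyValuePair<Guid, decimal>> balances, int n)
    {
        var ordered = balances.OrderByDescending(kv => kv.Value).Take(n).ToList();
        var result = new List<(Guid AccountId, decimal Balance, int Rank)>(ordered.Count);

        for (var i = 0; i < ordered.Count; i++)
        {
            // Same balance as the previous entry: reuse its rank; otherwise use position + 1.
            var rank = i > 0 && ordered[i].Value == ordered[i - 1].Value
                ? result[i - 1].Rank
                : i + 1;
            result.Add((ordered[i].Key, ordered[i].Value, rank));
        }

        return result;
    }
}
```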

Projection orchestrator

Coordinate multiple projectors from a single subscription:

public sealed class ProjectionOrchestrator
{
private readonly IAllEventsSubscription _subscription;
private readonly List<IProjector> _projectors;
private readonly ICheckpointStore _checkpoints;
private readonly ILogger<ProjectionOrchestrator> _logger;

public ProjectionOrchestrator(
IAllEventsSubscription subscription,
IEnumerable<IProjector> projectors,
ICheckpointStore checkpoints,
ILogger<ProjectionOrchestrator> logger)
{
_subscription = subscription;
_projectors = projectors.ToList();
_checkpoints = checkpoints;
_logger = logger;
}

public async Task Run(CancellationToken ct)
{
var checkpoint = await _checkpoints.GetCheckpoint("all-projections", ct);
var fromPosition = checkpoint?.Position ?? 0;

_logger.LogInformation("Starting projections from position {Position}", fromPosition);

// Count processed events instead of testing position % 1000: global
// positions are not guaranteed to be contiguous, so a modulo test on the
// position may fire rarely or never.
const int checkpointInterval = 1000;
var sinceCheckpoint = 0;

await foreach (var (streamId, position, @event) in
_subscription.SubscribeToAll(fromPosition, ct))
{
var domainEvent = Deserialize(@event);

foreach (var projector in _projectors)
{
try
{
projector.Apply(streamId, position, @event.OccurredAt, domainEvent);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Projector {Projector} failed on event {EventType} at position {Position}",
projector.GetType().Name, @event.EventType, position);
// Decide: skip, retry, or halt
throw;
}
}

// Checkpoint periodically. After a restart, events processed since the
// last saved checkpoint are delivered again.
if (++sinceCheckpoint >= checkpointInterval)
{
await _checkpoints.SaveCheckpoint("all-projections", position, ct);
sinceCheckpoint = 0;
}
}
}

private IDomainEvent Deserialize(StoredEvent e) => /* ... */;
}

public interface IProjector
{
void Apply(string streamId, long position, DateTimeOffset occurredAt, IDomainEvent @event);
}

public interface ICheckpointStore
{
Task<Checkpoint?> GetCheckpoint(string projectionName, CancellationToken ct);
Task SaveCheckpoint(string projectionName, long position, CancellationToken ct);
}

public sealed record Checkpoint(string ProjectionName, long Position, DateTimeOffset UpdatedAt);
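
For tests and local runs, the checkpoint store can be as simple as a dictionary. The sketch below is one possible in-memory implementation; InMemoryCheckpointStore is a hypothetical name, and the rule that a checkpoint never moves backwards is an assumption of this sketch, not something the interface requires.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Repeated from above so the sketch compiles on its own.
public sealed record Checkpoint(string ProjectionName, long Position, DateTimeOffset UpdatedAt);

public interface ICheckpointStore
{
    Task<Checkpoint?> GetCheckpoint(string projectionName, CancellationToken ct);
    Task SaveCheckpoint(string projectionName, long position, CancellationToken ct);
}

public sealed class InMemoryCheckpointStore : ICheckpointStore
{
    private readonly ConcurrentDictionary<string, Checkpoint> _checkpoints = new();

    public Task<Checkpoint?> GetCheckpoint(string projectionName, CancellationToken ct) =>
        Task.FromResult<Checkpoint?>(
            _checkpoints.TryGetValue(projectionName, out var c) ? c : null);

    public Task SaveCheckpoint(string projectionName, long position, CancellationToken ct)
    {
        var next = new Checkpoint(projectionName, position, DateTimeOffset.UtcNow);

        // Keep the higher position if an older save arrives late.
        _checkpoints.AddOrUpdate(
            projectionName,
            next,
            (_, current) => position > current.Position ? next : current);

        return Task.CompletedTask;
    }
}
```

A durable store would write the same three fields to a table or file; storing the checkpoint in the same transaction as the read model is what removes the replay window entirely.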

SQL-backed multi-stream projection

For production, project to a relational database:

-- Customer accounts summary table
CREATE TABLE customer_accounts_summary (
customer_id UUID PRIMARY KEY,
total_accounts INT NOT NULL DEFAULT 0,
open_accounts INT NOT NULL DEFAULT 0,
closed_accounts INT NOT NULL DEFAULT 0,
total_balance DECIMAL(18,2) NOT NULL DEFAULT 0,
last_activity_at TIMESTAMPTZ,
last_processed_position BIGINT NOT NULL DEFAULT 0
);

-- Account to customer mapping
CREATE TABLE account_customer_map (
account_id UUID PRIMARY KEY,
customer_id UUID NOT NULL,
FOREIGN KEY (customer_id) REFERENCES customer_accounts_summary(customer_id)
);

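CREATE INDEX idx_account_customer ON account_customer_map(customer_id);

// ExecuteAsync and QuerySingleOrDefaultAsync below are assumed to be
// Dapper's extension methods on IDbConnection (requires: using Dapper;)
public sealed class SqlCustomerAccountsSummaryProjector : IProjector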
{
private readonly IDbConnection _db;

public SqlCustomerAccountsSummaryProjector(IDbConnection db)
{
_db = db;
}

public void Apply(string streamId, long position, DateTimeOffset occurredAt, IDomainEvent @event)
{
// Synchronous wrapper - in production use async
ApplyAsync(streamId, position, occurredAt, @event).GetAwaiter().GetResult();
}

private async Task ApplyAsync(
string streamId,
long position,
DateTimeOffset occurredAt,
IDomainEvent @event)
{
switch (@event)
{
case AccountOpenedWithCustomer e:
await HandleAccountOpened(e, position, occurredAt);
break;

case MoneyDeposited e when TryGetAccountId(streamId, out var accountId):
await HandleBalanceChange(accountId, e.Amount, position, occurredAt);
break;

case MoneyWithdrawn e when TryGetAccountId(streamId, out var accountId):
await HandleBalanceChange(accountId, -e.Amount, position, occurredAt);
break;

case AccountClosed when TryGetAccountId(streamId, out var accountId):
await HandleAccountClosed(accountId, position, occurredAt);
break;
}
}

private async Task HandleAccountOpened(
AccountOpenedWithCustomer e,
long position,
DateTimeOffset occurredAt)
{
// Ensure customer exists
await _db.ExecuteAsync(
@"INSERT INTO customer_accounts_summary (customer_id, last_processed_position)
VALUES (@CustomerId, @Position)
ON CONFLICT (customer_id) DO NOTHING",
new { e.CustomerId, Position = position });

// Map account to customer
await _db.ExecuteAsync(
@"INSERT INTO account_customer_map (account_id, customer_id)
VALUES (@AccountId, @CustomerId)
ON CONFLICT DO NOTHING",
new { e.AccountId, e.CustomerId });

// Update summary
await _db.ExecuteAsync(
@"UPDATE customer_accounts_summary SET
total_accounts = total_accounts + 1,
open_accounts = open_accounts + 1,
last_activity_at = @OccurredAt,
last_processed_position = @Position
WHERE customer_id = @CustomerId",
new { e.CustomerId, OccurredAt = occurredAt, Position = position });
}

private async Task HandleBalanceChange(
Guid accountId,
decimal delta,
long position,
DateTimeOffset occurredAt)
{
var customerId = await _db.QuerySingleOrDefaultAsync<Guid?>(
"SELECT customer_id FROM account_customer_map WHERE account_id = @accountId",
new { accountId });

if (customerId is null) return;

await _db.ExecuteAsync(
@"UPDATE customer_accounts_summary SET
total_balance = total_balance + @Delta,
last_activity_at = @OccurredAt,
last_processed_position = @Position
WHERE customer_id = @CustomerId",
new { Delta = delta, OccurredAt = occurredAt, Position = position, CustomerId = customerId });
}

private async Task HandleAccountClosed(Guid accountId, long position, DateTimeOffset occurredAt)
{
var customerId = await _db.QuerySingleOrDefaultAsync<Guid?>(
"SELECT customer_id FROM account_customer_map WHERE account_id = @accountId",
new { accountId });

if (customerId is null) return;

await _db.ExecuteAsync(
@"UPDATE customer_accounts_summary SET
open_accounts = open_accounts - 1,
closed_accounts = closed_accounts + 1,
last_activity_at = @OccurredAt,
last_processed_position = @Position
WHERE customer_id = @CustomerId",
new { OccurredAt = occurredAt, Position = position, CustomerId = customerId });
}

private bool TryGetAccountId(string streamId, out Guid accountId)
{
if (streamId.StartsWith("bank-account-"))
{
accountId = Guid.Parse(streamId.Replace("bank-account-", ""));
return true;
}
accountId = Guid.Empty;
return false;
}
}
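
Because the orchestrator saves its checkpoint only periodically, a restart delivers the events processed since the last checkpoint a second time, and increments such as total_balance + @Delta would then be applied twice. One common guard is to compare the event's position with the stored last_processed_position and skip anything that is not newer (in SQL, an extra AND last_processed_position < @Position in the WHERE clause). The sketch below shows the same guard in memory; the class and member names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public sealed class IdempotentBalanceProjection
{
    // Per customer: running balance plus the position of the last event applied.
    private readonly Dictionary<Guid, (decimal Balance, long LastPosition)> _rows = new();

    // Returns true if the delta was applied, false if the event was a replay.
    public bool Apply(Guid customerId, decimal delta, long position)
    {
        var row = _rows.TryGetValue(customerId, out var existing)
            ? existing
            : (Balance: 0m, LastPosition: -1L);

        // Already seen this position (or a later one) for this row: skip.
        if (position <= row.LastPosition)
            return false;

        _rows[customerId] = (row.Balance + delta, position);
        return true;
    }

    public decimal BalanceOf(Guid customerId) =>
        _rows.TryGetValue(customerId, out var row) ? row.Balance : 0m;
}
```

The guard relies on events reaching each row in increasing global position. In the SQL version, the insert in HandleAccountOpened would also need to write a lower initial position, otherwise the guarded update that follows it would be skipped.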

Ordering and consistency

Global ordering vs per-stream ordering

  • Per-stream: events within one aggregate are ordered
  • Global: events across all streams have a total order

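For multi-stream projections, you usually need global ordering, because the read model depends on how events from different streams interleave. Per-stream ordering alone allows several interleavings, and they can produce different results.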
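
A small sketch of why the interleaving matters: the projection below sums deposits made while a customer holds the Gold tier. The tier change lives in the customer stream and the deposit in the account stream, so both orderings respect per-stream order, yet they give different totals. The event shape and names are hypothetical simplifications of the tutorial's event types.

```csharp
using System;
using System.Collections.Generic;

public static class OrderingDemo
{
    // Sums deposits that occur after the customer became Gold (single customer, for brevity).
    public static decimal GoldDeposits(
        IEnumerable<(string StreamId, string Kind, decimal Amount)> events)
    {
        var isGold = false;
        var total = 0m;

        foreach (var e in events)
        {
            if (e.Kind == "tier-changed-to-gold")
                isGold = true;
            else if (e.Kind == "deposited" && isGold)
                total += e.Amount;
        }

        return total;
    }
}
```

Feeding the tier change first and then a deposit of 50 yields 50; feeding the same two events in the opposite order yields 0. Only a total order across streams tells the projector which result is the true one.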

Handling out-of-order events

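If your event store can deliver events out of order, buffer them until the next expected position arrives. This sketch assumes positions are contiguous and start at 0:

public sealed class OutOfOrderBuffer
{
private readonly SortedDictionary<long, (string StreamId, IDomainEvent Event)> _buffer = new();
private long _lastProcessed = -1;

// Returns a materialized list rather than using yield return: an iterator
// body would not run, and the event would not be buffered, until the
// caller enumerated the result.
public IReadOnlyList<(string StreamId, IDomainEvent Event)> Add(
long position,
string streamId,
IDomainEvent @event)
{
var ready = new List<(string StreamId, IDomainEvent Event)>();

// Ignore duplicates of events that were already released
if (position <= _lastProcessed)
return ready;

_buffer[position] = (streamId, @event);

// Release all contiguous events from the buffer
while (_buffer.TryGetValue(_lastProcessed + 1, out var next))
{
_buffer.Remove(_lastProcessed + 1);
_lastProcessed++;
ready.Add(next);
}

return ready;
}
}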

Next

GDPR and data privacy are critical when events contain personal data.

