Temporal Queries (time travel for your data)
One of the superpowers of event sourcing: you can answer "what was the state at any point in time?"
Traditional databases overwrite state. Event sourcing keeps the full history.
Types of temporal queries
1. Point-in-time state
"What was the account balance on January 15th?"
2. State changes over time
"Show me all balance changes for this account in Q1"
3. Bi-temporal queries
"What did we think the balance was on Jan 15th, as of our knowledge on Feb 1st?" (Useful for auditing corrections)
Point-in-time reconstruction
Basic implementation
/// <summary>
/// Point-in-time queries: rebuild a bank account from only the part of its
/// event stream that had been recorded by a given moment.
/// </summary>
public static class TemporalQueries
{
    /// <summary>
    /// Rebuilds the account from the stored events whose <c>OccurredAt</c> is at or
    /// before <paramref name="asOf"/>.
    /// </summary>
    /// <param name="store">Event store the stream is read from.</param>
    /// <param name="accountId">Account whose stream <c>bank-account-{accountId}</c> is replayed.</param>
    /// <param name="asOf">Inclusive cutoff for the events that are replayed.</param>
    /// <param name="ct">Cancellation token handed to the stream read.</param>
    /// <returns>A new <c>BankAccount</c> loaded from the selected events.</returns>
    /// <exception cref="NotSupportedException">A stored event has an event type this class does not map.</exception>
    /// <exception cref="InvalidOperationException">A stored payload deserialized to <c>null</c>.</exception>
    public static async Task<BankAccount> LoadAtPointInTime(
        IEventStore store,
        Guid accountId,
        DateTimeOffset asOf,
        CancellationToken ct)
    {
        var streamId = $"bank-account-{accountId}";
        var account = new BankAccount();
        var history = new List<IDomainEvent>();

        await foreach (var stored in store.ReadStream(streamId, fromVersionInclusive: 0, ct))
        {
            // Reading stops at the first event past the cutoff; later events are never
            // inspected (assumes the stream is ordered by OccurredAt — TODO confirm).
            if (stored.OccurredAt > asOf)
            {
                break;
            }

            history.Add(DeserializeDomainEvent(stored));
        }

        account.LoadFromHistory(history);
        return account;
    }

    /// <summary>
    /// Maps the stored event's type name to a CLR type and deserializes its JSON payload.
    /// </summary>
    private static IDomainEvent DeserializeDomainEvent(StoredEvent e)
    {
        Type clrType;
        switch (e.EventType)
        {
            case nameof(AccountOpened):
                clrType = typeof(AccountOpened);
                break;
            case nameof(MoneyDeposited):
                clrType = typeof(MoneyDeposited);
                break;
            case nameof(MoneyWithdrawn):
                clrType = typeof(MoneyWithdrawn);
                break;
            case nameof(AccountClosed):
                clrType = typeof(AccountClosed);
                break;
            default:
                throw new NotSupportedException($"Unknown event type: {e.EventType}");
        }

        var payload = System.Text.Json.JsonSerializer.Deserialize(e.Data, clrType);
        if (payload is null)
        {
            throw new InvalidOperationException("Failed to deserialize event.");
        }

        return (IDomainEvent)payload;
    }
}
Usage
// Rebuild the account as of midnight UTC on January 15th, 2025 and print its balance.
var jan15 = new DateTimeOffset(2025, 1, 15, 0, 0, 0, TimeSpan.Zero);
var historicalAccount = await TemporalQueries.LoadAtPointInTime(store, accountId, jan15, ct);

Console.WriteLine($"Balance on Jan 15: {historicalAccount.Balance}");
Temporal projections
Instead of just current state, project state at multiple points:
/// <summary>
/// The balance of one account at one moment.
/// </summary>
/// <param name="AccountId">Account the balance belongs to.</param>
/// <param name="AsOf">Moment the balance refers to.</param>
/// <param name="Balance">Balance at that moment.</param>
public sealed record AccountBalanceAtTime(Guid AccountId, DateTimeOffset AsOf, decimal Balance);
/// <summary>
/// In-memory projection that keeps, per account and per calendar day, the balance
/// reported by the last event applied for that day.
/// </summary>
public sealed class TemporalBalanceProjector
{
    private readonly Dictionary<(Guid AccountId, DateOnly Date), AccountBalanceAtTime> _dailySnapshots = new();

    /// <summary>
    /// Records <paramref name="currentBalance"/> as the balance for the day of
    /// <paramref name="occurredAt"/>, replacing any value already stored for that day.
    /// </summary>
    /// <param name="accountId">Account the event belongs to.</param>
    /// <param name="occurredAt">Timestamp of the event; also stored as the snapshot's <c>AsOf</c>.</param>
    /// <param name="event">The event being projected; not read by this projector.</param>
    /// <param name="currentBalance">Account balance after the event.</param>
    public void Apply(Guid accountId, DateTimeOffset occurredAt, IDomainEvent @event, decimal currentBalance)
    {
        // Bucket by the UTC calendar date. DateTimeOffset.Date is expressed in the value's
        // own offset, so the same instant could otherwise land on different days depending
        // on which offset the timestamp happens to carry.
        var date = DateOnly.FromDateTime(occurredAt.UtcDateTime);

        // Last write wins, so after a day's events are applied this is the end-of-day balance.
        _dailySnapshots[(accountId, date)] = new AccountBalanceAtTime(accountId, occurredAt, currentBalance);
    }

    /// <summary>
    /// Returns the most recent snapshot for the account on or before <paramref name="date"/>.
    /// </summary>
    /// <param name="accountId">Account to look up.</param>
    /// <param name="date">Latest day (UTC calendar date) to consider.</param>
    /// <returns>The snapshot, or <c>null</c> when the account has none on or before that day.</returns>
    public AccountBalanceAtTime? GetBalanceOn(Guid accountId, DateOnly date)
    {
        return _dailySnapshots
            .Where(kvp => kvp.Key.AccountId == accountId && kvp.Key.Date <= date)
            .OrderByDescending(kvp => kvp.Key.Date)
            .Select(kvp => kvp.Value)
            .FirstOrDefault();
    }

    /// <summary>
    /// Returns the account's snapshots for days in the inclusive range
    /// <paramref name="from"/>..<paramref name="to"/>, oldest first.
    /// </summary>
    /// <param name="accountId">Account to look up.</param>
    /// <param name="from">First day of the range (inclusive).</param>
    /// <param name="to">Last day of the range (inclusive).</param>
    /// <returns>A lazily evaluated sequence; days without a snapshot are simply absent.</returns>
    public IEnumerable<AccountBalanceAtTime> GetBalanceHistory(
        Guid accountId,
        DateOnly from,
        DateOnly to)
    {
        return _dailySnapshots
            .Where(kvp =>
                kvp.Key.AccountId == accountId &&
                kvp.Key.Date >= from &&
                kvp.Key.Date <= to)
            .OrderBy(kvp => kvp.Key.Date)
            .Select(kvp => kvp.Value);
    }
}
State changes over time (audit trail)
/// <summary>
/// One entry in an account's audit trail.
/// </summary>
/// <param name="OccurredAt">Timestamp of the event that produced the entry.</param>
/// <param name="EventType">Name of the event type.</param>
/// <param name="Description">Human-readable description of the change.</param>
/// <param name="BalanceBefore">Balance before the event, when known.</param>
/// <param name="BalanceAfter">Balance after the event, when known.</param>
public sealed record StateChange(
    DateTimeOffset OccurredAt,
    string EventType,
    string Description,
    decimal? BalanceBefore,
    decimal? BalanceAfter);
/// <summary>
/// In-memory projection that turns each applied event into a <see cref="StateChange"/>
/// entry and tracks a running balance per account.
/// </summary>
public sealed class AccountAuditTrailProjector
{
    private readonly Dictionary<Guid, List<StateChange>> _auditTrails = new();
    private readonly Dictionary<Guid, decimal> _currentBalances = new();

    /// <summary>
    /// Appends an audit entry for <paramref name="event"/> and updates the account's running balance.
    /// </summary>
    /// <param name="accountId">Account the event belongs to.</param>
    /// <param name="occurredAt">Timestamp recorded on the audit entry.</param>
    /// <param name="event">Event to describe; unrecognised types are logged by type name with no balance-after value.</param>
    public void Apply(Guid accountId, DateTimeOffset occurredAt, IDomainEvent @event)
    {
        // First sighting of an account seeds an empty trail and a zero balance.
        if (!_auditTrails.TryGetValue(accountId, out var trail))
        {
            trail = new List<StateChange>();
            _auditTrails[accountId] = trail;
        }

        if (!_currentBalances.TryGetValue(accountId, out var balanceBefore))
        {
            balanceBefore = 0;
            _currentBalances[accountId] = balanceBefore;
        }

        decimal? balanceAfter;
        string description;

        if (@event is AccountOpened opened)
        {
            _currentBalances[accountId] = 0;
            balanceAfter = 0;
            description = $"Account opened for {opened.OwnerName}";
        }
        else if (@event is MoneyDeposited deposit)
        {
            var credited = balanceBefore + deposit.Amount;
            _currentBalances[accountId] = credited;
            balanceAfter = credited;
            description = $"Deposited {deposit.Amount:C}";
        }
        else if (@event is MoneyWithdrawn withdrawal)
        {
            var debited = balanceBefore - withdrawal.Amount;
            _currentBalances[accountId] = debited;
            balanceAfter = debited;
            description = $"Withdrew {withdrawal.Amount:C}";
        }
        else if (@event is AccountClosed)
        {
            // Closing does not move money; the balance carries over unchanged.
            description = "Account closed";
            balanceAfter = balanceBefore;
        }
        else
        {
            description = @event.GetType().Name;
            balanceAfter = null;
        }

        trail.Add(new StateChange(
            occurredAt,
            @event.GetType().Name,
            description,
            balanceBefore,
            balanceAfter));
    }

    /// <summary>
    /// Returns the account's audit entries ordered by time, optionally limited to an inclusive window.
    /// </summary>
    /// <param name="accountId">Account to look up.</param>
    /// <param name="from">Earliest timestamp to include, or <c>null</c> for no lower bound.</param>
    /// <param name="to">Latest timestamp to include, or <c>null</c> for no upper bound.</param>
    /// <returns>A lazily evaluated sequence; empty when the account has no trail.</returns>
    public IEnumerable<StateChange> GetAuditTrail(
        Guid accountId,
        DateTimeOffset? from = null,
        DateTimeOffset? to = null)
    {
        if (!_auditTrails.TryGetValue(accountId, out var trail))
        {
            return Enumerable.Empty<StateChange>();
        }

        return trail
            .Where(change =>
                (!from.HasValue || change.OccurredAt >= from.Value) &&
                (!to.HasValue || change.OccurredAt <= to.Value))
            .OrderBy(change => change.OccurredAt);
    }
}
Bi-temporal modeling
Bi-temporal = two time dimensions:
- Valid time: when the fact was true in the real world
- Transaction time: when we recorded it in the system
Why? Because sometimes you record things late, or correct past mistakes.
Example: Late-arriving deposit
/// <summary>
/// Deposit event that carries both time dimensions.
/// </summary>
/// <param name="Amount">Amount deposited.</param>
/// <param name="ValidTime">When the deposit actually happened.</param>
/// <param name="TransactionTime">When we recorded it.</param>
public sealed record MoneyDepositedBiTemporal(
    decimal Amount,
    DateTimeOffset ValidTime,
    DateTimeOffset TransactionTime) : IDomainEvent;

// The stored event envelope already has OccurredAt (transaction time);
// ValidTime goes in the payload.
Bi-temporal query
public static class BiTemporalQueries
{
    /// <summary>
    /// What did we think the balance was at <paramref name="validTime"/>,
    /// based on the knowledge we had at <paramref name="transactionTime"/>?
    /// </summary>
    /// <param name="store">Event store the stream is read from.</param>
    /// <param name="accountId">Account whose stream <c>bank-account-{accountId}</c> is scanned.</param>
    /// <param name="validTime">Inclusive cutoff on when an event took effect.</param>
    /// <param name="transactionTime">Inclusive cutoff on when an event was recorded (<c>OccurredAt</c>).</param>
    /// <param name="ct">Cancellation token handed to the stream read.</param>
    /// <returns>The balance obtained by folding the events that pass both cutoffs, in stream order.</returns>
    public static async Task<decimal> GetBalanceAsOfKnowledge(
        IEventStore store,
        Guid accountId,
        DateTimeOffset validTime,
        DateTimeOffset transactionTime,
        CancellationToken ct)
    {
        var streamId = $"bank-account-{accountId}";
        decimal runningBalance = 0;

        await foreach (var stored in store.ReadStream(streamId, 0, ct))
        {
            // Knowledge cutoff: events recorded after it are skipped, but the scan continues.
            if (stored.OccurredAt <= transactionTime)
            {
                var domainEvent = DeserializeDomainEvent(stored);

                // Events without an explicit valid time fall back to their recording time.
                var effectiveAt = GetValidTime(domainEvent, stored.OccurredAt);
                if (effectiveAt <= validTime)
                {
                    runningBalance = ApplyToBalance(runningBalance, domainEvent);
                }
            }
        }

        return runningBalance;
    }

    /// <summary>
    /// Returns the event's own valid time when it carries one, otherwise <paramref name="fallback"/>.
    /// </summary>
    private static DateTimeOffset GetValidTime(IDomainEvent @event, DateTimeOffset fallback)
        => @event is MoneyDepositedBiTemporal late ? late.ValidTime : fallback;

    /// <summary>
    /// Folds one event into the balance; event types that do not move money leave it unchanged.
    /// </summary>
    private static decimal ApplyToBalance(decimal balance, IDomainEvent @event)
    {
        switch (@event)
        {
            case AccountOpened:
                return 0;
            case MoneyDeposited deposit:
                return balance + deposit.Amount;
            case MoneyDepositedBiTemporal lateDeposit:
                return balance + lateDeposit.Amount;
            case MoneyWithdrawn withdrawal:
                return balance - withdrawal.Amount;
            default:
                return balance;
        }
    }

    private static IDomainEvent DeserializeDomainEvent(StoredEvent e) => /* ... */;
}
Practical use case
// Scenario: on Feb 1st we discover that a deposit made on Jan 10th was never recorded.
// It is recorded now (Feb 1st), with a valid time of Jan 10th.
var lateDeposit = new MoneyDepositedBiTemporal(
    Amount: 500m,
    ValidTime: new DateTimeOffset(2025, 1, 10, 14, 30, 0, TimeSpan.Zero),
    TransactionTime: DateTimeOffset.UtcNow); // Feb 1st

// Query 1: the current balance, using everything known right now (includes the late deposit).
var currentBalance = await BiTemporalQueries.GetBalanceAsOfKnowledge(
    store,
    accountId,
    validTime: DateTimeOffset.UtcNow,
    transactionTime: DateTimeOffset.UtcNow,
    ct);

// Query 2: what we THOUGHT the Jan 15th balance was BEFORE we knew about the late deposit.
var jan15 = new DateTimeOffset(2025, 1, 15, 0, 0, 0, TimeSpan.Zero);
var jan31 = new DateTimeOffset(2025, 1, 31, 0, 0, 0, TimeSpan.Zero);
var historicalView = await BiTemporalQueries.GetBalanceAsOfKnowledge(
    store,
    accountId,
    validTime: jan15,
    transactionTime: jan31,
    ct);
// The late deposit is excluded here because it had not been recorded yet on Jan 31st.
Temporal projections with SQL
For production systems, project temporal data into a queryable format such as a SQL table:
-- Daily balance snapshots table: one row per account per calendar day,
-- enforced by the composite primary key (account_id, balance_date).
-- The C# projector shown later in this document fills opening_balance from the
-- closing_balance of the account's most recent earlier row, and stores the stream
-- version of the last applied event in last_event_version.
-- NOTE(review): UUID, LIMIT and the ON CONFLICT upsert used by the projector look like
-- PostgreSQL — confirm the target database.
CREATE TABLE account_daily_balances (
account_id UUID NOT NULL,
balance_date DATE NOT NULL,
opening_balance DECIMAL(18,2) NOT NULL,
closing_balance DECIMAL(18,2) NOT NULL,
total_deposits DECIMAL(18,2) NOT NULL,
total_withdrawals DECIMAL(18,2) NOT NULL,
transaction_count INT NOT NULL,
last_event_version BIGINT NOT NULL,
PRIMARY KEY (account_id, balance_date)
);
-- Query: Balance on any date.
-- Takes the newest row on or before @targetDate, so a day without its own row
-- resolves to the closing balance of the latest earlier day that has one.
-- Returns no row when the account has nothing on or before @targetDate.
SELECT closing_balance
FROM account_daily_balances
WHERE account_id = @accountId
AND balance_date <= @targetDate
ORDER BY balance_date DESC
LIMIT 1;
-- Query: Balance trend over time.
-- BETWEEN is inclusive at both ends; only days that have a row appear in the result.
SELECT balance_date, closing_balance
FROM account_daily_balances
WHERE account_id = @accountId
AND balance_date BETWEEN @startDate AND @endDate
ORDER BY balance_date;
/// <summary>
/// Projector that maintains one <c>account_daily_balances</c> row per account per day.
/// </summary>
/// <remarks>
/// NOTE(review): events are not de-duplicated against <c>last_event_version</c>, so
/// replaying an already-applied event would be counted again — confirm whether the caller
/// guarantees exactly-once delivery.
/// </remarks>
public sealed class DailyBalanceProjector
{
    private readonly IDbConnection _db;

    /// <summary>
    /// Creates a projector that reads and writes through <paramref name="db"/>.
    /// </summary>
    /// <param name="db">Connection to the database holding <c>account_daily_balances</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="db"/> is <c>null</c>.</exception>
    public DailyBalanceProjector(IDbConnection db)
    {
        _db = db ?? throw new ArgumentNullException(nameof(db));
    }

    /// <summary>
    /// Folds one event into the snapshot row for the event's day and writes the row back.
    /// </summary>
    /// <param name="accountId">Account the event belongs to.</param>
    /// <param name="occurredAt">Timestamp of the event; its UTC calendar date selects the row.</param>
    /// <param name="event">Event to apply; types other than deposits and withdrawals only advance the version.</param>
    /// <param name="streamVersion">Stream version of the event, stored as <c>last_event_version</c>.</param>
    /// <param name="ct">Cancellation token passed to the database commands.</param>
    public async Task Apply(
        Guid accountId,
        DateTimeOffset occurredAt,
        IDomainEvent @event,
        long streamVersion,
        CancellationToken ct)
    {
        // Bucket by the UTC calendar date. DateTimeOffset.Date is expressed in the value's
        // own offset, so the same instant could otherwise land on different days.
        var date = DateOnly.FromDateTime(occurredAt.UtcDateTime);
        var existing = await GetOrCreateSnapshot(accountId, date, ct);

        var updated = @event switch
        {
            MoneyDeposited e => existing with
            {
                ClosingBalance = existing.ClosingBalance + e.Amount,
                TotalDeposits = existing.TotalDeposits + e.Amount,
                TransactionCount = existing.TransactionCount + 1,
                LastEventVersion = streamVersion
            },
            MoneyWithdrawn e => existing with
            {
                ClosingBalance = existing.ClosingBalance - e.Amount,
                TotalWithdrawals = existing.TotalWithdrawals + e.Amount,
                TransactionCount = existing.TransactionCount + 1,
                LastEventVersion = streamVersion
            },
            _ => existing with { LastEventVersion = streamVersion }
        };

        await Upsert(updated, ct);
    }

    /// <summary>
    /// Returns the stored row for the day when there is one; otherwise a fresh snapshot that
    /// opens and closes at the closing balance of the account's most recent earlier day
    /// (zero when there is none).
    /// </summary>
    private async Task<DailySnapshot> GetOrCreateSnapshot(
        Guid accountId,
        DateOnly date,
        CancellationToken ct)
    {
        // Load today's row first. Without this, every event would start again from the
        // previous day's closing balance and the upsert would wipe out the totals of
        // events already applied earlier the same day.
        var row = await _db.QuerySingleOrDefaultAsync<SnapshotRow>(new CommandDefinition(
            @"SELECT opening_balance AS OpeningBalance,
                     closing_balance AS ClosingBalance,
                     total_deposits AS TotalDeposits,
                     total_withdrawals AS TotalWithdrawals,
                     transaction_count AS TransactionCount,
                     last_event_version AS LastEventVersion
              FROM account_daily_balances
              WHERE account_id = @accountId AND balance_date = @date",
            new { accountId, date },
            cancellationToken: ct));

        if (row is not null)
        {
            return new DailySnapshot(
                AccountId: accountId,
                BalanceDate: date,
                OpeningBalance: row.OpeningBalance,
                ClosingBalance: row.ClosingBalance,
                TotalDeposits: row.TotalDeposits,
                TotalWithdrawals: row.TotalWithdrawals,
                TransactionCount: row.TransactionCount,
                LastEventVersion: row.LastEventVersion);
        }

        // No row for this day yet: the previous day's closing balance becomes today's opening.
        var previousClosing = await _db.QuerySingleOrDefaultAsync<decimal?>(new CommandDefinition(
            @"SELECT closing_balance FROM account_daily_balances
              WHERE account_id = @accountId AND balance_date < @date
              ORDER BY balance_date DESC LIMIT 1",
            new { accountId, date },
            cancellationToken: ct));

        return new DailySnapshot(
            AccountId: accountId,
            BalanceDate: date,
            OpeningBalance: previousClosing ?? 0,
            ClosingBalance: previousClosing ?? 0,
            TotalDeposits: 0,
            TotalWithdrawals: 0,
            TransactionCount: 0,
            LastEventVersion: 0);
    }

    /// <summary>
    /// Inserts the snapshot row, or updates every column except <c>opening_balance</c> when
    /// a row for the same account and day already exists.
    /// </summary>
    private async Task Upsert(DailySnapshot snapshot, CancellationToken ct)
    {
        await _db.ExecuteAsync(new CommandDefinition(
            @"INSERT INTO account_daily_balances
                (account_id, balance_date, opening_balance, closing_balance,
                 total_deposits, total_withdrawals, transaction_count, last_event_version)
              VALUES (@AccountId, @BalanceDate, @OpeningBalance, @ClosingBalance,
                      @TotalDeposits, @TotalWithdrawals, @TransactionCount, @LastEventVersion)
              ON CONFLICT (account_id, balance_date) DO UPDATE SET
                closing_balance = @ClosingBalance,
                total_deposits = @TotalDeposits,
                total_withdrawals = @TotalWithdrawals,
                transaction_count = @TransactionCount,
                last_event_version = @LastEventVersion",
            snapshot,
            cancellationToken: ct));
    }

    /// <summary>
    /// Mutable shape for reading the numeric columns of an existing row; the key columns
    /// are already known to the caller.
    /// </summary>
    private sealed class SnapshotRow
    {
        public decimal OpeningBalance { get; set; }
        public decimal ClosingBalance { get; set; }
        public decimal TotalDeposits { get; set; }
        public decimal TotalWithdrawals { get; set; }
        public int TransactionCount { get; set; }
        public long LastEventVersion { get; set; }
    }
}
/// <summary>
/// One account's totals for one calendar day; its members mirror the columns of
/// <c>account_daily_balances</c>.
/// </summary>
/// <param name="AccountId">Account the row belongs to.</param>
/// <param name="BalanceDate">Calendar day the row covers.</param>
/// <param name="OpeningBalance">Balance at the start of the day.</param>
/// <param name="ClosingBalance">Balance after the events applied so far that day.</param>
/// <param name="TotalDeposits">Sum of deposits applied that day.</param>
/// <param name="TotalWithdrawals">Sum of withdrawals applied that day.</param>
/// <param name="TransactionCount">Number of deposits and withdrawals applied that day.</param>
/// <param name="LastEventVersion">Stream version of the last event folded into the row.</param>
public sealed record DailySnapshot(
    Guid AccountId,
    DateOnly BalanceDate,
    decimal OpeningBalance,
    decimal ClosingBalance,
    decimal TotalDeposits,
    decimal TotalWithdrawals,
    int TransactionCount,
    long LastEventVersion);
Performance considerations
Indexing for temporal queries
-- Index for point-in-time queries: supports looking up one stream's events
-- filtered or ordered by occurred_at.
CREATE INDEX idx_events_stream_time
ON events (stream_id, occurred_at);
-- Index for bi-temporal queries: one stream's events by valid time, then transaction time.
-- NOTE(review): this document keeps ValidTime inside the event payload and uses the
-- envelope's OccurredAt as transaction time; confirm that the events table really has
-- valid_time and transaction_time columns before creating this index.
CREATE INDEX idx_events_bitemporal
ON events (stream_id, valid_time, transaction_time);
Snapshot-assisted temporal queries
For long streams, combine snapshots with temporal queries:
/// <summary>
/// Rebuilds the account as of <paramref name="asOf"/>, starting from the latest snapshot
/// taken before that time (when there is one) and replaying only the events after it.
/// </summary>
/// <param name="store">Event store the stream is read from.</param>
/// <param name="snapshots">Snapshot store queried for the starting point.</param>
/// <param name="accountId">Account whose stream <c>bank-account-{accountId}</c> is replayed.</param>
/// <param name="asOf">Inclusive cutoff for the events that are replayed.</param>
/// <param name="ct">Cancellation token handed to the snapshot lookup and the stream read.</param>
/// <returns>The account restored from the snapshot (or created empty) with the selected events applied.</returns>
public static async Task<BankAccount> LoadAtPointInTimeWithSnapshots(
    IEventStore store,
    ISnapshotStore snapshots,
    Guid accountId,
    DateTimeOffset asOf,
    CancellationToken ct)
{
    var streamId = $"bank-account-{accountId}";
    var snapshot = await snapshots.GetLatestBefore(streamId, asOf, ct);

    BankAccount account;
    long resumeFrom;
    if (snapshot is null)
    {
        // No usable snapshot: start empty and replay from the beginning of the stream.
        account = new BankAccount();
        resumeFrom = 0;
    }
    else
    {
        var state = System.Text.Json.JsonSerializer.Deserialize<BankAccountState>(snapshot.State)!;
        account = BankAccount.FromSnapshot(state, snapshot.StreamVersion);

        // NOTE(review): the read below starts AT snapshot.StreamVersion. Whether that
        // re-reads an event already contained in the snapshot depends on how ReadStream
        // numbers events — confirm against the event store's versioning convention.
        resumeFrom = snapshot.StreamVersion;
    }

    var replay = new List<IDomainEvent>();
    await foreach (var stored in store.ReadStream(streamId, resumeFrom, ct))
    {
        // Stop at the first event past the target time.
        if (stored.OccurredAt > asOf)
        {
            break;
        }

        replay.Add(DeserializeDomainEvent(stored));
    }

    account.LoadFromHistory(replay);
    return account;
}
Testing temporal queries
/// <summary>
/// Checks that point-in-time loading returns the balance as it stood at several
/// moments along a known timeline.
/// </summary>
public sealed class TemporalQueryTests
{
    [Fact]
    public async Task LoadAtPointInTime_returns_state_at_specified_time()
    {
        // All timestamps in this test are in January 2025, UTC.
        static DateTimeOffset Jan(int day, int hour = 0) =>
            new DateTimeOffset(2025, 1, day, hour, 0, 0, TimeSpan.Zero);

        var store = new InMemoryEventStore();
        var accountId = Guid.NewGuid();
        var streamId = $"bank-account-{accountId}";

        // Timeline (each event at 10:00):
        //   Jan 1:  open with $0
        //   Jan 5:  deposit $100
        //   Jan 10: withdraw $30
        //   Jan 15: deposit $50
        var timeline = new[]
        {
            CreateEvent(new AccountOpened(accountId, "Test"), Jan(1, 10)),
            CreateEvent(new MoneyDeposited(100m), Jan(5, 10)),
            CreateEvent(new MoneyWithdrawn(30m), Jan(10, 10)),
            CreateEvent(new MoneyDeposited(50m), Jan(15, 10)),
        };
        await store.AppendToStream(streamId, 0, timeline, CancellationToken.None);

        Task<BankAccount> AccountAt(DateTimeOffset asOf) =>
            TemporalQueries.LoadAtPointInTime(store, accountId, asOf, CancellationToken.None);

        // Query at midnight on days that fall between the events.
        var jan3 = await AccountAt(Jan(3));
        Assert.Equal(0m, jan3.Balance); // Only opened

        var jan7 = await AccountAt(Jan(7));
        Assert.Equal(100m, jan7.Balance); // Opened + deposit

        var jan12 = await AccountAt(Jan(12));
        Assert.Equal(70m, jan12.Balance); // 100 - 30

        var jan20 = await AccountAt(Jan(20));
        Assert.Equal(120m, jan20.Balance); // 100 - 30 + 50
    }

    /// <summary>
    /// Wraps a domain event in a stored-event envelope with the given timestamp and a JSON payload.
    /// </summary>
    private static StoredEvent CreateEvent(IDomainEvent @event, DateTimeOffset occurredAt) =>
        new StoredEvent(
            EventId: Guid.NewGuid(),
            EventType: @event.GetType().Name,
            SchemaVersion: 1,
            OccurredAt: occurredAt,
            Data: System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType()),
            Metadata: null);
}
Next
Multi-stream projections let you build views across multiple aggregates.
Next: Multi-stream projections
Sources
- https://martinfowler.com/articles/bitemporal-history.html
- https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables
- https://www.eventstore.com/blog/what-is-event-sourcing (temporal query section)