Skip to main content

Event-Driven Architecture

Event-Driven Architecture (EDA) is a software design paradigm where the flow of the program is determined by events—significant changes in state that the system should react to.

Core Concepts​

What is an Event?​

// Events represent facts that have happened
/// <summary>
/// Records the fact that an order was placed. All properties are init-only,
/// so values can only be supplied while the event is being constructed.
/// </summary>
public record OrderPlacedEvent
{
    /// <summary>Identifier of this event instance; defaults to a freshly generated GUID.</summary>
    public Guid EventId { get; init; } = Guid.NewGuid();

    /// <summary>UTC timestamp taken when the event object is created, unless overridden.</summary>
    public DateTime OccurredAt { get; init; } = DateTime.UtcNow;

    // Business data

    /// <summary>Identifier of the order that was placed.</summary>
    public Guid OrderId { get; init; }

    /// <summary>Identifier of the customer who placed the order.</summary>
    public Guid CustomerId { get; init; }

    /// <summary>
    /// Items on the order. Defaults to an empty list so that handlers can
    /// enumerate it without a null check when the producer supplies no items.
    /// </summary>
    public List<OrderItem> Items { get; init; } = new();

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; init; }
}

// Events are immutable facts
// Past tense naming: OrderPlaced, PaymentReceived, ItemShipped

Event Types​

Event-Driven Patterns​

Publish-Subscribe​

// Publisher doesn't know about subscribers
/// <summary>
/// Submits orders and announces each submission on the event bus without
/// referencing any particular subscriber.
/// </summary>
public class OrderService
{
    private readonly IEventBus _eventBus;

    /// <summary>
    /// Marks the order as submitted, saves it, and then publishes an
    /// <c>OrderSubmittedEvent</c> describing it.
    /// </summary>
    /// <param name="order">The order to submit.</param>
    public async Task SubmitOrderAsync(Order order)
    {
        order.Submit();

        // NOTE(review): _repository is not declared in this snippet; presumably injected elsewhere.
        await _repository.SaveAsync(order);

        // The event is only built once the save has completed.
        var submitted = new OrderSubmittedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.TotalAmount
        };

        // Fire and move on: the publisher has no knowledge of who is listening.
        await _eventBus.PublishAsync(submitted);
    }
}

// Multiple independent subscribers
/// <summary>
/// Reacts to <c>OrderSubmittedEvent</c> by reserving stock for every item on the order.
/// </summary>
public class InventorySubscriber : IEventHandler<OrderSubmittedEvent>
{
    /// <summary>
    /// Reserves stock item by item, awaiting each reservation before starting the next.
    /// </summary>
    /// <param name="event">The submitted-order event to handle.</param>
    public async Task HandleAsync(OrderSubmittedEvent @event)
    {
        // NOTE(review): the OrderSubmittedEvent published by OrderService.SubmitOrderAsync
        // in this document does not set Items - confirm it is populated before this runs.
        foreach (var line in @event.Items)
        {
            var productId = line.ProductId;
            var quantity = line.Quantity;

            await _inventoryService.ReserveStockAsync(productId, quantity);
        }
    }
}

/// <summary>
/// Reacts to <c>OrderSubmittedEvent</c> by asking the email service to send an order confirmation.
/// </summary>
public class NotificationSubscriber : IEventHandler<OrderSubmittedEvent>
{
    /// <summary>
    /// Sends the order confirmation for the customer and order carried by the event.
    /// </summary>
    /// <param name="event">The submitted-order event to handle.</param>
    public async Task HandleAsync(OrderSubmittedEvent @event)
    {
        var customerId = @event.CustomerId;
        var orderId = @event.OrderId;

        await _emailService.SendOrderConfirmationAsync(customerId, orderId);
    }
}

Competing Consumers​

// Multiple instances process from same queue
// Each message is delivered to only one of the competing consumer instances
/// <summary>
/// Configures the "order-processing" receive endpoint that all instances of the
/// service consume from.
/// </summary>
public class CompetingConsumerConfig
{
    /// <summary>
    /// Sets up the "order-processing" endpoint with a prefetch limit, a three-step
    /// retry policy and the <c>OrderProcessor</c> consumer.
    /// </summary>
    /// <param name="context">
    /// Registration context used to resolve the consumer. The original snippet
    /// referenced <c>context</c> without declaring it, so it is taken as a parameter.
    /// NOTE(review): the (context, cfg) order is meant to match the UsingRabbitMq
    /// callback so the method can be passed directly - confirm against the caller.
    /// </param>
    /// <param name="cfg">The RabbitMQ bus factory configurator to add the endpoint to.</param>
    public static void Configure(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator cfg)
    {
        cfg.ReceiveEndpoint("order-processing", e =>
        {
            // Limits concurrent processing by capping how many messages are prefetched at once.
            e.PrefetchCount = 16;

            // Retry is registered before the consumer so that the consumer is wrapped by it.
            // Three further attempts, after 1 s, 5 s and 30 s.
            e.UseMessageRetry(r => r.Intervals(
                TimeSpan.FromSeconds(1),
                TimeSpan.FromSeconds(5),
                TimeSpan.FromSeconds(30)));

            e.ConfigureConsumer<OrderProcessor>(context);
        });
    }
}

Dead Letter Queue​

/// <summary>
/// Consumes <c>Fault&lt;OrderSubmittedEvent&gt;</c> messages, stores a record of each
/// failure and alerts the operations team.
/// </summary>
public class DeadLetterProcessor : IConsumer<Fault<OrderSubmittedEvent>>
{
    /// <summary>
    /// Saves a dead-letter entry for the faulted message, then sends an alert naming the order.
    /// </summary>
    /// <param name="context">Consume context carrying the fault and the original message.</param>
    public async Task Consume(ConsumeContext<Fault<OrderSubmittedEvent>> context)
    {
        var fault = context.Message;
        var failedEvent = fault.Message;

        // Keep the payload and the exception messages for later investigation.
        var entry = new DeadLetterEntry
        {
            MessageId = context.MessageId,
            MessageType = typeof(OrderSubmittedEvent).Name,
            Payload = JsonSerializer.Serialize(failedEvent),
            Exceptions = fault.Exceptions.Select(e => e.Message).ToList(),
            FailedAt = DateTime.UtcNow
        };
        await _dlqRepo.SaveAsync(entry);

        // Tell the operations team which order failed.
        await _alertService.SendAlertAsync(
            "Dead Letter Alert",
            $"Order {failedEvent.OrderId} failed processing");
    }
}

Outbox Pattern for Reliability​

// Ensure events are published even if broker is down
/// <summary>
/// Creates orders and records their domain events in an outbox table inside the
/// same database transaction, so the order and its events are saved or discarded together.
/// </summary>
public class OrderService
{
    /// <summary>
    /// Creates an order from the command, saves it, and writes one outbox message
    /// per domain event before committing.
    /// </summary>
    /// <param name="command">The command describing the order to create.</param>
    /// <returns>The identifier of the created order.</returns>
    /// <remarks>
    /// Any exception rolls the transaction back and is rethrown to the caller.
    /// </remarks>
    public async Task<Guid> CreateOrderAsync(CreateOrderCommand command)
    {
        using var transaction = await _db.BeginTransactionAsync();

        try
        {
            var order = Order.Create(command);
            await _orderRepo.SaveAsync(order, transaction);

            // Save events to outbox in same transaction
            foreach (var @event in order.DomainEvents)
            {
                var eventType = @event.GetType();

                await _outboxRepo.SaveAsync(new OutboxMessage
                {
                    Id = Guid.NewGuid(),
                    EventType = eventType.AssemblyQualifiedName,
                    // Serialize with the runtime type: the generic overload uses the
                    // compile-time type and would drop properties declared on derived
                    // events when DomainEvents is typed as a base class or interface.
                    Payload = JsonSerializer.Serialize(@event, eventType),
                    CreatedAt = DateTime.UtcNow
                }, transaction);
            }

            await transaction.CommitAsync();

            // Returned so callers can hand the id back to the client straight away.
            return order.Id;
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

// Background processor publishes from outbox
/// <summary>
/// Background service that polls the outbox and publishes pending messages to the event bus.
/// </summary>
public class OutboxProcessor : BackgroundService
{
    /// <summary>
    /// Loops until cancellation is requested: loads pending messages, publishes each
    /// one, marks it as published, then waits one second before polling again.
    /// </summary>
    /// <param name="ct">Token that signals the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var messages = await GetPendingMessagesAsync();

            foreach (var message in messages)
            {
                // Rebuild the typed event: publishing message.Payload directly would put
                // a raw JSON string on the bus instead of the event it represents.
                // NOTE(review): assumes IEventBus.PublishAsync dispatches on the runtime
                // type of its argument - confirm against the bus implementation.
                var @event = DeserializeEvent(message.EventType, message.Payload);

                await _eventBus.PublishAsync(@event);

                // Marked only after the publish call has completed.
                await MarkAsPublishedAsync(message.Id);
            }

            await Task.Delay(TimeSpan.FromSeconds(1), ct);
        }
    }

    /// <summary>
    /// Turns a stored outbox row back into the event object it was created from.
    /// </summary>
    /// <param name="eventType">Assembly-qualified type name stored with the message.</param>
    /// <param name="payload">JSON produced when the event was written to the outbox.</param>
    /// <returns>The deserialized event instance.</returns>
    /// <exception cref="InvalidOperationException">
    /// The type name cannot be resolved, or the payload deserializes to null.
    /// </exception>
    private static object DeserializeEvent(string eventType, string payload)
    {
        var type = Type.GetType(eventType)
            ?? throw new InvalidOperationException($"Cannot resolve outbox event type '{eventType}'.");

        return JsonSerializer.Deserialize(payload, type)
            ?? throw new InvalidOperationException($"Outbox payload for '{eventType}' deserialized to null.");
    }
}

Handling Eventual Consistency​

/// <summary>
/// HTTP endpoints for creating and reading orders when the read side may lag behind writes.
/// Derives from <c>ControllerBase</c>, which supplies the <c>Accepted</c>,
/// <c>StatusCode</c> and <c>Ok</c> helpers used below.
/// </summary>
public class OrderController : ControllerBase
{
    /// <summary>
    /// Creates an order and responds with 202 Accepted carrying the new order id.
    /// </summary>
    /// <param name="request">The order creation request.</param>
    [HttpPost]
    public async Task<IActionResult> CreateOrder(CreateOrderRequest request)
    {
        var orderId = await _orderService.CreateOrderAsync(request);

        // Return immediately with order ID
        return Accepted(new { orderId });
    }

    /// <summary>
    /// Looks the order up through the query service. Responds with 200 and the order
    /// when found, otherwise with 202 and a retry hint.
    /// </summary>
    /// <param name="orderId">Identifier of the order to fetch.</param>
    [HttpGet("{orderId}")]
    public async Task<IActionResult> GetOrder(Guid orderId)
    {
        var order = await _queryService.GetOrderAsync(orderId);

        if (order is null)
        {
            // Might not be projected yet
            // NOTE(review): an id that never existed also lands here and gets 202.
            return StatusCode(202, new { message = "Order is being processed", retryAfter = 1 });
        }

        return Ok(order);
    }
}

Event Schema Evolution​

/// <summary>
/// Version 1 of the "OrderPlaced" event: the order id and a plain decimal total.
/// </summary>
[EventType("OrderPlaced", Version = 1)]
public record OrderPlacedEventV1
{
/// <summary>Identifier of the order that was placed.</summary>
public Guid OrderId { get; init; }
/// <summary>Order total as a bare decimal; this version carries no currency.</summary>
public decimal Total { get; init; }
}

/// <summary>
/// Version 2 of the "OrderPlaced" event: the total becomes a <c>Money</c> value
/// and a currency field is added.
/// </summary>
[EventType("OrderPlaced", Version = 2)]
public record OrderPlacedEventV2
{
/// <summary>Identifier of the order that was placed.</summary>
public Guid OrderId { get; init; }
/// <summary>Order total expressed as a <c>Money</c> value.</summary>
public Money TotalAmount { get; init; } // Changed to Money type
// NOTE(review): Money appears to carry a currency as well (the upcaster builds it
// with "USD"); confirm the two values are meant to be kept in step.
public string Currency { get; init; } // Added field
}

// Upcaster handles old events
/// <summary>
/// Converts version 1 "OrderPlaced" events into the version 2 shape.
/// </summary>
public class OrderPlacedUpcaster : IEventUpcaster
{
    /// <summary>
    /// Returns an <c>OrderPlacedEventV2</c> built from a V1 event when version 2 is
    /// requested; any other input is returned unchanged.
    /// </summary>
    /// <param name="event">The stored event to upgrade.</param>
    /// <param name="fromVersion">Version the event was stored with; not consulted here.</param>
    /// <param name="toVersion">Version the caller wants.</param>
    /// <returns>The upgraded event, or the original instance when no conversion applies.</returns>
    public object Upcast(object @event, int fromVersion, int toVersion)
    {
        // V1 events carried no currency, so upgraded events use this default.
        const string LegacyCurrency = "USD";

        // Only a V1 event being raised to version 2 is converted.
        if (@event is not OrderPlacedEventV1 legacy || toVersion != 2)
        {
            return @event;
        }

        var upgraded = new OrderPlacedEventV2
        {
            OrderId = legacy.OrderId,
            TotalAmount = new Money(legacy.Total, LegacyCurrency),
            Currency = LegacyCurrency
        };

        return upgraded;
    }
}

Key Takeaways​

  1. Events are Facts: Immutable records of what happened, not commands
  2. Loose Coupling: Publishers don't know about subscribers
  3. Eventual Consistency: Embrace it with proper patterns (outbox, idempotency)
  4. Schema Evolution: Plan for event versioning from the start
  5. Observability: Correlation IDs and distributed tracing are essential