Event Sourcing & CQRS
Event Sourcing
Definition: Store all changes to application state as a sequence of events, rather than storing current state.
Traditional Approach:
Users Table:
| id | name | email | balance |
| 123 | John | john@email.com | 500 |
Update: balance = 500 (lost history of how we got here)Event Sourcing Approach:
Events:
1. UserCreated { id: 123, name: "John", email: "john@email.com" }
2. DepositMade { userId: 123, amount: 1000 }
3. WithdrawalMade { userId: 123, amount: 300 }
4. WithdrawalMade { userId: 123, amount: 200 }
Current balance = 1000 - 300 - 200 = 500 (can see full history)Benefits
Complete Audit Trail: Every change recorded with timestamp and reason
Time Travel: Reconstruct state at any point in time
Event Replay: Rebuild state by replaying events
Debugging: See exactly what happened and when
Business Intelligence: Analyze historical patterns
Undo/Redo: Easy to implement
Challenges
Storage: Events accumulate over time
Performance: Replaying many events can be slow
Schema Evolution: Old events may have different structure
Complexity: More complex than CRUD
Eventual Consistency: Read models may lag behind events
CQRS (Command Query Responsibility Segregation)
Definition: Separate read and write operations into different models.
Traditional Approach:
Same model for reads and writes:
- User Service handles both queries and commands
- Same database schema for bothCQRS Approach:
Write Model (Commands):
- Handles: CreateUser, UpdateUser, DeleteUser
- Optimized for writes
- Enforces business rules
Read Model (Queries):
- Handles: GetUser, SearchUsers, GetUserOrders
- Optimized for reads
- Denormalized for fast queriesBenefits
Independent Scaling: Scale reads and writes separately
Optimized Models: Each optimized for its purpose
Simpler Queries: Read model can be denormalized
Better Performance: No complex joins in read model
Flexibility: Different databases for reads and writes
When to Use
Use CQRS When:
- Complex domain logic
- Different read/write patterns
- High read-to-write ratio
- Need for different data representations
Don’t Use CQRS When:
- Simple CRUD applications
- Small applications
- Team lacks experience with pattern
Event Sourcing + CQRS Together
Common Pattern: Use event sourcing for write model, CQRS for read model
Flow:
- Command creates event
- Event stored in event store
- Event published to event bus
- Read model projections update based on events
Command → Event Store → Event Bus → Read Model Projections
↓
Write ModelImplementation Example
Event Store
class EventStore {
constructor(db) {
this.db = db;
}
// Append event
async appendEvent(aggregateId, event) {
const eventRecord = {
aggregateId,
eventType: event.type,
eventData: event.data,
timestamp: new Date(),
version: await this.getNextVersion(aggregateId)
};
await this.db.collection('events').insertOne(eventRecord);
// Publish event
await this.publishEvent(event);
}
// Get all events for aggregate
async getEvents(aggregateId) {
return await this.db.collection('events')
.find({ aggregateId })
.sort({ version: 1 })
.toArray();
}
// Get events since version
async getEventsSince(aggregateId, version) {
return await this.db.collection('events')
.find({
aggregateId,
version: { $gt: version }
})
.sort({ version: 1 })
.toArray();
}
async getNextVersion(aggregateId) {
const lastEvent = await this.db.collection('events')
.findOne({ aggregateId }, { sort: { version: -1 } });
return lastEvent ? lastEvent.version + 1 : 1;
}
}Aggregate (Write Model)
class BankAccount {
constructor(accountId) {
this.accountId = accountId;
this.balance = 0;
this.version = 0;
this.changes = [];
}
// Commands
deposit(amount) {
if (amount <= 0) {
throw new Error('Amount must be positive');
}
this.applyChange({
type: 'MoneyDeposited',
data: { accountId: this.accountId, amount }
});
}
withdraw(amount) {
if (amount <= 0) {
throw new Error('Amount must be positive');
}
if (this.balance < amount) {
throw new Error('Insufficient funds');
}
this.applyChange({
type: 'MoneyWithdrawn',
data: { accountId: this.accountId, amount }
});
}
// Apply event to state
applyChange(event) {
this.apply(event);
this.changes.push(event);
}
apply(event) {
switch (event.type) {
case 'AccountCreated':
this.balance = 0;
break;
case 'MoneyDeposited':
this.balance += event.data.amount;
break;
case 'MoneyWithdrawn':
this.balance -= event.data.amount;
break;
}
this.version++;
}
// Load from events
loadFromHistory(events) {
events.forEach(event => this.apply(event));
}
// Get uncommitted changes
getUncommittedChanges() {
return this.changes;
}
// Mark changes as committed
markChangesAsCommitted() {
this.changes = [];
}
}Repository
class BankAccountRepository {
constructor(eventStore) {
this.eventStore = eventStore;
}
// Load aggregate from events
async getById(accountId) {
const events = await this.eventStore.getEvents(accountId);
if (events.length === 0) {
return null;
}
const account = new BankAccount(accountId);
account.loadFromHistory(events);
return account;
}
// Save aggregate changes
async save(account) {
const changes = account.getUncommittedChanges();
for (const event of changes) {
await this.eventStore.appendEvent(account.accountId, event);
}
account.markChangesAsCommitted();
}
}Command Handler
class BankAccountCommandHandler {
constructor(repository) {
this.repository = repository;
}
async handleDeposit(command) {
const { accountId, amount } = command;
// Load aggregate
let account = await this.repository.getById(accountId);
if (!account) {
account = new BankAccount(accountId);
account.applyChange({
type: 'AccountCreated',
data: { accountId }
});
}
// Execute command
account.deposit(amount);
// Save events
await this.repository.save(account);
}
async handleWithdraw(command) {
const { accountId, amount } = command;
const account = await this.repository.getById(accountId);
if (!account) {
throw new Error('Account not found');
}
account.withdraw(amount);
await this.repository.save(account);
}
}Read Model Projection
class AccountBalanceProjection {
constructor(db) {
this.db = db;
}
// Handle events and update read model
async handleEvent(event) {
switch (event.type) {
case 'AccountCreated':
await this.db.collection('account_balances').insertOne({
accountId: event.data.accountId,
balance: 0,
lastUpdated: new Date()
});
break;
case 'MoneyDeposited':
await this.db.collection('account_balances').updateOne(
{ accountId: event.data.accountId },
{
$inc: { balance: event.data.amount },
$set: { lastUpdated: new Date() }
}
);
break;
case 'MoneyWithdrawn':
await this.db.collection('account_balances').updateOne(
{ accountId: event.data.accountId },
{
$inc: { balance: -event.data.amount },
$set: { lastUpdated: new Date() }
}
);
break;
}
}
// Query read model
async getBalance(accountId) {
return await this.db.collection('account_balances')
.findOne({ accountId });
}
}Query Handler
class AccountQueryHandler {
constructor(db) {
this.db = db;
}
// Fast queries against read model
async getAccountBalance(accountId) {
return await this.db.collection('account_balances')
.findOne({ accountId });
}
async getAccountsWithLowBalance(threshold) {
return await this.db.collection('account_balances')
.find({ balance: { $lt: threshold } })
.toArray();
}
async getAccountHistory(accountId) {
return await this.db.collection('account_history')
.find({ accountId })
.sort({ timestamp: -1 })
.toArray();
}
}Snapshots
Problem: Replaying thousands of events is slow
Solution: Periodically save snapshots of aggregate state
class SnapshotStore {
async saveSnapshot(aggregateId, state, version) {
await this.db.collection('snapshots').updateOne(
{ aggregateId },
{
$set: {
aggregateId,
state,
version,
timestamp: new Date()
}
},
{ upsert: true }
);
}
async getSnapshot(aggregateId) {
return await this.db.collection('snapshots')
.findOne({ aggregateId });
}
}
// Load with snapshot
async getById(accountId) {
// Try to load snapshot
const snapshot = await this.snapshotStore.getSnapshot(accountId);
const account = new BankAccount(accountId);
if (snapshot) {
// Load from snapshot
account.loadFromSnapshot(snapshot.state);
// Load events since snapshot
const events = await this.eventStore.getEventsSince(
accountId,
snapshot.version
);
account.loadFromHistory(events);
} else {
// Load all events
const events = await this.eventStore.getEvents(accountId);
account.loadFromHistory(events);
}
return account;
}.NET Implementation
// Event
public abstract class Event
{
public Guid AggregateId { get; set; }
public DateTime Timestamp { get; set; }
public int Version { get; set; }
}
public class MoneyDepositedEvent : Event
{
public decimal Amount { get; set; }
}
// Aggregate
public class BankAccount
{
public Guid Id { get; private set; }
public decimal Balance { get; private set; }
private readonly List<Event> _changes = new();
public void Deposit(decimal amount)
{
if (amount <= 0)
throw new ArgumentException("Amount must be positive");
ApplyChange(new MoneyDepositedEvent
{
AggregateId = Id,
Amount = amount,
Timestamp = DateTime.UtcNow
});
}
private void ApplyChange(Event @event)
{
Apply(@event);
_changes.Add(@event);
}
private void Apply(Event @event)
{
switch (@event)
{
case MoneyDepositedEvent e:
Balance += e.Amount;
break;
case MoneyWithdrawnEvent e:
Balance -= e.Amount;
break;
}
}
public void LoadFromHistory(IEnumerable<Event> events)
{
foreach (var @event in events)
{
Apply(@event);
}
}
public IEnumerable<Event> GetUncommittedChanges()
{
return _changes;
}
}
// Repository
public class BankAccountRepository
{
private readonly IEventStore _eventStore;
public async Task<BankAccount> GetByIdAsync(Guid accountId)
{
var events = await _eventStore.GetEventsAsync(accountId);
var account = new BankAccount();
account.LoadFromHistory(events);
return account;
}
public async Task SaveAsync(BankAccount account)
{
var changes = account.GetUncommittedChanges();
foreach (var @event in changes)
{
await _eventStore.AppendEventAsync(@event);
}
}
}Best Practices
- Use snapshots - Avoid replaying too many events
- Version events - Handle schema evolution
- Idempotent projections - Handle duplicate events
- Monitor lag - Ensure read models stay current
- Test event replay - Verify projections work correctly
- Document events - Clear event schemas
- Handle failures - Retry failed projections
- Use correlation IDs - Track event causality
Interview Tips
- Explain event sourcing: Store changes as events
- Show CQRS: Separate read and write models
- Demonstrate benefits: Audit trail, time travel
- Discuss challenges: Storage, complexity
- Mention snapshots: Performance optimization
- Show implementation: Event store, projections
Summary
Event Sourcing stores all changes as events instead of current state. Provides complete audit trail, time travel, and event replay. CQRS separates read and write models for independent optimization and scaling. Write model handles commands and business logic. Read model optimized for queries with denormalized data. Events flow from write model to read model projections. Use snapshots to avoid replaying many events. Implement idempotent projections. Best for complex domains with audit requirements. Adds complexity but provides powerful capabilities. Essential for systems requiring full history and flexible querying.
Test Your Knowledge
Take a quick quiz to test your understanding of this topic.