Distributed Transactions
Challenges
Distributed transactions span multiple databases or services, making ACID guarantees difficult to achieve.
Service A ──┐
            ├──> Transaction
Service B ──┤
            │
Service C ──┘

Two-Phase Commit (2PC)
Phase 1: Prepare
class TwoPhaseCommit {
async executeTransaction(operations) {
const txId = generateTransactionId();
const participants = [];
// Phase 1: Prepare
for (const op of operations) {
const prepared = await this.prepare(op.service, op.data, txId);
if (!prepared) {
await this.abort(txId, participants);
throw new Error('Transaction aborted');
}
participants.push(op.service);
}
// Phase 2: Commit
for (const service of participants) {
await this.commit(service, txId);
}
return txId;
}
async prepare(service, data, txId) {
// Lock resources and prepare
const result = await service.prepare({
transactionId: txId,
data,
state: 'prepared'
});
return result.canCommit;
}
async commit(service, txId) {
await service.commit(txId);
}
async abort(txId, participants) {
for (const service of participants) {
await service.rollback(txId);
}
}
}MongoDB 2PC Implementation
// Manual 2PC with MongoDB
class MongoTwoPhaseCommit {
async transfer(fromAccount, toAccount, amount) {
const txId = new ObjectId();
try {
// Phase 1: Prepare
const prepared = await Promise.all([
this.prepareDebit(fromAccount, amount, txId),
this.prepareCredit(toAccount, amount, txId)
]);
if (!prepared.every(p => p)) {
throw new Error('Prepare failed');
}
// Phase 2: Commit
await Promise.all([
this.commitDebit(fromAccount, txId),
this.commitCredit(toAccount, txId)
]);
return txId;
} catch (error) {
await this.rollback(txId);
throw error;
}
}
async prepareDebit(accountId, amount, txId) {
const result = await db.collection('accounts').updateOne(
{
_id: accountId,
balance: { $gte: amount },
'pendingTransactions.txId': { $ne: txId }
},
{
$push: {
pendingTransactions: {
txId,
amount: -amount,
state: 'prepared'
}
}
}
);
return result.modifiedCount === 1;
}
async commitDebit(accountId, txId) {
const account = await db.collection('accounts').findOne({ _id: accountId });
const tx = account.pendingTransactions.find(t => t.txId.equals(txId));
await db.collection('accounts').updateOne(
{ _id: accountId },
{
$inc: { balance: tx.amount },
$pull: { pendingTransactions: { txId } }
}
);
}
async rollback(txId) {
await db.collection('accounts').updateMany(
{ 'pendingTransactions.txId': txId },
{ $pull: { pendingTransactions: { txId } } }
);
}
}Saga Pattern
Choreography-Based Saga
// Each service publishes events
class OrderService {
  // Persists a new order in "pending" state, then announces it on the
  // event bus. Returns the raw insert result (callers read `insertedId`).
  async createOrder(orderData) {
    const pendingOrder = { ...orderData, status: 'pending' };
    const insertResult = await db.orders.insertOne(pendingOrder);

    // Publish event
    const payload = {
      orderId: insertResult.insertedId,
      customerId: orderData.customerId,
      items: orderData.items
    };
    await eventBus.publish('OrderCreated', payload);

    return insertResult;
  }

  // Compensating transaction: a failed payment cancels the order.
  async handlePaymentFailed(event) {
    const filter = { _id: event.orderId };
    const update = { $set: { status: 'cancelled' } };
    await db.orders.updateOne(filter, update);
  }
}
class PaymentService {
  // Charges the customer for a new order and reports the outcome as an
  // event — both success and failure become messages, never exceptions.
  async handleOrderCreated(event) {
    const { orderId, customerId, total } = event;
    try {
      await this.processPayment(customerId, total);
      await eventBus.publish('PaymentSucceeded', { orderId });
    } catch (error) {
      await eventBus.publish('PaymentFailed', {
        orderId,
        reason: error.message
      });
    }
  }
}
class InventoryService {
async handlePaymentSucceeded(event) {
try {
await this.reserveInventory(event.orderId);
await eventBus.publish('InventoryReserved', {
orderId: event.orderId
});
} catch (error) {
await eventBus.publish('InventoryFailed', {
orderId: event.orderId
});
// Trigger compensation
await eventBus.publish('RefundRequired', {
orderId: event.orderId
});
}
}
}Orchestration-Based Saga
// Central orchestrator
class OrderSagaOrchestrator {
async executeOrderSaga(orderData) {
const sagaId = generateId();
const state = {
sagaId,
step: 0,
compensations: []
};
try {
// Step 1: Create order
const order = await this.createOrder(orderData);
state.compensations.push(() => this.cancelOrder(order.id));
state.step = 1;
// Step 2: Reserve inventory
await this.reserveInventory(order.items);
state.compensations.push(() => this.releaseInventory(order.items));
state.step = 2;
// Step 3: Process payment
await this.processPayment(order.customerId, order.total);
state.compensations.push(() => this.refundPayment(order.id));
state.step = 3;
// Step 4: Confirm order
await this.confirmOrder(order.id);
return order;
} catch (error) {
await this.compensate(state);
throw error;
}
}
async compensate(state) {
console.log(`Compensating from step ${state.step}`);
// Execute compensations in reverse order
for (const compensation of state.compensations.reverse()) {
try {
await compensation();
} catch (error) {
console.error('Compensation failed:', error);
}
}
}
}Outbox Pattern
// Ensure message delivery with database transaction
class OutboxService {
  // Writes the order and its "OrderCreated" outbox message in one
  // MongoDB transaction, so neither can exist without the other.
  async createOrderWithOutbox(orderData) {
    const session = client.startSession();
    try {
      await session.withTransaction(async () => {
        // Insert order
        const insertResult = await db.collection('orders').insertOne(
          orderData,
          { session }
        );
        // Insert the outbox message inside the same transaction.
        const outboxMessage = {
          aggregateId: insertResult.insertedId,
          eventType: 'OrderCreated',
          payload: orderData,
          createdAt: new Date(),
          processed: false
        };
        await db.collection('outbox').insertOne(outboxMessage, { session });
      });
    } finally {
      await session.endSession();
    }
  }

  // Relay pending outbox rows to the event bus, marking each one as
  // processed only after a successful publish (at-least-once delivery).
  async processOutbox() {
    const pending = await db.collection('outbox')
      .find({ processed: false })
      .limit(100)
      .toArray();
    for (const message of pending) {
      try {
        await eventBus.publish(message.eventType, message.payload);
        await db.collection('outbox').updateOne(
          { _id: message._id },
          { $set: { processed: true, processedAt: new Date() } }
        );
      } catch (error) {
        // Leave the row unprocessed; the next poll retries it.
        console.error('Failed to process outbox message:', error);
      }
    }
  }
}
// Run periodically
setInterval(() => outboxService.processOutbox(), 5000);Event Sourcing
// Store all changes as events
class EventStore {
  // Append a single domain event for the aggregate.
  async appendEvent(aggregateId, event) {
    const record = {
      aggregateId,
      eventType: event.type,
      data: event.data,
      timestamp: new Date(),
      version: event.version
    };
    await db.collection('events').insertOne(record);
  }

  // All events for one aggregate, oldest version first.
  async getEvents(aggregateId) {
    const cursor = db.collection('events')
      .find({ aggregateId })
      .sort({ version: 1 });
    return await cursor.toArray();
  }

  // Replay the aggregate's full event history into its current state.
  async rebuildAggregate(aggregateId) {
    const events = await this.getEvents(aggregateId);
    return events.reduce((state, event) => this.applyEvent(state, event), {});
  }

  // Pure fold step: returns the next state; unknown events are ignored.
  applyEvent(state, event) {
    switch (event.eventType) {
      case 'OrderCreated':
        return { ...state, ...event.data, status: 'pending' };
      case 'PaymentProcessed':
        return { ...state, status: 'paid' };
      case 'OrderShipped':
        return { ...state, status: 'shipped' };
      default:
        return state;
    }
  }
}
Idempotency
// Ensure operations can be retried safely
class IdempotentService {
async processPayment(paymentId, amount) {
// Check if already processed
const existing = await db.collection('payments').findOne({
paymentId
});
if (existing) {
return existing; // Already processed
}
// Process payment
const result = await paymentGateway.charge(amount);
// Store result with idempotency key
await db.collection('payments').insertOne({
paymentId,
amount,
result,
processedAt: new Date()
});
return result;
}
}Distributed Locks
// Redis-based distributed lock
class DistributedLock {
  /**
   * Try to take the lock for `resource`. Resolves to `{ lockValue, release }`
   * on success or `null` when someone else holds it. The key expires on its
   * own after `ttl` milliseconds so a crashed holder cannot block forever.
   */
  async acquireLock(resource, ttl = 10000) {
    const lockKey = `lock:${resource}`;
    const token = generateId();
    // SET key token PX ttl NX — atomically claim the key only if free.
    const reply = await redis.set(lockKey, token, 'PX', ttl, 'NX');
    if (!reply) {
      return null;
    }
    return {
      lockValue: token,
      release: () => this.releaseLock(lockKey, token)
    };
  }

  // Delete the key only if it still holds our token, so a lock that
  // expired and was re-acquired by someone else is never released by us.
  // The check-and-delete must be atomic, hence the Lua script.
  async releaseLock(lockKey, lockValue) {
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    await redis.eval(script, 1, lockKey, lockValue);
  }
}
// Usage
const lock = await distributedLock.acquireLock('account:123');
if (lock) {
try {
// Critical section
await updateAccount('account:123');
} finally {
await lock.release();
}
}.NET Distributed Transactions
using MongoDB.Driver;

public class SagaOrchestrator
{
    /// <summary>
    /// Runs the order saga; on any failure the recorded compensations are
    /// executed newest-first before the original exception is rethrown.
    /// </summary>
    public async Task<Order> ExecuteOrderSaga(OrderData orderData)
    {
        var compensations = new Stack<Func<Task>>();
        try
        {
            // Step 1: Create order
            var order = await CreateOrder(orderData);
            compensations.Push(() => CancelOrder(order.Id));

            // Step 2: Reserve inventory
            await ReserveInventory(order.Items);
            compensations.Push(() => ReleaseInventory(order.Items));

            // Step 3: Process payment
            await ProcessPayment(order.CustomerId, order.Total);
            compensations.Push(() => RefundPayment(order.Id));

            // Step 4: Confirm
            await ConfirmOrder(order.Id);
            return order;
        }
        catch
        {
            // Unwind completed steps. Each compensation is isolated so one
            // failing undo cannot skip the rest or replace the original
            // exception; bare "throw;" preserves the original stack trace.
            while (compensations.Count > 0)
            {
                var compensation = compensations.Pop();
                try
                {
                    await compensation();
                }
                catch (Exception compensationError)
                {
                    Console.Error.WriteLine($"Compensation failed: {compensationError}");
                }
            }
            throw;
        }
    }
}
Best Practices
const distributedTxBestPractices = [
'Use saga pattern for long-running transactions',
'Implement idempotency for all operations',
'Use outbox pattern for reliable messaging',
'Design compensating transactions',
'Monitor saga execution',
'Handle partial failures gracefully',
'Use distributed locks sparingly',
'Implement retry logic with backoff'
];Interview Tips
- Explain challenges: Distributed ACID is hard
- Show 2PC: Two-phase commit protocol
- Demonstrate saga: Choreography vs orchestration
- Discuss patterns: Outbox, event sourcing
- Mention idempotency: Safe retries
- Show examples: MongoDB, distributed locks
Summary
Distributed transactions span multiple services/databases. Two-phase commit (2PC) provides ACID but has availability issues. Saga pattern uses compensating transactions for eventual consistency. Choreography-based sagas use events; orchestration uses central coordinator. Outbox pattern ensures reliable message delivery. Event sourcing stores all changes as events. Implement idempotency for safe retries. Use distributed locks carefully. Essential for microservices and distributed systems.
Test Your Knowledge
Take a quick quiz to test your understanding of this topic.