Change Streams
What are Change Streams?
Change Streams allow applications to access real-time data changes in MongoDB. They provide a way to listen to database changes and react accordingly.
MongoDB Change Streams
Basic Change Stream
const { MongoClient } = require('mongodb');
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');
const collection = db.collection('users');
// Watch for changes
const changeStream = collection.watch();
changeStream.on('change', (change) => {
console.log('Change detected:', change);
switch (change.operationType) {
case 'insert':
console.log('New document:', change.fullDocument);
break;
case 'update':
console.log('Updated fields:', change.updateDescription);
break;
case 'delete':
console.log('Deleted document ID:', change.documentKey);
break;
case 'replace':
console.log('Replaced document:', change.fullDocument);
break;
}
});
// Insert triggers change event
await collection.insertOne({ name: 'John Doe', email: 'john@example.com' });Filtered Change Streams
// Watch specific operations
const pipeline = [
{ $match: { operationType: 'insert' } }
];
const insertStream = collection.watch(pipeline);
insertStream.on('change', (change) => {
console.log('New document inserted:', change.fullDocument);
});
// Watch specific fields
const fieldPipeline = [
{
$match: {
'updateDescription.updatedFields.status': { $exists: true }
}
}
];
const statusStream = collection.watch(fieldPipeline);
// Watch specific documents
const userPipeline = [
{
$match: {
'fullDocument.userId': 'user-123'
}
}
];
const userStream = collection.watch(userPipeline, {
fullDocument: 'updateLookup'
});Resume Token
// Save resume token for crash recovery
let resumeToken;
const changeStream = collection.watch();
changeStream.on('change', (change) => {
resumeToken = change._id;
console.log('Processing change:', change);
});
// Resume from last position after restart
const resumedStream = collection.watch([], {
resumeAfter: resumeToken
});
// Start after specific timestamp
const startTime = new Date('2024-01-01');
const timeStream = collection.watch([], {
startAtOperationTime: startTime
});Real-Time Notifications
// Notify users of new messages
class NotificationService {
constructor(db) {
this.messagesCollection = db.collection('messages');
this.setupChangeStream();
}
setupChangeStream() {
const pipeline = [
{
$match: {
operationType: 'insert',
'fullDocument.read': false
}
}
];
const changeStream = this.messagesCollection.watch(pipeline);
changeStream.on('change', async (change) => {
const message = change.fullDocument;
await this.sendNotification(message.userId, {
title: 'New Message',
body: message.content,
messageId: message._id
});
});
}
async sendNotification(userId, notification) {
// Send push notification, email, etc.
console.log(`Notification to ${userId}:`, notification);
}
}Cache Invalidation
// Invalidate cache on data changes
const redis = require('redis');
const redisClient = redis.createClient();
await redisClient.connect();
const changeStream = collection.watch();
changeStream.on('change', async (change) => {
if (change.operationType === 'update' || change.operationType === 'delete') {
const userId = change.documentKey._id;
await redisClient.del(`user:${userId}`);
console.log(`Cache invalidated for user: ${userId}`);
}
});Data Synchronization
// Sync data to another database
class DataSyncService {
constructor(sourceDb, targetDb) {
this.sourceCollection = sourceDb.collection('users');
this.targetCollection = targetDb.collection('users_replica');
this.setupSync();
}
setupSync() {
const changeStream = this.sourceCollection.watch();
changeStream.on('change', async (change) => {
switch (change.operationType) {
case 'insert':
await this.targetCollection.insertOne(change.fullDocument);
break;
case 'update':
await this.targetCollection.updateOne(
{ _id: change.documentKey._id },
{ $set: change.updateDescription.updatedFields }
);
break;
case 'delete':
await this.targetCollection.deleteOne({ _id: change.documentKey._id });
break;
case 'replace':
await this.targetCollection.replaceOne(
{ _id: change.documentKey._id },
change.fullDocument
);
break;
}
});
}
}DynamoDB Streams
Enable Streams
const { DynamoDBClient, UpdateTableCommand } = require('@aws-sdk/client-dynamodb');
const client = new DynamoDBClient({ region: 'us-east-1' });
// Enable stream
const command = new UpdateTableCommand({
TableName: 'Users',
StreamSpecification: {
StreamEnabled: true,
StreamViewType: 'NEW_AND_OLD_IMAGES' // or NEW_IMAGE, OLD_IMAGE, KEYS_ONLY
}
});
await client.send(command);Process Stream Records
const { DynamoDBStreamsClient, GetRecordsCommand } = require('@aws-sdk/client-dynamodb-streams');
const streamsClient = new DynamoDBStreamsClient({ region: 'us-east-1' });
async function processStreamRecords(shardIterator) {
const command = new GetRecordsCommand({
ShardIterator: shardIterator
});
const response = await streamsClient.send(command);
for (const record of response.Records) {
console.log('Event:', record.eventName);
switch (record.eventName) {
case 'INSERT':
console.log('New item:', record.dynamodb.NewImage);
break;
case 'MODIFY':
console.log('Old:', record.dynamodb.OldImage);
console.log('New:', record.dynamodb.NewImage);
break;
case 'REMOVE':
console.log('Deleted:', record.dynamodb.OldImage);
break;
}
}
// Continue processing
if (response.NextShardIterator) {
await processStreamRecords(response.NextShardIterator);
}
}Lambda Trigger
// AWS Lambda function triggered by DynamoDB Stream
exports.handler = async (event) => {
for (const record of event.Records) {
if (record.eventName === 'INSERT') {
const newUser = record.dynamodb.NewImage;
// Send welcome email
await sendWelcomeEmail({
email: newUser.email.S,
name: newUser.name.S
});
}
if (record.eventName === 'MODIFY') {
const oldImage = record.dynamodb.OldImage;
const newImage = record.dynamodb.NewImage;
// Check if email changed
if (oldImage.email.S !== newImage.email.S) {
await sendEmailChangeNotification(newImage.email.S);
}
}
}
};Cassandra Change Data Capture
CDC Configuration
-- Enable CDC on table
ALTER TABLE users WITH cdc = true;
-- Query CDC log
-- NOTE(review): stock Apache Cassandra surfaces CDC as commitlog segments
-- in the cdc_raw directory, not as a CQL-queryable table; confirm that a
-- system_distributed.cdc_log table exists in the target deployment.
SELECT * FROM system_distributed.cdc_log
WHERE table_name = 'users';
Process CDC Events
const cassandra = require('cassandra-driver');
const client = new cassandra.Client({
contactPoints: ['localhost'],
localDataCenter: 'datacenter1'
});
// Poll CDC log
async function processCDC() {
const query = `
SELECT * FROM system_distributed.cdc_log
WHERE table_name = 'users'
AND cdc_time > ?
`;
const result = await client.execute(query, [lastProcessedTime], { prepare: true });
for (const row of result.rows) {
console.log('CDC Event:', row);
// Process change
}
}
setInterval(processCDC, 5000);Redis Keyspace Notifications
Enable Notifications
# redis.conf
notify-keyspace-events Ex
# E: Keyevent events
# x: Expired events
Subscribe to Events
const redis = require('redis');
const subscriber = redis.createClient();
await subscriber.connect();
// Subscribe to expired keys
await subscriber.subscribe('__keyevent@0__:expired', (message) => {
console.log('Key expired:', message);
// Handle expiration
if (message.startsWith('session:')) {
const sessionId = message.split(':')[1];
console.log(`Session ${sessionId} expired`);
}
});
// Subscribe to set operations
await subscriber.subscribe('__keyevent@0__:set', (message) => {
console.log('Key set:', message);
});.NET Change Streams
using MongoDB.Driver;
using MongoDB.Bson;
/// <summary>
/// Demonstrates MongoDB change streams from .NET: a blocking watch filtered
/// to inserts, and an async watch that dispatches on the operation type.
/// </summary>
public class ChangeStreamService
{
private readonly IMongoCollection<User> _users;
/// <summary>
/// Blocking watch loop. The pipeline filters to Insert events only;
/// UpdateLookup asks the server to include the full current document.
/// Blocks the calling thread while enumerating the cursor.
/// </summary>
public void WatchChanges()
{
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<User>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Insert);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};
using (var cursor = _users.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine($"New user: {change.FullDocument.Name}");
// Process change
ProcessNewUser(change.FullDocument);
}
}
}
/// <summary>
/// Async variant: watches all operation types and logs one line per event.
/// </summary>
public async Task WatchChangesAsync()
{
var changeStream = await _users.WatchAsync();
await changeStream.ForEachAsync(change =>
{
switch (change.OperationType)
{
case ChangeStreamOperationType.Insert:
Console.WriteLine($"Inserted: {change.FullDocument.Name}");
break;
case ChangeStreamOperationType.Update:
Console.WriteLine($"Updated: {change.DocumentKey}");
break;
case ChangeStreamOperationType.Delete:
Console.WriteLine($"Deleted: {change.DocumentKey}");
break;
}
});
}
}
Use Cases
// Common scenarios where change streams are the right tool.
const changeStreamUseCases = {
realTimeNotifications: 'Push notifications to users',
cacheInvalidation: 'Invalidate cache on data changes',
dataSync: 'Sync data across databases',
audit: 'Track all data changes',
analytics: 'Real-time analytics pipeline',
search: 'Update search indexes',
eventSourcing: 'Capture all events',
replication: 'Custom replication logic'
};
Best Practices
// Operational guidance for running change streams in production.
const bestPractices = [
'Use resume tokens for fault tolerance',
'Filter changes with aggregation pipeline',
'Handle errors and reconnect',
'Monitor change stream lag',
'Use appropriate fullDocument option',
'Limit change stream scope',
'Process changes idempotently',
'Scale consumers horizontally'
];
Interview Tips
- Explain change streams: Real-time data change notifications
- Show MongoDB: Watch collections, resume tokens
- Demonstrate DynamoDB: Streams, Lambda triggers
- Discuss use cases: Notifications, cache invalidation, sync
- Mention fault tolerance: Resume tokens, error handling
- Show examples: Node.js, .NET implementations
Summary
Change streams provide real-time notifications of database changes. MongoDB offers change streams with filtering, resume tokens, and full document lookup. DynamoDB Streams capture item-level changes with Lambda integration. Cassandra provides CDC for change tracking. Redis supports keyspace notifications. Use cases include real-time notifications, cache invalidation, data synchronization, and audit logging. Implement fault tolerance with resume tokens. Essential for reactive, event-driven applications.
Test Your Knowledge
Take a quick quiz to test your understanding of this topic.