Database Sharding
What is Sharding?
Sharding is a database partitioning technique that splits large datasets across multiple database servers (shards) to improve performance and scalability.
// Without sharding: Single database
// Illustrative figures only — shows why a single server becomes the bottleneck.
const singleDB = {
users: '100M records',
storage: '10TB',
queries: 'Slow for large datasets',
scalability: 'Limited by single server'
};
// With sharding: Distributed across 10 shards
// Same dataset split horizontally: each shard owns a 1/10 slice of the users.
const shardedDB = {
shard1: '10M users',
shard2: '10M users',
// ... 10 shards total
storage: '1TB per shard',
queries: 'Faster (parallel queries)',
scalability: 'Add more shards as needed'
};
Sharding Strategies
1. Hash-Based Sharding
// Hash-based sharding: routes a record to a shard by hashing its key
// and taking the hash modulo the number of shards.
class HashSharding {
  constructor(shards) {
    this.shards = shards;
  }

  /**
   * 32-bit string hash (h = h * 31 + charCode, truncated to int32),
   * returned as a non-negative number. Deterministic for a given key.
   */
  hash(key) {
    let acc = 0;
    for (let i = 0; i < key.length; i += 1) {
      // Math.imul(acc, 31) is the int32-truncated equivalent of
      // ((acc << 5) - acc); `| 0` matches the original's `hash & hash`.
      acc = (Math.imul(acc, 31) + key.charCodeAt(i)) | 0;
    }
    return Math.abs(acc);
  }

  // Index of the shard that owns this key.
  getShardIndex(userId) {
    return this.hash(userId) % this.shards.length;
  }

  // Hostname of the shard that owns this key.
  getShard(userId) {
    return this.shards[this.getShardIndex(userId)];
  }
}
// Usage: four shard hosts; each userId always resolves to the same one.
const sharding = new HashSharding([
'shard-1.db.com',
'shard-2.db.com',
'shard-3.db.com',
'shard-4.db.com'
]);
console.log(sharding.getShard('user-123')); // one of the four shards — stable per key (hash % 4)
console.log(sharding.getShard('user-456')); // shard-1.db.com
2. Range-Based Sharding
// Range-based sharding: routes users to shards by numeric id range
// (e.g. ids 1..1,000,000 -> shard-1). Keeps adjacent ids co-located.
class RangeSharding {
  /**
   * @param {Array<{min: number, max: number, shard: string}>} ranges -
   *   Inclusive id ranges and the shard host that owns each range.
   */
  constructor(ranges) {
    this.ranges = ranges;
  }

  /**
   * Resolve the shard for a user id of the form "user-<number>".
   * @param {string} userId - e.g. 'user-500000'
   * @returns {string} the owning shard host
   * @throws {Error} if the id is malformed or falls outside every range
   */
  getShard(userId) {
    // Radix 10 is explicit; parseInt without it is a classic footgun.
    const id = Number.parseInt(userId.split('-')[1], 10);
    if (Number.isNaN(id)) {
      // Fail fast with context instead of letting NaN fall through the
      // range scan and surface as a misleading "no shard" error.
      throw new Error(`Invalid user id: ${userId}`);
    }
    for (const range of this.ranges) {
      if (id >= range.min && id <= range.max) {
        return range.shard;
      }
    }
    throw new Error(`No shard found for user ${userId}`);
  }
}
// Usage: three contiguous, non-overlapping id ranges (inclusive bounds).
const rangeSharding = new RangeSharding([
{ min: 1, max: 1000000, shard: 'shard-1.db.com' },
{ min: 1000001, max: 2000000, shard: 'shard-2.db.com' },
{ min: 2000001, max: 3000000, shard: 'shard-3.db.com' }
]);
console.log(rangeSharding.getShard('user-500000')); // shard-1 (500000 falls in 1..1000000)
console.log(rangeSharding.getShard('user-1500000')); // shard-2
3. Geographic Sharding
// Geographic sharding: routes users to a region-local shard based on
// their country code, for data locality / residency.
class GeographicSharding {
  constructor(shards) {
    this.shards = shards;
  }

  // Endpoint of the shard serving the user's region.
  getShard(userLocation) {
    return this.shards[this.getRegion(userLocation)];
  }

  // Country code -> region key; any unmapped country falls back to the
  // catch-all 'global' region.
  getRegion(location) {
    const regionByCountry = new Map([
      ['US', 'americas'], ['CA', 'americas'], ['MX', 'americas'],
      ['GB', 'europe'], ['FR', 'europe'], ['DE', 'europe'], ['IT', 'europe'],
      ['CN', 'asia'], ['JP', 'asia'], ['IN', 'asia'], ['SG', 'asia'],
    ]);
    const region = regionByCountry.get(location.country);
    return region === undefined ? 'global' : region;
  }
}
// Usage: one shard endpoint per region plus a catch-all for unmapped countries.
const geoSharding = new GeographicSharding({
americas: 'us-east-1.db.com',
europe: 'eu-west-1.db.com',
asia: 'ap-southeast-1.db.com',
global: 'global.db.com'
});
4. Directory-Based Sharding
// Directory (lookup-table) sharding: an explicit Map from userId -> shard.
// Most flexible strategy (any key can live anywhere), but the directory
// itself must be stored durably and consulted on every lookup.
class DirectorySharding {
constructor() {
this.directory = new Map();
}
// Lookup table maps keys to shards
registerUser(userId, shard) {
this.directory.set(userId, shard);
}
// Returns undefined for users that were never registered — callers must handle that.
getShard(userId) {
return this.directory.get(userId);
}
// Allows flexible shard assignment
// NOTE(review): this only updates routing; the user's data still has to be
// copied to newShard separately (see ShardRebalancer) — verify callers do both.
rebalance(userId, newShard) {
this.directory.set(userId, newShard);
}
}
Shard Key Selection
// Qualities to evaluate when picking a shard key, with good/bad examples.
const shardKeyConsiderations = {
cardinality: {
good: 'userId (unique per user)',
bad: 'status (only few values)',
reason: 'High cardinality ensures even distribution'
},
queryPatterns: {
good: 'Key used in most queries',
bad: 'Key rarely queried',
reason: 'Avoid cross-shard queries'
},
writePattern: {
good: 'Evenly distributed writes',
bad: 'Monotonically increasing (timestamp)',
reason: 'Prevent hotspots'
},
dataGrowth: {
good: 'Predictable growth per shard',
bad: 'Uneven growth',
reason: 'Balanced shard sizes'
}
};
// Example: Good shard keys
const goodShardKeys = {
users: 'userId',
orders: 'customerId',
messages: 'conversationId',
events: 'hash(userId + timestamp)'
};
// Example: Bad shard keys
const badShardKeys = {
users: 'createdAt', // Monotonic, creates hotspots
orders: 'status', // Low cardinality
messages: 'timestamp', // All writes to latest shard
events: 'type' // Uneven distribution
};
Handling Cross-Shard Queries
// Fan-out helper for queries that cannot be routed to a single shard.
class CrossShardQuery {
constructor(shards) {
this.shards = shards;
}
// Scatter-gather pattern
// Sends the same query to every shard in parallel and merges the results.
// Promise.all is fail-fast: a single shard error rejects the whole query.
async queryAllShards(query) {
const promises = this.shards.map(shard =>
this.executeQuery(shard, query)
);
const results = await Promise.all(promises);
// Merge and sort results
return this.mergeResults(results);
}
// Runs one query on one shard. getConnection is defined elsewhere —
// presumably a pooled connection per shard (TODO: confirm).
async executeQuery(shard, query) {
const connection = await this.getConnection(shard);
return await connection.query(query);
}
// Combines per-shard result arrays, newest first.
// Assumes each row has a numeric `timestamp` field — TODO confirm schema.
mergeResults(results) {
// Flatten arrays
const merged = results.flat();
// Sort by timestamp
return merged.sort((a, b) => b.timestamp - a.timestamp);
}
// Aggregate across shards
// NOTE(review): executeQuery most likely resolves to a result set (rows),
// not a plain number, in which case `sum + count` would concatenate or
// produce NaN — verify the driver's return shape before trusting this total.
async aggregateCount() {
const counts = await Promise.all(
this.shards.map(shard =>
this.executeQuery(shard, 'SELECT COUNT(*) FROM users')
)
);
return counts.reduce((sum, count) => sum + count, 0);
}
}
Rebalancing Shards
// Moves users between shards (e.g. after adding capacity or to fix skew).
class ShardRebalancer {
// Migrates each user one at a time; sequential keeps database load low
// but is slow for large batches.
async rebalance(fromShard, toShard, userIds) {
console.log(`Rebalancing ${userIds.length} users...`);
for (const userId of userIds) {
await this.moveUser(userId, fromShard, toShard);
}
console.log('Rebalancing complete');
}
// Copy -> reroute -> delete. NOT atomic: a crash between steps can leave
// the row duplicated (before the delete) or routing pointing at data that
// was never copied. NOTE(review): production code needs idempotent retries
// or a transactional/dual-write scheme here.
async moveUser(userId, fromShard, toShard) {
// 1. Copy data to new shard
const userData = await fromShard.query(
'SELECT * FROM users WHERE id = ?',
[userId]
);
// NOTE(review): a single `?` placeholder bound to a whole row object —
// most drivers require one placeholder per column; verify this expands.
await toShard.query(
'INSERT INTO users VALUES (?)',
[userData]
);
// 2. Update directory/routing
await this.updateDirectory(userId, toShard);
// 3. Delete from old shard
await fromShard.query(
'DELETE FROM users WHERE id = ?',
[userId]
);
}
}
MongoDB Sharding
// Enable sharding for the database (run against a mongos router)
sh.enableSharding("mydb");
// Shard collection with hashed key — even distribution, but range scans
// on userId can no longer target a single shard
sh.shardCollection("mydb.users", { userId: "hashed" });
// Shard with compound key — co-locates a customer's orders, ordered by date
sh.shardCollection("mydb.orders", { customerId: 1, orderDate: 1 });
// Check shard distribution (doc counts and sizes per shard)
db.users.getShardDistribution();
// Add new shard (a replica set: name/host1,host2,host3)
sh.addShard("shard03/mongo3:27017,mongo4:27017,mongo5:27017");
// Remove shard — drains its chunks to other shards first; re-run the
// command to poll until the migration reports completion
sh.removeShard("shard03");
// Zone sharding (geographic): tag shards, then pin a key range to a tag
sh.addShardTag("shard01", "US");
sh.addShardTag("shard02", "EU");
// "US\uffff" acts as an exclusive upper bound covering all "US..." values
sh.addTagRange(
"mydb.users",
{ country: "US" },
{ country: "US\uffff" },
"US"
);
Consistent Hashing
/**
 * Consistent hash ring with virtual nodes. Each physical node is placed on
 * the ring `virtualNodes` times so keys spread evenly; adding a node only
 * remaps roughly 1/N of the keys instead of rehashing everything.
 */
class ConsistentHashing {
  constructor(nodes, virtualNodes = 150) {
    this.ring = new Map();
    this.nodes = nodes;
    this.virtualNodes = virtualNodes;
    this.buildRing();
  }

  // Place every node's virtual replicas on the ring, then index the ring
  // positions in ascending order for lookups.
  buildRing() {
    for (const node of this.nodes) {
      for (let replica = 0; replica < this.virtualNodes; replica += 1) {
        this.ring.set(this.hash(`${node}-${replica}`), node);
      }
    }
    this.sortedHashes = [...this.ring.keys()].sort((x, y) => x - y);
  }

  // 32-bit string hash (h = h * 31 + charCode), as a non-negative number.
  hash(key) {
    let acc = 0;
    for (let i = 0; i < key.length; i += 1) {
      // int32-truncated h * 31, equivalent to ((h << 5) - h) with `& h`.
      acc = (Math.imul(acc, 31) + key.charCodeAt(i)) | 0;
    }
    return Math.abs(acc);
  }

  // Owner of a key: the first ring position clockwise from the key's hash,
  // wrapping around to the start of the ring if none is greater.
  getNode(key) {
    const target = this.hash(key);
    const position = this.sortedHashes.find((h) => h >= target);
    return this.ring.get(position !== undefined ? position : this.sortedHashes[0]);
  }

  // Register a new physical node and rebuild the ring index. Re-inserting
  // existing nodes writes identical entries, so the end state matches an
  // incremental insert.
  addNode(node) {
    this.nodes.push(node);
    this.buildRing();
  }
}
// Usage: 3 nodes, default 150 virtual nodes each (450 ring positions).
const ch = new ConsistentHashing(['node1', 'node2', 'node3']);
console.log(ch.getNode('user-123')); // some node — stable for this key
console.log(ch.getNode('user-456')); // may or may not match user-123's node
// Add node - only affects ~1/4 of keys
ch.addNode('node4');
Handling Shard Failures
// Periodically pings every shard and fails over to a replica on error.
class ShardFailureHandler {
constructor(shards) {
this.shards = shards;
this.healthChecks = new Map();
this.startHealthChecks();
}
// Polls all shards every 10 seconds.
// NOTE(review): the interval id is discarded and never cleared — add a
// stop() with clearInterval if instances can be torn down, or the timer leaks.
startHealthChecks() {
setInterval(() => {
this.shards.forEach(shard => this.checkHealth(shard));
}, 10000);
}
// Records health state keyed by shard id; any ping failure (including a
// transient one) immediately triggers failover handling.
async checkHealth(shard) {
try {
await shard.ping();
this.healthChecks.set(shard.id, { healthy: true, lastCheck: Date.now() });
} catch (error) {
this.healthChecks.set(shard.id, { healthy: false, lastCheck: Date.now() });
await this.handleFailure(shard);
}
}
// Redirect traffic to the shard's replica (if configured) and page ops.
// updateRouting and sendAlert are defined elsewhere — TODO confirm they exist.
async handleFailure(shard) {
console.log(`Shard ${shard.id} is down`);
// Redirect to replica
if (shard.replica) {
console.log(`Redirecting to replica: ${shard.replica.id}`);
this.updateRouting(shard.id, shard.replica.id);
}
// Alert operations team
await this.sendAlert(`Shard ${shard.id} failure`);
}
}
.NET Sharding Implementation
using MongoDB.Driver;
// Client-side hash sharding over a fixed list of MongoDB databases.
public class ShardingService
{
private readonly List<IMongoDatabase> _shards;
// One client/database handle per connection string, in list order.
public ShardingService(List<string> connectionStrings)
{
_shards = connectionStrings.Select(cs =>
new MongoClient(cs).GetDatabase("mydb")
).ToList();
}
// Hash-based sharding
public IMongoDatabase GetShard(string userId)
{
var hash = GetHash(userId);
var shardIndex = hash % _shards.Count;
return _shards[shardIndex];
}
// NOTE(review): string.GetHashCode() is randomized per process on .NET
// Core, so the same userId maps to DIFFERENT shards across restarts —
// use a stable hash (e.g. SHA-256 of the key) for real routing. Also,
// Math.Abs(int.MinValue) throws OverflowException for that one hash value.
private int GetHash(string key)
{
return Math.Abs(key.GetHashCode());
}
// Query single shard — routed lookup; only one database is touched.
public async Task<User> GetUser(string userId)
{
var shard = GetShard(userId);
var collection = shard.GetCollection<User>("users");
return await collection.Find(u => u.Id == userId).FirstOrDefaultAsync();
}
// Query all shards (scatter-gather); Task.WhenAll runs them in parallel
// and fails fast if any shard query throws.
public async Task<List<User>> GetAllActiveUsers()
{
var tasks = _shards.Select(shard =>
{
var collection = shard.GetCollection<User>("users");
return collection.Find(u => u.Status == "active").ToListAsync();
});
var results = await Task.WhenAll(tasks);
return results.SelectMany(r => r).ToList();
}
}
Sharding Best Practices
// Operational checklist distilled from the strategies above.
const shardingBestPractices = [
'Choose shard key carefully (high cardinality, even distribution)',
'Avoid monotonically increasing shard keys',
'Design to minimize cross-shard queries',
'Plan for shard rebalancing',
'Monitor shard distribution',
'Use consistent hashing for flexibility',
'Implement shard failure handling',
'Document sharding strategy',
'Test with production-like data volumes',
'Consider geographic sharding for global apps',
'Use connection pooling per shard',
'Implement retry logic for shard failures'
];
Interview Tips
- Explain sharding purpose: Horizontal database scaling
- Show strategies: Hash, range, geographic, directory
- Demonstrate shard key selection: Cardinality, query patterns
- Discuss cross-shard queries: Scatter-gather pattern
- Mention consistent hashing: Minimize data movement
- Show rebalancing: How to add/remove shards
Summary
Sharding partitions data across multiple database servers for horizontal scalability. Use hash-based sharding for even distribution, range-based for ordered data, geographic for data locality. Choose shard keys with high cardinality and even distribution. Avoid cross-shard queries when possible; use scatter-gather when necessary. Implement consistent hashing to minimize data movement when adding shards. Monitor shard distribution and rebalance as needed. Handle shard failures with replicas. Essential for scaling databases beyond single server capacity.
Test Your Knowledge
Take a quick quiz to test your understanding of this topic.