What is database sharding and how do you implement it?
Answer
Database sharding is a horizontal partitioning technique that splits a large database into smaller, more manageable pieces called shards. Each shard is a separate database holding a subset of the total data; the shards are spread across multiple servers to improve performance, scalability, and availability.
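The core idea can be sketched in a few lines: every read or write first computes which shard owns the row, then talks only to that shard's database. The shard count and naming scheme here are illustrative assumptions, not a fixed convention.

```python
# Minimal routing sketch: a deterministic function maps each record's
# shard key to exactly one shard. SHARD_COUNT and the shard naming
# scheme are illustrative assumptions.
SHARD_COUNT = 4

def shard_for(customer_id: int) -> str:
    """Deterministically map a customer to one of the shards."""
    return f"shard_{customer_id % SHARD_COUNT}"

print(shard_for(10))  # customer 10 lives on shard_2
```

Because the mapping is deterministic, every application server routes the same key to the same shard without any coordination.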
Sharding Strategies
1. Range-Based Sharding
-- Shard by date ranges
-- Shard 1: Orders from 2023
CREATE TABLE orders_2023 (
    order_id BIGINT PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    total DECIMAL(10,2)
); -- Stored on Server 1

-- Shard 2: Orders from 2024
CREATE TABLE orders_2024 (
    order_id BIGINT PRIMARY KEY,
    customer_id INT,
    order_date DATE,
    total DECIMAL(10,2)
); -- Stored on Server 2

-- Application logic for routing
function getShardForDate(orderDate) {
  if (orderDate >= '2024-01-01') return 'shard_2024';
  if (orderDate >= '2023-01-01') return 'shard_2023';
  return 'shard_archive';
}
2. Hash-Based Sharding
-- Shard by customer ID hash
-- Shard determination: customer_id % 4
-- Shard 0: customer_id % 4 = 0
CREATE TABLE customers_shard_0 (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    email VARCHAR(100)
); -- Server 1

-- Shard 1: customer_id % 4 = 1
CREATE TABLE customers_shard_1 (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    email VARCHAR(100)
); -- Server 2

-- Application routing logic
function getCustomerShard(customerId) {
  return `shard_${customerId % 4}`;
}
-- MD5 hashing for a more uniform key distribution
-- (note: hash-modulo still remaps most keys when shardCount changes;
-- true consistent hashing places shards on a ring so that adding a
-- shard remaps only neighboring keys)
function hashShard(key, shardCount) {
  const hash = md5(key); // md5() assumed available, e.g. from a crypto library
  return parseInt(hash.substring(0, 8), 16) % shardCount;
}
3. Directory-Based Sharding
-- Lookup service to map data to shards
CREATE TABLE shard_directory (
    entity_type VARCHAR(50),
    entity_id VARCHAR(100),
    shard_id VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (entity_type, entity_id)
);

-- Example entries
INSERT INTO shard_directory VALUES
    ('customer', '12345', 'shard_west', NOW()),
    ('customer', '67890', 'shard_east', NOW()),
    ('order', 'ORD001', 'shard_west', NOW());

-- Query to find shard
SELECT shard_id FROM shard_directory
WHERE entity_type = 'customer' AND entity_id = '12345';
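A directory lookup on every request adds a network round trip, so directory-based sharding is usually paired with a cache in front of the directory table. A minimal sketch, where the `lookup` callable stands in for the SELECT shown above:

```python
class ShardDirectory:
    """Cache directory lookups; `lookup` stands in for a real query
    against the shard_directory table (a hypothetical callable)."""

    def __init__(self, lookup):
        self.lookup = lookup   # (entity_type, entity_id) -> shard_id
        self.cache = {}

    def get_shard(self, entity_type, entity_id):
        key = (entity_type, entity_id)
        if key not in self.cache:   # cache miss: query the directory once
            self.cache[key] = self.lookup(entity_type, entity_id)
        return self.cache[key]

# Usage with an in-memory stand-in for the directory table
table = {('customer', '12345'): 'shard_west',
         ('customer', '67890'): 'shard_east'}
directory = ShardDirectory(lambda t, i: table[(t, i)])
```

The flip side of caching is staleness: when an entity is migrated to another shard, its cache entries must be invalidated, which is one reason directory-based schemes often use a short TTL.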
Implementation Approaches
1. Application-Level Sharding
class ShardManager:
    def __init__(self):
        self.shards = {
            'shard_0': 'mysql://user:pass@db1:3306/shard_0',
            'shard_1': 'mysql://user:pass@db2:3306/shard_1',
            'shard_2': 'mysql://user:pass@db3:3306/shard_2',
            'shard_3': 'mysql://user:pass@db4:3306/shard_3'
        }

    def get_shard(self, customer_id):
        shard_key = f"shard_{customer_id % 4}"
        return self.shards[shard_key]

    def execute_query(self, customer_id, query, params=None):
        connection_string = self.get_shard(customer_id)
        conn = connect(connection_string)
        return conn.execute(query, params)

    def execute_cross_shard_query(self, query, params=None):
        results = []
        for shard_name, connection_string in self.shards.items():
            conn = connect(connection_string)
            result = conn.execute(query, params)
            results.extend(result)
        return results
2. Middleware-Based Sharding
-- Using ProxySQL for MySQL sharding
-- Configure sharding rules
INSERT INTO mysql_query_rules (
    rule_id, active, match_pattern, destination_hostgroup, apply
) VALUES
    (1, 1, '^SELECT.*FROM customers WHERE customer_id = ([0-9]+).*',
     0, 1); -- Route to the appropriate hostgroup based on customer_id

-- Hostgroup configuration
INSERT INTO mysql_servers (hostgroup_id, hostname, port) VALUES
    (0, 'shard0.example.com', 3306),
    (1, 'shard1.example.com', 3306),
    (2, 'shard2.example.com', 3306),
    (3, 'shard3.example.com', 3306);
3. Database-Native Sharding
-- PostgreSQL: Foreign Data Wrappers for sharding
CREATE EXTENSION postgres_fdw;

-- Create foreign servers
CREATE SERVER shard_1 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard1.example.com', port '5432', dbname 'shard_db');
CREATE SERVER shard_2 FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'shard2.example.com', port '5432', dbname 'shard_db');

-- Create user mappings
CREATE USER MAPPING FOR current_user SERVER shard_1
    OPTIONS (user 'shard_user', password 'password');

-- Create foreign tables
CREATE FOREIGN TABLE customers_shard_1 (
    customer_id INT,
    customer_name VARCHAR(100),
    email VARCHAR(100)
) SERVER shard_1 OPTIONS (schema_name 'public', table_name 'customers');

-- Create partitioned table with foreign table partitions
CREATE TABLE customers (
    customer_id INT,
    customer_name VARCHAR(100),
    email VARCHAR(100)
) PARTITION BY HASH (customer_id);

-- Attach foreign tables as partitions
ALTER TABLE customers ATTACH PARTITION customers_shard_1
    FOR VALUES WITH (MODULUS 4, REMAINDER 0);
Cross-Shard Operations
1. Cross-Shard Queries
-- Aggregation across shards
-- Application-level aggregation
function getTotalOrdersByRegion() {
  const results = {};
  for (const shard of shards) {
    const shardResults = shard.query(`
      SELECT region, COUNT(*) as order_count, SUM(total) as total_amount
      FROM orders
      WHERE order_date >= '2024-01-01'
      GROUP BY region
    `);
    // Merge results
    for (const row of shardResults) {
      if (!results[row.region]) {
        results[row.region] = { count: 0, amount: 0 };
      }
      results[row.region].count += row.order_count;
      results[row.region].amount += row.total_amount;
    }
  }
  return results;
}
-- Using UNION ALL for cross-shard queries
SELECT region, SUM(order_count) as total_orders, SUM(total_amount) as total_revenue
FROM (
SELECT region, COUNT(*) as order_count, SUM(total) as total_amount
FROM shard_1.orders WHERE order_date >= '2024-01-01' GROUP BY region
UNION ALL
SELECT region, COUNT(*) as order_count, SUM(total) as total_amount
FROM shard_2.orders WHERE order_date >= '2024-01-01' GROUP BY region
UNION ALL
SELECT region, COUNT(*) as order_count, SUM(total) as total_amount
FROM shard_3.orders WHERE order_date >= '2024-01-01' GROUP BY region
) combined_results
GROUP BY region;
2. Distributed Transactions
# Two-Phase Commit for cross-shard transactions (PostgreSQL PREPARE TRANSACTION)
class DistributedTransaction:
    def __init__(self, shard_manager):
        self.shard_manager = shard_manager
        self.participants = []

    def begin_transaction(self, shard_ids):
        self.participants = []
        for shard_id in shard_ids:
            conn = self.shard_manager.get_connection(shard_id)
            conn.execute("BEGIN")
            self.participants.append((shard_id, conn))

    def prepare_phase(self):
        prepared_shards = []
        try:
            for shard_id, conn in self.participants:
                # Phase 1: each participant durably prepares; raises on failure
                conn.execute("PREPARE TRANSACTION 'xact_001'")
                prepared_shards.append((shard_id, conn))
            return prepared_shards
        except Exception:
            # Abort on all shards that already prepared
            for shard_id, conn in prepared_shards:
                conn.execute("ROLLBACK PREPARED 'xact_001'")
            raise

    def commit_phase(self, prepared_shards):
        for shard_id, conn in prepared_shards:
            # Phase 2: Commit
            conn.execute("COMMIT PREPARED 'xact_001'")
Shard Management
1. Shard Splitting
-- Split a shard when it becomes too large
-- Original shard: customer_id % 2 = 0 (customers 0, 2, 4, 6, ...)
-- Split into:
-- customer_id % 4 = 0 (customers 0, 4, 8, ...)
-- customer_id % 4 = 2 (customers 2, 6, 10, ...)
-- Step 1: Create new shard
CREATE TABLE customers_shard_new (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(100),
    email VARCHAR(100)
);

-- Step 2: Migrate data
INSERT INTO customers_shard_new
SELECT * FROM customers_shard_0
WHERE customer_id % 4 = 2;

-- Step 3: Remove migrated data from original shard
DELETE FROM customers_shard_0
WHERE customer_id % 4 = 2;

-- Step 4: Update application routing logic
function getCustomerShard(customerId) {
  return `shard_${customerId % 4}`; // Changed from % 2 to % 4
}
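On a large shard, the single INSERT ... SELECT and DELETE above would hold locks for a long time, so migrations are usually done in small batches. A runnable sketch of that pattern, with in-memory SQLite databases standing in for the real shard servers:

```python
import sqlite3

def migrate_split(source, target, batch_size=100):
    """Move rows with customer_id % 4 == 2 from source to target,
    one small batch at a time, deleting each batch after it is copied."""
    while True:
        rows = source.execute(
            "SELECT customer_id, customer_name, email FROM customers "
            "WHERE customer_id % 4 = 2 LIMIT ?", (batch_size,)).fetchall()
        if not rows:
            break  # nothing left to migrate
        target.executemany("INSERT INTO customers VALUES (?, ?, ?)", rows)
        source.executemany("DELETE FROM customers WHERE customer_id = ?",
                           [(r[0],) for r in rows])
        source.commit()
        target.commit()

# Demo: customers 0..7 on the original shard; 2 and 6 should move
src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")
for c in (src, dst):
    c.execute("CREATE TABLE customers "
              "(customer_id INT PRIMARY KEY, customer_name TEXT, email TEXT)")
src.executemany("INSERT INTO customers VALUES (?, ?, ?)",
                [(i, f"name{i}", f"n{i}@example.com") for i in range(8)])
migrate_split(src, dst, batch_size=2)
```

In a live system the application would dual-route or dual-write during the migration window; this sketch only shows the batching mechanics.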
2. Shard Rebalancing
class ShardRebalancer:
    def __init__(self, shard_manager):
        self.shard_manager = shard_manager

    def analyze_shard_distribution(self):
        shard_stats = {}
        for shard_id in self.shard_manager.get_all_shards():
            stats = self.shard_manager.execute_query(shard_id, """
                SELECT
                    COUNT(*) as row_count,
                    pg_size_pretty(pg_total_relation_size('customers')) as size
                FROM customers
            """)
            shard_stats[shard_id] = stats
        return shard_stats

    def rebalance_shards(self, source_shard, target_shard, migration_criteria):
        # Step 1: Identify data to migrate
        migration_data = self.shard_manager.execute_query(source_shard, f"""
            SELECT * FROM customers WHERE {migration_criteria}
        """)
        # Step 2: Insert into target shard
        for row in migration_data:
            self.shard_manager.execute_query(
                target_shard,
                "INSERT INTO customers VALUES (%s, %s, %s)",
                (row.customer_id, row.customer_name, row.email))
        # Step 3: Remove from source shard
        self.shard_manager.execute_query(source_shard, f"""
            DELETE FROM customers WHERE {migration_criteria}
        """)
Challenges and Solutions
1. Hot Spots
-- Problem: Uneven data distribution
-- Solution: Better shard key selection
-- Bad: Sequential IDs create hot spots
-- All new customers go to the latest shard
CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY, -- Sequential, creates hot spots
    customer_name VARCHAR(100)
);

-- Good: Use compound shard key
CREATE TABLE customers (
    customer_id BIGINT PRIMARY KEY,
    region VARCHAR(10),
    customer_name VARCHAR(100)
);

-- Shard by region + hash of customer_id
function getShardKey(customerId, region) {
  const regionHash = hash(region) % 2;       // 2 region groups
  const customerHash = hash(customerId) % 2; // 2 customer buckets per region group
  return `shard_${regionHash}_${customerHash}`; // 4 shards total
}
2. Cross-Shard Joins
-- Challenge: JOINs across shards are expensive
-- Solution 1: Denormalization
CREATE TABLE order_details_denormalized (
    order_id BIGINT,
    customer_id INT,
    customer_name VARCHAR(100), -- Denormalized from customers table
    customer_email VARCHAR(100), -- Denormalized from customers table
    product_id INT,
    product_name VARCHAR(100), -- Denormalized from products table
    quantity INT,
    unit_price DECIMAL(10,2)
);

-- Solution 2: Co-location of related data
-- Shard both customers and orders by customer_id
function getShardForCustomerData(customerId) {
  return `shard_${customerId % 4}`;
}

-- Both tables use the same sharding strategy
CREATE TABLE customers_shard_0 AS SELECT * FROM customers WHERE customer_id % 4 = 0;
CREATE TABLE orders_shard_0 AS SELECT * FROM orders WHERE customer_id % 4 = 0;

-- Now JOINs work within each shard
SELECT c.customer_name, COUNT(o.order_id) as order_count
FROM customers_shard_0 c
LEFT JOIN orders_shard_0 o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.customer_name;
3. Global Constraints
-- Challenge: Unique constraints across shards
-- Solution: Global ID generation service
-- Centralized ID generator
CREATE TABLE global_sequences (
    sequence_name VARCHAR(50) PRIMARY KEY,
    current_value BIGINT NOT NULL,
    increment_by INT DEFAULT 1
);

-- Function to get next global ID
CREATE OR REPLACE FUNCTION get_next_global_id(seq_name VARCHAR(50))
RETURNS BIGINT AS $$
DECLARE
    next_id BIGINT;
BEGIN
    UPDATE global_sequences
    SET current_value = current_value + increment_by
    WHERE sequence_name = seq_name
    RETURNING current_value INTO next_id;
    RETURN next_id;
END;
$$ LANGUAGE plpgsql;

-- Alternative: UUID-based approach
-- Use UUIDs to ensure global uniqueness without coordination
CREATE TABLE customers (
    customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_name VARCHAR(100),
    email VARCHAR(100) UNIQUE -- Still need to check uniqueness across shards
);
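A middle ground between a central sequence service and UUIDs is interleaved ranges: give each shard its own generator with a distinct starting offset and a stride equal to the shard count, and the shards produce globally unique IDs with no coordination at all. A small in-process sketch of that scheme (the class is hypothetical, not a real library):

```python
import threading

class InterleavedIdAllocator:
    """Shard-local ID generator: starts at `offset`, steps by `stride`.
    With stride = number of shards and a distinct offset per shard,
    the shards' ID streams never collide."""

    def __init__(self, offset, stride):
        self.current = offset - stride  # so the first next_id() returns `offset`
        self.stride = stride
        # The SQL UPDATE ... RETURNING above is atomic; emulate that with a lock
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            self.current += self.stride
            return self.current

# With 4 shards: shard A issues 1, 5, 9, ... and shard B issues 2, 6, 10, ...
shard_a = InterleavedIdAllocator(offset=1, stride=4)
shard_b = InterleavedIdAllocator(offset=2, stride=4)
```

The trade-off is that IDs no longer reveal global insertion order, and adding shards later requires choosing a stride large enough up front.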
Monitoring and Maintenance
1. Shard Health Monitoring
class ShardMonitor:
    def __init__(self, shard_manager):
        self.shard_manager = shard_manager

    def check_shard_health(self):
        health_report = {}
        for shard_id in self.shard_manager.get_all_shards():
            try:
                # Check connectivity
                conn = self.shard_manager.get_connection(shard_id)
                # Check basic metrics
                metrics = conn.execute("""
                    SELECT
                        (SELECT COUNT(*) FROM customers) as customer_count,
                        (SELECT COUNT(*) FROM orders) as order_count,
                        pg_database_size(current_database()) as db_size
                """).fetchone()
                health_report[shard_id] = {
                    'status': 'healthy',
                    'customer_count': metrics.customer_count,
                    'order_count': metrics.order_count,
                    'db_size': metrics.db_size
                }
            except Exception as e:
                health_report[shard_id] = {
                    'status': 'unhealthy',
                    'error': str(e)
                }
        return health_report
2. Performance Monitoring
-- Monitor query performance across shards
CREATE TABLE shard_performance_log (
    shard_id VARCHAR(50),
    query_type VARCHAR(50),
    execution_time_ms INT,
    rows_affected INT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Log slow queries
CREATE OR REPLACE FUNCTION log_shard_performance(
    p_shard_id VARCHAR(50),
    p_query_type VARCHAR(50),
    p_execution_time INT,
    p_rows_affected INT
) RETURNS VOID AS $$
BEGIN
    INSERT INTO shard_performance_log
        (shard_id, query_type, execution_time_ms, rows_affected)
    VALUES (p_shard_id, p_query_type, p_execution_time, p_rows_affected);
END;
$$ LANGUAGE plpgsql;

-- Analyze performance trends
SELECT
    shard_id,
    query_type,
    AVG(execution_time_ms) as avg_execution_time,
    MAX(execution_time_ms) as max_execution_time,
    COUNT(*) as query_count
FROM shard_performance_log
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY shard_id, query_type
ORDER BY avg_execution_time DESC;
Best Practices
1. Shard Key Selection
- Choose keys with good distribution (avoid hot spots)
- Consider query patterns (minimize cross-shard queries)
- Plan for future scaling (avoid frequent resharding)
- Use compound keys when necessary
2. Data Consistency
- Implement eventual consistency where possible
- Use distributed transactions sparingly
- Design for idempotency
- Handle partial failures gracefully
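"Design for idempotency" in practice usually means tagging each logical operation with a unique request ID and making replays no-ops, so a retried cross-shard step can never apply twice. A runnable sketch of the pattern, with an in-memory SQLite database standing in for one shard (the table and function names are illustrative):

```python
import sqlite3

def apply_credit(conn, request_id, customer_id, amount):
    """Idempotent write: a unique request_id makes replays no-ops."""
    try:
        # The PRIMARY KEY on request_id rejects duplicates atomically
        conn.execute("INSERT INTO processed_requests VALUES (?)", (request_id,))
    except sqlite3.IntegrityError:
        return False  # already applied; safe to retry after a partial failure
    conn.execute("UPDATE accounts SET balance = balance + ? WHERE customer_id = ?",
                 (amount, customer_id))
    conn.commit()  # dedupe record and the write commit together
    return True

# Demo: the second (replayed) call must not credit the account twice
shard = sqlite3.connect(":memory:")
shard.execute("CREATE TABLE processed_requests (request_id TEXT PRIMARY KEY)")
shard.execute("CREATE TABLE accounts (customer_id INT PRIMARY KEY, balance INT)")
shard.execute("INSERT INTO accounts VALUES (1, 100)")
apply_credit(shard, "req-001", 1, 50)
apply_credit(shard, "req-001", 1, 50)  # replay: ignored
```

The key design point is committing the dedupe record and the business write in the same transaction; recording the request ID separately would reopen the double-apply window.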
3. Operational Considerations
- Automate shard provisioning and scaling
- Implement comprehensive monitoring
- Plan for disaster recovery across shards
- Test failover scenarios regularly
Interview Tips
- Understand when sharding is necessary vs other scaling approaches
- Know different sharding strategies and their trade-offs
- Be familiar with challenges like cross-shard queries and global constraints
- Understand the complexity of distributed transactions
- Know how to design shard keys for even distribution
- Practice explaining shard management operations (splitting, rebalancing)
- Be aware of alternatives like read replicas and vertical partitioning