Time Series Data
What is Time Series Data?
Time series data is a sequence of data points indexed in time order, commonly used for metrics, logs, IoT sensor data, and financial data.
MongoDB Time Series Collections
Create Time Series Collection
// MongoDB 5.0+ native time series support.
// Options for the collection: timeField is required and holds each
// measurement's time; metaField groups measurements for bucketing;
// granularity hints the expected interval between measurements.
const weatherCollectionOptions = {
  timeseries: {
    timeField: "timestamp",
    metaField: "metadata",
    granularity: "hours"
  }
};
db.createCollection("weather", weatherCollectionOptions);
// Insert time series data
// Each document carries the required timeField ("timestamp"), the
// metaField object used for bucketing ("metadata"), and the measurement
// fields themselves (temperature, humidity).
await db.collection('weather').insertMany([
{
timestamp: new Date("2024-01-01T10:00:00Z"),
metadata: { sensorId: "sensor-1", location: "NYC" },
temperature: 25.5,
humidity: 60
},
{
timestamp: new Date("2024-01-01T11:00:00Z"),
metadata: { sensorId: "sensor-1", location: "NYC" },
temperature: 26.0,
humidity: 58
}
]);
Query Time Series Data
// Query by time range: fetch all readings for one sensor inside the
// half-open window [windowStart, windowEnd) — upper bound is exclusive.
const windowStart = new Date("2024-01-01T00:00:00Z");
const windowEnd = new Date("2024-01-02T00:00:00Z");
const data = await db
  .collection('weather')
  .find({
    timestamp: { $gte: windowStart, $lt: windowEnd },
    "metadata.sensorId": "sensor-1"
  })
  .toArray();
// Aggregation on time series
// Pipeline: restrict to one day, bucket readings into hours with
// $dateTrunc, average temperature/humidity per hour, sort chronologically.
const hourlyAvg = await db.collection('weather').aggregate([
{
// Half-open window: Jan 1 inclusive, Jan 2 exclusive.
$match: {
timestamp: {
$gte: new Date("2024-01-01T00:00:00Z"),
$lt: new Date("2024-01-02T00:00:00Z")
}
}
},
{
$group: {
// Group key: the timestamp truncated to the start of its hour.
_id: {
$dateTrunc: {
date: "$timestamp",
unit: "hour"
}
},
avgTemp: { $avg: "$temperature" },
avgHumidity: { $avg: "$humidity" }
}
},
{
// _id is the hour itself, so sorting by _id sorts chronologically.
$sort: { _id: 1 }
}
]).toArray();
Cassandra Time Series
Table Design
-- Time series table with clustering order
-- One partition per sensor; rows within a partition are stored
-- newest-first (DESC) so "latest N readings" reads are cheap.
CREATE TABLE sensor_data (
sensor_id UUID,
timestamp TIMESTAMP,
temperature DOUBLE,
humidity DOUBLE,
PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);
-- Partition by time bucket
-- Composite partition key (sensor_id, day) caps partition growth:
-- each sensor gets a fresh partition per calendar day.
CREATE TABLE sensor_data_by_day (
sensor_id UUID,
day DATE,
timestamp TIMESTAMP,
temperature DOUBLE,
PRIMARY KEY ((sensor_id, day), timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);
Insert and Query
const cassandra = require('cassandra-driver');
// The driver requires the local data center name for its default
// load-balancing policy.
const client = new cassandra.Client({
contactPoints: ['localhost'],
localDataCenter: 'datacenter1',
keyspace: 'iot'
});
// Insert time series data
// { prepare: true } caches the prepared statement server-side and binds
// parameters safely (no CQL string concatenation).
// NOTE(review): sensorId is assumed to be defined elsewhere — confirm.
await client.execute(
'INSERT INTO sensor_data (sensor_id, timestamp, temperature, humidity) VALUES (?, ?, ?, ?)',
[sensorId, new Date(), 25.5, 60],
{ prepare: true }
);
// Query recent data
// 86400000 ms = 24 h; served from the sensor's single partition using
// the timestamp clustering column.
const result = await client.execute(
'SELECT * FROM sensor_data WHERE sensor_id = ? AND timestamp > ? LIMIT 100',
[sensorId, new Date(Date.now() - 86400000)],
{ prepare: true }
);
// Query by day partition
// Targets exactly one (sensor_id, day) partition of sensor_data_by_day.
const dayResult = await client.execute(
'SELECT * FROM sensor_data_by_day WHERE sensor_id = ? AND day = ?',
[sensorId, '2024-01-01'],
{ prepare: true }
);
InfluxDB (Purpose-Built Time Series)
Write Data
const { InfluxDB, Point } = require('@influxdata/influxdb-client');
// Client for a local InfluxDB 2.x instance; the token authorizes writes.
const influxDB = new InfluxDB({
  url: 'http://localhost:8086',
  token: 'my-token'
});
const writeApi = influxDB.getWriteApi('my-org', 'my-bucket');
// Write a single point: measurement "temperature", two tags, one field.
const point = new Point('temperature')
  .tag('sensor', 'sensor-1')
  .tag('location', 'NYC')
  .floatField('value', 25.5)
  .timestamp(new Date());
writeApi.writePoint(point);
// BUG FIX: the previous version called writeApi.close() here and then
// kept writing — close() disposes the WriteApi, so the batch write below
// would throw. flush() pushes buffered points while keeping the API
// usable; the single close() happens after the final write.
await writeApi.flush();
// Batch write
const points = [
  new Point('temperature').tag('sensor', 'sensor-1').floatField('value', 25.5),
  new Point('humidity').tag('sensor', 'sensor-1').floatField('value', 60)
];
writeApi.writePoints(points);
await writeApi.close();
Query Data
const queryApi = influxDB.getQueryApi('my-org');
// Flux query
// Last hour of "temperature" for sensor-1, downsampled to 5-minute means.
const query = `
from(bucket: "my-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.sensor == "sensor-1")
|> aggregateWindow(every: 5m, fn: mean)
`;
const data = [];
// queryRows streams results row by row through the observer callbacks.
await queryApi.queryRows(query, {
next(row, tableMeta) {
// Convert the raw annotated-CSV row into a keyed object.
const o = tableMeta.toObject(row);
data.push(o);
},
error(error) {
console.error(error);
},
complete() {
console.log('Query complete');
}
});
Redis Time Series
RedisTimeSeries Module
const redis = require('redis');
const client = redis.createClient();
await client.connect();
// Create time series
// RETENTION trims samples older than 24 h; LABELS enable filtered
// multi-key queries later (e.g. TS.MRANGE by label).
await client.ts.create('temperature:sensor-1', {
RETENTION: 86400000, // 24 hours in ms
LABELS: { sensor: 'sensor-1', location: 'NYC' }
});
// Add data points
// '*' lets the Redis server assign the current timestamp.
await client.ts.add('temperature:sensor-1', '*', 25.5);
await client.ts.add('temperature:sensor-1', Date.now(), 26.0);
// Query range
const data = await client.ts.range('temperature:sensor-1',
Date.now() - 3600000, // Last hour
Date.now()
);
// Aggregation
// Server-side downsampling: one AVG sample per 1-hour bucket over 24 h.
const hourlyAvg = await client.ts.range('temperature:sensor-1',
Date.now() - 86400000,
Date.now(),
{
AGGREGATION: {
type: 'AVG',
timeBucket: 3600000 // 1 hour
}
}
);
DynamoDB Time Series
Table Design
// Partition by device and time bucket.
// Composite primary key: the HASH key combines device id and a daily
// bucket (so one device's data spreads across partitions), and the
// RANGE key is the numeric epoch-millisecond timestamp.
const partitionKeyName = 'deviceId_bucket';
const sortKeyName = 'timestamp';
const tableSchema = {
  TableName: 'SensorData',
  KeySchema: [
    { AttributeName: partitionKeyName, KeyType: 'HASH' },
    { AttributeName: sortKeyName, KeyType: 'RANGE' }
  ],
  AttributeDefinitions: [
    { AttributeName: partitionKeyName, AttributeType: 'S' },
    { AttributeName: sortKeyName, AttributeType: 'N' }
  ]
};
// Insert with time bucket.
// Stores one sensor reading under a daily partition key
// ("<deviceId>#<dayBucket>") so a single device's data is spread
// across one partition per calendar day.
async function insertReading(deviceId, reading) {
  const now = Date.now();
  const MS_PER_DAY = 24 * 60 * 60 * 1000;
  const dayBucket = Math.floor(now / MS_PER_DAY);
  const item = {
    deviceId_bucket: `${deviceId}#${dayBucket}`,
    timestamp: now,
    temperature: reading.temperature,
    humidity: reading.humidity
  };
  await docClient.send(new PutCommand({ TableName: 'SensorData', Item: item }));
}
// Query time range
// Queries readings for one device between startTime and endTime (epoch
// ms, inclusive). Because the table partitions by (deviceId, dayBucket),
// a range spanning several days must query each daily partition in turn
// and concatenate the results.
async function queryRange(deviceId, startTime, endTime) {
const startBucket = Math.floor(startTime / (24 * 60 * 60 * 1000));
const endBucket = Math.floor(endTime / (24 * 60 * 60 * 1000));
const results = [];
for (let bucket = startBucket; bucket <= endBucket; bucket++) {
const command = new QueryCommand({
TableName: 'SensorData',
// "timestamp" is a DynamoDB reserved word, hence the #ts alias.
KeyConditionExpression: 'deviceId_bucket = :pk AND #ts BETWEEN :start AND :end',
ExpressionAttributeNames: {
'#ts': 'timestamp'
},
ExpressionAttributeValues: {
':pk': `${deviceId}#${bucket}`,
':start': startTime,
':end': endTime
}
});
// NOTE(review): ignores pagination (LastEvaluatedKey) — large
// partitions may return truncated results; confirm item sizes.
const response = await docClient.send(command);
results.push(...response.Items);
}
return results;
}
Downsampling and Aggregation
// MongoDB aggregation for downsampling.
// Collapses raw readings into one document per (sensorId, hour) carrying
// avg/min/max temperature and a sample count, written to
// sensor_data_hourly.
async function downsampleToHourly() {
  // FIX: the previous version first ran deleteMany({}) on the target
  // collection. That is redundant — $out atomically replaces the entire
  // output collection when the pipeline completes — and it left a window
  // in which readers saw an empty collection.
  await db.collection('sensor_data').aggregate([
    {
      $group: {
        // Group key: sensor id plus the timestamp truncated to its hour.
        _id: {
          sensorId: "$metadata.sensorId",
          hour: {
            $dateTrunc: {
              date: "$timestamp",
              unit: "hour"
            }
          }
        },
        avgTemp: { $avg: "$temperature" },
        minTemp: { $min: "$temperature" },
        maxTemp: { $max: "$temperature" },
        count: { $sum: 1 }
      }
    },
    {
      $out: "sensor_data_hourly"
    }
  ]).toArray();
}
// Scheduled downsampling
setInterval(downsampleToHourly, 3600000); // Every hour
Data Retention
// MongoDB TTL index for automatic deletion
// A background task removes documents once their "timestamp" value is
// older than expireAfterSeconds.
// NOTE(review): standard TTL indexes are not supported on native time
// series collections — there, set expireAfterSeconds on the collection
// itself. This snippet targets a regular collection.
db.sensor_data.createIndex(
{ timestamp: 1 },
{ expireAfterSeconds: 2592000 } // 30 days
);
// Manual cleanup fallback when TTL-based expiry is not in use:
// bulk-deletes readings older than the 30-day retention window.
async function cleanupOldData() {
  const RETENTION_MS = 30 * 24 * 60 * 60 * 1000;
  const cutoff = new Date(Date.now() - RETENTION_MS);
  await db.collection('sensor_data').deleteMany({ timestamp: { $lt: cutoff } });
}
// Cassandra TTL
// USING TTL expires this row 2,592,000 seconds (30 days) after the write;
// retention is per-row, set at insert time.
await client.execute(
'INSERT INTO sensor_data (sensor_id, timestamp, temperature) VALUES (?, ?, ?) USING TTL 2592000',
[sensorId, new Date(), 25.5],
{ prepare: true }
);
Rollup Strategy
// Multi-resolution storage
// Keeps three resolutions of the same signal — raw (1-minute), hourly,
// and daily — each intended for a progressively longer retention window.
class TimeSeriesRollup {
// Writes one raw reading; the spread copies the caller's measurement
// fields (e.g. temperature) onto the document.
async insertRawData(sensorId, data) {
// Raw data (1 minute resolution, 7 days retention)
await db.collection('sensor_data_raw').insertOne({
sensorId,
timestamp: new Date(),
...data
});
}
// Aggregates the last hour of raw readings into per-(sensor, hour)
// avg/min/max documents, upserted into sensor_data_hourly via $merge.
async rollupToHourly() {
// Hourly aggregates (30 days retention)
await db.collection('sensor_data_raw').aggregate([
{
$match: {
timestamp: {
$gte: new Date(Date.now() - 3600000),
$lt: new Date()
}
}
},
{
$group: {
_id: {
sensorId: "$sensorId",
hour: { $dateTrunc: { date: "$timestamp", unit: "hour" } }
},
avg: { $avg: "$temperature" },
min: { $min: "$temperature" },
max: { $max: "$temperature" }
}
},
{
// Unlike $out, $merge updates the target in place; "replace"
// overwrites the document when the same _id already exists.
$merge: {
into: "sensor_data_hourly",
whenMatched: "replace"
}
}
]).toArray();
}
// Rolls the last day of hourly aggregates up into daily documents.
// NOTE(review): averaging hourly averages ($avg of "$avg") weights
// every hour equally regardless of how many raw samples it held —
// confirm this is acceptable before relying on daily averages.
async rollupToDaily() {
// Daily aggregates (1 year retention)
await db.collection('sensor_data_hourly').aggregate([
{
$match: {
"_id.hour": {
$gte: new Date(Date.now() - 86400000),
$lt: new Date()
}
}
},
{
$group: {
_id: {
sensorId: "$_id.sensorId",
day: { $dateTrunc: { date: "$_id.hour", unit: "day" } }
},
avg: { $avg: "$avg" },
min: { $min: "$min" },
max: { $max: "$max" }
}
},
{
$merge: {
into: "sensor_data_daily",
whenMatched: "replace"
}
}
]).toArray();
}
}
.NET Time Series
using MongoDB.Driver;
// Time series access layer over a MongoDB collection of sensor readings.
public class TimeSeriesService
{
private readonly IMongoCollection<SensorReading> _readings;
// Stamps the reading with the current UTC time before inserting, so
// callers cannot write client-supplied timestamps.
public async Task InsertReading(SensorReading reading)
{
reading.Timestamp = DateTime.UtcNow;
await _readings.InsertOneAsync(reading);
}
// Returns readings for one sensor in the half-open window [start, end).
public async Task<List<SensorReading>> GetRange(
string sensorId,
DateTime start,
DateTime end)
{
return await _readings.Find(r =>
r.Metadata.SensorId == sensorId &&
r.Timestamp >= start &&
r.Timestamp < end
).ToListAsync();
}
// Computes avg/min/max temperature per hour for one sensor, sorted
// chronologically.
public async Task<List<HourlyAggregate>> GetHourlyAggregates(
string sensorId,
DateTime start,
DateTime end)
{
var pipeline = _readings.Aggregate()
.Match(r =>
r.Metadata.SensorId == sensorId &&
r.Timestamp >= start &&
r.Timestamp < end)
.Group(
// Date.AddHours(Hour) truncates the timestamp to the start of
// its hour — the server-side analogue of $dateTrunc.
r => new
{
SensorId = r.Metadata.SensorId,
Hour = r.Timestamp.Date.AddHours(r.Timestamp.Hour)
},
g => new HourlyAggregate
{
SensorId = g.Key.SensorId,
Hour = g.Key.Hour,
AvgTemperature = g.Average(r => r.Temperature),
MinTemperature = g.Min(r => r.Temperature),
MaxTemperature = g.Max(r => r.Temperature)
}
)
.SortBy(a => a.Hour);
return await pipeline.ToListAsync();
}
}
Best Practices
// Checklist of operational practices for time series workloads,
// summarizing the techniques demonstrated in the sections above.
const timeSeriesBestPractices = [
'Use time-bucketed partitions',
'Create indexes on timestamp',
'Implement data retention policies',
'Use downsampling for old data',
'Store multiple resolutions',
'Batch writes for efficiency',
'Use appropriate granularity',
'Monitor storage growth',
'Compress old data',
'Use purpose-built databases for scale'
];
Interview Tips
- Explain time series: Sequential data indexed by time
- Show MongoDB: Time series collections, aggregations
- Demonstrate Cassandra: Clustering order, time buckets
- Discuss retention: TTL, cleanup strategies
- Mention rollups: Multi-resolution storage
- Show examples: IoT, metrics, financial data
Summary
Time series data is sequential data indexed by time. MongoDB 5.0+ offers native time series collections with optimized storage. Cassandra uses clustering order for time-based queries. InfluxDB and RedisTimeSeries are purpose-built for time series. DynamoDB uses time-bucketed partitions. Implement data retention with TTL indexes. Use downsampling and rollups for historical data. Store multiple resolutions (raw, hourly, daily). Essential for IoT, monitoring, and analytics applications.
Test Your Knowledge
Take a quick quiz to test your understanding of this topic.