TimescaleDB Support¶
pycopg provides built-in support for TimescaleDB, a PostgreSQL extension for time-series data.
Prerequisites¶
TimescaleDB extension installed on your PostgreSQL server
Regular pycopg installation (no additional packages needed)
Setup¶
from pycopg import Database
db = Database.from_env()
# Enable TimescaleDB extension
db.create_extension("timescaledb")
# Verify installation
if db.has_extension("timescaledb"):
print("TimescaleDB is ready")
Creating Hypertables¶
A hypertable is TimescaleDB’s core table type, automatically partitioning data by time.
Basic Creation¶
# First, create a regular table with a time column
db.execute("""
CREATE TABLE events (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION
)
""")
# Convert to hypertable
db.create_hypertable("events", "time")
With Options¶
db.create_hypertable(
"events",
"time",
schema="metrics", # Target schema
chunk_time_interval="1 week", # Chunk interval (default: 1 day)
if_not_exists=True, # Don't error if exists
migrate_data=True, # Migrate existing data
)
Common Chunk Intervals¶
Use Case |
Interval |
|---|---|
High-frequency IoT data |
|
Standard metrics |
|
Long-term storage |
|
Compression¶
Enable compression to reduce storage for older data.
Enable Compression¶
# Enable compression on hypertable
db.enable_compression(
"events",
segment_by="device_id", # Group compressed data
order_by="time DESC", # Order within segments
)
Compression Policy¶
Automatically compress chunks older than a threshold.
# Compress chunks older than 7 days
db.add_compression_policy("events", compress_after="7 days")
# Or custom interval
db.add_compression_policy("events", compress_after="30 days")
Data Retention¶
Automatically drop old data chunks.
# Drop chunks older than 90 days
db.add_retention_policy("logs", drop_after="90 days")
# For metrics, keep 1 year
db.add_retention_policy("metrics", drop_after="365 days")
Listing Hypertables¶
hypertables = db.list_hypertables()
# [
# {
# 'schema': 'public',
# 'table_name': 'events',
# 'num_dimensions': 1,
# 'num_chunks': 30,
# 'compression_enabled': True
# },
# ...
# ]
Hypertable Info¶
info = db.hypertable_info("events")
# {
# 'total_size': '1.2 GB',
# 'detailed_size': {...}
# }
Time-Series Queries¶
Time Bucketing¶
# Average temperature per hour
result = db.execute("""
SELECT
time_bucket('1 hour', time) AS bucket,
device_id,
AVG(temperature) AS avg_temp,
MAX(temperature) AS max_temp,
MIN(temperature) AS min_temp
FROM events
WHERE time > NOW() - INTERVAL '1 day'
GROUP BY bucket, device_id
ORDER BY bucket DESC
""")
Last Values¶
# Get last reading for each device
result = db.execute("""
SELECT DISTINCT ON (device_id)
device_id,
time,
temperature,
humidity
FROM events
ORDER BY device_id, time DESC
""")
Moving Average¶
# 5-minute moving average
result = db.execute("""
SELECT
time,
device_id,
temperature,
AVG(temperature) OVER (
PARTITION BY device_id
ORDER BY time
RANGE BETWEEN INTERVAL '5 minutes' PRECEDING AND CURRENT ROW
) AS moving_avg
FROM events
WHERE time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC
""")
Gap Filling¶
# Fill gaps in time series
result = db.execute("""
SELECT
time_bucket_gapfill('1 minute', time) AS bucket,
device_id,
COALESCE(AVG(temperature), locf(AVG(temperature))) AS temperature
FROM events
WHERE time BETWEEN NOW() - INTERVAL '1 hour' AND NOW()
GROUP BY bucket, device_id
ORDER BY bucket
""")
Continuous Aggregates¶
Create materialized views that automatically update.
# Create continuous aggregate
db.execute("""
CREATE MATERIALIZED VIEW hourly_metrics
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS bucket,
device_id,
AVG(temperature) AS avg_temp,
MAX(temperature) AS max_temp,
MIN(temperature) AS min_temp,
COUNT(*) AS samples
FROM events
GROUP BY bucket, device_id
WITH NO DATA
""")
# Add refresh policy
db.execute("""
SELECT add_continuous_aggregate_policy('hourly_metrics',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
)
""")
Complete Example¶
from pycopg import Database
import pandas as pd
from datetime import datetime, timedelta
db = Database.from_env()
# Setup
db.create_extension("timescaledb")
# Create sensor table
db.execute("""
CREATE TABLE IF NOT EXISTS sensors (
time TIMESTAMPTZ NOT NULL,
sensor_id TEXT NOT NULL,
location TEXT,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
pressure DOUBLE PRECISION
)
""")
# Convert to hypertable
db.create_hypertable("sensors", "time", if_not_exists=True)
# Create indexes
db.execute("""
CREATE INDEX IF NOT EXISTS idx_sensors_sensor_id_time
ON sensors (sensor_id, time DESC)
""")
# Enable compression
db.enable_compression(
"sensors",
segment_by="sensor_id",
order_by="time DESC"
)
# Add policies
db.add_compression_policy("sensors", compress_after="7 days")
db.add_retention_policy("sensors", drop_after="90 days")
# Insert sample data
import random
now = datetime.now()
data = [
{
"time": now - timedelta(minutes=i),
"sensor_id": f"sensor_{i % 5}",
"location": f"room_{i % 3}",
"temperature": 20 + random.uniform(-5, 10),
"humidity": 50 + random.uniform(-20, 30),
"pressure": 1013 + random.uniform(-10, 10),
}
for i in range(1000)
]
df = pd.DataFrame(data)
db.from_dataframe(df, "sensors", if_exists="append")
# Query: hourly averages
hourly = db.execute("""
SELECT
time_bucket('1 hour', time) AS hour,
sensor_id,
AVG(temperature) AS avg_temp,
AVG(humidity) AS avg_humidity
FROM sensors
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY hour, sensor_id
ORDER BY hour DESC
""")
# Query: latest readings
latest = db.execute("""
SELECT DISTINCT ON (sensor_id)
sensor_id,
location,
time,
temperature,
humidity,
pressure
FROM sensors
ORDER BY sensor_id, time DESC
""")
# Check hypertable info
print(db.list_hypertables())
db.close()
Best Practices¶
1. Choose Appropriate Chunk Intervals¶
# High-frequency data (1000+ rows/second)
db.create_hypertable("events", "time", chunk_time_interval="1 hour")
# Standard metrics
db.create_hypertable("metrics", "time", chunk_time_interval="1 day")
# Low-frequency, long-term data
db.create_hypertable("monthly_reports", "time", chunk_time_interval="1 month")
2. Use Appropriate Indexes¶
# Index for common queries
db.execute("""
CREATE INDEX ON sensors (sensor_id, time DESC)
""")
# Covering index for specific queries
db.execute("""
CREATE INDEX ON sensors (sensor_id, time DESC)
INCLUDE (temperature, humidity)
""")
3. Monitor Chunk Sizes¶
# Check chunk information
chunks = db.execute("""
SELECT
chunk_name,
range_start,
range_end,
is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensors'
ORDER BY range_start DESC
LIMIT 10
""")
4. Use Continuous Aggregates for Dashboards¶
Pre-aggregate data for faster dashboard queries instead of computing on the fly.