TimescaleDB Support

pycopg provides built-in support for TimescaleDB, a PostgreSQL extension for time-series data.

Prerequisites

  1. TimescaleDB extension installed on your PostgreSQL server

  2. Regular pycopg installation (no additional packages needed)

Setup

from pycopg import Database

db = Database.from_env()

# Enable TimescaleDB extension
db.create_extension("timescaledb")

# Verify installation
if db.has_extension("timescaledb"):
    print("TimescaleDB is ready")

Creating Hypertables

A hypertable is TimescaleDB’s core table type, automatically partitioning data by time.

Basic Creation

# First, create a regular table with a time column
db.execute("""
    CREATE TABLE events (
        time TIMESTAMPTZ NOT NULL,
        device_id TEXT NOT NULL,
        temperature DOUBLE PRECISION,
        humidity DOUBLE PRECISION
    )
""")

# Convert to hypertable
db.create_hypertable("events", "time")

With Options

db.create_hypertable(
    "events",
    "time",
    schema="metrics",              # Target schema
    chunk_time_interval="1 week",  # Chunk interval (default: 1 day)
    if_not_exists=True,            # Don't error if exists
    migrate_data=True,             # Migrate existing data
)

Common Chunk Intervals

Use Case

Interval

High-frequency IoT data

1 hour

Standard metrics

1 day

Long-term storage

1 week or 1 month

Compression

Enable compression to reduce storage for older data.

Enable Compression

# Enable compression on hypertable
db.enable_compression(
    "events",
    segment_by="device_id",       # Group compressed data
    order_by="time DESC",         # Order within segments
)

Compression Policy

Automatically compress chunks older than a threshold.

# Compress chunks older than 7 days
db.add_compression_policy("events", compress_after="7 days")

# Or custom interval
db.add_compression_policy("events", compress_after="30 days")

Data Retention

Automatically drop old data chunks.

# Drop chunks older than 90 days
db.add_retention_policy("logs", drop_after="90 days")

# For metrics, keep 1 year
db.add_retention_policy("metrics", drop_after="365 days")

Listing Hypertables

hypertables = db.list_hypertables()
# [
#     {
#         'schema': 'public',
#         'table_name': 'events',
#         'num_dimensions': 1,
#         'num_chunks': 30,
#         'compression_enabled': True
#     },
#     ...
# ]

Hypertable Info

info = db.hypertable_info("events")
# {
#     'total_size': '1.2 GB',
#     'detailed_size': {...}
# }

Time-Series Queries

Time Bucketing

# Average temperature per hour
result = db.execute("""
    SELECT
        time_bucket('1 hour', time) AS bucket,
        device_id,
        AVG(temperature) AS avg_temp,
        MAX(temperature) AS max_temp,
        MIN(temperature) AS min_temp
    FROM events
    WHERE time > NOW() - INTERVAL '1 day'
    GROUP BY bucket, device_id
    ORDER BY bucket DESC
""")

Last Values

# Get last reading for each device
result = db.execute("""
    SELECT DISTINCT ON (device_id)
        device_id,
        time,
        temperature,
        humidity
    FROM events
    ORDER BY device_id, time DESC
""")

Moving Average

# 5-minute moving average
result = db.execute("""
    SELECT
        time,
        device_id,
        temperature,
        AVG(temperature) OVER (
            PARTITION BY device_id
            ORDER BY time
            RANGE BETWEEN INTERVAL '5 minutes' PRECEDING AND CURRENT ROW
        ) AS moving_avg
    FROM events
    WHERE time > NOW() - INTERVAL '1 hour'
    ORDER BY time DESC
""")

Gap Filling

# Fill gaps in time series
result = db.execute("""
    SELECT
        time_bucket_gapfill('1 minute', time) AS bucket,
        device_id,
        COALESCE(AVG(temperature), locf(AVG(temperature))) AS temperature
    FROM events
    WHERE time BETWEEN NOW() - INTERVAL '1 hour' AND NOW()
    GROUP BY bucket, device_id
    ORDER BY bucket
""")

Continuous Aggregates

Create materialized views that automatically update.

# Create continuous aggregate
db.execute("""
    CREATE MATERIALIZED VIEW hourly_metrics
    WITH (timescaledb.continuous) AS
    SELECT
        time_bucket('1 hour', time) AS bucket,
        device_id,
        AVG(temperature) AS avg_temp,
        MAX(temperature) AS max_temp,
        MIN(temperature) AS min_temp,
        COUNT(*) AS samples
    FROM events
    GROUP BY bucket, device_id
    WITH NO DATA
""")

# Add refresh policy
db.execute("""
    SELECT add_continuous_aggregate_policy('hourly_metrics',
        start_offset => INTERVAL '3 hours',
        end_offset => INTERVAL '1 hour',
        schedule_interval => INTERVAL '1 hour'
    )
""")

Complete Example

from pycopg import Database
import pandas as pd
from datetime import datetime, timedelta

db = Database.from_env()

# Setup
db.create_extension("timescaledb")

# Create sensor table
db.execute("""
    CREATE TABLE IF NOT EXISTS sensors (
        time TIMESTAMPTZ NOT NULL,
        sensor_id TEXT NOT NULL,
        location TEXT,
        temperature DOUBLE PRECISION,
        humidity DOUBLE PRECISION,
        pressure DOUBLE PRECISION
    )
""")

# Convert to hypertable
db.create_hypertable("sensors", "time", if_not_exists=True)

# Create indexes
db.execute("""
    CREATE INDEX IF NOT EXISTS idx_sensors_sensor_id_time
    ON sensors (sensor_id, time DESC)
""")

# Enable compression
db.enable_compression(
    "sensors",
    segment_by="sensor_id",
    order_by="time DESC"
)

# Add policies
db.add_compression_policy("sensors", compress_after="7 days")
db.add_retention_policy("sensors", drop_after="90 days")

# Insert sample data
import random
now = datetime.now()
data = [
    {
        "time": now - timedelta(minutes=i),
        "sensor_id": f"sensor_{i % 5}",
        "location": f"room_{i % 3}",
        "temperature": 20 + random.uniform(-5, 10),
        "humidity": 50 + random.uniform(-20, 30),
        "pressure": 1013 + random.uniform(-10, 10),
    }
    for i in range(1000)
]

df = pd.DataFrame(data)
db.from_dataframe(df, "sensors", if_exists="append")

# Query: hourly averages
hourly = db.execute("""
    SELECT
        time_bucket('1 hour', time) AS hour,
        sensor_id,
        AVG(temperature) AS avg_temp,
        AVG(humidity) AS avg_humidity
    FROM sensors
    WHERE time > NOW() - INTERVAL '24 hours'
    GROUP BY hour, sensor_id
    ORDER BY hour DESC
""")

# Query: latest readings
latest = db.execute("""
    SELECT DISTINCT ON (sensor_id)
        sensor_id,
        location,
        time,
        temperature,
        humidity,
        pressure
    FROM sensors
    ORDER BY sensor_id, time DESC
""")

# Check hypertable info
print(db.list_hypertables())

db.close()

Best Practices

1. Choose Appropriate Chunk Intervals

# High-frequency data (1000+ rows/second)
db.create_hypertable("events", "time", chunk_time_interval="1 hour")

# Standard metrics
db.create_hypertable("metrics", "time", chunk_time_interval="1 day")

# Low-frequency, long-term data
db.create_hypertable("monthly_reports", "time", chunk_time_interval="1 month")

2. Use Appropriate Indexes

# Index for common queries
db.execute("""
    CREATE INDEX ON sensors (sensor_id, time DESC)
""")

# Covering index for specific queries
db.execute("""
    CREATE INDEX ON sensors (sensor_id, time DESC)
    INCLUDE (temperature, humidity)
""")

3. Monitor Chunk Sizes

# Check chunk information
chunks = db.execute("""
    SELECT
        chunk_name,
        range_start,
        range_end,
        is_compressed
    FROM timescaledb_information.chunks
    WHERE hypertable_name = 'sensors'
    ORDER BY range_start DESC
    LIMIT 10
""")

4. Use Continuous Aggregates for Dashboards

Pre-aggregate data for faster dashboard queries instead of computing on the fly.