AsyncDatabase Class

The AsyncDatabase class provides async/await support for all database operations.

Connection

from pycopg import AsyncDatabase

# From environment
db = AsyncDatabase.from_env()

# From URL
db = AsyncDatabase.from_url("postgresql://user:pass@localhost:5432/mydb")

# As async context manager
async with AsyncDatabase.from_env() as db:
    users = await db.execute("SELECT * FROM users")

Query Execution

execute()

# SELECT queries
users = await db.execute("SELECT * FROM users WHERE active = %s", [True])
# [{'id': 1, 'name': 'Alice', 'active': True}, ...]

# INSERT/UPDATE/DELETE
await db.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
await db.execute("UPDATE users SET active = %s WHERE id = %s", [False, 1])

# With autocommit
await db.execute("CREATE DATABASE newdb", autocommit=True)

execute_many()

Execute SQL for multiple parameter sets. Uses executemany() internally for better performance.

count = await db.execute_many(
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    [
        ("Alice", "alice@example.com"),
        ("Bob", "bob@example.com"),
    ]
)
print(f"Inserted {count} rows")

insert_batch()

High-performance batch insert using a single INSERT with multiple VALUES tuples.

count = await db.insert_batch("users", [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
])
print(f"Inserted {count} rows")

# With ON CONFLICT (upsert)
await db.insert_batch("users", rows, on_conflict="(email) DO UPDATE SET name = EXCLUDED.name")

copy_insert()

Ultra-fast bulk insert using PostgreSQL’s COPY protocol.

rows = [{"name": f"User {i}", "email": f"user{i}@example.com"} for i in range(100000)]
count = await db.copy_insert("users", rows)

fetch_one()

user = await db.fetch_one("SELECT * FROM users WHERE id = %s", [1])
# {'id': 1, 'name': 'Alice', ...}

# Returns None if not found
missing = await db.fetch_one("SELECT * FROM users WHERE id = %s", [9999])

fetch_val()

count = await db.fetch_val("SELECT COUNT(*) FROM users")
# 42

name = await db.fetch_val("SELECT name FROM users WHERE id = %s", [1])
# 'Alice'

Session Mode

Session mode keeps a single connection open for multiple operations, reducing connection overhead.

# Without session: each operation opens/closes a connection
await db.execute("SELECT 1")  # Open, execute, close
await db.execute("SELECT 2")  # Open, execute, close

# With session: single connection for all operations
async with db.session() as session:
    await session.execute("SELECT 1")
    await session.execute("SELECT 2")
    await session.insert_batch("users", rows)
    # Connection closed automatically at end

# With autocommit mode
async with db.session(autocommit=True) as session:
    await session.execute("CREATE DATABASE newdb")

# Check if in session mode
if db.in_session:
    print("Currently in session mode")

Note: Nested sessions are not supported; attempting to open a session while another is already active raises a RuntimeError.

Context Managers

connect()

Async connection context manager.

async with db.connect() as conn:
    result = await conn.execute("SELECT * FROM users")
    rows = await result.fetchall()

# With autocommit
async with db.connect(autocommit=True) as conn:
    await conn.execute("CREATE DATABASE newdb")

cursor()

Async cursor context manager with dict rows.

async with db.cursor() as cur:
    await cur.execute("SELECT * FROM users WHERE id = %s", [1])
    user = await cur.fetchone()  # Returns dict
    print(user['name'])

transaction()

Transaction context manager with automatic commit/rollback.

async with db.transaction() as conn:
    await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
    await conn.execute("UPDATE stats SET count = count + 1")
    # Commits automatically on success
    # Rolls back automatically on exception

Streaming Results

For large result sets, use streaming to avoid loading everything into memory.

async for row in db.stream("SELECT * FROM large_table", batch_size=1000):
    process(row)

# With parameters
async for row in db.stream(
    "SELECT * FROM events WHERE date > %s",
    params=["2024-01-01"],
    batch_size=5000
):
    process(row)

Batch Operations

insert_many()

Efficiently insert multiple rows.

count = await db.insert_many("users", [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
    {"name": "Charlie", "email": "charlie@example.com"},
])
print(f"Inserted {count} rows")

# With ON CONFLICT
await db.insert_many(
    "users",
    rows,
    on_conflict="(email) DO NOTHING"
)

upsert_many()

Insert or update rows based on conflict columns.

count = await db.upsert_many(
    "users",
    [
        {"id": 1, "name": "Alice Updated", "email": "alice@new.com"},
        {"id": 2, "name": "Bob Updated", "email": "bob@new.com"},
    ],
    conflict_columns=["id"],
    update_columns=["name", "email"]
)

LISTEN/NOTIFY

PostgreSQL’s pub/sub mechanism for real-time notifications.

notify()

Send a notification.

import json

# Simple notification
await db.notify("events", "user_created")

# JSON payload
await db.notify("events", json.dumps({
    "type": "user_created",
    "id": 1,
    "name": "Alice"
}))

listen()

Listen for notifications.

import json

async for payload in db.listen("events"):
    event = json.loads(payload)
    print(f"Received: {event}")

    if event["type"] == "user_created":
        await handle_user_created(event["id"])

Schema Operations

All schema exploration methods are available asynchronously.

# Schemas
schemas = await db.list_schemas()
exists = await db.schema_exists("app")
await db.create_schema("new_schema")

# Tables
tables = await db.list_tables("public")
exists = await db.table_exists("users")
info = await db.table_info("users")  # Column details
# Returns: column_name, data_type, is_nullable, column_default, etc.

# Lightweight column introspection
cols = await db.list_columns("users")
# ['id', 'name', 'email']

types = await db.columns_with_types("users")
# [('id', 'integer'), ('name', 'text')]

count = await db.row_count("users")

# Extensions
extensions = await db.list_extensions()
has_postgis = await db.has_extension("postgis")
await db.create_extension("uuid-ossp")

# Roles
roles = await db.list_roles()
exists = await db.role_exists("appuser")

# Size
size = await db.size()
table_size = await db.table_size("users")

DataFrame Operations

AsyncDatabase supports pandas and geopandas operations using run_sync internally, since pandas/geopandas are synchronous libraries.

import pandas as pd
import geopandas as gpd

# Read table to DataFrame
df = await db.to_dataframe("users")

# Write DataFrame to table
await db.from_dataframe(df, "users", if_exists="replace")

# Read spatial table to GeoDataFrame
gdf = await db.to_geodataframe("cities", geometry_column="location")

# Write GeoDataFrame to spatial table
await db.from_geodataframe(
    gdf,
    "cities",
    if_exists="replace",
    spatial_index=True  # Create spatial index automatically
)

Note: DataFrame operations use run_sync internally to execute synchronous pandas/geopandas code in an async context. This is necessary because pandas and geopandas are not async-aware libraries.

Admin Operations

Full administrative capabilities are available asynchronously.

# Create table with columns
await db.create_table("products", {
    "id": "SERIAL PRIMARY KEY",
    "name": "TEXT NOT NULL",
    "price": "DECIMAL(10,2)"
})

# Drop table
await db.drop_table("old_table")

# Create index
await db.create_index("products", "name", unique=True)

# Drop index
await db.drop_index("products_name_idx")

# List indexes
indexes = await db.list_indexes("products")

# List constraints
constraints = await db.list_constraints("products")

# Drop schema with cascade
await db.drop_schema("old_schema", cascade=True)

# Get table sizes
sizes = await db.table_sizes("public")
# Returns list of dicts with table names and sizes

Maintenance Operations

Database maintenance operations are fully async.

# Vacuum table (with analyze)
await db.vacuum("large_table", analyze=True)

# Analyze table statistics
await db.analyze("products")

# Explain query plan
plan = await db.explain(
    "SELECT * FROM users WHERE created_at > %s",
    params=["2024-01-01"],
    analyze=True
)
for line in plan:
    print(line)

Backup & Restore Operations

Backup operations use asyncio.create_subprocess_exec internally to run pg_dump/pg_restore asynchronously.

# Dump database to file (custom format)
await db.pg_dump("backup.dump")

# Restore database from file
await db.pg_restore("backup.dump", clean=True)

# Export table to CSV
rows_exported = await db.copy_to_csv("users", "users.csv")

# Import table from CSV
rows_imported = await db.copy_from_csv("users", "users.csv")

Note: pg_dump and pg_restore require the PostgreSQL client tools to be installed on the system.

Database Lifecycle

Create and drop databases asynchronously.

# Create new database
await db.create_database("analytics", owner="analyst")

# Drop database
await db.drop_database("old_db")

Note: These operations require autocommit mode and appropriate privileges.

Role Management

Full role and privilege management is available asynchronously.

# Create role with password
await db.create_role("appuser", password="secret123", login=True)

# Drop role
await db.drop_role("old_user")

# Alter role password
await db.alter_role("appuser", password="newsecret")

# Grant table privileges
await db.grant("SELECT", "users", "appuser")
await db.grant("INSERT,UPDATE", "orders", "appuser")

# Revoke privileges
await db.revoke("DELETE", "users", "appuser")

# Grant role membership
await db.grant_role("admin", "appuser", with_admin=False)

# Revoke role membership
await db.revoke_role("admin", "appuser")

# List role members
members = await db.list_role_members("admin")

# List role grants
grants = await db.list_role_grants("appuser")

PostGIS Operations

PostGIS spatial operations are available asynchronously.

# Create spatial index
await db.create_spatial_index("cities", column="location")

# List geometry columns
geom_cols = await db.list_geometry_columns()
# Returns: table_name, column_name, srid, type

TimescaleDB Operations

TimescaleDB hypertable and policy management is available asynchronously.

# Create hypertable
await db.create_hypertable("metrics", "timestamp", chunk_time_interval="1 day")

# Enable compression
await db.enable_compression(
    "metrics",
    segment_by="device_id",
    order_by="timestamp DESC"
)

# Add compression policy
await db.add_compression_policy("metrics", compress_after="7 days")

# Add retention policy
await db.add_retention_policy("metrics", drop_after="90 days")

# List hypertables
hypertables = await db.list_hypertables()

# Get hypertable info
info = await db.hypertable_info("metrics")

Complete Example

import asyncio
import json
from pycopg import AsyncDatabase

async def main():
    # Connect
    db = AsyncDatabase.from_env()

    try:
        # Create table
        await db.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id SERIAL PRIMARY KEY,
                type TEXT NOT NULL,
                data JSONB,
                created_at TIMESTAMPTZ DEFAULT NOW()
            )
        """)

        # Insert data
        await db.insert_many("events", [
            {"type": "user.created", "data": {"name": "Alice"}},
            {"type": "user.updated", "data": {"name": "Alice B."}},
        ])

        # Query
        recent = await db.execute("""
            SELECT * FROM events
            WHERE created_at > NOW() - INTERVAL '1 hour'
            ORDER BY created_at DESC
            LIMIT 10
        """)

        for event in recent:
            print(f"{event['type']}: {event['data']}")

        # Stream large results
        async for row in db.stream("SELECT * FROM events", batch_size=100):
            process_event(row)

    finally:
        await db.close()

if __name__ == "__main__":
    asyncio.run(main())