AsyncDatabase Class¶
The AsyncDatabase class provides async/await support for all database operations.
Connection¶
from pycopg import AsyncDatabase
# From environment
db = AsyncDatabase.from_env()
# From URL
db = AsyncDatabase.from_url("postgresql://user:pass@localhost:5432/mydb")
# As async context manager
async with AsyncDatabase.from_env() as db:
users = await db.execute("SELECT * FROM users")
Query Execution¶
execute()¶
# SELECT queries
users = await db.execute("SELECT * FROM users WHERE active = %s", [True])
# [{'id': 1, 'name': 'Alice', 'active': True}, ...]
# INSERT/UPDATE/DELETE
await db.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
await db.execute("UPDATE users SET active = %s WHERE id = %s", [False, 1])
# With autocommit
await db.execute("CREATE DATABASE newdb", autocommit=True)
execute_many()¶
Execute SQL for multiple parameter sets. Uses executemany() internally for better performance.
count = await db.execute_many(
"INSERT INTO users (name, email) VALUES (%s, %s)",
[
("Alice", "alice@example.com"),
("Bob", "bob@example.com"),
]
)
print(f"Inserted {count} rows")
insert_batch()¶
High-performance batch insert using a single INSERT with multiple VALUES tuples.
count = await db.insert_batch("users", [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
])
print(f"Inserted {count} rows")
# With ON CONFLICT (upsert)
await db.insert_batch("users", rows, on_conflict="(email) DO UPDATE SET name = EXCLUDED.name")
copy_insert()¶
Ultra-fast bulk insert using PostgreSQL’s COPY protocol.
rows = [{"name": f"User {i}", "email": f"user{i}@example.com"} for i in range(100000)]
count = await db.copy_insert("users", rows)
fetch_one()¶
user = await db.fetch_one("SELECT * FROM users WHERE id = %s", [1])
# {'id': 1, 'name': 'Alice', ...}
# Returns None if not found
missing = await db.fetch_one("SELECT * FROM users WHERE id = %s", [9999])
fetch_val()¶
count = await db.fetch_val("SELECT COUNT(*) FROM users")
# 42
name = await db.fetch_val("SELECT name FROM users WHERE id = %s", [1])
# 'Alice'
Session Mode¶
Session mode keeps a single connection open for multiple operations, reducing connection overhead.
# Without session: each operation opens/closes a connection
await db.execute("SELECT 1") # Open, execute, close
await db.execute("SELECT 2") # Open, execute, close
# With session: single connection for all operations
async with db.session() as session:
await session.execute("SELECT 1")
await session.execute("SELECT 2")
await session.insert_batch("users", rows)
# Connection closed automatically at end
# With autocommit mode
async with db.session(autocommit=True) as session:
await session.execute("CREATE DATABASE newdb")
# Check if in session mode
if db.in_session:
print("Currently in session mode")
Note: Nested sessions are not supported; opening a session while another one is already active raises a RuntimeError.
Context Managers¶
connect()¶
Async connection context manager.
async with db.connect() as conn:
result = await conn.execute("SELECT * FROM users")
rows = await result.fetchall()
# With autocommit
async with db.connect(autocommit=True) as conn:
await conn.execute("CREATE DATABASE newdb")
cursor()¶
Async cursor context manager with dict rows.
async with db.cursor() as cur:
await cur.execute("SELECT * FROM users WHERE id = %s", [1])
user = await cur.fetchone() # Returns dict
print(user['name'])
transaction()¶
Transaction context manager with automatic commit/rollback.
async with db.transaction() as conn:
await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
await conn.execute("UPDATE stats SET count = count + 1")
# Commits automatically on success
# Rolls back automatically on exception
Streaming Results¶
For large result sets, use streaming to avoid loading everything into memory.
async for row in db.stream("SELECT * FROM large_table", batch_size=1000):
process(row)
# With parameters
async for row in db.stream(
"SELECT * FROM events WHERE date > %s",
params=["2024-01-01"],
batch_size=5000
):
process(row)
Batch Operations¶
insert_many()¶
Efficiently insert multiple rows.
count = await db.insert_many("users", [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
{"name": "Charlie", "email": "charlie@example.com"},
])
print(f"Inserted {count} rows")
# With ON CONFLICT
await db.insert_many(
"users",
rows,
on_conflict="(email) DO NOTHING"
)
upsert_many()¶
Insert or update rows based on conflict columns.
count = await db.upsert_many(
"users",
[
{"id": 1, "name": "Alice Updated", "email": "alice@new.com"},
{"id": 2, "name": "Bob Updated", "email": "bob@new.com"},
],
conflict_columns=["id"],
update_columns=["name", "email"]
)
LISTEN/NOTIFY¶
PostgreSQL’s pub/sub mechanism for real-time notifications.
notify()¶
Send a notification.
import json
# Simple notification
await db.notify("events", "user_created")
# JSON payload
await db.notify("events", json.dumps({
"type": "user_created",
"id": 1,
"name": "Alice"
}))
listen()¶
Listen for notifications.
import json
async for payload in db.listen("events"):
event = json.loads(payload)
print(f"Received: {event}")
if event["type"] == "user_created":
await handle_user_created(event["id"])
Schema Operations¶
All schema exploration methods are available asynchronously.
# Schemas
schemas = await db.list_schemas()
exists = await db.schema_exists("app")
await db.create_schema("new_schema")
# Tables
tables = await db.list_tables("public")
exists = await db.table_exists("users")
info = await db.table_info("users") # Column details
# Returns: column_name, data_type, is_nullable, column_default, etc.
# Lightweight column introspection
cols = await db.list_columns("users")
# ['id', 'name', 'email']
types = await db.columns_with_types("users")
# [('id', 'integer'), ('name', 'text')]
count = await db.row_count("users")
# Extensions
extensions = await db.list_extensions()
has_postgis = await db.has_extension("postgis")
await db.create_extension("uuid-ossp")
# Roles
roles = await db.list_roles()
exists = await db.role_exists("appuser")
# Size
size = await db.size()
table_size = await db.table_size("users")
DataFrame Operations¶
AsyncDatabase supports pandas and geopandas operations using run_sync internally, since pandas/geopandas are synchronous libraries.
import pandas as pd
import geopandas as gpd
# Read table to DataFrame
df = await db.to_dataframe("users")
# Write DataFrame to table
await db.from_dataframe(df, "users", if_exists="replace")
# Read spatial table to GeoDataFrame
gdf = await db.to_geodataframe("cities", geometry_column="location")
# Write GeoDataFrame to spatial table
await db.from_geodataframe(
gdf,
"cities",
if_exists="replace",
spatial_index=True # Create spatial index automatically
)
Note: DataFrame operations use run_sync internally to execute synchronous pandas/geopandas code in an async context. This is necessary because pandas and geopandas are not async-aware libraries.
Admin Operations¶
Full administrative capabilities are available asynchronously.
# Create table with columns
await db.create_table("products", {
"id": "SERIAL PRIMARY KEY",
"name": "TEXT NOT NULL",
"price": "DECIMAL(10,2)"
})
# Drop table
await db.drop_table("old_table")
# Create index
await db.create_index("products", "name", unique=True)
# Drop index
await db.drop_index("products_name_idx")
# List indexes
indexes = await db.list_indexes("products")
# List constraints
constraints = await db.list_constraints("products")
# Drop schema with cascade
await db.drop_schema("old_schema", cascade=True)
# Get table sizes
sizes = await db.table_sizes("public")
# Returns list of dicts with table names and sizes
Maintenance Operations¶
Database maintenance operations are fully async.
# Vacuum table (with analyze)
await db.vacuum("large_table", analyze=True)
# Analyze table statistics
await db.analyze("products")
# Explain query plan
plan = await db.explain(
"SELECT * FROM users WHERE created_at > %s",
params=["2024-01-01"],
analyze=True
)
for line in plan:
print(line)
Backup & Restore Operations¶
Backup operations use asyncio.create_subprocess_exec internally to run pg_dump/pg_restore asynchronously.
# Dump database to file (custom format)
await db.pg_dump("backup.dump")
# Restore database from file
await db.pg_restore("backup.dump", clean=True)
# Export table to CSV
rows_exported = await db.copy_to_csv("users", "users.csv")
# Import table from CSV
rows_imported = await db.copy_from_csv("users", "users.csv")
Note: pg_dump and pg_restore require the PostgreSQL client tools to be installed on the system.
Database Lifecycle¶
Create and drop databases asynchronously.
# Create new database
await db.create_database("analytics", owner="analyst")
# Drop database
await db.drop_database("old_db")
Note: These operations require autocommit mode (PostgreSQL does not allow CREATE DATABASE or DROP DATABASE inside a transaction block) and appropriate privileges.
Role Management¶
Full role and privilege management is available asynchronously.
# Create role with password
await db.create_role("appuser", password="secret123", login=True)
# Drop role
await db.drop_role("old_user")
# Alter role password
await db.alter_role("appuser", password="newsecret")
# Grant table privileges
await db.grant("SELECT", "users", "appuser")
await db.grant("INSERT,UPDATE", "orders", "appuser")
# Revoke privileges
await db.revoke("DELETE", "users", "appuser")
# Grant role membership
await db.grant_role("admin", "appuser", with_admin=False)
# Revoke role membership
await db.revoke_role("admin", "appuser")
# List role members
members = await db.list_role_members("admin")
# List role grants
grants = await db.list_role_grants("appuser")
PostGIS Operations¶
PostGIS spatial operations are available asynchronously.
# Create spatial index
await db.create_spatial_index("cities", column="location")
# List geometry columns
geom_cols = await db.list_geometry_columns()
# Returns: table_name, column_name, srid, type
TimescaleDB Operations¶
TimescaleDB hypertable and policy management is available asynchronously.
# Create hypertable
await db.create_hypertable("metrics", "timestamp", chunk_time_interval="1 day")
# Enable compression
await db.enable_compression(
"metrics",
segment_by="device_id",
order_by="timestamp DESC"
)
# Add compression policy
await db.add_compression_policy("metrics", compress_after="7 days")
# Add retention policy
await db.add_retention_policy("metrics", drop_after="90 days")
# List hypertables
hypertables = await db.list_hypertables()
# Get hypertable info
info = await db.hypertable_info("metrics")
Complete Example¶
import asyncio
import json
from pycopg import AsyncDatabase
async def main():
# Connect
db = AsyncDatabase.from_env()
try:
# Create table
await db.execute("""
CREATE TABLE IF NOT EXISTS events (
id SERIAL PRIMARY KEY,
type TEXT NOT NULL,
data JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
# Insert data
await db.insert_many("events", [
{"type": "user.created", "data": {"name": "Alice"}},
{"type": "user.updated", "data": {"name": "Alice B."}},
])
# Query
recent = await db.execute("""
SELECT * FROM events
WHERE created_at > NOW() - INTERVAL '1 hour'
ORDER BY created_at DESC
LIMIT 10
""")
for event in recent:
print(f"{event['type']}: {event['data']}")
# Stream large results
async for row in db.stream("SELECT * FROM events", batch_size=100):
process_event(row)
finally:
await db.close()
if __name__ == "__main__":
asyncio.run(main())