Database

pg.zig native PostgreSQL driver -- 2-5x faster than psycopg3. Binary protocol, prepared statement caching, connection pooling, COPY protocol, pipelined queries.

HyperDjango uses a single database backend: the pg.zig native Zig extension compiled into _hyperdjango_native.so. There are no fallbacks to asyncpg, psycopg2, or any other Python driver. If the native extension is not built, database operations fail immediately with a clear build instruction.

Connection

Database URL Format

Connections use standard PostgreSQL connection URLs:

postgres://user:password@host:port/dbname

Every component except dbname is optional and has a sensible default:

| Component | Default | Description |
|-----------|---------|-------------|
| user | OS username ($USER) | PostgreSQL role |
| password | (none) | Role password |
| host | localhost | Server hostname or IP |
| port | 5432 | Server port |
| dbname | (required) | Database name |

Examples:

# Full URL
db = Database("postgres://myuser:secret@db.example.com:5432/myapp")

# Minimal — uses OS username, localhost, default port
db = Database("postgres://localhost/myapp")

# With password, default port
db = Database("postgres://admin:hunter2@localhost/myapp")

If no username is provided in the URL, HyperDjango reads $USER (or $USERNAME on Windows), falling back to postgres.
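
The defaults described above can be sketched with the standard library alone. This is an illustration, not the parser inside pg.zig: the function name `parse_pg_url` and the returned dictionary shape are assumptions made for the example.

```python
import os
from urllib.parse import unquote, urlsplit


def parse_pg_url(url: str) -> dict:
    """Split a postgres:// URL and fill in the documented defaults."""
    parts = urlsplit(url)
    dbname = parts.path.lstrip("/")
    if not dbname:
        raise ValueError("dbname is required")
    # Documented fallback order: $USER, then $USERNAME (Windows), then "postgres"
    default_user = os.environ.get("USER") or os.environ.get("USERNAME") or "postgres"
    return {
        "user": unquote(parts.username) if parts.username else default_user,
        "password": unquote(parts.password) if parts.password else None,
        "host": parts.hostname or "localhost",
        "port": parts.port or 5432,
        "dbname": dbname,
    }
```

For `postgres://localhost/myapp` this yields host `localhost`, port `5432`, no password, and the OS username as the role.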

Creating a Connection

from hyperdjango.database import Database

db = Database("postgres://user:pass@localhost/mydb", max_size=32)
await db.connect()

When using HyperApp, database connections are managed automatically. Pass database= to the constructor and the pool is created lazily on first query:

from hyperdjango import HyperApp

app = HyperApp(
    title="My App",
    database="postgres://localhost/mydb",  # Auto-connect on first query
)

@app.get("/users")
async def list_users(request):
    # get_db() is called automatically — pool created on first use
    users = await User.objects.all()
    return [u.model_dump() for u in users]

The DATABASE_URL is stored in the settings system. get_db() lazily creates the pool the first time any model query runs. There is no need to manually call connect() or set_db().

Unified Connection Pool

HyperDjango uses a single shared connection pool for both the Zig HTTP server and the Python ORM. When app.run() starts the Zig server:

  1. get_db() creates the pool (pg.zig native pool, 32 connections default)
  2. The pool handle is shared with the Zig server via configure_db_handle()
  3. All database operations — Zig-native model routes and Python ORM queries — use the same pool

This eliminates connection competition and maximizes pool utilization across the 24-thread Zig worker pool.

Database Class API

class Database:
    def __init__(self, url: str, min_size: int = 2, max_size: int = 32): ...

Constructor parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| url | str | (required) | PostgreSQL connection URL |
| min_size | int | 2 | Minimum connections in pool |
| max_size | int | 32 | Maximum connections in pool (should be >= Zig thread count) |

Connection methods:

| Method | Signature | Description |
|--------|-----------|-------------|
| connect | async def connect(self) -> None | Create the connection pool. Calls _db_configure in pg.zig. Idempotent: a second call is a no-op. |
| disconnect | async def disconnect(self) -> None | Close all connections and release the pool. Safe to call multiple times. |

Properties:

| Property | Type | Description |
|----------|------|-------------|
| is_connected | bool | True if pool is active |
| backend | str | Returns "pgzig" when connected, "none" otherwise |
| url | str | Connection URL passed to constructor |
| min_size | int | Minimum pool size |
| max_size | int | Maximum pool size |

Global Database Instance

HyperDjango maintains a single global database instance via get_db():

from hyperdjango.database import get_db

db = get_db()  # Auto-creates from DATABASE_URL on first call

get_db() behavior:

  • First call: reads DATABASE_URL from settings, creates a Database instance, connects the pool, returns it
  • Subsequent calls: returns the cached instance (thread-safe, double-checked locking)
  • No DATABASE_URL: raises RuntimeError with a clear message
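
The double-checked locking mentioned above follows a well-known pattern, sketched here with a generic factory. The names `get_instance` and `factory` are placeholders for this example and say nothing about how get_db() is written internally.

```python
import threading

_instance = None
_lock = threading.Lock()


def get_instance(factory):
    """Return the cached instance, creating it at most once."""
    global _instance
    if _instance is None:              # first check, no lock taken
        with _lock:
            if _instance is None:      # second check, under the lock
                _instance = factory()
    return _instance
```

The first check keeps the common path lock-free once the instance exists; the second check stops two threads that raced past the first one from both calling the factory.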

The HyperApp class pushes database= into DATABASE_URL automatically:

from hyperdjango import HyperApp

app = HyperApp(database="postgres://localhost/myapp")
# DATABASE_URL is now set; get_db() creates the pool on first use

Queries

query -- Multiple Rows

async def query(self, sql: str, *args) -> list[dict[str, Any]]

Execute a SQL query and return all rows as a list of dictionaries. Column names come from _db_get_last_columns() which returns list[tuple[str, int]] (name, OID pairs).

# All users over 18
rows = await db.query("SELECT * FROM users WHERE age > $1", 18)
# [{"id": 1, "name": "Alice", "age": 30}, {"id": 2, "name": "Bob", "age": 25}]

# With multiple parameters
rows = await db.query(
    "SELECT * FROM orders WHERE user_id = $1 AND status = $2",
    user_id, "shipped"
)

# Empty result returns empty list
rows = await db.query("SELECT * FROM users WHERE id = $1", 999999)
# []

Parameters use $1, $2, ... positional placeholders (PostgreSQL native protocol), not %s or ?.

query_one -- Single Row

async def query_one(self, sql: str, *args) -> dict[str, Any] | None

Execute a query and return the first row as a dictionary, or None if no rows match.

user = await db.query_one("SELECT * FROM users WHERE id = $1", 1)
if user:
    print(user["name"])  # "Alice"

# Returns None for no match
missing = await db.query_one("SELECT * FROM users WHERE id = $1", 999999)
assert missing is None

query_val -- Single Scalar

async def query_val(self, sql: str, *args) -> Any | None

Execute a query and return the first column of the first row. Ideal for aggregates and existence checks.

count = await db.query_val("SELECT COUNT(*) FROM users")
# 42

exists = await db.query_val(
    "SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)", email
)
# True or False

max_age = await db.query_val("SELECT MAX(age) FROM users")
# 98

execute -- INSERT/UPDATE/DELETE

async def execute(self, sql: str, *args) -> str

Execute a statement that modifies data. Returns a status string like "COMMAND 1" where the number is the affected row count.

# Insert
result = await db.execute(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    "Alice", "alice@example.com"
)
# "COMMAND 1"

# Update
result = await db.execute(
    "UPDATE users SET active = $1 WHERE last_login < $2",
    False, cutoff_date
)
# "COMMAND 15"

# Delete
result = await db.execute(
    "DELETE FROM sessions WHERE expires_at < NOW()"
)
# "COMMAND 203"
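
If the row count is needed as an integer, the status string can be split on its last space. A small sketch, assuming the status always has the "COMMAND n" shape described above; `affected_rows` is a hypothetical helper, not part of the Database API.

```python
def affected_rows(status: str) -> int:
    """Extract the trailing row count from a status string such as "COMMAND 15"."""
    tag, _, count = status.rpartition(" ")
    if not tag or not count.isdigit():
        raise ValueError(f"unexpected status string: {status!r}")
    return int(count)
```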

execute_many -- Batch Execution

async def execute_many(self, sql: str, args_list: list[tuple]) -> None

Execute the same statement with multiple parameter sets. Each set is executed sequentially.

await db.execute_many(
    "INSERT INTO tags (name) VALUES ($1)",
    [("python",), ("zig",), ("postgresql",)]
)

Parameter Types

Parameters are converted to PostgreSQL wire types automatically:

| Python Type | PostgreSQL Type | Example |
|-------------|-----------------|---------|
| int | int4 / int8 | 42 |
| float | float8 | 3.14 |
| str | text | "hello" |
| bool | bool | True |
| None | NULL | None |
| bytes | bytea | b"\x00\x01" |
| datetime | timestamptz | datetime.now() |
| date | date | date.today() |
| UUID | uuid | uuid.uuid4() |
| Decimal | numeric | Decimal("19.99") |
| list[int] | int4[] | [1, 2, 3] |
| list[str] | text[] | ["a", "b"] |
| dict | jsonb | {"key": "val"} |

Query Plan Analysis (EXPLAIN)

Analyze query performance with the native db.explain() API. Returns structured results with plan tree parsing, index usage detection, and execution timing.

Basic Usage

# Plan without executing
result = await db.explain("SELECT * FROM users WHERE id = $1", 1)
print(result.plan.node_type)  # "Index Scan"
print(result.text)            # Full text plan

# With execution timing
result = await db.explain("SELECT * FROM users", analyze=True)
print(result.execution_time)  # 0.042 ms

# With buffer stats
result = await db.explain("SELECT * FROM posts ORDER BY hot_score DESC LIMIT 30",
                          analyze=True, buffers=True)
print(result.execution_time)  # 0.07 ms

ExplainResult API

result = await db.explain(sql, *args, analyze=True)

result.text              # Full text plan output
result.plan              # Root ExplainNode (structured tree)
result.execution_time    # Execution time in ms (analyze only)
result.planning_time     # Planning time in ms (analyze only)
result.analyzed          # Whether ANALYZE was used

# Convenience properties
result.has_seq_scan      # True if any sequential scan on tables
result.seq_scan_tables   # ["users", "posts"] — tables using seq scan
result.index_scans       # [ExplainNode(...)] — all index scan nodes
result.all_nodes         # Flat list of all plan tree nodes

ExplainNode Properties

node = result.plan
node.node_type           # "Index Scan", "Seq Scan", "Limit", etc.
node.relation            # Table name ("users")
node.index_name          # Index used ("idx_users_email")
node.actual_rows         # Rows returned (analyze only)
node.actual_total_time   # Time in ms (analyze only)
node.shared_hit_blocks   # Buffer cache hits
node.shared_read_blocks  # Disk reads
node.children            # Child nodes in plan tree
node.is_seq_scan         # True if Seq Scan
node.is_index_scan       # True if Index Scan / Index Only Scan
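
To make the relationship between children, all_nodes, and seq_scan_tables concrete, here is a sketch of a depth-first walk over a plan tree. `PlanNode` is a hypothetical stand-in carrying only three of the ExplainNode fields listed above; the real traversal order is not documented, so parent-before-children is an assumption.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PlanNode:
    """Hypothetical stand-in for ExplainNode with only the fields used here."""
    node_type: str
    relation: Optional[str] = None
    children: list["PlanNode"] = field(default_factory=list)


def all_nodes(root: PlanNode) -> list[PlanNode]:
    """Flatten the plan tree depth-first, parent before children."""
    nodes = [root]
    for child in root.children:
        nodes.extend(all_nodes(child))
    return nodes


def seq_scan_tables(root: PlanNode) -> list[str]:
    """Names of tables read by a Seq Scan node anywhere in the plan."""
    return [n.relation for n in all_nodes(root)
            if n.node_type == "Seq Scan" and n.relation]


# A Limit over a Sort over a sequential scan of "posts"
plan = PlanNode("Limit", children=[
    PlanNode("Sort", children=[PlanNode("Seq Scan", relation="posts")]),
])
```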

Performance Testing

# Assert index usage in tests
result = await db.explain(
    "SELECT * FROM posts WHERE is_deleted = false ORDER BY hot_score DESC LIMIT 30",
    analyze=True,
)
assert not result.has_seq_scan, f"Seq scan on: {result.seq_scan_tables}"
assert result.execution_time < 5.0, f"Too slow: {result.execution_time}ms"

Transactions

Basic Transaction

async with db.transaction():
    await db.execute("INSERT INTO orders (user_id) VALUES ($1)", user_id)
    await db.execute("UPDATE inventory SET stock = stock - 1 WHERE id = $1", item_id)
    # Auto-commits on success, rolls back on exception

Nested Transactions (Savepoints)

Nested transaction() calls use PostgreSQL savepoints. Inner failures only roll back to the savepoint, not the outer transaction:

async with db.transaction():                      # BEGIN
    await db.execute("INSERT INTO users ...")
    try:
        async with db.transaction():              # SAVEPOINT sp_2
            await db.execute("INSERT INTO audit ...")
            raise ValueError("oops")
    except ValueError:
        pass                                       # ROLLBACK TO SAVEPOINT sp_2
    # Outer transaction continues
    await db.execute("INSERT INTO logs ...")
# COMMIT (users + logs committed, audit rolled back)

Named Savepoints

async with db.transaction(savepoint_name="my_save"):
    await db.execute("INSERT INTO users ...")
    # Uses SAVEPOINT my_save / RELEASE SAVEPOINT my_save

atomic() Alias

db.atomic() is a Django-compatible alias for db.transaction():

async with db.atomic():
    await db.execute("INSERT INTO orders ...")

Transaction Nesting Depth

Transaction depth is tracked per-thread via threading.local(). The outermost call issues BEGIN/COMMIT, inner calls issue SAVEPOINT/RELEASE SAVEPOINT. This is fully safe under free-threaded Python 3.14t.

Connection Pooling

pg.zig manages a connection pool internally with the following behavior:

  • Pool sizing: configurable via min_size and max_size (defaults: 2 and 32)
  • Thread-owned connections: each worker thread acquires its own connection, eliminating contention on the hot path
  • Automatic health checks: connections are validated before use
  • Graceful shutdown: disconnect() drains all active connections

Pool Statistics

stats = db.pool_stats()
# Returns dict[str, int]:
# {
#     "total": 10,        # Total connections in pool
#     "idle": 8,           # Available connections
#     "busy": 2,           # In-use connections
#     "stmt_cache_size": 42  # Cached prepared statements
# }

| Deployment | max_size | Rationale |
|------------|----------|-----------|
| Development | 5 | Single developer, low concurrency |
| Small app (2-4 workers) | 10 | Good for most apps |
| Medium app (8-16 workers) | 20-30 | max_size per worker, shared across threads |
| Large app (32+ workers) | 50-100 | Watch max_connections in postgresql.conf |

PostgreSQL's default max_connections is 100. Across all application instances, your total max_size should stay well below this limit.
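
The arithmetic behind that limit is easy to check up front. In this sketch `max_pool_size` and its `reserved` allowance are illustrative names, not HyperDjango settings.

```python
def max_pool_size(max_connections: int, instances: int, reserved: int = 10) -> int:
    """Largest per-instance max_size that keeps the total under the server limit.

    `reserved` is an illustrative allowance for admin sessions, migrations,
    and monitoring tools that also need connections.
    """
    if instances < 1:
        raise ValueError("instances must be at least 1")
    usable = max_connections - reserved
    return max(usable // instances, 0)
```

With the default limit of 100 and three application instances, `max_pool_size(100, 3)` gives 30, so the default max_size of 32 would already be too large for that deployment.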

Prepared Statement Caching

pg.zig automatically caches prepared statements. When you execute the same SQL string repeatedly, the Parse phase is skipped on subsequent calls:

# First call: Parse + Bind + Execute
user = await db.query_one("SELECT * FROM users WHERE id = $1", 1)

# Second call: Bind + Execute only (Parse skipped)
user = await db.query_one("SELECT * FROM users WHERE id = $1", 2)

Performance impact: 33% faster on repeated queries. First-query warmup is 7.7x faster (494us to 65us) when the statement is already cached.

The cache is thread-safe, protected by mutexes in the Zig layer. Cache size is reported in pool_stats()["stmt_cache_size"].
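
The idea can be modeled in a few lines of Python. This is a conceptual sketch only: the real cache lives in the Zig layer, and the class name, the `stmt_N` naming scheme, and the `lookup` method are assumptions made for the illustration.

```python
import threading


class StatementCache:
    """Maps SQL text to a prepared-statement name, guarded by a lock."""

    def __init__(self):
        self._names: dict[str, str] = {}
        self._lock = threading.Lock()

    def lookup(self, sql: str) -> tuple[str, bool]:
        """Return (statement_name, needs_parse)."""
        with self._lock:
            name = self._names.get(sql)
            if name is not None:
                return name, False          # cache hit: Bind + Execute only
            name = f"stmt_{len(self._names) + 1}"
            self._names[sql] = name
            return name, True               # cache miss: Parse first

    def __len__(self) -> int:
        with self._lock:
            return len(self._names)
```

Because the key is the SQL text, two calls that differ only in their parameter values share one entry. Building SQL with string formatting instead of $1 placeholders produces a new entry per value and defeats the cache.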

Connection Pipelining

Execute multiple independent queries in a single network round-trip:

async def pipeline(self, queries: list[str]) -> list[list[tuple]]

results = await db.pipeline([
    "SELECT COUNT(*) FROM users",
    "SELECT COUNT(*) FROM orders",
    "SELECT COUNT(*) FROM products",
])
# [[(1542,)], [(8923,)], [(456,)]]

Performance: 20 queries in 0.24ms vs 1.40ms sequential (5.74x faster). Pipeline sends all queries in one TCP write and reads all responses in one TCP read.

Note: Pipeline returns raw tuples, not dicts. Each result is a list[tuple] matching the query's columns.
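
When dictionaries are more convenient, the raw tuples can be paired with column names supplied by the caller. `rows_to_dicts` is a hypothetical helper: it assumes you know each query's column order, since the documented pipeline() result carries no names.

```python
def rows_to_dicts(columns: list[str], rows: list[tuple]) -> list[dict]:
    """Pair each raw tuple with caller-supplied column names."""
    out = []
    for row in rows:
        if len(row) != len(columns):
            raise ValueError(f"expected {len(columns)} columns, got {len(row)}")
        out.append(dict(zip(columns, row)))
    return out
```

For the first result in the example above, `rows_to_dicts(["count"], [(1542,)])` produces `[{"count": 1542}]`.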

COPY Protocol

PostgreSQL's COPY protocol for bulk data loading. 42.8x faster than row-by-row INSERT:

# Bulk import from CSV data
csv_data = "Alice,30\nBob,25\nCharlie,35\n"
await db.execute(
    "COPY users (name, age) FROM STDIN WITH (FORMAT csv)",
    csv_data
)

Benchmark: 536K rows/sec via COPY vs 12K rows/sec via INSERT. Use COPY for any bulk import of 1,000+ rows.
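
Hand-built CSV strings break as soon as a value contains a comma, quote, or newline. A sketch of building the payload with Python's csv module instead; `rows_to_csv` is a hypothetical helper whose output would be passed as the csv_data argument shown above.

```python
import csv
import io


def rows_to_csv(rows) -> str:
    """Serialize rows to CSV text with proper quoting and \n line endings."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerows(rows)
    return buf.getvalue()
```

Note that csv.writer emits None as an empty field, and PostgreSQL's CSV format reads an unquoted empty field as NULL by default.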

Supported Types

30+ PostgreSQL types with native binary decoding in the Zig layer. No Python-side type conversion overhead.

| Category | PostgreSQL Types | Python Type |
|----------|------------------|-------------|
| Integer | int2 (smallint) | int |
| | int4 (integer) | int |
| | int8 (bigint) | int |
| Float | float4 (real) | float |
| | float8 (double) | float |
| Text | text, varchar, char, name | str |
| Boolean | bool | bool |
| Date/Time | timestamp | datetime (naive) |
| | timestamptz | datetime (aware, UTC) |
| | date | date |
| | time | time |
| | timetz | time (with tzinfo) |
| | interval | timedelta |
| Binary | bytea | bytes |
| JSON | json, jsonb | dict / list / scalar |
| UUID | uuid | UUID |
| Numeric | numeric / decimal | Decimal |
| Money | money | str (formatted) |
| Network | inet | str |
| | cidr | str |
| Bit | bit, varbit | str |
| XML | xml | str |
| Replication | pg_lsn | str |
| Key-Value | hstore | dict[str, str] |
| Arrays | int4[], int8[] | list[int] |
| | text[], varchar[], name[] | list[str] |
| | bool[] | list[bool] |
| | float4[], float8[] | list[float] |
| | timestamp[], timestamptz[] | list[datetime] |
| Custom | enum types | str |

Custom Enum Types

PostgreSQL custom enum types are supported via dynamic OID registration:

# Create a custom enum type
await db.execute("CREATE TYPE mood AS ENUM ('happy', 'sad', 'neutral')")

# pg.zig registers the OID automatically when it encounters the type
row = await db.query_one("SELECT 'happy'::mood AS current_mood")
# {"current_mood": "happy"}  -- returned as str

The discover_enums() mechanism scans pg_type for all custom enum OIDs and registers them in the type decoder map. This happens automatically on first encounter.

Connection Lifecycle

Database("postgres://...")      # 1. Create instance (no connection)
    |
await db.connect()              # 2. Call _db_configure() in pg.zig
    |                           #    - Parse URL, resolve host
    |                           #    - TCP connect (non-blocking + poll)
    |                           #    - PostgreSQL startup handshake
    |                           #    - Allocate connection pool
    |                           #    - Returns pool handle (int)
    |
await db.query(...)             # 3. Acquire connection from pool
    |                           #    - Check prepared statement cache
    |                           #    - Send Parse/Bind/Execute (binary protocol)
    |                           #    - Decode response tuples
    |                           #    - Return connection to pool
    |
await db.disconnect()           # 4. Call _db_close_pool()
                                #    - Drain active connections
                                #    - Free pool memory

Connection Timeouts

pg.zig uses non-blocking TCP connect with poll for connection establishment. The timeout is set to 10,000ms by default (the third parameter to _db_configure). PostgreSQL's statement_timeout can be set via startup parameters for query-level timeouts.

Error Handling

Connection Errors

from hyperdjango.database import Database

db = Database("postgres://localhost/nonexistent_db")
try:
    await db.connect()
except RuntimeError as e:
    # Raised when the native extension is missing, for example:
    #   "Native extension not available. Build it:
    #     uv run hyper-build"
    # pg.zig connection failures (such as a nonexistent database) also
    # raise RuntimeError, carrying the PostgreSQL error message.
    print(e)

Query Errors

# SQL syntax error
try:
    await db.query("SELEC * FROM users")  # typo
except Exception as e:
    print(e)  # PostgreSQL error: syntax error at or near "SELEC"

# Constraint violation
try:
    await db.execute(
        "INSERT INTO users (email) VALUES ($1)",
        "duplicate@example.com"
    )
except Exception as e:
    print(e)  # duplicate key value violates unique constraint

# Type mismatch
try:
    await db.query("SELECT * FROM users WHERE id = $1", "not_an_int")
except Exception as e:
    print(e)  # invalid input syntax for type integer

Pool Exhaustion

If all connections are busy, query/execute blocks until a connection is available. Design your max_size to accommodate peak concurrency.

import logging

logger = logging.getLogger(__name__)

# Check pool state before heavy operations
stats = db.pool_stats()
if stats["idle"] == 0:
    logger.warning("All connections busy, queries may queue")
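
One way to keep a burst of work from queueing on the pool is to cap concurrency on the application side. The sketch below uses asyncio.Semaphore; `run_bounded` is a hypothetical helper, and choosing a limit at or below max_size is a suggestion rather than a documented requirement.

```python
import asyncio


async def run_bounded(jobs, limit: int):
    """Run zero-argument coroutine functions with at most `limit` in flight."""
    sem = asyncio.Semaphore(limit)

    async def guarded(job):
        async with sem:
            return await job()

    # gather preserves the order of `jobs` in the returned list
    return await asyncio.gather(*(guarded(job) for job in jobs))
```

Each job would wrap one database call, for example `lambda: db.query_one(sql, user_id)`.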

Disconnected State

All query methods check pool state and raise immediately if not connected:

db = Database("postgres://localhost/mydb")
# Forgot to call connect()
await db.query("SELECT 1")
# RuntimeError: Database not connected. Call await db.connect() first.

DataLoader -- N+1 Prevention

The DataLoader batches and deduplicates async database lookups within a single event loop tick:

import asyncio

from hyperdjango.dataloader import DataLoader

async def batch_load_users(keys: list[int]) -> list[dict | None]:
    """Load users by ID in a single query."""
    rows = await db.query(
        "SELECT * FROM users WHERE id = ANY($1)",
        list(keys)
    )
    by_id = {r["id"]: r for r in rows}
    return [by_id.get(k) for k in keys]

loader = DataLoader(batch_fn=batch_load_users, max_batch_size=100)

# These three calls are batched into ONE query:
user1, user2, user3 = await asyncio.gather(
    loader.load(1),
    loader.load(2),
    loader.load(3),
)
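
To show how same-tick batching can work, here is a minimal loader built on asyncio futures. It is an illustration of the technique and not HyperDjango's DataLoader: `TinyLoader` is a made-up name, and it omits caching and max_batch_size.

```python
import asyncio


class TinyLoader:
    """Illustrative batching loader; not HyperDjango's DataLoader."""

    def __init__(self, batch_fn):
        self.batch_fn = batch_fn
        self._pending = {}      # key -> Future, insertion order = batch order
        self._task = None       # dispatch task for the current batch

    async def load(self, key):
        loop = asyncio.get_running_loop()
        fut = self._pending.get(key)
        if fut is None:                       # duplicate keys share one future
            fut = self._pending[key] = loop.create_future()
        if self._task is None:                # first load() of this batch
            self._task = loop.create_task(self._dispatch())
        return await fut

    async def _dispatch(self):
        # Runs after the load() calls already queued on the event loop.
        pending, self._pending, self._task = self._pending, {}, None
        keys = list(pending)
        try:
            values = await self.batch_fn(keys)
        except Exception as exc:
            for fut in pending.values():
                if not fut.done():
                    fut.set_exception(exc)
            return
        for key, value in zip(keys, values):
            if not pending[key].done():
                pending[key].set_result(value)
```

The dispatch task is scheduled behind the load() calls that asyncio.gather has already queued, so it sees every key requested in that tick and calls batch_fn once with the deduplicated list.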

DataLoader API

@dataclass
class DataLoader:
    batch_fn: Callable[[list[K]], Awaitable[list[V]]]
    max_batch_size: int = 100
    cache_enabled: bool = True

| Method | Signature | Description |
|--------|-----------|-------------|
| load | async def load(self, key: K) -> V | Load single value, batched with other calls in same tick |
| load_many | async def load_many(self, keys: list[K]) -> list[V] | Load multiple values, all batched together |
| prime | def prime(self, key: K, value: V) -> None | Pre-populate cache with a known value |
| clear | def clear(self, key: Optional[K] = None) -> None | Clear cache (single key or all) |

DataLoader with Pipeline

For maximum performance, combine DataLoader with connection pipelining:

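async def batch_load_with_pipeline(keys: list[int]) -> list[tuple | None]:
    # pipeline() takes plain SQL strings, so keys are interpolated directly.
    # int(k) raises on non-numeric input, so no arbitrary text reaches the SQL.
    queries = [
        f"SELECT * FROM users WHERE id = {int(k)}" for k in keys
    ]
    results = await db.pipeline(queries)
    # Each result is a list of raw tuples; take the first row, or None if empty
    return [r[0] if r else None for r in results]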

loader = DataLoader(batch_fn=batch_load_with_pipeline)

Multi-Database

ConnectionManager

Manage multiple named database connections with automatic routing:

from hyperdjango.multi_db import ConnectionManager, PrimaryReplicaRouter

connections = ConnectionManager()
await connections.configure({
    "default": "postgres://primary/myapp",
    "replica": "postgres://replica/myapp",
    "analytics": {
        "url": "postgres://analytics-host/warehouse",
        "min_size": 2,
        "max_size": 20,
    },
})

Database Routing

Route reads to replicas and writes to the primary:

from hyperdjango.multi_db import DatabaseRouter, PrimaryReplicaRouter

# Built-in router: reads to "replica", writes to "default"
connections.router = PrimaryReplicaRouter()

# Custom router
class MyRouter(DatabaseRouter):
    def db_for_read(self, model):
        if model.__name__ == "AnalyticsEvent":
            return "analytics"
        return "replica"

    def db_for_write(self, model):
        return "default"

connections.router = MyRouter()

Explicit Database Selection

Override routing for specific queries:

# Reads go to replica by default
users = await User.objects.all()                    # replica

# Writes go to primary by default
await User.objects.create(name="Alice")             # primary

# Explicit selection
users = await User.objects.using("replica").all()   # forced replica
events = await Event.objects.using("analytics").all()  # analytics DB

Per-Model Database Binding

Bind a model to a specific database in its Meta class:

class AnalyticsEvent(Model):
    class Meta:
        table = "events"
        database = "analytics"  # Always uses "analytics" connection

Performance Reference

| Operation | pg.zig | psycopg3 | Speedup |
|-----------|--------|----------|---------|
| SELECT by PK | 21K ops/s | 10K ops/s | 2.06x |
| SELECT range | 4.18x faster | baseline | 4.18x |
| UPDATE | 1.52x faster | baseline | 1.52x |
| COPY bulk import | 536K rows/s | 12K rows/s (INSERT) | 42.8x |
| SELECT 50 rows (micro) | 69us | 25ms | 365x |
| Pipeline 20 queries | 0.24ms | 1.40ms (sequential) | 5.74x |
| Prepared stmt warmup | 65us | 494us | 7.7x |
| Prepared stmt repeat | 33% faster | baseline | 1.33x |

All benchmarks on PostgreSQL 16, Apple M-series, single connection.