Database¶
pg.zig native PostgreSQL driver -- 2-5x faster than psycopg3. Binary protocol, prepared statement caching, connection pooling, COPY protocol, pipelined queries.
HyperDjango uses a single database backend: the pg.zig native Zig extension compiled into _hyperdjango_native.so. There are no fallbacks to asyncpg, psycopg2, or any other Python driver. If the native extension is not built, database operations fail immediately with a clear build instruction.
Connection¶
Database URL Format¶
Connections use standard PostgreSQL connection URLs of the form postgres://user:password@host:port/dbname.
Every component except dbname is optional and has a sensible default:
| Component | Default | Description |
|---|---|---|
| user | OS username ($USER) | PostgreSQL role |
| password | (none) | Role password |
| host | localhost | Server hostname or IP |
| port | 5432 | Server port |
| dbname | (required) | Database name |
Examples:
# Full URL
db = Database("postgres://myuser:secret@db.example.com:5432/myapp")
# Minimal — uses OS username, localhost, default port
db = Database("postgres://localhost/myapp")
# With password, default host
db = Database("postgres://admin:hunter2@localhost/myapp")
If no username is provided in the URL, HyperDjango reads $USER (or $USERNAME on Windows), falling back to postgres.
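The default resolution described above can be sketched with the standard library alone. This is an illustration, not pg.zig's actual parser: the function name resolve_db_url and the returned dictionary shape are assumptions made for the example.

```python
import os
from urllib.parse import urlsplit


def resolve_db_url(url: str) -> dict:
    """Illustrative only: apply the documented defaults to a connection URL."""
    parts = urlsplit(url)
    dbname = parts.path.lstrip("/")
    if not dbname:
        raise ValueError("dbname is required")
    # Username fallback order from the text: URL, $USER, $USERNAME, "postgres".
    user = (
        parts.username
        or os.environ.get("USER")
        or os.environ.get("USERNAME")
        or "postgres"
    )
    return {
        "user": user,
        "password": parts.password,          # None when the URL has no password
        "host": parts.hostname or "localhost",
        "port": parts.port or 5432,
        "dbname": dbname,
    }
```

Calling resolve_db_url("postgres://localhost/myapp") leaves password as None and fills in port 5432, while the user depends on the environment of the running process.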
Creating a Connection¶
from hyperdjango.database import Database
db = Database("postgres://user:pass@localhost/mydb", max_size=32)
await db.connect()
Automatic Connection (Recommended)¶
When using HyperApp, database connections are managed automatically. Pass database= to the constructor and the pool is created lazily on first query:
from hyperdjango import HyperApp
app = HyperApp(
    title="My App",
    database="postgres://localhost/mydb",  # Auto-connect on first query
)

@app.get("/users")
async def list_users(request):
    # get_db() is called automatically; the pool is created on first use
    users = await User.objects.all()
    return [u.model_dump() for u in users]
The DATABASE_URL is stored in the settings system. get_db() lazily creates the pool the first time any model query runs. There is no need to manually call connect() or set_db().
Unified Connection Pool¶
HyperDjango uses a single shared connection pool for both the Zig HTTP server and the Python ORM. When app.run() starts the Zig server:
- get_db() creates the pool (pg.zig native pool, 32 connections by default)
- The pool handle is shared with the Zig server via configure_db_handle()
- All database operations, both Zig-native model routes and Python ORM queries, use the same pool
This eliminates connection competition and maximizes pool utilization across the 24-thread Zig worker pool.
Database Class API¶
Constructor parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| url | str | (required) | PostgreSQL connection URL |
| min_size | int | 2 | Minimum connections in pool |
| max_size | int | 32 | Maximum connections in pool (should be >= Zig thread count) |
Connection methods:
| Method | Signature | Description |
|---|---|---|
| connect | async def connect(self) -> None | Create the connection pool. Calls _db_configure in pg.zig. Idempotent: a second call is a no-op. |
| disconnect | async def disconnect(self) -> None | Close all connections and release the pool. Safe to call multiple times. |
Properties:
| Property | Type | Description |
|---|---|---|
| is_connected | bool | True if pool is active |
| backend | str | Returns "pgzig" when connected, "none" otherwise |
| url | str | Connection URL passed to constructor |
| min_size | int | Minimum pool size |
| max_size | int | Maximum pool size |
Global Database Instance¶
HyperDjango maintains a single global database instance via get_db():
from hyperdjango.database import get_db
db = get_db() # Auto-creates from DATABASE_URL on first call
get_db() behavior:
- First call: reads DATABASE_URL from settings, creates a Database instance, connects the pool, and returns it
- Subsequent calls: returns the cached instance (thread-safe, double-checked locking)
- No DATABASE_URL: raises RuntimeError with a clear message
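The double-checked locking pattern mentioned above can be sketched as follows. This is a generic illustration of the pattern, not HyperDjango's source: get_instance, factory, and the module-level names are hypothetical.

```python
import threading

_instance = None
_lock = threading.Lock()


def get_instance(factory, url):
    """Return the cached instance, creating it at most once across threads."""
    global _instance
    if _instance is None:              # first check, without taking the lock
        with _lock:
            if _instance is None:      # second check, under the lock
                if not url:
                    raise RuntimeError("DATABASE_URL is not set")
                _instance = factory(url)
    return _instance
```

The unlocked first check keeps the common path cheap once the instance exists; the second check prevents two threads that both saw None from each creating an instance.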
The HyperApp class pushes database= into DATABASE_URL automatically:
from hyperdjango import HyperApp
app = HyperApp(database="postgres://localhost/myapp")
# db is connected and set_db() called during app startup
Queries¶
query -- Multiple Rows¶
Execute a SQL query and return all rows as a list of dictionaries. Column names come from _db_get_last_columns() which returns list[tuple[str, int]] (name, OID pairs).
# All users over 18
rows = await db.query("SELECT * FROM users WHERE age > $1", 18)
# [{"id": 1, "name": "Alice", "age": 30}, {"id": 2, "name": "Bob", "age": 25}]
# With multiple parameters
rows = await db.query(
    "SELECT * FROM orders WHERE user_id = $1 AND status = $2",
    user_id, "shipped"
)
# Empty result returns empty list
rows = await db.query("SELECT * FROM users WHERE id = $1", 999999)
# []
Parameters use $1, $2, ... positional placeholders (PostgreSQL native protocol), not %s or ?.
query_one -- Single Row¶
Execute a query and return the first row as a dictionary, or None if no rows match.
user = await db.query_one("SELECT * FROM users WHERE id = $1", 1)
if user:
    print(user["name"])  # "Alice"
# Returns None for no match
missing = await db.query_one("SELECT * FROM users WHERE id = $1", 999999)
assert missing is None
query_val -- Single Scalar¶
Execute a query and return the first column of the first row. Ideal for aggregates and existence checks.
count = await db.query_val("SELECT COUNT(*) FROM users")
# 42
exists = await db.query_val(
    "SELECT EXISTS(SELECT 1 FROM users WHERE email = $1)", email
)
# True or False
max_age = await db.query_val("SELECT MAX(age) FROM users")
# 98
execute -- INSERT/UPDATE/DELETE¶
Execute a statement that modifies data. Returns a status string like "COMMAND 1" where the number is the affected row count.
# Insert
result = await db.execute(
    "INSERT INTO users (name, email) VALUES ($1, $2)",
    "Alice", "alice@example.com"
)
# "COMMAND 1"

# Update
result = await db.execute(
    "UPDATE users SET active = $1 WHERE last_login < $2",
    False, cutoff_date
)
# "COMMAND 15"

# Delete
result = await db.execute(
    "DELETE FROM sessions WHERE expires_at < NOW()"
)
# "COMMAND 203"
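If the affected row count is needed as an integer, it can be read from the end of the status string. The helper below is a small sketch that assumes the "COMMAND n" shape shown above; affected_rows is a hypothetical name, not part of the Database API.

```python
def affected_rows(status: str) -> int:
    """Extract the trailing row count from a status string such as "COMMAND 15".

    Raises ValueError if the string does not end in an integer.
    """
    _, _, count = status.rpartition(" ")
    return int(count)
```

For example, affected_rows("COMMAND 15") gives 15.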
execute_many -- Batch Execution¶
Execute the same statement with multiple parameter sets. Each set is executed sequentially.
await db.execute_many(
    "INSERT INTO tags (name) VALUES ($1)",
    [("python",), ("zig",), ("postgresql",)]
)
Parameter Types¶
Parameters are converted to PostgreSQL wire types automatically:
| Python Type | PostgreSQL Type | Example |
|---|---|---|
| int | int4 / int8 | 42 |
| float | float8 | 3.14 |
| str | text | "hello" |
| bool | bool | True |
| None | NULL | None |
| bytes | bytea | b"\x00\x01" |
| datetime | timestamptz | datetime.now() |
| date | date | date.today() |
| UUID | uuid | uuid.uuid4() |
| Decimal | numeric | Decimal("19.99") |
| list[int] | int4[] | [1, 2, 3] |
| list[str] | text[] | ["a", "b"] |
| dict | jsonb | {"key": "val"} |
Query Plan Analysis (EXPLAIN)¶
Analyze query performance with the native db.explain() API. Returns structured results with plan tree parsing, index usage detection, and execution timing.
Basic Usage¶
# Plan without executing
result = await db.explain("SELECT * FROM users WHERE id = $1", 1)
print(result.plan.node_type) # "Index Scan"
print(result.text) # Full text plan
# With execution timing
result = await db.explain("SELECT * FROM users", analyze=True)
print(result.execution_time) # 0.042 ms
# With buffer stats
result = await db.explain(
    "SELECT * FROM posts ORDER BY hot_score DESC LIMIT 30",
    analyze=True, buffers=True,
)
print(result.execution_time) # 0.07 ms
ExplainResult API¶
result = await db.explain(sql, *args, analyze=True)
result.text # Full text plan output
result.plan # Root ExplainNode (structured tree)
result.execution_time # Execution time in ms (analyze only)
result.planning_time # Planning time in ms (analyze only)
result.analyzed # Whether ANALYZE was used
# Convenience properties
result.has_seq_scan # True if any sequential scan on tables
result.seq_scan_tables # ["users", "posts"] — tables using seq scan
result.index_scans # [ExplainNode(...)] — all index scan nodes
result.all_nodes # Flat list of all plan tree nodes
ExplainNode Properties¶
node = result.plan
node.node_type # "Index Scan", "Seq Scan", "Limit", etc.
node.relation # Table name ("users")
node.index_name # Index used ("idx_users_email")
node.actual_rows # Rows returned (analyze only)
node.actual_total_time # Time in ms (analyze only)
node.shared_hit_blocks # Buffer cache hits
node.shared_read_blocks # Disk reads
node.children # Child nodes in plan tree
node.is_seq_scan # True if Seq Scan
node.is_index_scan # True if Index Scan / Index Only Scan
Performance Testing¶
# Assert index usage in tests
result = await db.explain(
    "SELECT * FROM posts WHERE is_deleted = false ORDER BY hot_score DESC LIMIT 30",
    analyze=True,
)
assert not result.has_seq_scan, f"Seq scan on: {result.seq_scan_tables}"
assert result.execution_time < 5.0, f"Too slow: {result.execution_time}ms"
Transactions¶
Basic Transaction¶
async with db.transaction():
    await db.execute("INSERT INTO orders (user_id) VALUES ($1)", user_id)
    await db.execute("UPDATE inventory SET stock = stock - 1 WHERE id = $1", item_id)
# Auto-commits on success, rolls back on exception
Nested Transactions (Savepoints)¶
Nested transaction() calls use PostgreSQL savepoints. Inner failures only roll back to the savepoint, not the outer transaction:
async with db.transaction():  # BEGIN
    await db.execute("INSERT INTO users ...")
    try:
        async with db.transaction():  # SAVEPOINT sp_2
            await db.execute("INSERT INTO audit ...")
            raise ValueError("oops")
    except ValueError:
        pass  # ROLLBACK TO SAVEPOINT sp_2
    # Outer transaction continues
    await db.execute("INSERT INTO logs ...")
# COMMIT (users + logs committed, audit rolled back)
Named Savepoints¶
async with db.transaction(savepoint_name="my_save"):
    await db.execute("INSERT INTO users ...")
    # Uses SAVEPOINT my_save / RELEASE SAVEPOINT my_save
atomic() Alias¶
db.atomic() is a Django-compatible alias for db.transaction(); use async with db.atomic(): exactly as you would async with db.transaction():.
Transaction Nesting Depth¶
Transaction depth is tracked per-thread via threading.local(). The outermost call issues BEGIN/COMMIT, inner calls issue SAVEPOINT/RELEASE SAVEPOINT. This is fully safe under free-threaded Python 3.14t.
Connection Pooling¶
pg.zig manages a connection pool internally with the following behavior:
- Pool sizing: configurable via min_size and max_size (default 2-32)
- Thread-owned connections: each worker thread acquires its own connection, eliminating contention on the hot path
- Automatic health checks: connections are validated before use
- Graceful shutdown: disconnect() drains all active connections
Pool Statistics¶
stats = db.pool_stats()
# Returns dict[str, int]:
# {
# "total": 10, # Total connections in pool
# "idle": 8, # Available connections
# "busy": 2, # In-use connections
# "stmt_cache_size": 42 # Cached prepared statements
# }
Recommended Pool Sizing¶
| Deployment | max_size | Rationale |
|---|---|---|
| Development | 5 | Single developer, low concurrency |
| Small app (2-4 workers) | 10 | Good for most apps |
| Medium app (8-16 workers) | 20-30 | max_size per worker, shared across threads |
| Large app (32+ workers) | 50-100 | Watch max_connections in postgresql.conf |
PostgreSQL's default max_connections is 100. Across all application instances, your total max_size should stay well below this limit.
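The budget check is simple arithmetic and can be written down as a helper. This is a sketch with a hypothetical name, pool_budget; the reserved default of 3 mirrors PostgreSQL's default superuser_reserved_connections setting, so adjust it if your server is configured differently.

```python
def pool_budget(
    instances: int,
    max_size: int,
    max_connections: int = 100,
    reserved: int = 3,
) -> int:
    """Return the spare server connections left; negative means over budget."""
    return max_connections - reserved - instances * max_size
```

Two instances with max_size=32 leave 33 connections spare on a default server, while four instances overshoot by 31 and would need either a smaller max_size or a higher max_connections.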
Prepared Statement Caching¶
pg.zig automatically caches prepared statements. When you execute the same SQL string repeatedly, the Parse phase is skipped on subsequent calls:
# First call: Parse + Bind + Execute
user = await db.query_one("SELECT * FROM users WHERE id = $1", 1)
# Second call: Bind + Execute only (Parse skipped)
user = await db.query_one("SELECT * FROM users WHERE id = $1", 2)
Performance impact: 33% faster on repeated queries. First-query warmup is 7.7x faster (494us to 65us) when the statement is already cached.
The cache is thread-safe, protected by mutexes in the Zig layer. Cache size is reported in pool_stats()["stmt_cache_size"].
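Conceptually, a statement cache maps SQL text to a server-side statement name and reports whether Parse is still needed. The class below is an illustrative Python sketch of that idea; the real cache lives in the Zig layer, and StatementCache and its naming scheme are assumptions.

```python
import threading


class StatementCache:
    """Map SQL text to a prepared-statement name, guarded by a lock."""

    def __init__(self):
        self._names = {}
        self._lock = threading.Lock()

    def lookup(self, sql: str) -> tuple[str, bool]:
        """Return (statement_name, needs_parse) for the given SQL text."""
        with self._lock:
            name = self._names.get(sql)
            if name is not None:
                return name, False          # cache hit: Bind + Execute only
            name = f"stmt_{len(self._names)}"
            self._names[sql] = name
            return name, True               # cache miss: Parse first

    def __len__(self):
        with self._lock:
            return len(self._names)
```

The key is the exact SQL string, which is why parameter values should go through $1, $2, ... placeholders: interpolating values into the text would produce a new cache entry for every call.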
Connection Pipelining¶
Execute multiple independent queries in a single network round-trip:
results = await db.pipeline([
    "SELECT COUNT(*) FROM users",
    "SELECT COUNT(*) FROM orders",
    "SELECT COUNT(*) FROM products",
])
# [[(1542,)], [(8923,)], [(456,)]]
Performance: 20 queries in 0.24ms vs 1.40ms sequential (5.74x faster). Pipeline sends all queries in one TCP write and reads all responses in one TCP read.
Note: Pipeline returns raw tuples, not dicts. Each result is a list[tuple] matching the query's columns.
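When dict rows are more convenient, the tuples can be zipped with column names the caller already knows. This is a minimal sketch; rows_to_dicts is a hypothetical helper, and the column names are supplied by the caller because the text does not say whether pipeline() exposes them.

```python
def rows_to_dicts(columns, rows):
    """Pair each tuple with the given column names to build dict rows."""
    return [dict(zip(columns, row)) for row in rows]
```

For the first result above, rows_to_dicts(["count"], [(1542,)]) yields [{"count": 1542}].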
COPY Protocol¶
PostgreSQL's COPY protocol for bulk data loading. 42.8x faster than row-by-row INSERT:
# Bulk import from CSV data
csv_data = "Alice,30\nBob,25\nCharlie,35\n"
await db.execute(
    "COPY users (name, age) FROM STDIN WITH (FORMAT csv)",
    csv_data
)
Benchmark: 536K rows/sec via COPY vs 12K rows/sec via INSERT. Use COPY for any bulk import of 1,000+ rows.
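Hand-built CSV strings break as soon as a value contains a comma, quote, or newline. A safer way to prepare the payload is the standard csv module, sketched below; rows_to_csv is a hypothetical helper name, and the result is intended to be passed as csv_data in the COPY call above.

```python
import csv
import io


def rows_to_csv(rows) -> str:
    """Serialize an iterable of row tuples to CSV text with proper quoting."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerows(rows)
    return buf.getvalue()
```

A name such as "Smith, Jo" is quoted automatically, so the row still parses as two columns.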
Supported Types¶
30+ PostgreSQL types with native binary decoding in the Zig layer. No Python-side type conversion overhead.
| Category | PostgreSQL Types | Python Type |
|---|---|---|
| Integer | int2 (smallint) | int |
| | int4 (integer) | int |
| | int8 (bigint) | int |
| Float | float4 (real) | float |
| | float8 (double) | float |
| Text | text, varchar, char, name | str |
| Boolean | bool | bool |
| Date/Time | timestamp | datetime (naive) |
| | timestamptz | datetime (aware, UTC) |
| | date | date |
| | time | time |
| | timetz | time (with tzinfo) |
| | interval | timedelta |
| Binary | bytea | bytes |
| JSON | json, jsonb | dict / list / scalar |
| UUID | uuid | UUID |
| Numeric | numeric / decimal | Decimal |
| Money | money | str (formatted) |
| Network | inet | str |
| | cidr | str |
| Bit | bit, varbit | str |
| XML | xml | str |
| Replication | pg_lsn | str |
| Key-Value | hstore | dict[str, str] |
| Arrays | int4[], int8[] | list[int] |
| | text[], varchar[], name[] | list[str] |
| | bool[] | list[bool] |
| | float4[], float8[] | list[float] |
| | timestamp[], timestamptz[] | list[datetime] |
| Custom | enum types | str |
Custom Enum Types¶
PostgreSQL custom enum types are supported via dynamic OID registration:
# Create a custom enum type
await db.execute("CREATE TYPE mood AS ENUM ('happy', 'sad', 'neutral')")
# pg.zig registers the OID automatically when it encounters the type
row = await db.query_one("SELECT 'happy'::mood AS current_mood")
# {"current_mood": "happy"} -- returned as str
The discover_enums() mechanism scans pg_type for all custom enum OIDs and registers them in the type decoder map. This happens automatically on first encounter.
Connection Lifecycle¶
Database("postgres://...")    # 1. Create instance (no connection)
    |
await db.connect()            # 2. Call _db_configure() in pg.zig
    |                         #    - Parse URL, resolve host
    |                         #    - TCP connect (non-blocking + poll)
    |                         #    - PostgreSQL startup handshake
    |                         #    - Allocate connection pool
    |                         #    - Returns pool handle (int)
    |
await db.query(...)           # 3. Acquire connection from pool
    |                         #    - Check prepared statement cache
    |                         #    - Send Parse/Bind/Execute (binary protocol)
    |                         #    - Decode response tuples
    |                         #    - Return connection to pool
    |
await db.disconnect()         # 4. Call _db_close_pool()
                              #    - Drain active connections
                              #    - Free pool memory
Connection Timeouts¶
pg.zig uses non-blocking TCP connect with poll for connection establishment. The timeout is set to 10,000ms by default (the third parameter to _db_configure). PostgreSQL's statement_timeout can be set via startup parameters for query-level timeouts.
Error Handling¶
Connection Errors¶
from hyperdjango.database import Database
db = Database("postgres://localhost/nonexistent_db")
try:
    await db.connect()
except RuntimeError as e:
    # Raised when the native extension is missing, with the message
    #   "Native extension not available. Build it:
    #    uv run hyper-build"
    # pg.zig connection failures also raise RuntimeError, carrying the
    # PostgreSQL error message.
    print(e)
Query Errors¶
# SQL syntax error
try:
    await db.query("SELEC * FROM users")  # typo
except Exception as e:
    print(e)  # PostgreSQL error: syntax error at or near "SELEC"

# Constraint violation
try:
    await db.execute(
        "INSERT INTO users (email) VALUES ($1)",
        "duplicate@example.com"
    )
except Exception as e:
    print(e)  # duplicate key value violates unique constraint

# Type mismatch
try:
    await db.query("SELECT * FROM users WHERE id = $1", "not_an_int")
except Exception as e:
    print(e)  # invalid input syntax for type integer
Pool Exhaustion¶
If all connections are busy, query/execute blocks until a connection is available. Design your max_size to accommodate peak concurrency.
# Check pool state before heavy operations
stats = db.pool_stats()
if stats["idle"] == 0:
    logger.warning("All connections busy, queries may queue")
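One way to keep a burst of work from occupying every connection is to cap concurrency on the application side. The sketch below uses asyncio.Semaphore for that; run_bounded is a hypothetical helper, and the limit would typically be chosen somewhat below max_size.

```python
import asyncio


async def run_bounded(tasks, limit):
    """Run coroutine factories with at most `limit` of them in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def guarded(make_coro):
        async with sem:
            return await make_coro()

    # gather preserves the order of `tasks` in the returned list.
    return await asyncio.gather(*(guarded(t) for t in tasks))
```

Each entry in tasks is a zero-argument callable returning a coroutine, for example lambda: db.query_val("SELECT COUNT(*) FROM users"), so that no query starts before it holds a semaphore slot.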
Disconnected State¶
All query methods check pool state and raise immediately if not connected:
db = Database("postgres://localhost/mydb")
# Forgot to call connect()
await db.query("SELECT 1")
# RuntimeError: Database not connected. Call await db.connect() first.
DataLoader -- N+1 Prevention¶
The DataLoader batches and deduplicates async database lookups within a single event loop tick:
import asyncio

from hyperdjango.dataloader import DataLoader

async def batch_load_users(keys: list[int]) -> list[dict | None]:
    """Load users by ID in a single query."""
    rows = await db.query(
        "SELECT * FROM users WHERE id = ANY($1)",
        list(keys)
    )
    # Return results in the same order as keys, with None for missing IDs
    by_id = {r["id"]: r for r in rows}
    return [by_id.get(k) for k in keys]

loader = DataLoader(batch_fn=batch_load_users, max_batch_size=100)

# These three calls are batched into ONE query:
user1, user2, user3 = await asyncio.gather(
    loader.load(1),
    loader.load(2),
    loader.load(3),
)
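To show how batching within one event loop tick can work, here is a deliberately small loader built on asyncio futures. It is a sketch of the general technique, not HyperDjango's DataLoader: TinyLoader is a hypothetical class, and it omits caching and max_batch_size.

```python
import asyncio


class TinyLoader:
    """Illustrative batching loader: collects keys, then calls batch_fn once."""

    def __init__(self, batch_fn):
        self.batch_fn = batch_fn
        self._pending = {}       # key -> Future; the dict also deduplicates keys
        self._scheduled = False
        self._task = None        # keep a reference so the flush task is not lost

    async def load(self, key):
        loop = asyncio.get_running_loop()
        fut = self._pending.get(key)
        if fut is None:
            fut = self._pending[key] = loop.create_future()
        if not self._scheduled:
            # Defer the flush so loads already queued on the loop can join the batch.
            self._scheduled = True
            loop.call_soon(self._start_flush)
        return await fut

    def _start_flush(self):
        self._task = asyncio.ensure_future(self._flush())

    async def _flush(self):
        pending, self._pending = self._pending, {}
        self._scheduled = False
        keys = list(pending)
        try:
            values = await self.batch_fn(keys)
        except Exception as exc:
            for fut in pending.values():
                if not fut.done():
                    fut.set_exception(exc)
            return
        for key, value in zip(keys, values):
            if not pending[key].done():
                pending[key].set_result(value)
```

The batch function must return values in the same order as the keys it receives, which is why batch_load_users above rebuilds the list from a by_id lookup.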
DataLoader API¶
@dataclass
class DataLoader:
    batch_fn: Callable[[list[K]], Awaitable[list[V]]]
    max_batch_size: int = 100
    cache_enabled: bool = True
| Method | Signature | Description |
|---|---|---|
| load | async def load(self, key: K) -> V | Load single value, batched with other calls in same tick |
| load_many | async def load_many(self, keys: list[K]) -> list[V] | Load multiple values, all batched together |
| prime | def prime(self, key: K, value: V) -> None | Pre-populate cache with a known value |
| clear | def clear(self, key: K = None) -> None | Clear cache (single key or all) |
DataLoader with Pipeline¶
For maximum performance, combine DataLoader with connection pipelining:
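async def batch_load_with_pipeline(keys: list[int]) -> list[tuple | None]:
    # pipeline() takes plain SQL strings, so coerce each key to int before
    # interpolating it; never interpolate untrusted strings this way.
    queries = [
        f"SELECT * FROM users WHERE id = {int(k)}" for k in keys
    ]
    results = await db.pipeline(queries)
    # Each result is a list of raw tuples (not dicts); take the first row if any
    return [r[0] if r else None for r in results]

loader = DataLoader(batch_fn=batch_load_with_pipeline)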
Multi-Database¶
ConnectionManager¶
Manage multiple named database connections with automatic routing:
from hyperdjango.multi_db import ConnectionManager, PrimaryReplicaRouter
connections = ConnectionManager()
await connections.configure({
    "default": "postgres://primary/myapp",
    "replica": "postgres://replica/myapp",
    "analytics": {
        "url": "postgres://analytics-host/warehouse",
        "min_size": 2,
        "max_size": 20,
    },
})
Database Routing¶
Route reads to replicas and writes to the primary:
from hyperdjango.multi_db import DatabaseRouter, PrimaryReplicaRouter
# Built-in router: reads to "replica", writes to "default"
connections.router = PrimaryReplicaRouter()
# Custom router
class MyRouter(DatabaseRouter):
    def db_for_read(self, model):
        if model.__name__ == "AnalyticsEvent":
            return "analytics"
        return "replica"

    def db_for_write(self, model):
        return "default"

connections.router = MyRouter()
Explicit Database Selection¶
Override routing for specific queries:
# Reads go to replica by default
users = await User.objects.all() # replica
# Writes go to primary by default
await User.objects.create(name="Alice") # primary
# Explicit selection
users = await User.objects.using("replica").all() # forced replica
events = await Event.objects.using("analytics").all() # analytics DB
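The selection order shown above, an explicit using() first and the router otherwise, can be sketched in a few lines. This is an assumption about how the pieces fit together rather than the library's code: Router and pick_alias are hypothetical, and per-model Meta bindings are left out.

```python
class Router:
    """Hypothetical minimal router exposing the same two hooks as DatabaseRouter."""

    def db_for_read(self, model):
        return "replica"

    def db_for_write(self, model):
        return "default"


def pick_alias(router, model, *, write, using=None):
    """Return the connection alias for one operation."""
    if using is not None:
        return using                      # explicit selection overrides routing
    if write:
        return router.db_for_write(model)
    return router.db_for_read(model)
```

With this order, a read resolves to "replica" and a write to "default" unless the caller names a connection explicitly.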
Per-Model Database Binding¶
Bind a model to a specific database in its Meta class:
class AnalyticsEvent(Model):
    class Meta:
        table = "events"
        database = "analytics"  # Always uses "analytics" connection
Performance Reference¶
| Operation | pg.zig | psycopg3 | Speedup |
|---|---|---|---|
| SELECT by PK | 21K ops/s | 10K ops/s | 2.06x |
| SELECT range | 4.18x faster | baseline | 4.18x |
| UPDATE | 1.52x faster | baseline | 1.52x |
| COPY bulk import | 536K rows/s | 12K rows/s (INSERT) | 42.8x |
| SELECT 50 rows (micro) | 69us | 25ms | 365x |
| Pipeline 20 queries | 0.24ms | 1.40ms (sequential) | 5.74x |
| Prepared stmt warmup | 65us | 494us | 7.7x |
| Prepared stmt repeat | 33% faster | baseline | 1.33x |
All benchmarks on PostgreSQL 16, Apple M-series, single connection.