Transactions¶
Atomic database operations with automatic savepoint nesting, connection pinning, and configurable isolation levels. All transaction management is backed by pg.zig (native Zig PostgreSQL driver).
Basic Transactions¶
Use db.transaction() as an async context manager. If the block completes normally, the transaction is committed. If an exception is raised, it is rolled back.
async with db.transaction():
    user = await User.objects.create(name="Alice", email="alice@example.com")
    await Profile.objects.create(user_id=user.id, bio="Hello!")
# Both committed together. If either fails, both roll back.
The context manager issues BEGIN on entry and COMMIT on successful exit. If any exception propagates out of the block, ROLLBACK is issued instead, and the exception is re-raised.
try:
    async with db.transaction():
        await Account.objects.create(name="Test")
        raise ValueError("Something went wrong")
    # ROLLBACK issued here
except ValueError:
    # Account was NOT created
    pass
transaction() API¶
@asynccontextmanager
async def transaction(self, savepoint_name: str | None = None):
    """Context manager for database transactions.

    Args:
        savepoint_name: Optional name for the savepoint (only used for
            nested transactions). Defaults to "sp_N" where N
            is the nesting depth.
    """
The context manager yields the Database instance, so you can optionally capture it:
async with db.transaction() as tx:
    await tx.execute("INSERT INTO logs (msg) VALUES ($1)", "started")
    await tx.query("SELECT * FROM logs")
Nested Transactions (Savepoints)¶
Nested transaction() calls automatically use PostgreSQL SAVEPOINT commands. The nesting depth is tracked per-thread using threading.local(), which is safe for concurrent requests in free-threaded Python 3.14t.
async with db.transaction():
    # Depth 0: BEGIN
    await Account.objects.create(name="Main Account")
    try:
        async with db.transaction():
            # Depth 1: SAVEPOINT sp_2
            await Transfer.objects.create(from_id=1, to_id=2, amount=100)
            raise ValueError("Insufficient funds")
        # ROLLBACK TO SAVEPOINT sp_2
    except ValueError:
        pass  # Inner rolled back, outer continues
    await AuditLog.objects.create(action="transfer_failed")
# COMMIT: outer transaction commits with Account + AuditLog
Depth Tracking¶
| Nesting Level | SQL on Entry | SQL on Success | SQL on Error |
|---|---|---|---|
| Level 0 (outermost) | BEGIN | COMMIT | ROLLBACK |
| Level 1 | SAVEPOINT sp_2 | RELEASE SAVEPOINT sp_2 | ROLLBACK TO SAVEPOINT sp_2 |
| Level 2 | SAVEPOINT sp_3 | RELEASE SAVEPOINT sp_3 | ROLLBACK TO SAVEPOINT sp_3 |
| Level N | SAVEPOINT sp_{N+1} | RELEASE SAVEPOINT sp_{N+1} | ROLLBACK TO SAVEPOINT sp_{N+1} |
You never need to manage savepoints manually -- the depth counter handles everything.
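The table can also be read as a small pure function from nesting level to SQL. The sketch below only restates the table; it is not the library's implementation, and the name savepoint_sql is hypothetical.

```python
from __future__ import annotations


def savepoint_sql(level: int, name: str | None = None) -> dict[str, str]:
    """Return the SQL the table lists for a given nesting level.

    Hypothetical helper that mirrors the Depth Tracking table;
    it is not part of the library's API.
    """
    if level < 0:
        raise ValueError("level must be non-negative")
    if level == 0:
        # The outermost level is a real transaction; a name is ignored here.
        return {"entry": "BEGIN", "success": "COMMIT", "error": "ROLLBACK"}
    # Nested levels use a savepoint; the default name follows the table (sp_{N+1}).
    sp = name or f"sp_{level + 1}"
    return {
        "entry": f"SAVEPOINT {sp}",
        "success": f"RELEASE SAVEPOINT {sp}",
        "error": f"ROLLBACK TO SAVEPOINT {sp}",
    }
```

Note that, per the table, the default savepoint number is one higher than the nesting level: level 1 uses sp_2.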
Named Savepoints¶
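You can explicitly name a savepoint if you need to reference it. The name applies only to nested calls; the outermost call always issues BEGIN:
async with db.transaction():
    # BEGIN
    async with db.transaction(savepoint_name="my_checkpoint"):
        # SAVEPOINT my_checkpoint
        await do_work()
    # RELEASE SAVEPOINT my_checkpoint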
Triple Nesting¶
Savepoints nest to arbitrary depth:
async with db.transaction():
    # BEGIN
    await db.execute("INSERT INTO t1 VALUES (1)")
    async with db.transaction():
        # SAVEPOINT sp_2
        await db.execute("INSERT INTO t2 VALUES (2)")
        async with db.transaction():
            # SAVEPOINT sp_3
            await db.execute("INSERT INTO t3 VALUES (3)")
        # RELEASE SAVEPOINT sp_3
    # RELEASE SAVEPOINT sp_2
# COMMIT
atomic() Alias¶
atomic() is a convenience alias for transaction(), matching Django's naming convention:
async with db.atomic():
    await do_work()

# Equivalent to:
async with db.transaction():
    await do_work()

# Named savepoints also work (the name is used only when nested):
async with db.atomic(savepoint_name="my_save"):
    await do_work()
Connection Pinning¶
During a transaction, all queries are routed through the same database connection (pinned from the pool). This is essential for correctness -- PostgreSQL transactions are per-connection.
async with db.transaction():
    # All three statements use the SAME PostgreSQL connection
    await db.execute("INSERT INTO accounts (name) VALUES ($1)", "Alice")
    rows = await db.query("SELECT * FROM accounts WHERE name = $1", "Alice")
    await db.execute("UPDATE accounts SET balance = $1 WHERE name = $2", 100, "Alice")
# Connection returned to pool on COMMIT
Without pinning, different statements could be routed to different pool connections and would not see each other's uncommitted changes. pg.zig handles this automatically.
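To make the idea concrete, here is a toy model of pinning. It is a sketch under stated assumptions, not how pg.zig or the library implements it: ToyPool, its connection names, and the use of contextvars are all hypothetical and exist only for illustration.

```python
import asyncio
import contextvars
from contextlib import asynccontextmanager

# Holds the connection pinned by the current transaction, if any (toy model).
_pinned = contextvars.ContextVar("pinned_conn", default=None)


class ToyPool:
    """Hypothetical pool whose 'connections' are just strings."""

    def __init__(self, size: int) -> None:
        self._free = [f"conn-{i}" for i in range(size)]

    async def execute(self, sql: str) -> str:
        """Pretend to run sql; return the name of the connection used."""
        conn = _pinned.get()
        if conn is not None:
            return conn  # inside a transaction: reuse the pinned connection
        conn = self._free.pop(0)  # outside: borrow the next free connection...
        self._free.append(conn)   # ...and return it right after the statement
        return conn

    @asynccontextmanager
    async def transaction(self):
        conn = self._free.pop(0)  # hold one connection for the whole block
        token = _pinned.set(conn)
        try:
            yield conn
        finally:
            _pinned.reset(token)
            self._free.append(conn)  # hand it back when the block ends


async def demo() -> tuple:
    pool = ToyPool(3)
    a = await pool.execute("SELECT 1")  # unpinned
    b = await pool.execute("SELECT 2")  # unpinned, may land elsewhere
    async with pool.transaction():
        c = await pool.execute("INSERT ...")
        d = await pool.execute("SELECT ...")
    return a, b, c, d
```

In this model the two statements outside the transaction land on different connections, while both statements inside it share one.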
Isolation Levels¶
PostgreSQL supports four isolation levels. The default is READ COMMITTED.
READ COMMITTED (Default)¶
Each statement in the transaction sees the most recently committed data. Two reads of the same row may return different results if another transaction commits between them.
async with db.transaction():
    # Statement 1 sees data committed before it started
    row1 = await db.query_one("SELECT balance FROM accounts WHERE id = $1", 1)
    # If another transaction commits here...
    # Statement 2 might see different data
    row2 = await db.query_one("SELECT balance FROM accounts WHERE id = $1", 1)
    # row1["balance"] might != row2["balance"]
This is usually fine for web applications. It prevents dirty reads but allows non-repeatable reads.
SERIALIZABLE¶
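For the strictest isolation, issue SET TRANSACTION ISOLATION LEVEL SERIALIZABLE as the first statement inside the transaction. PostgreSQL ignores SET TRANSACTION outside a transaction block (it only emits a warning), and the level cannot be changed once the transaction has run a query. All reads then see a consistent snapshot taken when the transaction's first query runs.
async with db.transaction():
    # Must be the first statement after BEGIN
    await db.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
    # All reads see a consistent snapshot
    balance = await db.query_val("SELECT balance FROM accounts WHERE id = $1", 1)
    # Even if another transaction commits, we still see the original balance
    await db.execute(
        "UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, 1
    )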
If two serializable transactions conflict (read-write dependency), PostgreSQL will abort one with a serialization failure. You must retry the transaction:
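MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
    try:
        async with db.transaction():
            # Set the level inside the transaction, before any query
            await db.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
            await transfer_funds(from_id=1, to_id=2, amount=100)
        break  # Success
    except Exception as e:
        # PostgreSQL reports SQLSTATE 40001 with a message starting
        # "could not serialize access" (assumes the driver keeps that text)
        if "could not serialize" in str(e).lower() and attempt < MAX_RETRIES - 1:
            continue  # Retry
        raise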
REPEATABLE READ¶
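The snapshot is taken when the first query or data-modification statement in the transaction runs. This prevents non-repeatable reads, but an update that conflicts with a concurrent transaction fails with a serialization error. As with SERIALIZABLE, set the level as the first statement inside the transaction.
async with db.transaction():
    await db.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
    # All reads see a consistent snapshot from the first SELECT
    ...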
READ UNCOMMITTED¶
PostgreSQL treats this the same as READ COMMITTED -- dirty reads are never allowed in PostgreSQL regardless of isolation level.
Error Recovery Patterns¶
The PostgreSQL Aborted Transaction Problem¶
In PostgreSQL, when a statement fails inside a transaction, the transaction enters an "aborted" state. No further statements can execute -- even SELECT 1 will fail with "current transaction is aborted, commands ignored until end of transaction block." You must ROLLBACK (or rollback to a savepoint) to recover.
async with db.transaction():
    try:
        await risky_operation()
    except IntegrityError:
        # Transaction is now ABORTED in PostgreSQL
        # Cannot do anything else -- must re-raise
        raise
Savepoint-Based Recovery¶
To handle errors within a transaction without aborting the entire thing, wrap the risky operation in a nested transaction (savepoint):
failed_items = []
async with db.transaction():
    for item in items:
        try:
            async with db.transaction():  # SAVEPOINT
                await process(item)
            # RELEASE SAVEPOINT on success
        except Exception:
            # ROLLBACK TO SAVEPOINT: outer transaction continues
            failed_items.append(item)
    # Outer transaction commits with all successful items
    await AuditLog.objects.create(
        action="batch_process",
        details=f"Failed: {len(failed_items)}"
    )
This is the standard pattern for batch operations where individual items may fail but you want to commit the rest.
Constraint Violation Recovery¶
Handle unique constraint violations gracefully:
async with db.transaction():
    try:
        async with db.transaction():  # Savepoint
            await User.objects.create(name="Alice", email="alice@example.com")
    except Exception as e:
        if "unique" in str(e).lower():
            # Email already exists: update instead
            await User.objects.filter(email="alice@example.com").update(name="Alice")
        else:
            raise
Idempotent Operations¶
Use savepoints to make operations idempotent (safe to retry):
# Hypothetical wrapper function so that the early `return` is valid
async def process_once(event_id):
    async with db.transaction():
        try:
            async with db.transaction():
                await db.execute(
                    "INSERT INTO processed_events (event_id) VALUES ($1)", event_id
                )
        except Exception:
            # Already processed: skip
            return
        # Process the event (only reached if INSERT succeeded)
        await handle_event(event_id)
Transactions in Views¶
Basic View Pattern¶
@app.post("/transfer")
async def transfer(request):
    data = await request.json()
    async with app.db.transaction():
        sender = await Account.objects.get(id=data["from"])
        receiver = await Account.objects.get(id=data["to"])
        amount = data["amount"]
        if sender.balance < amount:
            raise HTTPException(400, "Insufficient funds")
        sender.balance -= amount
        await sender.save()
        receiver.balance += amount
        await receiver.save()
        await TransferLog.objects.create(
            from_id=sender.id,
            to_id=receiver.id,
            amount=amount,
        )
    return {"transferred": amount}
If the HTTPException is raised, the entire transaction is rolled back -- no partial balance updates.
Multiple Independent Operations¶
For operations that should succeed or fail independently, use separate transactions:
@app.post("/batch-update")
async def batch_update(request):
    items = await request.json()
    results = []
    for item in items:
        try:
            async with app.db.transaction():
                await process_item(item)
            results.append({"id": item["id"], "status": "ok"})
        except Exception as e:
            results.append({"id": item["id"], "status": "error", "message": str(e)})
    return {"results": results}
Read-Only Transactions¶
For consistency across multiple reads (e.g., generating a report), use a transaction even for reads:
@app.get("/report")
async def generate_report(request):
    async with app.db.transaction():
        # Set the isolation level inside the transaction, before any query
        await app.db.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
        total_users = await app.db.query_val("SELECT COUNT(*) FROM users")
        total_revenue = await app.db.query_val("SELECT SUM(amount) FROM payments")
        active_sessions = await session_store.count()
    return {
        "users": total_users,
        "revenue": total_revenue,
        "sessions": active_sessions,
    }
Advisory Locks¶
PostgreSQL advisory locks provide application-level locking without row-level contention. Use raw SQL within a transaction:
# Transaction-level advisory lock (held until the transaction ends)
async with db.transaction():
    # pg_advisory_xact_lock: released when transaction ends
    await db.execute("SELECT pg_advisory_xact_lock($1)", lock_id)
    # Only one connection can hold this lock at a time
    await do_exclusive_work()
# Lock released on COMMIT/ROLLBACK

# Try-lock variant (non-blocking)
async with db.transaction():
    result = await db.query_val(
        "SELECT pg_try_advisory_xact_lock($1)", lock_id
    )
    if result:
        await do_exclusive_work()
    else:
        # Lock held by another process
        raise HTTPException(409, "Resource locked")
Common Advisory Lock Patterns¶
# Prevent concurrent cron job execution
CRON_LOCK_ID = 12345

async def run_cron():
    async with db.transaction():
        got_lock = await db.query_val(
            "SELECT pg_try_advisory_xact_lock($1)", CRON_LOCK_ID
        )
        if not got_lock:
            return  # Another worker is running this cron
        await do_cron_work()

# Per-resource locking using hash of resource identifier
import hashlib

def resource_lock_id(resource_type: str, resource_id: int) -> int:
    """Generate a deterministic lock ID from resource type + ID."""
    h = hashlib.sha256(f"{resource_type}:{resource_id}".encode()).digest()
    return int.from_bytes(h[:8], "big", signed=True)

async with db.transaction():
    lock_id = resource_lock_id("invoice", invoice_id)
    await db.execute("SELECT pg_advisory_xact_lock($1)", lock_id)
    await generate_invoice(invoice_id)
Deadlock Detection¶
PostgreSQL automatically detects deadlocks and aborts one of the conflicting transactions. The most reliable defense is to take row locks in a consistent order:
async def safe_transfer(from_id, to_id, amount):
    """Transfer with deadlock prevention via consistent ordering."""
    # Always touch accounts in ID order to prevent deadlocks
    first_id, second_id = sorted([from_id, to_id])
    async with db.transaction():
        first = await Account.objects.get(id=first_id)
        second = await Account.objects.get(id=second_id)
        if from_id == first_id:
            sender, receiver = first, second
        else:
            sender, receiver = second, first
        sender.balance -= amount
        receiver.balance += amount
        # Row locks are taken by the UPDATEs (assuming get() is a plain
        # SELECT), so the saves must also run in ID order
        await first.save()
        await second.save()
If you cannot control ordering, handle the deadlock exception:
MAX_RETRIES = 3
for attempt in range(MAX_RETRIES):
    try:
        async with db.transaction():
            await transfer_funds(from_id, to_id, amount)
        break
    except Exception as e:
        if "deadlock" in str(e).lower() and attempt < MAX_RETRIES - 1:
            continue  # Retry
        raise
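The deadlock and serialization retry loops share the same shape, so they can be folded into one helper. The following is a minimal sketch, not part of the library: run_with_retry and is_retryable are hypothetical names, and the sqlstate attribute is an assumption about what the driver's exceptions expose (the helper falls back to matching PostgreSQL's standard message text). SQLSTATE 40001 is serialization_failure and 40P01 is deadlock_detected.

```python
import asyncio
import random

# PostgreSQL error codes that are safe to retry:
# 40001 = serialization_failure, 40P01 = deadlock_detected
RETRYABLE_SQLSTATES = {"40001", "40P01"}


def is_retryable(exc: BaseException) -> bool:
    """Decide whether an error is a transient transaction conflict."""
    # `sqlstate` is a hypothetical attribute; your driver may name it differently.
    if getattr(exc, "sqlstate", None) in RETRYABLE_SQLSTATES:
        return True
    # Fallback: match the server's standard error messages.
    msg = str(exc).lower()
    return "deadlock detected" in msg or "could not serialize access" in msg


async def run_with_retry(work, max_retries: int = 3, base_delay: float = 0.05):
    """Call `work()` (a coroutine function) until it succeeds or retries run out.

    `work` must open its own transaction so that each attempt starts fresh.
    """
    for attempt in range(max_retries):
        try:
            return await work()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter so competing workers spread out.
            await asyncio.sleep(base_delay * (2 ** attempt) * random.random())
```

Under these assumptions, a call might look like `await run_with_retry(lambda: safe_transfer(1, 2, 100))`. The backoff is a design choice: retrying immediately tends to recreate the same conflict.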
Performance¶
Transaction Overhead¶
- BEGIN/COMMIT: Minimal overhead. pg.zig issues these as simple queries on the pinned connection.
- Savepoints: Each SAVEPOINT/RELEASE SAVEPOINT adds approximately 0.1ms of overhead per nesting level.
- Connection pinning: No extra overhead -- the connection is simply held from the pool for the duration of the transaction rather than being returned after each query.
UNLOGGED Tables in Transactions¶
UNLOGGED tables (sessions, cache, rate limits) do not write WAL records, making transactions involving them significantly faster:
async with db.transaction():
    # This INSERT to an UNLOGGED table is ~2-3x faster than to a regular table
    await session_store.create({"user_id": user.id})
    # This INSERT to a regular table writes WAL normally
    await AuditLog.objects.create(action="login")
Keep Transactions Short¶
Long-running transactions hold connections from the pool and can cause connection exhaustion under load:
# BAD: Long-running transaction holds connection
async with db.transaction():
    data = await fetch_external_api()  # 500ms network call
    await db.execute("INSERT INTO ...", data)

# GOOD: Fetch data first, then transaction
data = await fetch_external_api()
async with db.transaction():
    await db.execute("INSERT INTO ...", data)
Pipeline Queries¶
For multiple independent queries, consider db.pipeline() instead of a transaction. Pipelines send all queries in a single network round-trip but do not provide atomicity:
# 5.74x faster than sequential queries
results = await db.pipeline([
    "SELECT COUNT(*) FROM users",
    "SELECT COUNT(*) FROM orders",
    "SELECT COUNT(*) FROM products",
])
Testing with Transactions¶
Savepoint Isolation in Tests¶
HyperDjango's TestCase wraps each test in a transaction and rolls it back after the test, so each test starts with a clean database:
from hyperdjango.testing import TestCase

class UserTests(TestCase):
    async def test_create_user(self):
        # This runs inside a transaction
        user = await User.objects.create(name="Alice")
        assert user.id is not None
        # Transaction is rolled back after test -- user is gone

    async def test_another(self):
        # Clean slate -- no Alice here
        users = await User.objects.all()
        assert len(users) == 0
Testing Transaction Rollback¶
async def test_transaction_rollback(self):
    try:
        async with app.db.transaction():
            await User.objects.create(name="Alice")
            raise ValueError("Rollback!")
    except ValueError:
        pass
    # User was not created (rolled back)
    count = await User.objects.count()
    assert count == 0
Testing Nested Savepoints¶
async def test_savepoint_partial_rollback(self):
    async with app.db.transaction():
        await User.objects.create(name="Alice")
        try:
            async with app.db.transaction():  # Savepoint
                await User.objects.create(name="Bob")
                raise ValueError("Undo Bob")
        except ValueError:
            pass
    # Alice exists, Bob does not
    users = await User.objects.all()
    assert len(users) == 1
    assert users[0].name == "Alice"
on_commit() Callbacks¶
Register callbacks to run after the outermost transaction commits. Callbacks are discarded on rollback.
async with db.transaction():
    user = await User.objects.create(name="Alice", email="alice@example.com")
    # This runs AFTER the COMMIT, not inside the transaction
    db.on_commit(lambda: print(f"User {user.id} created!"))

    # Async callbacks are supported
    @db.on_commit
    async def send_welcome():
        await send_welcome_email(user.id)
Use Cases¶
- Send emails after user creation — don't send if the transaction rolls back
- Invalidate caches — only after data is actually committed
- Trigger webhooks — fire external calls only when the change is durable
- Update search index — index the committed state, not uncommitted
Behavior¶
- Callbacks run in registration order after COMMIT
- On ROLLBACK, all callbacks are silently discarded
- Both sync and async callbacks are supported
- Callbacks registered in nested transactions (savepoints) run after the outermost commit
- on_commit() can be used as a decorator: @db.on_commit
# Practical example: email after order
async with db.transaction():
    order = await Order.objects.create(user_id=user.id, total=99.99)
    for item in cart_items:
        await OrderItem.objects.create(order_id=order.id, **item)
    db.on_commit(lambda: send_order_confirmation.delay(order_id=order.id))
# If any item fails, the entire transaction rolls back
# and the email is never sent
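The behavior rules listed under Behavior can be modeled with a small queue keyed on nesting depth. This is a sketch for intuition only, not the library's implementation: the CommitCallbacks class and its begin/commit/rollback methods are hypothetical. It also leaves one case open, because the rules above do not say what happens to callbacks registered inside a savepoint that is later rolled back; the sketch keeps them.

```python
import asyncio
import inspect


class CommitCallbacks:
    """Toy model of on_commit() semantics (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self._depth = 0      # 0 means no transaction is open
        self._pending = []   # callbacks waiting for the outermost COMMIT

    def begin(self) -> None:
        self._depth += 1

    def on_commit(self, fn):
        """Queue fn; returning it lets this double as a decorator."""
        self._pending.append(fn)
        return fn

    async def commit(self) -> None:
        self._depth -= 1
        if self._depth > 0:
            return  # inner level released: callbacks keep waiting
        pending, self._pending = self._pending, []
        for fn in pending:  # run in registration order
            result = fn()
            if inspect.isawaitable(result):
                await result  # async callbacks are awaited

    def rollback(self) -> None:
        self._depth -= 1
        if self._depth == 0:
            self._pending.clear()  # outermost ROLLBACK discards everything
        # Inner rollback: behavior not specified above, so nothing is dropped here.


async def demo() -> list:
    cb, out = CommitCallbacks(), []
    cb.begin()
    cb.on_commit(lambda: out.append("sync"))
    cb.begin()  # nested level

    @cb.on_commit
    async def later():
        out.append("async")

    await cb.commit()  # inner release: nothing runs yet
    assert out == []
    await cb.commit()  # outermost commit: both callbacks run
    cb.begin()
    cb.on_commit(lambda: out.append("never"))
    cb.rollback()      # discarded
    return out
```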