Multi-Database Routing¶
Route queries to different PostgreSQL instances -- read replicas, per-model databases, or explicit selection. HyperDjango's multi-database system supports automatic routing via DatabaseRouter, per-model binding via Meta.database, and explicit selection via QuerySet.using().
Quick Start¶
from hyperdjango.multi_db import ConnectionManager, set_connections
connections = ConnectionManager()
await connections.configure({
"default": "postgres://localhost/myapp",
"replica": "postgres://replica-host/myapp",
"analytics": "postgres://analytics-host/warehouse",
})
set_connections(connections)
Connection Manager¶
ConnectionManager is the central registry for named database connections. Each connection is a Database instance with its own connection pool.
Configuration¶
Pass a dictionary of name-to-URL mappings. Each URL creates a Database instance and connects immediately:
from hyperdjango.multi_db import ConnectionManager, set_connections
connections = ConnectionManager()
# Simple URL strings
await connections.configure({
"default": "postgres://localhost/myapp",
"replica": "postgres://replica-host/myapp",
})
# Or config dicts with pool sizing
await connections.configure({
"default": {
"url": "postgres://localhost/myapp",
"min_size": 5,
"max_size": 20,
},
"replica": {
"url": "postgres://replica-host/myapp",
"min_size": 2,
"max_size": 10,
},
"analytics": {
"url": "postgres://analytics-host/warehouse",
"min_size": 1,
"max_size": 5,
},
})
# Register as global singleton
set_connections(connections)
When a "default" connection is configured, it is automatically set as the global database via set_db(). This means all queries without explicit routing use the "default" connection.
Config Dict Keys¶
| Key | Type | Default | Description |
|---|---|---|---|
url |
str |
required | PostgreSQL connection URL |
min_size |
int |
2 | Minimum pool connections |
max_size |
int |
10 | Maximum pool connections |
Accessing Connections¶
# Dictionary-style access (raises KeyError if not found)
default_db = connections["default"]
replica_db = connections["replica"]
# .get() with optional default (returns None if not found)
analytics_db = connections.get("analytics")
missing_db = connections.get("missing") # None
# Check if a connection exists
if "replica" in connections:
...
# Get all configured databases
all_dbs = connections.databases # dict[str, Database]
Closing All Connections¶
QuerySet.using()¶
Explicitly select which database to query. This is the highest-priority routing mechanism:
# Read from replica
users = await User.objects.using("replica").filter(active=True).all()
# Write to default (primary)
await User.objects.using("default").create(name="Alice")
# Run analytics queries against a dedicated database
events = await AnalyticsEvent.objects.using("analytics").filter(
event_type="purchase",
created_at__gte="2024-01-01",
).all()
Using a Database Instance Directly¶
You can pass a Database instance instead of a string alias:
from hyperdjango.database import Database
db = Database("postgres://other-host/otherdb")
await db.connect()
users = await User.objects.using(db).all()
Chaining with Other QuerySet Methods¶
.using() returns a new QuerySet and chains with all other methods:
# Complex query on replica
top_authors = await Author.objects.using("replica").annotate(
article_count=Count("articles"),
).filter(
article_count__gt=10,
).order_by("-article_count").limit(20)
Database Router¶
Route reads and writes automatically by implementing a DatabaseRouter subclass. Return a database alias string, or None to fall through to the default:
from hyperdjango.multi_db import DatabaseRouter
class MyRouter(DatabaseRouter):
def db_for_read(self, model):
"""Return database alias for reads, or None for default."""
return "replica"
def db_for_write(self, model):
"""Return database alias for writes, or None for default."""
return "default"
connections.router = MyRouter()
# Now all reads go to replica, writes to default — no .using() needed
users = await User.objects.filter(active=True).all() # reads from replica
await User.objects.create(name="Alice") # writes to default
DatabaseRouter Protocol¶
The base DatabaseRouter class defines four methods. Override any combination:
class DatabaseRouter:
def db_for_read(self, model) -> str | None:
"""Return database alias for read queries, or None for default."""
return None
def db_for_write(self, model) -> str | None:
"""Return database alias for write queries, or None for default."""
return None
def allow_relation(self, obj1, obj2) -> bool | None:
"""Allow or deny cross-database relations.
Return True to allow, False to deny, None to defer to next router.
"""
return None
def allow_migrate(self, db: str, model) -> bool | None:
"""Allow or deny running migrations on a specific database.
Return True to allow, False to deny, None to defer.
"""
return None
Model-Specific Routing¶
Route different models to different databases based on the model class:
class ShardRouter(DatabaseRouter):
def db_for_read(self, model):
if model.__name__ == "AnalyticsEvent":
return "analytics"
if model.__name__ in ("AuditLog", "SecurityEvent"):
return "audit_db"
return "replica"
def db_for_write(self, model):
if model.__name__ == "AnalyticsEvent":
return "analytics"
if model.__name__ in ("AuditLog", "SecurityEvent"):
return "audit_db"
return "default"
def allow_migrate(self, db, model):
# Only migrate analytics models on the analytics database
if model.__name__ == "AnalyticsEvent":
return db == "analytics"
if model.__name__ in ("AuditLog", "SecurityEvent"):
return db == "audit_db"
# All other models on default
return db == "default"
App-Based Routing¶
Route based on module path or app label:
class AppRouter(DatabaseRouter):
ANALYTICS_MODELS = {"AnalyticsEvent", "PageView", "Conversion"}
LOGGING_MODELS = {"AuditLog", "SecurityEvent", "AccessLog"}
def _get_db(self, model):
name = model.__name__
if name in self.ANALYTICS_MODELS:
return "analytics"
if name in self.LOGGING_MODELS:
return "logging"
return None # Fall through to default
def db_for_read(self, model):
return self._get_db(model)
def db_for_write(self, model):
return self._get_db(model)
Built-in: PrimaryReplicaRouter¶
The most common pattern -- route all reads to a replica, all writes to the primary:
from hyperdjango.multi_db import PrimaryReplicaRouter
connections.router = PrimaryReplicaRouter(
replica="replica", # default: "replica"
primary="default", # default: "default"
)
# All reads go to replica
users = await User.objects.filter(active=True).all() # replica
# All writes go to primary
await User.objects.create(name="Alice") # default
Custom Replica Names¶
# If your replica has a different alias
connections.router = PrimaryReplicaRouter(
replica="read_replica_1",
primary="primary_db",
)
Per-Model Database Binding¶
Bind a model to a specific database via Meta.database. This is useful when a model's data always lives on a specific database:
class AnalyticsEvent(Model):
class Meta:
table = "events"
database = "analytics" # Always uses the "analytics" connection
id: int = Field(primary_key=True, auto=True)
event_type: str = Field()
payload: str = Field()
created_at: datetime = Field()
class AuditLog(Model):
class Meta:
table = "audit_log"
database = "audit_db"
id: int = Field(primary_key=True, auto=True)
action: str = Field()
details: str = Field()
Per-model binding takes priority over router configuration. No .using() call needed:
# Automatically uses "analytics" database
events = await AnalyticsEvent.objects.filter(event_type="purchase").all()
# Automatically uses "audit_db" database
logs = await AuditLog.objects.filter(action="login").all()
Resolution Priority¶
When resolving which database to use for a query, the system checks in this order:
QuerySet.using("name")-- explicit selection (highest priority)Meta.database = "name"-- per-model bindingDatabaseRouter.db_for_read/write()-- router-based routing"default"-- fallback to the default connection
# Example: model has Meta.database = "analytics", router says "replica"
# .using() wins over everything
await Event.objects.using("default").all() # Uses "default"
# Meta.database wins over router
await Event.objects.all() # Uses "analytics" (not "replica")
# Router is used when no Meta.database and no .using()
await User.objects.all() # Uses router's db_for_read() result
Resolution Methods¶
ConnectionManager exposes the resolution logic:
# Resolve read database for a model
db = connections.resolve_for_read(User) # Checks Meta → Router → default
db = connections.resolve_for_write(User) # Checks Meta → Router → default
Sticky Connections for Writes¶
A common challenge with read replicas is replication lag -- after writing to the primary, an immediate read from a replica may return stale data. The sticky connection pattern routes reads to the primary for a short window after any write:
import time
class StickyPrimaryReplicaRouter(DatabaseRouter):
"""Route reads to replica, but stick to primary briefly after writes."""
def __init__(self, replica="replica", primary="default", sticky_seconds=2.0):
self.replica = replica
self.primary = primary
self.sticky_seconds = sticky_seconds
self._last_write: float = 0.0
def db_for_read(self, model):
# If a write happened recently, read from primary to avoid stale data
if time.monotonic() - self._last_write < self.sticky_seconds:
return self.primary
return self.replica
def db_for_write(self, model):
self._last_write = time.monotonic()
return self.primary
connections.router = StickyPrimaryReplicaRouter(sticky_seconds=2.0)
Per-Request Sticky Routing¶
For web applications, you typically want sticky routing scoped to the current request:
import contextvars
_wrote_in_request = contextvars.ContextVar("wrote_in_request", default=False)
class RequestStickyRouter(DatabaseRouter):
"""Sticky to primary for the rest of the request after any write."""
def __init__(self, replica="replica", primary="default"):
self.replica = replica
self.primary = primary
def db_for_read(self, model):
if _wrote_in_request.get():
return self.primary
return self.replica
def db_for_write(self, model):
_wrote_in_request.set(True)
return self.primary
# Reset at the start of each request via middleware
async def sticky_reset_middleware(request, call_next):
_wrote_in_request.set(False)
return await call_next(request)
Connection Health Monitoring¶
Monitor connection pool health across all databases:
async def check_all_connections():
"""Health check for all configured databases."""
results = {}
for name, db in connections.databases.items():
try:
rows = await db.query("SELECT 1")
results[name] = {"status": "healthy", "pool_size": db.pool_size}
except Exception as e:
results[name] = {"status": "unhealthy", "error": str(e)}
return results
Health Check Endpoint¶
@app.route("/health/db")
async def db_health(request: Request) -> Response:
health = await check_all_connections()
all_healthy = all(r["status"] == "healthy" for r in health.values())
status = 200 if all_healthy else 503
return Response.json(health, status=status)
Router Composition¶
For complex routing needs, compose multiple routing strategies:
class CompositeRouter(DatabaseRouter):
"""Chain multiple routing strategies."""
def __init__(self):
self.model_routes = {
"AnalyticsEvent": "analytics",
"AuditLog": "audit_db",
}
self.sticky = StickyPrimaryReplicaRouter()
def db_for_read(self, model):
# First check model-specific routes
name = model.__name__
if name in self.model_routes:
return self.model_routes[name]
# Fall back to sticky primary/replica
return self.sticky.db_for_read(model)
def db_for_write(self, model):
name = model.__name__
if name in self.model_routes:
return self.model_routes[name]
return self.sticky.db_for_write(model)
def allow_migrate(self, db, model):
name = model.__name__
if name in self.model_routes:
return db == self.model_routes[name]
return db == "default"
Cross-Database Relations¶
The allow_relation method controls whether two model instances from different databases can have a foreign key relationship:
class StrictRouter(DatabaseRouter):
def allow_relation(self, obj1, obj2):
# Only allow relations between objects on the same database
db1 = obj1._meta.database or "default"
db2 = obj2._meta.database or "default"
if db1 == db2:
return True
return False
Global Connection Manager¶
Access the global connection manager singleton:
from hyperdjango.multi_db import get_connections, set_connections
# Get the global manager (creates empty one if not configured)
connections = get_connections()
# Replace with a custom instance
custom = ConnectionManager()
await custom.configure({...})
set_connections(custom)
Practical Examples¶
E-Commerce with Read Replicas¶
connections = ConnectionManager()
await connections.configure({
"default": "postgres://primary/ecommerce",
"replica": "postgres://replica/ecommerce",
"analytics": "postgres://analytics/warehouse",
})
connections.router = PrimaryReplicaRouter()
set_connections(connections)
class ProductView:
async def get(self, request, product_id):
# Reads from replica automatically
product = await Product.objects.get(id=product_id)
return Response.json(product.to_dict())
async def post(self, request, product_id):
# Writes to primary automatically
product = await Product.objects.get(id=product_id)
product.stock -= 1
await product.save()
return Response.json({"status": "ok"})
Multi-Tenant with Separate Databases¶
class TenantRouter(DatabaseRouter):
"""Route based on a context variable set by middleware."""
def db_for_read(self, model):
tenant = current_tenant.get()
return f"tenant_{tenant}" if tenant else "default"
def db_for_write(self, model):
tenant = current_tenant.get()
return f"tenant_{tenant}" if tenant else "default"
# Middleware sets the tenant context
async def tenant_middleware(request, call_next):
tenant = request.headers.get("X-Tenant-ID", "default")
token = current_tenant.set(tenant)
try:
return await call_next(request)
finally:
current_tenant.reset(token)
Migration Control¶
Control which models can migrate on which databases:
class MigrationRouter(DatabaseRouter):
def allow_migrate(self, db, model):
if model.__name__ == "AnalyticsEvent":
return db == "analytics"
if model.__name__ == "AuditLog":
return db == "audit_db"
# Default models only on default
return db == "default"
Reference¶
Imports¶
from hyperdjango.multi_db import (
ConnectionManager, # Named database pool registry
DatabaseRouter, # Base router class
PrimaryReplicaRouter, # Built-in read/write splitter
get_connections, # Get global ConnectionManager
set_connections, # Set global ConnectionManager
)
ConnectionManager API¶
| Method | Returns | Description |
|---|---|---|
await configure(databases) |
None | Configure and connect all databases |
[name] |
Database |
Get connection by name (raises KeyError) |
get(name, default) |
Database \| None |
Get connection or default |
name in connections |
bool |
Check if connection exists |
databases |
dict[str, Database] |
All configured connections |
resolve_for_read(model) |
Database |
Resolve read database |
resolve_for_write(model) |
Database |
Resolve write database |
await close_all() |
None | Disconnect all databases |
DatabaseRouter API¶
| Method | Returns | Description |
|---|---|---|
db_for_read(model) |
str \| None |
Database alias for reads |
db_for_write(model) |
str \| None |
Database alias for writes |
allow_relation(obj1, obj2) |
bool \| None |
Allow cross-db FK |
allow_migrate(db, model) |
bool \| None |
Allow migration on db |