Skip to content

API Reference

HyperApp

from hyperdjango import HyperApp

app = HyperApp(
    title="My App",
    database="postgres://localhost/mydb",
    views_dir="views",  # auto-discover file-based routes
)

# Route decorators
@app.get("/path")
@app.post("/path")
@app.put("/path")
@app.patch("/path")
@app.delete("/path")
@app.route("/path", methods=["GET", "POST"])

# Middleware
app.use(middleware_instance)

# Run
app.run(host="0.0.0.0", port=8000)

Model

from hyperdjango import Model, Field

class User(Model):
    class Meta:
        table = "users"
        database = "default"  # optional: multi-db routing

    id: int = Field(primary_key=True, auto=True)
    name: str = Field()
    email: str = Field(unique=True)
    age: int = Field(ge=0, default=0)
    bio: str | None = Field(default=None)
    author_id: int = Field(foreign_key=Author)

# Instance methods
await user.save()
await user.delete()
await user.refresh_from_db()
user.pk  # primary key value

QuerySet

# Terminal methods (execute query)
items = await Model.objects.all()
item = await Model.objects.get(id=1)
item = await Model.objects.first()
item = await Model.objects.last()
count = await Model.objects.count()
exists = await Model.objects.exists()

# Chainable methods
qs = Model.objects.filter(age__gte=18)
qs = qs.exclude(name="Bob")
qs = qs.order_by("-created_at")
qs = qs.limit(10).offset(20)
qs = qs.distinct()
qs = qs.values("name", "email")
qs = qs.values_list("name", flat=True)
qs = qs.select_related("author")
qs = qs.prefetch_related("tags")
qs = qs.annotate(total=Sum("price"))
qs = qs.using("replica")
qs = qs.cache(ttl=60)

# Write operations
item = await Model.objects.create(name="New")
count = await Model.objects.filter(active=False).update(active=True)
count = await Model.objects.filter(id=1).delete()
items = await Model.objects.bulk_create([...])

# Aggregation
stats = await Model.objects.aggregate(
    total=Sum("price"),
    avg=Avg("price"),
    count=Count("id"),
)

Lookups

# Comparison
filter(age__gt=18, age__lte=65)
filter(age__range=(18, 65))

# String
filter(name__contains="ali")
filter(name__icontains="ALI")
filter(name__startswith="Al")
filter(email__endswith="@example.com")
filter(name__iexact="alice")

# Membership
filter(id__in=[1, 2, 3])
filter(bio__isnull=True)
filter(name__regex=r"^[A-Z]")

# Transforms
filter(created__year=2024)
filter(created__year__gte=2020)
filter(name__lower__contains="ali")
filter(name__length__gte=5)

Response

from hyperdjango.response import Response

Response.json(data, status=200, headers={})
Response.html(html_string, status=200)
Response.text(text_string, status=200)
Response.redirect(url, status=302)
Response.file(path)
Response.attachment(path, filename="download.pdf")
Response.stream(async_generator)
Response.sse(event_generator)

Forms

from hyperdjango.forms import Form, ModelForm, CharField, IntegerField, FormSet

class MyForm(Form):
    name = CharField(max_length=100, required=True)
    age = IntegerField(min_value=0, required=False)

form = MyForm(data=request.json)
if form.is_valid():
    print(form.cleaned_data)

# FormSet
formset = FormSet(MyForm, data=[...], extra=2, can_delete=True)

Paginator

from hyperdjango.paginator import Paginator

paginator = Paginator(queryset, per_page=25)
page = await paginator.page(1)

page.items, page.number, page.num_pages, page.count
page.has_next, page.has_previous
page.start_index, page.end_index, page.page_range

Class-Based Views

from hyperdjango.views import ListView, DetailView, CreateView, UpdateView, DeleteView

class UserList(ListView):
    model = User
    per_page = 25

app.route("/users")(UserList.as_view())

Model Mixins

from hyperdjango.mixins import TimestampMixin, SoftDeleteMixin, OwnershipMixin, VersionedMixin

# TimestampMixin — auto-managed created_at/updated_at (TIMESTAMPTZ)
class Article(TimestampMixin, Model):
    class Meta:
        table = "articles"
    id: int = Field(primary_key=True, auto=True)
    title: str = Field()

article = Article(title="Hello")
await article.save()  # created_at and updated_at set automatically

# SoftDeleteMixin — .delete() marks as deleted, auto-filtered QuerySet
class Post(SoftDeleteMixin, Model):
    class Meta:
        table = "posts"
    id: int = Field(primary_key=True, auto=True)
    title: str = Field()

await post.delete()                        # Soft delete (UPDATE is_deleted=TRUE)
await post.hard_delete()                   # Real DELETE
await post.restore()                       # Undo soft delete
posts = await Post.objects.all()           # Excludes soft-deleted
posts = await Post.objects.with_deleted().all()  # Includes all
posts = await Post.objects.only_deleted().all()  # Just deleted

# OwnershipMixin — tracks created_by/updated_by
class Doc(OwnershipMixin, Model):
    ...
await doc.save_as(user)       # Sets created_by on first save, updated_by on every save
await doc.save_as(user_id=42) # Accepts int or user object

# VersionedMixin — append-only versioning
class Config(VersionedMixin, Model):
    ...
await config.save()              # version=1, is_current=True
config.value = "updated"
await config.save()              # version=2 (new row), old row is_current=False
history = await config.get_history()  # All versions ordered by version
current = await Config.objects.all()  # Only is_current=True
all_ver = await Config.objects.with_versions().all()  # All versions

# Compose multiple mixins
class FullModel(TimestampMixin, SoftDeleteMixin, OwnershipMixin, Model):
    ...

Query Cache

from hyperdjango.query_cache import get_query_cache, configure_query_cache

# Configure (typically in app setup)
configure_query_cache(default_ttl=60)

# Per-query opt-in
users = await User.objects.cache(ttl=120).filter(active=True).all()

# Per-model default via Meta
class Product(Model):
    class Meta:
        table = "products"
        cache_ttl = 300  # All queries auto-cached 5 min

# Disable caching for specific query
products = await Product.objects.cache(False).all()

# Auto-invalidation: save/delete triggers signal → cache invalidated
await product.save()  # Automatically invalidates Product query cache

# Manual invalidation
qc = get_query_cache()
qc.invalidate_table("products")
qc.invalidate_row("products", pk=42)
qc.invalidate_all()

# Cache stats
print(qc.stats.hit_rate)     # 0.85
print(qc.stats.hits)         # 1700
print(qc.stats.misses)       # 300
print(qc.stats.invalidations) # 42

# Cache warming
qc.warm(key, precomputed_data, ttl=300)

Security

from hyperdjango.security import SecurityLog, SecurityEvent
from hyperdjango.standalone_middleware import SecurityHeadersMiddleware

# Security headers (all on by default)
app.use(SecurityHeadersMiddleware(
    hsts=True,
    csp="default-src 'self'",
    referrer_policy="strict-origin-when-cross-origin",
    permissions_policy="camera=(), microphone=()",
    cross_origin_opener_policy="same-origin",
))

# Security event audit log
log = SecurityLog(db)
await log.ensure_table()

await log.log(SecurityEvent.LOGIN_FAILED, ip="1.2.3.4", detail="bad password")
await log.log_from_request(SecurityEvent.PERMISSION_DENIED, request, detail="no admin")

events = await log.get_recent(limit=50)
user_events = await log.get_for_user(42)
ip_events = await log.get_for_ip("1.2.3.4")
count = await log.count_by_ip("1.2.3.4", SecurityEvent.LOGIN_FAILED, since_hours=1)

Pool Optimization

from hyperdjango.pool import SlowQueryLog, QueryTimer, PoolHealthChecker

# Persistent slow query log (PostgreSQL UNLOGGED)
slow_log = SlowQueryLog(db, threshold_ms=100)
await slow_log.ensure_table()
recent = await slow_log.get_recent(limit=20)
slowest = await slow_log.get_slowest(limit=10)
stats = await slow_log.get_stats()  # {total, avg_ms, max_ms, min_ms}

# Auto-timing wrapper (patches db.query/execute)
timer = QueryTimer(db, slow_log=slow_log, threshold_ms=100)
timer.install()
# Now all queries are automatically timed and slow ones logged
stats = timer.get_stats()  # {total_queries, avg_query_ms, in_flight}

# Graceful drain before shutdown
success = await timer.drain(timeout_seconds=30)

# Pool health checks
checker = PoolHealthChecker(db, interval_seconds=30)
healthy = await checker.check()  # SELECT 1 validation
checker.start()                  # Background periodic checks
stats = checker.get_stats()      # {healthy, checks, failures}

Cache Adapters

from hyperdjango.cache_adapters import (
    TwoTierCache, ConsistentHashRing, StampedeProtection,
    CacheMiddleware, register_adapter, get_adapter,
)
from hyperdjango.cache import LocMemCache, DatabaseCache

# Two-tier: fast L1 (in-process) + shared L2 (database)
cache = TwoTierCache(
    l1=LocMemCache(max_size=1000),
    l2=DatabaseCache(db),
    l1_ttl=10,
)
cache.set("key", value, ttl=300)
result = cache.get("key")  # L1 first, L2 fallback, auto-promote
stats = cache.get_stats()  # {l1_hits, l2_hits, misses, overall_hit_rate}

# Consistent hashing for sharded caches
ring = ConsistentHashRing(nodes={
    "shard1": LocMemCache(),
    "shard2": LocMemCache(),
    "shard3": LocMemCache(),
})
node = ring.get_node("user:42")  # Deterministic routing
node.set("user:42", data)

# Stampede protection (XFetch algorithm)
cache = StampedeProtection(backend=LocMemCache(), beta=1.0)
cache.set("popular", data, ttl=300, compute_time_ms=50)
# Near expiry: probabilistic early recompute prevents thundering herd

# Full-page response caching middleware
app.use(CacheMiddleware(cache, ttl=60, exclude=["/admin", "/api/auth"]))

# Custom adapter registration
register_adapter("custom", MyCustomCacheAdapter)
adapter_cls = get_adapter("custom")

Logging

from hyperdjango.logging import logger, AccessLogMiddleware

# Basic logging (kwargs captured in extra)
logger.info("User logged in", user_id=42, ip="10.0.0.1")
logger.warning("Slow query", duration_ms=1500, sql="SELECT ...")
logger.error("Connection failed", host="db.example.com")

# All 7 levels
logger.trace("Detailed trace")
logger.debug("Debug info")
logger.info("Normal event")
logger.success("Operation completed")
logger.warning("Potential issue")
logger.error("Error occurred")
logger.critical("System failure")

# Message formatting with positional args
logger.info("User {} logged in from {}", username, ip)

# Bind context (returns new logger, original unchanged)
log = logger.bind(request_id="abc-123", user_id=42)
log.info("Processing request")  # extra has request_id + user_id

# Options: lazy eval, depth control, raw mode, exception capture
logger.opt(lazy=True).debug("Expensive: {x}", x=lambda: heavy_compute())
logger.opt(exception=True).error("Failed")  # Attaches traceback
logger.opt(depth=1).info("From wrapper")    # Skip frame for caller detection
logger.opt(raw=True).info("No formatting {}")
logger.opt(capture=False).info("Test {x}", x=42)  # x not in extra

# Context manager (async/thread-safe via contextvars)
with logger.contextualize(task_id=123, step="validate"):
    logger.info("In context")  # extra has task_id + step

# Patch records globally
import socket
patched = logger.patch(lambda r: r["extra"].update(hostname=socket.gethostname()))

# Exception decorator
@logger.catch()
def risky():
    1 / 0  # Logged as ERROR with full traceback

@logger.catch(level="CRITICAL", message="Task crashed")
async def background_task():
    ...

# Custom levels
logger.level("AUDIT", no=35, color="\033[35m", icon="📋")
logger.log("AUDIT", "Password changed", user_id=1)

# Module enable/disable
logger.disable("noisy_library")
logger.enable("noisy_library.important")

# Add sinks
logger.add(sys.stderr, level="INFO")                    # Console
logger.add(sys.stderr, serialize=True)                   # JSON to stderr
logger.add("app.log", rotation="100 MB", retention=10)   # File with rotation
logger.add("app.log", rotation="daily", compression="gz") # Daily + gzip
logger.add(logging_handler, level="WARNING")             # stdlib bridge
logger.add(my_async_func, level="ERROR")                 # Async sink

# Dynamic format
logger.add(sys.stderr, format=lambda r: f"[{r['level'].name}] {r['message']}")

# Dict-based per-module filtering
logger.add(sys.stderr, filter={"": "WARNING", "myapp": "DEBUG", "noisy": False})

# Bulk configure
logger.configure(
    handlers=[
        {"sink": sys.stderr, "level": "INFO"},
        {"sink": "app.log", "level": "DEBUG", "rotation": "100 MB"},
    ],
    extra={"app": "myservice", "version": "1.0"},
    activation=[("noisy_lib", False)],
)

# System metrics
stats = logger.stats()  # {handlers, levels, min_level, writer_alive, queue_depth}

# Access log middleware
app.use(AccessLogMiddleware())  # Auto-logs: GET /api/users 200 12.3ms

# Record fields available in format strings:
# {time}, {level}, {message}, {name}, {function}, {file}, {line},
# {module}, {thread}, {process}, {elapsed}, {extra}, {exception}

Static Files

from hyperdjango.staticfiles import (
    StaticFilesMiddleware, StaticFilesFinder,
    ManifestStaticFilesStorage, get_static_url,
    set_manifest_storage,
)

# Development: serve with caching headers
app.use(StaticFilesMiddleware(
    static_dirs=["static", "node_modules"],
    prefix="/static/",
    max_age=3600,
    gzip_min_size=1024,
))

# Production: collect with hashed filenames
storage = ManifestStaticFilesStorage(
    static_dirs=["static"],
    static_root="staticfiles",
)
result = storage.collectstatic()       # {"copied": 42, ...}
url = storage.url("css/styles.css")    # "css/styles.a1b2c3d4e5f6.css"
name = storage.stored_name("app.js")   # ValueError if missing (strict=True)

# Load manifest (auto-loads from staticfiles.json)
manifest = storage.load_manifest()     # {"css/styles.css": "css/styles.hash.css", ...}

# Global helper for templates
set_manifest_storage(storage)
get_static_url("css/styles.css")       # "/static/css/styles.a1b2c3d4e5f6.css"

# Template usage:
# {{ static('css/styles.css') }}
# {{ 'css/styles.css'|static }}

# Finder: locate files across directories
finder = StaticFilesFinder(dirs=["static", ("vendor", "/path/to/vendor")])
abs_path = finder.find("css/style.css")
all_files = finder.list_all()          # [(rel_path, abs_path), ...]

# Serve collected files with immutable cache (production)
app.use(StaticFilesMiddleware(
    static_root="staticfiles",
    prefix="/static/",
    max_age=31536000,
    immutable=True,
))

Channels (WebSocket Pub/Sub)

from hyperdjango.channels import (
    InMemoryChannelLayer, PgChannelLayer, Message,
    websocket_channel_handler, set_channel_layer, get_channel_layer,
)

# In-memory (single-process)
layer = InMemoryChannelLayer(default_history_size=100)

# PostgreSQL (multi-process)
layer = PgChannelLayer(database_url="postgres://localhost/mydb")
await layer.connect()

# Channels
channel = layer.channel("chat:room1")
sub_id = channel.subscribe(callback)                    # Sync or async callback
sub_id = channel.subscribe(cb, filter_fn=my_filter)     # Filtered
sub_id = channel.subscribe(cb, user_id="alice")         # With auth check
await channel.publish({"text": "Hello!"}, sender="alice")
channel.unsubscribe(sub_id)
channel.subscriber_count()

# Per-channel auth
channel = layer.channel("private:staff", auth_fn=lambda ch, uid: uid in admins)

# Presence tracking
channel.join("user42", metadata={"name": "Alice"})
members = channel.presence()       # [{"user_id": ..., "name": ..., "joined_at": ...}]
channel.presence_count()
channel.leave("user42")

# Message history
recent = channel.history(limit=50)
channel.clear_history()

# Groups (fan-out)
group = layer.group("notifications")
group.add("user:1")
group.add("user:2")
await group.publish({"type": "alert", "text": "Update"})
group.discard("user:1")

# WebSocket bridge
@app.websocket("/ws/chat/{room}")
async def chat(ws):
    await ws.accept()
    ch = layer.channel(f"chat:{ws.path_params['room']}")
    await websocket_channel_handler(ws, ch, user_id="user42")

# Message serialization
msg = Message(channel="test", data={"key": "val"}, sender="alice")
json_str = msg.to_json()
restored = Message.from_json(json_str)

# Global singleton
set_channel_layer(layer)
layer = get_channel_layer()

# Layer management
layer.channel_names()       # ["chat:room1", ...]
layer.group_names()         # ["notifications", ...]
layer.remove_channel("old")
layer.remove_group("old")