Models & ORM¶
Define your data schema as Python classes. HyperDjango models map to PostgreSQL tables with a high-performance async ORM backed by the native pg.zig driver.
All ORM operations are async. Every query flows through the pg.zig native PostgreSQL driver — there is no Python-level database adapter. Queries use parameterized $1, $2, ... placeholders and prepared statement caching for maximum throughput.
Table of Contents¶
- Defining Models
- Field Types
- Field Options
- Relationships
- QuerySet API — Methods Returning New QuerySets
- QuerySet API — Methods That Evaluate
- Field Lookups
- Aggregation Functions
- Q Objects
- F Expressions
- Raw SQL
- Transactions
- Model Inheritance
- Model Mixins
- Model Instance Methods
- Composite Primary Keys
- Meta Options
- Database Functions
- Query Cache
- Model Signals
- Performance
Defining Models¶
from hyperdjango import Model, Field

class Article(Model):
    class Meta:
        table = "articles"

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
    content: str = Field(default="")
    published: bool = Field(default=False)
    views: int = Field(default=0)
    author_id: int = Field(foreign_key=User)
Every model needs:
- A `Meta` inner class with `table`, the PostgreSQL table name
- At least one primary key field (usually `id: int = Field(primary_key=True, auto=True)`)
- Type-annotated fields with `Field()` descriptors
Field Types¶
HyperDjango uses Python type annotations for field types. The ORM maps them to PostgreSQL column types:
| Python Type | PostgreSQL Type | Example |
|---|---|---|
| `int` | `INTEGER` / `SERIAL` | `id: int = Field(primary_key=True, auto=True)` |
| `str` | `TEXT` | `name: str = Field()` |
| `bool` | `BOOLEAN` | `active: bool = Field(default=True)` |
| `float` | `DOUBLE PRECISION` | `price: float = Field(default=0.0)` |
| `datetime` | `TIMESTAMPTZ` | `created: datetime = Field(default=None)` |
| `date` | `DATE` | `birthday: date = Field(default=None)` |
| `time` | `TIME` | `alarm: time = Field(default=None)` |
| `Decimal` | `NUMERIC` | `amount: Decimal = Field(default=Decimal("0"))` |
| `uuid.UUID` | `UUID` | `ref: UUID = Field(default=None)` |
| `bytes` | `BYTEA` | `data: bytes = Field(default=b"")` |
| `dict` | `JSONB` | `meta: dict[str, str] = Field(default_factory=dict)` |
| `list` | `JSONB` | `tags: list[str] = Field(default_factory=list)` |
Field Options¶
Field(
    default=None,        # Default value (or _MISSING for required)
    primary_key=False,   # This is the primary key
    auto=False,          # Auto-increment (SERIAL)
    unique=False,        # UNIQUE constraint
    index=False,         # Create an index
    foreign_key=None,    # Model class for FK (e.g., User)
    related_name=None,   # Reverse relation name
    one_to_one=False,    # OneToOneField (unique FK)
    ge=None,             # Greater than or equal (validation)
    le=None,             # Less than or equal (validation)
    gt=None,             # Greater than (validation)
    lt=None,             # Less than (validation)
    min_length=None,     # Minimum string length
    pattern=None,        # Regex pattern validation
)
Meta.indexes¶
Define composite, partial, GIN, and expression indexes declaratively on the Model:
from hyperdjango.models import Index

class Ticket(TenantMixin, TimestampMixin, Model):
    class Meta:
        table = "tickets"
        indexes = [
            # Composite index
            Index(fields=("tenant_id", "status_id")),
            # DESC ordering
            Index(fields=("tenant_id", "-created_at")),
            # Unique constraint
            Index(fields=("tenant_id", "ticket_number"), unique=True),
            # Partial index (WHERE clause)
            Index(fields=("tenant_id", "assignee_id"), where="is_deleted = FALSE"),
            # GIN full-text search (expression index)
            Index(
                expressions=("(to_tsvector('english', title || ' ' || description))",),
                using="gin",
                name="ix_tickets_search",
            ),
            # GIN trigram (requires pg_trgm extension)
            Index(fields=("title",), using="gin", opclasses=("gin_trgm_ops",)),
            # HNSW vector index
            Index(
                fields=("embedding",),
                using="hnsw",
                opclasses=("vector_cosine_ops",),
                params={"m": 16, "ef_construction": 64},
            ),
        ]
Index Options¶
| Option | Type | Default | Description |
|---|---|---|---|
| `fields` | `tuple[str, ...]` | `()` | Column names. Prefix with `-` for DESC. |
| `name` | `str \| None` | auto | Index name. Auto: `idx_{table}_{cols}` or `uq_{table}_{cols}`. |
| `unique` | `bool` | `False` | UNIQUE constraint. |
| `using` | `str` | `"btree"` | Index type: `btree`, `gin`, `gist`, `hnsw`, `ivfflat`. |
| `where` | `str \| None` | `None` | Partial index WHERE clause (raw SQL). |
| `opclasses` | `tuple[str, ...]` | `()` | Per-column operator classes (e.g., `gin_trgm_ops`). |
| `include` | `tuple[str, ...]` | `()` | INCLUDE columns (covering index). |
| `params` | `dict` | `None` | WITH parameters (e.g., `{"m": 16}`). |
| `expressions` | `tuple[str, ...]` | `()` | Raw SQL expressions for functional indexes. |
Index names are auto-generated as idx_{table}_{col1}_{col2} (or uq_ for unique), truncated to 63 characters. Override with explicit name=.
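The naming rule above can be sketched in a few lines. This is an illustration only, not HyperDjango's code: the function name `auto_index_name` is hypothetical, and stripping the leading `-` (the DESC marker) from column names is an assumption. The 63-character cap matches PostgreSQL's default identifier length limit.

```python
def auto_index_name(table: str, fields: tuple[str, ...], unique: bool = False) -> str:
    """Build idx_{table}_{cols} (or uq_{table}_{cols}), capped at 63 characters."""
    prefix = "uq" if unique else "idx"
    # Assumption: the "-" prefix that marks DESC ordering is not part of the name.
    cols = "_".join(name.lstrip("-") for name in fields)
    return f"{prefix}_{table}_{cols}"[:63]


print(auto_index_name("tickets", ("tenant_id", "status_id")))
# idx_tickets_tenant_id_status_id
print(auto_index_name("tickets", ("tenant_id", "ticket_number"), unique=True))
# uq_tickets_tenant_id_ticket_number
```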
Field(index=True) still works for simple single-column B-tree indexes. Use Meta.indexes for composite, partial, GIN, expression, and other advanced index types.
All indexes are created by hyper setup via generate_ddl_for_model().
Relationships¶
Foreign Keys¶
class Author(Model):
    class Meta:
        table = "authors"

    id: int = Field(primary_key=True, auto=True)
    name: str = Field()

class Article(Model):
    class Meta:
        table = "articles"

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
    author_id: int = Field(foreign_key=Author)

# Query across FK
articles = await Article.objects.select_related("author").all()
for article in articles:
    print(article.author.name)  # resolved via JOIN
Many-to-Many¶
from typing import ClassVar

from hyperdjango.models import ManyToManyField

class Tag(Model):
    class Meta:
        table = "tags"

    id: int = Field(primary_key=True, auto=True)
    name: str = Field()

class Article(Model):
    class Meta:
        table = "articles"

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
    tags: ClassVar = ManyToManyField("tags")

# M2M operations
await article.tags.add(tag1, tag2)
await article.tags.remove(tag1)
all_tags = await article.tags.all()
await article.tags.clear()
await article.tags.set([tag1, tag2, tag3])
count = await article.tags.count()
One-to-One¶
Use OneToOneField for relationships where each row in the source table maps to exactly one row in the target table (e.g., user profiles):
from hyperdjango.models import Field, Model, OneToOneField

class User(Model):
    class Meta:
        table = "users"

    id: int = Field(primary_key=True, auto=True)
    username: str = Field(unique=True)

class Profile(Model):
    class Meta:
        table = "profiles"

    id: int = Field(primary_key=True, auto=True)
    user_id: int = OneToOneField("users", related_name="profile")
    bio: str = Field(default="")
    website: str = Field(default="")

# Query
profile = await Profile.objects.filter(user_id=user.id).first()
# With JOIN
profiles = await Profile.objects.select_related("user_id").all()
OneToOneField creates a foreign key column with a UNIQUE constraint — the database enforces that each target row is referenced at most once.
QuerySet API — Methods Returning New QuerySets¶
These methods are chainable. They return a new QuerySet without hitting the database until you evaluate it (by calling .all(), iterating, etc.).
filter(**kwargs) -> QuerySet¶
Returns a new QuerySet containing objects that match the given lookup parameters.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | keyword arguments | Field lookups using `field__lookup=value` syntax |
SQL generated:
Example:
# Simple equality (exact is the default lookup)
published = await Article.objects.filter(published=True).all()
# With lookup operator
recent = await Article.objects.filter(views__gt=100).all()
# Multiple conditions (AND)
popular_published = await Article.objects.filter(published=True, views__gt=1000).all()
# Chaining (also AND)
same = await Article.objects.filter(published=True).filter(views__gt=1000).all()
Return type: QuerySet
Multiple keyword arguments are joined with AND. For OR conditions, use Q objects.
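To make the AND-joining and the `$1, $2, ...` placeholders concrete, here is a minimal sketch of how lookups could be turned into a WHERE clause. It is not HyperDjango's implementation: `build_where` and the `OPERATORS` table are hypothetical, and only a small assumed subset of lookups is covered.

```python
# Hypothetical subset of lookup operators, for illustration only.
OPERATORS = {"exact": "=", "gt": ">", "gte": ">=", "lt": "<", "lte": "<="}


def build_where(**lookups: object) -> tuple[str, list[object]]:
    """Turn field__lookup=value pairs into an AND-joined, parameterized clause."""
    clauses: list[str] = []
    params: list[object] = []
    for key, value in lookups.items():
        field, _, op = key.partition("__")
        params.append(value)
        # "exact" is the default lookup when no suffix is given.
        clauses.append(f"{field} {OPERATORS[op or 'exact']} ${len(params)}")
    return " AND ".join(clauses), params


sql, params = build_where(published=True, views__gt=1000)
print(sql)     # published = $1 AND views > $2
print(params)  # [True, 1000]
```

Values never appear in the SQL text itself; they travel separately as parameters, which is what allows a prepared statement to be reused across calls.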
exclude(**kwargs) -> QuerySet¶
Returns a new QuerySet containing objects that do NOT match the given lookup parameters. The inverse of filter().
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | keyword arguments | Field lookups to negate |
SQL generated:
Example:
# Exclude unpublished
visible = await Article.objects.exclude(published=False).all()
# Exclude with lookups
not_empty = await Article.objects.exclude(content__isnull=True).all()
# Chain with filter
published_with_views = await Article.objects.filter(published=True).exclude(views=0).all()
Return type: QuerySet
annotate(*args, **kwargs) -> QuerySet¶
Adds computed fields to each object in the QuerySet using aggregation expressions.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | expression arguments | Named annotations: `alias=Expression(...)` |
SQL generated:
SELECT articles.*, COUNT(comments.id) AS comment_count
FROM articles
LEFT JOIN comments ON comments.article_id = articles.id
GROUP BY articles.id
Example:
from hyperdjango.expressions import Count, Sum, Avg, F
# Count related objects
articles = await Article.objects.annotate(
    comment_count=Count("comments__id"),
).all()
for article in articles:
    print(article.comment_count)  # added attribute
# Sum across relations
authors = await Author.objects.annotate(
    total_views=Sum("articles__views"),
).order_by("-total_views").all()
# Annotate + filter on the annotation
popular = await Article.objects.annotate(
    comment_count=Count("comments__id"),
).filter(comment_count__gt=10).all()
# Multiple annotations
stats = await Article.objects.annotate(
    comment_count=Count("comments__id"),
    avg_rating=Avg("ratings__score"),
).all()
Return type: QuerySet (each object gains the annotated attribute)
order_by(*fields) -> QuerySet¶
Returns a new QuerySet ordered by the given fields.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | Field names. Prefix with `-` for descending order. |
SQL generated:
Example:
# Ascending (default)
await Article.objects.order_by("title").all()
# Descending (prefix with -)
await Article.objects.order_by("-views").all()
# Multiple columns
await Article.objects.order_by("-published", "title").all()
# Clear default ordering (if Meta.ordering is set)
await Article.objects.order_by().all()
Return type: QuerySet
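As a rough illustration of the `-` prefix convention, the sketch below translates `order_by()` arguments into an ORDER BY clause. The helper `order_by_clause` is hypothetical and the exact SQL HyperDjango emits may differ (for example, it may omit the explicit ASC).

```python
def order_by_clause(*fields: str) -> str:
    """Translate order_by() style arguments into an ORDER BY clause."""
    if not fields:
        return ""  # no arguments: no ORDER BY, which clears any ordering
    parts = [
        f"{name[1:]} DESC" if name.startswith("-") else f"{name} ASC"
        for name in fields
    ]
    return "ORDER BY " + ", ".join(parts)


print(order_by_clause("-published", "title"))
# ORDER BY published DESC, title ASC
```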
reverse() -> QuerySet¶
Reverses the ordering of the QuerySet. Only meaningful on an ordered QuerySet.
Signature:
Parameters: None.
SQL generated:
Example:
# Get the last 5 articles by date
last_five = await Article.objects.order_by("pub_date").reverse().limit(5)
Return type: QuerySet
distinct(*fields) -> QuerySet¶
Returns a new QuerySet that uses SELECT DISTINCT in its SQL query. On PostgreSQL, you can pass field names to use DISTINCT ON.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | (Optional) Field names for `DISTINCT ON`. If empty, applies `DISTINCT` to all columns. |
SQL generated:
-- No arguments:
SELECT DISTINCT * FROM articles
-- With field arguments (PostgreSQL DISTINCT ON):
SELECT DISTINCT ON (author_id) * FROM articles ORDER BY author_id, pub_date DESC
Example:
# All distinct rows
unique_articles = await Article.objects.values("author_id").distinct().all()
# DISTINCT ON — one article per author (most recent)
one_per_author = await Article.objects.distinct("author_id").order_by("author_id", "-pub_date").all()
Return type: QuerySet
values(*fields) -> QuerySet¶
Returns a QuerySet that returns dictionaries instead of model instances, with only the specified fields.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | Field names to include. If empty, all fields are included. |
SQL generated:
Example:
# Specific columns as dicts
titles = await Article.objects.values("title", "views").all()
# [{"title": "Hello", "views": 42}, {"title": "World", "views": 99}]
# All columns as dicts
all_dicts = await Article.objects.values().all()
# Combined with filter
published_titles = await Article.objects.filter(published=True).values("title").all()
Return type: QuerySet (evaluates to list[dict[str, ...]])
values_list(*fields, flat=False) -> QuerySet¶
Like values() but returns tuples instead of dictionaries. With flat=True and a single field, returns a flat list.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | Field names to include |
| `flat` | `bool` | If `True` and exactly one field, return flat list instead of 1-tuples |
SQL generated:
Example:
# Tuples
pairs = await Article.objects.values_list("title", "views").all()
# [("Hello", 42), ("World", 99)]
# Flat list of single field
all_titles = await Article.objects.values_list("title", flat=True).all()
# ["Hello", "World"]
# All IDs
all_ids = await Article.objects.values_list("id", flat=True).all()
# [1, 2, 3, ...]
Return type: QuerySet (evaluates to list[tuple[...]] or list[T] when flat=True)
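The difference between the tuple and flat shapes can be shown on plain dictionaries. This is a sketch of the result shaping only: the standalone `values_list` function below is hypothetical, and treating `flat=True` as a no-op when more than one field is given is an assumption.

```python
def values_list(rows: list[dict], *fields: str, flat: bool = False) -> list:
    """Shape row dicts into tuples, or into bare values for a single flat field."""
    if flat and len(fields) == 1:
        return [row[fields[0]] for row in rows]
    return [tuple(row[name] for name in fields) for row in rows]


rows = [{"title": "Hello", "views": 42}, {"title": "World", "views": 99}]
print(values_list(rows, "title", "views"))     # [('Hello', 42), ('World', 99)]
print(values_list(rows, "title", flat=True))   # ['Hello', 'World']
```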
none() -> QuerySet¶
Returns an empty QuerySet that never hits the database. Useful as a sentinel value or base for conditional query building.
Signature:
Parameters: None.
SQL generated: No SQL is executed.
Example:
# Returns empty list, no query
empty = await Article.objects.none().all()
# []
# Useful for conditional queries
if not user.is_authenticated:
    qs = Article.objects.none()
else:
    qs = Article.objects.filter(author_id=user.id)
results = await qs.all()
Return type: QuerySet (always evaluates to empty)
all() -> QuerySet¶
Returns a copy of the current QuerySet. Often used to evaluate the QuerySet (triggering the database query) or to get a fresh copy of the manager's base QuerySet.
Signature:
Parameters: None.
SQL generated:
Example:
# Get all objects
articles = await Article.objects.all()
# Clone a queryset
qs = Article.objects.filter(published=True)
qs_copy = qs.all() # independent copy
Return type: QuerySet
union(*other_qs, all=False) -> QuerySet¶
Combines results from two or more QuerySets using SQL UNION. By default, removes duplicates. Pass all=True for UNION ALL.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*other_qs` | `QuerySet` | One or more QuerySets to union with |
| `all` | `bool` | If `True`, use `UNION ALL` (keep duplicates) |
SQL generated:
Example:
qs1 = Article.objects.filter(published=True)
qs2 = Article.objects.filter(views__gt=1000)
# UNION (deduplicates)
combined = await qs1.union(qs2).all()
# UNION ALL (keeps duplicates)
combined_all = await qs1.union(qs2, all=True).all()
Return type: QuerySet
Only LIMIT, OFFSET, and ORDER BY are allowed after union(). You cannot call filter(), exclude(), etc. on the result.
intersection(*other_qs) -> QuerySet¶
Returns only the rows that appear in ALL of the given QuerySets using SQL INTERSECT.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*other_qs` | `QuerySet` | One or more QuerySets to intersect with |
SQL generated:
(SELECT * FROM articles WHERE published = true)
INTERSECT
(SELECT * FROM articles WHERE views > 1000)
Example:
published = Article.objects.filter(published=True)
popular = Article.objects.filter(views__gt=1000)
# Only published AND popular
both = await published.intersection(popular).all()
Return type: QuerySet
difference(*other_qs) -> QuerySet¶
Returns rows from the first QuerySet that are NOT in any of the other QuerySets, using SQL EXCEPT.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*other_qs` | `QuerySet` | One or more QuerySets to subtract |
SQL generated:
Example:
published = Article.objects.filter(published=True)
popular = Article.objects.filter(views__gt=1000)
# Published but NOT popular
quiet = await published.difference(popular).all()
Return type: QuerySet
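The semantics of the set operations behind union(), intersection(), and difference() can be tried out with plain SQL. The sketch below uses Python's built-in `sqlite3` module as a stand-in for PostgreSQL (so it uses `?` placeholders rather than `$1`), with made-up sample rows; it demonstrates the SQL operators, not HyperDjango itself.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, published INTEGER, views INTEGER)")
conn.executemany(
    "INSERT INTO articles VALUES (?, ?, ?)",
    [(1, 1, 5000), (2, 1, 10), (3, 0, 2000), (4, 0, 0)],
)

PUBLISHED = "SELECT id FROM articles WHERE published = 1"  # ids 1, 2
POPULAR = "SELECT id FROM articles WHERE views > 1000"     # ids 1, 3


def ids(sql: str) -> list[int]:
    """Run a query and return the sorted ids it yields."""
    return sorted(row[0] for row in conn.execute(sql))


union = ids(f"{PUBLISHED} UNION {POPULAR}")             # duplicates removed
union_all = ids(f"{PUBLISHED} UNION ALL {POPULAR}")     # duplicates kept
intersection = ids(f"{PUBLISHED} INTERSECT {POPULAR}")  # in both
difference = ids(f"{PUBLISHED} EXCEPT {POPULAR}")       # in first only

print(union, union_all, intersection, difference)
# [1, 2, 3] [1, 1, 2, 3] [1] [2]
```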
select_related(*fields) -> QuerySet¶
Returns a QuerySet that follows foreign-key relationships via LEFT JOIN, loading related objects in the same database query. Eliminates N+1 queries for forward FK relationships.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | FK field names to follow. Use `__` for nested relationships. |
SQL generated:
SELECT articles.*, authors.id AS author__id, authors.name AS author__name
FROM articles
LEFT JOIN authors ON articles.author_id = authors.id
Example:
# Single FK — resolved via LEFT JOIN
articles = await Article.objects.select_related("author").all()
for a in articles:
    print(a.author.name)  # no extra query
# Nested FK (author -> publisher)
articles = await Article.objects.select_related("author__publisher").all()
for a in articles:
    print(a.author.publisher.name)  # all from one query
# Multiple FKs
articles = await Article.objects.select_related("author", "category").all()
Return type: QuerySet
Use select_related for foreign key and one-to-one relationships (forward direction). For reverse FK and many-to-many, use prefetch_related.
prefetch_related(*lookups) -> QuerySet¶
Returns a QuerySet that prefetches reverse FK and many-to-many relationships in a separate query, then joins them in Python. Prevents N+1 queries for reverse and M2M relationships.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*lookups` | `str` | Related names to prefetch |
SQL generated:
-- First query: main objects
SELECT * FROM authors
-- Second query: related objects (executed separately)
SELECT * FROM articles WHERE author_id IN ($1, $2, $3, ...)
Example:
# Reverse FK — resolved via separate query
authors = await Author.objects.prefetch_related("articles").all()
for author in authors:
    print(author.articles)  # pre-loaded, no N+1
# M2M
articles = await Article.objects.prefetch_related("tags").all()
for article in articles:
    print(article.tags)  # pre-loaded
# Multiple prefetches
authors = await Author.objects.prefetch_related("articles", "reviews").all()
Return type: QuerySet
Unlike select_related (which uses JOINs), prefetch_related executes a separate query for each prefetch and does the joining in Python. This is more efficient for one-to-many and many-to-many relationships, where a JOIN would repeat each parent row once per related row.
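The "joining in Python" step amounts to grouping the rows of the second query by their foreign key and attaching each group to its parent. A minimal sketch on plain dictionaries follows; `attach_prefetched` and the sample data are hypothetical, and the real ORM works on model instances rather than dicts.

```python
from collections import defaultdict

# Stand-in for the first query: SELECT * FROM authors
authors = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Cy"}]
# Stand-in for the second query: SELECT * FROM articles WHERE author_id IN (1, 2, 3)
articles = [
    {"id": 10, "author_id": 1, "title": "A"},
    {"id": 11, "author_id": 1, "title": "B"},
    {"id": 12, "author_id": 2, "title": "C"},
]


def attach_prefetched(parents: list[dict], children: list[dict], fk: str, attr: str) -> None:
    """Group children by foreign key, then attach each group to its parent."""
    by_parent: dict[int, list[dict]] = defaultdict(list)
    for child in children:
        by_parent[child[fk]].append(child)
    for parent in parents:
        # Parents without children get an empty list, not a missing attribute.
        parent[attr] = by_parent.get(parent["id"], [])


attach_prefetched(authors, articles, fk="author_id", attr="articles")
print([a["title"] for a in authors[0]["articles"]])  # ['A', 'B']
print(authors[2]["articles"])                        # []
```

The grouping is a single pass over the child rows, so the cost stays at two queries regardless of how many parents are loaded.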
defer(*fields) -> QuerySet¶
Returns a QuerySet that defers loading of the specified fields. Deferred fields are not loaded from the database until accessed on the instance. Useful for excluding large text or binary columns.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | Field names to exclude from the initial SELECT |
SQL generated:
Example:
# Defer large content column
articles = await Article.objects.defer("content").all()
for a in articles:
    print(a.title)    # loaded immediately
    print(a.content)  # triggers a separate query on access
Return type: QuerySet
You cannot defer the primary key field.
only(*fields) -> QuerySet¶
The opposite of defer(). Returns a QuerySet that loads ONLY the specified fields immediately. All other fields are deferred.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | Field names to include in the SELECT |
SQL generated:
Example:
# Only load id and title
articles = await Article.objects.only("id", "title").all()
for a in articles:
    print(a.title)    # loaded
    print(a.content)  # triggers separate query
Return type: QuerySet
The primary key is always included even if not specified.
using(alias) -> QuerySet¶
Selects which database to use for the query. For multi-database setups.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `alias` | `str` | Database alias as configured in your database settings |
SQL generated: Same SQL, routed to a different database connection.
Example:
# Read from replica
articles = await Article.objects.using("replica").filter(published=True).all()
# Write to primary
await Article.objects.using("primary").create(title="New Article")
Return type: QuerySet
See Multi-Database for full multi-database routing documentation.
select_for_update(nowait=False, skip_locked=False, of=()) -> QuerySet¶
Locks the selected rows until the end of the transaction. Maps to PostgreSQL SELECT ... FOR UPDATE.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `nowait` | `bool` | If `True`, raise an error immediately if the row is already locked (`FOR UPDATE NOWAIT`) |
| `skip_locked` | `bool` | If `True`, skip already-locked rows (`FOR UPDATE SKIP LOCKED`) |
| `of` | `tuple[str, ...]` | Specific tables to lock in a multi-table query |
SQL generated:
SELECT * FROM accounts WHERE id = $1 FOR UPDATE
-- With nowait:
SELECT * FROM accounts WHERE id = $1 FOR UPDATE NOWAIT
-- With skip_locked:
SELECT * FROM accounts WHERE id = $1 FOR UPDATE SKIP LOCKED
Example:
# Basic row locking
async with db.transaction():
    account = await Account.objects.filter(id=1).select_for_update().get()
    account.balance -= 100
    await account.save()

# NOWAIT: fail immediately if locked
async with db.transaction():
    try:
        account = await Account.objects.filter(id=1).select_for_update(nowait=True).get()
    except DatabaseError:
        # Row is locked by another transaction
        pass

# SKIP LOCKED: process unlocked rows (job queue pattern)
async with db.transaction():
    jobs = await Job.objects.filter(
        status="pending"
    ).select_for_update(skip_locked=True).limit(10)
    for job in jobs:
        job.status = "processing"
        await job.save()
Return type: QuerySet
Must be used inside a db.transaction(). The lock is released when the transaction commits or rolls back.
raw(sql, params=()) -> QuerySet¶
Execute raw SQL and return the results mapped to model instances. For queries that do not map to models, use db.query() instead.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `sql` | `str` | Raw SQL query with `$1, $2, ...` placeholders |
| `params` | `tuple` | Parameter values |
SQL generated: The exact SQL you provide.
Example:
# Raw query returning model instances
articles = await Article.objects.raw(
    "SELECT * FROM articles WHERE views > $1 ORDER BY views DESC", (100,)
)
# With complex SQL
articles = await Article.objects.raw("""
    SELECT a.*, COUNT(c.id) AS comment_count
    FROM articles a
    LEFT JOIN comments c ON c.article_id = a.id
    WHERE a.published = $1
    GROUP BY a.id
    ORDER BY comment_count DESC
""", (True,))
Return type: QuerySet (model instances with extra attributes from raw columns)
For non-model queries, use Raw SQL (db.query()).
QuerySet API — Methods That Evaluate¶
These methods execute a database query and return results. They are terminal — you cannot chain further QuerySet methods after them.
get(**kwargs) -> Model¶
Returns exactly one object matching the given lookups. Raises DoesNotExist if no object is found, or MultipleObjectsReturned if more than one object matches.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | keyword arguments | Field lookups (same as `filter()`) |
SQL generated:
SELECT * FROM articles WHERE id = $1 LIMIT 2
The LIMIT 2 is used internally to detect whether more than one row matches without fetching the full result set.
Example:
# Get by primary key
article = await Article.objects.get(id=1)
# Get by unique field
user = await User.objects.get(email="alice@example.com")
# Chained with filter
article = await Article.objects.filter(published=True).get(title="Hello World")
# Handle exceptions
from hyperdjango.models import DoesNotExist, MultipleObjectsReturned
try:
    article = await Article.objects.get(id=999)
except DoesNotExist:
    print("Not found")
except MultipleObjectsReturned:
    print("Multiple matches")
Return type: Model instance
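The "fetch at most two rows" idea behind get() can be sketched without a database. Everything below is illustrative: the exception classes are local stand-ins for the ones imported from `hyperdjango.models`, and `get_one` is a hypothetical helper that scans a list of dicts instead of running SQL.

```python
class DoesNotExist(Exception):
    """Local stand-in for the ORM exception of the same name."""


class MultipleObjectsReturned(Exception):
    """Local stand-in for the ORM exception of the same name."""


def get_one(rows: list[dict], **lookups: object) -> dict:
    """Return the single matching row, stopping as soon as a second match appears."""
    matches: list[dict] = []
    for row in rows:
        if all(row.get(key) == value for key, value in lookups.items()):
            matches.append(row)
            if len(matches) == 2:  # same idea as LIMIT 2: two rows prove ambiguity
                break
    if not matches:
        raise DoesNotExist(lookups)
    if len(matches) > 1:
        raise MultipleObjectsReturned(lookups)
    return matches[0]


rows = [
    {"id": 1, "title": "Hello"},
    {"id": 2, "title": "World"},
    {"id": 3, "title": "World"},
]
print(get_one(rows, id=1))  # {'id': 1, 'title': 'Hello'}
```

Two rows are enough to distinguish "exactly one" from "more than one", so there is no need to count or load every match.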
create(**kwargs) -> Model¶
Creates a new object, saves it to the database, and returns it. Equivalent to instantiating, setting fields, and calling save().
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | keyword arguments | Field names and their values |
SQL generated:
Example:
# Create and save in one step
article = await Article.objects.create(
    title="New Article",
    content="Body text",
    published=True,
    author_id=1,
)
print(article.id) # auto-assigned by PostgreSQL
Return type: Model instance (with auto-generated fields populated)
get_or_create(defaults=None, **kwargs) -> tuple[Model, bool]¶
Looks up an object by the given kwargs. If not found, creates it using the defaults dict. Returns a tuple of (object, created).
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `defaults` | `dict[str, ...]` or `None` | Fields to set only on creation (not used for lookup) |
| `**kwargs` | keyword arguments | Fields used for the lookup |
SQL generated:
-- First tries:
SELECT * FROM articles WHERE title = $1 LIMIT 2
-- If not found:
INSERT INTO articles (title, content, published) VALUES ($1, $2, $3) RETURNING *
Example:
# Lookup by title, create with defaults if not found
article, created = await Article.objects.get_or_create(
    title="Welcome",
    defaults={"content": "Hello world", "published": True},
)
if created:
    print("Created new article")
else:
    print("Found existing article")
Return type: tuple[Model, bool] — the model instance and whether it was created
update_or_create(defaults=None, **kwargs) -> tuple[Model, bool]¶
Like get_or_create(), but if the object already exists, updates it with the defaults dict.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `defaults` | `dict[str, ...]` or `None` | Fields to set on creation OR update |
| `**kwargs` | keyword arguments | Fields used for the lookup |
SQL generated:
-- First tries:
SELECT * FROM articles WHERE title = $1 LIMIT 2
-- If found, updates:
UPDATE articles SET content = $1, published = $2 WHERE id = $3
-- If not found, inserts:
INSERT INTO articles (title, content, published) VALUES ($1, $2, $3) RETURNING *
Example:
# Upsert pattern: create or update
article, created = await Article.objects.update_or_create(
    title="Daily Report",
    defaults={"content": "Updated content", "published": True},
)
# If "Daily Report" existed, content and published are updated
# If it didn't exist, a new article is created with all fields
Return type: tuple[Model, bool] — the model instance and whether it was created
bulk_create(objs, batch_size=None, ignore_conflicts=False, update_conflicts=False, update_fields=None, unique_fields=None) -> list[Model]¶
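Inserts multiple objects with one multi-row INSERT statement (or one statement per batch when batch_size is set). This avoids a database round trip per object, so it is much faster than individual save() calls.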
Signature:
await QuerySet.bulk_create(
    objs,
    batch_size=None,
    ignore_conflicts=False,
    update_conflicts=False,
    update_fields=None,
    unique_fields=None,
) -> list[Model]
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `objs` | `list[Model]` | Model instances to insert |
| `batch_size` | `int` or `None` | Maximum number of objects per INSERT statement |
| `ignore_conflicts` | `bool` | If `True`, silently skip rows that violate unique constraints (`ON CONFLICT DO NOTHING`) |
| `update_conflicts` | `bool` | If `True`, update existing rows on conflict (`ON CONFLICT ... DO UPDATE`) |
| `update_fields` | `list[str]` or `None` | Fields to update on conflict (required when `update_conflicts=True`) |
| `unique_fields` | `list[str]` or `None` | Fields that define the unique constraint for conflict detection |
SQL generated:
-- Basic bulk insert:
INSERT INTO articles (title, content, published) VALUES
($1, $2, $3),
($4, $5, $6),
($7, $8, $9)
RETURNING *
-- With ignore_conflicts:
INSERT INTO articles (title, content) VALUES ($1, $2), ($3, $4)
ON CONFLICT DO NOTHING
-- With update_conflicts:
INSERT INTO articles (title, content) VALUES ($1, $2), ($3, $4)
ON CONFLICT (title) DO UPDATE SET content = EXCLUDED.content
Example:
# Insert 100 articles in one query
articles = [Article(title=f"Article {i}") for i in range(100)]
created = await Article.objects.bulk_create(articles)
# With batch_size (split into smaller INSERTs)
articles = [Article(title=f"Article {i}") for i in range(10000)]
await Article.objects.bulk_create(articles, batch_size=1000)
# Ignore duplicates
await Article.objects.bulk_create(
    [Article(title="Unique Title", content="body")],
    ignore_conflicts=True,
)
# Upsert pattern
await Article.objects.bulk_create(
    [Article(title="Daily Report", content="Updated body")],
    update_conflicts=True,
    update_fields=["content"],
    unique_fields=["title"],
)
Return type: list[Model]
For truly massive inserts (100K+ rows), use the native COPY protocol via db.copy_in() which achieves 536K rows/sec.
bulk_update(objs, fields, batch_size=None) -> int¶
Updates specific fields on multiple objects in a single query.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `objs` | `list[Model]` | Model instances with updated values |
| `fields` | `list[str]` | Field names to update |
| `batch_size` | `int` or `None` | Maximum number of objects per UPDATE statement |
SQL generated:
Example:
# Fetch, modify, bulk update
articles = await Article.objects.filter(published=True).all()
for article in articles:
    article.views += 1
count = await Article.objects.bulk_update(articles, ["views"])
print(f"Updated {count} articles")
# With batch_size
await Article.objects.bulk_update(articles, ["views", "title"], batch_size=500)
Return type: int (number of rows updated)
The primary key field cannot be updated. All objects must have their primary key set.
count() -> int¶
Returns the number of rows matching the query. More efficient than len(await qs.all()) because it uses SELECT COUNT(*).
Signature:
Parameters: None.
SQL generated:
Example:
# Total count
total = await Article.objects.count()
# Filtered count
published_count = await Article.objects.filter(published=True).count()
Return type: int
in_bulk(id_list=None, field_name="id") -> dict[..., Model]¶
Returns a dictionary mapping field values to model instances for the given list of IDs.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `id_list` | `list` or `None` | List of values to look up. If `None`, returns all objects. |
| `field_name` | `str` | Field to use as the dictionary key (must be unique) |
SQL generated:
Example:
# Get multiple articles by ID, as a dict
articles_by_id = await Article.objects.in_bulk([1, 2, 3])
# {1: <Article id=1>, 2: <Article id=2>, 3: <Article id=3>}
article = articles_by_id[2]
# By a different unique field
users_by_email = await User.objects.in_bulk(
    ["alice@example.com", "bob@example.com"],
    field_name="email",
)
Return type: dict[key_type, Model]
iterator(chunk_size=2000) -> AsyncIterator[Model]¶
Returns an async iterator that fetches results in chunks, keeping memory usage constant for large result sets.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `chunk_size` | `int` | Number of rows to fetch per database round-trip |
SQL generated:
-- Uses server-side cursors internally:
DECLARE _cursor CURSOR FOR SELECT * FROM articles
FETCH 2000 FROM _cursor
Example:
# Process millions of rows without loading all into memory
async for article in Article.objects.filter(published=True).iterator(chunk_size=500):
    process(article)
Return type: AsyncIterator[Model]
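The chunked-fetch pattern behind iterator() can be sketched as an async generator. This is an illustration under stated assumptions: `iterate_in_chunks` and `fake_fetch` are hypothetical, and slicing an in-memory list by offset stands in for `FETCH n FROM cursor` on a server-side cursor.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable

DATA = list(range(5))  # stand-in for the rows behind a server-side cursor


async def fake_fetch(offset: int, limit: int) -> list[int]:
    """Pretend round trip: return the next `limit` rows starting at `offset`."""
    return DATA[offset:offset + limit]


async def iterate_in_chunks(
    fetch: Callable[[int, int], Awaitable[list[int]]],
    chunk_size: int = 2000,
) -> AsyncIterator[int]:
    """Yield rows one at a time while holding only one chunk in memory."""
    offset = 0
    while True:
        chunk = await fetch(offset, chunk_size)
        for row in chunk:
            yield row
        if len(chunk) < chunk_size:  # a short chunk means the source is exhausted
            return
        offset += chunk_size


async def main() -> list[int]:
    return [row async for row in iterate_in_chunks(fake_fetch, chunk_size=2)]


result = asyncio.run(main())
print(result)  # [0, 1, 2, 3, 4]
```

A smaller `chunk_size` lowers peak memory at the cost of more round trips; a larger one does the opposite.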
latest(*fields) -> Model¶
Returns the latest object by the given date/time field(s). Raises DoesNotExist if the QuerySet is empty.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | Field names to order by (descending). If omitted, uses `Meta.ordering`. |
SQL generated:
Example:
# Latest by explicit field
latest_article = await Article.objects.latest("pub_date")
# Latest using Meta.ordering (if ordering = ["-pub_date"])
latest_article = await Article.objects.latest()
# Latest within a filtered set
latest_published = await Article.objects.filter(published=True).latest("pub_date")
Return type: Model instance
earliest(*fields) -> Model¶
Returns the earliest (oldest) object by the given date/time field(s). Raises DoesNotExist if the QuerySet is empty.
Signature:
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `*fields` | `str` | Field names to order by (ascending). If omitted, uses `Meta.ordering`. |
SQL generated:
Example:
# Earliest by explicit field
first_article = await Article.objects.earliest("pub_date")
# First user ever created
first_user = await User.objects.earliest("id")
Return type: Model instance
first() -> Model | None¶
Returns the first object of the QuerySet, or None if the QuerySet is empty. Does not raise an exception.
Signature:
Parameters: None.
SQL generated:
Example:
# First by primary key
article = await Article.objects.first()
if article:
    print(article.title)
# First by custom ordering
newest = await Article.objects.order_by("-pub_date").first()
Return type: Model | None
last() -> Model | None¶
Returns the last object of the QuerySet, or None if the QuerySet is empty. Reverses the ordering and takes the first result.
Signature:
await QuerySet.last() -> Model | None
Parameters: None.
SQL generated:
Example:
# Last by primary key
article = await Article.objects.last()
# Last within ordered set
oldest = await Article.objects.order_by("-pub_date").last()
Return type: Model | None
aggregate(**kwargs) -> dict[str, ...]¶
Returns a dictionary of aggregate values computed over the QuerySet. Unlike annotate(), this returns a single dict, not a QuerySet.
Signature:
await QuerySet.aggregate(**kwargs) -> dict[str, ...]
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | expression arguments | Named aggregations: `alias=AggregateFunc("field")` |
SQL generated:
Example:
from hyperdjango.expressions import Count, Sum, Avg, Max, Min, StdDev, Variance
# Single aggregate
result = await Article.objects.aggregate(total=Count("id"))
# {"total": 150}
# Multiple aggregates
stats = await Article.objects.aggregate(
total=Count("id"),
avg_views=Avg("views"),
max_views=Max("views"),
min_views=Min("views"),
total_views=Sum("views"),
)
# {"total": 150, "avg_views": 42.5, "max_views": 1000, "min_views": 0, "total_views": 6375}
# Filtered aggregate
stats = await Article.objects.filter(published=True).aggregate(
count=Count("id"),
avg_views=Avg("views"),
)
Return type: dict[str, int | float | Decimal | None]
exists() -> bool¶
Returns True if the QuerySet contains any results. More efficient than count() > 0 because it uses SELECT 1 ... LIMIT 1.
Signature:
await QuerySet.exists() -> bool
Parameters: None.
SQL generated:
Example:
# Check existence
has_published = await Article.objects.filter(published=True).exists()
if has_published:
    print("There are published articles")
# More efficient than:
# if await Article.objects.filter(published=True).count() > 0:
Return type: bool
contains(obj) -> bool¶
Returns True if the QuerySet contains the given object.
Signature:
await QuerySet.contains(obj) -> bool
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `obj` | `Model` | A model instance to check for |
SQL generated:
Example:
article = await Article.objects.get(id=1)
is_published = await Article.objects.filter(published=True).contains(article)
Return type: bool
update(**kwargs) -> int¶
Updates all rows matched by the QuerySet and returns the number of rows affected. Does not call save() or trigger signals.
Signature:
await QuerySet.update(**kwargs) -> int
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `**kwargs` | keyword arguments | Field names and their new values. Supports F expressions. |
SQL generated:
Example:
# Publish all articles with enough views
count = await Article.objects.filter(views__gt=100).update(published=True)
print(f"Published {count} articles")
# Atomic increment with F expression
from hyperdjango.expressions import F
await Article.objects.filter(id=1).update(views=F("views") + 1)
# Bulk update multiple fields
await Article.objects.filter(archived=True).update(published=False, views=0)
Return type: int (number of rows updated)
This is a SQL-level update. It does NOT call model save(), does NOT trigger pre_save/post_save signals, and does NOT run model validation. For those, use individual save() calls.
delete() -> tuple[int, dict[str, int]]¶
Deletes all rows matched by the QuerySet and returns the total number of deleted objects (including cascaded deletes) together with a per-table breakdown.
Signature:
await QuerySet.delete() -> tuple[int, dict[str, int]]
Parameters: None.
SQL generated:
Example:
# Delete unpublished articles
count, details = await Article.objects.filter(published=False).delete()
print(f"Deleted {count} objects")
# details: {"articles": 5, "comments": 23} (includes cascaded deletes)
# Delete all (be careful!)
count, details = await Article.objects.all().delete()
Return type: tuple[int, dict[str, int]] — total count and per-table breakdown
This triggers pre_delete and post_delete signals. For bulk deletion without signals, use raw SQL.
explain(analyze=False, verbose=False, costs=True, buffers=False, format="text") -> str¶
Returns the PostgreSQL query execution plan for the QuerySet.
Signature:
await QuerySet.explain(
analyze=False,
verbose=False,
costs=True,
buffers=False,
format="text",
) -> str
Parameters:
| Parameter | Type | Description |
|---|---|---|
| `analyze` | `bool` | If True, actually executes the query and reports real timing data |
| `verbose` | `bool` | If True, show additional detail for each plan node |
| `costs` | `bool` | If True, show estimated startup and total costs (default) |
| `buffers` | `bool` | If True, show buffer usage information (requires `analyze=True`) |
| `format` | `str` | Output format: `"text"`, `"json"`, `"xml"`, or `"yaml"` |
SQL generated:
EXPLAIN SELECT * FROM articles WHERE published = $1
-- With analyze:
EXPLAIN (ANALYZE) SELECT * FROM articles WHERE published = $1
-- Full options:
EXPLAIN (ANALYZE, VERBOSE, BUFFERS, FORMAT JSON) SELECT * FROM articles WHERE published = $1
Example:
# Basic plan (estimated costs, no execution)
plan = await Article.objects.filter(published=True).explain()
print(plan)
# Seq Scan on articles (cost=0.00..12.50 rows=100 width=256)
# Filter: (published = true)
# Analyzed plan (actually executes the query)
plan = await Article.objects.filter(published=True).explain(analyze=True)
print(plan)
# Seq Scan on articles (cost=0.00..12.50 rows=100 width=256) (actual time=0.015..0.032 rows=75 loops=1)
# Filter: (published = true)
# Rows Removed by Filter: 25
# Planning Time: 0.052 ms
# Execution Time: 0.048 ms
# JSON format for programmatic use
plan_json = await Article.objects.filter(published=True).explain(format="json")
Return type: str
Use explain(analyze=True) to see real execution times. Without analyze, PostgreSQL only estimates costs without executing the query.
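For programmatic use, the JSON plan can be walked as a tree. The sketch below assumes explain(format="json") returns the JSON text as a str (as the return type above suggests) and uses a hand-written sample in the shape PostgreSQL emits for EXPLAIN (FORMAT JSON); the `walk` helper and the sample plan are illustrative only.

```python
import json

# Hand-written sample, not real output: a top-level array whose first
# element holds the root "Plan" node, with children under "Plans".
plan_json = """
[
  {
    "Plan": {
      "Node Type": "Sort",
      "Total Cost": 18.3,
      "Plans": [
        {
          "Node Type": "Seq Scan",
          "Relation Name": "articles",
          "Total Cost": 12.5
        }
      ]
    }
  }
]
"""


def walk(node: dict, depth: int = 0) -> list[tuple[int, str, float]]:
    """Flatten the plan tree into (depth, node type, total cost) tuples."""
    found = [(depth, node["Node Type"], node["Total Cost"])]
    for child in node.get("Plans", []):
        found.extend(walk(child, depth + 1))
    return found


nodes = walk(json.loads(plan_json)[0]["Plan"])
seq_scans = [name for _, name, _ in nodes if name == "Seq Scan"]
```

A non-empty `seq_scans` list on a large table is a common hint that an index may be missing for the filter in question.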
Limiting¶
# First N results
await Article.objects.limit(10)
# Offset + limit (pagination)
await Article.objects.offset(20).limit(10)
These map to SQL LIMIT and OFFSET. For proper pagination with page objects, see Pagination.
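The offset for a given page follows directly from the page number and page size. A minimal sketch, where `page_window` is a hypothetical helper and not part of HyperDjango:

```python
def page_window(page: int, per_page: int) -> tuple[int, int]:
    """Return (offset, limit) for a 1-based page number."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be positive")
    return (page - 1) * per_page, per_page


# Page 3 with 10 rows per page corresponds to offset(20).limit(10).
offset, limit = page_window(page=3, per_page=10)
```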
Field Lookups¶
Field lookups are specified as keyword arguments to filter(), exclude(), and get(). The syntax is field__lookup=value. If no lookup is specified, exact is assumed.
String Lookups¶
exact¶
Exact match. This is the default lookup when no lookup type is specified.
await Article.objects.filter(id__exact=1).all()
# Shorthand (exact is default):
await Article.objects.filter(id=1).all()
SQL: WHERE id = $1
iexact¶
Case-insensitive exact match.
SQL: WHERE UPPER(username) = UPPER($1) (or WHERE username ILIKE $1)
contains¶
Case-sensitive containment test.
SQL: WHERE title LIKE '%' || $1 || '%'
icontains¶
Case-insensitive containment test.
SQL: WHERE title ILIKE '%' || $1 || '%'
startswith¶
Case-sensitive starts-with.
SQL: WHERE title LIKE $1 || '%'
istartswith¶
Case-insensitive starts-with.
SQL: WHERE title ILIKE $1 || '%'
endswith¶
Case-sensitive ends-with.
SQL: WHERE title LIKE '%' || $1
iendswith¶
Case-insensitive ends-with.
SQL: WHERE title ILIKE '%' || $1
regex¶
Case-sensitive regular expression match (PostgreSQL ~ operator).
SQL: WHERE title ~ $1
iregex¶
Case-insensitive regular expression match (PostgreSQL ~* operator).
SQL: WHERE title ~* $1
Comparison Lookups¶
gt¶
Greater than.
SQL: WHERE views > $1
gte¶
Greater than or equal to.
SQL: WHERE views >= $1
lt¶
Less than.
SQL: WHERE views < $1
lte¶
Less than or equal to.
SQL: WHERE views <= $1
in¶
In a given iterable (list, tuple, or subquery).
# List of values
await Article.objects.filter(id__in=[1, 2, 3]).all()
# Subquery
published_author_ids = Article.objects.filter(published=True).values_list("author_id", flat=True)
await Author.objects.filter(id__in=published_author_ids).all()
SQL: WHERE id IN ($1, $2, $3)
range¶
Inclusive range (maps to BETWEEN).
import datetime
await Article.objects.filter(views__range=(10, 100)).all()
await Article.objects.filter(
pub_date__range=(datetime.date(2024, 1, 1), datetime.date(2024, 12, 31))
).all()
SQL: WHERE views BETWEEN $1 AND $2
isnull¶
Tests whether the column IS NULL (True) or IS NOT NULL (False).
# NULL check
await Article.objects.filter(content__isnull=True).all()
# NOT NULL check
await Article.objects.filter(content__isnull=False).all()
SQL: WHERE content IS NULL / WHERE content IS NOT NULL
Date/Time Lookups¶
These lookups extract components from date, datetime, and time fields.
date¶
Extract the date portion from a datetime field.
SQL: WHERE pub_date::date = $1
year¶
Extract the year.
SQL: WHERE EXTRACT(YEAR FROM pub_date) = $1
iso_year¶
Extract the ISO 8601 year (may differ from calendar year at year boundaries).
SQL: WHERE EXTRACT(ISOYEAR FROM pub_date) = $1
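The difference between the calendar year and the ISO 8601 year shows up in the last days of December and the first days of January. Python's standard library follows the same ISO rules, so it can be used to check which value a date falls under; this sketch only illustrates the calendar arithmetic and does not query the database.

```python
from datetime import date

d = date(2024, 12, 30)   # a Monday in the final week of December
iso = d.isocalendar()    # ISO 8601 (year, week, weekday)

calendar_year = d.year   # the value the year lookup compares against
iso_year = iso[0]        # the value the iso_year lookup compares against
iso_week = iso[1]        # the value the week lookup compares against
```

Here the calendar year is 2024 while the ISO year is 2025, because the date belongs to ISO week 1 of 2025.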
month¶
Extract the month (1-12).
SQL: WHERE EXTRACT(MONTH FROM pub_date) = $1
day¶
Extract the day of the month (1-31).
SQL: WHERE EXTRACT(DAY FROM pub_date) = $1
week¶
Extract the ISO 8601 week number (1-52 or 53).
SQL: WHERE EXTRACT(WEEK FROM pub_date) = $1
week_day¶
Day of the week (1=Sunday through 7=Saturday).
SQL: WHERE EXTRACT(DOW FROM pub_date) + 1 = $1
iso_week_day¶
ISO 8601 day of the week (1=Monday through 7=Sunday).
SQL: WHERE EXTRACT(ISODOW FROM pub_date) = $1
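The two day-of-week numbering schemes are easy to mix up. The sketch below derives both from a Python date so a filter value can be computed rather than hard-coded; `week_day` and `iso_week_day` are hypothetical helper names mirroring the lookups.

```python
from datetime import date


def week_day(d: date) -> int:
    """1=Sunday ... 7=Saturday, matching EXTRACT(DOW) + 1."""
    return d.isoweekday() % 7 + 1


def iso_week_day(d: date) -> int:
    """1=Monday ... 7=Sunday, matching EXTRACT(ISODOW)."""
    return d.isoweekday()


monday = date(2024, 6, 17)
sunday = date(2024, 6, 16)

monday_values = (week_day(monday), iso_week_day(monday))
sunday_values = (week_day(sunday), iso_week_day(sunday))
```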
quarter¶
Quarter of the year (1-4).
SQL: WHERE EXTRACT(QUARTER FROM pub_date) = $1
time¶
Extract the time portion from a datetime field.
SQL: WHERE pub_date::time = $1
hour¶
Extract the hour (0-23).
SQL: WHERE EXTRACT(HOUR FROM pub_date) = $1
minute¶
Extract the minute (0-59).
SQL: WHERE EXTRACT(MINUTE FROM pub_date) = $1
second¶
Extract the second (0-59).
SQL: WHERE EXTRACT(SECOND FROM pub_date) = $1
Lookup Chaining¶
Lookups can be chained with date/time transforms:
# Articles published on a Monday in June
await Article.objects.filter(
pub_date__month=6,
pub_date__week_day=2,
).all()
# Articles from Q1 2024 with views > 100
await Article.objects.filter(
pub_date__year=2024,
pub_date__quarter=1,
views__gt=100,
).all()
Lookup Reference Table¶
| Lookup | SQL | Example |
|---|---|---|
| `exact` (default) | `= $1` | `filter(id=1)` |
| `iexact` | `ILIKE $1` | `filter(name__iexact="alice")` |
| `contains` | `LIKE '%..%'` | `filter(title__contains="Django")` |
| `icontains` | `ILIKE '%..%'` | `filter(title__icontains="django")` |
| `startswith` | `LIKE '..%'` | `filter(title__startswith="How")` |
| `istartswith` | `ILIKE '..%'` | `filter(title__istartswith="how")` |
| `endswith` | `LIKE '%..'` | `filter(title__endswith="?")` |
| `iendswith` | `ILIKE '%..'` | `filter(title__iendswith="?")` |
| `gt` | `> $1` | `filter(views__gt=100)` |
| `gte` | `>= $1` | `filter(views__gte=100)` |
| `lt` | `< $1` | `filter(views__lt=10)` |
| `lte` | `<= $1` | `filter(views__lte=10)` |
| `in` | `IN ($1, $2, ...)` | `filter(id__in=[1, 2, 3])` |
| `range` | `BETWEEN $1 AND $2` | `filter(views__range=(10, 100))` |
| `isnull` | `IS NULL` / `IS NOT NULL` | `filter(content__isnull=True)` |
| `regex` | `~ $1` | `filter(title__regex=r"^\d+")` |
| `iregex` | `~* $1` | `filter(title__iregex=r"^how")` |
| `date` | `::date = $1` | `filter(pub_date__date=date(2024, 6, 15))` |
| `year` | `EXTRACT(YEAR ...) = $1` | `filter(pub_date__year=2024)` |
| `iso_year` | `EXTRACT(ISOYEAR ...) = $1` | `filter(pub_date__iso_year=2024)` |
| `month` | `EXTRACT(MONTH ...) = $1` | `filter(pub_date__month=6)` |
| `day` | `EXTRACT(DAY ...) = $1` | `filter(pub_date__day=15)` |
| `week` | `EXTRACT(WEEK ...) = $1` | `filter(pub_date__week=25)` |
| `week_day` | `EXTRACT(DOW ...) + 1 = $1` | `filter(pub_date__week_day=2)` |
| `iso_week_day` | `EXTRACT(ISODOW ...) = $1` | `filter(pub_date__iso_week_day=1)` |
| `quarter` | `EXTRACT(QUARTER ...) = $1` | `filter(pub_date__quarter=2)` |
| `time` | `::time = $1` | `filter(pub_date__time=time(14, 30))` |
| `hour` | `EXTRACT(HOUR ...) = $1` | `filter(pub_date__hour=14)` |
| `minute` | `EXTRACT(MINUTE ...) = $1` | `filter(pub_date__minute=30)` |
| `second` | `EXTRACT(SECOND ...) = $1` | `filter(pub_date__second=0)` |
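To make the field__lookup convention concrete, the sketch below translates keyword arguments into a parameterized WHERE clause. It is an illustration of the idea only: `TEMPLATES` and `build_where` are hypothetical, cover a small subset of the lookups, and do not handle relationship traversal; the ORM's real query compiler is not shown in this document.

```python
# Hypothetical subset of the lookup table; {col} is the column, {n} the
# 1-based position of the bound parameter.
TEMPLATES = {
    "exact": "{col} = ${n}",
    "gt": "{col} > ${n}",
    "gte": "{col} >= ${n}",
    "lt": "{col} < ${n}",
    "lte": "{col} <= ${n}",
    "icontains": "{col} ILIKE '%' || ${n} || '%'",
    "year": "EXTRACT(YEAR FROM {col}) = ${n}",
}


def build_where(**lookups):
    """Translate field__lookup=value kwargs into (where_sql, params)."""
    clauses, params = [], []
    for key, value in lookups.items():
        field, sep, lookup = key.rpartition("__")
        if not sep or lookup not in TEMPLATES:
            # No recognised suffix: treat the whole key as the field name
            # and fall back to the default exact lookup.
            field, lookup = key, "exact"
        params.append(value)
        clauses.append(TEMPLATES[lookup].format(col=field, n=len(params)))
    return " AND ".join(clauses), params


where, params = build_where(published=True, views__gt=100, pub_date__year=2024)
```

Values never appear in the SQL text; they travel separately in `params`, which is what makes the statement safe to cache and reuse.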
Aggregation Functions¶
All aggregation functions are imported from hyperdjango.expressions.
Count(field, distinct=False)¶
Counts the number of non-NULL values. Use distinct=True to count only distinct values.
from hyperdjango.expressions import Count
# Total articles
stats = await Article.objects.aggregate(total=Count("id"))
# {"total": 150}
# Distinct authors
stats = await Article.objects.aggregate(authors=Count("author_id", distinct=True))
# {"authors": 12}
# As annotation
articles = await Author.objects.annotate(
article_count=Count("articles__id"),
).all()
SQL: COUNT(id) or COUNT(DISTINCT author_id)
Sum(field)¶
Computes the sum of all values. Returns None if the QuerySet is empty.
from hyperdjango.expressions import Sum
stats = await Article.objects.aggregate(total_views=Sum("views"))
# {"total_views": 6375}
# Filtered sum
stats = await Article.objects.filter(published=True).aggregate(
total_views=Sum("views")
)
# As annotation
authors = await Author.objects.annotate(
total_article_views=Sum("articles__views"),
).all()
SQL: SUM(views)
Avg(field)¶
Computes the arithmetic mean. Returns None if the QuerySet is empty.
from hyperdjango.expressions import Avg
stats = await Article.objects.aggregate(avg_views=Avg("views"))
# {"avg_views": 42.5}
SQL: AVG(views)
Max(field)¶
Returns the maximum value. Returns None if the QuerySet is empty.
from hyperdjango.expressions import Max
stats = await Article.objects.aggregate(max_views=Max("views"))
# {"max_views": 1000}
# Useful for dates
stats = await Article.objects.aggregate(latest=Max("pub_date"))
SQL: MAX(views)
Min(field)¶
Returns the minimum value. Returns None if the QuerySet is empty.
from hyperdjango.expressions import Min
stats = await Article.objects.aggregate(min_views=Min("views"))
# {"min_views": 0}
# Earliest date
stats = await Article.objects.aggregate(earliest=Min("pub_date"))
SQL: MIN(views)
StdDev(field, sample=False)¶
Computes the standard deviation. By default uses population standard deviation. Pass sample=True for sample standard deviation.
from hyperdjango.expressions import StdDev
# Population standard deviation
stats = await Article.objects.aggregate(stddev=StdDev("views"))
# Sample standard deviation
stats = await Article.objects.aggregate(stddev=StdDev("views", sample=True))
SQL: STDDEV_POP(views) or STDDEV_SAMP(views)
Variance(field, sample=False)¶
Computes the variance. By default uses population variance. Pass sample=True for sample variance.
from hyperdjango.expressions import Variance
# Population variance
stats = await Article.objects.aggregate(var=Variance("views"))
# Sample variance
stats = await Article.objects.aggregate(var=Variance("views", sample=True))
SQL: VAR_POP(views) or VAR_SAMP(views)
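The population and sample variants differ only in the divisor (n versus n - 1). Python's statistics module has the same four functions, which makes it a convenient way to sanity-check results on a small data set; the `views` list here is made-up sample data.

```python
import statistics

views = [2, 4, 4, 4, 5, 5, 7, 9]   # mean is 5, squared deviations sum to 32

pop_std = statistics.pstdev(views)      # counterpart of STDDEV_POP
samp_std = statistics.stdev(views)      # counterpart of STDDEV_SAMP
pop_var = statistics.pvariance(views)   # counterpart of VAR_POP:  32 / 8
samp_var = statistics.variance(views)   # counterpart of VAR_SAMP: 32 / 7
```

The sample variants are always slightly larger, and the gap shrinks as the number of rows grows.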
Combining Aggregations¶
Multiple aggregation functions can be used together in a single aggregate() call:
from hyperdjango.expressions import Count, Sum, Avg, Max, Min, StdDev, Variance
stats = await Article.objects.filter(published=True).aggregate(
count=Count("id"),
total_views=Sum("views"),
avg_views=Avg("views"),
max_views=Max("views"),
min_views=Min("views"),
stddev_views=StdDev("views"),
variance_views=Variance("views"),
)
# Single query, one result dict
SQL:
SELECT
COUNT(id) AS count,
SUM(views) AS total_views,
AVG(views) AS avg_views,
MAX(views) AS max_views,
MIN(views) AS min_views,
STDDEV_POP(views) AS stddev_views,
VAR_POP(views) AS variance_views
FROM articles
WHERE published = true
Q Objects¶
Build complex queries with AND, OR, and NOT using Q objects.
OR Queries¶
# Published OR has many views
await Article.objects.filter(
Q(published=True) | Q(views__gt=1000)
).all()
SQL: WHERE published = true OR views > 1000
AND Queries¶
# Published AND recent (Q objects default to AND)
await Article.objects.filter(
Q(published=True) & Q(pub_date__year=2024)
).all()
SQL: WHERE published = true AND EXTRACT(YEAR FROM pub_date) = 2024
NOT Queries¶
SQL: WHERE NOT published = true
Complex Combinations¶
# (published OR featured) AND NOT archived
await Article.objects.filter(
(Q(published=True) | Q(featured=True)) & ~Q(archived=True)
).all()
SQL: WHERE (published = true OR featured = true) AND NOT archived = true
Mixing Q Objects with Keyword Arguments¶
# Keyword args are ANDed, Q objects allow OR
await Article.objects.filter(
Q(title__contains="Python") | Q(title__contains="Django"),
published=True, # ANDed with the Q expression
).all()
SQL: WHERE (title LIKE '%Python%' OR title LIKE '%Django%') AND published = true
Dynamic Q Construction¶
# Build queries programmatically
conditions = Q()
for tag_name in ["python", "django", "async"]:
    conditions |= Q(tags__name=tag_name)
await Article.objects.filter(conditions).all()
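How the |, & and ~ operators compose can be seen with a toy class. `MiniQ` below is a hypothetical stand-in written for this illustration, not HyperDjango's Q; it carries a ready-made SQL fragment with inlined literals purely for readability, whereas real queries use bound parameters.

```python
class MiniQ:
    """Toy stand-in for Q: holds a SQL fragment and supports |, & and ~."""

    def __init__(self, sql: str = ""):
        self.sql = sql

    def _combine(self, other: "MiniQ", op: str) -> "MiniQ":
        # An empty MiniQ is the identity, so `MiniQ() | x` is just x.
        if not self.sql:
            return other
        if not other.sql:
            return self
        return MiniQ(f"({self.sql} {op} {other.sql})")

    def __or__(self, other: "MiniQ") -> "MiniQ":
        return self._combine(other, "OR")

    def __and__(self, other: "MiniQ") -> "MiniQ":
        return self._combine(other, "AND")

    def __invert__(self) -> "MiniQ":
        return MiniQ(f"NOT ({self.sql})") if self.sql else self


# Dynamic construction: start empty and OR conditions in one by one.
conditions = MiniQ()
for tag in ["python", "django", "async"]:
    conditions |= MiniQ(f"name = '{tag}'")

# (published OR featured) AND NOT archived
combined = (MiniQ("published = true") | MiniQ("featured = true")) & ~MiniQ("archived = true")
```

Each operator returns a new object, so partial conditions can be stored and reused without being mutated by later combinations.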
F Expressions¶
Reference database columns in queries without loading them into Python. Enables atomic operations and column-to-column comparisons.
Atomic Updates¶
# Increment views atomically (no race condition)
await Article.objects.filter(id=1).update(views=F("views") + 1)
SQL: UPDATE articles SET views = views + 1 WHERE id = 1
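The race that a column-relative update avoids can be reproduced with plain SQL. The sketch below uses Python's built-in sqlite3 module as a stand-in database (an assumption made so the example runs anywhere; HyperDjango itself targets PostgreSQL) and simulates two workers by interleaving their statements.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, views INTEGER)")
conn.execute("INSERT INTO articles VALUES (1, 0), (2, 0)")

# Read-modify-write: both workers read the same stale value, so one
# increment is silently lost.
stale_a = conn.execute("SELECT views FROM articles WHERE id = 1").fetchone()[0]
stale_b = conn.execute("SELECT views FROM articles WHERE id = 1").fetchone()[0]
conn.execute("UPDATE articles SET views = ? WHERE id = 1", (stale_a + 1,))
conn.execute("UPDATE articles SET views = ? WHERE id = 1", (stale_b + 1,))

# Column-relative update: each statement increments whatever is stored.
conn.execute("UPDATE articles SET views = views + 1 WHERE id = 2")
conn.execute("UPDATE articles SET views = views + 1 WHERE id = 2")

lost_update = conn.execute("SELECT views FROM articles WHERE id = 1").fetchone()[0]
atomic = conn.execute("SELECT views FROM articles WHERE id = 2").fetchone()[0]
```

Row 1 ends at 1 view despite two increments, while row 2 correctly ends at 2.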
Arithmetic Operations¶
# Decrement
await Account.objects.filter(id=1).update(balance=F("balance") - 100)
# Multiply
await Product.objects.filter(id=1).update(price=F("price") * 1.1)
# Divide
await Stats.objects.update(average=F("total") / F("count"))
# Modulo
await Article.objects.update(bucket=F("id") % 10)
Column Comparisons¶
# Articles where views exceed the minimum threshold
await Article.objects.filter(views__gt=F("min_views")).all()
# Entries where word count exceeds the summary limit
await Article.objects.filter(word_count__gt=F("summary_limit")).all()
SQL: WHERE views > min_views
F Expressions with Annotations¶
from hyperdjango.expressions import F, Value
# Computed column
articles = await Article.objects.annotate(
double_views=F("views") * 2,
).all()
# Difference between fields
articles = await Article.objects.annotate(
view_deficit=F("target_views") - F("views"),
).filter(view_deficit__gt=0).all()
F Expressions in order_by¶
Raw SQL¶
For queries that cannot be expressed with the ORM, use the db interface directly. All raw queries go through pg.zig.
db.query(sql, *params) -> list[tuple]¶
Execute a SELECT and return rows as tuples.
db.query_val(sql, *params) -> Any¶
Execute a query and return a single scalar value.
count = await db.query_val("SELECT COUNT(*) FROM articles")
max_views = await db.query_val("SELECT MAX(views) FROM articles WHERE published = $1", True)
db.execute(sql, *params) -> int¶
Execute an INSERT, UPDATE, or DELETE and return the number of affected rows.
db.copy_in(table, columns, rows)¶
Bulk-insert using PostgreSQL's COPY protocol (536K rows/sec).
rows = [(f"Article {i}", f"Content {i}", True) for i in range(100000)]
await db.copy_in("articles", ["title", "content", "published"], rows)
Transactions¶
Basic Transactions¶
async with db.transaction():
    article = await Article.objects.create(title="Draft")
    await Choice.objects.create(question_id=article.id, choice_text="Yes")
    # Both committed together, or both rolled back on error
Nested Transactions (SAVEPOINTs)¶
async with db.transaction():
    await do_outer_work()
    async with db.transaction():  # SAVEPOINT
        await do_inner_work()  # can fail independently

# Triple nesting
async with db.transaction():
    await step_one()
    async with db.transaction():  # SAVEPOINT sp1
        await step_two()
        async with db.transaction():  # SAVEPOINT sp2
            await step_three()  # can fail without rolling back sp1
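The savepoint behavior that nested transactions rely on can be observed with raw SQL. This sketch uses the standard-library sqlite3 module as a stand-in (an assumption for portability; SAVEPOINT, ROLLBACK TO SAVEPOINT, and RELEASE SAVEPOINT exist in both SQLite and PostgreSQL) and issues by hand the statements a nested block would presumably manage for you.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN and
# SAVEPOINT are issued explicitly rather than implicitly by the module.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE log (step TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO log VALUES ('outer')")

conn.execute("SAVEPOINT sp1")
try:
    conn.execute("INSERT INTO log VALUES ('inner')")
    raise RuntimeError("inner work failed")
except RuntimeError:
    # Undo only the work done since sp1; the outer INSERT survives.
    conn.execute("ROLLBACK TO SAVEPOINT sp1")
finally:
    conn.execute("RELEASE SAVEPOINT sp1")

conn.execute("COMMIT")
steps = [row[0] for row in conn.execute("SELECT step FROM log")]
```

Only the outer row is committed: the inner failure is contained by the savepoint instead of aborting the whole transaction.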
Transaction with select_for_update¶
async with db.transaction():
    # Lock the account row
    account = await Account.objects.filter(id=1).select_for_update().get()
    if account.balance >= amount:
        account.balance -= amount
        await account.save()
    else:
        raise InsufficientFunds()
# Lock released on commit/rollback
Model Inheritance¶
Abstract Models¶
Share fields without creating a table:
class TimestampMixin(Model):
    class Meta:
        abstract = True

    created_at: datetime | None = Field(default=None)
    updated_at: datetime | None = Field(default=None)

class Article(TimestampMixin, Model):
    class Meta:
        table = "articles"

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
    # Inherits created_at and updated_at
Abstract models never create a database table. They exist purely to share field definitions and methods across concrete models.
Proxy Models¶
Same table, different Python class:
class PublishedArticle(Article):
    class Meta:
        proxy = True

    # Same table, can add methods or override behavior
    @classmethod
    async def get_recent(cls, days=7):
        cutoff = datetime.now() - timedelta(days=days)
        return await cls.objects.filter(
            published=True, pub_date__gte=cutoff
        ).all()
Proxy models share the same database table as their parent. They can add Python methods, change default ordering, or customize the manager, but cannot add new fields.
Single-Table Inheritance (STI)¶
Multiple model types sharing one table, distinguished by a discriminator column:
class Vehicle(Model):
    class Meta:
        table = "vehicles"

    id: int = Field(primary_key=True, auto=True)
    name: str = Field()
    type: str = Field(default="vehicle")  # discriminator column
    top_speed: int = Field(default=0)

class Car(Vehicle):
    class Meta:
        sti = True        # shares parent's table
        sti_type = "car"  # discriminator value

    doors: int = Field(default=4)

class Truck(Vehicle):
    class Meta:
        sti = True
        sti_type = "truck"

    payload_tons: int = Field(default=0)
How it works:
- All rows live in the `vehicles` table. `Car` and `Truck` add nullable columns.
- `Car.objects.all()` auto-adds `WHERE type = 'car'`, so you only see cars.
- `Vehicle.objects.all()` returns all rows (cars, trucks, base vehicles).
- `car.save()` auto-sets `type = 'car'` before INSERT.
Configuration options:
| Meta attribute | Default | Description |
|---|---|---|
| `sti` | `False` | Enable single-table inheritance |
| `sti_type` | `classname.lower()` | Discriminator value for this child |
| `sti_column` | `"type"` | Name of the discriminator column |
When to use STI vs separate tables:
- STI: Few child types, similar fields, query across types often. One table, one index.
- Separate tables: Many fields per child, rarely query across types, strict nullability.
Multi-Level Inheritance¶
class BaseContent(Model):
    class Meta:
        abstract = True

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
    created_at: datetime | None = Field(default=None)

class Article(BaseContent):
    class Meta:
        table = "articles"

    content: str = Field(default="")
    published: bool = Field(default=False)

class Video(BaseContent):
    class Meta:
        table = "videos"

    url: str = Field()
    duration: int = Field(default=0)
Model Mixins¶
Built-in mixins for common patterns:
from hyperdjango.mixins import TimestampMixin, SoftDeleteMixin, OwnershipMixin, VersionedMixin
class Article(TimestampMixin, SoftDeleteMixin, Model):
    class Meta:
        table = "articles"

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
TimestampMixin¶
Auto-managed created_at and updated_at fields. created_at is set on first save, updated_at is set on every save.
SoftDeleteMixin¶
Adds deleted_at field. Calling .delete() sets the timestamp instead of removing the row. Queries automatically exclude soft-deleted records. Use .hard_delete() to permanently remove.
await article.delete() # Sets deleted_at, row still in DB
await article.hard_delete() # Actually removes the row
# Include soft-deleted in queries
all_including_deleted = await Article.objects.with_deleted().all()
# Restore a soft-deleted object
await article.restore()
OwnershipMixin¶
Tracks created_by and updated_by user references.
VersionedMixin¶
Append-only versioning. Every save creates a new version record.
history = await article.get_history()
# [<ArticleVersion v3>, <ArticleVersion v2>, <ArticleVersion v1>]
Composing Mixins¶
class AuditedArticle(TimestampMixin, SoftDeleteMixin, OwnershipMixin, VersionedMixin, Model):
    class Meta:
        table = "audited_articles"

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
    # Has: created_at, updated_at, deleted_at, created_by, updated_by, version history
Model Instance Methods¶
article = Article(title="Hello")
# Save (INSERT on first save, UPDATE after)
await article.save()
# Check if persisted
article.is_persisted # True after save
# Primary key
article.pk # shortcut for article.id
# Refresh from database
await article.refresh_from_db()
# Delete
await article.delete()
save()¶
Inserts a new row (if the object has no primary key or has not been saved before) or updates an existing row.
article = Article(title="Draft", published=False)
await article.save() # INSERT
print(article.id) # auto-assigned
article.published = True
await article.save() # UPDATE
SQL:
-- First save:
INSERT INTO articles (title, published) VALUES ($1, $2) RETURNING id
-- Subsequent saves:
UPDATE articles SET title = $1, published = $2 WHERE id = $3
refresh_from_db()¶
Reloads the object's field values from the database.
article = await Article.objects.get(id=1)
# ... another process updates the article ...
await article.refresh_from_db()
# article now has the latest values
delete()¶
Deletes the object from the database.
Composite Primary Keys¶
class OrderItem(Model):
    class Meta:
        table = "order_items"

    order_id: int = Field(primary_key=True)
    product_id: int = Field(primary_key=True)
    quantity: int = Field(default=1)

# Composite PK as tuple
item = await OrderItem.objects.get(order_id=1, product_id=42)
item.pk  # (1, 42)
Composite primary keys are fully supported. The .pk property returns a tuple of the key fields.
Meta Options¶
Control model behavior with Meta inner class options:
class Article(Model):
    class Meta:
        table = "articles"
        ordering = ["-pub_date"]  # Default sort order
        abstract = False          # Set True for abstract base models
        proxy = False             # Set True for proxy models
        unlogged = False          # Set True for UNLOGGED tables (no WAL, fast, lost on crash)
        cache_ttl = 60            # Query cache TTL in seconds
        database = "default"      # Database alias for multi-db routing
| Option | Type | Description |
|---|---|---|
| `table` | `str` | PostgreSQL table name (required for concrete models) |
| `ordering` | `list[str]` | Default ORDER BY columns. Prefix `-` for descending. |
| `abstract` | `bool` | True for abstract base models (no table created) |
| `proxy` | `bool` | True for proxy models (same table, different class) |
| `unlogged` | `bool` | True for UNLOGGED tables: no WAL writes, faster inserts, data lost on crash. Ideal for caches, rate-limit logs, analytics scratch tables. |
| `cache_ttl` | `int` | Query cache TTL in seconds (see Query Cache) |
| `database` | `str` | Target database for multi-db routing (see Multi-Database) |
Database Functions¶
Use native PostgreSQL functions via raw SQL:
# COALESCE — return first non-null value
await db.query("SELECT COALESCE(bio, 'No bio') FROM users WHERE id = $1", 1)
# EXTRACT — pull date/time components
await db.query("SELECT EXTRACT(YEAR FROM created_at) as year FROM articles")
# CONCAT — string concatenation
await db.query("SELECT CONCAT(first_name, ' ', last_name) as full_name FROM users")
# CAST — type conversion
await db.query("SELECT CAST(views AS TEXT) FROM articles WHERE id = $1", 1)
# NOW — current timestamp
await db.query("SELECT * FROM articles WHERE pub_date <= NOW()")
# LOWER / UPPER
await db.query("SELECT * FROM users WHERE LOWER(username) = $1", "alice")
# LENGTH
await db.query("SELECT * FROM articles WHERE LENGTH(title) > $1", 100)
# SUBSTRING
await db.query("SELECT SUBSTRING(title FROM 1 FOR 50) as short_title FROM articles")
# ARRAY_AGG
await db.query("""
SELECT author_id, ARRAY_AGG(title) as titles
FROM articles
GROUP BY author_id
""")
# JSON operators
await db.query("SELECT meta->>'key' FROM articles WHERE meta @> $1::jsonb", '{"type": "tutorial"}')
# Window functions
await db.query("""
SELECT title, views,
ROW_NUMBER() OVER (ORDER BY views DESC) as rank
FROM articles
WHERE published = true
""")
# Common Table Expressions (CTE)
await db.query("""
WITH popular AS (
SELECT * FROM articles WHERE views > $1
)
SELECT a.*, u.name as author_name
FROM popular a
JOIN users u ON u.id = a.author_id
""", 1000)
Query Cache¶
Automatic caching with version-based invalidation:
class Article(Model):
    class Meta:
        table = "articles"
        cache_ttl = 60  # Cache queries for 60 seconds

    id: int = Field(primary_key=True, auto=True)
    title: str = Field()
When cache_ttl is set, queries are cached automatically. The cache is invalidated when any write operation (create, update, delete, bulk_create, bulk_update) occurs on the model. FK dependencies are tracked so that a write to a related model also invalidates the cache.
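Version-based invalidation can be pictured as a cache whose keys embed a per-table counter: a write bumps the counter, so every previously cached entry for that table stops matching. The class below is a toy sketch of that idea under stated assumptions; `VersionedCache` and its methods are hypothetical and say nothing about how HyperDjango's cache is actually implemented or how it tracks FK dependencies.

```python
import time


class VersionedCache:
    """Toy query cache: keys embed a per-table version that writes bump."""

    def __init__(self, ttl: float):
        self.ttl = ttl
        self.versions: dict[str, int] = {}
        self.entries: dict[tuple[str, int, str], tuple[float, object]] = {}

    def _key(self, table: str, sql: str) -> tuple[str, int, str]:
        return (table, self.versions.get(table, 0), sql)

    def get(self, table: str, sql: str):
        hit = self.entries.get(self._key(table, sql))
        if hit is None or time.monotonic() - hit[0] > self.ttl:
            return None  # miss: never cached, invalidated, or expired
        return hit[1]

    def put(self, table: str, sql: str, rows) -> None:
        self.entries[self._key(table, sql)] = (time.monotonic(), rows)

    def invalidate(self, table: str) -> None:
        # Bumping the version orphans every old key for this table
        # (a real cache would also evict the orphaned entries).
        self.versions[table] = self.versions.get(table, 0) + 1


cache = VersionedCache(ttl=60)
sql = "SELECT * FROM articles WHERE published = $1"
cache.put("articles", sql, ["row-1"])
before_write = cache.get("articles", sql)
cache.invalidate("articles")  # what a create/update/delete would trigger
after_write = cache.get("articles", sql)
```

The appeal of this design is that invalidation is a single counter increment regardless of how many queries were cached for the table.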
See Query Cache for full documentation including cache warming, statistics, and manual invalidation.
Model Signals¶
Hook into model lifecycle events with signals:
from hyperdjango.signals import pre_save, post_save, pre_delete, post_delete
@post_save.connect(sender=Article)
async def on_article_saved(sender, instance, created, **kwargs):
    if created:
        print(f"New article: {instance.title}")

@pre_delete.connect(sender=Article)
async def on_article_deleting(sender, instance, **kwargs):
    print(f"About to delete: {instance.title}")

@post_delete.connect(sender=Article)
async def on_article_deleted(sender, instance, **kwargs):
    print(f"Deleted article: {instance.title}")

@pre_save.connect(sender=Article)
async def on_article_saving(sender, instance, **kwargs):
    # Modify before save
    instance.updated_at = datetime.now()
Available signals:
| Signal | When it fires |
|---|---|
| `pre_save` | Before `save()` (both INSERT and UPDATE) |
| `post_save` | After `save()` (includes `created` boolean) |
| `pre_delete` | Before `delete()` |
| `post_delete` | After `delete()` |
See Signals for the full signal API including custom signals, pre_init, post_init, and user authentication signals.
Performance¶
The pg.zig native driver delivers:
- SELECT by PK: 2x faster than psycopg3
- SELECT range: 4x faster
- COPY bulk import: 43x faster (536K rows/sec)
- Prepared statement caching: repeated queries skip Parse phase
- Connection pipelining: 20 queries in 0.24ms vs 1.40ms sequential
Performance Tips¶
- Use `select_related()` for forward FK lookups to avoid N+1 queries.
- Use `prefetch_related()` for reverse FK and M2M lookups.
- Use `values()` or `values_list()` when you only need specific columns.
- Use `defer()` to skip loading large text/binary columns.
- Use `only()` when you need just a few fields from a wide table.
- Use `exists()` instead of `count() > 0` for existence checks.
- Use `bulk_create()` for batch inserts instead of individual `save()` calls.
- Use `bulk_update()` for batch updates instead of individual `save()` calls.
- Use `update()` for SQL-level updates that do not need model validation or signals.
- Use `iterator()` for processing large result sets without loading everything into memory.
- Use `select_for_update(skip_locked=True)` for job queue patterns.
- Use `explain(analyze=True)` to diagnose slow queries.
- Use `db.copy_in()` for truly massive inserts (100K+ rows).
- Set `Meta.cache_ttl` on read-heavy models for automatic query caching.
- Use `F()` expressions for atomic counter updates to avoid race conditions.