Security¶
Security event audit logging, HTTP security headers, CSRF protection, CORS, rate limiting, and compression middleware.
Quick Start¶
from hyperdjango.standalone_middleware import (
SecurityHeadersMiddleware, CSRFMiddleware, CORSMiddleware,
RateLimitMiddleware, CompressionMiddleware,
)
from hyperdjango.security import SecurityLog, SecurityEvent
app = HyperApp()
# Security headers on all responses
app.use(SecurityHeadersMiddleware(hsts=True, csp="default-src 'self'"))
# CSRF protection
app.use(CSRFMiddleware(secret="your-csrf-secret"))
# CORS
app.use(CORSMiddleware(origins=["https://example.com"]))
# Rate limiting (100 requests per minute per IP)
app.use(RateLimitMiddleware(max_requests=100, window=60))
SecurityLog¶
Tracks security-relevant events in a PostgreSQL UNLOGGED table for fast writes with multi-server visibility.
from hyperdjango.security import SecurityLog, SecurityEvent
log = SecurityLog(db)
await log.ensure_table()
ensure_table()¶
Creates the hyper_security_log table and indexes. Uses UNLOGGED for performance (no WAL writes). Falls back to a regular table if UNLOGGED is not supported. Safe to call multiple times.
Table Schema¶
CREATE UNLOGGED TABLE hyper_security_log (
id SERIAL PRIMARY KEY,
event VARCHAR(50) NOT NULL,
user_id INTEGER,
ip_address VARCHAR(45),
user_agent VARCHAR(500),
path VARCHAR(2000),
detail TEXT,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes on event, user_id, ip_address, timestamp DESC
log()¶
Log a security event manually:
await log.log(SecurityEvent.LOGIN_SUCCESS, user_id=42, ip="1.2.3.4")
await log.log(SecurityEvent.LOGIN_FAILED, ip="1.2.3.4", detail="invalid password")
await log.log(SecurityEvent.PERMISSION_DENIED, user_id=42, detail="missing edit_product")
await log.log(SecurityEvent.RATE_LIMIT_HIT, ip="1.2.3.4", detail="100/min exceeded")
| Parameter | Type | Default | Description |
|---|---|---|---|
event |
SecurityEvent |
required | Event type enum value |
user_id |
int \| None |
None |
User ID if known |
ip |
str \| None |
None |
Client IP address |
user_agent |
str \| None |
None |
User-Agent header (truncated to 500 chars) |
path |
str \| None |
None |
Request path |
detail |
str \| None |
None |
Free-text detail about the event |
log_from_request()¶
Log an event with IP, user-agent, path, and user auto-extracted from a Request object:
Auto-extracts:
ipfromrequest.client_ipuser_agentfromrequest.headers["user-agent"](truncated to 500 chars)pathfromrequest.pathuser_idfromrequest.user.idorrequest.user.pk(ifuser_idnot provided)
Event Types¶
Authentication Events¶
| Event | Description |
|---|---|
LOGIN_SUCCESS |
User logged in successfully |
LOGIN_FAILED |
Login attempt with wrong credentials |
LOGOUT |
User logged out |
PASSWORD_CHANGED |
User changed their password |
PASSWORD_RESET_REQUESTED |
Password reset email sent |
PASSWORD_RESET_COMPLETED |
Password reset completed |
Authorization Events¶
| Event | Description |
|---|---|
PERMISSION_DENIED |
User lacks required permission |
CSRF_VIOLATION |
CSRF token missing or invalid |
AUTH_REQUIRED |
Request to protected resource without authentication |
Rate Limiting Events¶
| Event | Description |
|---|---|
RATE_LIMIT_HIT |
Client exceeded rate limit |
Session Events¶
| Event | Description |
|---|---|
SESSION_CREATED |
New session created |
SESSION_DESTROYED |
Session explicitly destroyed (logout) |
SESSION_EXPIRED |
Session expired due to timeout |
SESSION_FIXATION_ATTEMPT |
Detected session fixation attack attempt |
Suspicious Activity Events¶
| Event | Description |
|---|---|
SUSPICIOUS_INPUT |
Input matching known attack patterns |
PATH_TRAVERSAL_ATTEMPT |
Attempt to access files outside allowed directories |
INVALID_TOKEN |
Invalid or expired authentication token |
Querying Events¶
get_recent()¶
Get the most recent security events:
events = await log.get_recent(limit=50)
# Returns list of dicts with: id, event, user_id, ip_address, user_agent, path, detail, timestamp
get_for_user()¶
Get events for a specific user:
get_for_ip()¶
Get events from a specific IP address:
get_by_event()¶
Get events of a specific type within a time window:
failed_logins = await log.get_by_event(SecurityEvent.LOGIN_FAILED, since_hours=1)
csrf_violations = await log.get_by_event(SecurityEvent.CSRF_VIOLATION, since_hours=24)
count_by_event()¶
Count events of a specific type within a time window:
count = await log.count_by_event(SecurityEvent.LOGIN_FAILED, since_hours=1)
if count > 100:
# Possible brute-force attack
await alert_security_team()
count_by_ip()¶
Count events from a specific IP within a time window:
ip_count = await log.count_by_ip("1.2.3.4", SecurityEvent.LOGIN_FAILED, since_hours=1)
if ip_count > 10:
# Lock out this IP
await block_ip("1.2.3.4")
Brute-Force Detection Pattern¶
@app.post("/login")
async def login(request):
data = await request.json()
ip = request.client_ip
# Check if IP is already rate-limited
recent_failures = await log.count_by_ip(ip, SecurityEvent.LOGIN_FAILED, since_hours=1)
if recent_failures > 10:
await log.log(SecurityEvent.RATE_LIMIT_HIT, ip=ip, detail="login lockout")
return Response.json({"error": "Too many attempts"}, status=429)
user = await authenticate(data["email"], data["password"])
if user is None:
await log.log(SecurityEvent.LOGIN_FAILED, ip=ip, detail=f"email={data['email']}")
return Response.json({"error": "Invalid credentials"}, status=401)
await log.log(SecurityEvent.LOGIN_SUCCESS, user_id=user.id, ip=ip)
return Response.json({"token": create_session(user)})
cleanup()¶
Delete entries older than N days:
Global Singleton¶
from hyperdjango.security import get_security_log, set_security_log
# Set during app startup
set_security_log(SecurityLog(db))
# Access from anywhere
log = get_security_log()
SecurityHeadersMiddleware¶
Adds security headers to all responses. Headers are pre-computed at init time for zero per-request overhead.
from hyperdjango.standalone_middleware import SecurityHeadersMiddleware
app.use(SecurityHeadersMiddleware(
content_type_nosniff=True,
frame_options="DENY",
hsts=False,
hsts_max_age=31536000,
csp="default-src 'self'",
referrer_policy="strict-origin-when-cross-origin",
permissions_policy="camera=(), microphone=()",
cross_origin_opener_policy="same-origin",
))
Headers Explained¶
X-Content-Type-Options: nosniff¶
Default: enabled. Prevents the browser from MIME-type sniffing. Without this, a browser might interpret a text file as JavaScript if it contains valid JS, enabling XSS attacks.
X-Frame-Options¶
Default: DENY. Controls whether the page can be embedded in <iframe>, <frame>, or <object>. Prevents clickjacking attacks where an attacker overlays your site with invisible frames.
| Value | Description |
|---|---|
"DENY" |
Page cannot be framed by anyone |
"SAMEORIGIN" |
Page can only be framed by pages on the same origin |
Strict-Transport-Security (HSTS)¶
Default: disabled. Tells the browser to always use HTTPS for this domain. Once set, the browser will not make insecure HTTP requests to this domain for max-age seconds.
SecurityHeadersMiddleware(hsts=True, hsts_max_age=31536000)
# Strict-Transport-Security: max-age=31536000
Only enable HSTS when you are certain HTTPS is fully configured. HSTS is difficult to undo -- if you set max-age=31536000 (1 year), browsers will refuse HTTP connections to your domain for a year.
Content-Security-Policy (CSP)¶
Default: not set. Controls which resources (scripts, styles, images, fonts, etc.) the browser is allowed to load. The most effective defense against XSS attacks.
# Restrictive: only load resources from same origin
SecurityHeadersMiddleware(csp="default-src 'self'")
# Allow inline styles and scripts from a CDN
SecurityHeadersMiddleware(
csp="default-src 'self'; script-src 'self' https://cdn.example.com; style-src 'self' 'unsafe-inline'"
)
# Report violations without blocking
SecurityHeadersMiddleware(
csp="default-src 'self'; report-uri /csp-report"
)
Common CSP directives:
| Directive | Controls |
|---|---|
default-src |
Fallback for all resource types |
script-src |
JavaScript sources |
style-src |
CSS sources |
img-src |
Image sources |
font-src |
Font sources |
connect-src |
Fetch/XHR/WebSocket targets |
frame-src |
iframe sources |
object-src |
Plugin sources (Flash, etc.) |
Referrer-Policy¶
Default: strict-origin-when-cross-origin. Controls how much referrer information is sent with requests.
| Value | Description |
|---|---|
"no-referrer" |
Never send the Referer header |
"same-origin" |
Only send for same-origin requests |
"strict-origin" |
Send origin only (no path) for cross-origin, full URL for same-origin |
"strict-origin-when-cross-origin" |
Default. Origin for cross-origin, full URL for same-origin |
SecurityHeadersMiddleware(referrer_policy="strict-origin-when-cross-origin")
# Referrer-Policy: strict-origin-when-cross-origin
Cross-Origin-Opener-Policy (COOP)¶
Default: same-origin. Isolates the browsing context from cross-origin popup windows. Prevents cross-origin attacks where a popup could access window.opener of your page.
SecurityHeadersMiddleware(cross_origin_opener_policy="same-origin")
# Cross-Origin-Opener-Policy: same-origin
Permissions-Policy¶
Controls which browser features (camera, microphone, geolocation, etc.) the page can use:
SecurityHeadersMiddleware(
permissions_policy="camera=(), microphone=(), geolocation=(self)"
)
# Permissions-Policy: camera=(), microphone=(), geolocation=(self)
() disables the feature entirely. (self) allows it only on same-origin pages.
Default Headers¶
The following headers are set with no configuration needed:
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
Referrer-Policy: strict-origin-when-cross-origin
Cross-Origin-Opener-Policy: same-origin
CSRFMiddleware¶
Double-submit cookie pattern for CSRF protection. Safe methods (GET, HEAD, OPTIONS) pass through. Unsafe methods (POST, PUT, PATCH, DELETE) require a valid CSRF token.
from hyperdjango.standalone_middleware import CSRFMiddleware
app.use(CSRFMiddleware(
secret="your-csrf-secret",
cookie_name="csrf_token",
header_name="x-csrf-token",
field_name="_csrf_token",
exempt_paths={"/webhooks/", "/api/public/"},
))
| Parameter | Type | Default | Description |
|---|---|---|---|
secret |
str |
required | HMAC signing key for token generation |
cookie_name |
str |
"csrf_token" |
Name of the CSRF cookie |
header_name |
str |
"x-csrf-token" |
Header to check for the token |
field_name |
str |
"_csrf_token" |
Form field name to check for the token |
exempt_paths |
set[str] |
set() |
Paths that skip CSRF validation |
How It Works¶
- Token generation: On safe requests (GET), the middleware generates a token with HMAC signature:
{random}.{hmac_signature}and sets it as a cookie. - Token validation: On unsafe requests (POST, PUT, etc.), the middleware checks for the token in either the
X-CSRF-Tokenheader or the_csrf_tokenform field. - Double-submit verification: The submitted token must match the cookie value (double-submit cookie pattern).
- API key bypass: Requests authenticated via API key skip CSRF validation (API keys are not vulnerable to CSRF).
In Templates¶
Include the CSRF token in forms:
<form method="POST" action="/submit">
<input type="hidden" name="_csrf_token" value="{{ csrf_token }}" />
<button type="submit">Submit</button>
</form>
In JavaScript¶
Read the token from the cookie and send it as a header:
const csrfToken = document.cookie
.split("; ")
.find((row) => row.startsWith("csrf_token="))
?.split("=")[1];
fetch("/api/data", {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-CSRF-Token": csrfToken,
},
body: JSON.stringify(data),
});
SQL Injection Prevention¶
HyperDjango prevents SQL injection through parameterized queries at every level:
ORM Queries (Always Safe)¶
The ORM constructs all queries with parameterization. User input is never interpolated into SQL:
# Safe: values are parameterized
users = await User.objects.filter(name=user_input).all()
# Generates: SELECT * FROM users WHERE name = $1 (with user_input as parameter)
Raw Queries (Use Parameters)¶
When writing raw SQL, always use parameterized queries:
# SAFE: parameterized query
rows = await db.query(
"SELECT * FROM users WHERE email = $1 AND active = $2",
email,
True,
)
# DANGEROUS: never do this
rows = await db.query(f"SELECT * FROM users WHERE email = '{email}'") # SQL INJECTION!
The native pg.zig driver uses PostgreSQL's extended query protocol, which separates SQL text from parameter values at the wire level -- parameters cannot break out of their value context.
Prepared Statements¶
Frequently executed queries use prepared statements, which are parsed once and executed with different parameters. The parse phase rejects malformed SQL before any parameters are bound.
XSS Prevention¶
Auto-Escaping in Templates¶
The Zig template engine auto-escapes all variable output by default:
<!-- Template: {{ user_input }} -->
<!-- If user_input = "<script>alert('xss')</script>" -->
<!-- Renders: <script>alert('xss')</script> -->
The native Zig html_escape function (1.7-3.4x faster than Python) escapes <, >, &, ", and '.
Marking Content as Safe¶
To output raw HTML, use the safe filter explicitly:
Only use safe with content you fully control. Never apply it to user input.
JSON in HTML¶
When embedding data in <script> tags, use JSON serialization:
The tojson filter serializes to JSON and escapes </script> sequences.
Clickjacking Protection¶
The X-Frame-Options: DENY header (set by default via SecurityHeadersMiddleware) prevents your pages from being embedded in frames on other sites. This blocks clickjacking attacks where an attacker overlays your UI with invisible frames to trick users into clicking.
For sites that need framing from same-origin pages:
For more granular control, use CSP's frame-ancestors directive instead:
app.use(SecurityHeadersMiddleware(
csp="default-src 'self'; frame-ancestors 'self' https://trusted.example.com"
))
SSL/HTTPS¶
Enforcing HTTPS¶
Configure HSTS to ensure browsers always use HTTPS:
Secure Cookies¶
Always set secure=True on session cookies in production:
response.set_cookie(
"session_id",
value=session_id,
httponly=True,
secure=True, # HTTPS only
samesite="Lax",
)
Detecting HTTPS¶
Behind a reverse proxy, request.is_secure checks X-Forwarded-Proto:
if not request.is_secure:
return Response.redirect(f"https://{request.headers['host']}{request.path}", status=301)
Host Header Validation¶
Fake Host headers can be used for cache poisoning, password reset poisoning, and CSRF attacks. Validate the Host header against an allowlist:
ALLOWED_HOSTS = {"example.com", "www.example.com", "api.example.com"}
@app.middleware
async def validate_host(request, call_next):
host = request.headers.get("host", "").split(":")[0] # Strip port
if host not in ALLOWED_HOSTS:
return Response.json({"error": "Invalid host"}, status=400)
return await call_next(request)
In production behind a reverse proxy, also validate X-Forwarded-Host if your proxy sets it.
Sensitive Data Filtering¶
In Logs¶
Filter sensitive fields from request logging:
SENSITIVE_FIELDS = {"password", "token", "secret", "api_key", "credit_card"}
def filter_sensitive(data):
if isinstance(data, dict):
return {
k: "***" if k.lower() in SENSITIVE_FIELDS else filter_sensitive(v)
for k, v in data.items()
}
return data
In Error Reports¶
The detail field in SecurityLog events should never contain passwords or tokens. Log only identifiers:
# Good: log the email, not the password
await log.log(SecurityEvent.LOGIN_FAILED, ip=ip, detail=f"email={email}")
# Bad: never log credentials
await log.log(SecurityEvent.LOGIN_FAILED, detail=f"email={email} password={password}")
CORSMiddleware¶
Cross-Origin Resource Sharing with preflight handling.
from hyperdjango.standalone_middleware import CORSMiddleware
app.use(CORSMiddleware(
origins=["https://example.com", "https://app.example.com"],
methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
headers=["*"],
allow_credentials=False,
max_age=86400,
))
| Parameter | Type | Default | Description |
|---|---|---|---|
origins |
list[str] |
required | Allowed origins (["*"] for all) |
methods |
list[str] |
["GET", "POST", ...] |
Allowed HTTP methods |
headers |
list[str] |
["*"] |
Allowed request headers |
allow_credentials |
bool |
False |
Allow cookies/auth in cross-origin requests |
max_age |
int |
86400 |
Preflight cache duration in seconds |
OPTIONS preflight requests return 204 with the appropriate CORS headers.
RateLimitMiddleware¶
Sliding window rate limiter with no external dependencies.
from hyperdjango.standalone_middleware import RateLimitMiddleware
app.use(RateLimitMiddleware(
max_requests=100,
window=60,
key_func=lambda r: r.client_ip,
))
| Parameter | Type | Default | Description |
|---|---|---|---|
max_requests |
int |
required | Maximum requests per window |
window |
int |
required | Window size in seconds |
key_func |
callable | lambda r: r.client_ip |
Function to extract rate limit key |
Response headers on every request:
X-RateLimit-Limit: Maximum requests allowedX-RateLimit-Remaining: Requests remaining in windowX-RateLimit-Reset: Seconds until window resets
Returns 429 with Retry-After header when limit exceeded. Periodic stale key cleanup prevents unbounded memory growth.
CompressionMiddleware¶
Gzip compression for responses above a minimum size.
from hyperdjango.standalone_middleware import CompressionMiddleware
app.use(CompressionMiddleware(
min_size=500,
level=6,
))
| Parameter | Type | Default | Description |
|---|---|---|---|
min_size |
int |
500 |
Minimum response size in bytes to compress |
level |
int |
6 |
Gzip compression level (1=fast, 9=best) |
Skips: streaming responses, already-compressed content, images/video/audio, responses with existing Content-Encoding.
Middleware Stack¶
All middleware follows the async (request, call_next) protocol. The stack manages ordering:
from hyperdjango.standalone_middleware import MiddlewareStack
stack = MiddlewareStack()
stack.add(SecurityHeadersMiddleware())
stack.add(CORSMiddleware(origins=["*"]))
stack.add(RateLimitMiddleware(max_requests=100, window=60))
handler = stack.wrap(my_handler)
Or use app.use() which manages the stack automatically:
Recommended Middleware Order¶
# 1. Security headers (outermost -- applies to all responses including errors)
app.use(SecurityHeadersMiddleware(hsts=True))
# 2. CORS (must run before CSRF to handle preflight)
app.use(CORSMiddleware(origins=["https://app.example.com"]))
# 3. Rate limiting (reject abusive traffic early)
app.use(RateLimitMiddleware(max_requests=100, window=60))
# 4. CSRF (after rate limiting, before business logic)
app.use(CSRFMiddleware(secret="..."))
# 5. Compression (innermost -- compress the final response)
app.use(CompressionMiddleware())
Security Checklist¶
A production security checklist for HyperDjango applications:
-
SecurityHeadersMiddlewareenabled with HSTS - CSP configured to restrict script/style sources
-
CSRFMiddlewareenabled with a strong secret - Session cookies set with
httponly=True,secure=True,samesite="Lax" - Rate limiting configured on login and sensitive endpoints
-
SecurityLogrecording authentication and authorization events - All database queries use parameterized values (no string interpolation)
- Template auto-escaping is never globally disabled
- Host header validation in place
- HTTPS enforced (redirect HTTP to HTTPS)
- Sensitive data filtered from logs and error reports
- Security log cleanup scheduled (e.g.,
cleanup(days=90)via background task) - Brute-force detection on login endpoint
- File uploads validated for type and size
- CORS restricted to specific origins (not
*in production)