Tasks -- In-Process Background Task Queue
In-process background task queue using Python's free-threading (3.14t). No external dependencies. Priority queue, result tracking, retry with exponential backoff, timer-based scheduling (via the scheduler library), dead letter queue, task groups, lifecycle hooks, per-user pending limits, and circuit breakers.
WARNING: This task system is NOT persistent. All pending tasks, results, schedules, and dead letters exist only in memory. If the process restarts, crashes, or is killed -- everything is lost. Do NOT use this for tasks that must reliably execute (billing, critical notifications, scheduled reports).
Use this for: Fire-and-forget side effects (send email after signup, resize image, push webhook), short-lived retryable operations (API calls with retry), and periodic housekeeping within a running process (clear expired sessions, flush metrics).
Do NOT use this for: Nightly reports, billing cycles, scheduled jobs that must survive deploys/restarts, distributed task processing across multiple workers, or anything where a missed execution has business consequences.
Future: For persistent, crash-recoverable, distributed recurring task workflows, HyperDjango will integrate pyjobby -- a PostgreSQL-backed persistent job queue with durable scheduling, crash recovery, and multi-worker support.
Request --> Handler --> task.delay() --> Response (immediate)
                            |
                            +--> Worker Thread --> Execute Task
                                                        |
                                                        +--> on_success / on_failure / on_retry hooks
                                                        +--> Dead Letter Queue (on permanent failure)
from hyperdjango import HyperApp

app = HyperApp()

@app.task
async def send_email(to, subject, body):
    await email_service.send(to, subject, body)

# Call directly (synchronous):
send_email("user@example.com", "Hello", "Body")

# Enqueue for background execution:
handle = send_email.delay("user@example.com", "Hello", "Body")
handle.status()  # TaskStatus.PENDING / RUNNING / SUCCESS / FAILED
handle.result()  # blocks until done, returns result
@task Decorator
The @task decorator wraps a function so it can be called normally (synchronous) or enqueued for background execution via .delay(). Both async and sync functions are supported.
Basic Usage

from hyperdjango.tasks import task

@task
async def process_order(order_id):
    order = await db.query_one("SELECT * FROM orders WHERE id = $1", order_id)
    await charge_payment(order)
    await send_confirmation(order)
    return order_id

# Synchronous call (blocks until done):
result = process_order(42)
# Background call (returns immediately):
handle = process_order.delay(42)
Sync functions work the same way:
@task
def generate_report(report_id):
    data = compute_report(report_id)
    save_report(report_id, data)
    return report_id
With Options

import httpx

from hyperdjango.tasks import task, TaskPriority

@task(
    priority=TaskPriority.HIGH,
    max_retries=3,
    retry_delay=1.0,
    retry_backoff=2.0,
    # httpx raises its own transport errors (connect failures, timeouts),
    # which do not subclass the built-in ConnectionError / TimeoutError.
    retry_on=(ConnectionError, TimeoutError, httpx.TransportError),
    on_success=lambda result: print(f"Done: {result}"),
    on_failure=lambda exc: alert_team(str(exc)),
    on_retry=lambda exc, attempt: print(f"Retry {attempt}: {exc}"),
)
async def fetch_external_data(url):
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, timeout=30)
        resp.raise_for_status()
        return resp.json()
Custom Task Queue

from hyperdjango.tasks import task, TaskQueue

my_queue = TaskQueue(workers=8, max_queue_size=50000)

@task(queue=my_queue)
async def heavy_task(data):
    ...
Task Priority Levels
Tasks are processed in priority order. Higher priority tasks are dequeued first.
| Priority | Value | Use Case |
|---|---|---|
| TaskPriority.LOW | 0 | Batch jobs, cleanup, reports |
| TaskPriority.NORMAL | 10 | Default -- most tasks |
| TaskPriority.HIGH | 20 | User-triggered actions, notifications |
| TaskPriority.CRITICAL | 30 | Payment processing, security alerts |
from hyperdjango.tasks import task, TaskPriority

@task(priority=TaskPriority.CRITICAL)
async def process_payment(payment_id):
    ...

@task(priority=TaskPriority.LOW)
async def generate_daily_report():
    ...
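To make the dequeue order concrete, here is a minimal sketch using only the standard library. It assumes that tasks of equal priority run first-in, first-out, which the text above does not state; the Priority enum and the task names are hypothetical stand-ins that mirror the documented TaskPriority values rather than the library's own classes.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):       # mirrors the documented TaskPriority values
    LOW = 0
    NORMAL = 10
    HIGH = 20
    CRITICAL = 30


heap = []
order = itertools.count()      # tie-breaker: equal priorities stay first-in, first-out

for name, priority in [("report", Priority.LOW), ("email", Priority.NORMAL),
                       ("payment", Priority.CRITICAL), ("notify", Priority.HIGH),
                       ("email-2", Priority.NORMAL)]:
    # heapq is a min-heap, so negate the priority to pop the highest first.
    heapq.heappush(heap, (-priority, next(order), name))

dequeued = [heapq.heappop(heap)[2] for _ in range(len(heap))]
print(dequeued)  # ['payment', 'notify', 'email', 'email-2', 'report']
```

The counter in the middle of each tuple also keeps the heap from ever comparing the payloads themselves.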
TaskHandle -- Tracking Results
.delay() returns a TaskHandle for monitoring the task.
handle.status()
Returns the current TaskStatus:
| Status | Description |
|---|---|
| PENDING | Queued, not yet started |
| RUNNING | Currently executing |
| SUCCESS | Completed successfully |
| FAILED | Permanently failed (all retries exhausted) |
| RETRYING | Failed but will retry |
| CANCELLED | Cancelled before execution |
handle = send_email.delay("user@example.com", "Hello", "Body")
print(handle.status()) # TaskStatus.PENDING
handle.result(timeout=None)
Block until the task completes and return its result. Raises RuntimeError if the task failed or was cancelled. Raises TimeoutError if the timeout expires.
handle = process_order.delay(42)
result = handle.result(timeout=30.0) # blocks up to 30 seconds
print(result) # 42 (the return value)
handle.cancel()
Cancel the task if it is still pending. Returns True if successfully cancelled.
handle.is_done()
Non-blocking check whether the task has finished (success, failed, or cancelled).
Retry with Exponential Backoff
Configure automatic retries with exponential backoff and jitter.

@task(
    max_retries=5,       # retry up to 5 times
    retry_delay=1.0,     # initial delay: 1 second
    retry_backoff=2.0,   # multiply delay by 2 each attempt
    retry_on=(ConnectionError, TimeoutError),  # only retry these exceptions
)
async def call_external_api(endpoint):
    ...

Retry timing with the above settings:
| Attempt | Delay (base) | With Jitter (approx) |
|---|---|---|
| 1 | 1.0s | 1.0 - 1.25s |
| 2 | 2.0s | 2.0 - 2.50s |
| 3 | 4.0s | 4.0 - 5.00s |
| 4 | 8.0s | 8.0 - 10.0s |
| 5 | 16.0s | 16.0 - 20.0s |
Jitter is random between 0 and 25% of the base delay, preventing thundering herd on retries.
If retry_on is None (default), all exceptions trigger a retry. Set it to a tuple of specific exception types to only retry those.
After all retries are exhausted, the task is sent to the dead letter queue.
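The timing table can be reproduced with a few lines of arithmetic. The sketch below assumes the delay before retry attempt n is retry_delay * retry_backoff ** (n - 1), plus uniform jitter of up to 25% of that base. The helper name retry_delay_for is hypothetical and is not part of hyperdjango.tasks; the library's actual computation may differ in detail.

```python
import random


def retry_delay_for(attempt, retry_delay=1.0, retry_backoff=2.0,
                    jitter_fraction=0.25, rng=random):
    """Return (base, actual) delay in seconds before retry number `attempt` (1-based).

    Hypothetical helper for illustration only.
    """
    base = retry_delay * retry_backoff ** (attempt - 1)
    # Jitter is drawn uniformly from [0, 25% of base].
    actual = base + rng.uniform(0.0, jitter_fraction * base)
    return base, actual


for attempt in range(1, 6):
    base, actual = retry_delay_for(attempt)
    print(f"attempt {attempt}: base {base:.1f}s, with jitter {actual:.2f}s")
```

With the defaults shown, the base delays are 1, 2, 4, 8, and 16 seconds, matching the table.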
Task Scheduling
Schedule tasks using TaskScheduler, powered by the scheduler library's timer-based engine. No poll loops — the library calculates exact next-execution times with proper timezone support.
If task_queue is None, the global default queue is used.
Timing Methods
The add() method accepts exactly one timing parameter:
| Parameter | Type | Description |
|---|---|---|
| interval | float | Seconds between executions (cyclic) |
| daily | datetime.time | Run at this time every day |
| hourly | datetime.time | Run at this minute:second every hour |
| weekly | Weekday(time) | Run on a specific weekday (e.g., Monday(time(hour=9))) |
| minutely | datetime.time | Run at this second every minute |
| cron | str | 5-field cron expression (translated to scheduler timing) |
Interval Scheduling

from hyperdjango.tasks import TaskScheduler, task

scheduler = TaskScheduler()

@task
async def cleanup_expired_sessions():
    await db.execute("DELETE FROM sessions WHERE expires_at < NOW()")

# Run every 5 minutes (300 seconds)
schedule_id = scheduler.add(cleanup_expired_sessions, interval=300.0)
scheduler.start()  # starts the scheduler background thread
Daily / Hourly / Weekly Scheduling

from datetime import time
from hyperdjango.tasks import TaskScheduler, task, Monday, Friday

scheduler = TaskScheduler()

@task
async def send_daily_digest():
    users = await db.query("SELECT * FROM users WHERE digest_enabled = true")
    for user in users:
        await send_digest_email(user)

# Every day at 8:00 AM
scheduler.add(send_daily_digest, daily=time(hour=8))

# Every hour at :30
scheduler.add(check_health, hourly=time(minute=30))

# Every Monday at 9:30 AM
scheduler.add(generate_weekly_report, weekly=Monday(time(hour=9, minute=30)))

# Every Friday at 6 PM
scheduler.add(send_weekly_summary, weekly=Friday(time(hour=18)))

Weekday triggers available: Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday — all re-exported from hyperdjango.tasks.
Cron Scheduling
Standard 5-field cron expressions are also supported. They are translated automatically to the scheduler library's native timing:
# Every 15 minutes → cyclic(timedelta(minutes=15))
scheduler.add(check_health, cron="*/15 * * * *")
# Daily at 8 AM → daily(time(hour=8))
scheduler.add(send_daily_digest, cron="0 8 * * *")
# Every Monday midnight → weekly(Monday(time(0,0)))
scheduler.add(weekly_report, cron="0 0 * * 1")
Cron field syntax supports: * (any), */N (every N), N (exact), N-M (range), N,M (list). Day of week: 0 = Sunday.
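As an illustration of the field syntax listed above, the following sketch expands a single cron field into the values it matches. It is not the library's translator: expand_cron_field is a hypothetical helper, the bounds are passed in by the caller, and no validation of out-of-range values is attempted.

```python
def expand_cron_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches.

    Hypothetical helper; `low` and `high` are the inclusive bounds of the
    field (for example 0 and 59 for minutes).
    """
    values = set()
    for part in field.split(","):          # N,M (list)
        if part == "*":                    # * (any)
            values.update(range(low, high + 1))
        elif part.startswith("*/"):        # */N (every N)
            step = int(part[2:])
            values.update(range(low, high + 1, step))
        elif "-" in part:                  # N-M (range)
            start, end = (int(x) for x in part.split("-"))
            values.update(range(start, end + 1))
        else:                              # N (exact)
            values.add(int(part))
    return sorted(values)


print(expand_cron_field("*/15", 0, 59))  # [0, 15, 30, 45]
print(expand_cron_field("1-5", 0, 6))    # [1, 2, 3, 4, 5] (Monday to Friday, 0 = Sunday)
```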
Managing Schedules

# Add with arguments
schedule_id = scheduler.add(
    process_queue,
    interval=60.0,
    args=("high-priority",),
    kwargs={"batch_size": 100},
)

# Remove a schedule
scheduler.remove(schedule_id)  # returns True if found

# Check count
print(scheduler.count)  # number of scheduled entries

# Stop the scheduler
scheduler.stop()
Dead Letter Queue
Tasks that permanently fail (all retries exhausted) are sent to the dead letter queue for inspection and manual retry.

from hyperdjango.tasks import TaskQueue

queue = TaskQueue()
queue.start()

# Inspect dead letters
letters = queue.dead_letters.peek(10)  # last 10 dead letters
for letter in letters:
    print(f"{letter.func_name}: {letter.error}")
    print(f" Args: {letter.args}")
    print(f" Attempts: {letter.attempts}")
    print(f" Traceback: {letter.traceback}")

# Retry a dead letter
handle = queue.dead_letters.retry(letter.task_id)
if handle is not None:
    result = handle.result(timeout=30)

# Pop the oldest dead letter
letter = queue.dead_letters.pop()

# Clear all dead letters
queue.dead_letters.clear()

# Check size
print(queue.dead_letters.size)
DeadLetter Fields
| Field | Type | Description |
|---|---|---|
| task_id | str | Original task ID |
| func_name | str | Function name |
| args | tuple | Positional arguments |
| kwargs | dict | Keyword arguments |
| error | str | Error message |
| traceback | str | Full traceback |
| failed_at | float | Monotonic timestamp of failure |
| attempts | int | Total attempts made |
The DLQ has a configurable max size (default 10,000). When full, the oldest entries are evicted. Configure via the TASK_DLQ_MAX_SIZE setting.
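The oldest-first eviction described above behaves like a bounded deque. The sketch below shows only that behavior with the standard library; whether the real dead letter queue is built on collections.deque is an assumption, and the task IDs are made up.

```python
from collections import deque

# maxlen gives oldest-first eviction: appending to a full deque drops the left end.
# A tiny limit is used for demonstration; the documented default is 10,000.
dead_letters = deque(maxlen=3)

for task_id in ["t1", "t2", "t3", "t4"]:
    dead_letters.append(task_id)

print(list(dead_letters))  # ['t2', 't3', 't4']
```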
Per-User Task Limits
Prevent any single user from flooding the queue by setting a per-user pending task limit:

from hyperdjango.tasks import TaskUserLimitError, _task_queue

# Set limit (0 = unlimited, which is the default)
_task_queue._max_pending_per_user = 10

# Pass user_id when enqueuing
handle = send_email.delay("user@example.com", "Hello", "Body", user_id="user_42")

# If user_42 already has 10 pending tasks:
try:
    handle = send_email.delay("user@example.com", "Again", "Body", user_id="user_42")
except TaskUserLimitError:
    # User has too many pending tasks -- return 429 or similar
    pass

# Check current pending count
count = _task_queue.get_user_pending("user_42")

The pending count is decremented automatically when a task completes (success, failure, or cancellation). Configure the limit via the TASK_MAX_PENDING_PER_USER setting.
Circuit Breaker
Automatically stop submitting tasks to a function that keeps failing. The circuit breaker tracks failures per function name over a rolling window:
- CLOSED (normal): Tasks execute normally. Failures are counted.
- OPEN (tripped): New submissions for this function are rejected with TaskCircuitOpenError. Opens after failure_threshold failures within the rolling window.
- HALF_OPEN (probing): After the recovery timeout, one task is allowed through as a probe. If it succeeds, the circuit closes. If it fails, the circuit re-opens.

from hyperdjango.tasks import TaskCircuitOpenError, _task_queue

# Configure (defaults shown)
_task_queue._circuit_failure_threshold = 5    # failures before opening
_task_queue._circuit_recovery_timeout = 30.0  # seconds before half-open probe
_task_queue._circuit_window = 300.0           # rolling window in seconds

# Check circuit breaker state
cb = _task_queue.get_circuit_breaker("fetch_url")
if cb is not None:
    print(f"State: {cb.state}")  # closed / open / half_open
    print(f"Failures: {cb.failure_count}")

# Handle rejection
try:
    handle = fetch_url.delay("https://flaky-api.example.com")
except TaskCircuitOpenError:
    # Circuit is open -- the target is known to be failing
    pass

# View all circuit breakers
all_breakers = _task_queue.get_all_circuit_breakers()

Configure thresholds via settings: TASK_CIRCUIT_FAILURE_THRESHOLD, TASK_CIRCUIT_RECOVERY_TIMEOUT, TASK_CIRCUIT_WINDOW.
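The three states can be pictured as a small state machine. The following is a minimal sketch of the CLOSED / OPEN / HALF_OPEN transitions described above, not the library's implementation: the class CircuitBreakerSketch, its method names, and the injectable clock are all hypothetical, and details such as how failures are counted while the circuit is open are assumptions.

```python
import time
from collections import deque


class CircuitBreakerSketch:
    """Hypothetical stand-in for a per-function breaker; not the library class."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0, window=300.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.window = window
        self.clock = clock
        self.state = "closed"
        self.failures = deque()   # timestamps of recent failures
        self.opened_at = None

    def allow(self):
        """Return True if a new submission may proceed."""
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                return False
            self.state = "half_open"   # let exactly one probe through
            return True
        if self.state == "half_open":
            return False               # a probe is already in flight
        return True

    def record_success(self):
        if self.state == "half_open":
            self.state = "closed"      # probe succeeded: close the circuit
            self.failures.clear()

    def record_failure(self):
        now = self.clock()
        if self.state == "half_open":
            self._open(now)            # probe failed: re-open
            return
        self.failures.append(now)
        # Drop failures that fell out of the rolling window.
        while self.failures and now - self.failures[0] > self.window:
            self.failures.popleft()
        if len(self.failures) >= self.failure_threshold:
            self._open(now)

    def _open(self, now):
        self.state = "open"
        self.opened_at = now


# Drive the breaker with a fake clock so the example runs instantly.
now = [0.0]
cb = CircuitBreakerSketch(failure_threshold=2, clock=lambda: now[0])
cb.record_failure()
cb.record_failure()
print(cb.state)    # open
now[0] = 31.0      # past the 30 s recovery timeout
print(cb.allow())  # True (the probe)
cb.record_success()
print(cb.state)    # closed
```

Injecting the clock keeps the transitions easy to test without sleeping through the recovery timeout.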
Task Groups
Run multiple tasks in parallel and wait for all to complete.

from hyperdjango.tasks import TaskGroup, TaskStatus

group = TaskGroup()

# Add tasks to the group
group.add(fetch_user_data, user_id=1)
group.add(fetch_user_data, user_id=2)
group.add(fetch_user_data, user_id=3)
group.add(generate_thumbnail, image_id=42)

# Wait for all tasks (returns list of TaskResult)
results = group.run(timeout=30.0)
for result in results:
    print(f"Task {result.task_id}: {result.status}")
    if result.status == TaskStatus.SUCCESS:
        print(f" Result: {result.result}")
    elif result.status == TaskStatus.FAILED:
        print(f" Error: {result.error}")

Each group.add() call enqueues the task immediately and returns a TaskHandle. group.run() blocks until all tasks complete or the timeout expires (raises TimeoutError).
TaskResult Fields
| Field | Type | Description |
|---|---|---|
| task_id | str | Task identifier |
| status | TaskStatus | Final status |
| result | object \| None | Return value (if SUCCESS) |
| error | str \| None | Error message (if FAILED) |
| started_at | float \| None | Monotonic timestamp when execution started |
| finished_at | float \| None | Monotonic timestamp when execution finished |
| attempts | int | Total execution attempts |
Lifecycle Hooks
Attach callbacks to task success, failure, and retry events.

import logging

from hyperdjango.tasks import task

logger = logging.getLogger(__name__)

def on_payment_success(result):
    send_receipt(result["order_id"])

def on_payment_failure(exc):
    alert_ops_team(f"Payment failed: {exc}")

def on_payment_retry(exc, attempt):
    logger.warning(f"Payment retry {attempt}: {exc}")

@task(
    max_retries=3,
    retry_delay=5.0,
    on_success=on_payment_success,
    on_failure=on_payment_failure,
    on_retry=on_payment_retry,
)
async def process_payment(order_id, amount):
    result = await payment_gateway.charge(order_id, amount)
    return {"order_id": order_id, "charge_id": result.id}

| Hook | Signature | When Called |
|---|---|---|
| on_success | fn(result) | After successful execution |
| on_failure | fn(exception) | After all retries exhausted (permanent failure) |
| on_retry | fn(exception, attempt_number) | Before each retry |
Exceptions in hooks are caught and logged -- they do not affect the task result.
TaskQueueStats -- Monitoring
Get comprehensive queue statistics for dashboards and health checks.
from hyperdjango.tasks import TaskQueue
queue = TaskQueue(workers=8)
queue.start()
stats = queue.stats
print(f"Pending: {stats.pending}")
print(f"Running: {stats.running}")
print(f"Processed: {stats.processed}")
print(f"Failed: {stats.failed}")
print(f"Retried: {stats.retried}")
print(f"Workers: {stats.workers}")
print(f"Queue running: {stats.queue_running}")
print(f"Dead letters: {stats.dead_letters}")
print(f"Scheduled: {stats.scheduled}")
print(f"Avg execution time: {stats.avg_execution_time_ms:.1f}ms")
print(f"Throughput: {stats.tasks_per_second:.1f} tasks/sec")
TaskQueueStats Fields
| Field | Type | Description |
|---|---|---|
| pending | int | Tasks waiting in the queue |
| running | int | Tasks currently executing |
| processed | int | Total successfully completed tasks |
| failed | int | Total permanently failed tasks |
| retried | int | Total retry attempts |
| workers | int | Number of worker threads |
| queue_running | bool | Whether the queue is active |
| dead_letters | int | Number of dead letters |
| scheduled | int | Number of scheduled entries |
| avg_execution_time_ms | float | Average task execution time |
| tasks_per_second | float | Task throughput |
TaskQueue Configuration

from hyperdjango.tasks import TaskQueue

queue = TaskQueue(
    workers=8,             # number of worker threads
    max_queue_size=50000,  # max pending tasks
)
queue.start()

# ... use the queue ...

queue.stop()  # graceful shutdown, waits up to 5s per worker

The queue starts automatically on the first .delay() call. Worker threads are daemon threads.
Settings
These settings are defined in hyperdjango.conf and can be configured via HYPERDJANGO_* in Django settings, HYPER_* environment variables, or a .env file. See settings.md for full details.
| Setting | Default | Range | Description |
|---|---|---|---|
| TASK_WORKERS | 4 | 1--64 | Number of worker threads |
| TASK_MAX_QUEUE_SIZE | 10000 | 1--1,000,000 | Maximum pending tasks before dropping |
| TASK_DLQ_MAX_SIZE | 10000 | 1--1,000,000 | Maximum dead letters retained |
Queue Full Behavior
When the queue reaches max_queue_size, new tasks are dropped with a warning log and the task handle status is set to FAILED with error "Queue full".
Worker Thread Architecture
Each worker thread creates its own asyncio event loop:
- Async tasks run in their own event loop, isolated from the main server loop
- Each worker processes one task at a time
- Workers pick tasks from a shared PriorityQueue (thread-safe)
- Workers use a sentinel message pattern for graceful shutdown
- Free-threaded Python 3.14t allows true parallel execution across workers
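The points above can be sketched with the standard library alone. This is an illustrative single-worker version, not the library's worker: the tuple layout, the worker function, and the use of a None callable as the sentinel are all assumptions made for the example.

```python
import asyncio
import itertools
import queue
import threading

_SENTINEL_PRIORITY = float("inf")   # sorts after every real task
_counter = itertools.count()        # tie-breaker so equal priorities stay FIFO


def worker(task_queue, results):
    """Run tasks one at a time on an event loop owned by this thread."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            _, _, coro_fn, args = task_queue.get()
            if coro_fn is None:     # sentinel: shut down
                break
            results.append(loop.run_until_complete(coro_fn(*args)))
    finally:
        loop.close()


async def double(x):
    await asyncio.sleep(0)
    return x * 2


task_queue = queue.PriorityQueue()
results = []   # written by the single worker, read only after join()
for value in (1, 2, 3):
    # PriorityQueue pops the smallest item, so negate the priority.
    task_queue.put((-10, next(_counter), double, (value,)))
task_queue.put((_SENTINEL_PRIORITY, next(_counter), None, ()))

thread = threading.Thread(target=worker, args=(task_queue, results), daemon=True)
thread.start()
thread.join(timeout=5)
print(results)  # [2, 4, 6]
```

Because the sentinel carries the lowest possible priority, it is dequeued only after all real work has drained, which is one simple way to get a graceful shutdown. With several workers, one sentinel per worker would be needed.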
Common Patterns
Email Sending

@task
async def send_welcome_email(user_id):
    user = await db.query_one("SELECT * FROM users WHERE id = $1", user_id)
    await send_mail(subject="Welcome!", to=[user.email], message=f"Hi {user.name}!")

@app.post("/register")
async def register(request):
    data = await request.json()
    user_id = await db.execute("INSERT INTO users ...")
    send_welcome_email.delay(user_id)
    return Response.json({"id": user_id}, status=201)

Report Generation

@task
async def generate_report(report_id):
    report = await db.query_one("SELECT * FROM reports WHERE id = $1", report_id)
    data = await compute_report_data(report)
    pdf = await render_pdf(data)
    await storage.save(f"reports/{report_id}.pdf", pdf)
    await db.execute("UPDATE reports SET status='complete' WHERE id = $1", report_id)

@app.post("/reports")
async def create_report(request):
    report_id = await db.execute("INSERT INTO reports (status) VALUES ('pending') RETURNING id")
    generate_report.delay(report_id)
    return Response.json({"id": report_id, "status": "pending"}, status=202)

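Webhook Delivery with Retry

import httpx

# httpx raises its own transport errors (connect failures, timeouts),
# which do not subclass the built-in ConnectionError / TimeoutError.
@task(max_retries=5, retry_delay=2.0, retry_backoff=3.0,
      retry_on=(ConnectionError, TimeoutError, httpx.TransportError))
async def deliver_webhook(webhook_url, payload):
    async with httpx.AsyncClient() as client:
        resp = await client.post(webhook_url, json=payload, timeout=10)
        resp.raise_for_status()
        return resp.status_code
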
Parallel Fan-Out

from hyperdjango.tasks import TaskGroup, TaskStatus

@app.post("/process-batch")
async def process_batch(request):
    data = await request.json()
    items = data["items"]

    group = TaskGroup()
    for item in items:
        group.add(process_item, item_id=item["id"])

    # Blocks until every task finishes or the timeout expires
    results = group.run(timeout=60.0)
    succeeded = sum(1 for r in results if r.status == TaskStatus.SUCCESS)
    return {"total": len(items), "succeeded": succeeded}

Full Example

from hyperdjango import HyperApp
from hyperdjango.tasks import task, TaskPriority, TaskGroup, TaskScheduler

app = HyperApp(title="Order System", database="postgres://localhost/orders")

# --- Task definitions ---

@app.task(max_retries=3, retry_delay=2.0, retry_backoff=2.0,
          retry_on=(ConnectionError,))
async def charge_payment(order_id, amount):
    result = await payment_api.charge(order_id, amount)
    return result.charge_id

@app.task(priority=TaskPriority.HIGH)
async def send_order_confirmation(order_id, email):
    order = await app.db.query_one("SELECT * FROM orders WHERE id = $1", order_id)
    await send_email(email, "Order Confirmed", render_template("order_email.html", order))

@app.task(priority=TaskPriority.LOW)
async def update_analytics(event_type, data):
    await app.db.execute(
        "INSERT INTO analytics (event, data, created_at) VALUES ($1, $2, NOW())",
        event_type, data,
    )

# --- API endpoint ---

@app.post("/orders")
async def create_order(request):
    data = await request.json()
    order_id = await app.db.execute(
        "INSERT INTO orders (user_id, total) VALUES ($1, $2) RETURNING id",
        data["user_id"], data["total"],
    )
    charge_payment.delay(order_id, data["total"])
    send_order_confirmation.delay(order_id, data["email"])
    update_analytics.delay("order_created", {"order_id": order_id})
    return {"order_id": order_id, "status": "processing"}

# --- Scheduled tasks ---

scheduler = TaskScheduler()

@app.task
async def cleanup_expired_carts():
    await app.db.execute("DELETE FROM carts WHERE updated_at < NOW() - INTERVAL '7 days'")

@app.task
async def send_daily_summary():
    count = await app.db.query_one(
        "SELECT COUNT(*) FROM orders WHERE created_at > NOW() - INTERVAL '1 day'"
    )
    await send_email("ops@company.com", "Daily Summary", f"{count} orders today")

scheduler.add(cleanup_expired_carts, cron="0 3 * * *")   # 3 AM daily
scheduler.add(send_daily_summary, cron="0 18 * * 1-5")   # 6 PM weekdays

# --- Health endpoint ---

@app.get("/health/tasks")
async def task_health(request):
    stats = app.task_queue.stats
    return {
        "pending": stats.pending,
        "running": stats.running,
        "processed": stats.processed,
        "failed": stats.failed,
        "throughput": f"{stats.tasks_per_second:.1f}/s",
        "avg_time_ms": f"{stats.avg_execution_time_ms:.1f}",
        "dead_letters": stats.dead_letters,
    }

scheduler.start()
app.run(port=8000)

Thread Safety
TaskQueue is fully thread-safe for Python 3.14t free-threading:
- The internal PriorityQueue is inherently thread-safe (Python queue.PriorityQueue uses a lock).
- Task result storage uses explicit threading.Lock for all reads and writes.
- Dead letter queue operations are protected by the same lock.
- Worker threads each create their own isolated asyncio event loop -- no event loop sharing.
- Stats computation acquires the lock atomically to produce consistent snapshots.
- No deadlock risk: all locks are acquired in a consistent order (single lock per operation).
See Also
- settings.md -- TASK_WORKERS, TASK_MAX_QUEUE_SIZE, TASK_DLQ_MAX_SIZE settings
- realtime.md -- Real-time WebSocket patterns (tasks can trigger real-time notifications)
- formats.md -- Locale-aware formatting (for formatting dates/numbers in task outputs)