Opsmas 2025 Day 11: pyjobby & hiproc
TOC:
- pyjobby
- hiproc

pyjobby
pyjobby is a python-centric postgres-backed dynamic workqueue system. I updated it this year, but it is originally a system I wrote in 2021 as a backend to a photography sharing site I was working on (but never released, of course). Image processing pipelines can involve many steps (extract data, manage multi-resolution thumbnails, upload image to cold storage, upload image to CDN storage, upload image to “middle-hot” storage, begin AI object/person detection/extraction on GPU servers, update user billing for new image storage, notify users of new image uploads if they have registered followers) and it all needs to be cleanly tracked and managed, with clean overflow/backpressure/reporting/accounting systems in place.
hence, pyjobby.
big whoop, you think.
but wait - i made a neat job queue work queue distributed processing reporting and accounting system this time.
- work queue state machine flows: waiting → queued → claimed → running → finished (success) or crashed (error)
waiting ──┐
          │
          ├──> queued ──> claimed ──> running ──┬──> finished
          │                                     │
          └─────────────────────────────────────┴──> crashed
                                                      │
                                                      ├──> queued (retry)
                                                      └──> crashed (final)
- postgres database with atomic row locking/checkout/updating as global queue primitive
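Under the hood this is the well-known FOR UPDATE SKIP LOCKED checkout trick. A minimal sketch of the core query (not necessarily pyjobby's exact implementation, and ignoring queue/capability filtering), assuming asyncpg and the jorb columns described in the schema section below:

import asyncpg

async def claim_next(conn: asyncpg.Connection):
    # SKIP LOCKED lets many workers each grab a different row without blocking each other
    return await conn.fetchrow("""
        UPDATE jorb
           SET state = 'claimed'
         WHERE id = (SELECT id
                       FROM jorb
                      WHERE state = 'queued'
                        AND run_after <= now()
                      ORDER BY prio, id
                      LIMIT 1
                        FOR UPDATE SKIP LOCKED)
     RETURNING id, job_class, kwargs""")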
- any worker can join the job system by attaching to the database with a work capability descriptor
Machine 1: pj --workers 4 --cap "web-1"
Machine 2: pj --workers 4 --cap "web-2"
Machine 3: pj --workers 8 --cap "ml-gpu" --queue ml
- jobs are allocated based on capability matching (anything: has gpu? big ram? big cpu? storage? location?)
Route jobs to workers with specific resources:
# Job requiring GPU
await addJob(db,
job_class="job.ml.TrainModel",
kwargs={...},
capability="gpu") # ← Only runs on workers with "gpu" capability
# Job requiring specific server (local files)
# (every worker automatically has capability hostname:<node hostname>)
await addJob(db,
job_class="job.file.Process",
kwargs={...},
capability=f"hostname:{platform.node()}")
- jobs can natively be assigned to a customer/user for tracking which jobs are for which “people” and which activities
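For example (a sketch; I'm assuming addJob accepts uid directly, mirroring the uid column in the schema below):

await addJob(db,
    job_class="job.image.Thumbnail",
    kwargs={"filepath": "/tmp/upload.jpg"},
    uid=123,  # ← attribute this work to user 123 for reporting/accounting
    queue="default")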
- multiple job queues
- duplicate prevention with idempotent keys
# User uploads file at 10:00 AM
await addJob(db,
job_class="job.billing.UpdateUsage",
kwargs={"user_id": 123},
deadline_key=f"billing:123:2025-11-18", # ← Unique key
run_after="2025-11-18 23:59:00", # ← Run at midnight
queue="default")
# User uploads another file at 11:00 AM
await addJob(db,
job_class="job.billing.UpdateUsage",
kwargs={"user_id": 123},
deadline_key=f"billing:123:2025-11-18", # ← Same key
run_after="2025-11-18 23:59:00",
queue="default")
# ↑ This INSERT will fail (unique constraint violation)
# Only one billing update will run at midnight
- jobs can be submitted via web API or direct database API writing
- tree/graph ability: jobs can be registered to only “wake up” when another job completes first
# Job 1: Process uploaded file
job1_id = await addJob(db,
job_class="job.file.Upload",
kwargs={"filepath": "/tmp/upload.jpg"},
queue="default")
# Job 2: Generate thumbnail (waits for Job 1)
await addJob(db,
job_class="job.image.Thumbnail",
kwargs={"filepath": "/tmp/upload.jpg"},
state="waiting", # ← Must start in 'waiting' state
waitfor_job=job1_id, # ← Depends on Job 1
queue="default")
Group Dependency for concurrency waiting (run_group + waitfor_group):
import secrets
group_id = secrets.randbits(63) # Generate unique group ID
# Create 3 parallel jobs in a group
for task in ["hash", "exif", "upload"]:
    await addJob(db,
        job_class=f"job.image.{task.capitalize()}",
        kwargs={"filepath": "/tmp/upload.jpg"},
        run_group=group_id,  # ← All part of same group
        queue="default")
# Create job that waits for ALL group members to finish
await addJob(db,
job_class="job.email.NotifyComplete",
kwargs={"user_id": 123},
state="waiting", # ← Starts in waiting
waitfor_group=group_id, # ← Waits for entire group
queue="default")
- priority levels (lower numbers run first; in the same prio level, jobs are processed FIFO by id ascending):
# High priority (paid users)
await addJob(db, job_class="job.email.SendEmail",
kwargs={...}, prio=-10)
# Normal priority (default)
await addJob(db, job_class="job.email.SendEmail",
kwargs={...}, prio=0)
# Low priority (background cleanup)
await addJob(db, job_class="job.cleanup.TempFiles",
kwargs={...}, prio=100)
- recurring jobs/tasks: either built-in, or by just having one job re-schedule itself for a future delayed start time before it completes
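For the self-rescheduling flavor, a job's task just enqueues its next run before returning. A minimal sketch (hypothetical job class; assuming job classes expose an async task(**kwargs) and have access to addJob and a db handle like the examples above):

from datetime import datetime, timedelta, timezone

class NightlyCleanup:
    async def task(self, **kwargs):
        ...  # do the actual cleanup work here
        # re-enqueue ourselves for the same time tomorrow *before* this run completes
        next_run = (datetime.now(timezone.utc) + timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")
        await addJob(db,
            job_class="job.cleanup.NightlyCleanup",
            kwargs=kwargs,
            run_after=next_run,
            queue="default")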
- crashed jobs auto-reschedule themselves due to finally-block ultimate fallback handling (see the sketch just after this list)
- cli and web management interfaces; direct DB APIs and web APIs for managing job lifecycles and reporting
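The shape of that safety net is roughly this (a sketch, not pyjobby's actual worker code; runTask, markFinished, and markCrashed are hypothetical helpers that update the jorb row):

import traceback

class Worker:
    async def executeJob(self, job_data):
        recorded = False
        try:
            result = await self.runTask(job_data)  # import job_class, call task(**kwargs)
            await self.markFinished(job_data["id"], result)
            recorded = True
        except Exception:
            await self.markCrashed(job_data["id"], traceback.format_exc())
            recorded = True
        finally:
            # ultimate fallback: if we get here without recording an outcome (cancellation,
            # a crash inside the handlers above), never leave the row stuck in 'running';
            # mark it crashed so it can be retried or inspected later
            if not recorded:
                await self.markCrashed(job_data["id"], "worker interrupted")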
basic db schema:
- id: Primary key (auto-increment)
- state: Enum (waiting, queued, claimed, running, heartbeat, crashed, finished)
- job_class: Full Python path to job class (e.g., “job.email.SendEmail”)
- kwargs: JSONB of arguments passed to task(**kwargs)
- queue: String identifier for job queue (default: “default”)
- prio: Priority (lower number = higher priority, default: 0)
- run_after: Minimum start time (TIMESTAMP, default: NOW())
- capability: Required worker capability to run this job
- waitfor_job: Job ID this job depends on
- waitfor_group: Group ID this job depends on
- run_group: Group ID this job belongs to
- deadline_key: Unique key for singleton future jobs
- result: JSONB result from successful job execution
- backtrace: Error message and stack trace if job crashed
- uid: User ID (for multi-tenant tracking; identifying resource hogs)
some rando exampos
architecture
┌─────────────────────────────────────────────────────────────┐
│                      pj Command (CLI)                        │
│                      workit() Entry                          │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       │ Spawns N workers via multiprocessing
                       ▼
        ┌──────────────────────────────────────────┐
        │                                          │
        ▼                                          ▼
┌───────────────┐                          ┌───────────────┐
│   Worker 1    │                          │   Worker N    │
│   JobSystem   │           ...            │   JobSystem   │
│   Instance    │                          │   Instance    │
└───────┬───────┘                          └───────┬───────┘
        │                                          │
        │  Polls DB every 5-6 seconds              │
        │  Optional: Listens for web requests      │
        │                                          │
        ▼                                          ▼
┌──────────────────────────────────────────────────────────┐
│                    PostgreSQL Database                    │
│                       'jorb' Table                        │
│                                                           │
│  States: waiting → queued → claimed → running →           │
│          finished (success) or crashed (error)            │
└──────────────────────────────────────────────────────────┘
the waiting state is a special condition for jobs not even eligible to run yet because they are waiting on conditions like parent or group jobs to complete. Only queued jobs can be fetched by workers.
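One plausible way to wake them up (a sketch; I don't know pyjobby's exact mechanism, and this only handles the single-parent case, not groups): a periodic sweep that promotes waiting jobs whose parent has finished:

async def promote_waiting(conn):
    # flip 'waiting' jobs to 'queued' once the job they depend on has finished
    await conn.execute("""
        UPDATE jorb w
           SET state = 'queued'
         WHERE w.state = 'waiting'
           AND w.waitfor_job IS NOT NULL
           AND EXISTS (SELECT 1
                         FROM jorb p
                        WHERE p.id = w.waitfor_job
                          AND p.state = 'finished')""")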
- job polling loop (what workers do)
while True:
    job_data = await self.claim()  # Atomically claim next eligible job
    if job_data:
        await self.executeJob(job_data)  # Run the job
        # Immediately check for more work because we see jobs exist in the queue (no delay)
    else:
        # the database has no jobs at all for us, so wait a couple seconds before
        # checking again since we don't want to busy loop hammering the database for no reason.
        await asyncio.sleep(5 + random.uniform(0, 1))  # 5-6s delay

there’s still room to improve the system with endless newer features, but it’s stable and feature-rich enough to shake a stick at these days.
stats
===============================================================================
 Language             Files        Lines         Code     Comments       Blanks
===============================================================================
 Python                  67        40962        30581         3298         7083
 Shell                   10          761          514          119          128
 SQL                      9         1197          672          287          238
 TOML                     1           99           89            0           10
-------------------------------------------------------------------------------
 HTML                     1           83           78            0            5
 |- CSS                   1          275          230            0           45
 |- JavaScript            1          277          212           20           45
 (Total)                             635          520           20           95
-------------------------------------------------------------------------------
 Markdown                28        10845            0         7023         3822
 |- BASH                 19          785          437          213          135
 |- CSS                   1            5            4            1            0
 |- HTML                  1           26           24            0            2
 |- INI                   2           91           62           13           16
 |- JavaScript            2          140          108           12           20
 |- JSON                  2          194          194            0            0
 |- Python               26         5864         4158          858          848
 |- SQL                  15          692          518          112           62
 |- YAML                  3          173          162            0           11
 (Total)                           18916         5746         8242         4928
===============================================================================
 Total                  119        54020        31992        10727        11301
===============================================================================

hiproc
I feel like half my brain is always just living in terminal history search/recall of previous commands. When I lose my terminal command history it sometimes takes me hours or days to reconstruct workflows if I didn’t have things documented or automated properly.
When something bad happens, like your terminal app crashing or your system rebooting, you lose all your per-terminal history and often get reset back to “the last terminal to close overwrote the global history for every terminal now.” You lose dozens or hundreds of recent commands which had useful work happening inside of them (connecting to other systems, detailed report runs with command line arguments, ad-hoc ssh tunnels and port mappings all over the place), and then those commands are just “gone forever” and have to be re-constructed from other logical outcomes (at least if you use “per-terminal” histories and not the disaster of the “shared global realtime history between all sessions” thing modern weird people use).
so when my system did its yearly “Freeze and OOM and crash” routine this summer, I had an idea: what if i had a way to, like, save commands? woah, nobody has thought of this before.
my requirements were:
- easy to add commands
- easy to recall commands
- maybe multi-system compatible
- maybe even save different commands per-context / directory / workspace but with the same recall name
so i dreamed up hiproc or hp for cli usage.
but then the next problem is: what architecture do you use? It must be near-instant to use; I don’t want to have “the curse of the interpreted VM” like all the python utilities with 1-3 second startup times just to run a command wrapper. I don’t want any command to “read a database of commands and pick the right one;” everything must be as active/responsive/low-latency/“online” as possible, which means a “live DB” in memory without any per-command-request loading/parsing/saving/updating in the request flow.
So I invented (yes, again, nobody has ever thought of this before): python in the back, compiled in the front.
hiproc uses a dual architecture: the data model runs as a python API server, while the actual hp cli command you run is a tiny rust binary connecting to the python API for near-instant recall and exec of saved commands. (why rust? mainly because it has a clean package manager so i could use web and json libraries and interfaces easily without having to reinvent everything in C for this project at least.)
samples
Usage: hp <COMMAND>
Commands:
  find                  Interactively find and execute a command
  save                  Save a new command with smart defaults
  search                Search for commands
  namespaces            List all namespaces
  list                  List user's commands with IDs
  info                  Show detailed info about a command by ID
  here                  Show commands relevant to current directory and context
  suggest               Get intelligent command suggestions based on context
  similar               Show commands similar to a given command ID
  analytics             Show execution analytics and insights
  rename                Rename a command by ID
  delete                Delete one or more commands by ID
  edit                  Edit a command by ID
  generate-completions  Generate shell completion scripts
  exec                  Execute a command by ID with optional arguments (also: hp <id>)
  run                   Execute a command by name with smart contextual matching
  quick-save            Quick-save the last executed shell command
  do                    Execute and save a command with smart defaults
  help                  Print this message or the help of the given subcommand(s)

Options:
  -h, --help     Print help
  -V, --version  Print version

QUICK WORKFLOWS:
  hp save "command"         Save command with auto-detected name/namespace
  hp save "command" name    Save command with custom name, auto-detect namespace
  hp do "command"           Execute and save command in one step (alias: hp x)
  hp quick-save name        Save last shell command with custom name

DIRECT EXECUTION:
  hp <id>                   Execute stored command by ID
  hp <namespace> <name>     Execute stored command by namespace and name

Examples:
  hp save "cargo build"     # Saves as 'cargo' in current project namespace
  hp save "ls -la" list     # Saves as 'list' with auto-detected namespace
  hp do git status          # Executes and saves 'git status' as 'git/status'
  hp 123                    # Run stored command ID 123
  hp rust build             # Run 'build' command from 'rust' namespace

$ hp search code
+----+-----------+---------------+------+-------------------+----------+-------------+---------------------+---------------------+------+------------------------------------------------------------------------------------------------------------------------------------+
| ID | Namespace | Name | User | Hostname | Scope | Directory | Created | Last Used | Uses | Command |
+============================================================================================================================================================================================================================================================================+
| 36 | matt | update-coders | matt | computer.local | personal | /Users/matt | 2025-10-31 12:20:32 | 2025-12-21 14:39:04 | 34 | uv tool install --python 3.13 kimi-cli -U; npm install -g @qwen-code/qwen-code@latest @anthropic-ai/claude-code @google/gemini-cli |
+----+-----------+---------------+------+-------------------+----------+-------------+---------------------+---------------------+------+------------------------------------------------------------------------------------------------------------------------------------+

$ hp run update-coders
Executing command 36: uv tool install --python 3.13 kimi-cli -U; npm install -g @qwen-code/qwen-code@latest @anthropic-ai/claude-code @google/gemini-cli

one neat benefit: if you configure the hiproc python backend to listen on a local network interface (not public, for globs sake), then you can point your hp config file at the multi-host service so multiple internal clients connect to the same “command backend,” and then you can use hp run <cmd> across all your machines and update/manage them all centrally (everything converges back to “feeling like you have a globally auto-mounted NFS home directory” in the end i guess).
hiproc does support multiple users and hosts and namespaces all mixed together, but there’s currently no authentication or access control other than “trust me, bro,” so either add more features or only use it in a very narrow local scope.
Adding a new command is as simple as:
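(these are straight from the usage summary above)
hp save "cargo build"              # saves as 'cargo' in the current project namespace
hp save "some long command" name   # or give it an explicit name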
then recall is obviously:
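hp run name               # run by name with smart contextual matching (per the DIRECT EXECUTION summary above)
hp <namespace> <name>     # or by namespace and name
hp 123                    # or directly by stored ID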
But when you run hp save it also notices your hostname, your directory, and your user; so if you save “more specific” commands, it uses a hierarchy to track “save depth” and runs the most specific match first (though you can always use hp find or hp search to recall commands and run them directly by numeric database row id). or another way: you could save a deploy command in every project directory with different behaviors, then just do hp run deploy from any project dir and the correct behavior gets recalled (though, obviously, things like deploy should be better managed than ‘in a magic single-purpose un-revision-controlled command recall system’ amirite)
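A hypothetical illustration of that directory-scoped behavior (made-up paths and commands):

cd ~/projects/site-a && hp save "make deploy" deploy
cd ~/projects/blog   && hp save "rsync -a public/ web:/var/www/blog/" deploy

cd ~/projects/blog && hp run deploy   # the blog version wins because its saved directory matches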
there’s also a tiny web interface for searching your commands across systems too.
there’s no “security” currently so, uh, only run it on localhost and never port forward it anywhere outside of your internal network. it can’t run commands autonomously, but if “bad actors” got into the system they could modify your expected commands into something else for when you go to run them again (TODO idea: have the hp binary check if a previously saved command changed and trigger another TOFU name<->cmd binding update for approval).
stats
===============================================================================
 Language             Files        Lines         Code     Comments       Blanks
===============================================================================
 Python                  13         2554         2084           95          375
 Shell                    2         1042          777          107          158
 TOML                     3           78           68            2            8
-------------------------------------------------------------------------------
 HTML                     1           35           33            1            1
 |- CSS                   1           55           55            0            0
 |- JavaScript            1           54           49            0            5
 (Total)                             144          137            1            6
-------------------------------------------------------------------------------
 Markdown                 1          494            0          308          186
 |- BASH                  1          249          134           69           46
 |- TOML                  1            2            2            0            0
 (Total)                             745          136          377          232
-------------------------------------------------------------------------------
 Rust                     8         2645         2254          106          285
 |- Markdown              7          113            0          106            7
 (Total)                            2758         2254          212          292
===============================================================================
 Total                   28         6848         5216          619         1013
===============================================================================