Opsmas 2025 Day 11: pyjobby & hiproc

TOC:

  • pyjobby
  • hiproc

pyjobby

pyjobby is a python-centric postgres-backed dynamic workqueue system. I updated it this year, but it is originally a system I wrote in 2021 as a backend to a photography sharing site I was working on (but never released, of course). Image processing pipelines can involve many steps (extract data, manage multi-resolution thumbnails, upload image to cold storage, upload image to CDN storage, upload image to “middle-hot” storage, begin AI object/person detection/extraction on GPU servers, update user billing for new image storage, notify users of new image uploads if followers are registered), and it all needs to be cleanly tracked and managed, with clean overflow/backpressure/reporting/accounting systems in place.

hence, pyjobby.

big whoop, you think.

but wait - i made a neat job queue work queue distributed processing reporting and accounting system this time.

pyjobby has/does:

  • work queue state machine flows: waiting → queued → claimed → running → finished (success) or crashed (error)
waiting ──┐
          │
          ├──> queued ──> claimed ──> running ──┬──> finished
          │                                     │
          └─────────────────────────────────────┴──> crashed
                                                          │
                                                          ├──> queued (retry)
                                                          └──> crashed (final)
  • postgres database with atomic row locking/checkout/updating as global queue primitive
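for the curious, a minimal sketch of what an atomic claim can look like in postgres (illustrative only, not pyjobby’s exact query; the capability matching shape and the asyncpg-style call are my assumptions) — FOR UPDATE SKIP LOCKED is what lets concurrent workers check out rows without ever grabbing the same job twice:

# illustrative sketch, not pyjobby's actual query
CLAIM_SQL = """
UPDATE jorb
   SET state = 'claimed'
 WHERE id = (SELECT id FROM jorb
              WHERE state = 'queued'
                AND run_after <= NOW()
                AND capability = ANY($1)   -- assumed shape of capability matching
              ORDER BY prio ASC, id ASC    -- priority first, then FIFO by id
              LIMIT 1
              FOR UPDATE SKIP LOCKED)      -- skip rows another worker already locked
RETURNING id, job_class, kwargs;
"""

async def claim_next(conn, capabilities):
    # asyncpg-style fetch (driver choice is an assumption); returns None when nothing is eligible
    return await conn.fetchrow(CLAIM_SQL, capabilities)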
  • any worker can join the job system by attaching to the database with a work capability descriptor
Machine 1:  pj --workers 4 --cap "web-1"
Machine 2:  pj --workers 4 --cap "web-2"
Machine 3:  pj --workers 8 --cap "ml-gpu" --queue ml
  • jobs are allocated based on capability matching (anything: has gpu? big ram? big cpu? storage? location?)

Route jobs to workers with specific resources:

# Job requiring GPU
await addJob(db,
    job_class="job.ml.TrainModel",
    kwargs={...},
    capability="gpu")  # ← Only runs on workers with "gpu" capability

# Job requiring specific server (local files)
# (every worker automatically has capability hostname:<node hostname>)
import platform

await addJob(db,
    job_class="job.file.Process",
    kwargs={...},
    capability=f"hostname:{platform.node()}")
  • jobs can natively be assigned to a customer/user, for tracking which jobs belong to which “people” and activities
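for example (a hedged sketch: assuming the user id is passed straight through as a uid argument, mirroring the uid column in the schema below):

await addJob(db,
    job_class="job.image.Thumbnail",
    kwargs={"filepath": "/tmp/upload.jpg"},
    uid=123,             # ← assumed parameter name; maps to the 'uid' column for per-user tracking
    queue="default")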

  • multiple job queues

  • duplicate prevention with idempotent keys

# User uploads file at 10:00 AM
await addJob(db,
    job_class="job.billing.UpdateUsage",
    kwargs={"user_id": 123},
    deadline_key=f"billing:123:2025-11-18",  # ← Unique key
    run_after="2025-11-18 23:59:00",         # ← Run at midnight
    queue="default")

# User uploads another file at 11:00 AM
await addJob(db,
    job_class="job.billing.UpdateUsage",
    kwargs={"user_id": 123},
    deadline_key=f"billing:123:2025-11-18",  # ← Same key
    run_after="2025-11-18 23:59:00",
    queue="default")
# ↑ This INSERT will fail (unique constraint violation)
# Only one billing update will run at midnight
  • jobs can be submitted via web API or direct database API writes
  • tree/graph ability: jobs can be registered to only “wake up” once another job completes first
# Job 1: Process uploaded file
job1_id = await addJob(db,
    job_class="job.file.Upload",
    kwargs={"filepath": "/tmp/upload.jpg"},
    queue="default")

# Job 2: Generate thumbnail (waits for Job 1)
await addJob(db,
    job_class="job.image.Thumbnail",
    kwargs={"filepath": "/tmp/upload.jpg"},
    state="waiting",         # ← Must start in 'waiting' state
    waitfor_job=job1_id,     # ← Depends on Job 1
    queue="default")

Group dependency for waiting on concurrent jobs (run_group + waitfor_group):

import secrets
group_id = secrets.randbits(63)  # Generate unique group ID

# Create 3 parallel jobs in a group
for task in ["hash", "exif", "upload"]:
    await addJob(db,
        job_class=f"job.image.{task.capitalize()}",
        kwargs={"filepath": "/tmp/upload.jpg"},
        run_group=group_id,  # ← All part of same group
        queue="default")

# Create job that waits for ALL group members to finish
await addJob(db,
    job_class="job.email.NotifyComplete",
    kwargs={"user_id": 123},
    state="waiting",              # ← Starts in waiting
    waitfor_group=group_id,       # ← Waits for entire group
    queue="default")
  • priority levels (lower numbers run first; within the same prio level, jobs are processed FIFO, by id ascending):
# High priority (paid users)
await addJob(db, job_class="job.email.SendEmail",
             kwargs={...}, prio=-10)

# Normal priority (default)
await addJob(db, job_class="job.email.SendEmail",
             kwargs={...}, prio=0)

# Low priority (background cleanup)
await addJob(db, job_class="job.cleanup.TempFiles",
             kwargs={...}, prio=100)
  • recurring jobs/tasks: either built-in, or by having a job re-schedule itself for a future delayed start time before it completes (see the sketch after this list)
  • crashed jobs auto-reschedule themselves thanks to a finally-block ultimate-fallback handler
  • cli and web management interfaces; direct DB APIs and web APIs for managing job lifecycles and reporting
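a quick sketch of the self-rescheduling recurring pattern from the list above (illustrative only; the job class, run() signature, and datetime handling for run_after are assumptions):

import datetime

class NightlyReport:                          # hypothetical job class, not from the pyjobby source
    async def run(self, db, **kwargs):
        # ... do the actual nightly work here ...

        # re-enqueue ourselves for tomorrow *before* this job finishes,
        # so the recurrence keeps itself alive
        tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
        await addJob(db,
            job_class="job.report.NightlyReport",
            kwargs=kwargs,
            run_after=tomorrow,               # assumes addJob accepts a datetime for run_after
            queue="default")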

basic db schema:

  • id: Primary key (auto-increment)
  • state: Enum (waiting, queued, claimed, running, heartbeat, crashed, finished)
  • job_class: Full Python path to job class (e.g., “job.email.SendEmail”)
  • kwargs: JSONB of arguments passed to task(**kwargs)
  • queue: String identifier for job queue (default: “default”)
  • prio: Priority (lower number = higher priority, default: 0)
  • run_after: Minimum start time (TIMESTAMP, default: NOW())
  • capability: Required worker capability to run this job
  • waitfor_job: Job ID this job depends on
  • waitfor_group: Group ID this job depends on
  • run_group: Group ID this job belongs to
  • deadline_key: Unique key for singleton future jobs
  • result: JSONB result from successful job execution
  • backtrace: Error message and stack trace if job crashed
  • uid: User ID (for multi-tenant tracking; identifying resource hogs)

some rando exampos

architecture

┌─────────────────────────────────────────────────────────────┐
│                     pj Command (CLI)                        │
│                      workit() Entry                         │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       │ Spawns N workers via multiprocessing
                       ▼
        ┌──────────────────────────────────────────┐
        │                                          │
        ▼                                          ▼
┌───────────────┐                          ┌───────────────┐
│   Worker 1    │                          │   Worker N    │
│  JobSystem    │         ...              │  JobSystem    │
│   Instance    │                          │   Instance    │
└───────┬───────┘                          └───────┬───────┘
        │                                          │
        │ Polls DB every 5-6 seconds               │
        │ Optional: Listens for web requests       │
        │                                          │
        ▼                                          ▼
┌──────────────────────────────────────────────────────────┐
│                   PostgreSQL Database                    │
│                      'jorb' Table                        │
│                                                          │
│  States: waiting → queued → claimed → running →          │
│          finished (success) or crashed (error)           │
└──────────────────────────────────────────────────────────┘

the waiting state is a special condition for jobs not even eligible to run yet because they are waiting on conditions like parent or group jobs to complete. Only queued jobs can be fetched by workers.

  • job polling loop (what workers do)
while True:
    job_data = await self.claim()  # Atomically claim next eligible job
    if job_data:
        await self.executeJob(job_data)  # Run the job
        # Immediately check for more work since we just saw jobs in the queue (no delay)
    else:
        # The database has no jobs for us right now, so wait a few seconds before
        # checking again; we don't want to busy-loop hammering the database for no reason.
        await asyncio.sleep(5 + random.uniform(0, 1))  # 5-6s delay with jitter
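and a rough sketch of the crashed-job fallback mentioned earlier (shape only; the helper method names are assumptions, not the real executeJob):

import traceback

async def executeJob(self, job_data):
    outcome_recorded = False
    try:
        result = await self.runJobClass(job_data)        # import job_class, instantiate, call with **kwargs (assumed helper)
        await self.markFinished(job_data["id"], result)   # state -> finished, store JSONB result (assumed helper)
        outcome_recorded = True
    except Exception:
        await self.markCrashed(job_data["id"], traceback.format_exc())  # state -> crashed, store backtrace (assumed helper)
        outcome_recorded = True
    finally:
        # ultimate fallback: if we somehow exit without recording an outcome
        # (cancellation, worker dying mid-update), put the job back so it can retry
        if not outcome_recorded:
            await self.requeue(job_data["id"])            # assumed helper: state -> queued again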

there’s still room to improve the system with endless newer features, but these days it’s stable and has more features than you can shake a stick at.

stats

===============================================================================
 Language            Files        Lines         Code     Comments       Blanks
===============================================================================
 Python                 67        40962        30581         3298         7083
 Shell                  10          761          514          119          128
 SQL                     9         1197          672          287          238
 TOML                    1           99           89            0           10
-------------------------------------------------------------------------------
 HTML                    1           83           78            0            5
 |- CSS                  1          275          230            0           45
 |- JavaScript           1          277          212           20           45
 (Total)                            635          520           20           95
-------------------------------------------------------------------------------
 Markdown               28        10845            0         7023         3822
 |- BASH                19          785          437          213          135
 |- CSS                  1            5            4            1            0
 |- HTML                 1           26           24            0            2
 |- INI                  2           91           62           13           16
 |- JavaScript           2          140          108           12           20
 |- JSON                 2          194          194            0            0
 |- Python              26         5864         4158          858          848
 |- SQL                 15          692          518          112           62
 |- YAML                 3          173          162            0           11
 (Total)                          18916         5746         8242         4928
===============================================================================
 Total                 119        54020        31992        10727        11301
===============================================================================

hiproc

I feel like half my brain is always just living in terminal history search/recall of previous commands. When I lose my terminal command history it sometimes takes me hours or days to reconstruct workflows if I didn’t have things documented or automated properly.

When something bad happens, like your terminal app crashing or your system rebooting, you lose all your per-terminal history and often get reset back to “the last terminal to close overwrote the global history for every terminal now.” Dozens or hundreds of recent commands with useful work inside them (connecting to other systems, detailed report runs with command line arguments, ad-hoc ssh tunnels and port mappings all over the place) are just “gone forever” and have to be reconstructed from other logical outcomes (at least if you use “per-terminal” histories and not the disaster of the “shared global realtime history between all sessions” thing modern weird people use).

so when my system did its yearly “Freeze and OOM and crash” routine this summer, I had an idea: what if i had a way to, like, save commands? woah, nobody has thought of this before.

my requirements were:

  • easy to add commands
  • easy to recall commands
  • maybe multi-system compatible
  • maybe even save different commands per-context / directory / workspace but with the same recall name

so i dreamed up hiproc or hp for cli usage.

but then the next problem is: what architecture do you use? It must be near-instant to use; I don’t want “the curse of the interpreted VM” like all the python utilities with 1-3 second startup times just to run a command wrapper. I don’t want any command to “read a database of commands and pick the right one;” everything must be as active/responsive/low-latency/“online” as possible, which means a “live DB” in memory without any per-command-request loading/parsing/saving/updating in the request flow.

So I invented (yes, again, nobody has ever thought of this before): python in the back, compiled in the front.

hiproc uses a dual architecture: the data model runs as a python API server, and the actual hp cli command you run is a tiny rust binary connecting to the python API for near-instant recall and exec of saved commands. (why rust? mainly because it has a clean package manager, so i could use web and json libraries and interfaces easily without having to reinvent everything in C for this project at least.)
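to make the shape concrete, here’s a minimal sketch of what the backend half could look like (FastAPI, the endpoint path, and the field names are my assumptions for illustration, not hiproc’s actual API):

# minimal sketch only; hiproc's real endpoints and fields will differ
from fastapi import FastAPI, HTTPException

app = FastAPI()
COMMANDS = {("rust", "build"): "cargo build --release"}   # held in memory: no per-request DB load/parse

@app.get("/commands/{namespace}/{name}")
def lookup(namespace: str, name: str):
    cmd = COMMANDS.get((namespace, name))
    if cmd is None:
        raise HTTPException(status_code=404, detail="no such command")
    return {"command": cmd}

# the rust `hp` binary then only does one HTTP GET against this and execs the result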

samples

Usage: hp <COMMAND>

Commands:
  find                  Interactively find and execute a command
  save                  Save a new command with smart defaults
  search                Search for commands
  namespaces            List all namespaces
  list                  List user's commands with IDs
  info                  Show detailed info about a command by ID
  here                  Show commands relevant to current directory and context
  suggest               Get intelligent command suggestions based on context
  similar               Show commands similar to a given command ID
  analytics             Show execution analytics and insights
  rename                Rename a command by ID
  delete                Delete one or more commands by ID
  edit                  Edit a command by ID
  generate-completions  Generate shell completion scripts
  exec                  Execute a command by ID with optional arguments (also: hp <id>)
  run                   Execute a command by name with smart contextual matching
  quick-save            Quick-save the last executed shell command
  do                    Execute and save a command with smart defaults
  help                  Print this message or the help of the given subcommand(s)

Options:
  -h, --help     Print help
  -V, --version  Print version

QUICK WORKFLOWS:
  hp save "command"        Save command with auto-detected name/namespace
  hp save "command" name   Save command with custom name, auto-detect namespace
  hp do "command"          Execute and save command in one step (alias: hp x)
  hp quick-save name       Save last shell command with custom name

DIRECT EXECUTION:
  hp <id>                  Execute stored command by ID
  hp <namespace> <name>    Execute stored command by namespace and name

Examples:
  hp save "cargo build"             # Saves as 'cargo' in current project namespace
  hp save "ls -la" list             # Saves as 'list' with auto-detected namespace
  hp do git status                  # Executes and saves 'git status' as 'git/status'
  hp 123                            # Run stored command ID 123
  hp rust build                     # Run 'build' command from 'rust' namespace
$ hp search code
+----+-----------+---------------+------+-------------------+----------+-------------+---------------------+---------------------+------+------------------------------------------------------------------------------------------------------------------------------------+
| ID | Namespace | Name          | User | Hostname       | Scope    | Directory   | Created             | Last Used           | Uses | Command                                                                                                                            |
+============================================================================================================================================================================================================================================================================+
| 36 | matt      | update-coders | matt | computer.local | personal | /Users/matt | 2025-10-31 12:20:32 | 2025-12-21 14:39:04 | 34   | uv tool install --python 3.13 kimi-cli -U; npm install -g @qwen-code/qwen-code@latest @anthropic-ai/claude-code @google/gemini-cli |
+----+-----------+---------------+------+-------------------+----------+-------------+---------------------+---------------------+------+------------------------------------------------------------------------------------------------------------------------------------+
$ hp run update-coders
Executing command 36: uv tool install --python 3.13 kimi-cli -U; npm install -g @qwen-code/qwen-code@latest @anthropic-ai/claude-code @google/gemini-cli

one neat benefit: if you configure the hiproc python backend to listen on a local network interface (not public, for glob’s sake), you can point your hp config file at the multi-host service so multiple internal clients connect to the same “command backend.” Then you can use hp run <cmd> across all your machines and update/manage them all centrally (everything converges back to “feeling like you have a globally auto-mounted NFS home directory” in the end, i guess).

hiproc does support multiple users and hosts and namespaces all mixed together, but there’s currently no authentication or control other than “trust me, bro” so either add more features or only use it in a very narrow local scope.

Adding a new command is as simple as:

hp save "ssh me@mysuperhost" login

then recall is obviously

hp run login

But when you run hp save it also notices your hostname, your directory, and your user; so if you save “more specific” commands, it uses a hierarchy to track “save depth” and runs the most specific command first (though you can always use hp find or hp search to recall commands and run them by numeric database row id directly). Put another way: you could save a deploy command in every project directory with different behaviors and just do hp run deploy from any project dir for the correct behavior to be recalled (though, obviously, things like deploy should be better managed than ‘in a magic single-purpose un-revision-controlled command recall system’ amrite).
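one plausible way to picture that ranking (my sketch, not necessarily hiproc’s exact scoring): score each saved command by how many context fields match the caller, and the most specific match wins:

# illustrative only: rank saved commands by context specificity
def specificity(cmd, ctx):
    # cmd and ctx are dicts with "directory", "hostname", "user" keys (field names assumed)
    score = 0
    for field, weight in (("directory", 4), ("hostname", 2), ("user", 1)):
        if cmd.get(field) and cmd[field] == ctx.get(field):
            score += weight
    return score

def pick(candidates, ctx):
    # among commands saved under the same name, run the most context-specific one
    return max(candidates, key=lambda c: specificity(c, ctx))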

there’s also a tiny web interface for searching your commands across systems too.

there’s no “security” currently so, uh, only run it on localhost and never port forward it anywhere outside of your internal network. it can’t run commands autonomously, but if “bad actors” got into the system they could modify your expected commands into something else for when you go to run them again (TODO idea: have the hp binary check if a previous command changed and trigger another TOFU name<->cmd binding update for approval).

stats

===============================================================================
 Language            Files        Lines         Code     Comments       Blanks
===============================================================================
 Python                 13         2554         2084           95          375
 Shell                   2         1042          777          107          158
 TOML                    3           78           68            2            8
-------------------------------------------------------------------------------
 HTML                    1           35           33            1            1
 |- CSS                  1           55           55            0            0
 |- JavaScript           1           54           49            0            5
 (Total)                            144          137            1            6
-------------------------------------------------------------------------------
 Markdown                1          494            0          308          186
 |- BASH                 1          249          134           69           46
 |- TOML                 1            2            2            0            0
 (Total)                            745          136          377          232
-------------------------------------------------------------------------------
 Rust                    8         2645         2254          106          285
 |- Markdown             7          113            0          106            7
 (Total)                           2758         2254          212          292
===============================================================================
 Total                  28         6848         5216          619         1013
===============================================================================