distributed-job-safety

📁 terrylica/cc-skills 📅 Feb 13, 2026
25
总安装量
24
周安装量
#15186
全站排名
安装命令
npx skills add https://github.com/terrylica/cc-skills --skill distributed-job-safety

Agent 安装分布

opencode 24
gemini-cli 23
codebuddy 23
github-copilot 23
codex 23
kimi-cli 23

Skill 文档

Distributed Job Safety

Patterns and anti-patterns for concurrent job management with pueue + mise + systemd-run, learned from production failures in distributed data pipeline orchestration.

Scope: Universal principles for any pueue + mise workflow with concurrent parameterized jobs. Examples use illustrative names but the principles apply to any domain.

Prerequisite skills: devops-tools:pueue-job-orchestration, itp:mise-tasks, itp:mise-configuration


The Nine Invariants

Non-negotiable rules for concurrent job safety. Violating any one causes silent data corruption or job failure.

Full formal specifications: references/concurrency-invariants.md

1. Filename Uniqueness by ALL Job Parameters

Every file path shared between concurrent jobs MUST include ALL parameters that differentiate those jobs.

WRONG:  {symbol}_{start}_{end}.json                    # Two thresholds collide
RIGHT:  {symbol}_{threshold}_{start}_{end}.json         # Each job gets its own file

Test: If two pueue jobs can run simultaneously with different parameter values, those values MUST appear in every shared filename, temp directory, and lock file.

2. Verify Before Mutate (No Blind Queueing)

Before queueing jobs, check what is already running. Before deleting state, check who owns it.

# WRONG: Blind queue
for item in "${ITEMS[@]}"; do
    pueue add --group mygroup -- run_job "$item" "$param"
done

# RIGHT: Check first
running=$(pueue status --json | jq '[.tasks[] | select(.status | keys[0] == "Running") | .label] | join(",")')
if echo "$running" | grep -q "${item}@${param}"; then
    echo "SKIP: ${item}@${param} already running"
    continue
fi

3. Idempotent File Operations (missing_ok=True)

All file deletion in concurrent contexts MUST tolerate the file already being gone.

# WRONG: TOCTOU race
if path.exists():
    path.unlink()        # Crashes if another job deleted between check and unlink

# RIGHT: Idempotent
path.unlink(missing_ok=True)

4. Atomic Writes for Shared State

Checkpoint files must never be partially written. Use the tempfile-fsync-rename pattern.

fd, temp_path = tempfile.mkstemp(dir=path.parent, prefix=".ckpt_", suffix=".tmp")
with os.fdopen(fd, "w") as f:
    f.write(json.dumps(data))
    f.flush()
    os.fsync(f.fileno())
os.replace(temp_path, path)  # POSIX atomic rename

Bash equivalent (for NDJSON telemetry appends):

# Atomic multi-line append via flock + temp file
TMPOUT=$(mktemp)
# ... write lines to $TMPOUT ...
flock "${LOG_FILE}.lock" bash -c "cat '${TMPOUT}' >> '${LOG_FILE}'"
rm -f "$TMPOUT"

5. Config File Is SSoT

The .mise.toml [env] section is the single source of truth for environment defaults. Per-job env overrides bypass the SSoT and allow arbitrary values with no review gate.

# WRONG: Per-job override bypasses mise SSoT
pueue add -- env MY_APP_MIN_THRESHOLD=50 uv run python script.py

# RIGHT: Set the correct value in .mise.toml, no per-job override needed
pueue add -- uv run python script.py

Controlled exception: pueue env set <id> KEY VALUE is acceptable for one-off overrides on stashed/queued tasks (e.g., hyperparameter sweeps). The key distinction: mise [env] is SSoT for defaults that apply to all runs; pueue env set is for one-time parameterization of a specific task without modifying the config file. See devops-tools:pueue-job-orchestration Per-Task Environment Override section.

6. Maximize Parallelism Within Safe Margins

Always probe host resources and scale parallelism to use available capacity. Conservative defaults waste hours of idle compute.

# Probe host resources
ssh host 'nproc && free -h && uptime'

# Sizing formula (leave 20% margin for OS + DB + overhead)
# max_jobs = min(
#     (available_memory_gb * 0.8) / per_job_memory_gb,
#     (total_cores * 0.8) / per_job_cpu_cores
# )

For ClickHouse workloads: The bottleneck is often ClickHouse’s concurrent_threads_soft_limit (default: 2 x nproc), not pueue’s parallelism. Each query requests max_threads threads (default: nproc). Right-size --max_threads per query to match the effective thread count (soft_limit / pueue_slots), then increase pueue slots. Pueue parallelism can be adjusted live without restarting running jobs.

Post-bump monitoring (mandatory for 5 minutes after any parallelism change):

  • uptime — load average should stay below 0.9 x nproc
  • vmstat 1 5 — si/so columns must remain 0 (no active swapping)
  • ClickHouse errors: SELECT count() FROM system.query_log WHERE event_time > now() - INTERVAL 5 MINUTE AND type = 'ExceptionWhileProcessing' — must be 0

Cross-reference: See devops-tools:pueue-job-orchestration ClickHouse Parallelism Tuning section for the full decision matrix.

7. Per-Job Memory Caps via systemd-run

On Linux with cgroups v2, wrap each job with systemd-run to enforce hard memory limits.

systemd-run --user --scope -p MemoryMax=8G -p MemorySwapMax=0 \
    uv run python scripts/process.py --symbol BTCUSDT --threshold 250

Critical: MemorySwapMax=0 is mandatory. Without it, the process escapes into swap and the memory limit is effectively meaningless.

8. Monitor by Stable Identifiers, Not Ephemeral IDs (INV-8)

Pueue job IDs are ephemeral — they shift when jobs are removed, re-queued, or split. Use group names and label patterns for monitoring.

# WRONG: Hardcoded job IDs
if pueue status --json | jq -e ".tasks.\"14\"" >/dev/null; then ...

# RIGHT: Query by group/label
pueue status --json | jq -r '.tasks | to_entries[] | select(.value.group == "mygroup") | .value.id'

Full specification: references/concurrency-invariants.md

9. Derived Artifact Filenames Must Include ALL Category Dimensions (INV-9)

When concurrent or sequential pipeline phases produce derived artifacts (Parquet chunks, JSONL summaries, temp files) that share a directory, every filename must include ALL discriminating dimensions — not just the job-level parameters (INV-1), but also pipeline-level categories like direction, strategy, or generation.

WRONG:  _chunk_{formation}_{symbol}_{threshold}.parquet     # No direction -- LONG glob eats SHORT files
RIGHT:  _chunk_{direction}_{formation}_{symbol}_{threshold}.parquet  # Direction-scoped

Glob scope rule: Cleanup and merge globs must match the filename pattern exactly:

# WRONG: Unscoped glob -- consumes artifacts from other categories
chunk_files = folds_dir.glob("_chunk_*.parquet")

# RIGHT: Category-scoped glob -- only touches this category's artifacts
chunk_files = folds_dir.glob(f"_chunk_{direction}_*.parquet")

Post-merge validation: After merging artifacts, assert expected values in category columns:

merged_df = pl.concat([pl.read_parquet(p) for p in chunk_files])
assert set(merged_df["strategy"].unique()) == {"standard"}, "Direction contamination!"

Relationship to INV-1: INV-1 ensures checkpoint file uniqueness by job parameters (runtime isolation). INV-9 extends this to derived artifacts that persist across pipeline phases (artifact isolation). Both prevent the same class of bug — silent cross-contamination from filename collisions.

Full specification: references/concurrency-invariants.md


Anti-Patterns (Learned from Production)

17 anti-patterns documented from production failures. Full details with code examples: references/anti-patterns.md

AP Name Key Symptom Related Invariant
AP-1 Redeploying without checking running Checkpoint collisions after kill+requeue INV-2
AP-2 Checkpoint filename missing parameters FileNotFoundError on checkpoint delete INV-1
AP-3 Trusting pueue restart logs Old error appears after restart
AP-4 Assuming PyPI propagation is instant “no version found” after publish
AP-5 Editable source vs. installed wheel uv run uses old code after pip upgrade
AP-6 Sequential phase assumption Phase contention from simultaneous queueing
AP-7 Manual post-processing steps “run optimize after they finish” never happens
AP-8 Hardcoded job IDs in monitors Monitor crashes after job re-queue INV-8
AP-9 Sequential when epochs enable parallel 1,700 hours single-threaded on 25+ cores INV-6
AP-10 State file bloat Silent 60x slowdown in job submission
AP-11 Wrong working directory in remote jobs [Errno 2] No such file or directory
AP-12 Per-file SSH for bulk submission 300K jobs takes days (SSH overhead)
AP-13 SIGPIPE under set -euo pipefail Exit code 141 on harmless pipe ops
AP-14 False data loss from variable NDJSON wc -l shows 3-6% fewer lines
AP-15 Cursor file deletion on completion Full re-run instead of incremental resume
AP-16 mise [env] for pueue/cron secrets Empty env vars in daemon jobs INV-5
AP-17 Unscoped glob across pipeline phases Phase A consumes Phase B’s artifacts INV-9

The Mise + Pueue + systemd-run Stack

Full architecture diagram and responsibility boundaries: references/stack-architecture.md

Layer Responsibility
mise Environment variables, tool versions, task discovery
pueue Daemon persistence, parallelism limits, restart, --after
systemd-run Per-job cgroup memory caps (Linux only, no-op on macOS)
autoscaler Dynamic parallelism tuning based on host resources
Python/app Domain logic, checkpoint management, data integrity

Remote Deployment Protocol

When deploying a fix to a running host:

1. AUDIT:   ssh host 'pueue status --json' -> count running/queued/failed
2. DECIDE:  Wait for running jobs? Kill? Let them finish with old code?
3. PULL:    ssh host 'cd ~/project && git fetch origin main && git reset --hard origin/main'
4. VERIFY:  ssh host 'cd ~/project && python -c "import pkg; print(pkg.__version__)"'
5. UPGRADE: ssh host 'cd ~/project && uv pip install --python .venv/bin/python --refresh pkg==X.Y.Z'
6. RESTART: ssh host 'pueue restart <failed_id>' OR add fresh jobs
7. MONITOR: ssh host 'pueue status --group mygroup'

Critical: Step 1 (AUDIT) is mandatory. Skipping it is the root cause of cascade failures.

See: references/deployment-checklist.md for full protocol.


Concurrency Safety Decision Tree

Adding a new parameter to a resumable job function?
|-- Is it job-differentiating (two jobs can have different values)?
|   |-- YES -> Add to checkpoint filename
|   |          Add to pueue job label
|   |          Add to remote checkpoint key
|   |-- NO  -> Skip (e.g., verbose, notify are per-run, not per-job)
|
|-- Does the function delete files?
|   |-- YES -> Use missing_ok=True
|   |          Use atomic write for creates
|   |-- NO  -> Standard operation
|
|-- Does the function write to shared storage?
    |-- YES -> Force deduplication after write
    |          Use UPSERT semantics where possible
    |-- NO  -> Standard operation

Autoscaler

Dynamic parallelism tuning for pueue groups based on host CPU and memory. Full details: references/autoscaler.md

CPU < 40% AND MEM < 60%  ->  SCALE UP (+1 per group)
CPU > 80% OR  MEM > 80%  ->  SCALE DOWN (-1 per group)
Otherwise                 ->  HOLD

Key principle: Ramp up incrementally (not to max). Job memory grows over time — jumping to max parallelism risks OOM when all jobs peak simultaneously.


Project-Specific Extensions

This skill provides universal patterns that apply to any distributed job pipeline. Projects should create a local extension skill (e.g., myproject-job-safety) in their .claude/skills/ directory that provides:

Local Extension Provides Example
Concrete function names run_resumable_job() -> myapp_populate_cache()
Application-specific env vars MY_APP_MIN_THRESHOLD, MY_APP_CH_HOSTS
Memory profiles per job type “250 dbps peaks at 5 GB, use MemoryMax=8G”
Database-specific audit queries SELECT ... FROM mydb.mytable ... countIf(x < 0)
Issue provenance tracking “Checkpoint race: GH-84”
Host-specific configuration “bigblack: 32 cores, 61 GB, groups p1/p2/p3/p4”

Two-layer invocation pattern: When this skill is triggered, also check for and invoke any local *-job-safety skill in the project’s .claude/skills/ directory for project-specific configuration.

devops-tools:distributed-job-safety    (universal patterns - this skill)
  + .claude/skills/myproject-job-safety  (project-specific config)
  = Complete operational knowledge

References