Coding as queues

The intuition

An ML training pipeline is not a script; it is a sequence of transforms connected by queues. Data enters, then is validated, cleaned, featurized, and split, and a model is trained and evaluated on the result. Each stage can fail independently. The question is not “did it run?” but “where did it stop?”

Once you see every ML workflow as a queue, you stop writing monolithic training scripts and start designing resilient data flows.

Why it matters

A monolithic training script can fail at line 142 and take 6 hours of computation with it. Queue-oriented design lets you checkpoint, retry, and inspect at each stage, so a failure costs only the stage that failed. It is arguably the highest-leverage architectural shift for moving from notebook experiments to production ML.

What I learned

I used to write training scripts as one long .py file: load → clean → train → evaluate. When something failed midway, I’d comment out half the file and re-run. Now I think in stages: each stage is a function that reads from a known location and writes to a known location. The “queue” is just a directory of files waiting to be processed. For me, this pattern has eliminated more debugging sessions than any linter or type checker.


Deep dive

Formal definition

A queue-oriented pipeline is a directed acyclic graph of stages where each stage:

  • Reads from exactly one input location (file, topic, table)
  • Writes to exactly one output location
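  • Is idempotent (re-running on the same input produces the same output, with no duplicate side effects)
  • Reports success/failure atomically (an output is either completely written or absent, never half-written)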

The queue is the intermediate storage between stages. It decouples producers from consumers.
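One way to make the "reports success/failure" property concrete is to give every item exactly one of two outcomes: a finished output file, or an error record. The sketch below assumes JSON items on a local filesystem. The names write_atomic and run_item, and the failed_dir location, are hypothetical and introduced only for illustration.

```python
import json
import os
from pathlib import Path


def write_atomic(path, payload):
    """Write JSON via a temp file plus rename, so readers never see a partial file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(payload))
    os.replace(tmp, path)  # rename into place in a single step


def run_item(fn, item, output_dir, failed_dir):
    """Apply fn to one queued item; return True on success, False on failure.

    failed_dir is an assumed location for error records (one per failed item);
    it is not part of the original design.
    """
    item = Path(item)
    try:
        result = fn(json.loads(item.read_text()))
    except Exception as exc:
        # Failure: record what went wrong and leave the output location untouched
        write_atomic(Path(failed_dir) / item.name,
                     {"item": item.name, "error": repr(exc)})
        return False
    # Success: the output appears only once it is complete
    write_atomic(Path(output_dir) / item.name, result)
    return True
```

With this shape, a failed item stays visible in its input directory and gains a matching error record, so a later run can retry it after the cause is fixed.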

Full example

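from pathlib import Path
import json
import os

def stage(name, input_dir, output_dir, fn):
    """Run one stage: read each JSON file in input_dir, apply fn, write the result to output_dir."""
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Sorted so that re-runs process items in the same order
    for item in sorted(input_path.iterdir()):
        # Skip directories and temp files left behind by a crashed upstream stage
        if not item.is_file() or item.suffix == ".tmp":
            continue
        target = output_path / item.name
        # Idempotent: skip if already processed
        if target.exists():
            continue
        data = json.loads(item.read_text())
        result = fn(data)
        # Write to a temp file and rename it into place, so a crash mid-write
        # never leaves a partial output that the skip check treats as done
        tmp = output_path / (item.name + ".tmp")
        tmp.write_text(json.dumps(result))
        os.replace(tmp, target)
        print(f"[{name}] processed {item.name}")

# Usage: chain stages
# stage("load", "raw/", "validated/", validate)
# stage("featurize", "validated/", "features/", build_features)
# stage("train", "features/", "models/", train_model)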
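With directories as queues, the question “where did it stop?” can be answered by comparing neighbouring directories. The sketch below assumes the convention used in the example above, where an output file has the same name as its input. The helper names pending and where_did_it_stop are hypothetical.

```python
from pathlib import Path


def pending(input_dir, output_dir):
    """Return names of files in input_dir that have no same-named file in output_dir."""
    input_path, output_path = Path(input_dir), Path(output_dir)
    # A missing output directory simply means nothing has been processed yet
    done = {p.name for p in output_path.iterdir()} if output_path.is_dir() else set()
    return sorted(p.name for p in input_path.iterdir()
                  if p.is_file() and p.name not in done)


def where_did_it_stop(stage_dirs):
    """Given stage directories in pipeline order, return the backlog in front of each stage.

    stage_dirs is, for example, ["raw", "validated", "features", "models"].
    """
    return {f"{src} -> {dst}": pending(src, dst)
            for src, dst in zip(stage_dirs, stage_dirs[1:])}
```

The first hop with a non-empty backlog is where the pipeline stopped, and re-running that stage picks up only the listed items.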

Caveats and edge cases

  • Queue overhead is usually only worth it when stages are slow (as a rule of thumb, more than about 30 seconds) or can fail independently
  • Not all workflows need a message broker — a filesystem directory is often enough
  • Idempotency is the hard part: ensure your transforms are pure functions of their input
  • Monitoring is essential: if a stage silently stops, its input queue fills up and nobody notices
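The idempotency caveat can be partly tested before a transform goes into a stage. The sketch below is a heuristic, assuming JSON-serializable inputs and outputs. The name check_pure is hypothetical. Passing the check does not prove a transform is pure, but failing it shows that it is not.

```python
import copy
import json


def check_pure(fn, data):
    """Heuristic purity check for a transform.

    Returns True only if fn gives the same JSON output on two calls with
    equal inputs and leaves the input it was given unchanged.
    """
    original = copy.deepcopy(data)
    first = fn(copy.deepcopy(data))
    probe = copy.deepcopy(data)
    second = fn(probe)
    # Compare serialized forms, since that is what a stage writes to disk
    same_output = (json.dumps(first, sort_keys=True)
                   == json.dumps(second, sort_keys=True))
    input_untouched = probe == original
    return same_output and input_untouched
```

Typical failures are transforms that read the clock, draw unseeded random numbers, or depend on module-level state. These need the varying value passed in as part of the input instead.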

References

  • “You Are Not a Queue” — the anti-pattern of over-engineering
  • Apache Beam / TensorFlow Transform for the production version of this pattern
  • “Data Pipelines with Python” (McMaster, 2023) for directory-as-queue implementations