Design first-class partitioned queues (logical queue fanout) #347

@hardbyte

Summary

Awa already ships most of the machinery for spreading one hot logical queue across multiple physical queues:

  • QueueFanout (Rust, awa-model/src/queue_fanout.rs) and awa.QueueFanout (Python, a PyO3 wrapper around the Rust type) shipped in 0.6.0-beta.2 via Add queue fanout helper #327: logical name + width, default logical__pN naming, from_physical_queues for explicit names, and route_opts_by_key setting both queue and ordering_key.
  • ClientBuilder::queue_fanout(...) and Python QueueFanout.queue_configs(...) register workers for every physical queue.
  • The core batch enqueue paths are already multi-queue: insert_many_copy stages a per-row queue column, and QueueStorage::enqueue_params_copy takes per-job InsertParams and groups internally by (queue, priority, enqueue_shard) (group_ready_rows_by_shard), including mixed keyed/rotor batches in one call.

So the remaining work is smaller than "design a new feature" — and since all of this is effectively unreleased (0.6.0-beta), every decision below is argued on long-term design merits, not compatibility:

  1. Routing defect — correlated hashing. Partition selection (QueueFanout::queue_for_key) and enqueue-shard selection (pick_shard) both call shard_for_ordering_key on the same key bytes, reducing one hash value modulo two different numbers. The residues are correlated whenever gcd(width, enqueue_shards) > 1. Details under Context below.
  2. Python batch gap. The insert_many_copy / enqueue_many_copy bindings take one queue and one ordering_key kwarg for the whole batch, so mixed-partition batches need one call per partition. The Rust core underneath has no such restriction.
  3. One public name, chosen for the long term, plus docs that teach the model in order: logical queue → partitions → ordering keys → enqueue shards → claimers.
  4. Footguns to close or document: width > 1 leaves the bare logical queue name unconsumed; changing width re-routes keys.

Draft ADR-031: Partitioned Queues (Logical Queue Fanout)

Status

Proposed.

Context

Awa's high-throughput storage work (ADR-019/023/025/026) removed avoidable MVCC churn and WAL from the common path while preserving Postgres-native transactional enqueue, at-least-once delivery, and (job_id, run_lease) stale-writer protection. Benchmarking shows the clearest remaining throughput lever is spreading one hot logical workload across multiple physical queues — each partition gets its own enqueue heads, claim heads, claimer leases, and receipt rings, so claim/completion/runtime work scales out.

The routing machinery for this exists (QueueFanout, #327) but is presented as a low-level helper rather than a first-class concept, and it has a defect that should be fixed before the surface stabilises:

Correlated hashing across partition and shard levels. queue_for_key picks the partition as shard_for_ordering_key(key, width) and the storage layer later picks the enqueue shard as shard_for_ordering_key(key, enqueue_shards) — the same 64-bit hash value reduced by two moduli. If a key hashes to partition i (hash ≡ i mod width), its enqueue shard is constrained to residues ≡ i mod gcd(width, enqueue_shards). Keyed traffic inside one partition therefore reaches only enqueue_shards / gcd(width, enqueue_shards) of the shards:

  • width = 4, enqueue_shards = 2: every keyed job in __p0 has an even hash, so shard 1 of that partition never sees keyed traffic.
  • width = 4, enqueue_shards = 4: each partition's keyed traffic lands on exactly one enqueue shard — ADR-025 is silently negated for keyed workloads.

Users will pick powers of two for both knobs, so the degenerate cases are the likely configurations. Rotor-routed jobs (no ordering key) are unaffected. The fix is to domain-separate the partition-level hash (e.g. a distinct seed or a domain-prefix byte over the key). Partition selection happens only client-side in QueueFanout, and the Python class wraps the Rust type, so this is one function change plus a documented portable spec for producers in other languages that hand-roll routing. Landing it now means the routing contract is correct before anyone depends on it.
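The correlation is easy to reproduce. The following is a minimal sketch, not Awa's implementation: it assumes a stand-in 64-bit hash (BLAKE2b truncated to 8 bytes) in place of whatever shard_for_ordering_key actually uses, and a hypothetical domain-prefix byte b"\x01" for the separated variant. The names hash64 and shards_reached are illustrative only.

```python
import hashlib
from collections import defaultdict


def hash64(data: bytes) -> int:
    # Stand-in 64-bit hash (assumption): BLAKE2b truncated to 8 bytes.
    return int.from_bytes(hashlib.blake2b(data, digest_size=8).digest(), "big")


def shards_reached(width, enqueue_shards, domain_separated, n_keys=4000):
    """Map each partition index to the set of enqueue shards its keyed jobs hit."""
    reached = defaultdict(set)
    for i in range(n_keys):
        key = f"customer-{i}".encode()
        if domain_separated:
            # Hypothetical fix: hash a domain-prefix byte plus the key
            # when selecting the partition.
            partition_hash = hash64(b"\x01" + key)
        else:
            # Current behaviour as described: one hash, two moduli.
            partition_hash = hash64(key)
        partition = partition_hash % width
        shard = hash64(key) % enqueue_shards
        reached[partition].add(shard)
    return dict(reached)


if __name__ == "__main__":
    print("shared hash, 4x4:      ", shards_reached(4, 4, domain_separated=False))
    print("domain-separated, 4x4: ", shards_reached(4, 4, domain_separated=True))
```

Under these assumptions the shared-hash 4×4 case sends partition i's keyed traffic to shard i only, while the prefixed variant reaches every shard in every partition.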

Separately, the ergonomics gaps are real but narrow: Python bulk producers cannot express per-job routing, worker capacity semantics are per-partition but only documented as such in the Rust builder rustdoc, and nothing consumes the bare logical queue name at width > 1.

Terminology

  • Logical queue: the application-visible queue name, e.g. customer-updates.
  • Partition / physical queue: an ordinary Awa queue that carries a slice of the logical queue. The ordering scope.
  • Ordering key: caller-supplied bytes that route related jobs to the same partition, and to the same enqueue shard within it.
  • Enqueue shard: ADR-025's per-(queue, priority) head-row shard inside one physical queue. Orthogonal to partitions.
  • Internal queue stripe: queue-storage queue_stripe_count striping using queue#N names. Remains an advanced storage knob, not the public API.

This is the same model as Kafka partitions / SQS FIFO message groups / Pub/Sub ordering keys: a partition is the ordering scope, the operator picks how many, producers route into them.

Decision

Promote the existing fanout machinery to a first-class, documented concept. No storage changes. Specifically:

  1. One public name, no alias. Nothing released constrains the choice, so pick the name that reads best in ten years of docs. A compatibility alias would make four names for three partitioning concepts (alongside enqueue_shards and queue_stripe_count); rejected. Working name in this ADR: PartitionedQueue, using "partition" vocabulary consistently with ADR-025 ("a partition is the ordering scope") and with the mental model users arrive with from Kafka/SQS/Pub/Sub. "Queue group" is rejected: it reads as a worker consuming several unrelated queues, which this is not. "Fanout" is rejected for the public name: in messaging it usually means duplicating each message to all consumers, the opposite of partitioning.
  2. Fix the correlated hash before stabilising. Domain-separate partition selection from shard selection. Add a distribution test asserting keyed traffic spreads across enqueue shards within a partition at width == enqueue_shards.
  3. Close the Python batch gap generically, not with a group-specific API. Extend insert_many_copy / enqueue_many_copy to accept per-job queue / ordering_key (per-job opts), serving any multi-queue batch. An enqueue_many_grouped_copy(group, jobs, key=...) method is rejected: the Rust core already routes and groups per row, so a partition-aware bulk enqueue is just "stamp routing onto each job, call the existing COPY path".
  4. Rust adds no new enqueue API. Document the pattern; optionally add a small helper that stamps route_opts_by_key across a Vec<InsertParams>.
  5. Capacity semantics stay per-partition, named as such. ClientBuilder::queue_fanout applies the config to each partition, so hard-reserved capacity is width × max_workers, and per-queue rate limits also multiply (already documented in the builder rustdoc). The Python queue_configs(max_workers_per_queue=...) parameters rename to *_per_partition with the type. For a logical total, recommend weighted mode with global_max_workers (ADR-011), and document that per-partition weight gives the logical queue width × the share of a standalone queue.
  6. Close the orphaned-logical-queue footgun. At width > 1 with uniform __pN naming, nothing consumes the bare logical name — yet a producer that hasn't been updated mid-rolling-deploy, a cron schedule, or an ADR-030 move_queue destination can enqueue to it, and those jobs stall silently. This is a permanent structural hazard, not a migration concern: any system where the logical name is also a valid enqueue target needs that target consumed. Proposed: partition 0 is the logical name itself (customer-updates, customer-updates__p1, …, customer-updates__p{N-1}). This makes width = 1 literally a plain queue, keeps stray direct enqueues flowing, and makes growing 1 → N drain in place as a routine operation. Alternatives if rejected: registration also consumes the bare logical queue as a catch-all, or at minimum a startup warning when the logical queue has depth but no worker.
  7. Document width changes as a re-partitioning event, in Phase 1. Growing width re-routes keys, so per-key FIFO is not preserved across the transition (a key's old jobs sit in the old partition while new jobs flow to the new one). Shrinking strands jobs in dropped partitions. Runbook: to grow, deploy workers with the new width first, then producers; to shrink, switch producers first, drain (or move_queue, ADR-030) the dropped partitions, then remove them from workers.
  8. Key routing keeps setting both queue and ordering_key so per-key FIFO holds inside the chosen partition even with enqueue_shards > 1 (made effective by item 2).
  9. Observability derives logical labels from registration, not name parsing. The runtime knows the logical↔physical mapping when queue_fanout/queue_configs is called; attach the logical-queue label there. __pN stays a convention, never a parsing contract — a user can legitimately own a queue literally named foo__p0, and we already maintain #N parsing for internal stripes. DB-side/UI aggregation without runtime context is exactly the trigger for persisted metadata (Phase 3), not for suffix parsing.
  10. queue_storage.queue_stripe_count is decided here: keep it internal and frozen. No new public features target it; partitioned queues are the public answer. It is not deprecated yet — it still provides transparent claim/count aggregation across stripes that partitions only get in Phase 2/3. The Python enqueue_many_copy(queue_storage_queue_stripe_count=...) kwarg should not grow further.
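Items 5 and 6 can be made concrete with a small sketch. This is illustrative only and assumes the partition-0-as-logical-name proposal is accepted; partition_names and hard_reserved_workers are hypothetical helpers, not part of Awa.

```python
def partition_names(logical, width, partition_zero_is_logical=True):
    """Physical queue names for a logical queue of the given width."""
    if width < 1:
        raise ValueError("width must be at least 1")
    # Uniform naming as shipped today: logical__p0 .. logical__p{N-1}.
    names = [f"{logical}__p{i}" for i in range(width)]
    if partition_zero_is_logical:
        # Proposed in item 6: partition 0 reuses the bare logical name,
        # so stray enqueues to that name are still consumed.
        names[0] = logical
    return names


def hard_reserved_workers(width, max_workers_per_partition):
    # Item 5: the config applies to each partition, so capacity multiplies.
    return width * max_workers_per_partition


if __name__ == "__main__":
    print(partition_names("customer-updates", 4))
    print(hard_reserved_workers(4, 16))
```

With this naming, width = 1 yields exactly the plain queue name, which is what makes growing 1 → N a drain-in-place operation.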

Sketch of the Rust surface (final names per open questions):

let queue = PartitionedQueue::new("customer-updates", 4)?;

Client::builder(pool)
    .partitioned_queue(&queue, QueueConfig { max_workers: 16, ..QueueConfig::default() });
// capacity is per partition: 4 × 16 hard-reserved permits

let opts = queue.route_opts_by_key(InsertOpts::default(), customer_id);
client.insert_with(&pool, &args, opts).await?;

// Bulk: stamp routing per job, then the existing multi-queue COPY path.
let params: Vec<InsertParams> = orders
    .into_iter()
    .map(|o| {
        // Build the routing opts before `into_params` consumes `o`.
        let opts = queue.route_opts_by_key(InsertOpts::default(), o.customer_id());
        o.into_params(opts)
    })
    .collect();
store.enqueue_params_copy(&pool, &params).await?;

Sketch of the Python surface:

queue = awa.PartitionedQueue("customer-updates", 4)

await client.start(*queue.queue_configs(max_workers_per_partition=16))

await client.insert(UpdateCustomer(customer_id="customer-42"), **queue.route_by_key("customer-42"))

# Bulk: per-job routing through the extended batch API — one call, mixed partitions.
await client.enqueue_many_copy(jobs, opts=[queue.route_by_key(j.customer_id) for j in jobs])
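To show what "per-job opts" means in practice, here is a pure-Python stand-in that runs without Awa installed. It is a sketch under stated assumptions: PartitionedQueueSketch and per_job_opts are hypothetical, the hash is BLAKE2b with a prefix byte rather than the real routing function, the naming is the current uniform __pN default, and the shape of the returned dict (queue plus ordering_key as bytes) is a guess at what route_by_key would produce.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class PartitionedQueueSketch:
    """Pure-Python stand-in for the proposed type; not the awa API."""

    logical: str
    width: int

    def route_by_key(self, key: str) -> dict:
        # Assumed routing: stand-in hash of a domain byte plus the key, modulo width.
        digest = hashlib.blake2b(b"\x01" + key.encode(), digest_size=8).digest()
        index = int.from_bytes(digest, "big") % self.width
        # Routing sets both the physical queue and the ordering key.
        return {"queue": f"{self.logical}__p{index}", "ordering_key": key.encode()}


def per_job_opts(queue, customer_ids):
    """One opts dict per job, so a single batch can span several partitions."""
    return [queue.route_by_key(cid) for cid in customer_ids]


if __name__ == "__main__":
    q = PartitionedQueueSketch("customer-updates", 4)
    for opts in per_job_opts(q, ["customer-1", "customer-2", "customer-1"]):
        print(opts)
```

The same key always yields the same opts, so repeated jobs for one customer land in one partition while the batch as a whole mixes partitions.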

Defaults

  • Width defaults to 1 (identical to a plain queue).
  • Worker claimers stay at 1 per partition; completion shards stay conservative.
  • For a measured hot logical queue, try width 2 before 4; benchmark before recommending higher.
  • Use enqueue_shards when producer head-row contention inside one partition is the bottleneck and partitioned FIFO there is acceptable; use extra claimers only when one runtime cannot keep its permits full.

Guarantees

Unchanged job safety. Each partition is an ordinary Awa queue with the full ADR-019/023/026 storage guarantees; transactional enqueue, at-least-once delivery, and (job_id, run_lease) guarded finalization are untouched. Partitioning changes ordering scope only, and the contract is explicit: no cross-partition FIFO at width > 1, per-key FIFO within the key's partition, and per-key FIFO is not preserved across a width change.
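The width-change caveat can be quantified with a short sketch. It assumes modulo reduction over a stand-in hash (BLAKE2b with a hypothetical prefix byte), not Awa's actual routing function; partition_for and rerouted_fraction are illustrative names.

```python
import hashlib


def partition_for(key: bytes, width: int) -> int:
    # Stand-in partition hash (assumption), reduced modulo the width.
    digest = hashlib.blake2b(b"\x01" + key, digest_size=8).digest()
    return int.from_bytes(digest, "big") % width


def rerouted_fraction(old_width: int, new_width: int, n_keys: int = 10_000) -> float:
    """Fraction of sample keys whose partition changes between two widths."""
    moved = 0
    for i in range(n_keys):
        key = f"customer-{i}".encode()
        if partition_for(key, old_width) != partition_for(key, new_width):
            moved += 1
    return moved / n_keys


if __name__ == "__main__":
    print(f"2 -> 4 re-routes {rerouted_fraction(2, 4):.1%} of keys")
```

With modulo routing, growing 2 → 4 leaves a key in partition i or moves it to i + 2, so roughly half of the keys change partition. Those are the keys whose queued jobs can be overtaken by newer ones during the transition.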

Relationship to Existing ADRs

  • ADR-019 (queue storage): partitions compose as independent ordinary queues; append-only discipline, tombstone ledger, receipt path, and prune safety are unchanged. The correctness argument is composition over independent queues — no TLA+ change needed while the implementation stays client-side.
  • ADR-023 (receipt rings): receipt safety is per partition and composes across the set.
  • ADR-025 (sharded enqueue heads): different level — partitions split a logical workload across queues; enqueue shards split head rows within one queue. Item 2 above is required for the two levels to compose for keyed traffic.
  • ADR-026 (narrow terminal history): terminal counts stay per partition; logical aggregation is a Phase 2 read-side concern.
  • ADR-008 (COPY ingestion): already multi-queue per row; this ADR only extends the Python bindings to expose that.
  • ADR-011 (weighted concurrency): see Decision item 5; build-time MinWorkersExceedGlobal validation already applies across partitions.
  • ADR-022 (descriptor catalog): if partition metadata is ever persisted (Phase 3), it belongs in descriptor-style metadata off the dispatch hot path.
  • ADR-030 (durable batch operations): a partitioned queue as a source is well-defined (union of partitions; default to whole-set with an explicit partition filter for repair). As a move_queue destination it is not — preserving key routing would require per-job re-hashing inside the operation. Destinations stay physical-queue-only for now; documented.

Consequences

Positive:

  • An ergonomic, documented path to the strongest demonstrated throughput lever, with one conceptual model across Rust and Python.
  • The keyed-routing skew is fixed before any routing contract exists, rather than becoming a re-partitioning event for production users later.
  • queue_stripe_count stops competing for the "public fanout answer" role.
  • Benchmark profiles gain an explicit width dimension.

Negative:

  • Three partitioning concepts still exist (partitions, enqueue shards, internal stripes); docs must keep them distinct.
  • Partition-0-as-logical-name makes partition naming asymmetric in dashboards.
  • Logical aggregation in admin/UI without runtime context still waits for persisted metadata.

Alternatives Considered

  1. Document manual N-queue fanout only. Rejected: this is the current shape, and it means too much boilerplate and inconsistent setup.
  2. Promote queue_stripe_count as the public feature. Rejected: global storage config, internal queue#N names, producer/worker coupling risks.
  3. Only recommend enqueue_shards. Rejected: that only solves head contention inside one queue, whereas partitions scale claim/completion/runtime work end-to-end.
  4. Add enqueue_many_grouped_copy(group, jobs, key=...). Rejected: the core COPY paths already accept per-row queue and ordering key and group internally; a group-specific method would duplicate that and be less general than per-job opts in the existing batch APIs.
  5. Keep QueueFanout as a compatibility alias for the new name. Rejected: two public names for one concept, and nothing released to justify the second one.
  6. Persist partition metadata first. Deferred: a no-schema API layer proves the concept; the trigger for persistence is DB-side aggregation/UI discovery, not routing.

Implementation Plan

Phase 1 — the actual ADR scope (API + docs, no storage migration):

  • Rename to the chosen public name across Rust and Python; no alias. Decide partition-0-as-logical-name at the same time, since both change default naming.
  • Domain-separate the partition-selection hash; document the portable spec next to shard_for_ordering_key's.
  • Extend Python insert_many_copy / enqueue_many_copy to per-job queue / ordering_key.
  • Consolidate the existing fanout mentions (architecture.md, configuration.md, getting-started-python.md, benchmarking.md) into one narrative that introduces logical queues, partitions, ordering keys, enqueue shards, and claimers in that order; include the width-change runbook and the capacity-multiplication semantics.

Phase 2 — observability and admin polish:

  • Logical-queue labels on snapshots/metrics, derived at registration time (no name parsing), partition drill-down retained.
  • Document for DLQ, retry, cancel, move, and batch operations whether they target the whole partition set or one partition; batch-op destinations stay physical.
  • UI grouping if/where the UI exposes queue topology.
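Deriving labels from registration rather than from names could look like the sketch below. LogicalLabelRegistry is a hypothetical illustration of the idea, not an Awa type; the real runtime would populate the mapping when queue_fanout / queue_configs is called.

```python
class LogicalLabelRegistry:
    """Maps physical queue names to logical labels recorded at registration."""

    def __init__(self):
        self._logical_by_physical = {}

    def register(self, logical, physical_queues):
        # Called once per partitioned queue at worker registration time.
        for name in physical_queues:
            self._logical_by_physical[name] = logical

    def label_for(self, physical):
        # Unregistered queues label as themselves; the __pN suffix is never parsed.
        return self._logical_by_physical.get(physical, physical)


if __name__ == "__main__":
    registry = LogicalLabelRegistry()
    registry.register("customer-updates", ["customer-updates__p0", "customer-updates__p1"])
    print(registry.label_for("customer-updates__p1"))  # registered partition
    print(registry.label_for("foo__p0"))  # plain queue that merely looks like one
```

A queue literally named foo__p0 keeps its own name as its label because nothing registered it as a partition.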

Phase 3 — optional persisted metadata:

  • queue_partitions-style descriptor metadata only if DB-side discovery/aggregation becomes important; off the dispatch hot path (ADR-022 style).
  • Revisit producer/worker width-disagreement guardrails once metadata exists (e.g. warn on unconsumed partitions).
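A guardrail of this kind reduces to a set difference once depths and registrations are known. The sketch below is hypothetical: unconsumed_queues_with_depth is an illustrative name, and how queue depths are obtained is left as an assumption (passed in as a plain dict here).

```python
def unconsumed_queues_with_depth(depth_by_queue, consumed):
    """Queues that hold jobs but have no registered worker, sorted by name."""
    return sorted(
        name
        for name, depth in depth_by_queue.items()
        if depth > 0 and name not in consumed
    )


if __name__ == "__main__":
    depths = {
        "customer-updates": 3,  # stray enqueue to the bare logical name
        "customer-updates__p0": 10,
        "customer-updates__p1": 0,
        "customer-updates__p2": 5,  # producer runs width 3, workers run width 2
    }
    workers = {"customer-updates__p0", "customer-updates__p1"}
    for name in unconsumed_queues_with_depth(depths, workers):
        print(f"warning: queue {name!r} has jobs but no worker")
```

The same check covers both the orphaned logical name at width > 1 and a producer/worker width disagreement.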

Validation Plan

  • Rust unit tests: validation, default names, explicit physical queues, key/index routing, std-trait ergonomics (largely exist for QueueFanout; carried through the rename).
  • Hash distribution test: at width == enqueue_shards (e.g. 4×4), keyed traffic within one partition spreads across all enqueue shards after the domain-separation fix.
  • Cross-language parity test that Python (via the PyO3 wrapper) and documented-spec implementations route identically.
  • Integration: worker registration claims from every partition; same ordering key always lands on one partition and preserves FIFO within it.
  • Python COPY test: one mixed-partition batch with per-job opts routes correctly and stays transactional.
  • Width-change integration test: grow 2 → 4 following the runbook; assert no stranded jobs and document the per-key FIFO break across the transition.
  • Benchmarks: width 1/2/4/8 profiles (queue depth, WAL/job, dead tuples, p50/p95/p99 pickup and e2e latency), plus a width × enqueue_shards cell (at least 4×4) with keyed routing — the configuration the current hash silently degrades.
  • TLA+: no update needed while partitions remain explicit client-side routing over independent ADR-019 queues; revisit only if persisted metadata or routing ever touches storage invariants.

Open Questions

  • Exact name: PartitionedQueue is the working choice; whatever wins, exactly one name survives. Constructor parameter: partitions reads better than positional width.
  • Partition 0 as the logical name: accept the dashboard asymmetry to remove the orphaned-queue footgun and smooth 1 → N migration?
  • Weighted mode: keep per-partition weight (status quo, logical queue gets width × share) or add a group-level weight the builder divides across partitions?
  • Domain-separation mechanism for the partition hash: distinct seed vs domain-prefix byte — pick whichever is simplest to specify portably for non-Rust producers.


    Labels

    docs (Documentation improvements), feature (New functionality), research (Something to look into and consider)
