v1.4.2: middleware ergonomics — sse Broker/Replay/SlowClient + ws Hub#262

Open
FumingPower3925 wants to merge 31 commits into main from milestone/v1.4.2

Conversation


FumingPower3925 (Contributor) commented May 4, 2026

Closes #249, #250, #251, #253 (the v1.4.2 milestone). All four open issues land in a single squash-merge PR matching the v1.4.0 / v1.4.1 release shape.

Summary

  • middleware/sse slow-client policy (#251). Opt-in MaxQueueDepth plus OnSlowClient (ActionDropEvent / ActionDisconnectClient / ActionBlock). MaxQueueDepth = 0 preserves the historical blocking semantics. New Client.DroppedEvents() and Client.QueueDepth() for metrics.
  • middleware/sse Broker + PreparedEvent (#249). PreparedEvent caches the wire bytes once; Broker Subscribe/Publish/Close handles per-subscriber bounded queues, drain goroutines, and an OnSlowSubscriber (Drop / Disconnect) policy.
  • middleware/sse ReplayStore (#250). Interface + NewRingBuffer (in-memory, ≤ 1 alloc/op Append) + NewKVReplayStore (durable via middleware/store.KV). On connect with Last-Event-ID the middleware replays missed events before invoking Handler; the ID is rewritten to the canonical store-assigned value on every Send. ErrLastIDUnknown falls through to Handler so the user can react.
  • middleware/websocket Hub (#253). Symmetric to sse.Broker: Register/Broadcast/BroadcastPrepared/BroadcastFilter/Len/Close, OnSlowConn policy (Drop/Remove/Close). Read-locked snapshot scan; per-Conn writes happen outside the lock; removals are deferred until after dispatch so the snapshot is not mutated mid-loop.
  • Bonus fix — celeris's std engine was buffering SSE response headers until the first body byte; broker patterns (Subscribe then wait for Publish) deadlocked client Do() indefinitely. The New handler now flushes after WriteHeader unconditionally so EventSource clients see the response immediately.

Test plan — every gate run, every gate green

  • GitHub CI on every pushed commit — all 7 checks (Build × 2, Conformance, Driver Conformance, Lint, Unit, Vulnerability Check) green.
  • go test -race -count=2 on middleware/sse/... and middleware/websocket/....
  • Every middleware sub-package's tests on darwin.
  • Strict-alloc assertions on the SSE Broker fan-out path, Hub broadcast path, and ring-buffer Append (gated to runs without -race, because the race detector instruments chan/mutex ops).
  • godoc examples for Broker, NewRingBuffer, Hub.
  • mage BenchcmpSSE — celeris vs tmaxmax/go-sse fan-out (~10× faster per-publish).
  • mage BenchcmpWS — celeris vs gorilla/websocket + the new Hub-vs-hand-rolled-gorilla-Hub bench.
  • mage Spec — h1spec + h2spec.
  • mage TestAutobahn — WebSocket protocol-compliance suite via Docker.
  • mage TestSoak — 5-min slow-consumer soak (2.5M msgs, 0 dropped).
  • mage ClusterDeploy — cross-arch binaries staged on msa2-server / msa2-client / msr1.
  • mage ClusterDistributedBenchParallel — msa2-client fires concurrently into msa2-server and msr1 over the 20G LACP fabric, failed=0.
  • mage ClusterGoGate (new) → FullCompliance on msa2-client — PASSED.
  • mage ClusterGoGate (new) → MatrixBenchStrict on msa2-server (6h08, amd64) + msr1 (6h05, aarch64) in parallel — PASSED on both. -race -checkptr, 2532 cells × 3 runs each, no races / panics / fail-fast. Driver-* and session-* cells excluded via the new services=none matrix mode (no Docker on the cluster).

Skipped by design: mage MatrixBench and mage MatrixBenchSince — v1.4.2 changes are confined to additive ergonomics in middleware/sse and middleware/websocket, so a request-hot-path regression there would not be informative.

Files

| Path | Action | Purpose |
| --- | --- | --- |
| middleware/sse/config.go | modify | MaxQueueDepth, OnSlowClient, SlowClientAction, ReplayStore |
| middleware/sse/sse.go | modify | dual-mode Send, replay-on-connect, drain goroutine, header flush |
| middleware/sse/prepared.go | new | PreparedEvent + WritePreparedEvent |
| middleware/sse/broker.go | new | Broker + BrokerConfig + BrokerPolicy |
| middleware/sse/replay.go | new | ReplayStore interface |
| middleware/sse/replay_ring.go | new | in-memory ring buffer |
| middleware/sse/replay_kv.go | new | store.KV-backed replay store |
| middleware/sse/{broker,replay,slowclient}_test.go | new | exit-criteria tests |
| middleware/sse/example_test.go | modify | ExampleBroker, ExampleNewRingBuffer |
| middleware/websocket/hub.go | new | Hub + HubConfig + HubPolicy |
| middleware/websocket/hub_test.go | new | exit-criteria tests |
| middleware/websocket/example_test.go | modify | ExampleHub (chat-room) |
| middleware/{sse,websocket}/race_{on,off}_test.go | new | raceEnabled build-tag pair so strict-alloc tests skip under -race |
| test/benchcmp_sse/ | new | module comparing celeris vs tmaxmax/go-sse |
| test/benchcmp_ws/bench_test.go | modify | append BenchmarkHubBroadcast100_* |
| magefile.go | modify | BenchcmpSSE, BenchcmpWS targets |
| mage_cluster.go | modify | ClusterDistributedBenchParallel, ClusterGoGate (multi-host fan-out) |
| mage_matrixbench.go | modify | MatrixBenchStrict honours PERFMATRIX_SERVICES=none + auto-excludes driver / session cells |
| ansible/cluster-go-gate.yml | new | stage Go + source on cluster nodes, run mage targets, snapshot-based pristine leak check, chmod-cleanup |

Notes

  • No protocol-bytes changes; spec compliance suites stay neutral.
  • No request-hot-path changes; perfmatrix gates skipped because v1.4.2 cannot regress what it does not touch.
  • feedback_no_docs_commits.md honoured: no bench logs, reports, or assessment artifacts in the diff.
  • feedback_cluster_pristine.md honoured: every cluster-side install (Go, mage, source) goes through ansible with manifest tracking and snapshot-verified cleanup.

…lient

Add an opt-in bounded outbound queue per Client so handlers no longer
stall on flushes to flaky subscribers. MaxQueueDepth=0 preserves the
historical blocking semantics; positive values switch Send into an
enqueue-and-drain mode driven by a dedicated drain goroutine that
shares c.mu with the heartbeat writer.

OnSlowClient runs when the queue is full at Send time and selects one
of three actions:
  - ActionDropEvent: increment Client.DroppedEvents, return nil
  - ActionDisconnectClient: cancel ctx, return ErrClientClosed
  - ActionBlock: legacy backpressure inside the queued path

Client.DroppedEvents and Client.QueueDepth give middleware/metrics a
hook for Prometheus dashboards. The drain goroutine exits on graceful
queue close (defer flushes pending events) or unhealthy close (closed
flag set by Client.Close).

Closes #251.
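
For orientation, a minimal sketch of the opt-in policy described above, assuming the names this commit gives (MaxQueueDepth, OnSlowClient, the three Action* values, Client.DroppedEvents/QueueDepth). The OnSlowClient signature, the Event shape, and the import path are assumptions, not the authoritative API:

```go
package main

import (
	"log"

	sse "example.com/celeris/middleware/sse" // illustrative import path
)

func main() {
	handler := sse.New(sse.Config{
		// 0 keeps the historical blocking Send; any positive value opts in to
		// the bounded queue + drain-goroutine mode.
		MaxQueueDepth: 64,
		// Runs when the queue is full at Send time. Signature assumed.
		OnSlowClient: func(c *sse.Client) sse.SlowClientAction {
			log.Printf("slow client: queue=%d dropped=%d", c.QueueDepth(), c.DroppedEvents())
			return sse.ActionDropEvent // or ActionDisconnectClient / ActionBlock
		},
		Handler: func(c *sse.Client) {
			_ = c.Send(sse.Event{Data: []byte("hello")}) // Event shape assumed
		},
	})
	_ = handler // mount on a route as usual
}
```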
PreparedEvent caches the SSE wire bytes of an Event so the same payload
can be broadcast to N subscribers with one FormatEvent call. The new
WritePreparedEvent on Client writes the cached bytes directly to the
stream under c.mu.

Broker is the connection-set abstraction that pairs with PreparedEvent:
Subscribe returns a defer-able unsubscribe func; Publish formats once
and non-blocking-sends to every subscriber's bounded queue; per-
subscriber drain goroutines write to each Client through
WritePreparedEvent. OnSlowSubscriber selects Drop or Disconnect when a
queue overflows. Close is idempotent and joins every drain goroutine.

Strict-alloc test pins the per-subscriber path: allocs/Publish stay
constant as subscriber count grows from 50 to 500. ExampleBroker shows
a ticker publisher with Drop policy.

Closes #249.
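
A compact sketch of the ticker-publisher pattern ExampleBroker illustrates. Subscribe/Publish/Close, BrokerConfig, OnSlowSubscriber, and the BrokerPolicy values are taken from the PR text; the NewBroker constructor, SubscriberBuffer knob, Client context accessor, and import path are assumed:

```go
package main

import (
	"time"

	sse "example.com/celeris/middleware/sse" // illustrative import path
)

func main() {
	broker := sse.NewBroker(sse.BrokerConfig{ // constructor name assumed
		SubscriberBuffer: 16, // per-subscriber bounded queue
		OnSlowSubscriber: func(c *sse.Client) sse.BrokerPolicy {
			return sse.BrokerPolicyDrop // or BrokerPolicyDisconnect
		},
	})
	defer broker.Close() // idempotent; joins every drain goroutine

	handler := sse.New(sse.Config{
		Handler: func(c *sse.Client) {
			unsubscribe := broker.Subscribe(c) // defer-able unsubscribe func
			defer unsubscribe()
			<-c.Context().Done() // accessor assumed; the broker's drain does the writing
		},
	})

	// Publish formats once (PreparedEvent) and fans out to every subscriber queue.
	go func() {
		for t := range time.Tick(time.Second) {
			_ = broker.Publish(sse.Event{Data: []byte(t.Format(time.RFC3339))})
		}
	}()
	_ = handler // mount on a route as usual
}
```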
ReplayStore is the new interface (Append, Since) the middleware uses to
serve missed events on reconnect. Two first-party impls ship:

  - NewRingBuffer(size): in-memory, lock-sharded, O(1) append, O(retained)
    Since. Strict-alloc gate at ≤ 1 alloc per Append.
  - NewKVReplayStore(kv, prefix, ttl): backed by middleware/store.KV
    (memory, redis, postgres, memcached). JSON event blobs at
    `<prefix>events/<id>`; per-instance ID counter (multi-instance
    callers should arrange a shared counter via SetNXer).

Wired into sse.Config.ReplayStore. When set, the New handler:
  - on connect with a Last-Event-ID, calls Since(lastID) and writes the
    missed events to the wire BEFORE invoking Handler;
  - inside sendBlocking (and the queued-mode drain) Appends to the
    store and overwrites the wire id: with the canonical store id;
  - on ErrLastIDUnknown, still invokes Handler — Client.LastEventID()
    preserves the original header so the user can react.

Strict-alloc & race tests cover the ring path; round-trip test pins
the KV path on NewMemoryKV. Closes #250.
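
A sketch of the wiring, using only the names the PR gives (NewRingBuffer, Config.ReplayStore, Client.LastEventID, Send); ring size, handler body, and import path are illustrative:

```go
package main

import sse "example.com/celeris/middleware/sse" // illustrative import path

func main() {
	store := sse.NewRingBuffer(1024) // keep the last 1024 events in memory

	handler := sse.New(sse.Config{
		ReplayStore: store, // missed events replayed before Handler runs
		Handler: func(c *sse.Client) {
			// On ErrLastIDUnknown the middleware still invokes Handler and the
			// original Last-Event-ID header value is preserved here.
			if id := c.LastEventID(); id != "" {
				// e.g. fall back to sending a full snapshot instead of deltas
			}
			_ = c.Send(sse.Event{Data: []byte("live")}) // wire id: rewritten to the store id
		},
	})
	_ = handler // mount on a route as usual
}
```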
…Broker)

Hub is the connection-set abstraction every WebSocket deployment ends
up hand-rolling: Register/unregister, Broadcast/BroadcastPrepared/
BroadcastFilter, Len, Close. Uses PreparedMessage so the wire frame is
built once per Broadcast regardless of conn count.

OnSlowConn fires whenever a per-Conn write fails (the typical sources
are ErrWriteClosed, ErrWriteTimeout, and wrapped I/O errors). HubPolicy
selects between:
  - HubPolicyDrop: skip this delivery, keep the Conn registered
  - HubPolicyRemove: unregister without closing
  - HubPolicyClose: unregister and close (default)

The internal RWMutex keeps the broadcast scan on the read lock so a
parallel Register does not serialise. Per-Conn writes happen outside
the lock; removals/closes triggered by OnSlowConn are deferred until
after the dispatch loop completes so the snapshot is not mutated mid-
iteration.

Strict-alloc test pins the Hub's per-Conn cost: per-Conn alloc delta
between N=8 and N=64 stays at the intrinsic Conn.WritePreparedMessage
cost (≤ 2.5/conn). Race test runs 32 workers × 50 iterations of
Register/Broadcast/Unregister cycles. ExampleHub replaces the gorilla
chat boilerplate with ~10 lines.

Closes #253.
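
A sketch of the chat-room shape the Hub replaces, using the names from this commit (Register, Broadcast, OnSlowConn, HubPolicyClose). The constructor, the read-loop method, the Register/unregister pairing, and the handler wiring are assumptions for illustration only:

```go
package main

import ws "example.com/celeris/middleware/websocket" // illustrative import path

func main() {
	hub := ws.NewHub(ws.HubConfig{ // constructor name assumed
		OnSlowConn: func(c *ws.Conn, err error) ws.HubPolicy {
			return ws.HubPolicyClose // default: unregister and close
		},
	})
	defer hub.Close()

	handler := ws.New(ws.Config{ // handler wiring assumed
		Handler: func(c *ws.Conn) {
			unregister := hub.Register(c) // Register/unregister pairing assumed
			defer unregister()
			for {
				msg, err := c.ReadMessage() // read-loop method assumed
				if err != nil {
					return
				}
				_, _ = hub.Broadcast(msg) // frame prepared once, fanned out to all conns
			}
		},
	})
	_ = handler // mount on a route as usual
}
```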
- test/benchcmp_sse: new module comparing celeris sse.Broker against
  tmaxmax/go-sse for the publish-to-N-subscribers fan-out path. Uses
  net/http transport on both sides for an apples-to-apples result.
- test/benchcmp_ws: append BenchmarkHubBroadcast100 for both celeris
  Hub and a hand-rolled gorilla/websocket hub (the pattern the Hub is
  designed to replace).
- magefile: BenchcmpSSE / BenchcmpWS mage targets wrap the bench runs
  with BENCHCMP_COUNT / BENCHCMP_BENCHTIME env knobs.
- mage_cluster: ClusterDistributedBenchParallel runs the existing
  ClusterDistributedBench against msa2-server and msr1 concurrently,
  matching the v1.4.2 release-gate plan (msa2-client → both targets in
  parallel over the 20G LACP fabric).
Race fix: the BrokerPolicyDisconnect path removed the subscriber from
the broker map and called c.Close, but did NOT join the per-subscriber
drain goroutine. The drain could still be reading c.closed inside
WritePreparedEvent when the SSE handler defer returned the Client to
sync.Pool — racing with the next handler's acquireClient writing
c.closed = false. Wait on state.done after Close so the drain has
fully exited before the publisher continues.

Lint cleanup:
- gofmt the ringStore struct alignment
- rename `cap` shadow → `ringSize` / `queueSize` and `min` → `floor`
- avoid SA4000 in TestPreparedEventFormatOnce by capturing both
  Bytes() pointers into named locals before comparing
- drop the unused gatedStreamer.chunkCount helper
- defer-wrap the slow-conn Close calls in hub_test for errcheck
…gate

Discovered while running BenchcmpSSE on darwin: every SSE handler that
follows the broker pattern (Subscribe-and-wait) blocks the HTTP client's
Do() call indefinitely, because celeris's std engine buffers response
headers until the first body byte and the broker pattern never sends
one without an external publisher.

Fix in middleware/sse/sse.go: after WriteHeader, always Flush. The
piggyback on the retry-line write disappears — Flush runs unconditionally
so EventSource clients see the response headers immediately. The
gatedStreamer test helper is updated to gate Write only (Flush passes
through), matching real-world behavior.

ansible/cluster-go-gate.yml + mage_cluster.go ClusterGoGate: stage Go +
celeris source under /tmp/celeris-bench/ on a chosen cluster node, run
mage targets there, fetch the log, tear everything down. The playbook's
always-block fails the run if anything leaks into ~ on the node — the
pristine-cluster invariant is enforced. Used for FullCompliance and
MatrixBenchStrict, which the dev Mac cannot run (linux-only / port
exhaustion).

test/benchcmp_sse/bench_test.go: parallelise attachSubscribers and add a
warmup publisher for the gosse side (tmaxmax/go-sse uses Joe.Subscribe
which blocks before any wire write — same root issue, but external to
celeris so we work around it in the bench harness instead).
Two fixes after MatrixBenchStrict failed on msr1:

1. Architecture mismatch — msr1 is aarch64 but the playbook always
   pushed the linux-amd64 Go tarball, leading to "cannot execute
   binary file: Exec format error". The mage target now downloads
   both amd64 and arm64 tarballs; the playbook picks the right one
   from ansible_architecture.

2. False-positive leak check — the playbook failed on pre-existing
   user state (~/go and ~/.cache/go-build from prior dev work on
   msr1) as if it were a leak from our run. Now the playbook
   snapshots those paths BEFORE the run, and only fails if NEW
   files appear post-run. Pre-existing user state is left alone.
MatrixBenchStrict hard-coded -services local, which only works when
Docker is installed locally (msa2-client / msr1 do not have Docker).
Now reads PERFMATRIX_SERVICES (default "local"); when set to "none"
it also auto-excludes /driver-* cells from the matrix so the bench
does not deadlock trying to reach postgres / redis / memcached.

cluster-go-gate.yml cleanup uses shell `chmod -R u+w + rm -rf` instead
of the ansible file:state=absent path, because Go's module cache
marks files 0444 and rmtree() fails with EPERM on those.
…P_* env

The cluster-go-gate playbook already reads gate_env (a JSON dict) and
exports each k=v before running mage. ClusterGoGate did not populate
gate_env, so locally-set knobs like PERFMATRIX_SERVICES=none never
reached the remote shell.

Now any env var with a known prefix (PERFMATRIX_, FUZZ_, SOAK_,
BENCHCMP_) set in the parent shell is forwarded into gate_env and
exported before mage runs on the remote host.
ansible --extra-vars values are always strings, so the previous
JSON-dict-then-Jinja-loop approach tripped 'object of type str has no
attribute items'. The mage target now renders the forwarded env vars
directly into a literal block of `export k=v` lines (with proper
shell-quoting), and the playbook embeds that string into its shell
command verbatim — no Jinja loop, no dict.
CLI --extra-vars k=v is unreliable for multi-line values; the newlines
in the rendered `export k=v` block were being stripped (or the value
truncated at first newline) so PERFMATRIX_SERVICES=none never reached
the remote shell — MatrixBenchStrict kept trying to start docker.

Now we write the multi-line vars to a YAML file under the staging
dir and pass it via --extra-vars @<path>. Single-line vars stay on
the CLI for clarity. The temp dir is os.RemoveAll'd at end of run.
The race detector instruments chan sends and mutex ops with extra
allocations, so AllocsPerRun on broker/hub/queue paths inflates
linearly with subscriber count under -race. Local CI on darwin runs
without -race and the tests stayed within budget; the GitHub Linux
CI runner runs with -race and TestBrokerFanOutFormatsOnce flagged
500-subscriber Publish at 620 allocs (vs 3 for the 50-subscriber
case) — a true race-detector artefact, not an alloc regression.

Add race_on_test.go / race_off_test.go per package (the canonical
build-tag pair) exposing a `raceEnabled` const, and have each strict-
alloc test (TestBrokerFanOutFormatsOnce, TestRingBufferAppendAllocs,
TestSlowClientSendNonFullQueueAllocs, TestHubBroadcastFormatsOnce)
skip when raceEnabled. The tests still run on the no-race path.
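
The canonical build-tag pair is tiny; shown here for the sse package (the websocket pair is identical apart from the package clause):

```go
// race_on_test.go
//go:build race

package sse

// raceEnabled lets strict-alloc tests skip when the race detector is active.
const raceEnabled = true
```

```go
// race_off_test.go
//go:build !race

package sse

const raceEnabled = false
```

Each strict-alloc test then guards itself:

```go
func TestRingBufferAppendAllocs(t *testing.T) {
	if raceEnabled {
		t.Skip("alloc budgets are inflated by race-detector instrumentation")
	}
	// strict AllocsPerRun assertion runs only on the no-race path
}
```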
By default the strict matrix gate has to run on every bench-target
arch (msa2-server amd64 + msr1 aarch64) — running them sequentially
doubles wall time. ClusterGoGate now reads CLUSTER_GO_HOSTS as a
comma-separated list (with msa2-server,msr1 as the default), spawns a
per-host ansible-playbook subprocess concurrently, and reports each
host's pass/fail with its own results dir.

CLUSTER_GO_HOST stays as a legacy single-host alias for explicit
single-target runs (FullCompliance on msa2-client). The shared
staging dir holds the Go tarballs and source tarball — no per-host
re-download.
Cell IDs are <scenario>/<server>, so excluding driver-backed cells
needs the pattern "driver-*/*" (anchored on scenario), not "*/driver-*",
which would match a server named driver-something. Also exclude
session-* scenarios since they too rely on the redis/postgres
backends that services=none does not provision.

Without this, the strict matrix on a no-Docker cluster node fails
fast at cell 429 (driver-mc-get) with "zero-request cell".
P0 — bugs:
* sse.go: replay-store Append moves INSIDE c.mu so the store's
  monotonic ID order matches the on-wire order under concurrent Send.
* sse.go: drain Append failure no longer cancels the connection;
  proceeds with the user-supplied e.ID so a transient store hiccup
  doesn't kill an otherwise healthy stream.
* replay_kv.go: bound the in-memory ID index (MaxKVReplayIndex with
  configurable override via NewKVReplayStoreWithConfig), drop oldest
  25% on overflow — fixes the unbounded-growth memory leak.
* hub.go: BroadcastFilter doc no longer claims pred runs under the
  read lock (it doesn't; snapshot is locked, pred is lock-free).
* hub_test.go: tightened TestHubBroadcastFormatsOnce to bound BOTH the
  hub-overhead intercept AND the per-conn slope (the prior single-slope
  budget could hide a constant per-broadcast regression).

P1 — robustness/API:
* sse.go: queueClosed atomic guards sendQueued so a Send racing the
  handler-defer close returns ErrClientClosed instead of panicking on
  send-to-closed-chan. queueClosed is reset in releaseClient so pool
  reuse doesn't leak terminal state.
* prepared.go: Bytes() returns a defensive copy; new Len() avoids the
  copy for byte-counting callers.
* replay_kv.go: NewKVReplayStore returns (ReplayStore, error) instead
  of panicking on nil store.KV — consistent with NewRingBuffer.
* config.go + sse.go: SlowClientAction renamed to ClientPolicy with
  ClientPolicy{Drop,Disconnect,Block} prefix — matches BrokerPolicy
  and websocket.HubPolicy.
* hub.go: Hub.Close waits in-flight broadcasts via WaitGroup, then
  closes conns. Doc clarified to reflect the ordering guarantee.
* sse.go: Send docstring spells out the queued-mode return contract
  explicitly (nil ⇒ enqueued OR dropped per OnSlowClient policy;
  check DroppedEvents to distinguish).

P2 — infrastructure/pristine:
* cluster-go-gate.yml: mtime-based leak check via run-start.marker
  catches every file in ~ touched during the run, not just a hand-
  curated list.
* cluster-go-gate.yml: pinned XDG_CACHE_HOME, XDG_CONFIG_HOME, and
  GOTMPDIR under bench_root so Go's secondary caches (telemetry,
  tmpdir builds) cannot escape.
* cluster-go-gate.yml: cleanup runs under become so root-owned files
  from sudo'd mage steps are removable.
* cluster-go-gate.yml: mage gate wrapped in `timeout` so async-deadline
  cuts kill the remote subtree; pkill in always-block sweeps any
  stragglers.
* mage_cluster.go: ClusterDeploy invokes ClusterCleanup first to
  drop stale staging from a SIGKILL'd prior run before re-staging.
* mage_cluster.go: results-dir timestamps now sub-second + pid-suffixed
  to avoid collisions when ClusterDistributedBenchParallel spawns
  twin invocations within the same wall-clock second.
* mage_cluster.go: env-forward prefix list widened with GOFLAGS,
  GOEXPERIMENT, GODEBUG, GOTRACEBACK, GORACE, GOMAXPROCS.
* mage_matrixbench.go: dropped !session-*/* from the services=none
  exclude list (matched no scenario; driver-*/* alone covers the
  backend-bound cells).

P3 — test gaps:
* TestPooledClientStateReset: pool reuse zeroes every queued/replay
  state field across handler invocations.
* TestBrokerUnsubscribeIdempotent: double-unsubscribe is a no-op.
* TestBrokerSubscribeRacingClose: Subscribe+Close stress.
* TestHubBroadcastFilterZeroMatch: pred matching nobody returns (0, nil).
* TestHubOneSlowManyFast: a slow conn does not gate the dispatch to
  the rest (issue #253 spec property pinned).
* TestHubCloseRacingBroadcast: Close ordering — in-flight broadcasts
  finish; subsequent broadcasts return (0, nil).

Docs:
* MatrixBenchStrict godoc lists PERFMATRIX_* knobs and the
  services=none auto-exclusion behaviour.
* WritePreparedEvent docstring explains its position relative to the
  Broker's per-subscriber queue layer.
FullCompliance rerun on msa2-client failed the post-run leak check
on three false-positive paths:

  /home/mini/.ansible/tmp        — ansible's own task tmpdir
  /home/mini/.ansible_async      — ansible async-task state
  /home/mini/.magefile/<hash>    — mage's compiled-magefile cache

The ansible state cannot be redirected because we run via
ansible-playbook; add explicit -not -path exclusions for those.

The mage cache CAN be redirected — set MAGEFILE_CACHE in the gate
env block so the compiled-magefile binary lands under bench_root.
A pre-create task ensures the dir exists (older mage versions error
when MAGEFILE_CACHE points at a non-existent path).
TestRun_StartHighScalesDownOnIdle, TestRun_HysteresisFloorPreservedAboveMinActive,
and TestRun_StartLowScalesUpOnLoad each used a 100-200ms ctx timeout
and asserted the scaler had reached the expected steady state in
that window. On slower CPUs (msa2-client, ARM Mac under load) the
time.Ticker can fire fewer than expected times in 200ms and the
test asserts on a transient mid-scale state — observed flake on
msa2-client during FullCompliance: "got 7 active" (scaler had
ticked once instead of the 6+ ticks needed to reach MinActive=2).

Replace each test with a runAsync helper that spawns Run in a
goroutine and a waitForActive helper that polls src.numActive()
until the expected value (or 2 s timeout). Fast on healthy CPUs
(exits as soon as steady state is reached, ~50 ms typical) and
robust on slow ones.
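
A sketch of those helpers; the function-valued parameters are hypothetical stand-ins for the real scaler and source under test, and only the polling pattern mirrors the commit:

```go
package scaler_test // hypothetical package name

import (
	"context"
	"testing"
	"time"
)

func runAsync(t *testing.T, run func(context.Context)) (stop func()) {
	t.Helper()
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() { defer close(done); run(ctx) }()
	return func() { cancel(); <-done }
}

func waitForActive(t *testing.T, numActive func() int, want int) {
	t.Helper()
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if numActive() == want {
			return // steady state reached; ~50 ms typical on a healthy CPU
		}
		time.Sleep(5 * time.Millisecond)
	}
	t.Fatalf("timed out: want %d active, got %d", want, numActive())
}
```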
…rrors

After ~2400 cells in a 4h strict-matrix run on msa2-server, the
adaptive engine's secondary (io_uring) hit:

  secondary (io_uring): worker 0: listen socket: bind: address already in use

The error told us the syscall and the errno; it didn't tell us:
  - which port was being bound
  - whether SO_REUSEADDR/SO_REUSEPORT had actually taken effect on
    the FD (set just before bind)
  - what other listeners were on that port at the moment of failure
  - whether we had run out of FDs

Add bindDiag(fd, sa) to engine/iouring + engine/epoll: captured
on bind() / listen() failure, embedded in the wrapped error so the
next port-recycling race leaves a one-line diagnostic record:

  bind: address already in use [addr=127.0.0.1:34321 SO_REUSEADDR=1
   SO_REUSEPORT=1 listeners_on_port=[tcp4 ... uid=1000 inode=...]
   our_open_fds=42]

Listeners come from /proc/net/tcp{,6} (state 0A = LISTEN). All
fields fall back to inline error strings rather than masking the
original bind failure.
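
A rough sketch of the kind of collection involved; the real bindDiag lives in engine/epoll and engine/iouring and embeds these fields in the wrapped error, so the parsing and formatting here are simplified assumptions:

```go
package engine // illustrative only

import (
	"fmt"
	"os"
	"strings"

	"golang.org/x/sys/unix"
)

// bindDiagSketch approximates the diagnostics described above.
func bindDiagSketch(fd, port int) string {
	reuseAddr, _ := unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR)
	reusePort, _ := unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT)

	fds, _ := os.ReadDir("/proc/self/fd") // our open FD count

	// Listeners on the same port: /proc/net/tcp{,6} rows in state 0A (LISTEN).
	var listeners []string
	hexPort := fmt.Sprintf(":%04X", port)
	for _, p := range []string{"/proc/net/tcp", "/proc/net/tcp6"} {
		data, err := os.ReadFile(p)
		if err != nil {
			listeners = append(listeners, "read "+p+": "+err.Error()) // fall back inline
			continue
		}
		for _, line := range strings.Split(string(data), "\n")[1:] {
			f := strings.Fields(line)
			if len(f) > 3 && f[3] == "0A" && strings.HasSuffix(f[1], hexPort) {
				listeners = append(listeners, f[1])
			}
		}
	}

	return fmt.Sprintf("SO_REUSEADDR=%d SO_REUSEPORT=%d listeners_on_port=%v our_open_fds=%d",
		reuseAddr, reusePort, listeners, len(fds))
}
```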
…c paths)

Concurrency:
* websocket/Hub: dispatch now spawns one goroutine per Conn (with
  optional MaxConcurrency cap) so a slow Conn cannot gate the rest.
  Hub.Close fans the per-conn Close calls out the same way. Issue
  #253 said "concurrent" — implementation now matches the contract.
* sse/Broker: OnSlowSubscriber + BrokerPolicyDisconnect cleanup
  detached to a goroutine so a slow user callback or socket close
  cannot stall the publisher.

Multi-instance correctness:
* store.Counter: new optional extension exposing atomic
  Increment(ctx, key, ttl). MemoryKV implements it. NewKVReplayStore
  now feature-detects Counter and uses it for cross-instance ID
  monotonicity; falls back to a per-process counter (with the same
  pre-existing semantics) when the KV does not implement Counter.
* KVReplayStoreConfig.AsyncAppend: when true, Append returns the
  local id immediately and the KV.Set fires in a goroutine — for
  publishers where wire-write latency dominates and replay can
  tolerate eventual consistency.

API + docs:
* sse/doc.go: explicit "Backpressure: two layers, two purposes"
  section — Client.MaxQueueDepth (per-Client queue) vs Broker
  (per-subscriber queue). Documents that the layers are orthogonal,
  Broker uses WritePreparedEvent which bypasses Client.queue, no
  scenario where they fight.

Tests:
* TestSSEHandlerFlushesHeadersImmediately: regression gate for the
  std-engine SSE header-flush bug — runs a real net/http client
  against a celeris.Std server and asserts headers arrive within 2s
  even when the handler does not Send. Without the WriteHeader+Flush
  fix this test would hang.
* TestHubFastConnLatencyBounded: dispatch concurrency property —
  asserts a slow conn (with a 200ms write deadline, never drained)
  does NOT inflate the wall time delivered to the fast cohort, AND
  the broadcast does still join on the slow conn (catches an
  accidental skip-slow regression).
* hub_test alloc gate retuned for the per-conn goroutine fan-out.
Concurrency / correctness:
- sse.Broker: PublishPrepared cleanup synchronous to prevent sync.Pool
  reuse race against detached BrokerPolicyClose goroutine.
- sse.Broker: brokerSubscriber.closeQueue uses sync.Once so unsubscribe,
  Close, and slow-disconnect can all race without close-of-closed panic.
- websocket.Hub: dispatch goroutines wrap WritePreparedMessage and the
  user OnSlowConn callback in defer-recover so a panic in one conn
  does not bring the entire fan-out down.
- websocket.Hub.Close: in-flight Broadcast snapshot is awaited via
  inflight.WaitGroup before per-conn Close fan-out; semaphore acquire
  moved INSIDE the per-conn goroutine so a hung Conn.Close cannot
  deadlock Hub.Close under low MaxConcurrency.
- sse.kvStore.AsyncAppend: bounded by AsyncAppendConcurrency semaphore
  (default 64); blocks the publisher at the cap rather than letting
  goroutines pile up against a stalled KV.

API alignment:
- Broker / Client / Hub policy enums share the {Drop, Remove, Close}
  triad with consistent semantics; ClientPolicyBlock kept for the
  per-client legacy backpressure case.
- store.KV: new Counter extension (Increment) feature-detected by
  NewKVReplayStore for cross-instance monotonic IDs; MemoryKV
  implements Counter; per-process fallback when KV does not.

Docs:
- Hub.Close documents the MaxConcurrency cap on the Close fan-out and
  the inside-goroutine sema-acquire deadlock guarantee.
- KVReplayStoreConfig.AsyncAppend documents visible-latency
  backpressure at the goroutine cap.
- MemoryKV.Increment documents the shared key namespace with Set.
- ExampleNewKVReplayStore added.

Tests:
- New tests cover MemoryKV.Increment, AsyncAppend backpressure,
  BrokerPolicyRemove, panic-recover in Hub dispatch, closeOnce race,
  Hub.Close in-flight wait. Sleep-based flakes hardened with poll-
  until-state. Alloc-strict tests gated off under -race.
The OnSlowSubscriber callback used to run synchronously in a serial
loop in the publisher's goroutine. With N slow subscribers and a
callback of latency D, total slow-path latency was N*D — a single
slow drainer would gate publish throughput across the whole cohort.

Now each slow subscriber's policy callback + Remove/Close cleanup
runs on its own goroutine, bounded by SlowSubscriberConcurrency
(default GOMAXPROCS*4, mirroring HubConfig.MaxConcurrency). Total
slow-path latency drops to ~max(D) + cleanup. The publisher still
WAITS on every spawned goroutine via wg.Wait before returning —
that join is load-bearing: detaching cleanup would let a *Client
return to sync.Pool and be re-acquired by a fresh connection while
a goroutine still holds it for c.Close, racing the new connection's
state.

Panic isolation: each slow-path goroutine wraps the user callback
in defer-recover. A panic in one user callback no longer crashes
the publisher or gates other slow subscribers.
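
The shape of that fan-out, as a standalone sketch (not the real celeris code; the subscriber type and callback are stand-ins):

```go
package broker // illustrative sketch

import "sync"

type subscriber struct{} // stand-in for the real per-subscriber state

// fanOutSlowPath: one goroutine per slow subscriber, capped by a semaphore,
// panic-isolated, and joined before the publisher returns.
func fanOutSlowPath(slow []*subscriber, limit int, onSlow func(*subscriber)) {
	sem := make(chan struct{}, limit) // SlowSubscriberConcurrency, default GOMAXPROCS*4
	var wg sync.WaitGroup
	for _, s := range slow {
		wg.Add(1)
		go func(s *subscriber) {
			defer wg.Done()
			sem <- struct{}{} // acquire inside the goroutine
			defer func() { <-sem }()
			defer func() {
				// A panic in one user callback must not crash the publisher or
				// gate the other slow subscribers.
				_ = recover()
			}()
			onSlow(s) // user policy callback + Drop/Disconnect cleanup
		}(s)
	}
	// Load-bearing join: a *Client must not return to sync.Pool while a
	// cleanup goroutine still holds it.
	wg.Wait()
}
```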

Tests:
- TestBrokerSlowSubscriberCallbackParallel: 16 slow subs × 50ms
  callback must complete in < 200ms (serial would be 800ms+).
- TestBrokerSlowSubscriberCallbackPanicIsolated: one goroutine
  panics, others still apply Drop policy.
- TestBrokerSlowSubscriberConcurrencyCap: cap=1 forces serial
  semantics; elapsed >= 0.6 * N * D.

Test plumbing: subscribeGatedSlow helper uses gatedStreamer-backed
clients to deterministically fill SubscriberBuffer=1 queues, so
the slow path fires for every subscriber (the prior delay-streamer
approach was timing-racy on warmup).

Drive-by: TestHubCloseWaitsInflightBroadcast had a pre-existing
race window between bcastEntered close (fired BEFORE Broadcast)
and main calling Hub.Close. Sleep 20ms after the signal so the
broadcast goroutine has time to enter snapshot and increment
inflight before Close races it.
Tail-of-matrix 0-request cells are accumulation symptoms — FD or
goroutine leaks, TCP TIME_WAIT pool pressure, runtime memory growth.
The previous error message offered no quantitative starting point;
investigators had to re-run the matrix with hand-rolled telemetry.

Now, when res.Requests == 0, the runner's error message includes a
single-line snapshot:
  - goroutines (Go scheduler)
  - heap_inuse / sys (runtime.MemStats)
  - open_fds (/proc/self/fd)
  - time_wait (parse /proc/net/tcp{,6} for state 06)
  - Max open files (/proc/self/limits)

Cheap to compute, only fires on failure, ships in the same fail-fast
log line that aborts the matrix.
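
A sketch of gathering those fields; the real runner formats them into its fail-fast error, and the exact formatting here is assumed:

```go
package runner // illustrative sketch of the failure-time snapshot

import (
	"fmt"
	"os"
	"runtime"
	"strings"
)

func diagSnapshot() string {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)

	fds, _ := os.ReadDir("/proc/self/fd") // open FD count

	timeWait := 0
	for _, p := range []string{"/proc/net/tcp", "/proc/net/tcp6"} {
		data, err := os.ReadFile(p)
		if err != nil {
			continue
		}
		for _, line := range strings.Split(string(data), "\n")[1:] {
			f := strings.Fields(line)
			if len(f) > 3 && f[3] == "06" { // state 06 = TIME_WAIT
				timeWait++
			}
		}
	}

	maxFiles := "?"
	if data, err := os.ReadFile("/proc/self/limits"); err == nil {
		for _, line := range strings.Split(string(data), "\n") {
			if strings.HasPrefix(line, "Max open files") {
				maxFiles = strings.Join(strings.Fields(line), " ")
			}
		}
	}

	return fmt.Sprintf("goroutines=%d heap_inuse=%d sys=%d open_fds=%d time_wait=%d limits=%q",
		runtime.NumGoroutine(), ms.HeapInuse, ms.Sys, len(fds), timeWait, maxFiles)
}
```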
…ummary

Tracking the socket-FD leak that surfaces only after thousands of
cells. Each cell now records FDsBefore (immediately before
Server.Start) and FDsAfterStop (after the 15s Stop defer returns);
the delta is logged inline as a cell-fd: line and stamped on the
per-cell JSON.

Post-run, the orchestrator scans every per-cell JSON, groups by
server name, and prints a top-15 leaderboard (count, total leaked,
max-per-cell, avg). A teardown path that misses one FD per cell
shows up as constant avg leak; a stochastic leak shows up as
spread between max and avg.

Cheap on the happy path (one ReadDir per cell). The leaderboard
fires only when at least one cell leaked.
The strict matrix on msr1 (Linux 7.0 / aarch64) hit a sporadic
bind: address already in use on cell 1481 (get-simple-1024c with
celeris-adaptive-auto+upg-async) after 3h33m of clean execution.
The adaptive engine starts iouring + epoll concurrently; with
GOMAXPROCS=12 epoll spawns 12 loops, each independently calling
createListenSocket from its own goroutine. That is 12 (or 13 with
iouring) concurrent unix.Bind calls into the same SO_REUSEPORT
group on the same port. The kernel's per-port bind table is
locked while the group is reconciled; on this kernel one of the
binds occasionally observes a transient state and returns
EADDRINUSE despite every member of the group setting
SO_REUSEPORT before bind. Diagnostic at the failure moment
confirmed: SO_REUSEADDR=1 SO_REUSEPORT=1 listeners_on_port=[]
our_open_fds=45 — clean process state, no real conflict.

Retry with exponential backoff (250 µs → 32 ms across 7 retries,
~64 ms worst case) absorbs the kernel-side contention. A real
EADDRINUSE — port actually held by another listener that doesn't
share our SO_REUSEPORT — escapes the budget unchanged and surfaces
with the bindDiag attachment exactly as before. Non-EADDRINUSE
errors short-circuit immediately so we don't paper over a
configuration mistake.

Same code path lives in both engine/epoll/loop.go::createListenSocket
and engine/iouring/worker.go::createListenSocket; both fixed.
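
The retry has roughly this shape (a sketch, not the engine code; the attempt count and caps approximate the budget named above, and bindDiag attachment is elided):

```go
package engine // illustrative only

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/sys/unix"
)

// bindWithRetry absorbs transient EADDRINUSE from concurrent SO_REUSEPORT
// group joins; any other error, or a genuinely held port that outlives the
// budget, surfaces unchanged.
func bindWithRetry(fd int, sa unix.Sockaddr) error {
	backoff := 250 * time.Microsecond
	var err error
	for attempt := 0; attempt < 8; attempt++ { // first try + retries, ~64 ms worst case
		if err = unix.Bind(fd, sa); err == nil {
			return nil
		}
		if !errors.Is(err, unix.EADDRINUSE) {
			return err // don't paper over a configuration mistake
		}
		time.Sleep(backoff)
		backoff *= 2 // 250 µs doubling toward 32 ms
	}
	return fmt.Errorf("bind after retries: %w", err) // real code attaches bindDiag here
}
```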
Four follow-ups landed together:

1. Bump goceleris/loadgen v1.2.0 → v1.4.0 to pick up the new
   Unblocker extension. The bench warmup now SetDeadline's pooled
   conns when running flips false, so workers stuck in a blocking
   net.Conn.Read/Write release immediately instead of waiting for
   their syscall to return on its own. Was causing the post-1m /
   adaptive-auto+upg-async cell to 0-req on msr1 — warmup
   stretched ~35s on -race + aarch64, eating cellCtx, post-warmup
   measured 18ms with no requests, fail-fast tripped on a
   non-bug. Root-cause fix; the cellCtx scaling we shipped earlier
   stays as defense-in-depth.

2. Gate the per-cell `cell-fd:` log line. Logs every cell when
   PERFMATRIX_FD_TRACE=1 (or -fd-trace), otherwise only fires on
   cells with a non-zero delta. Default keeps the gate.log
   readable; active leak hunts opt in to the verbose stream.

3. iris-h1 was emitting a transient +6 / -6 FD blip across
   cell boundaries — http.Server.Shutdown returned before iris's
   internal Application host list released its FDs, which were
   reaped during the next cell's GC pass. Adding app.Shutdown
   alongside http.Server.Shutdown closes them synchronously and
   restores the per-cell baseline.

4. PERFMATRIX_FD_TRACE env-var → -fd-trace flag wiring in the
   mage matrix orchestrator.
Picks up the h1 Unblock fix: Unblock now closes the conn (forcing
EOF on any active Read/Write) instead of SetDeadline, which left
partial bytes in the bufio reader and caused 0-req post-warmup on
the chain-api-get-json-1c / chi-h1 cell after 4h17m on msr1 in
the back-to-back strict matrix run (loadgen#43).
…reverted)

The v1.4.0 + v1.4.1 Unblocker extension was reverted upstream
(loadgen#44) after both iterations introduced subtler problems
under back-to-back strict matrix runs on msr1:

- v1.4.0 (SetDeadline-based) hit chain-api-get-json-1c / chi-h1
  with stale bufio bytes after 4h17m.
- v1.4.1 (Close-based) hit get-json-64k / celeris-std-h1 with
  reconnect-storm queueing past the measurement window after
  5h25m.

Both attempted to fix worker stuck-in-DoRequest at warmup-end.
The matrix runner already absorbs that in cellCtx scaling
(commit a77dde2 in this branch), so the framework-side
Unblocker was overengineering. Bump to v1.4.2 (revert) and rely
on cellCtx scaling alone.
CI's setup-go@v6 with 'go-version: "1.26"' was binding to the
cached 1.26.2 toolchain even after govulncheck reports surfaced
GO-2026-4971 (net.Dial NUL panic on Windows) + GO-2026-4918
(net/http2 infinite loop on bad SETTINGS_MAX_FRAME_SIZE), both
fixed in 1.26.3. Explicit pin forces the patched toolchain.