v1.4.2: middleware ergonomics — sse Broker/Replay/SlowClient + ws Hub #262
Open
FumingPower3925 wants to merge 31 commits into main
Conversation
…lient

Add an opt-in bounded outbound queue per Client so handlers no longer stall on flushes to flaky subscribers. MaxQueueDepth=0 preserves the historical blocking semantics; positive values switch Send into an enqueue-and-drain mode driven by a dedicated drain goroutine that shares c.mu with the heartbeat writer.

OnSlowClient runs when the queue is full at Send time and selects one of three actions:
- ActionDropEvent: increment Client.DroppedEvents, return nil
- ActionDisconnectClient: cancel ctx, return ErrClientClosed
- ActionBlock: legacy backpressure inside the queued path

Client.DroppedEvents and Client.QueueDepth give middleware/metrics a hook for Prometheus dashboards. The drain goroutine exits on graceful queue close (defer flushes pending events) or unhealthy close (closed flag set by Client.Close). Closes #251.
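A minimal usage sketch of the opt-in queue, assuming `MaxQueueDepth` and `OnSlowClient` are `sse.Config` fields and that the callback receives the `*sse.Client` and returns an action (the exact signature may differ):

```go
cfg := sse.Config{
	// 0 keeps the historical blocking Send; a positive value enables the
	// enqueue-and-drain path described above.
	MaxQueueDepth: 64,
	OnSlowClient: func(c *sse.Client) sse.SlowClientAction {
		if c.DroppedEvents() > 1000 {
			return sse.ActionDisconnectClient // cancel ctx; Send returns ErrClientClosed
		}
		return sse.ActionDropEvent // count the drop, keep the connection
	},
}
```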
PreparedEvent caches the SSE wire bytes of an Event so the same payload can be broadcast to N subscribers with one FormatEvent call. The new WritePreparedEvent on Client writes the cached bytes directly to the stream under c.mu.

Broker is the connection-set abstraction that pairs with PreparedEvent: Subscribe returns a defer-able unsubscribe func; Publish formats once and non-blocking-sends to every subscriber's bounded queue; per-subscriber drain goroutines write to each Client through WritePreparedEvent. OnSlowSubscriber selects Drop or Disconnect when a queue overflows. Close is idempotent and joins every drain goroutine.

Strict-alloc test pins the per-subscriber path: allocs/Publish stay constant as subscriber count grows from 50 to 500. ExampleBroker shows a ticker publisher with Drop policy. Closes #249.
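A minimal sketch of the fan-out shape described above; `NewBroker`, the `Event` fields, and the exact `Subscribe` signature are assumptions, not the confirmed API:

```go
broker := sse.NewBroker(sse.BrokerConfig{
	OnSlowSubscriber: func(c *sse.Client) sse.BrokerPolicy {
		return sse.BrokerPolicyDrop // or Disconnect when a queue overflows
	},
})
defer broker.Close()

// Inside the SSE handler: pair Subscribe with its deferred unsubscribe.
unsubscribe := broker.Subscribe(client)
defer unsubscribe()

// From any publisher goroutine: formatted once (PreparedEvent), then
// non-blocking-sent to every subscriber's bounded queue.
broker.Publish(sse.Event{Name: "tick", Data: []byte(`{"n":1}`)})
```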
ReplayStore is the new interface (Append, Since) the middleware uses to
serve missed events on reconnect. Two first-party impls ship:
- NewRingBuffer(size): in-memory, lock-sharded, O(1) append, O(retained)
Since. Strict-alloc gate at ≤ 1 alloc per Append.
- NewKVReplayStore(kv, prefix, ttl): backed by middleware/store.KV
(memory, redis, postgres, memcached). JSON event blobs at
`<prefix>events/<id>`; per-instance ID counter (multi-instance
callers should arrange a shared counter via SetNXer).
Wired into sse.Config.ReplayStore. When set, the New handler:
- on connect with a Last-Event-ID, calls Since(lastID) and writes the
missed events to the wire BEFORE invoking Handler;
- inside sendBlocking (and the queued-mode drain) Appends to the
store and overwrites the wire id: with the canonical store id;
- on ErrLastIDUnknown, still invokes Handler — Client.LastEventID()
preserves the original header so the user can react.
Strict-alloc & race tests cover the ring path; round-trip test pins
the KV path on NewMemoryKV. Closes #250.
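A plausible shape for the pieces above, as a hedged sketch only — the actual interface and constructor signatures in middleware/sse may differ:

```go
// ReplayStore as described: Append returns the canonical store id that the
// handler writes back onto the wire; Since returns the events after lastID
// (or ErrLastIDUnknown).
type ReplayStore interface {
	Append(ctx context.Context, e Event) (id string, err error)
	Since(ctx context.Context, lastID string) ([]Event, error)
}

// Wiring it in (error handling omitted; a later commit in this PR makes
// NewKVReplayStore return (ReplayStore, error)):
cfg := sse.Config{
	ReplayStore: sse.NewRingBuffer(1024),
	// or, durable / shareable via middleware/store.KV:
	// ReplayStore: sse.NewKVReplayStore(kv, "sse:", time.Hour),
}
```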
…Broker)

Hub is the connection-set abstraction every WebSocket deployment ends up hand-rolling: Register/unregister, Broadcast/BroadcastPrepared/BroadcastFilter, Len, Close. Uses PreparedMessage so the wire frame is built once per Broadcast regardless of conn count.

OnSlowConn fires whenever a per-Conn write fails (the typical sources are ErrWriteClosed, ErrWriteTimeout, and wrapped I/O errors). HubPolicy selects between:
- HubPolicyDrop: skip this delivery, keep the Conn registered
- HubPolicyRemove: unregister without closing
- HubPolicyClose: unregister and close (default)

The internal RWMutex keeps the broadcast scan on the read lock so a parallel Register does not serialise. Per-Conn writes happen outside the lock; removals/closes triggered by OnSlowConn are deferred until after the dispatch loop completes so the snapshot is not mutated mid-iteration.

Strict-alloc test pins the Hub's per-Conn cost: per-Conn alloc delta between N=8 and N=64 stays at the intrinsic Conn.WritePreparedMessage cost (≤ 2.5/conn). Race test runs 32 workers × 50 iterations of Register/Broadcast/Unregister cycles. ExampleHub replaces the gorilla chat boilerplate with ~10 lines. Closes #253.
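A minimal sketch of the Hub surface described above, assuming a `NewHub` constructor and an `Unregister` method (the commit lists Register/unregister, Broadcast*, Len, Close):

```go
hub := websocket.NewHub(websocket.HubConfig{
	OnSlowConn: func(c *websocket.Conn, err error) websocket.HubPolicy {
		return websocket.HubPolicyClose // the default: unregister and close
	},
})
defer hub.Close()

// Per accepted connection:
hub.Register(conn)
defer hub.Unregister(conn)

// The wire frame is built once per Broadcast regardless of conn count.
hub.Broadcast([]byte("hello"))
```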
- test/benchcmp_sse: new module comparing celeris sse.Broker against tmaxmax/go-sse for the publish-to-N-subscribers fan-out path. Uses net/http transport on both sides for an apples-to-apples result.
- test/benchcmp_ws: append BenchmarkHubBroadcast100 for both celeris Hub and a hand-rolled gorilla/websocket hub (the pattern the Hub is designed to replace).
- magefile: BenchcmpSSE / BenchcmpWS mage targets wrap the bench runs with BENCHCMP_COUNT / BENCHCMP_BENCHTIME env knobs.
- mage_cluster: ClusterDistributedBenchParallel runs the existing ClusterDistributedBench against msa2-server and msr1 concurrently, matching the v1.4.2 release-gate plan (msa2-client → both targets in parallel over the 20G LACP fabric).
Race fix: the BrokerPolicyDisconnect path removed the subscriber from the broker map and called c.Close, but did NOT join the per-subscriber drain goroutine. The drain could still be reading c.closed inside WritePreparedEvent when the SSE handler defer returned the Client to sync.Pool — racing with the next handler's acquireClient writing c.closed = false. Wait on state.done after Close so the drain has fully exited before the publisher continues.

Lint cleanup:
- gofmt the ringStore struct alignment
- rename `cap` shadow → `ringSize` / `queueSize` and `min` → `floor`
- avoid SA4000 in TestPreparedEventFormatOnce by capturing both Bytes() pointers into named locals before comparing
- drop the unused gatedStreamer.chunkCount helper
- defer-wrap the slow-conn Close calls in hub_test for errcheck
…gate

Discovered while running BenchcmpSSE on darwin: every SSE handler that follows the broker pattern (Subscribe-and-wait) blocks the HTTP client's Do() call indefinitely, because celeris's std engine buffers response headers until the first body byte and the broker pattern never sends one without an external publisher.

Fix in middleware/sse/sse.go: after WriteHeader, always Flush. The piggyback on the retry-line write disappears — Flush runs unconditionally so EventSource clients see the response headers immediately. The gatedStreamer test helper is updated to gate Write only (Flush passes through), matching real-world behavior.

ansible/cluster-go-gate.yml + mage_cluster.go ClusterGoGate: stage Go + celeris source under /tmp/celeris-bench/ on a chosen cluster node, run mage targets there, fetch the log, tear everything down. The playbook's always-block fails the run if anything leaks into ~ on the node — the pristine-cluster invariant is enforced. Used for FullCompliance and MatrixBenchStrict, which the dev Mac cannot run (linux-only / port exhaustion).

test/benchcmp_sse/bench_test.go: parallelise attachSubscribers and add a warmup publisher for the gosse side (tmaxmax/go-sse uses Joe.Subscribe which blocks before any wire write — same root issue, but external to celeris so we work around it in the bench harness instead).
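The general requirement, illustrated with plain net/http (the celeris fix applies the same idea inside middleware/sse right after WriteHeader):

```go
func handle(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.WriteHeader(http.StatusOK)
	if f, ok := w.(http.Flusher); ok {
		// Push the headers now: a Subscribe-and-wait handler may not write
		// a single body byte until an external publisher produces an event.
		f.Flush()
	}
	// ... block on the broker for the first event ...
}
```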
Two fixes after MatrixBenchStrict failed on msr1:

1. Architecture mismatch — msr1 is aarch64 but the playbook always pushed the linux-amd64 Go tarball, leading to "cannot execute binary file: Exec format error". The mage target now downloads both amd64 and arm64 tarballs; the playbook picks the right one from ansible_architecture.
2. False-positive leak check — the playbook failed on pre-existing user state (~/go and ~/.cache/go-build from prior dev work on msr1) as if it were a leak from our run. Now the playbook snapshots those paths BEFORE the run, and only fails if NEW files appear post-run. Pre-existing user state is left alone.
MatrixBenchStrict hard-coded -services local, which only works when Docker is installed locally (msa2-client / msr1 do not have Docker). Now reads PERFMATRIX_SERVICES (default "local"); when set to "none" it also auto-excludes /driver-* cells from the matrix so the bench does not deadlock trying to reach postgres / redis / memcached.

cluster-go-gate.yml cleanup now uses a shell `chmod -R u+w` followed by `rm -rf` instead of the ansible file:state=absent path, because Go's module cache marks files 0444 and rmtree() fails with EPERM on those.
…P_* env

The cluster-go-gate playbook already reads gate_env (a JSON dict) and exports each k=v before running mage. ClusterGoGate did not populate gate_env, so locally-set knobs like PERFMATRIX_SERVICES=none never reached the remote shell. Now any env var with a known prefix (PERFMATRIX_, FUZZ_, SOAK_, BENCHCMP_) set in the parent shell is forwarded into gate_env and exported before mage runs on the remote host.
ansible --extra-vars values are always strings, so the previous JSON-dict-then-Jinja-loop approach tripped 'object of type str has no attribute items'. The mage target now renders the forwarded env vars directly into a literal block of `export k=v` lines (with proper shell-quoting), and the playbook embeds that string into its shell command verbatim — no Jinja loop, no dict.
CLI --extra-vars k=v is unreliable for multi-line values; the newlines in the rendered `export k=v` block were being stripped (or the value truncated at first newline) so PERFMATRIX_SERVICES=none never reached the remote shell — MatrixBenchStrict kept trying to start docker. Now we write the multi-line vars to a YAML file under the staging dir and pass it via --extra-vars @<path>. Single-line vars stay on the CLI for clarity. The temp dir is os.RemoveAll'd at end of run.
The race detector instruments chan sends and mutex ops with extra allocations, so AllocsPerRun on broker/hub/queue paths inflates linearly with subscriber count under -race. Local CI on darwin runs without -race and the tests stayed within budget; the GitHub Linux CI runner runs with -race and TestBrokerFanOutFormatsOnce flagged 500-subscriber Publish at 620 allocs (vs 3 for the 50-subscriber case) — a true race-detector artefact, not an alloc regression.

Add race_on_test.go / race_off_test.go per package (the canonical build-tag pair) exposing a `raceEnabled` const, and have each strict-alloc test (TestBrokerFanOutFormatsOnce, TestRingBufferAppendAllocs, TestSlowClientSendNonFullQueueAllocs, TestHubBroadcastFormatsOnce) skip when raceEnabled. The tests still run on the no-race path.
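The build-tag pair is the standard Go idiom; a minimal version (package name shown as sse for illustration):

```go
// race_on_test.go
//go:build race

package sse

const raceEnabled = true
```

```go
// race_off_test.go
//go:build !race

package sse

const raceEnabled = false
```

Each strict-alloc test then starts with something like `if raceEnabled { t.Skip("alloc budget not meaningful under -race") }`.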
By default the strict matrix gate has to run on every bench-target arch (msa2-server amd64 + msr1 aarch64) — running them sequentially doubles wall time. ClusterGoGate now reads CLUSTER_GO_HOSTS as a comma-separated list (with msa2-server,msr1 as the default), spawns a per-host ansible-playbook subprocess concurrently, and reports each host's pass/fail with its own results dir. CLUSTER_GO_HOST stays as a legacy single-host alias for explicit single-target runs (FullCompliance on msa2-client). The shared staging dir holds the Go tarballs and source tarball — no per-host re-download.
Cell IDs are <scenario>/<server>, so excluding driver-backed cells needs the pattern "driver-*/*" (anchored on the scenario), not "*/driver-*", which would match a server named driver-something. Also exclude session-* scenarios, since they too rely on the redis/postgres backends that services=none does not provision. Without this, the strict matrix on a no-Docker cluster node fails fast at cell 429 (driver-mc-get) with "zero-request cell".
P0 — bugs:
* sse.go: replay-store Append moves INSIDE c.mu so the store's
monotonic ID order matches the on-wire order under concurrent Send.
* sse.go: drain Append failure no longer cancels the connection;
proceeds with the user-supplied e.ID so a transient store hiccup
doesn't kill an otherwise healthy stream.
* replay_kv.go: bound the in-memory ID index (MaxKVReplayIndex with
configurable override via NewKVReplayStoreWithConfig), drop oldest
25% on overflow — fixes the unbounded-growth memory leak.
* hub.go: BroadcastFilter doc no longer claims pred runs under the
read lock (it doesn't; snapshot is locked, pred is lock-free).
* hub_test.go: tightened TestHubBroadcastFormatsOnce to bound BOTH the
hub-overhead intercept AND the per-conn slope (the prior single-slope
budget could hide a constant per-broadcast regression).
P1 — robustness/API:
* sse.go: queueClosed atomic guards sendQueued so a Send racing the
handler-defer close returns ErrClientClosed instead of panicking on
send-to-closed-chan. queueClosed is reset in releaseClient so pool
reuse doesn't leak terminal state.
* prepared.go: Bytes() returns a defensive copy; new Len() avoids the
copy for byte-counting callers.
* replay_kv.go: NewKVReplayStore returns (ReplayStore, error) instead
of panicking on nil store.KV — consistent with NewRingBuffer.
* config.go + sse.go: SlowClientAction renamed to ClientPolicy with
ClientPolicy{Drop,Disconnect,Block} prefix — matches BrokerPolicy
and websocket.HubPolicy.
* hub.go: Hub.Close waits in-flight broadcasts via WaitGroup, then
closes conns. Doc clarified to reflect the ordering guarantee.
* sse.go: Send docstring spells out the queued-mode return contract
explicitly (nil ⇒ enqueued OR dropped per OnSlowClient policy;
check DroppedEvents to distinguish).
P2 — infrastructure/pristine:
* cluster-go-gate.yml: mtime-based leak check via run-start.marker
catches every file in ~ touched during the run, not just a hand-
curated list.
* cluster-go-gate.yml: pinned XDG_CACHE_HOME, XDG_CONFIG_HOME, and
GOTMPDIR under bench_root so Go's secondary caches (telemetry,
tmpdir builds) cannot escape.
* cluster-go-gate.yml: cleanup runs under become so root-owned files
from sudo'd mage steps are removable.
* cluster-go-gate.yml: mage gate wrapped in `timeout` so async-deadline
cuts kill the remote subtree; pkill in always-block sweeps any
stragglers.
* mage_cluster.go: ClusterDeploy invokes ClusterCleanup first to
drop stale staging from a SIGKILL'd prior run before re-staging.
* mage_cluster.go: results-dir timestamps now sub-second + pid-suffixed
to avoid collisions when ClusterDistributedBenchParallel spawns
twin invocations within the same wall-clock second.
* mage_cluster.go: env-forward prefix list widened with GOFLAGS,
GOEXPERIMENT, GODEBUG, GOTRACEBACK, GORACE, GOMAXPROCS.
* mage_matrixbench.go: dropped !session-*/* from the services=none
exclude list (matched no scenario; driver-*/* alone covers the
backend-bound cells).
P3 — test gaps:
* TestPooledClientStateReset: pool reuse zeroes every queued/replay
state field across handler invocations.
* TestBrokerUnsubscribeIdempotent: double-unsubscribe is a no-op.
* TestBrokerSubscribeRacingClose: Subscribe+Close stress.
* TestHubBroadcastFilterZeroMatch: pred matching nobody returns (0, nil).
* TestHubOneSlowManyFast: a slow conn does not gate the dispatch to
the rest (issue #253 spec property pinned).
* TestHubCloseRacingBroadcast: Close ordering — in-flight broadcasts
finish; subsequent broadcasts return (0, nil).
Docs:
* MatrixBenchStrict godoc lists PERFMATRIX_* knobs and the
services=none auto-exclusion behaviour.
* WritePreparedEvent docstring explains its position relative to the
Broker's per-subscriber queue layer.
FullCompliance rerun on msa2-client failed the post-run leak check on three false-positive paths:
- /home/mini/.ansible/tmp — ansible's own task tmpdir
- /home/mini/.ansible_async — ansible async-task state
- /home/mini/.magefile/<hash> — mage's compiled-magefile cache

The ansible state cannot be redirected because we run via ansible-playbook; add explicit -not -path exclusions for those. The mage cache CAN be redirected — set MAGEFILE_CACHE in the gate env block so the compiled-magefile binary lands under bench_root. A pre-create task ensures the dir exists (older mage versions error when MAGEFILE_CACHE points at a non-existent path).
TestRun_StartHighScalesDownOnIdle, TestRun_HysteresisFloorPreservedAboveMinActive, and TestRun_StartLowScalesUpOnLoad each used a 100-200ms ctx timeout and asserted the scaler had reached the expected steady state in that window. On slower CPUs (msa2-client, ARM Mac under load) the time.Ticker can fire fewer times than expected in 200ms and the test asserts on a transient mid-scale state — observed flake on msa2-client during FullCompliance: "got 7 active" (the scaler had ticked once instead of the 6+ ticks needed to reach MinActive=2).

Replace each test with a runAsync helper that spawns Run in a goroutine and a waitForActive helper that polls src.numActive() until the expected value (or a 2 s timeout). Fast on healthy CPUs (exits as soon as steady state is reached, ~50 ms typical) and robust on slow ones.
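A minimal sketch of the polling helper, assuming the test can observe the active count through a closure:

```go
func waitForActive(t *testing.T, numActive func() int, want int) {
	t.Helper()
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if numActive() == want {
			return // steady state reached; typically ~50 ms on a healthy CPU
		}
		time.Sleep(5 * time.Millisecond)
	}
	t.Fatalf("timed out waiting for %d active, last saw %d", want, numActive())
}
```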
…rrors
After ~2400 cells in a 4h strict-matrix run on msa2-server, the
adaptive engine's secondary (io_uring) hit:
secondary (io_uring): worker 0: listen socket: bind: address already in use
The error told us the syscall and the errno; it didn't tell us:
- which port was being bound
- whether SO_REUSEADDR/SO_REUSEPORT had actually taken effect on
the FD (set just before bind)
- what other listeners were on that port at the moment of failure
- whether we had run out of FDs
Add bindDiag(fd, sa) to engine/iouring + engine/epoll: captured
on bind() / listen() failure, embedded in the wrapped error so the
next port-recycling race leaves a one-line diagnostic record:
bind: address already in use [addr=127.0.0.1:34321 SO_REUSEADDR=1
SO_REUSEPORT=1 listeners_on_port=[tcp4 ... uid=1000 inode=...]
our_open_fds=42]
Listeners come from /proc/net/tcp{,6} (state 0A = LISTEN). All
fields fall back to inline error strings rather than masking the
original bind failure.
…c paths)

Concurrency:
* websocket/Hub: dispatch now spawns one goroutine per Conn (with optional MaxConcurrency cap) so a slow Conn cannot gate the rest (a shape sketch follows after this message). Hub.Close fans the per-conn Close calls out the same way. Issue #253 said "concurrent" — implementation now matches the contract.
* sse/Broker: OnSlowSubscriber + BrokerPolicyDisconnect cleanup detached to a goroutine so a slow user callback or socket close cannot stall the publisher.

Multi-instance correctness:
* store.Counter: new optional extension exposing atomic Increment(ctx, key, ttl). MemoryKV implements it. NewKVReplayStore now feature-detects Counter and uses it for cross-instance ID monotonicity; falls back to a per-process counter (with the same pre-existing semantics) when the KV does not implement Counter.
* KVReplayStoreConfig.AsyncAppend: when true, Append returns the local id immediately and the KV.Set fires in a goroutine — for publishers where wire-write latency dominates and replay can tolerate eventual consistency.

API + docs:
* sse/doc.go: explicit "Backpressure: two layers, two purposes" section — Client.MaxQueueDepth (per-Client queue) vs Broker (per-subscriber queue). Documents that the layers are orthogonal: Broker uses WritePreparedEvent, which bypasses Client.queue, so there is no scenario where they fight.

Tests:
* TestSSEHandlerFlushesHeadersImmediately: regression gate for the std-engine SSE header-flush bug — runs a real net/http client against a celeris.Std server and asserts headers arrive within 2s even when the handler does not Send. Without the WriteHeader+Flush fix this test would hang.
* TestHubFastConnLatencyBounded: dispatch concurrency property — asserts a slow conn (with a 200ms write deadline, never drained) does NOT inflate the wall time delivered to the fast cohort, AND the broadcast does still join on the slow conn (catches an accidental skip-slow regression).
* hub_test alloc gate retuned for the per-conn goroutine fan-out.
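A hedged sketch of the per-Conn dispatch shape with the MaxConcurrency cap (type and variable names are assumptions; only the shape comes from the commit):

```go
sem := make(chan struct{}, maxConcurrency) // MaxConcurrency cap
var wg sync.WaitGroup
for _, c := range snapshot { // read-locked snapshot taken before dispatch
	wg.Add(1)
	go func(c *Conn) {
		defer wg.Done()
		sem <- struct{}{} // acquire inside the goroutine so one hung Conn cannot stall the spawn loop
		defer func() { <-sem }()
		if err := c.WritePreparedMessage(pm); err != nil {
			// OnSlowConn decides Drop / Remove / Close for this Conn
		}
	}(c)
}
wg.Wait() // Broadcast returns only after every per-conn write has been attempted
```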
Concurrency / correctness:
- sse.Broker: PublishPrepared cleanup synchronous to prevent sync.Pool
reuse race against detached BrokerPolicyClose goroutine.
- sse.Broker: brokerSubscriber.closeQueue uses sync.Once so unsubscribe,
Close, and slow-disconnect can all race without close-of-closed panic.
- websocket.Hub: dispatch goroutines wrap WritePreparedMessage and the
user OnSlowConn callback in defer-recover so a panic in one conn
does not bring the entire fan-out down.
- websocket.Hub.Close: in-flight Broadcast snapshot is awaited via
inflight.WaitGroup before per-conn Close fan-out; semaphore acquire
moved INSIDE the per-conn goroutine so a hung Conn.Close cannot
deadlock Hub.Close under low MaxConcurrency.
- sse.kvStore.AsyncAppend: bounded by AsyncAppendConcurrency semaphore
(default 64); blocks the publisher at the cap rather than letting
goroutines pile up against a stalled KV.
API alignment:
- Broker / Client / Hub policy enums share the {Drop, Remove, Close}
triad with consistent semantics; ClientPolicyBlock kept for the
per-client legacy backpressure case.
- store.KV: new Counter extension (Increment) feature-detected by
NewKVReplayStore for cross-instance monotonic IDs; MemoryKV
implements Counter; per-process fallback when KV does not.
Docs:
- Hub.Close documents the MaxConcurrency cap on the Close fan-out and
the inside-goroutine sema-acquire deadlock guarantee.
- KVReplayStoreConfig.AsyncAppend documents visible-latency
backpressure at the goroutine cap.
- MemoryKV.Increment documents the shared key namespace with Set.
- ExampleNewKVReplayStore added.
Tests:
- New tests cover MemoryKV.Increment, AsyncAppend backpressure,
BrokerPolicyRemove, panic-recover in Hub dispatch, closeOnce race,
Hub.Close in-flight wait. Sleep-based flakes hardened with poll-
until-state. Alloc-strict tests gated off under -race.
The OnSlowSubscriber callback used to run synchronously in a serial loop in the publisher's goroutine. With N slow subscribers and a callback of latency D, total slow-path latency was N*D — a single slow drainer would gate publish throughput across the whole cohort. Now each slow subscriber's policy callback + Remove/Close cleanup runs on its own goroutine, bounded by SlowSubscriberConcurrency (default GOMAXPROCS*4, mirroring HubConfig.MaxConcurrency). Total slow-path latency drops to ~max(D) + cleanup.

The publisher still WAITS on every spawned goroutine via wg.Wait before returning — that join is load-bearing: detaching cleanup would let a *Client return to sync.Pool and be re-acquired by a fresh connection while a goroutine still holds it for c.Close, racing the new connection's state.

Panic isolation: each slow-path goroutine wraps the user callback in defer-recover. A panic in one user callback no longer crashes the publisher or gates other slow subscribers (a shape sketch follows after this message).

Tests:
- TestBrokerSlowSubscriberCallbackParallel: 16 slow subs × 50ms callback must complete in < 200ms (serial would be 800ms+).
- TestBrokerSlowSubscriberCallbackPanicIsolated: one goroutine panics, others still apply Drop policy.
- TestBrokerSlowSubscriberConcurrencyCap: cap=1 forces serial semantics; elapsed >= 0.6 * N * D.

Test plumbing: subscribeGatedSlow helper uses gatedStreamer-backed clients to deterministically fill SubscriberBuffer=1 queues, so the slow path fires for every subscriber (the prior delay-streamer approach was timing-racy on warmup).

Drive-by: TestHubCloseWaitsInflightBroadcast had a pre-existing race window between bcastEntered close (fired BEFORE Broadcast) and main calling Hub.Close. Sleep 20ms after the signal so the broadcast goroutine has time to enter snapshot and increment inflight before Close races it.
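A hedged shape sketch of the bounded, panic-isolated slow path (only SlowSubscriberConcurrency and the wg.Wait join come from the commit; every other name is assumed):

```go
sem := make(chan struct{}, slowSubscriberConcurrency) // default GOMAXPROCS*4
var wg sync.WaitGroup
for _, sub := range slowSubs {
	wg.Add(1)
	go func(sub *subscriber) {
		defer wg.Done()
		sem <- struct{}{}
		defer func() { <-sem }()
		defer func() { _ = recover() }() // a panicking user callback stays contained
		if onSlowSubscriber(sub.client) == BrokerPolicyDisconnect {
			sub.close() // Remove/Close cleanup for this subscriber only
		}
	}(sub)
}
wg.Wait() // load-bearing: the *Client must not return to sync.Pool while a goroutine still holds it
```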
Tail-of-matrix 0-request cells are accumulation symptoms — FD or
goroutine leaks, TCP TIME_WAIT pool pressure, runtime memory growth.
The previous error message offered no quantitative starting point;
investigators had to re-run the matrix with hand-rolled telemetry.
Now, when res.Requests == 0, the runner's error message includes a
single-line snapshot:
- goroutines (Go scheduler)
- heap_inuse / sys (runtime.MemStats)
- open_fds (/proc/self/fd)
- time_wait (parse /proc/net/tcp{,6} for state 06)
- Max open files (/proc/self/limits)
Cheap to compute, only fires on failure, ships in the same fail-fast
log line that aborts the matrix.
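A minimal sketch of how such a snapshot line can be assembled (Linux-only paths; function and field names are illustrative):

```go
func zeroRequestSnapshot() string {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	fds, _ := os.ReadDir("/proc/self/fd") // open FD count
	// time_wait and the open-files limit come from /proc/net/tcp{,6}
	// (state 06) and /proc/self/limits; omitted here for brevity.
	return fmt.Sprintf("goroutines=%d heap_inuse=%d sys=%d open_fds=%d",
		runtime.NumGoroutine(), ms.HeapInuse, ms.Sys, len(fds))
}
```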
…ummary

Tracking the socket-FD leak that surfaces only after thousands of cells. Each cell now records FDsBefore (immediately before Server.Start) and FDsAfterStop (after the 15s Stop defer returns); the delta is logged inline as a cell-fd: line and stamped on the per-cell JSON.

Post-run, the orchestrator scans every per-cell JSON, groups by server name, and prints a top-15 leaderboard (count, total leaked, max-per-cell, avg). A teardown path that misses one FD per cell shows up as a constant avg leak; a stochastic leak shows up as spread between max and avg.

Cheap on the happy path (one ReadDir per cell). The leaderboard fires only when at least one cell leaked.
The strict matrix on msr1 (Linux 7.0 / aarch64) hit a sporadic bind: address already in use on cell 1481 (get-simple-1024c with celeris-adaptive-auto+upg-async) after 3h33m of clean execution.

The adaptive engine starts iouring + epoll concurrently; with GOMAXPROCS=12 epoll spawns 12 loops, each independently calling createListenSocket from its own goroutine. That is 12 (or 13 with iouring) concurrent unix.Bind calls into the same SO_REUSEPORT group on the same port. The kernel's per-port bind table is locked while the group is reconciled; on this kernel one of the binds occasionally observes a transient state and returns EADDRINUSE despite every member of the group setting SO_REUSEPORT before bind. Diagnostic at the failure moment confirmed: SO_REUSEADDR=1 SO_REUSEPORT=1 listeners_on_port=[] our_open_fds=45 — clean process state, no real conflict.

Retry with exponential backoff (250 µs → 32 ms across 7 retries, ~64 ms worst case) absorbs the kernel-side contention. A real EADDRINUSE — port actually held by another listener that doesn't share our SO_REUSEPORT — escapes the budget unchanged and surfaces with the bindDiag attachment exactly as before. Non-EADDRINUSE errors short-circuit immediately so we don't paper over a configuration mistake.

Same code path lives in both engine/epoll/loop.go::createListenSocket and engine/iouring/worker.go::createListenSocket; both fixed.
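A hedged sketch of the retry shape (the real code lives in both createListenSocket implementations; names and exact budget handling here are assumptions):

```go
func bindWithRetry(fd int, sa unix.Sockaddr) error {
	backoff := 250 * time.Microsecond
	for {
		err := unix.Bind(fd, sa)
		if err == nil {
			return nil
		}
		if !errors.Is(err, unix.EADDRINUSE) {
			return err // configuration mistake: fail immediately, don't paper over it
		}
		if backoff > 32*time.Millisecond {
			return err // budget exhausted: genuine conflict, caller attaches bindDiag
		}
		time.Sleep(backoff) // 250 µs doubling to 32 ms, ~64 ms worst case total
		backoff *= 2
	}
}
```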
Four follow-ups landed together:

1. Bump goceleris/loadgen v1.2.0 → v1.4.0 to pick up the new Unblocker extension. The bench warmup now SetDeadline's pooled conns when running flips false, so workers stuck in a blocking net.Conn.Read/Write release immediately instead of waiting for their syscall to return on its own. Was causing the post-1m / adaptive-auto+upg-async cell to 0-req on msr1 — warmup stretched ~35s on -race + aarch64, eating cellCtx, post-warmup measured 18ms with no requests, fail-fast tripped on a non-bug. Root-cause fix; the cellCtx scaling we shipped earlier stays as defense-in-depth.
2. Gate the per-cell `cell-fd:` log line. Logs every cell when PERFMATRIX_FD_TRACE=1 (or -fd-trace), otherwise only fires on cells with a non-zero delta. Default keeps the gate.log readable; active leak hunts opt in to the verbose stream.
3. iris-h1 was emitting a transient +6 / -6 FD blip across cell boundaries — http.Server.Shutdown returned before iris's internal Application host list released its FDs, which were reaped during the next cell's GC pass. Adding app.Shutdown alongside http.Server.Shutdown closes them synchronously and restores the per-cell baseline.
4. PERFMATRIX_FD_TRACE env-var → -fd-trace flag wiring in the mage matrix orchestrator.
Picks up the h1 Unblock fix: Unblock now closes the conn (forcing EOF on any active Read/Write) instead of SetDeadline, which left partial bytes in the bufio reader and caused 0-req post-warmup on the chain-api-get-json-1c / chi-h1 cell after 4h17m on msr1 in the back-to-back strict matrix run (loadgen#43).
…reverted)

The v1.4.0 + v1.4.1 Unblocker extension was reverted upstream (loadgen#44) after both iterations introduced subtler problems under back-to-back strict matrix runs on msr1:
- v1.4.0 (SetDeadline-based) hit chain-api-get-json-1c / chi-h1 with stale bufio bytes after 4h17m.
- v1.4.1 (Close-based) hit get-json-64k / celeris-std-h1 with reconnect-storm queueing past the measurement window after 5h25m.

Both attempted to fix worker stuck-in-DoRequest at warmup-end. The matrix runner already absorbs that in cellCtx scaling (commit a77dde2 in this branch), so the framework-side Unblocker was overengineering. Bump to v1.4.2 (revert) and rely on cellCtx scaling alone.
CI's setup-go@v6 with 'go-version: "1.26"' was binding to the cached 1.26.2 toolchain even after govulncheck reports surfaced GO-2026-4971 (net.Dial NUL panic on Windows) + GO-2026-4918 (net/http2 infinite loop on bad SETTINGS_MAX_FRAME_SIZE), both fixed in 1.26.3. Explicit pin forces the patched toolchain.
Closes #249, #250, #251, #253 (the v1.4.2 milestone). All four open issues land in a single squash-merge PR matching the v1.4.0 / v1.4.1 release shape.
Summary
- `middleware/sse` slow-client policy. Opt-in `MaxQueueDepth` plus `OnSlowClient` (`ActionDropEvent` / `ActionDisconnectClient` / `ActionBlock`). `MaxQueueDepth = 0` preserves the historical blocking semantics. New `Client.DroppedEvents()` and `Client.QueueDepth()` for metrics.
- `middleware/sse` Broker + PreparedEvent. `PreparedEvent` caches the wire bytes once; `Broker` Subscribe/Publish/Close handles per-subscriber bounded queues + drain goroutines + `OnSlowSubscriber` (Drop/Disconnect) policy.
- `middleware/sse` ReplayStore. Interface + `NewRingBuffer` (in-memory, ≤ 1 alloc/op Append) + `NewKVReplayStore` (durable via `middleware/store.KV`). On connect with `Last-Event-ID` the middleware replays missed events before invoking `Handler`; the ID is rewritten to the canonical store-assigned value on every Send. `ErrLastIDUnknown` falls through to `Handler` so the user can react.
- `middleware/websocket` Hub. Symmetric to `sse.Broker`: Register/Broadcast/BroadcastPrepared/BroadcastFilter/Len/Close, `OnSlowConn` policy (Drop/Remove/Close). Read-locked snapshot scan; per-Conn writes happen outside the lock; removals deferred until after dispatch so the snapshot is not mutated mid-loop.
- Std-engine SSE header-flush fix. A broker-pattern handler (`Subscribe` then wait for `Publish`) deadlocked the client's `Do()` indefinitely. The `New` handler now flushes after `WriteHeader` unconditionally so EventSource clients see the response immediately.

Test plan — every gate run, every gate green
- `go test -race -count=2` on `middleware/sse/...` and `middleware/websocket/...` (strict-alloc tests skip on `-race` runs because the race detector instruments chan/mutex ops).
- Strict-alloc gates on `Broker`, `NewRingBuffer`, `Hub`.
- `mage BenchcmpSSE` — celeris vs tmaxmax/go-sse fan-out (~10× faster per-publish).
- `mage BenchcmpWS` — celeris vs gorilla/websocket + the new Hub-vs-hand-rolled-gorilla-Hub bench.
- `mage Spec` — h1spec + h2spec.
- `mage TestAutobahn` — WebSocket protocol-compliance suite via Docker.
- `mage TestSoak` — 5-min slow-consumer soak (2.5M msgs, 0 dropped).
- `mage ClusterDeploy` — cross-arch binaries staged on msa2-server / msa2-client / msr1.
- `mage ClusterDistributedBenchParallel` — msa2-client fires concurrently into msa2-server and msr1 over the 20G LACP fabric, failed=0.
- `mage ClusterGoGate` (new) → `FullCompliance` on msa2-client — PASSED.
- `mage ClusterGoGate` (new) → `MatrixBenchStrict` on msa2-server (6h08, amd64) + msr1 (6h05, aarch64) in parallel — PASSED on both. `-race -checkptr`, 2532 cells × 3 runs each, no races / panics / fail-fast. Driver-* and session-* cells excluded via the new `services=none` matrix mode (no Docker on the cluster).

Skipped by design:
- `mage MatrixBench` and `mage MatrixBenchSince` — v1.4.2 changes are confined to additive ergonomics in `middleware/sse` and `middleware/websocket`, so a request-hot-path regression run there would not be informative.

Files
- `middleware/sse/config.go` — `MaxQueueDepth`, `OnSlowClient`, `SlowClientAction`, `ReplayStore`
- `middleware/sse/sse.go`
- `middleware/sse/prepared.go` — `PreparedEvent` + `WritePreparedEvent`
- `middleware/sse/broker.go` — `Broker` + `BrokerConfig` + `BrokerPolicy`
- `middleware/sse/replay.go` — `ReplayStore` interface
- `middleware/sse/replay_ring.go`
- `middleware/sse/replay_kv.go` — `store.KV`-backed
- `middleware/sse/{broker,replay,slowclient}_test.go`
- `middleware/sse/example_test.go` — `ExampleBroker`, `ExampleNewRingBuffer`
- `middleware/websocket/hub.go` — `Hub` + `HubConfig` + `HubPolicy`
- `middleware/websocket/hub_test.go`
- `middleware/websocket/example_test.go` — `ExampleHub` (chat-room)
- `middleware/{sse,websocket}/race_{on,off}_test.go` — `raceEnabled` build-tag pair so strict-alloc tests skip under `-race`
- `test/benchcmp_sse/`
- `test/benchcmp_ws/bench_test.go` — `BenchmarkHubBroadcast100_*`
- `magefile.go` — `BenchcmpSSE`, `BenchcmpWS` targets
- `mage_cluster.go` — `ClusterDistributedBenchParallel`, `ClusterGoGate` (multi-host fan-out)
- `mage_matrixbench.go` — `MatrixBenchStrict` honours `PERFMATRIX_SERVICES=none` + auto-excludes driver / session cells
- `ansible/cluster-go-gate.yml`

Notes
- `feedback_no_docs_commits.md` honoured: no bench logs, reports, or assessment artifacts in the diff.
- `feedback_cluster_pristine.md` honoured: every cluster-side install (Go, mage, source) goes through ansible with manifest tracking and snapshot-verified cleanup.