feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153

Open: kafka1991 wants to merge 232 commits into main from jh_conn_pool_refactor

kafka1991 commented Jun 4, 2026

Collaborator

Summary

This PR is the combined landing of #148 and #150 — two independently-developed columnar I/O tracks that both target QWP/WebSocket and share the same connection pool / SymbolGlobalDict, shipped together so downstream consumers (py-questdb-client, in-tree C/C++ tests) see one self-consistent surface.

#148 — Column-major sender (column_sender)

A DataFrame → Table ingest API. QuestDb connection pool + BorrowedSender + Chunk (per-column Vec<u8> that stacks wire bytes directly) + synchronous flush(AckLevel). Covers bool / signed integers / floats / UUID / Long256 / IPv4 / timestamps / VARCHAR / symbol_dict_{i8,i16,i32} bulk-intern. The connection-scoped SchemaRegistry (FULL / REFERENCE emit modes) and SymbolGlobalDict are shared with the row API, preserving the 1M-per-connection symbol cap on huge Pandas Categorical dicts. Full C ABI (include/questdb/ingress/column_sender.h) and a Criterion bench suite (column path ≈ memcpy ceiling; bulk-intern ~16× faster than per-row HashMap).

#150 — Apache Arrow + Polars integration

Both directions over QWP/WebSocket:

  • Ingress: Buffer::append_arrow / append_arrow_at_column consumes a whole RecordBatch in one call, column-major dense bulk path (one memcpy per column; QWP null bitmap built by byte-stride OR-with-NOT of the Arrow validity buffer when boundaries align, per-row fallback only when bit-offsets are unaligned).
  • Egress: Cursor::as_record_batch_reader() streaming RecordBatch iterator; Polars sub-feature provides the DataFrame bridge.
  • C ABI via the Arrow C Data Interface: line_sender_buffer_append_arrow* and line_reader_cursor_next_arrow_batch. Every producer-supplied ArrowArray/ArrowSchema is pre-validated before from_ffi (schema depth ≤ 64, row_count ≤ 16M, bounded per-node buffer/child counts, and rejection of NULL or under-sized buffer arrays), so a malformed struct returns an error instead of aborting the FFI crate's panic = "abort" profile. The manual per-column FFI path caps each variable-length payload at i32::MAX.
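
The aligned byte-stride path mentioned in the Ingress bullet can be pictured roughly as follows. This is a minimal sketch, not the PR's code: it assumes the QWP null bitmap uses "set bit = null" in the same LSB-first order as the Arrow validity buffer (the description implies this with "OR-with-NOT" but does not state the polarity), it handles only a bit offset of zero, and the name `null_bitmap_from_validity` is hypothetical.

```rust
/// Hypothetical helper (not from the PR): build a "1 = null" bitmap from an
/// Arrow validity bitmap ("1 = valid", LSB-first). Assumes bit offset 0,
/// i.e. the aligned case; an unaligned input would need a per-row fallback.
fn null_bitmap_from_validity(validity: &[u8], row_count: usize) -> Vec<u8> {
    let n_bytes = (row_count + 7) / 8;
    assert!(validity.len() >= n_bytes, "validity buffer too short");
    let mut out = vec![0u8; n_bytes];
    // Byte-stride pass: OR the inverted validity byte into the output.
    for (dst, src) in out.iter_mut().zip(validity) {
        *dst |= !*src;
    }
    // Clear the padding bits past row_count in the final byte.
    let tail = row_count % 8;
    if tail != 0 {
        if let Some(last) = out.last_mut() {
            *last &= (1u8 << tail) - 1;
        }
    }
    out
}
```

For three rows with validity `0b101` (rows 0 and 2 valid), the sketch marks only row 1 as null.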

Why merged

The two tracks were developed on the same jh_conn_pool_refactor branch and converged on shared infrastructure (connection pool, SymbolGlobalDict, QWP/WS transport). Splitting them at review time would force one to ship behind a compatibility shim for the other; merging them together avoids that churn and gives C/C++ callers — and the upcoming Pandas / Polars wrapper in py-questdb-client — a column-major, zero-redundant-copy path into QuestDB in one cut.

Public surface

See the "Public surface" / "What's in the box" sections of the original PRs (#148 and #150).

Feature gating

  • questdb-rs: arrow + polars are opt-in features, excluded from almost-all-features; column_sender lives behind sync-sender-qwp-ws.
  • questdb-rs-ffi: the arrow feature mirrors the questdb-rs one.
  • CMakeLists.txt: QUESTDB_ENABLE_ARROW=OFF by default; auto-flipped to ON when QUESTDB_TESTS_AND_EXAMPLES=ON so tests / examples exercise the Arrow path without explicit opt-in.

Test plan

  • Rust unit tests: 57 column_sender + 80+ Arrow
  • FFI unit tests: 8 new column_sender + Arrow path coverage
  • C/C++ tests: test_arrow_c.c / test_arrow_egress.cpp / test_arrow_ingress.cpp wired into CMake and exercised in CI
  • System tests against a live QuestDB: arrow_egress_fuzz / arrow_ingress_fuzz / arrow_round_trip_fuzz / arrow_alignment_fuzz
  • cargo bench --features sync-sender-qwp-ws --bench column_sender
  • End-to-end Pandas / Polars throughput (py-questdb-client, WS-7)

Closes #148, #150.

Summary by CodeRabbit

  • New Features

    • Opt-in Apache Arrow support for Arrow egress and ingest; new column-major sender for high-throughput DataFrame ingestion; Polars integration.
  • Examples

    • Added C, C++ and Rust examples demonstrating Arrow egress/ingest and column-sender/Polars workflows.
  • Documentation

    • Added column-sender ABI spec, implementation plan, and performance guidance.
  • Tests

    • Expanded Arrow/Polars unit, smoke and fuzz coverage; new integration tests.
  • Chores (CI)

    • CI updated to install pyarrow/polars, expanded test/fuzz jobs and longer timeouts.
  • Benchmarks

    • New Criterion benchmarks measuring column-sender performance.

bluestreak01 and others added 30 commits May 24, 2026 01:42
Plan and FFI ABI for the new column-major writer that will ingest
Pandas/Polars DataFrames over QWP/WebSocket. Locks the QuestDb pool
shape, BulkChunk encoder strategy, validity bitmap semantics, and
the C ABI the separate Python wrapper repo will consume.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Locks the column-sender API around synchronous flush:
sender.flush(&mut chunk, ack_level) blocks until the requested ACK
level (Ok = WAL commit, Durable = object-store via Enterprise
opt-in). Drops the FSN/submit/await split from the FFI; at most one
frame in flight per sender, parallelism via the pool.

Refuses sf_dir and other sf_* keys at QuestDb::connect with
ConfigError — store-and-forward is single-writer-per-slot and
interacts awkwardly with pool auto-grow; row-major Sender remains
the SF path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lands the Rust core, C ABI, and benchmarks for a column-major sender
targeting Pandas/Polars → QuestDB throughput over QWP/WebSocket. See
`doc/COLUMN_SENDER_PLAN.md` for the design and `doc/COLUMN_SENDER_FFI_ABI.md`
for the C ABI spec; both shipped in earlier commits on this branch.

# What's in the box

* **WS-0 — `QuestDb` pool** (`ingress/column_sender/db.rs`,
  `ingress/column_sender/conf.rs`).
  Thread-safe pool with eager-open, fail-fast at `pool_max`,
  `BorrowedSender<'a>` that returns on `Drop`, and a background reaper
  (`pool_reap=auto`, tick = max(5 s, idle_timeout / 12)) that closes
  excess-over-`pool_size` connections. New conf keys: `pool_size`,
  `pool_max`, `pool_idle_timeout_ms`, `pool_reap`. `sf_*` / `sender_id`
  / `qwp_ws_progress=manual` refused at `connect`-time.

* **WS-1 — synchronous `flush` plumbing** (`ingress/column_sender/sender.rs`,
  `ingress/column_sender/encoder.rs`).
  `ColumnSender::flush(chunk, AckLevel)` encodes the chunk, publishes via
  the existing QWP/WS replay queue (`Sender::qwp_ws_publish_raw` —
  pub(crate) escape hatch in the row-API sender), and blocks until the
  ACK watermark crosses the published FSN. Polls in 50 ms slices so a
  `must_close` mid-wait surfaces promptly. `AckLevel::Durable` requires
  `request_durable_ack=on` at connect or returns `InvalidApiCall`.

* **WS-2 — `Chunk` + numeric / fixed-width columns**
  (`ingress/column_sender/chunk.rs`, `validity.rs`, `wire.rs`).
  Per-column wire-shape `Vec<u8>` storage so encode is a header +
  `extend_from_slice` per column. Two code paths per type per the plan
  §2.2:
  - Bool, i8, i16, i32, i64, f32, f64: `null_flag = 0` always; nullable
    rows sentinel-encoded (0 / i32::MIN / i64::MIN / NaN), matching the
    row-API convention.
  - Sparse-null types (uuid, long256, ipv4, ts_nanos, ts_micros,
    date_millis): no-null = `extend_from_slice`; nullable = QWP-shape
    bitmap + dense gather.
  - Designated timestamp (micros or nanos) — exactly one per chunk.
  Connection-scoped `SchemaRegistry`: first emit → FULL; repeat → REFERENCE.

* **WS-3 — VARCHAR** (`Chunk::column_varchar`). Arrow Utf8 in
  (`offsets: &[i32]` length `row_count + 1`, `bytes: &[u8]`); wire out
  is dense `non_null_count + 1` LE-u32 offsets + concatenated bytes.
  No-null path memcpys offsets when `offsets[0] == 0`; nullable path
  walks validity and skips slicing for null rows. Offset validation
  (negative / non-monotonic / past `bytes_len`) caught client-side.

* **WS-4 — symbol bulk-intern**
  (`Chunk::symbol_dict_{i8,i16,i32}`, `encoder::resolve_symbols`).
  Three append-time passes: referenced-bitset + range check; compact
  referenced dict bytes; translate codes to internal indices and build
  the QWP-shape bitmap. Connection-scoped `SymbolGlobalDict` shared
  with the row API's type (`buffer/qwp.rs:next_id/intern/entry`
  promoted to `pub(crate)`). At flush time, only entries the chunk
  actually references reach the wire — protects the 1M-per-connection
  cap on huge Pandas `Categorical` dicts. Roll-back on encode error
  keeps client + server dict views coherent.

* **WS-5 — C ABI** (`questdb-rs-ffi/src/column_sender.rs`,
  `include/questdb/ingress/column_sender.h`).
  Full implementation of `doc/COLUMN_SENDER_FFI_ABI.md`:
  - Opaque handles `questdb_db`, `column_sender`, `column_sender_chunk`.
  - `column_sender_validity` repr-C struct; `column_sender_ack_level`
    repr-C enum.
  - `questdb_db_connect/close/borrow_sender/return_sender/reap_idle`.
  - Every chunk column-append, the VARCHAR + symbol_dict family, the
    two designated-timestamp variants, and `column_sender_flush`.
  - Errors reuse `line_sender_error*`.
  Rust side gains `OwnedSender` — Arc-backed borrow handle the FFI hands
  out as `column_sender*` so the C caller can free `questdb_db*` before
  all borrows return without dangling.

  Hand-runnable smoke test at `cpp_test/smoke_column_sender.c`
  (compiles with `-Wall -Wextra -Werror`; not wired into CMake yet —
  matches the `smoke_line_reader` pattern).

* **WS-6 — bench** (`questdb-rs/benches/column_sender.rs`,
  `doc/COLUMN_SENDER_PERF.md`).
  Three families: per-column append vs raw memcpy baseline; symbol
  bulk-intern vs naïve per-row HashMap; encode_chunk end-to-end (no
  network). First-baseline numbers (Apple Silicon laptop, 100k rows):
    - `column_f64/column_sender_no_null` ≈ 55 GiB/s — matches memcpy.
    - `column_i64/column_sender_no_null` ≈ 54 GiB/s — matches memcpy.
    - `column_varchar/column_sender_no_null` within ~5 % of memcpy.
    - Symbol bulk-intern ~16× faster than naïve per-row HashMap.
    - `encode_chunk/populate_plus_encode` ≈ 139 M rows/s end-to-end.
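
The three append-time passes described under WS-4 can be sketched as below. This is a simplified illustration under several assumptions: `GlobalDict` and `bulk_intern` are hypothetical names, a `Vec<bool>` stands in for the referenced bitset, nulls are left out, and interning happens immediately rather than at flush time as in the commit.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a connection-scoped symbol dictionary.
#[derive(Default)]
struct GlobalDict {
    ids: HashMap<String, u32>,
}

impl GlobalDict {
    /// Return the existing id for `s`, or assign the next free one.
    fn intern(&mut self, s: &str) -> u32 {
        let next = self.ids.len() as u32;
        *self.ids.entry(s.to_owned()).or_insert(next)
    }
}

/// Hypothetical sketch of bulk-interning dictionary-encoded symbols.
fn bulk_intern(
    dict: &[&str],
    codes: &[i32],
    global: &mut GlobalDict,
) -> Result<Vec<u32>, String> {
    // Pass 1: range-check every code and record which entries are used.
    let mut referenced = vec![false; dict.len()];
    for &c in codes {
        let idx = usize::try_from(c)
            .ok()
            .filter(|&i| i < dict.len())
            .ok_or_else(|| format!("code {c} out of range"))?;
        referenced[idx] = true;
    }
    // Pass 2: intern only the referenced entries, so unused dictionary
    // values never consume ids in the connection-wide dictionary.
    let mut local_to_global = vec![u32::MAX; dict.len()];
    for (i, entry) in dict.iter().enumerate() {
        if referenced[i] {
            local_to_global[i] = global.intern(entry);
        }
    }
    // Pass 3: translate the per-row codes to connection-wide ids.
    Ok(codes.iter().map(|&c| local_to_global[c as usize]).collect())
}
```

With a four-entry dictionary of which only two entries are referenced, the global dictionary grows by two ids, which is the property that protects the per-connection cap.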

# Verification

- 57 column-sender tests (Rust core); 8 FFI tests; full 834-test lib
  suite passes.
- `cargo fmt` + `cargo clippy --tests --benches` clean on both crates.
- `cargo doc` introduces no new warnings.
- `cc -std=c11 -Wall -Wextra -Werror -I include` compiles the C header
  and the smoke program.

# What's not in here

- WS-7 (Python wrapper) lives in `py-questdb-client`. With the C ABI
  in `include/questdb/ingress/column_sender.h` and the FFI symbols in
  `libquestdb_client`, that repo can now start consuming.
- A live Pandas→QuestDB end-to-end bench and 1-hour soak — both
  belong in the Python repo / nightly CI rather than the in-tree
  Criterion suite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rewrite the column-major sender to eliminate intermediate buffers and
pipeline writes for maximum single-connection throughput.

Architecture changes:
- ColumnSender now owns a dedicated ColumnConn (conn.rs) that drives
  socket I/O directly — no replay queue, no background thread, no
  row-API publisher involvement.
- Chunk<'a> holds borrowed descriptors (raw pointers + lengths) into
  the caller's buffers; no per-column Vec<u8> staging. The encoder
  writes wire bytes straight from caller memory into the connection's
  reusable write_buf at flush time.
- flush() pipelines: encode + WS-mask + write_all, then drain acks
  non-blocking. Blocks only when in-flight hits the 128-frame protocol
  cap. New sync(AckLevel) blocks until all acks settle.
- Server cumulative OKs handled correctly (sequence=N acks all frames
  up to N).
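
A rough model of the in-flight accounting described above, with the 128-frame cap and cumulative OKs. This is a sketch only: the `AckWindow` type is hypothetical, and it assumes frame sequence numbers start at 1 and increase by one per frame, which the commit does not spell out.

```rust
/// Protocol cap on unacknowledged frames, as stated in the commit message.
const MAX_IN_FLIGHT: u64 = 128;

/// Hypothetical bookkeeping for pipelined frames (not the PR's type).
struct AckWindow {
    last_sent: u64,     // 0 means nothing has been sent yet
    acked_through: u64, // highest sequence acknowledged so far
}

impl AckWindow {
    fn new() -> Self {
        Self { last_sent: 0, acked_through: 0 }
    }

    fn in_flight(&self) -> u64 {
        self.last_sent - self.acked_through
    }

    /// Reserve the next sequence, or return None when the caller has to
    /// block and drain acks because the cap is reached.
    fn try_send(&mut self) -> Option<u64> {
        if self.in_flight() >= MAX_IN_FLIGHT {
            return None;
        }
        self.last_sent += 1;
        Some(self.last_sent)
    }

    /// A cumulative OK for `sequence` acknowledges every frame up to it.
    /// Stale or out-of-range acks are ignored.
    fn on_ok(&mut self, sequence: u64) {
        if sequence > self.acked_through && sequence <= self.last_sent {
            self.acked_through = sequence;
        }
    }
}
```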

API changes:
- flush(&mut chunk, AckLevel) → flush(&mut chunk) (fire-and-forget)
- New sync(AckLevel) drains all in-flight acks
- FFI: column_sender_flush drops ack_level arg; new column_sender_sync
- FFI lifetime contract: caller buffers must outlive flush (no copy)

Performance (5M-row L1 quotes, 9 columns, localhost):
- Encode path: 6 GB/s (2.3% of wall time)
- End-to-end: 350 MB/s pipelined (was 264 MB/s stop-and-wait)
- Per-chunk p50: 0.72 ms (was 2.64 ms)
- Criterion populate+encode: 575 µs (was 718 µs, 20% faster)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The default macOS TCP send buffer (~128 KB) is smaller than a typical
QWP chunk (1.5 MB at 25k rows). write_all blocks mid-frame while the
kernel drains the small buffer. A 4 MiB send buffer lets the kernel
accept a full chunk in one shot, reducing write_all stalls when the
pipeline has multiple frames in flight.

Also sets SO_RCVBUF to 4 MiB to absorb ack bursts from the server
without backpressuring the server's send path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
flush() now sets FLAG_DEFER_COMMIT (0x01) on every QWP frame. The
server appends rows to WAL writers without committing. sync() sends a
commit-triggering empty frame (without the flag) that commits all
accumulated rows in one WAL transaction, then drains acks.

This eliminates per-chunk WAL fsync overhead: 200 chunks × 25k rows
now produce 1 WAL commit instead of 200. The p95 per-chunk latency
drops from ~23 ms to ~7 ms. Old servers that don't recognize the flag
ignore it (reserved bit position) and commit per-message — graceful
degradation per the spec.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The server's ClientSymbolCache only caches symbols with
symbolKey < initialSymbolCount. On a fresh table, initialSymbolCount
stays at 0 until a WAL segment rolls and the watermark updates. By
sending the first frame without FLAG_DEFER_COMMIT, the server commits
it immediately, which allows the next segment to pick up the new
symbol count and enable caching for all subsequent deferred frames.

This is a client-side workaround for a server-side cache limitation.
The proper fix is for the server to cache locally-assigned symbol IDs
within the same segment (see WalColumnarRowAppender.putSymbolColumn).
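
Taken together with the deferred-commit change, the flag selection might look like the sketch below. `FLAG_DEFER_COMMIT = 0x01` comes from the commit message; the `FrameKind` enum and `frame_flags` function are hypothetical, and whether "first frame" is tracked per connection or per table is not stated, so the sketch simply takes a boolean.

```rust
/// Flag value quoted in the commit message.
const FLAG_DEFER_COMMIT: u8 = 0x01;

/// Hypothetical classification of outgoing frames (not the PR's type).
enum FrameKind {
    /// A data frame produced by flush(); `is_first` marks the frame that
    /// is sent without the flag as the symbol-cache workaround.
    Data { is_first: bool },
    /// The empty commit-triggering frame sent by sync().
    SyncCommit,
}

fn frame_flags(kind: FrameKind) -> u8 {
    match kind {
        // Commit immediately so the server can update its symbol watermark.
        FrameKind::Data { is_first: true } => 0,
        // Append to the WAL without committing.
        FrameKind::Data { is_first: false } => FLAG_DEFER_COMMIT,
        // No flag: commits everything accumulated so far.
        FrameKind::SyncCommit => 0,
    }
}
```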

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Align the C ABI, docs, and smoke test with column_sender_flush(sender, chunk, err) plus column_sender_sync(sender, ack_level, err). Reserve an in-flight slot for the sync commit, validate durable ACK opt-in before publishing, and add pool/sync coverage.
Rename the borrowed handle returned from the connection pool from
`column_sender` to `qwpws_conn` so it can host peer writer modes
(per-type today, generic Arrow / NumPy in Steps 2-3, future egress
readers). No behaviour change — the underlying Rust types
(ColumnSender / OwnedSender) keep their names since they're
doc-hidden; only the public C ABI changes.

FFI surface changes:
- struct column_sender             -> qwpws_conn
- questdb_db_borrow_sender         -> questdb_db_borrow_conn
- questdb_db_return_sender         -> questdb_db_return_conn
- column_sender_must_close         -> qwpws_conn_must_close
- column_sender_flush(sender, ...) -> column_sender_flush(conn, ...)
- column_sender_sync(sender, ...)  -> column_sender_sync(conn, ...)

column_sender_chunk and the column_sender_chunk_column_* / _symbol_dict_*
appenders keep their names — the chunk IS the column-sender writer's
accumulator, and flush/sync are operations on it; only the
borrowed-handle parameter type changes.

See plan-conn-pool-and-writers.md in py-questdb-client (Step 1) and
the Slack thread from 2026-05-27 with Victor for the rationale: pool
QWP/WS connections, not writers, so egress readers and Arrow / NumPy
appenders can share the same pool as the existing column_sender chunk
path.

Open Q1 from the plan is answered (chunk.rs:208, encoder.rs:82-95,
encoder.rs:460-466): `column_sender_chunk_column_*` already
direct-writes to the wire buffer — for native-LE contiguous data it
is one `extend_from_slice` per column. So Step 3's NumPy appender is
no longer about "saving an extra memcpy"; it's about avoiding
Python-side widening for narrower dtypes / strided / non-native-endian.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
New entry point that consumes an Apache Arrow C Data Interface
ArrowArray + ArrowSchema pair and dispatches to the existing per-type
chunk methods based on the schema's format string. Caller passes the
borrowed pointers it gets from PyArrow's `_export_to_c` (or any other
Arrow C Data producer); the FFI never constructs or releases the
arrays.

Supported schema formats in this patch:
  - c, s, i, l       int8 / int16 / int32 / int64
  - f, g             float32 / float64
  - b                bool (LSB-first bitmap)
  - u                UTF-8 string (int32 offsets)
  - tsn:..., tsu:... timestamp nanos / micros (timezone suffix ignored)
  - dictionary schemas with c/s/i indices and a UTF-8 value type —
    routed to symbol_dict_i8 / _i16 / _i32

Other formats — including LargeUtf8 (U), decimal, struct, list, and
non-UTF-8 dictionary values — currently return
line_sender_error_invalid_api_call. LargeUtf8 lands in Step 2b.

Constraints:
  - ArrowArray.offset must be 0; sliced arrays are rejected.
  - The chunk's row-count lock applies to the new appender the same
    way as the per-type calls.

The Arrow types are mirrored as #[repr(C)] structs in the Rust FFI
shim so we read them without taking a dependency on the arrow / arrow-
array crate. No new Rust dependencies.

See plan-conn-pool-and-writers.md (Step 2). The Cython-side wiring
(routing pandas Arrow-backed columns through this entry point) lands
in a separate patch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add ColumnKind::VarcharLarge (i64 offsets) + Chunk::column_varchar_large
+ encode_varchar_large. The new encoder reads i64 offsets and writes
u32 LE to the wire frame in one pass — no caller- or Rust-side
intermediate Vec<i32> for the narrowing.

Validation rejects negative offsets, decreasing offsets, offsets
exceeding the bytes buffer, AND any last offset exceeding u32::MAX
(the QWP wire offset table is uint32 LE). The overflow check at
chunk-build time surfaces a meaningful error rather than a per-row
overflow at encode time.

The Arrow appender's `U` format match now routes here. This unblocks
the Python side: pandas large_string columns can be sent without the
Python-side cast to UTF-8 (which previously allocated a fresh Arrow
array via pyarrow.cast).

estimate_frame_size grew a VarcharLarge case identical to Varchar.

questdb-rs 836 lib-tests pass. clippy clean on both crates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extend column_sender_chunk_append_arrow_column with row_offset and
row_count parameters so chunked-emission callers can slice an
ArrowArray without consolidating it first. Required for the Python
Client.dataframe path, which loops over row chunks and currently
slices buffers manually for the per-type appenders.

Per-format slicing:
  - fixed-width primitives + timestamps: data pointer is shifted by
    row_offset elements (`ptr.add(row_offset)`).
  - bool bitmap: shifted by row_offset / 8 bytes; row_offset % 8 == 0
    required (matches the validity bitmap byte-alignment).
  - utf8 / large_utf8: offsets pointer shifted by row_offset
    elements (Arrow offsets are monotonic, so the slice's offsets
    are still well-formed). bytes_len is read from the original
    array's last offset; the encoder rebases on the wire.
  - dictionary symbols: codes pointer shifted; the dictionary is
    shared across chunks unchanged.

Validity bitmap requires row_offset % 8 == 0; with row_offset=0 and
row_count=array.length we get exactly the previous behaviour.

Caller bounds-check: row_offset + row_count must not exceed
array.length.
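
The bounds and alignment rules above reduce to a small check. A minimal sketch, assuming the function name `bitmap_byte_offset` and the `String` error type purely for illustration; the real appender reports errors through the existing `line_sender_error*` types.

```rust
/// Hypothetical slice check (not the PR's code). Returns the byte offset
/// into the validity bitmap when one is present, or None when the column
/// has no bitmap and therefore no alignment requirement.
fn bitmap_byte_offset(
    array_len: usize,
    row_offset: usize,
    row_count: usize,
    has_validity: bool,
) -> Result<Option<usize>, String> {
    // row_offset + row_count must not exceed array.length.
    let end = row_offset
        .checked_add(row_count)
        .ok_or_else(|| "row_offset + row_count overflows".to_string())?;
    if end > array_len {
        return Err(format!("slice end {end} exceeds array length {array_len}"));
    }
    if !has_validity {
        return Ok(None);
    }
    // The bitmap can only be shifted by whole bytes.
    if row_offset % 8 != 0 {
        return Err(format!("row_offset {row_offset} is not a multiple of 8"));
    }
    Ok(Some(row_offset / 8))
}
```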

The C header documents the new parameters; clippy & fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Match the Arrow C Data Interface spec more precisely: `children`
  is `*const *mut ArrowArray` (`struct ArrowArray**` in the spec)
  and `dictionary` is `*mut ArrowArray`. We never mutate, so this
  is layout-equivalent to the previous `*const`/`*const`, but the
  declarations now line up with the spec for readers cross-checking.
- Rename `array_len` -> `array_total_len` in the appender so the
  meaning is unambiguous next to the per-call `row_count` parameter.
- Cross-reference doc comments: the per-type varchar / symbol_dict
  C-ABI entries now mention `column_sender_chunk_append_arrow_column`
  as the recommended path for callers holding an Arrow array, and
  flag the per-type entries as the lower-level building block.

No behaviour change. fmt + clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two correctness findings from the multi-agent review:

1. **encode_varchar_large rejected valid late slices.**
   validate_varchar_offsets_i64 checked the absolute `last` offset
   against u32::MAX, but the encoder narrows `(off - first)` per row.
   A slice taken from the tail of a multi-GiB LargeUtf8 array (e.g.
   base=3 GiB, last=4 GiB) was rejected even though every wire offset
   would be ≤ 1 GiB. Now we validate the *span* `last - first` against
   u32::MAX, with a clearer error message.

2. **Null-pointer deref on malformed Arrow arrays.**
   arrow_buffer<T> returned the raw buffer pointer without checking it
   for null. Callers then unconditionally called `slice::from_raw_parts(...)`
   or dereferenced `*offsets_ptr.add(...)`. A producer presenting length > 0
   with a null data buffer (spec-violating but plausible from buggy clients)
   would trigger UB before any validation ran.

   Added an `allow_null: bool` parameter. The bytes buffer of an empty
   varchar/symbol-dict array can legitimately be NULL (we already
   guard that downstream), so those three call sites pass `true`. All
   other call sites — offsets, primitives, codes, bool bitmap — pass
   `false` and surface a clean `InvalidApiCall` error instead.
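
The span check from finding 1 can be illustrated as follows. This is a sketch under stated assumptions: `narrow_large_offsets` is a hypothetical name, it returns a fresh `Vec<u32>` for clarity whereas the commit describes writing straight to the wire frame, and the check against the bytes-buffer length is omitted.

```rust
/// Hypothetical sketch: validate LargeUtf8 (i64) offsets and rebase them
/// to u32 wire offsets. Only the span `last - first` has to fit in u32,
/// so a slice from the tail of a multi-GiB array is accepted.
fn narrow_large_offsets(offsets: &[i64]) -> Result<Vec<u32>, String> {
    let (first, last) = match (offsets.first(), offsets.last()) {
        (Some(&f), Some(&l)) => (f, l),
        _ => return Err("offsets must contain at least one entry".to_string()),
    };
    if first < 0 {
        return Err("negative offset".to_string());
    }
    if offsets.windows(2).any(|w| w[1] < w[0]) {
        return Err("offsets are not monotonically non-decreasing".to_string());
    }
    // first >= 0 and the offsets are monotonic, so this cannot overflow.
    let span = last - first;
    if span > i64::from(u32::MAX) {
        return Err(format!("offset span {span} exceeds u32::MAX"));
    }
    Ok(offsets.iter().map(|&o| (o - first) as u32).collect())
}
```

With a base of 3 GiB and a last offset of 4 GiB, every rebased offset is at most 1 GiB, so the slice passes even though the absolute offsets exceed `u32::MAX`.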

Reviewers: convergent finding from concurrency-code-reviewer (Rust)
and general-purpose (cross-layer) agents.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add Chunk::column_numpy + NumpyDtype enum to questdb-rs, plus the C
FFI wrapper column_sender_chunk_append_numpy_column.

Behaviour, per Step 3 design decisions:
- i8/i16/i32 -> i64 sign-extend (wire = LONG).
- u8/u16/u32 -> i64 zero-extend (wire = LONG).
- i64 -> pass-through (wire = LONG).
- u64 -> i64 bit-reinterpret. Values > i64::MAX wrap to negative on
  the wire, matching the row path's C-cast behaviour.
- f32 -> f64 widen (wire = DOUBLE).
- f64 -> pass-through (wire = DOUBLE).
- bool (NumPy byte-per-row) -> Arrow LSB-first packed bitmap
  (wire = BOOLEAN).
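
A few of the conversions listed above, written out as plain functions. These are illustrative sketches with hypothetical names; they allocate a `Vec` per call rather than using the chunk-owned scratch arena, and the bool packer assumes any non-zero byte counts as true.

```rust
/// i16 -> i64 sign-extend (hypothetical helper).
fn widen_i16(src: &[i16]) -> Vec<i64> {
    src.iter().map(|&v| i64::from(v)).collect()
}

/// u8 -> i64 zero-extend (hypothetical helper).
fn widen_u8(src: &[u8]) -> Vec<i64> {
    src.iter().map(|&v| i64::from(v)).collect()
}

/// u64 -> i64 bit-reinterpret: values above i64::MAX wrap to negative.
fn reinterpret_u64(src: &[u64]) -> Vec<i64> {
    src.iter().map(|&v| v as i64).collect()
}

/// NumPy byte-per-row bools -> LSB-first packed bitmap.
/// Assumption: any non-zero byte is treated as true.
fn pack_bools(src: &[u8]) -> Vec<u8> {
    let mut out = vec![0u8; (src.len() + 7) / 8];
    for (row, &b) in src.iter().enumerate() {
        if b != 0 {
            out[row / 8] |= 1u8 << (row % 8);
        }
    }
    out
}
```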

Strided arrays and non-native-endian arrays are not supported in v1;
the caller (Python client) consolidates upstream.

Widening lives in Rust at append time, materialising into a chunk-
owned scratch arena (`Chunk::scratch: Vec<NumpyScratch>`). The
ColumnDescriptor's `*const T` points into the scratch; the encoder
hot path is unchanged. Scratch is cleared on Chunk::clear / drop.

The scratch enum uses typed variants (Box<[i64]>, Box<[f64]>,
Box<[u8]>) so the storage alignment matches the encoder's read
alignment.

questdb-rs 836 lib-tests pass. clippy + fmt clean on both crates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Round-3 dirty-sender fix (option c from
plan-conn-pool-and-writers.md): expose a new FFI that callers use in
error-recovery paths to force-close a conn instead of recycling it.

The problem: a mid-call flush failure left a conn with in-flight
uncommitted frames in the pool. The next borrower's first flush is
QWP's "immediate commit", which would commit the stale frames
alongside their own.

The fix exposes a single new entry point:

  void questdb_db_drop_conn(questdb_db* db, qwpws_conn* conn);

semantically equivalent to "mark must_close, then return" but in one
atomic step. The conn enters the terminal state and the pool drops
it on return rather than recycling it.

Implementation:
- ColumnConn gains `mark_must_close(&mut self)` (pub(crate)).
- ColumnSender gains `mark_must_close(&mut self)` (pub) that
  forwards to ColumnConn.
- The FFI wraps these: questdb_db_drop_conn marks then drops.

The existing `qwpws_conn_must_close()` getter is unchanged; this
adds the corresponding setter at each layer.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small hardening tweaks:

1. **Tighten format dispatch.** The Arrow C Data Interface only uses
   a `:`-prefixed parameter on timestamp / date / time formats;
   everything else is a single character. Previously
   `column_sender_chunk_append_arrow_column` did
   `format.split(':').next()` and dispatched on the prefix, which
   would spuriously match e.g. a malformed `"u:foo"` to the varchar
   arm. Exact-match the non-ts arms and use `starts_with("tsn:")` /
   `starts_with("tsu:")` for the ts arms.

2. **Accept `null_count == -1` with NULL bitmap as "no nulls".**
   pyarrow / polars emit this shape when the column has no nulls
   (the spec's "unknown" interpretation). We treat it as no-nulls;
   the encoder reads the data buffer densely. Only `null_count > 0`
   with a NULL bitmap is malformed.

3. **Guard `dict_array.length < 0`.** The main array's negative
   length is already rejected in
   `column_sender_chunk_append_arrow_column`; mirror the same check
   inside `arrow_dictionary_utf8` for symmetry.
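
The tightened dispatch from tweak 1 amounts to exact matches for single-character formats and prefix matches only for the timestamp arms. A minimal sketch: the `ArrowKind` enum and `classify_format` function are hypothetical, and dictionary-encoded columns (which are identified by the schema's dictionary pointer, not the format string alone) are left out.

```rust
/// Hypothetical classification of Arrow C Data Interface format strings.
#[derive(Debug, PartialEq)]
enum ArrowKind {
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Bool,
    Utf8,
    LargeUtf8,
    TimestampNanos,
    TimestampMicros,
}

fn classify_format(format: &str) -> Option<ArrowKind> {
    match format {
        // Non-timestamp formats must match exactly, so "u:foo" is rejected.
        "c" => Some(ArrowKind::Int8),
        "s" => Some(ArrowKind::Int16),
        "i" => Some(ArrowKind::Int32),
        "l" => Some(ArrowKind::Int64),
        "f" => Some(ArrowKind::Float32),
        "g" => Some(ArrowKind::Float64),
        "b" => Some(ArrowKind::Bool),
        "u" => Some(ArrowKind::Utf8),
        "U" => Some(ArrowKind::LargeUtf8),
        // Timestamp formats carry a timezone suffix, which is ignored.
        f if f.starts_with("tsn:") => Some(ArrowKind::TimestampNanos),
        f if f.starts_with("tsu:") => Some(ArrowKind::TimestampMicros),
        _ => None,
    }
}
```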

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…refactor

# Conflicts:
#	questdb-rs/src/ingress.rs
Extends the column-sender pool to also serve egress readers from one
shared `questdb_db` configured by a single conf-string. Lazy-init for
readers, eager for writers, same `pool_size` / `pool_max` /
`pool_idle_timeout_ms` / `pool_reap` budget.

- questdb-rs/db.rs: parallel reader free-list, `borrow_reader_owned`,
  `ReaderPoolHandle`, `OwnedReader::mark_must_close`, integrated into
  the reaper. All reader-side state and methods feature-gated under
  `_egress` so the default build (no egress) stays lean.
- questdb-rs/egress/config: reader conf-string parser accepts the
  `qwpws::` / `qwpwss::` schemes and ignores `pool_*` keys, so a
  single conf-string drives both the sender and reader pools without
  translation.
- questdb-rs-ffi/egress: `line_reader` becomes a named struct with a
  `ReaderOwnership` enum (Standalone vs Pooled{handle, must_close});
  pool borrow/return + `line_reader_mark_must_close` exposed in C.
- column_sender.rs: `questdb_db(pub(crate) QuestDb)` so the egress
  FFI can reach the inner pool to wire reader borrows.
- Headers: reader-pool entry points live in `egress/line_reader.h`
  next to the type they wrap; `ingress/column_sender.h` points
  there.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
bluestreak01 and others added 30 commits June 27, 2026 19:10
test_zone_failover_stays_in_zone_then_crosses_c_client_rust: two zone-A
servers + one zone-B server (zone-A first in addr=, zone=A). The Reader
binds zone A, stays in zone A across an intra-zone failover (kill the
bound zone-A host -> surviving zone-A sibling), and crosses to zone B
only once both zone-A hosts are gone (zone= is a preference, not a hard
filter). Extends _egress_connect_string with zone/target/timeout knobs
and adds a _wait_for_zone poll helper.

Mirrors the Enterprise Java test_zone_failover.py suite; the binding
under test is the Rust egress Reader.
SHOW_ZONE captured the zone value from the first batch then broke out of
the loop, dropping the cursor before it was fully read. The Rust Reader
poisons a connection whose cursor is dropped un-drained, so the *next*
SHOW_ZONE failed with 'Reader connection is closed and cannot be reused'.

This surfaced in test_zone_failover_stays_in_zone_then_crosses_c_client_rust
(build 246314): the first SHOW_ZONE passed but poisoned the connection,
and the _wait_for_zone poll that followed errored out.

Drain the cursor to completion (capture the value once, keep calling
next_batch until None). SHOW PARAMETERS returns a tiny result, so this
is cheap.
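
The drain-to-completion pattern, reduced to its shape. This is a generic sketch rather than the test helper itself: `first_value_draining` is a hypothetical name and the closure stands in for whatever `next_batch` call the real cursor exposes.

```rust
/// Hypothetical helper: keep the first batch but continue pulling until
/// the source reports None, so the cursor is never dropped un-drained.
fn first_value_draining<T>(mut next_batch: impl FnMut() -> Option<T>) -> Option<T> {
    let mut first = None;
    while let Some(batch) = next_batch() {
        // Capture the value once; later batches are read and discarded.
        if first.is_none() {
            first = Some(batch);
        }
    }
    first
}
```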
Align the row-major and store-and-forward column-major senders on a single
delivery-wait surface.

Row-major:
- Remove `Sender::await_acked_fsn(fsn, timeout) -> Result<bool>` and the
  `line_sender_qwpws_await_acked_fsn` C ABI entry point.
- Add `Sender::wait(ack_level: AckLevel, timeout: Duration) -> Result<()>`
  (and on the `BorrowedRowSender` handle), mirroring `SfColumnSender::wait`.
  It waits for the cumulative `published_fsn` boundary at the requested
  AckLevel. Re-export `AckLevel` at `crate::ingress`.

Column-major (store-and-forward only; direct sender untouched):
- `SfColumnSender::wait` / `ColumnSender::wait` gain a per-call `timeout`,
  replacing the `set_sfa_sync_timeout_for_test` hook.

Shared timeout semantics: `timeout` is a no-progress deadline (fires only if
the ack watermark fails to advance for that long); `Duration::ZERO` waits
indefinitely; on expiry returns `ErrorCode::FailoverRetry` with frames
retained for replay. The direct sender keeps `commit`/`flush_and_wait` and its
`request_timeout`-bounded behaviour.
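
The no-progress semantics can be modelled with a small polling loop. A sketch under stated assumptions: `wait_for_watermark` and `WaitError` are hypothetical names, the closure stands in for reading the ack watermark, the 1 ms sleep is arbitrary, and the real API returns `ErrorCode::FailoverRetry` on expiry instead of this local error type.

```rust
use std::time::{Duration, Instant};

/// Hypothetical error standing in for the real timeout error code.
#[derive(Debug, PartialEq)]
enum WaitError {
    NoProgress,
}

/// Wait until the watermark reaches `target`. The deadline restarts every
/// time the watermark advances; Duration::ZERO means wait indefinitely.
fn wait_for_watermark(
    target: u64,
    timeout: Duration,
    mut poll_watermark: impl FnMut() -> u64,
) -> Result<(), WaitError> {
    let mut last_seen = poll_watermark();
    let mut last_progress = Instant::now();
    loop {
        if last_seen >= target {
            return Ok(());
        }
        let current = poll_watermark();
        if current > last_seen {
            // Progress was made: reset the no-progress clock.
            last_seen = current;
            last_progress = Instant::now();
        } else if !timeout.is_zero() && last_progress.elapsed() >= timeout {
            return Err(WaitError::NoProgress);
        } else {
            std::thread::sleep(Duration::from_millis(1));
        }
    }
}
```

A slow but steadily advancing watermark never trips the timeout in this model; only a stall longer than `timeout` does.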

FFI/bindings: `line_sender_qwpws_wait` and `sf_column_sender_wait` take a
`uint64_t timeout_millis`; C++ `wait(level, milliseconds = zero)` and Python
`wait(ack_level=0, timeout_millis=0)` stay source-compatible. Failover sidecar
AWAIT_ACKED maps a timeout back to its bool reply.

Also clear all pre-existing rustdoc broken-link / link-target / bare-URL
warnings (public and --document-private-items builds).

Tests: questdb-rs 889 + questdb-rs-ffi 52 pass; clippy/fmt/doc clean.
The qwp_column_sidecar binary still called SfColumnSender::wait(AckLevel) with
one argument, breaking the failover_clients clippy CI gate. Pass Duration::ZERO
(wait indefinitely, preserving prior block-until-ack behaviour).
…t sender

Extend the c_client e2e suite with deterministic scenarios ported from
the Enterprise reference suite (questdb-ent/e2e/tests), all driven by the
Rust qwp_sidecar via the shared lib.shared_fixtures harness. Deduplicated
against what this branch already has (test_sender_kill9_sf_recovery_replays
is not re-added).

test_failover.py (+6, -m c_client):
- failover_during_active_send
- two_failovers_in_one_scenario
- no_request_durable_ack_loses_rows  (negative/honesty test)
- orphan_drainer_durable_ack_survives_kill
- sender_repeated_sigkill_no_state_corruption  (multi-cycle SF recovery)
- partial_ack_sealed_segment_replay_dedup_collapses

New files (-m c_client):
- test_failover_graceful.py: graceful_failover_round_trip (demote/promote,
  write-rejection + connection-survival probes)
- test_switch.py: write_path_across_switch, disturbance_honesty_guard
- test_switch_roundtrip_crash_repro.py: roundtrip_post_switch_write_no_crash

New file (separate cadence, -m "c_client_rust and fuzz"):
- test_failover_fuzz.py: random_failover (parametrized). Tagged fuzz +
  c_client_rust but NOT c_client, so -m c_client stays deterministic.
  Registers the fuzz marker in pyproject.toml.

Reuses jh's _connect_string (username= keyword, sender_id/sf_max_bytes
kwargs) and second-sidecar spawn pattern. -m c_client now selects 18
deterministic tests.
Commit a9f6c2a added a `uint64_t timeout_millis` parameter to
`sf_column_sender_wait` (the symmetric wait(AckLevel, timeout) rework),
but two column-major callers were not updated alongside it:

- examples/line_sender_cpp_example_arrow.cpp called it with 3 args ->
  hard C++ compile error (mac "Make" job, Bash exit 2).

- system_test/arrow_ffi.py bound `sf_column_sender_wait` via ctypes with
  only 3 params (conn, ack_level, err_out). ctypes does no compile check,
  so this was a silent ABI mismatch: `err_out` was read from an
  uninitialized register and, on the server-rejection path where wait
  fails and writes `*err_out`, the store hit a stray address. On the CI
  Linux x86-64 runner that address is sometimes unmapped -> intermittent
  SIGSEGV (139) in TestArrowIngressSfa
  .test_sfa_write_rejection_reports_once_and_continues (explaining the
  alternating pass/fail across builds). On arm64 the stray register was
  benign, so the same UB surfaced as a deterministic wrong error code
  (INVALID_API_CALL instead of SERVER_REJECTION).

Pass 0 (wait indefinitely) for the timeout, matching the C++ wrapper's
default and the already-correct row-major line_sender_qwpws_wait binding.

`AckLevel` is defined in the `column_sender` module, which is compiled
only under `sync-sender-qwp-ws`. Commit a9f6c2a (symmetric
wait(AckLevel, timeout)) added an unconditional re-export at the
`ingress` root and an unconditional import in `sender.rs`:

    src/ingress.rs:81       pub use column_sender::AckLevel;
    src/ingress/sender.rs   use crate::ingress::column_sender::AckLevel;

Both broke any feature combo that enables a sync sender without
QWP/WebSocket, e.g.:

    cargo test --no-default-features \
        --features=ring-crypto,tls-webpki-certs,sync-sender-tcp

failing with E0432 (`unresolved import column_sender`) and aborting the
`Tests` job with exit 101 on the linux / linux-stable / linux-nightly
legs. The `sync-sender-http` combo hit the same gap (CI just stopped at
the tcp combo first).

Every actual `AckLevel` use (row-major `Sender::wait`,
`qwp_ws_completed_fsn`, `qwp_ws_wait_timeout`, and the column-major
`db`/`column_sender` APIs) is already gated behind `sync-sender-qwp-ws`,
so gating the import and re-export the same way is sufficient. Verified
locally: the tcp, http and qwp-ws combos all compile (lib + tests).

Move one-shot ingestion onto `QuestDb` so callers never handle a direct
column sender:

- Add `QuestDb::flush_arrow_batch(table, batch, timestamp_column, overrides,
  ack_level)`. The timestamp source is an `Option<ColumnName>` (Some =
  column-stamped, None = server-stamped), replacing the two separate
  `flush_arrow_batch_*` methods at the call site.
- `QuestDb::flush_polars_dataframe` already borrows the direct sender
  internally; hide the direct sender from the public surface
  (`#[doc(hidden)]` re-export + `borrow_direct_column_sender`, and demote
  the `DirectColumnSender` Arrow/Polars methods to `pub(crate)`).

Honor a caller-chosen ack level, optionally:

- `flush_arrow_batch` takes `ack_level: Option<AckLevel>`; `PolarsIngestOptions`
  gains `.ack_level(level)`.
- When unset, both fall back to the connect string's level via
  `default_ack_level()` — the same level the store-and-forward senders use:
  `Durable` when `request_durable_ack=on`, otherwise `Ok`. Requesting
  `Durable` without the opt-in is rejected with `InvalidApiCall`.
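The fallback rule above can be sketched as a small pure function. This is a minimal illustration under stated assumptions, not the crate's code: the `Error` enum, the `resolve_ack_level` helper, and the boolean `request_durable_ack` parameter are hypothetical stand-ins; only `AckLevel`, `default_ack_level`, `InvalidApiCall`, and the rule itself come from the commit message.

```rust
/// Stand-in for the crate's `AckLevel` (only the two levels named above).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckLevel {
    Ok,
    Durable,
}

/// Hypothetical error type; the real crate uses its own `Error`/`ErrorCode`.
#[derive(Debug, PartialEq, Eq)]
pub enum Error {
    InvalidApiCall(&'static str),
}

/// Level implied by the connect string: `Durable` only when
/// `request_durable_ack=on`, otherwise `Ok`.
pub fn default_ack_level(request_durable_ack: bool) -> AckLevel {
    if request_durable_ack {
        AckLevel::Durable
    } else {
        AckLevel::Ok
    }
}

/// Hypothetical helper: resolve the caller's optional choice against the
/// connect-string setting.
pub fn resolve_ack_level(
    requested: Option<AckLevel>,
    request_durable_ack: bool,
) -> Result<AckLevel, Error> {
    match requested {
        // Unset: fall back to the connect string's level.
        None => Ok(default_ack_level(request_durable_ack)),
        // Durable without the opt-in is rejected, not silently downgraded.
        Some(AckLevel::Durable) if !request_durable_ack => Err(Error::InvalidApiCall(
            "Durable ack requested without request_durable_ack=on",
        )),
        // Any other explicit choice is honoured as-is.
        Some(level) => Ok(level),
    }
}
```

With this shape, `resolve_ack_level(None, true)` yields `Durable`, while `resolve_ack_level(Some(AckLevel::Durable), false)` is an error rather than a silent downgrade to `Ok`.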

Update examples + the failover sidecar to the `db`-level entry points, and
add tests for both Arrow paths, the durable-without-opt-in rejection, and an
explicit ack level.

The FFI build (`questdb-rs-ffi` clippy `--all-features -D warnings`) forwards
`questdb-rs/arrow` but not `polars`, compiling questdb-rs with `arrow-ingress`
on and `polars-ingress` off. The two publish-only `DirectColumnSender` methods
(`flush_arrow_batch_server_stamped` / `flush_arrow_batch_at_column`), demoted to
`pub(crate)` in the previous commit, are only used by the polars checkpoint
loop, so they became dead code in that combo and `-D warnings` turned it into a
hard error.

Gate those two methods on `polars-ingress` (their only caller). The ACKing
`_and_wait` variants stay `arrow-ingress` since `flush_arrow_batch` uses them.

Add e2e coverage for the direct Arrow facade `Db::flush_arrow_batch` (the
`db.flush_arrow_batch("trades", &batch, None, &[], None)` application call)
when the primary moves underneath the client.

Unlike the store-and-forward `arrow`/`chunk` column paths, `flush_arrow_batch`
borrows a *direct* (non-SF) column sender, publishes a commit boundary, waits
for the ack level, and surfaces FailoverRetry to the caller instead of
replaying. So the tests honour the documented re-call contract (re-FLUSH) and
preserve pre-move data server-side (crash recovery / graceful switch) rather
than via client SF replay.

qwp_column_sidecar: add a new `arrow_db` SEND shape that drives
`Db::flush_arrow_batch(table, &batch, Some(ts), &[], None)` (replacing the
removed `flush_arrow_batch_at_column` for this path). The existing SF `arrow`
shape and its test are left untouched. cargo check passes.

tests/test_arrow_db_failover.py (3 tests, -m c_client):
- stale primary: configured primary already gone at the first (lazy) send
- primary failover mid-stream: kill -9 + same-database crash-restart
- in-place role switch: primary<->replica via POST /lifecycle/switch
  {"role":..,"timeout_ms":5000}; flush rejected on the read-only replica,
  resumes after promotion, pre-switch rows intact

-m c_client now selects 21 deterministic tests.

Two feature-gating compile errors surfaced by the CI test matrix:

- src/lib.rs: `pub use db::DirectColumnSender` was missing the
  `#[cfg(feature = "sync-sender-qwp-ws")]` gate that the `mod db`
  declaration and all other `db::` re-exports carry. Building with
  `sync-sender-tcp` (no qwp-ws) compiled out `mod db` while the import
  remained -> E0432 unresolved import `db`.

- src/tests/column_sender_pool.rs: `data_frame_count` was gated on
  `polars-ingress` but is also called by `arrow-ingress` tests. Building
  `arrow` without `polars` -> E0425 cannot find function. Widened the gate
  to `arrow-ingress` (polars-ingress implies arrow-ingress).
… moves

Add e2e coverage for the direct DataFrame facade `Db::flush_polars_dataframe`
(the `db.flush_polars_dataframe("trades", &df, &opts)` call) when the primary
moves underneath the client.

Key contrast with flush_arrow_batch: flush_polars_dataframe OWNS the source
DataFrame, so it re-drives automatically on a primary move -- a single call
returns Ok with no caller retry (flush_arrow_batch surfaces FailoverRetry and
requires a re-call). The tests assert that single-call auto-redrive contract.

tests/test_polars_db_failover.py (3 tests, -m c_client, skipif-gated on
C_QUESTDB_CLIENT_COLUMN_POLARS):
- stale primary: configured primary already gone at the first (lazy) send
- primary failover mid-stream: kill -9 + same-database crash-restart; a single
  flush re-drives to the restarted primary (no caller retry)
- in-place role switch: primary<->replica via POST /lifecycle/switch; flush
  rejected on the read-only replica, resumes after promotion, rows intact

test_failover.py: drop the `polars` param from the kill-9 + object-store-wipe
columnar test -- the direct polars path has no client SF replay, so it cannot
survive the wipe; its failover coverage now lives in the no-wipe file above.
Removes the now-unused `os` import.
…d 246443)

Root causes found from the c-client e2e build (PR 1094, jh_conn_pool_refactor):

1+2. test_write_path_across_switch / test_roundtrip_post_switch...: the row
   sidecar's STATS verb replied ERR when called after a write to a read-only
   (demoted) node, because Sender::acked_fsn()/qwp_ws_totals() surface the
   sender's last wire error. The Java sidecar's STATS always returns counters.
   Make the Rust STATS best-effort: fall back to the -1/0 sentinels the Python
   parser expects instead of failing. (qwp_sidecar.rs)

3. test_no_request_durable_ack_loses_rows: with request_durable_ack=off the SF
   is OK-trimmed, and the Rust sender (unlike Java) does not reconnect while
   idle with nothing to send, so reconn_succ stayed 0 and the reconnect guard
   tripped. Send a probe row to P2 to force the reconnect and prove
   connectivity, then assert the original batch is lost.

4. test_orphan_drainer_durable_ack_survives_kill: when P1 died mid-drain the
   orphan worker returned RetryLater and stopped -- the Rust drainer
   (drain_orphan_to_completion) does not autonomously reconnect to a successor;
   the slot is re-attempted only by a fresh orphan scan on the next CONNECT.
   Reconnect the foreground sender after P2 starts so a new scan replays the
   ghost slot.

All four are test/sidecar-side fixes (lazy-reconnect + connect-triggered orphan
re-scan are legitimate Rust client semantics); no client product change.

The QWP egress reader now recovers from the server's transient
"cached query plan cannot be used because table schema has changed"
(INTERNAL_ERROR / 0x06) condition internally, instead of surfacing it.

An async ALTER COLUMN TYPE bumps a table's metadata version between a
SELECT's server-side compilation and its execution, so the server rejects
its own cached plan. This is not actionable by the caller and recompiles
cleanly on the next execution — exactly how QuestDB's PGWire / REST
endpoints self-heal. Exposing it forced every user (and our own fuzz test)
to write retry loops: friction we should not ship.

Cursor::next_batch now intercepts this specific error in the QUERY_ERROR
arm and, when no row has yet been delivered (!data_delivered, the
load-bearing exact-once guard), transparently re-issues the query on the
SAME healthy connection with a fresh request_id via
replay_query_same_connection() — no reconnect, unlike the failover path.
Bounded by MAX_STALE_PLAN_RETRIES (15) so a table under relentless schema
churn surfaces the error instead of looping; retries are paced by the
server's recompile + RTT, so no client-side sleep and no busy-spin.
Detection is gated on INTERNAL_ERROR + message text (the wire has no
retryable bit) so genuine internal faults still surface unchanged.
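The retry rule described above can be sketched as follows. This is a simplified model under stated assumptions, not the reader's implementation: the `Frame` enum, the closure-based `server` stand-in, and the field names are hypothetical; the message text, the 0x06 code, the `!data_delivered` guard, the fresh request id, and `MAX_STALE_PLAN_RETRIES = 15` are taken from the commit message.

```rust
pub const MAX_STALE_PLAN_RETRIES: u32 = 15;
pub const INTERNAL_ERROR: u8 = 0x06;
pub const STALE_PLAN_TEXT: &str =
    "cached query plan cannot be used because table schema has changed";

/// Hypothetical wire frames, reduced to what the retry decision needs.
pub enum Frame {
    Batch(Vec<i64>),
    End,
    QueryError { code: u8, message: String },
}

/// The wire has no retryable bit, so detection is code + message text.
pub fn is_stale_plan(code: u8, message: &str) -> bool {
    code == INTERNAL_ERROR && message.contains(STALE_PLAN_TEXT)
}

/// `server` is a hypothetical stand-in for the connection: it is called with
/// the current request id and returns the next frame for that request.
pub struct Cursor<F: FnMut(u64) -> Frame> {
    server: F,
    pub request_id: u64,
    pub data_delivered: bool,
    pub stale_retries: u32,
}

impl<F: FnMut(u64) -> Frame> Cursor<F> {
    pub fn new(server: F) -> Self {
        Cursor { server, request_id: 1, data_delivered: false, stale_retries: 0 }
    }

    pub fn next_batch(&mut self) -> Result<Option<Vec<i64>>, String> {
        loop {
            match (self.server)(self.request_id) {
                Frame::Batch(rows) => {
                    // Once a batch reaches the caller, a replay could duplicate rows.
                    self.data_delivered = true;
                    return Ok(Some(rows));
                }
                Frame::End => return Ok(None),
                Frame::QueryError { code, message } => {
                    let retryable = is_stale_plan(code, &message)
                        && !self.data_delivered
                        && self.stale_retries < MAX_STALE_PLAN_RETRIES;
                    if !retryable {
                        return Err(message);
                    }
                    // Re-issue on the same connection under a fresh request id.
                    self.stale_retries += 1;
                    self.request_id += 1;
                }
            }
        }
    }
}
```

The loop has no sleep: in this model, as in the description above, each retry is paced by the round trip to the server, and the budget guarantees termination under continuous schema churn.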

Because the FFI reader_cursor_next_batch wraps Cursor::next_batch, the C /
Python paths inherit the fix for free. Reverts the test-level retry added
to system_test/qwp_egress_reader.py in 4a55bb3 — the reader handles it now,
so query_table_sorted is back to its simple form.

Tests: 4 new egress_failover cases — transparent retry succeeds, a
non-stale INTERNAL_ERROR still surfaces, a stale-plan error after rows were
delivered surfaces (guard), and the retry budget exhausts and surfaces.
…sweep

The functional matrix ran the entire suite once per BuildMode at the latest
protocol (API, CONF, ENV) on top of the http x protocol_version sweep. The
build-mode axis only changes how the client receives its config string, so
running the whole suite 3x for it is mostly wasted wall-clock -- a big part
of why the Windows job overruns the 90-minute CI cap.

Collapse that axis: drop build_mode from the cartesian product and instead
pick each test's mode pseudo-randomly from (seed, test-id). Each test runs
once; across the suite all three modes get exercised. The seed is printed at
the start (and on every failure) and can be pinned via QDB_BUILD_MODE_SEED to
reproduce a run's exact choices. Selection is deterministic and independent
of execution order. Non-latest/auto protocol versions stay on CONF, exactly
as before.
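The selection scheme can be illustrated with a short sketch. The real harness is the Python system-test runner and its hashing is not shown in this commit message, so everything here is an assumption for illustration: the FNV-1a hash, the `pick_build_mode` name, and the byte layout of the key are hypothetical. What it demonstrates is the stated property: the choice depends only on (seed, test-id), never on execution order.

```rust
/// The three modes named in the commit message.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BuildMode {
    Api,
    Conf,
    Env,
}

/// 64-bit FNV-1a, chosen here only because it is tiny and stable across
/// platforms and compiler versions (an assumption, not the harness's hash).
pub fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Hypothetical selector: hash the seed together with the test id and map
/// the result onto one of the three modes.
pub fn pick_build_mode(seed: u64, test_id: &str) -> BuildMode {
    let mut key = seed.to_le_bytes().to_vec();
    key.extend_from_slice(test_id.as_bytes());
    match fnv1a_64(&key) % 3 {
        0 => BuildMode::Api,
        1 => BuildMode::Conf,
        _ => BuildMode::Env,
    }
}
```

Because the function is pure, pinning the seed reproduces every test's mode exactly, which is the role `QDB_BUILD_MODE_SEED` plays in the description above.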

API-only coverage is preserved automatically: _BuildModeFuzzResult watches for
the 'BuildMode.API-only test' skip and defers any API-only test that drew
CONF/ENV (at the latest protocol) to a single forced-API retry pass -- so no
enumeration/annotation of the 20 API-only tests is needed and future ones are
covered too. The CONF-pinned QWP/WS suites are untouched (fuzzer is inert
unless running SUITE_MATRIX with a seed).

At the latest protocol this turns 3 full-suite passes into 1.

cargo fmt --check (the 'cargo fmt and clippy' CI job) wanted two hand-wrapped
lines in reader.rs collapsed onto one. Formatting only; no behavior change.
…cision

The re-run of build 246443 (jh_conn_pool_refactor @ 7d4650f, which includes
the earlier fix cade18b) went from 4 failures to 1: the STATS best-effort fix
and the no-durable-ack probe-send fix verified green; only
test_orphan_drainer_durable_ack_survives_kill remained red.

Root cause: this guard is ported from a Java BackgroundDrainer durable-ack
regression test. It kills the primary while the adopted orphan slot still holds
OK'd-but-not-durable frames and expects the drainer to replay them to the
successor. The Rust drain_orphans path observes 0 rows on P2 even after a fresh
orphan-scan reconnect -- OK'd-but-not-durable orphan frames do not survive a
primary failover, unlike the main sender (which honours request_durable_ack and
passes its failover tests).

This is either a real durable-ack gap in the Rust orphan drainer or intended
best-effort orphan semantics -- a client-side determination, not something to
paper over with a test hack. Skip with a documented reason so the suite is
green and the finding is surfaced rather than hidden.
…sender

Bound the TCP dial on both QWP transports. Until now the connect used the
OS default, which can hang for tens of seconds against a black-holed host
that silently drops SYNs (no RST). `connect_timeout` (connect-string key,
milliseconds) bounds each dial; on expiry it surfaces a distinct,
failover-eligible ConnectTimeout rather than burying it under SocketError.

Mechanism: native, cross-platform non-blocking connect() -> poll for
writability bounded by the budget -> getsockopt(SO_ERROR), as implemented by
std's TcpStream::connect_timeout (EINTR handled internally) — no hand-rolled
loop. Applied per resolved address (Happy-Eyeballs). io::ErrorKind::TimedOut
maps to the new ConnectTimeout code (Rust equivalent of the Java
CONNECT_TIMEOUT sentinel), so callers tell a timed-out dial from refused/reset.
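A minimal sketch of the per-address bounded dial, using only the standard library. The `DialError` enum and the `dial_any` function are hypothetical names for illustration (the commit's own function is `connect_tcp_to_any_addr`, whose signature is not shown here); the addresses are tried one after another in this sketch. The behaviour modelled is the one described: each dial is bounded by the budget, and a timeout is reported distinctly only when every candidate address timed out.

```rust
use std::io;
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Hypothetical error type for this sketch.
#[derive(Debug)]
pub enum DialError {
    /// Every candidate address exceeded the connect budget.
    ConnectTimeout,
    /// At least one address failed for another reason (refused, reset, ...).
    SocketError(io::Error),
    /// Resolution produced no candidate addresses.
    NoAddresses,
}

/// Try each resolved address in turn. `None` means "use the OS default".
/// Note: std rejects a zero `Duration` in `connect_timeout` with an error,
/// so callers should map "0 = OS default" to `None` before calling this.
pub fn dial_any(
    addrs: &[SocketAddr],
    connect_timeout: Option<Duration>,
) -> Result<TcpStream, DialError> {
    let mut last_err: Option<io::Error> = None;
    let mut all_timed_out = true;

    for addr in addrs {
        let attempt = match connect_timeout {
            Some(budget) => TcpStream::connect_timeout(addr, budget),
            None => TcpStream::connect(addr),
        };
        match attempt {
            Ok(stream) => return Ok(stream),
            Err(err) => {
                if err.kind() != io::ErrorKind::TimedOut {
                    all_timed_out = false;
                }
                last_err = Some(err);
            }
        }
    }

    match last_err {
        None => Err(DialError::NoAddresses),
        Some(_) if all_timed_out => Err(DialError::ConnectTimeout),
        Some(err) => Err(DialError::SocketError(err)),
    }
}
```

Keeping `ConnectTimeout` separate from `SocketError` is what lets a caller treat a black-holed host as failover-eligible without string-matching on the underlying I/O error.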

Egress (QWP reader): ReaderConfig.connect_timeout_ms (0 = OS default), key
`connect_timeout`, capped at 1h; WsTransport::connect_to uses it. New
egress::ErrorCode::ConnectTimeout, wired into is_failover_eligible (+ its
exhaustive matrix) so a per-endpoint timeout advances to the next endpoint.
FFI reader_error_connect_timeout = 23 + header + round-trip/ABI tests.

Ingress (QWP/WS sender): QwpWsConfig.connect_timeout (Option<Duration>,
default None), key `connect_timeout` (rejected for non-QWP/WS);
connect_tcp_to_any_addr uses it and surfaces ConnectTimeout when every
candidate address timed out (stays retryable in the reconnect loop). New
crate::ErrorCode::ConnectTimeout; FFI line_sender_error_connect_timeout = 19
+ header + tripwire / ABI tests.

Tests: config parse + validate on both sides, a reachable-connect-still-works
case, and a behavioral dial against RFC 5737 TEST-NET-1 (192.0.2.1) asserting
ConnectTimeout fires within the budget. clippy + fmt clean on both crates.

The orphan drainer built its send core with new_with_durable_ack(.., false) --
hardcoded OK-trim mode -- even when the connect string set
request_durable_ack=on. So an adopted orphan slot was trimmed/marked drained on
an ordinary server OK, before the WAL upload made it durable. If the primary
then failed over within that window, the OK'd-but-not-durable orphan frames
were already gone and never replayed to the successor -> data loss.

The foreground sender already honours request_durable_ack (trims only on durable
ACKs), and its failover tests pass. Make the orphan drainer consistent: pass
*config.qwp_ws.request_durable_ack to the send core so, in durable-ack mode, an
orphan slot is trimmed only on a durable ACK and survives a primary failover.
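The trimming rule can be modelled in a few lines. This is a toy model under stated assumptions, not the drainer's code: `Ack`, `OrphanSlot`, `may_trim`, and the representation of pending frames as a list of frame sequence numbers are all hypothetical. It shows why hardcoding `false` lost data: with OK-trim, frames leave the slot before they are durable, so nothing remains to replay after a failover.

```rust
/// Hypothetical acknowledgement kinds: a plain server OK versus a durable ack.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Ack {
    Ok,
    Durable,
}

/// Trim policy: in durable-ack mode only a durable ack may trim;
/// otherwise any ack may.
pub fn may_trim(ack: Ack, request_durable_ack: bool) -> bool {
    match (request_durable_ack, ack) {
        (false, _) => true,
        (true, Ack::Durable) => true,
        (true, Ack::Ok) => false,
    }
}

/// Toy orphan slot: pending frame sequence numbers awaiting trim.
pub struct OrphanSlot {
    pub pending: Vec<u64>,
    pub request_durable_ack: bool,
}

impl OrphanSlot {
    /// Apply an ack covering every frame up to and including `up_to_fsn`.
    pub fn on_ack(&mut self, up_to_fsn: u64, ack: Ack) {
        if may_trim(ack, self.request_durable_ack) {
            self.pending.retain(|fsn| *fsn > up_to_fsn);
        }
    }

    /// Frames that would still be replayed to a successor after a failover.
    pub fn replayable(&self) -> &[u64] {
        &self.pending
    }
}
```

In this model, a slot created with `request_durable_ack: true` keeps all frames after `on_ack(3, Ack::Ok)`, whereas the pre-fix behaviour (flag forced to `false`) empties the slot on the same call.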

Un-skips the e2e guard test_orphan_drainer_durable_ack_survives_kill_c_client_rust
(skipped in f3c049c while this was under investigation), which exercises exactly
this path (ghost slot adopted by a drain_orphans=on sender, primary killed
before durable ack, frames must replay to P2).

Found via Enterprise c-client e2e build 246443.

Collapse the previously split ingress/egress error types into a single
error vocabulary shared by ingestion and queries, so a `QuestDb` pool —
which spans both directions — speaks one error type. `?` now composes
across a sender borrow and a reader borrow in one `questdb::Result`.

Rust (questdb-rs):
- One `questdb::Error` / `ErrorCode` (35 variants). `egress::error` is a
  thin re-export of `crate::error`; the one-way `From<crate::Error> for
  egress::Error` bridge is gone. `Error` is boxed and carries every
  payload (in_doubt, qwp_ws_*, upgrade_reject, server_info) cfg-gated by
  direction. `UpgradeReject` moved to `egress::server_event`.

FFI (questdb-rs-ffi):
- One C error struct + one enum `line_sender_error_code` (ingest codes
  0..19 frozen, query codes appended 20..34). `reader_error` /
  `reader_error_code` are now type aliases of it; one total
  `From<ErrorCode>`.

C / C++ headers:
- `line_sender.h` is the canonical enum; `reader.h` aliases it (typedef +
  `#define`s). C++ gains a base `questdb::error` with `questdb::error_code`;
  `questdb::ingress::line_sender_error` and `questdb::egress::reader_error`
  are subclasses, so `catch (const questdb::error&)` handles either
  direction while per-direction catches keep working.

Drift guards (a new error code must be mapped everywhere or the build/tests
fail): in-crate wildcard-free exhaustiveness tripwire, FFI discriminant +
From pins driven by one table, a Rust-vs-`line_sender.h` cross-check, and
C++ static_asserts pinning the reader aliases.
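The drift-guard idea (a wildcard-free match plus a single pin table) can be sketched on a three-code subset. This is an illustration only: the enum below is a hypothetical miniature, not the crate's 35-variant `ErrorCode`, and the helper names are invented. The three discriminants are the ones quoted in this PR (`invalid_api_call` = 1, `connect_timeout` = 19 on the ingest side, `handshake_error` = 20), and the range split follows "ingest codes 0..19 frozen, query codes appended 20..34".

```rust
/// Hypothetical three-variant subset of the unified error code.
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCode {
    InvalidApiCall = 1,
    ConnectTimeout = 19,
    HandshakeError = 20,
}

/// Wildcard-free mapping to the C discriminant: adding a variant without
/// extending this match is a compile error, which is the tripwire.
pub fn c_code(code: ErrorCode) -> u32 {
    match code {
        ErrorCode::InvalidApiCall => 1,
        ErrorCode::ConnectTimeout => 19,
        ErrorCode::HandshakeError => 20,
    }
}

/// One table drives every pin check.
pub const PINS: [(ErrorCode, u32); 3] = [
    (ErrorCode::InvalidApiCall, 1),
    (ErrorCode::ConnectTimeout, 19),
    (ErrorCode::HandshakeError, 20),
];

/// Ingest codes occupy the frozen range 0..=19.
pub fn is_ingest_code(raw: u32) -> bool {
    raw <= 19
}

/// Query codes are appended in 20..=34.
pub fn is_query_code(raw: u32) -> bool {
    (20..=34).contains(&raw)
}

/// True when the enum discriminant, the C mapping, and the table all agree.
pub fn pins_hold() -> bool {
    PINS.iter()
        .all(|(code, raw)| *code as u32 == *raw && c_code(*code) == *raw)
}
```

The compile-time half (the exhaustive match) catches a new variant; the table-driven half catches a renumbering that would otherwise compile cleanly.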

BREAKING (unreleased egress/reader surface only; ingest C ABI 0..19
unchanged): `reader_error_code` discriminants are renumbered onto the
unified scheme (e.g. invalid_api_call 2->1, handshake_error 5->20), and
`questdb_db_connect_reader` no longer remaps ingress connect codes onto
egress ones.

Verified: questdb-rs + questdb-rs-ffi unit tests + clippy clean; full
cmake C/C++ build with mock-server cpp_tests; and live e2e against a
QWP-capable server (egress_live_server/failover/auth/tls = 168 Rust live
tests, C++ test_reader 253 assertions).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…r pool teardown

The SfColumnSender contract promises that a parked connection's background
runner keeps delivering and that returning/dropping a handle does not lose
accepted frames. But pool close (drain_idle_senders), idle eviction
(reap_idle_senders), and post-shutdown returns dropped the ColumnSender
outright, and SyncQwpWsRunner::drop only sets `stop` + joins — drive_step
returns Stop without flushing the queue. Any in-memory frame the runner had
accepted but not yet delivered was silently lost. (Disk-backed sf_dir stayed
safe via persistence + orphan replay; close_drain existed but was wired only
into the standalone row Sender, never the column-sender/pool path, so the
parsed close_flush_timeout was dead config there.)

Drain at the destroy-sites only, never on the reuse-return fast path:

- reap_idle_senders: skip connections whose queue still holds undelivered
  frames (lock-free progress read); they become reapable once the runner
  drains them or the transport goes terminal.
- drain_idle_senders (pool close): signal every runner (begin_close,
  non-blocking) then await each under one shared deadline, so a multi-conn
  close stays bounded to ~close_flush_timeout instead of the sum.
- return_to_pool / OwnedColumnSender drop under shutdown/must_close: bounded
  drain before the connection is dropped.
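The pool-close step (signal every runner first, then await each under one shared deadline) can be sketched with standard-library channels. This is a simplified model, not the pool's implementation: the `Runner` struct, its `closing` flag, the `drained` channel, and the `drain_idle` function are hypothetical stand-ins for `begin_close` and `drain_to_deadline`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Hypothetical runner handle: `closing` tells the background runner to
/// finish up, and `drained` fires once its queue has been delivered.
pub struct Runner {
    pub closing: Arc<AtomicBool>,
    pub drained: Receiver<()>,
}

/// Returns how many runners confirmed delivery before the shared deadline.
pub fn drain_idle(runners: &[Runner], close_flush_timeout: Duration) -> usize {
    // Phase 1: signal everyone without blocking, so all runners drain
    // concurrently rather than one after another.
    for runner in runners {
        runner.closing.store(true, Ordering::SeqCst);
    }

    // Phase 2: wait for each runner, but only for whatever is left of the
    // single shared budget. Total wait is bounded by ~close_flush_timeout.
    let deadline = Instant::now() + close_flush_timeout;
    let mut delivered = 0;
    for runner in runners {
        let remaining = deadline.saturating_duration_since(Instant::now());
        if runner.drained.recv_timeout(remaining).is_ok() {
            delivered += 1;
        }
    }
    delivered
}
```

Computing `remaining` from one deadline, instead of passing the full timeout to each wait, is what keeps a close over N unreachable connections at roughly one timeout rather than N of them.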

An unreachable peer still drops the undelivered tail after the bounded wait,
logging a warning (parity with commit_in_flight_on_drop). The SfColumnSender
and ColumnPoolKind docs are corrected: drop delivers best-effort within
close_flush_timeout; call wait() before close (or use sf_dir) for a hard
guarantee.

Refactors the runner's close_drain into reusable begin_close +
drain_to_deadline. Adds three regression tests (reap-skip, close blocks until
delivered, close drain bounded when peer never acks).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

The store-and-forward wait() timeout advised "drop this sender and
re-borrow to replay", claiming parity with the direct backend's
transport-timeout path. That parity is false: the direct path latches
must_close and drops the connection (discarding uncommitted frames),
whereas the SFA connection is recycled with its queue + runner intact,
so the runner keeps delivering the original frames. Re-flushing then
double-delivers. Correct recovery is to retry wait(); fix the message
and docs (column-major and row-major) accordingly. No behavior change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Add four Enterprise e2e tests pinning that the QWP-WS background orphan
drainer honours request_durable_ack (fix 19b9fda) -- trimming an adopted
orphan slot only on a durable ack, never on a plain server OK -- using
the Enterprise harness object-store durability gate (obj_store.freeze).

  - trim_timing: no-kill pin. Freeze the store, adopt an orphan, assert
    the .sfa is RETAINED after the OK and trimmed only after thaw makes
    it durable. Pins the exact fixed line in ~3s.
  - partial_durability_at_kill: half the orphan made durable before the
    freeze, half OK-only; kill without wiping the store. The successor
    comes up as a replica (a fresh primary hits ER007 against the
    populated store), downloads the durable half, and is promoted to take
    the replayed non-durable half. Dense-sequence oracle => loss-free and
    duplicate-free.
  - multi_slot...survives_kill: three orphan slots adopted concurrently
    (max_background_drainers=3) must all survive a kill+wipe failover.
  - survives_drain_reconnect: the drain connection drops and re-adopts 3x
    while frozen; the slot must not prematurely trim across the churn
    (no kill -- isolates connection churn from the data-loss window).

Each was confirmed to FAIL with the fix reverted to the pre-19b9fda
OK-trim, and pass with it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Scenario-6 binding breadth for fix 19b9fda: the orphan-drainer durable-ack fix
lives in the shared Rust core and the C / C++ FFI bindings inherit it. Add
row-major QWP-WS sidecars driven through each binding plus the positive
(survives_kill) and negative (no_request_durable_ack_loses_rows) e2e variants.

  - system_test/c_sidecars/qwp_c_sidecar.c: C-FFI row-major sidecar
    (line_sender_from_conf + line_sender_qwpws_*), same stdin/stdout line
    protocol as the Rust/Java sidecars.
  - system_test/c_sidecars/qwp_cpp_sidecar.cpp: C++ translation unit. The C++
    wrapper has no row-major QWP-WS API (new_buffer() throws for WebSocket
    senders -- WS is column-major in C++), so it compiles/links the C++ header
    and drives the row-major path via the C ABI, exactly as a C++ user must.
  - c_client_sidecar.py: build_c_sidecar / build_cpp_sidecar (cargo-build the
    FFI lib + cc/c++ compile-link) and CClientCSidecar / CClientCppSidecar.
  - conftest.py: c_client_c_sidecar / c_client_cpp_sidecar fixtures.
  - tests/test_orphan_drainer_bindings.py: c_client_c + c_client_cpp variants
    (shared bodies, per-binding markers). The C/C++ FFI does not export the
    reconnSucc counter, so the negative test confirms the successor was reached
    server-side (the probe row lands on P2).

Both bindings' survives_kill confirmed to FAIL with the fix reverted to the
pre-19b9fda OK-trim (P2 observes 0 rows); all four pass with the fix.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>