feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153
Open
kafka1991 wants to merge 232 commits into
Open
feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153kafka1991 wants to merge 232 commits into
kafka1991 wants to merge 232 commits into
Conversation
Plan and FFI ABI for the new column-major writer that will ingest Pandas/Polars DataFrames over QWP/WebSocket. Locks the QuestDb pool shape, BulkChunk encoder strategy, validity bitmap semantics, and the C ABI the separate Python wrapper repo will consume. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Locks the column-sender API around synchronous flush: sender.flush(&mut chunk, ack_level) blocks until the requested ACK level (Ok = WAL commit, Durable = object-store via Enterprise opt-in). Drops the FSN/submit/await split from the FFI; at most one frame in flight per sender, parallelism via the pool. Refuses sf_dir and other sf_* keys at QuestDb::connect with ConfigError — store-and-forward is single-writer-per-slot and interacts awkwardly with pool auto-grow; row-major Sender remains the SF path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lands the Rust core, C ABI, and benchmarks for a column-major sender
targeting Pandas/Polars → QuestDB throughput over QWP/WebSocket. See
`doc/COLUMN_SENDER_PLAN.md` for the design and `doc/COLUMN_SENDER_FFI_ABI.md`
for the C ABI spec; both shipped in earlier commits on this branch.
# What's in the box
* **WS-0 — `QuestDb` pool** (`ingress/column_sender/db.rs`,
`ingress/column_sender/conf.rs`).
Thread-safe pool with eager-open, fail-fast at `pool_max`,
`BorrowedSender<'a>` that returns on `Drop`, and a background reaper
(`pool_reap=auto`, tick = max(5 s, idle_timeout / 12)) that closes
excess-over-`pool_size` connections. New conf keys: `pool_size`,
`pool_max`, `pool_idle_timeout_ms`, `pool_reap`. `sf_*` / `sender_id`
/ `qwp_ws_progress=manual` refused at `connect`-time.
* **WS-1 — synchronous `flush` plumbing** (`ingress/column_sender/sender.rs`,
`ingress/column_sender/encoder.rs`).
`ColumnSender::flush(chunk, AckLevel)` encodes the chunk, publishes via
the existing QWP/WS replay queue (`Sender::qwp_ws_publish_raw` —
pub(crate) escape hatch in the row-API sender), and blocks until the
ACK watermark crosses the published FSN. Polls in 50 ms slices so a
`must_close` mid-wait surfaces promptly. `AckLevel::Durable` requires
`request_durable_ack=on` at connect or returns `InvalidApiCall`.
* **WS-2 — `Chunk` + numeric / fixed-width columns**
(`ingress/column_sender/chunk.rs`, `validity.rs`, `wire.rs`).
Per-column wire-shape `Vec<u8>` storage so encode is a header +
`extend_from_slice` per column. Two code paths per type per the plan
§2.2:
- Bool, i8, i16, i32, i64, f32, f64: `null_flag = 0` always; nullable
rows sentinel-encoded (0 / i32::MIN / i64::MIN / NaN), matching the
row-API convention.
- Sparse-null types (uuid, long256, ipv4, ts_nanos, ts_micros,
date_millis): no-null = `extend_from_slice`; nullable = QWP-shape
bitmap + dense gather.
- Designated timestamp (micros or nanos) — exactly one per chunk.
Connection-scoped `SchemaRegistry`: first emit → FULL; repeat → REFERENCE.
* **WS-3 — VARCHAR** (`Chunk::column_varchar`). Arrow Utf8 in
(`offsets: &[i32]` length `row_count + 1`, `bytes: &[u8]`); wire out
is dense `non_null_count + 1` LE-u32 offsets + concatenated bytes.
No-null path memcpys offsets when `offsets[0] == 0`; nullable path
walks validity and skips slicing for null rows. Offset validation
(negative / non-monotonic / past `bytes_len`) caught client-side.
* **WS-4 — symbol bulk-intern**
(`Chunk::symbol_dict_{i8,i16,i32}`, `encoder::resolve_symbols`).
Three append-time passes: referenced-bitset + range check; compact
referenced dict bytes; translate codes to internal indices and build
the QWP-shape bitmap. Connection-scoped `SymbolGlobalDict` shared
with the row API's type (`buffer/qwp.rs:next_id/intern/entry`
promoted to `pub(crate)`). At flush time, only entries the chunk
actually references reach the wire — protects the 1M-per-connection
cap on huge Pandas `Categorical` dicts. Roll-back on encode error
keeps client + server dict views coherent.
* **WS-5 — C ABI** (`questdb-rs-ffi/src/column_sender.rs`,
`include/questdb/ingress/column_sender.h`).
Full implementation of `doc/COLUMN_SENDER_FFI_ABI.md`:
- Opaque handles `questdb_db`, `column_sender`, `column_sender_chunk`.
- `column_sender_validity` repr-C struct; `column_sender_ack_level`
repr-C enum.
- `questdb_db_connect/close/borrow_sender/return_sender/reap_idle`.
- Every chunk column-append, the VARCHAR + symbol_dict family, the
two designated-timestamp variants, and `column_sender_flush`.
- Errors reuse `line_sender_error*`.
Rust side gains `OwnedSender` — Arc-backed borrow handle the FFI hands
out as `column_sender*` so the C caller can free `questdb_db*` before
all borrows return without dangling.
Hand-runnable smoke test at `cpp_test/smoke_column_sender.c`
(compiles with `-Wall -Wextra -Werror`; not wired into CMake yet —
matches the `smoke_line_reader` pattern).
* **WS-6 — bench** (`questdb-rs/benches/column_sender.rs`,
`doc/COLUMN_SENDER_PERF.md`).
Three families: per-column append vs raw memcpy baseline; symbol
bulk-intern vs naïve per-row HashMap; encode_chunk end-to-end (no
network). First-baseline numbers (Apple Silicon laptop, 100k rows):
- `column_f64/column_sender_no_null` ≈ 55 GiB/s — matches memcpy.
- `column_i64/column_sender_no_null` ≈ 54 GiB/s — matches memcpy.
- `column_varchar/column_sender_no_null` within ~5 % of memcpy.
- Symbol bulk-intern ~16× faster than naïve per-row HashMap.
- `encode_chunk/populate_plus_encode` ≈ 139 M rows/s end-to-end.
# Verification
- 57 column-sender tests (Rust core); 8 FFI tests; full 834-test lib
suite passes.
- `cargo fmt` + `cargo clippy --tests --benches` clean on both crates.
- `cargo doc` introduces no new warnings.
- `cc -std=c11 -Wall -Wextra -Werror -I include` compiles the C header
and the smoke program.
# What's not in here
- WS-7 (Python wrapper) lives in `py-questdb-client`. With the C ABI
in `include/questdb/ingress/column_sender.h` and the FFI symbols in
`libquestdb_client`, that repo can now start consuming.
- A live Pandas→QuestDB end-to-end bench and 1-hour soak — both
belong in the Python repo / nightly CI rather than the in-tree
Criterion suite.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rewrite the column-major sender to eliminate intermediate buffers and pipeline writes for maximum single-connection throughput. Architecture changes: - ColumnSender now owns a dedicated ColumnConn (conn.rs) that drives socket I/O directly — no replay queue, no background thread, no row-API publisher involvement. - Chunk<'a> holds borrowed descriptors (raw pointers + lengths) into the caller's buffers; no per-column Vec<u8> staging. The encoder writes wire bytes straight from caller memory into the connection's reusable write_buf at flush time. - flush() pipelines: encode + WS-mask + write_all, then drain acks non-blocking. Blocks only when in-flight hits the 128-frame protocol cap. New sync(AckLevel) blocks until all acks settle. - Server cumulative OKs handled correctly (sequence=N acks all frames up to N). API changes: - flush(&mut chunk, AckLevel) → flush(&mut chunk) (fire-and-forget) - New sync(AckLevel) drains all in-flight acks - FFI: column_sender_flush drops ack_level arg; new column_sender_sync - FFI lifetime contract: caller buffers must outlive flush (no copy) Performance (5M-row L1 quotes, 9 columns, localhost): - Encode path: 6 GB/s (2.3% of wall time) - End-to-end: 350 MB/s pipelined (was 264 MB/s stop-and-wait) - Per-chunk p50: 0.72 ms (was 2.64 ms) - Criterion populate+encode: 575 µs (was 718 µs, 20% faster) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The default macOS TCP send buffer (~128 KB) is smaller than a typical QWP chunk (1.5 MB at 25k rows). write_all blocks mid-frame while the kernel drains the small buffer. A 4 MiB send buffer lets the kernel accept a full chunk in one shot, reducing write_all stalls when the pipeline has multiple frames in flight. Also sets SO_RCVBUF to 4 MiB to absorb ack bursts from the server without backpressuring the server's send path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
flush() now sets FLAG_DEFER_COMMIT (0x01) on every QWP frame. The server appends rows to WAL writers without committing. sync() sends a commit-triggering empty frame (without the flag) that commits all accumulated rows in one WAL transaction, then drains acks. This eliminates per-chunk WAL fsync overhead: 200 chunks × 25k rows now produce 1 WAL commit instead of 200. The p95 per-chunk latency drops from ~23 ms to ~7 ms. Old servers that don't recognize the flag ignore it (reserved bit position) and commit per-message — graceful degradation per the spec. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The server's ClientSymbolCache only caches symbols with symbolKey < initialSymbolCount. On a fresh table, initialSymbolCount stays at 0 until a WAL segment rolls and the watermark updates. By sending the first frame without FLAG_DEFER_COMMIT, the server commits it immediately, which allows the next segment to pick up the new symbol count and enable caching for all subsequent deferred frames. This is a client-side workaround for a server-side cache limitation. The proper fix is for the server to cache locally-assigned symbol IDs within the same segment (see WalColumnarRowAppender.putSymbolColumn). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Align the C ABI, docs, and smoke test with column_sender_flush(sender, chunk, err) plus column_sender_sync(sender, ack_level, err). Reserve an in-flight slot for the sync commit, validate durable ACK opt-in before publishing, and add pool/sync coverage.
Rename the borrowed handle returned from the connection pool from `column_sender` to `qwpws_conn` so it can host peer writer modes (per-type today, generic Arrow / NumPy in Steps 2-3, future egress readers). No behaviour change — the underlying Rust types (ColumnSender / OwnedSender) keep their names since they're doc-hidden; only the public C ABI changes. FFI surface changes: - struct column_sender -> qwpws_conn - questdb_db_borrow_sender -> questdb_db_borrow_conn - questdb_db_return_sender -> questdb_db_return_conn - column_sender_must_close -> qwpws_conn_must_close - column_sender_flush(sender, ...) -> column_sender_flush(conn, ...) - column_sender_sync(sender, ...) -> column_sender_sync(conn, ...) column_sender_chunk and the column_sender_chunk_column_* / _symbol_dict_* appenders keep their names — the chunk IS the column-sender writer's accumulator, and flush/sync are operations on it; only the borrowed-handle parameter type changes. See plan-conn-pool-and-writers.md in py-questdb-client (Step 1) and the Slack thread from 2026-05-27 with Victor for the rationale: pool QWP/WS connections, not writers, so egress readers and Arrow / NumPy appenders can share the same pool as the existing column_sender chunk path. Open Q1 from the plan is answered (chunk.rs:208, encoder.rs:82-95, encoder.rs:460-466): `column_sender_chunk_column_*` already direct-writes to the wire buffer — for native-LE contiguous data it is one `extend_from_slice` per column. So Step 3's NumPy appender is no longer about "saving an extra memcpy"; it's about avoiding Python-side widening for narrower dtypes / strided / non-native-endian. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
New entry point that consumes an Apache Arrow C Data Interface
ArrowArray + ArrowSchema pair and dispatches to the existing per-type
chunk methods based on the schema's format string. Caller passes the
borrowed pointers it gets from PyArrow's `_export_to_c` (or any other
Arrow C Data producer); the FFI never constructs or releases the
arrays.
Supported schema formats in this patch:
- c, s, i, l int8 / int16 / int32 / int64
- f, g float32 / float64
- b bool (LSB-first bitmap)
- u UTF-8 string (int32 offsets)
- tsn:..., tsu:... timestamp nanos / micros (timezone suffix ignored)
- dictionary schemas with c/s/i indices and a UTF-8 value type —
routed to symbol_dict_i8 / _i16 / _i32
Other formats — including LargeUtf8 (U), decimal, struct, list, and
non-UTF-8 dictionary values — currently return
line_sender_error_invalid_api_call. LargeUtf8 lands in Step 2b.
Constraints:
- ArrowArray.offset must be 0; sliced arrays are rejected.
- The chunk's row-count lock applies to the new appender the same
way as the per-type calls.
The Arrow types are mirrored as #[repr(C)] structs in the Rust FFI
shim so we read them without taking a dependency on the arrow / arrow-
array crate. No new Rust dependencies.
See plan-conn-pool-and-writers.md (Step 2). The Cython-side wiring
(routing pandas Arrow-backed columns through this entry point) lands
in a separate patch.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add ColumnKind::VarcharLarge (i64 offsets) + Chunk::column_varchar_large + encode_varchar_large. The new encoder reads i64 offsets and writes u32 LE to the wire frame in one pass — no caller- or Rust-side intermediate Vec<i32> for the narrowing. Validation rejects negative offsets, decreasing offsets, offsets exceeding the bytes buffer, AND any last offset exceeding u32::MAX (the QWP wire offset table is uint32 LE). The overflow check at chunk-build time surfaces a meaningful error rather than a per-row overflow at encode time. The Arrow appender's `U` format match now routes here. This unblocks the Python side: pandas large_string columns can be sent without the Python-side cast to UTF-8 (which previously allocated a fresh Arrow array via pyarrow.cast). estimate_frame_size grew a VarcharLarge case identical to Varchar. questdb-rs 836 lib-tests pass. clippy clean on both crates. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extend column_sender_chunk_append_arrow_column with row_offset and
row_count parameters so chunked-emission callers can slice an
ArrowArray without consolidating it first. Required for the Python
Client.dataframe path, which loops over row chunks and currently
slices buffers manually for the per-type appenders.
Per-format slicing:
- fixed-width primitives + timestamps: data pointer is shifted by
row_offset elements (`ptr.add(row_offset)`).
- bool bitmap: shifted by row_offset / 8 bytes; row_offset % 8 == 0
required (matches the validity bitmap byte-alignment).
- utf8 / large_utf8: offsets pointer shifted by row_offset
elements (Arrow offsets are monotonic, so the slice's offsets
are still well-formed). bytes_len is read from the original
array's last offset; the encoder rebases on the wire.
- dictionary symbols: codes pointer shifted; the dictionary is
shared across chunks unchanged.
Validity bitmap requires row_offset % 8 == 0; with row_offset=0 and
row_count=array.length we get exactly the previous behaviour.
Caller bounds-check: row_offset + row_count must not exceed
array.length.
The C header docs the new parameters; clippy & fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Match the Arrow C Data Interface spec more precisely: `children` is `*const *mut ArrowArray` (`struct ArrowArray**` in the spec) and `dictionary` is `*mut ArrowArray`. We never mutate, so this is layout-equivalent to the previous `*const`/`*const`, but the declarations now line up with the spec for readers cross-checking. - Rename `array_len` -> `array_total_len` in the appender so the meaning is unambiguous next to the per-call `row_count` parameter. - Cross-reference doc comments: the per-type varchar / symbol_dict C-ABI entries now mention `column_sender_chunk_append_arrow_column` as the recommended path for callers holding an Arrow array, and flag the per-type entries as the lower-level building block. No behaviour change. fmt + clippy clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two correctness findings from the multi-agent review: 1. **encode_varchar_large rejected valid late slices.** validate_varchar_offsets_i64 checked the absolute `last` offset against u32::MAX, but the encoder narrows `(off - first)` per row. A slice taken from the tail of a multi-GiB LargeUtf8 array (e.g. base=3 GiB, last=4 GiB) was rejected even though every wire offset would be ≤ 1 GiB. Now we validate the *span* `last - first` against u32::MAX, with a clearer error message. 2. **Null-pointer deref on malformed Arrow arrays.** arrow_buffer<T> returned the raw buffer pointer without checking it for null. Callers then unconditionally `slice::from_raw_parts(...)` or `*offsets_ptr.add(...)`. A producer presenting length > 0 with a null data buffer (spec-violating but plausible from buggy clients) would UB before any validation ran. Added an `allow_null: bool` parameter. The bytes buffer of an empty varchar/symbol-dict array can legitimately be NULL (we already guard that downstream), so those three call sites pass `true`. All other call sites — offsets, primitives, codes, bool bitmap — pass `false` and surface a clean `InvalidApiCall` error instead. Reviewers: convergent finding from concurrency-code-reviewer (Rust) and general-purpose (cross-layer) agents. clippy + fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add Chunk::column_numpy + NumpyDtype enum to questdb-rs, plus the C FFI wrapper column_sender_chunk_append_numpy_column. Behaviour, per Step 3 design decisions: - i8/i16/i32 -> i64 sign-extend (wire = LONG). - u8/u16/u32 -> i64 zero-extend (wire = LONG). - i64 -> pass-through (wire = LONG). - u64 -> i64 bit-reinterpret. Values > i64::MAX wrap to negative on the wire, matching the row path's C-cast behaviour. - f32 -> f64 widen (wire = DOUBLE). - f64 -> pass-through (wire = DOUBLE). - bool (NumPy byte-per-row) -> Arrow LSB-first packed bitmap (wire = BOOLEAN). Strided arrays and non-native-endian arrays are not supported in v1; the caller (Python client) consolidates upstream. Widening lives in Rust at append time, materialising into a chunk- owned scratch arena (`Chunk::scratch: Vec<NumpyScratch>`). The ColumnDescriptor's `*const T` points into the scratch; the encoder hot path is unchanged. Scratch is cleared on Chunk::clear / drop. The scratch enum uses typed variants (Box<[i64]>, Box<[f64]>, Box<[u8]>) so the storage alignment matches the encoder's read alignment. questdb-rs 836 lib-tests pass. clippy + fmt clean on both crates. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Round-3 dirty-sender fix (option c from plan-conn-pool-and-writers.md): expose a new FFI that callers use in error-recovery paths to force-close a conn instead of recycling it. The problem: a mid-call flush failure left a conn with in-flight uncommitted frames in the pool. The next borrower's first flush is QWP's "immediate commit", which would commit the stale frames alongside their own. The fix exposes a single new entry point: void questdb_db_drop_conn(questdb_db* db, qwpws_conn* conn); semantically equivalent to "mark must_close, then return" but in one atomic step. The conn enters the terminal state and the pool drops it on return rather than recycling it. Implementation: - ColumnConn gains `mark_must_close(&mut self)` (pub(crate)). - ColumnSender gains `mark_must_close(&mut self)` (pub) that forwards to ColumnConn. - The FFI wraps these: questdb_db_drop_conn marks then drops. The existing `qwpws_conn_must_close()` getter is unchanged; this adds the corresponding setter at each layer. clippy + fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small hardening tweaks:
1. **Tighten format dispatch.** The Arrow C Data Interface only uses
a `:`-prefixed parameter on timestamp / date / time formats;
everything else is a single character. Previously
`column_sender_chunk_append_arrow_column` did
`format.split(':').next()` and dispatched on the prefix, which
would spuriously match e.g. a malformed `"u:foo"` to the varchar
arm. Exact-match the non-ts arms and use `starts_with("tsn:")` /
`starts_with("tsu:")` for the ts arms.
2. **Accept `null_count == -1` with NULL bitmap as "no nulls".**
pyarrow / polars emit this shape when the column has no nulls
(the spec's "unknown" interpretation). We treat it as no-nulls;
the encoder reads the data buffer densely. Only `null_count > 0`
with a NULL bitmap is malformed.
3. **Guard `dict_array.length < 0`.** The main array's negative
length is already rejected in
`column_sender_chunk_append_arrow_column`; mirror the same check
inside `arrow_dictionary_utf8` for symmetry.
clippy + fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…refactor # Conflicts: # questdb-rs/src/ingress.rs
Extends the column-sender pool to also serve egress readers from one
shared `questdb_db` configured by a single conf-string. Lazy-init for
readers, eager for writers, same `pool_size` / `pool_max` /
`pool_idle_timeout_ms` / `pool_reap` budget.
- questdb-rs/db.rs: parallel reader free-list, `borrow_reader_owned`,
`ReaderPoolHandle`, `OwnedReader::mark_must_close`, integrated into
the reaper. All reader-side state and methods feature-gated under
`_egress` so the default build (no egress) stays lean.
- questdb-rs/egress/config: reader conf-string parser accepts the
`qwpws::` / `qwpwss::` schemes and ignores `pool_*` keys, so a
single conf-string drives both the sender and reader pools without
translation.
- questdb-rs-ffi/egress: `line_reader` becomes a named struct with a
`ReaderOwnership` enum (Standalone vs Pooled{handle, must_close});
pool borrow/return + `line_reader_mark_must_close` exposed in C.
- column_sender.rs: `questdb_db(pub(crate) QuestDb)` so the egress
FFI can reach the inner pool to wire reader borrows.
- Headers: reader-pool entry points live in `egress/line_reader.h`
next to the type they wrap; `ingress/column_sender.h` points
there.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
test_zone_failover_stays_in_zone_then_crosses_c_client_rust: two zone-A servers + one zone-B server (zone-A first in addr=, zone=A). The Reader binds zone A, stays in zone A across an intra-zone failover (kill the bound zone-A host -> surviving zone-A sibling), and crosses to zone B only once both zone-A hosts are gone (zone= is a preference, not a hard filter). Extends _egress_connect_string with zone/target/timeout knobs and adds a _wait_for_zone poll helper. Mirrors the Enterprise Java test_zone_failover.py suite; the binding under test is the Rust egress Reader.
SHOW_ZONE captured the zone value from the first batch then broke out of the loop, dropping the cursor before it was fully read. The Rust Reader poisons a connection whose cursor is dropped un-drained, so the *next* SHOW_ZONE failed with 'Reader connection is closed and cannot be reused'. This surfaced in test_zone_failover_stays_in_zone_then_crosses_c_client_rust (build 246314): the first SHOW_ZONE passed but poisoned the connection, and the _wait_for_zone poll that followed errored out. Drain the cursor to completion (capture the value once, keep calling next_batch until None). SHOW PARAMETERS returns a tiny result, so this is cheap.
Align the row-major and store-and-forward column-major senders on a single delivery-wait surface. Row-major: - Remove `Sender::await_acked_fsn(fsn, timeout) -> Result<bool>` and the `line_sender_qwpws_await_acked_fsn` C ABI entry point. - Add `Sender::wait(ack_level: AckLevel, timeout: Duration) -> Result<()>` (and on the `BorrowedRowSender` handle), mirroring `SfColumnSender::wait`. It waits for the cumulative `published_fsn` boundary at the requested AckLevel. Re-export `AckLevel` at `crate::ingress`. Column-major (store-and-forward only; direct sender untouched): - `SfColumnSender::wait` / `ColumnSender::wait` gain a per-call `timeout`, replacing the `set_sfa_sync_timeout_for_test` hook. Shared timeout semantics: `timeout` is a no-progress deadline (fires only if the ack watermark fails to advance for that long); `Duration::ZERO` waits indefinitely; on expiry returns `ErrorCode::FailoverRetry` with frames retained for replay. The direct sender keeps `commit`/`flush_and_wait` and its `request_timeout`-bounded behaviour. FFI/bindings: `line_sender_qwpws_wait` and `sf_column_sender_wait` take a `uint64_t timeout_millis`; C++ `wait(level, milliseconds = zero)` and Python `wait(ack_level=0, timeout_millis=0)` stay source-compatible. Failover sidecar AWAIT_ACKED maps a timeout back to its bool reply. Also clear all pre-existing rustdoc broken-link / link-target / bare-URL warnings (public and --document-private-items builds). Tests: questdb-rs 889 + questdb-rs-ffi 52 pass; clippy/fmt/doc clean.
The qwp_column_sidecar binary still called SfColumnSender::wait(AckLevel) with one argument, breaking the failover_clients clippy CI gate. Pass Duration::ZERO (wait indefinitely, preserving prior block-until-ack behaviour).
…t sender Extend the c_client e2e suite with deterministic scenarios ported from the Enterprise reference suite (questdb-ent/e2e/tests), all driven by the Rust qwp_sidecar via the shared lib.shared_fixtures harness. Deduplicated against what this branch already has (test_sender_kill9_sf_recovery_replays is not re-added). test_failover.py (+6, -m c_client): - failover_during_active_send - two_failovers_in_one_scenario - no_request_durable_ack_loses_rows (negative/honesty test) - orphan_drainer_durable_ack_survives_kill - sender_repeated_sigkill_no_state_corruption (multi-cycle SF recovery) - partial_ack_sealed_segment_replay_dedup_collapses New files (-m c_client): - test_failover_graceful.py: graceful_failover_round_trip (demote/promote, write-rejection + connection-survival probes) - test_switch.py: write_path_across_switch, disturbance_honesty_guard - test_switch_roundtrip_crash_repro.py: roundtrip_post_switch_write_no_crash New file (separate cadence, -m "c_client_rust and fuzz"): - test_failover_fuzz.py: random_failover (parametrized). Tagged fuzz + c_client_rust but NOT c_client, so -m c_client stays deterministic. Registers the fuzz marker in pyproject.toml. Reuses jh's _connect_string (username= keyword, sender_id/sf_max_bytes kwargs) and second-sidecar spawn pattern. -m c_client now selects 18 deterministic tests.
Commit a9f6c2a added a `uint64_t timeout_millis` parameter to `sf_column_sender_wait` (the symmetric wait(AckLevel, timeout) rework), but two column-major callers were not updated alongside it: - examples/line_sender_cpp_example_arrow.cpp called it with 3 args -> hard C++ compile error (mac "Make" job, Bash exit 2). - system_test/arrow_ffi.py bound `sf_column_sender_wait` via ctypes with only 3 params (conn, ack_level, err_out). ctypes does no compile check, so this was a silent ABI mismatch: `err_out` was read from an uninitialized register and, on the server-rejection path where wait fails and writes `*err_out`, the store hit a stray address. On the CI Linux x86-64 runner that address is sometimes unmapped -> intermittent SIGSEGV (139) in TestArrowIngressSfa .test_sfa_write_rejection_reports_once_and_continues (explaining the alternating pass/fail across builds). On arm64 the stray register was benign, so the same UB surfaced as a deterministic wrong error code (INVALID_API_CALL instead of SERVER_REJECTION). Pass 0 (wait indefinitely) for the timeout, matching the C++ wrapper's default and the already-correct row-major line_sender_qwpws_wait binding.
`AckLevel` is defined in the `column_sender` module, which is compiled only under `sync-sender-qwp-ws`. Commit a9f6c2a (symmetric wait(AckLevel, timeout)) added an unconditional re-export at the `ingress` root and an unconditional import in `sender.rs`: src/ingress.rs:81 pub use column_sender::AckLevel; src/ingress/sender.rs use crate::ingress::column_sender::AckLevel; Both broke any feature combo that enables a sync sender without QWP/WebSocket, e.g.: cargo test --no-default-features \ --features=ring-crypto,tls-webpki-certs,sync-sender-tcp failing with E0432 (`unresolved import column_sender`) and aborting the `Tests` job with exit 101 on the linux / linux-stable / linux-nightly legs. The `sync-sender-http` combo hit the same gap (CI just stopped at the tcp combo first). Every actual `AckLevel` use (row-major `Sender::wait`, `qwp_ws_completed_fsn`, `qwp_ws_wait_timeout`, and the column-major `db`/`column_sender` APIs) is already gated behind `sync-sender-qwp-ws`, so gating the import and re-export the same way is sufficient. Verified locally: the tcp, http and qwp-ws combos all compile (lib + tests).
Move one-shot ingestion onto `QuestDb` so callers never handle a direct column sender: - Add `QuestDb::flush_arrow_batch(table, batch, timestamp_column, overrides, ack_level)`. The timestamp source is an `Option<ColumnName>` (Some = column-stamped, None = server-stamped), replacing the two separate `flush_arrow_batch_*` methods at the call site. - `QuestDb::flush_polars_dataframe` already borrows the direct sender internally; hide the direct sender from the public surface (`#[doc(hidden)]` re-export + `borrow_direct_column_sender`, and demote the `DirectColumnSender` Arrow/Polars methods to `pub(crate)`). Honor a caller-chosen ack level, optionally: - `flush_arrow_batch` takes `ack_level: Option<AckLevel>`; `PolarsIngestOptions` gains `.ack_level(level)`. - When unset, both fall back to the connect string's level via `default_ack_level()` — the same level the store-and-forward senders use: `Durable` when `request_durable_ack=on`, otherwise `Ok`. Requesting `Durable` without the opt-in is rejected with `InvalidApiCall`. Update examples + the failover sidecar to the `db`-level entry points, and add tests for both Arrow paths, the durable-without-opt-in rejection, and an explicit ack level.
The FFI build (`questdb-rs-ffi` clippy `--all-features -D warnings`) forwards `questdb-rs/arrow` but not `polars`, compiling questdb-rs with `arrow-ingress` on and `polars-ingress` off. The two publish-only `DirectColumnSender` methods (`flush_arrow_batch_server_stamped` / `flush_arrow_batch_at_column`), demoted to `pub(crate)` in the previous commit, are only used by the polars checkpoint loop, so they became dead code in that combo and `-D warnings` turned it into a hard error. Gate those two methods on `polars-ingress` (their only caller). The ACKing `_and_wait` variants stay `arrow-ingress` since `flush_arrow_batch` uses them.
Add e2e coverage for the direct Arrow facade `Db::flush_arrow_batch` (the
`db.flush_arrow_batch("trades", &batch, None, &[], None)` application call)
when the primary moves underneath the client.
Unlike the store-and-forward `arrow`/`chunk` column paths, `flush_arrow_batch`
borrows a *direct* (non-SF) column sender, publishes a commit boundary, waits
for the ack level, and surfaces FailoverRetry to the caller instead of
replaying. So the tests honour the documented re-call contract (re-FLUSH) and
preserve pre-move data server-side (crash recovery / graceful switch) rather
than via client SF replay.
qwp_column_sidecar: add a new `arrow_db` SEND shape that drives
`Db::flush_arrow_batch(table, &batch, Some(ts), &[], None)` (replacing the
removed `flush_arrow_batch_at_column` for this path). The existing SF `arrow`
shape and its test are left untouched. cargo check passes.
tests/test_arrow_db_failover.py (3 tests, -m c_client):
- stale primary: configured primary already gone at the first (lazy) send
- primary failover mid-stream: kill -9 + same-database crash-restart
- in-place role switch: primary<->replica via POST /lifecycle/switch
{"role":..,"timeout_ms":5000}; flush rejected on the read-only replica,
resumes after promotion, pre-switch rows intact
-m c_client now selects 21 deterministic tests.
Two feature-gating compile errors surfaced by the CI test matrix: - src/lib.rs: `pub use db::DirectColumnSender` was missing the `#[cfg(feature = "sync-sender-qwp-ws")]` gate that the `mod db` declaration and all other `db::` re-exports carry. Building with `sync-sender-tcp` (no qwp-ws) compiled out `mod db` while the import remained -> E0432 unresolved import `db`. - src/tests/column_sender_pool.rs: `data_frame_count` was gated on `polars-ingress` but is also called by `arrow-ingress` tests. Building `arrow` without `polars` -> E0425 cannot find function. Widened the gate to `arrow-ingress` (polars-ingress implies arrow-ingress).
… moves
Add e2e coverage for the direct DataFrame facade `Db::flush_polars_dataframe`
(the `db.flush_polars_dataframe("trades", &df, &opts)` call) when the primary
moves underneath the client.
Key contrast with flush_arrow_batch: flush_polars_dataframe OWNS the source
DataFrame, so it re-drives automatically on a primary move -- a single call
returns Ok with no caller retry (flush_arrow_batch surfaces FailoverRetry and
requires a re-call). The tests assert that single-call auto-redrive contract.
tests/test_polars_db_failover.py (3 tests, -m c_client, skipif-gated on
C_QUESTDB_CLIENT_COLUMN_POLARS):
- stale primary: configured primary already gone at the first (lazy) send
- primary failover mid-stream: kill -9 + same-database crash-restart; a single
flush re-drives to the restarted primary (no caller retry)
- in-place role switch: primary<->replica via POST /lifecycle/switch; flush
rejected on the read-only replica, resumes after promotion, rows intact
test_failover.py: drop the `polars` param from the kill-9 + object-store-wipe
columnar test -- the direct polars path has no client SF replay, so it cannot
survive the wipe; its failover coverage now lives in the no-wipe file above.
Removes the now-unused `os` import.
…d 246443) Root causes found from the c-client e2e build (PR 1094, jh_conn_pool_refactor): 1+2. test_write_path_across_switch / test_roundtrip_post_switch...: the row sidecar's STATS verb replied ERR when called after a write to a read-only (demoted) node, because Sender::acked_fsn()/qwp_ws_totals() surface the sender's last wire error. The Java sidecar's STATS always returns counters. Make the Rust STATS best-effort: fall back to the -1/0 sentinels the Python parser expects instead of failing. (qwp_sidecar.rs) 3. test_no_request_durable_ack_loses_rows: with request_durable_ack=off the SF is OK-trimmed, and the Rust sender (unlike Java) does not reconnect while idle with nothing to send, so reconn_succ stayed 0 and the reconnect guard tripped. Send a probe row to P2 to force the reconnect and prove connectivity, then assert the original batch is lost. 4. test_orphan_drainer_durable_ack_survives_kill: when P1 died mid-drain the orphan worker returned RetryLater and stopped -- the Rust drainer (drain_orphan_to_completion) does not autonomously reconnect to a successor; the slot is re-attempted only by a fresh orphan scan on the next CONNECT. Reconnect the foreground sender after P2 starts so a new scan replays the ghost slot. All four are test/sidecar-side fixes (lazy-reconnect + connect-triggered orphan re-scan are legitimate Rust client semantics); no client product change.
The QWP egress reader now recovers from the server's transient "cached query plan cannot be used because table schema has changed" (INTERNAL_ERROR / 0x06) condition internally, instead of surfacing it. An async ALTER COLUMN TYPE bumps a table's metadata version between a SELECT's server-side compilation and its execution, so the server rejects its own cached plan. This is not actionable by the caller and recompiles cleanly on the next execution — exactly how QuestDB's PGWire / REST endpoints self-heal. Exposing it forced every user (and our own fuzz test) to write retry loops: friction we should not ship. Cursor::next_batch now intercepts this specific error in the QUERY_ERROR arm and, when no row has yet been delivered (!data_delivered, the load-bearing exact-once guard), transparently re-issues the query on the SAME healthy connection with a fresh request_id via replay_query_same_connection() — no reconnect, unlike the failover path. Bounded by MAX_STALE_PLAN_RETRIES (15) so a table under relentless schema churn surfaces the error instead of looping; retries are paced by the server's recompile + RTT, so no client-side sleep and no busy-spin. Detection is gated on INTERNAL_ERROR + message text (the wire has no retryable bit) so genuine internal faults still surface unchanged. Because the FFI reader_cursor_next_batch wraps Cursor::next_batch, the C / Python paths inherit the fix for free. Reverts the test-level retry added to system_test/qwp_egress_reader.py in 4a55bb3 — the reader handles it now, so query_table_sorted is back to its simple form. Tests: 4 new egress_failover cases — transparent retry succeeds, a non-stale INTERNAL_ERROR still surfaces, a stale-plan error after rows were delivered surfaces (guard), and the retry budget exhausts and surfaces.
…sweep The functional matrix ran the entire suite once per BuildMode at the latest protocol (API, CONF, ENV) on top of the http x protocol_version sweep. The build-mode axis only changes how the client receives its config string, so running the whole suite 3x for it is mostly wasted wall-clock -- a big part of why the Windows job overruns the 90-minute CI cap. Collapse that axis: drop build_mode from the cartesian product and instead pick each test's mode pseudo-randomly from (seed, test-id). Each test runs once; across the suite all three modes get exercised. The seed is printed at the start (and on every failure) and can be pinned via QDB_BUILD_MODE_SEED to reproduce a run's exact choices. Selection is deterministic and independent of execution order. Non-latest/auto protocol versions stay on CONF, exactly as before. API-only coverage is preserved automatically: _BuildModeFuzzResult watches for the 'BuildMode.API-only test' skip and defers any API-only test that drew CONF/ENV (at the latest protocol) to a single forced-API retry pass -- so no enumeration/annotation of the 20 API-only tests is needed and future ones are covered too. The CONF-pinned QWP/WS suites are untouched (fuzzer is inert unless running SUITE_MATRIX with a seed). At the latest protocol this turns 3 full-suite passes into 1.
cargo fmt --check (the 'cargo fmt and clippy' CI job) wanted two hand-wrapped lines in reader.rs collapsed onto one. Formatting only; no behavior change.
…cision The re-run of build 246443 (jh_conn_pool_refactor @ 7d4650f, which includes the earlier fix cade18b) went from 4 failures to 1: the STATS best-effort fix and the no-durable-ack probe-send fix verified green; only test_orphan_drainer_durable_ack_survives_kill remained red. Root cause: this guard is ported from a Java BackgroundDrainer durable-ack regression test. It kills the primary while the adopted orphan slot still holds OK'd-but-not-durable frames and expects the drainer to replay them to the successor. The Rust drain_orphans path observes 0 rows on P2 even after a fresh orphan-scan reconnect -- OK'd-but-not-durable orphan frames do not survive a primary failover, unlike the main sender (which honours request_durable_ack and passes its failover tests). This is either a real durable-ack gap in the Rust orphan drainer or intended best-effort orphan semantics -- a client-side determination, not something to paper over with a test hack. Skip with a documented reason so the suite is green and the finding is surfaced rather than hidden.
…sender Bound the TCP dial on both QWP transports. Until now the connect used the OS default, which can hang for tens of seconds against a black-holed host that silently drops SYNs (no RST). `connect_timeout` (connect-string key, milliseconds) bounds each dial; on expiry it surfaces a distinct, failover-eligible ConnectTimeout rather than burying it under SocketError. Mechanism: native, cross-platform non-blocking connect() -> poll for writability bounded by the budget -> getsockopt(SO_ERROR), as implemented by std's TcpStream::connect_timeout (EINTR handled internally) — no hand-rolled loop. Applied per resolved address (Happy-Eyeballs). io::ErrorKind::TimedOut maps to the new ConnectTimeout code (Rust equivalent of the Java CONNECT_TIMEOUT sentinel), so callers tell a timed-out dial from refused/reset. Egress (QWP reader): ReaderConfig.connect_timeout_ms (0 = OS default), key `connect_timeout`, capped at 1h; WsTransport::connect_to uses it. New egress::ErrorCode::ConnectTimeout, wired into is_failover_eligible (+ its exhaustive matrix) so a per-endpoint timeout advances to the next endpoint. FFI reader_error_connect_timeout = 23 + header + round-trip/ABI tests. Ingress (QWP/WS sender): QwpWsConfig.connect_timeout (Option<Duration>, default None), key `connect_timeout` (rejected for non-QWP/WS); connect_tcp_to_any_addr uses it and surfaces ConnectTimeout when every candidate address timed out (stays retryable in the reconnect loop). New crate::ErrorCode::ConnectTimeout; FFI line_sender_error_connect_timeout = 19 + header + tripwire / ABI tests. Tests: config parse + validate on both sides, a reachable-connect-still-works case, and a behavioral dial against RFC 5737 TEST-NET-1 (192.0.2.1) asserting ConnectTimeout fires within the budget. clippy + fmt clean on both crates.
The orphan drainer built its send core with new_with_durable_ack(.., false) -- hardcoded OK-trim mode -- even when the connect string set request_durable_ack=on. So an adopted orphan slot was trimmed/marked drained on an ordinary server OK, before the WAL upload made it durable. If the primary then failed over within that window, the OK'd-but-not-durable orphan frames were already gone and never replayed to the successor -> data loss. The foreground sender already honours request_durable_ack (trims only on durable ACKs), and its failover tests pass. Make the orphan drainer consistent: pass *config.qwp_ws.request_durable_ack to the send core so, in durable-ack mode, an orphan slot is trimmed only on a durable ACK and survives a primary failover. Un-skips the e2e guard test_orphan_drainer_durable_ack_survives_kill_c_client_rust (skipped in f3c049c while this was under investigation), which exercises exactly this path (ghost slot adopted by a drain_orphans=on sender, primary killed before durable ack, frames must replay to P2). Found via Enterprise c-client e2e build 246443.
Collapse the previously split ingress/egress error types into a single error vocabulary shared by ingestion and queries, so a `QuestDb` pool — which spans both directions — speaks one error type. `?` now composes across a sender borrow and a reader borrow in one `questdb::Result`. Rust (questdb-rs): - One `questdb::Error` / `ErrorCode` (35 variants). `egress::error` is a thin re-export of `crate::error`; the one-way `From<crate::Error> for egress::Error` bridge is gone. `Error` is boxed and carries every payload (in_doubt, qwp_ws_*, upgrade_reject, server_info) cfg-gated by direction. `UpgradeReject` moved to `egress::server_event`. FFI (questdb-rs-ffi): - One C error struct + one enum `line_sender_error_code` (ingest codes 0..19 frozen, query codes appended 20..34). `reader_error` / `reader_error_code` are now type aliases of it; one total `From<ErrorCode>`. C / C++ headers: - `line_sender.h` is the canonical enum; `reader.h` aliases it (typedef + `#define`s). C++ gains a base `questdb::error` with `questdb::error_code`; `questdb::ingress::line_sender_error` and `questdb::egress::reader_error` are subclasses, so `catch (const questdb::error&)` handles either direction while per-direction catches keep working. Drift guards (a new error code must be mapped everywhere or the build/tests fail): in-crate wildcard-free exhaustiveness tripwire, FFI discriminant + From pins driven by one table, a Rust-vs-`line_sender.h` cross-check, and C++ static_asserts pinning the reader aliases. BREAKING (unreleased egress/reader surface only; ingest C ABI 0..19 unchanged): `reader_error_code` discriminants are renumbered onto the unified scheme (e.g. invalid_api_call 2->1, handshake_error 5->20), and `questdb_db_connect_reader` no longer remaps ingress connect codes onto egress ones. Verified: questdb-rs + questdb-rs-ffi unit tests + clippy clean; full cmake C/C++ build with mock-server cpp_tests; and live e2e against a QWP-capable server (egress_live_server/failover/auth/tls = 168 Rust live tests, C++ test_reader 253 assertions). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…r pool teardown The SfColumnSender contract promises that a parked connection's background runner keeps delivering and that returning/dropping a handle does not lose accepted frames. But pool close (drain_idle_senders), idle eviction (reap_idle_senders), and post-shutdown returns dropped the ColumnSender outright, and SyncQwpWsRunner::drop only sets `stop` + joins — drive_step returns Stop without flushing the queue. Any in-memory frame the runner had accepted but not yet delivered was silently lost. (Disk-backed sf_dir stayed safe via persistence + orphan replay; close_drain existed but was wired only into the standalone row Sender, never the column-sender/pool path, so the parsed close_flush_timeout was dead config there.) Drain at the destroy-sites only, never on the reuse-return fast path: - reap_idle_senders: skip connections whose queue still holds undelivered frames (lock-free progress read); they become reapable once the runner drains them or the transport goes terminal. - drain_idle_senders (pool close): signal every runner (begin_close, non-blocking) then await each under one shared deadline, so a multi-conn close stays bounded to ~close_flush_timeout instead of the sum. - return_to_pool / OwnedColumnSender drop under shutdown/must_close: bounded drain before the connection is dropped. An unreachable peer still drops the undelivered tail after the bounded wait, logging a warning (parity with commit_in_flight_on_drop). The SfColumnSender and ColumnPoolKind docs are corrected: drop delivers best-effort within close_flush_timeout; call wait() before close (or use sf_dir) for a hard guarantee. Refactors the runner's close_drain into reusable begin_close + drain_to_deadline. Adds three regression tests (reap-skip, close blocks until delivered, close drain bounded when peer never acks). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The store-and-forward wait() timeout advised "drop this sender and re-borrow to replay", claiming parity with the direct backend's transport-timeout path. That parity is false: the direct path latches must_close and drops the connection (discarding uncommitted frames), whereas the SFA connection is recycled with its queue + runner intact, so the runner keeps delivering the original frames. Re-flushing then double-delivers. Correct recovery is to retry wait(); fix the message and docs (column-major and row-major) accordingly. No behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add four Enterprise e2e tests pinning that the QWP-WS background orphan drainer honours request_durable_ack (fix 19b9fda) -- trimming an adopted orphan slot only on a durable ack, never on a plain server OK -- using the Enterprise harness object-store durability gate (obj_store.freeze). - trim_timing: no-kill pin. Freeze the store, adopt an orphan, assert the .sfa is RETAINED after the OK and trimmed only after thaw makes it durable. Pins the exact fixed line in ~3s. - partial_durability_at_kill: half the orphan made durable before the freeze, half OK-only; kill without wiping the store. The successor comes up as a replica (a fresh primary hits ER007 against the populated store), downloads the durable half, and is promoted to take the replayed non-durable half. Dense-sequence oracle => loss-free and duplicate-free. - multi_slot...survives_kill: three orphan slots adopted concurrently (max_background_drainers=3) must all survive a kill+wipe failover. - survives_drain_reconnect: the drain connection drops and re-adopts 3x while frozen; the slot must not prematurely trim across the churn (no kill -- isolates connection churn from the data-loss window). Each was confirmed to FAIL with the fix reverted to the pre-19b9fda OK-trim, and pass with it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Scenario-6 binding breadth for fix 19b9fda: the orphan-drainer durable-ack fix lives in the shared Rust core and the C / C++ FFI bindings inherit it. Add row-major QWP-WS sidecars driven through each binding plus the positive (survives_kill) and negative (no_request_durable_ack_loses_rows) e2e variants. - system_test/c_sidecars/qwp_c_sidecar.c: C-FFI row-major sidecar (line_sender_from_conf + line_sender_qwpws_*), same stdin/stdout line protocol as the Rust/Java sidecars. - system_test/c_sidecars/qwp_cpp_sidecar.cpp: C++ translation unit. The C++ wrapper has no row-major QWP-WS API (new_buffer() throws for WebSocket senders -- WS is column-major in C++), so it compiles/links the C++ header and drives the row-major path via the C ABI, exactly as a C++ user must. - c_client_sidecar.py: build_c_sidecar / build_cpp_sidecar (cargo-build the FFI lib + cc/c++ compile-link) and CClientCSidecar / CClientCppSidecar. - conftest.py: c_client_c_sidecar / c_client_cpp_sidecar fixtures. - tests/test_orphan_drainer_bindings.py: c_client_c + c_client_cpp variants (shared bodies, per-binding markers). The C/C++ FFI does not export the reconnSucc counter, so the negative test confirms the successor was reached server-side (the probe row lands on P2). Both bindings' survives_kill confirmed to FAIL with the fix reverted to the pre-19b9fda OK-trim (P2 observes 0 rows); all four pass with the fix. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR is the combined landing of #148 and #150 — two independently-developed columnar I/O tracks that both target QWP/WebSocket and share the same connection pool /
SymbolGlobalDict, shipped together so downstream consumers (py-questdb-client, in-tree C/C++ tests) see one self-consistent surface.#148 — Column-major sender (
column_sender)A DataFrame → Table ingest API.
QuestDbconnection pool +BorrowedSender+Chunk(per-columnVec<u8>that stacks wire bytes directly) + synchronousflush(AckLevel). Covers bool / signed integers / floats / UUID / Long256 / IPv4 / timestamps / VARCHAR /symbol_dict_{i8,i16,i32}bulk-intern. The connection-scopedSchemaRegistry(FULL / REFERENCE emit modes) andSymbolGlobalDictare shared with the row API, preserving the 1M-per-connection symbol cap on huge PandasCategoricaldicts. Full C ABI (include/questdb/ingress/column_sender.h) and a Criterion bench suite (column path ≈ memcpy ceiling; bulk-intern ~16× faster than per-row HashMap).#150 — Apache Arrow + Polars integration
Both directions over QWP/WebSocket:
Buffer::append_arrow/append_arrow_at_columnconsumes a wholeRecordBatchin one call, column-major dense bulk path (one memcpy per column; QWP null bitmap built by byte-stride OR-with-NOT of the Arrow validity buffer when boundaries align, per-row fallback only when bit-offsets are unaligned).Cursor::as_record_batch_reader()streamingRecordBatchiterator; Polars sub-feature provides the DataFrame bridge.line_sender_buffer_append_arrow*andline_reader_cursor_next_arrow_batch. Every producer-suppliedArrowArray/ArrowSchemais pre-validated beforefrom_ffi(schema depth ≤ 64,row_count≤ 16M, bounded per-node buffer/child counts, and rejection of NULL or under-sized buffer arrays), so a malformed struct returns an error instead of aborting the FFI crate'spanic = "abort"profile. The manual per-column FFI path caps each variable-length payload ati32::MAX.Why merged
The two tracks were developed on the same
jh_conn_pool_refactorbranch and converged on shared infrastructure (connection pool,SymbolGlobalDict, QWP/WS transport). Splitting them at review time would force one to ship behind a compatibility shim for the other; merging them together avoids that churn and gives C/C++ callers — and the upcoming Pandas / Polars wrapper inpy-questdb-client— a column-major, zero-redundant-copy path into QuestDB in one cut.Public surface
See the original PRs' "Public surface" / "What's in the box" sections:
Feature gating
questdb-rs:arrow+polarsare opt-in features, excluded fromalmost-all-features; column_sender lives behindsync-sender-qwp-ws.questdb-rs-ffi:arrowfeature mirrors.CMakeLists.txt:QUESTDB_ENABLE_ARROW=OFFby default; auto-flipped toONwhenQUESTDB_TESTS_AND_EXAMPLES=ONso tests / examples exercise the Arrow path without explicit opt-in.Test plan
test_arrow_c.c/test_arrow_egress.cpp/test_arrow_ingress.cppwired into CMake and exercised in CIarrow_egress_fuzz/arrow_ingress_fuzz/arrow_round_trip_fuzz/arrow_alignment_fuzzcargo bench --features sync-sender-qwp-ws --bench column_senderpy-questdb-client, WS-7)Closes #148, #150.
Summary by CodeRabbit
New Features
Examples
Documentation
Tests
Chores (CI)
Benchmarks