feat(cbo): ANALYZE TABLE persistent column statistics (reuse Spark's stats infra) #629

@LuciferYang

Summary

Add ANALYZE TABLE … COMPUTE STATISTICS [FOR ALL COLUMNS | FOR COLUMNS …] support for
Lance tables, persisting per-column statistics into the Lance manifest so Spark's
cost-based optimizer (CBO) can consume them on subsequent scans — O(1) per column, with no
per-scan re-aggregation.

The proposed approach reuses Spark's own statistics infrastructure rather than
hand-rolling aggregation or a custom serialization format, and adds no new SQL grammar
(Spark already parses ANALYZE; an injected resolution rule rewrites it for Lance tables).
It complements the connector's existing zonemap-derived runtime statistics (e.g. Dynamic
File Pruning) and is independent of them — no zonemap or btree index is required for the
persisted-stats path.

A prototype implementing this exists on a branch; a PR will follow. This issue is intended to
capture the design and invite feedback before that PR lands — in particular on the two
deliberate tradeoffs called out under Risks & known limitations (coupling to Spark
internals, and storing stats in manifest TBLPROPERTIES).

Motivation

Spark's CBO needs column-level statistics (min, max, null count, distinct count, avg/max
length) to produce good join orders and selectivity estimates. Without persisted stats every
scan must either re-aggregate or fall back to coarse heuristics. ANALYZE TABLE is the
standard Spark entry point for computing and storing these — but Spark rejects it for V2
tables, and Lance has no place to put the results. This change closes both gaps.

Approach

1. Reuse Spark's stats engine instead of hand-rolling aggregation or a codec.
ANALYZE delegates to Spark's own CommandUtils.computeColumnStats — one aggregation job
producing min, max, nullCount, HLL-approximate distinctCount, avgLen, maxLen — and persists
each column serialized with Spark's CatalogColumnStat.toMap. The read path reads the
payload back via CatalogColumnStat.fromMap / toPlanStat and feeds Spark's CBO directly.
Using Spark's serialization (rather than a bespoke codec) keeps the connector's stored form
in lockstep with Spark's own semantics — at the cost of coupling to a Spark-internal format
(see Risks). Distinct count is always HLL-approximate (there is no EXACT mode), matching
native ANALYZE.

2. No new grammar — an injected resolution rule.
Spark parses ANALYZE natively. A LanceAnalyzeTableResolution analyzer rule rewrites the
native AnalyzeColumn / AnalyzeTable logical plans into LanceAnalyzeTable when the
target resolves to a Lance dataset, pre-empting the V2-table rejection Spark would otherwise
throw at planning time (DataSourceV2Strategy / NOT_SUPPORTED_COMMAND_FOR_V2_TABLE).
Non-Lance ANALYZE is intentionally left untouched for Spark to handle. The connector's
custom grammar is retained for the 8 other Lance commands that have no native equivalent;
only the ANALYZE rule is moved to native interception.

NOSCAN, and a partition spec on the bare COMPUTE STATISTICS form, are not supported:
they fall through to Spark's native V2 rejection rather than being silently accepted. A
partition spec combined with FOR COLUMNS / FOR ALL COLUMNS is dropped by Spark's parser
with a warning (matching native ANALYZE column-stats semantics, where column stats are not
partition-scoped) and the full table is analyzed. A bare COMPUTE STATISTICS with no FOR
clause is intercepted and treated as FOR ALL COLUMNS — a deliberate divergence from native
Spark, where the bare form computes only table-level row/size stats and runs no per-column
aggregation.
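The routing rules above can be summarized as a small decision function. This is only a sketch of the behavior described in this issue, not the actual LanceAnalyzeTableResolution rule: the class, enum, and method names are hypothetical, and the real rule operates on Spark logical plans rather than booleans.

```java
/** Illustrative model of how each ANALYZE form is routed; all names here are hypothetical. */
final class AnalyzeFormRouting {
    enum Clause { NONE, FOR_COLUMNS, FOR_ALL_COLUMNS }

    enum Outcome { FALL_THROUGH_TO_SPARK, ANALYZE_ALL_COLUMNS, ANALYZE_LISTED_COLUMNS }

    static Outcome route(boolean lanceTable, boolean noscan, boolean partitionSpec, Clause clause) {
        // Non-Lance targets are left untouched for Spark to handle.
        if (!lanceTable) return Outcome.FALL_THROUGH_TO_SPARK;
        // NOSCAN is not supported: Spark's native V2 rejection applies.
        if (noscan) return Outcome.FALL_THROUGH_TO_SPARK;
        if (clause == Clause.NONE) {
            // Bare COMPUTE STATISTICS: rejected with a partition spec,
            // otherwise treated as FOR ALL COLUMNS.
            return partitionSpec ? Outcome.FALL_THROUGH_TO_SPARK : Outcome.ANALYZE_ALL_COLUMNS;
        }
        // With a FOR clause, Spark's parser has already dropped any partition spec,
        // so the full table is analyzed either way.
        return clause == Clause.FOR_COLUMNS
            ? Outcome.ANALYZE_LISTED_COLUMNS
            : Outcome.ANALYZE_ALL_COLUMNS;
    }
}
```

For example, route(true, false, false, Clause.NONE) yields ANALYZE_ALL_COLUMNS, which is the deliberate divergence from native Spark noted above.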

Wire format (lance.stats.version=1)

Stored in Lance manifest TBLPROPERTIES. Envelope plus one block per analyzed column:

lance.stats.{version, complete, computedAtVersion, computedAt, numRows, schemaHash}
lance.stats.column.<name>.{version, distinctCount, min, max, nullCount, avgLen, maxLen}

Per-column values use Spark's own CatalogColumnStat map form. Spark can also emit a
histogram entry; the writer strips it before persisting and the reader ignores it, so
no histogram key is ever written to disk. A reader that encounters an unrecognized
lance.stats.version ignores the entire payload and falls back to live aggregation. A
per-column version newer than the reader's Spark CatalogColumnStat version drops only
that column (it is omitted from the reported stats) while the remaining columns are still
served. Either way the format is forward-compatible by degradation, not by hard error.
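To make the key layout concrete, here is a minimal sketch that models manifest properties as a plain string map. It is not the connector's code: StatsWireFormat, flatten, readColumn, and SUPPORTED_VERSION are hypothetical names, and the literal "histogram" suffix is assumed to match the entry Spark emits. Only the envelope-version gate is shown; the per-column version gate is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Illustrative only: manifest properties modeled as a String map. All names are hypothetical. */
final class StatsWireFormat {
    static final String ENVELOPE = "lance.stats.";
    static final String COLUMN_PREFIX = "lance.stats.column.";
    static final int SUPPORTED_VERSION = 1; // assumed envelope version understood by this reader

    /** Flattens one column's stat map into lance.stats.column.<name>.<suffix> keys. */
    static Map<String, String> flatten(String column, Map<String, String> stat) {
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, String> e : stat.entrySet()) {
            if (e.getKey().equals("histogram")) continue; // never persisted
            out.put(COLUMN_PREFIX + column + "." + e.getKey(), e.getValue());
        }
        return out;
    }

    /** Returns the column's stat map, or empty so the caller falls back to live aggregation. */
    static Optional<Map<String, String>> readColumn(Map<String, String> props, String column) {
        // Unrecognized (or missing) envelope version: ignore the entire payload.
        if (!String.valueOf(SUPPORTED_VERSION).equals(props.get(ENVELOPE + "version"))) {
            return Optional.empty();
        }
        String prefix = COLUMN_PREFIX + column + ".";
        Map<String, String> stat = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                stat.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return stat.isEmpty() ? Optional.empty() : Optional.of(stat);
    }
}
```

Returning Optional.empty() rather than throwing mirrors the "degrade, don't fail" rule: an unreadable payload simply means the scan proceeds without persisted stats.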

Correctness guarantees

  • Single-write commit. The writer assembles one TableChange[] ending in
    lance.stats.complete=true and submits it via a single catalog.alterTable →
    one Dataset.updateConfig, so no half-written stats become visible (the read path refuses
    to consume stats unless complete="true"). Note this is a read-merge-write of manifest
    properties, not a connector-level compare-and-swap; isolation from other concurrent
    property writers relies on Lance's own commit-conflict handling, not on this connector.
  • Schema-drift guard. A SHA-256 fingerprint of (name, dataType, nullable) triples is
    recorded under lance.stats.schemaHash; the read path recomputes it and falls back to
    live aggregation on mismatch or absence (fail-safe, not silent pass-through).
  • Invalidation on data change. Stats are tagged with the manifest version they describe
    (computedAtVersion). Any subsequent commit (append/overwrite/delete/optimize) bumps the
    manifest version, so the read path's exact computedAtVersion == currentVersion check no
    longer matches and it falls back to live aggregation — unless allow.stale is set.
  • Concurrency (read-side staleness guard). ANALYZE captures the manifest version
    before the aggregation read and tags stats with readVersion + 1. This guards the
    reader: a writer interleaving between capture and alterTable makes the tag not match
    the version the stats actually landed at, and the read path rejects them
    ("safer-stale-than-wrong"). It does not add isolation between two concurrent
    ANALYZE/writer commits (those are subject to the read-merge-write above); the guard's
    purpose is solely to keep a reader from consuming stats that don't describe the live data.
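The read-side checks in the list above can be sketched as a single gate. This is an illustration under stated assumptions rather than the prototype's implementation: PersistedStatsGate and its methods are hypothetical, the byte-level encoding fed to SHA-256 is invented for the example, and the schema-hash check is assumed to still apply when allow.stale is set.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;

/** Illustrative read-path gate; names and the hash input encoding are assumptions. */
final class PersistedStatsGate {

    /** SHA-256 over (name, dataType, nullable) triples, hex-encoded. */
    static String schemaHash(List<String[]> fields) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (String[] field : fields) { // field = {name, dataType, nullable}
                for (String part : field) {
                    md.update(part.getBytes(StandardCharsets.UTF_8));
                    md.update((byte) 0); // separator so adjacent parts cannot run together
                }
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) hex.append(String.format("%02x", b & 0xff));
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is mandatory on every Java platform
        }
    }

    /** Version tag written by ANALYZE: the manifest version captured before the read, plus one. */
    static long tagFor(long readVersion) {
        return readVersion + 1;
    }

    /** True only if the persisted stats may be served; false means fall back to live aggregation. */
    static boolean usable(Map<String, String> props, long currentVersion,
                          String currentSchemaHash, boolean allowStale) {
        if (!"true".equals(props.get("lance.stats.complete"))) return false;
        // Mismatch and absence are treated the same way (fail-safe).
        if (!currentSchemaHash.equals(props.get("lance.stats.schemaHash"))) return false;
        if (allowStale) return true;
        return String.valueOf(currentVersion).equals(props.get("lance.stats.computedAtVersion"));
    }
}
```

If a writer commits between the version capture and the stats commit, the stats land at readVersion + 2 while the tag still says readVersion + 1, so usable returns false and the reader falls back.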

Configuration

| SparkConf | Per-scan option | Default | Purpose |
| --- | --- | --- | --- |
| spark.lance.cbo.column.stats.enabled | cbo.column.stats.enabled | true | Master kill-switch for the persisted-stats fast path on the READ side (ANALYZE still writes). The SparkConf key overrides the per-scan option. |
| spark.lance.cbo.column.stats.allow.stale | (none) | false | Accept stats whose computedAtVersion differs from the current manifest version (trades correctness for hit rate). |
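The precedence between the two keys for the kill-switch can be sketched as follows. The two key strings come from the table; the StatsToggle class and its enabled method are hypothetical, and both configuration sources are modeled as plain string maps rather than Spark's actual configuration objects.

```java
import java.util.Map;

/** Illustrative resolution of the kill-switch; class and method names are hypothetical. */
final class StatsToggle {
    static final String CONF_KEY = "spark.lance.cbo.column.stats.enabled";
    static final String OPTION_KEY = "cbo.column.stats.enabled";

    static boolean enabled(Map<String, String> sparkConf, Map<String, String> scanOptions) {
        // The SparkConf key wins whenever it is set.
        String value = sparkConf.get(CONF_KEY);
        if (value == null) {
            value = scanOptions.get(OPTION_KEY);
        }
        // Neither set: the documented default is true.
        return value == null || Boolean.parseBoolean(value);
    }
}
```

With this ordering, setting the SparkConf key to false disables the fast path for every scan in the session, even one that passes the per-scan option as true.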

Skipped / rejected types

Complex containers (Struct/Array/Map/UDT/Null), interval types (Calendar / AnsiInterval),
and TimestampNTZ are unsupported. TimestampNTZ is excluded specifically because a filtered
min/max comparison on it throws inside Spark's CBO FilterEstimation (EstimationUtils
has no toDouble case for TimestampNTZType, which results in a MatchError).

  • Under FOR ALL COLUMNS, unsupported columns are skipped (logged at INFO).
  • Under FOR COLUMNS …, naming an unsupported column fails fast with a clear message.

Column names containing . cannot be targeted by FOR COLUMNS (they would produce
ambiguous keys under the lance.stats.column.<name>.<suffix> scheme); FOR ALL COLUMNS
skips them. This is a known limitation — see below.
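The difference between the two clauses (skip versus fail fast) can be sketched like this. The type labels are plain strings chosen for the example, not Spark DataType instances, and AnalyzeColumnSelection with its methods is a hypothetical stand-in for the connector's selection logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative column selection; type labels and names are assumptions for this sketch. */
final class AnalyzeColumnSelection {
    static final Set<String> UNSUPPORTED = new HashSet<>(Arrays.asList(
        "struct", "array", "map", "udt", "null",
        "calendar_interval", "ansi_interval", "timestamp_ntz"));

    static boolean analyzable(String name, String type) {
        // A dot in the name would make lance.stats.column.<name>.<suffix> ambiguous.
        return !name.contains(".") && !UNSUPPORTED.contains(type);
    }

    /** FOR ALL COLUMNS: unsupported columns are silently skipped (the connector logs at INFO). */
    static List<String> forAllColumns(Map<String, String> schema) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, String> field : schema.entrySet()) {
            if (analyzable(field.getKey(), field.getValue())) kept.add(field.getKey());
        }
        return kept;
    }

    /** FOR COLUMNS: naming an unknown or unsupported column fails before any work is done. */
    static List<String> forColumns(Map<String, String> schema, List<String> requested) {
        for (String name : requested) {
            String type = schema.get(name);
            if (type == null) {
                throw new IllegalArgumentException("Unknown column: " + name);
            }
            if (!analyzable(name, type)) {
                throw new IllegalArgumentException(
                    "Cannot analyze column " + name + " of type " + type);
            }
        }
        return new ArrayList<>(requested);
    }
}
```

Validating the whole FOR COLUMNS list up front means a single bad name rejects the statement before the aggregation job is launched.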

Risks & known limitations

  • Coupling to Spark internals. CommandUtils.computeColumnStats is reached by reflective
    name/arity lookup (its first parameter type differs between Spark 3.x and 4.x), and the
    on-disk per-column payload is exactly CatalogColumnStat's internal map encoding. Both are
    non-public Spark APIs. The reflective lookup matches by method name and arity, so an arity
    change produces a clear diagnostic, but a same-arity parameter-type or return-type drift
    surfaces as a raw reflective error rather than a named one. Either way, a Spark upgrade that
    changes these will require a corresponding connector change, and the lance.stats.version
    envelope does not by itself protect against Spark silently reformatting the per-column map.
  • Manifest size on wide tables. Stats live in manifest TBLPROPERTIES, which is read on
    every scan. For tables with many columns this grows the manifest with each ANALYZE. A
    sidecar-file storage medium is a possible alternative and is open for discussion.
  • Dotted column names. ANALYZE … FOR COLUMNS cannot target a top-level column whose
    name contains a dot (.); the workaround is to omit it (or rename it). FOR ALL COLUMNS
    skips such columns rather than failing. The cause is not merely the connector's flat
    lance.stats.column.<name>.<suffix> key scheme: Spark's V2 CBO matches column stats to plan
    attributes via attr.name().equals(ref.describe()) (DataSourceV2Relation.transformV2Stats),
    and ref.describe() backtick-quotes a dotted name (yielding `a.b`, which does not equal
    the attribute name a.b), so the optimizer cannot consume such stats for any V2 source.
    Connector-side key encoding alone would not make them usable.
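The name/arity lookup described in the first risk can be illustrated with plain Java reflection. FakeCommandUtils below is a hypothetical stand-in with an invented signature, not Spark's CommandUtils, and ReflectiveLookup is likewise a made-up name. The sketch only shows why an arity change can be reported clearly while a same-arity type change cannot.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for a version-dependent internal API; NOT Spark's real signature. */
final class FakeCommandUtils {
    public static String computeColumnStats(Object session, String relation, int columns) {
        return relation + ":" + columns;
    }
}

/** Illustrative lookup that matches on method name and parameter count only. */
final class ReflectiveLookup {
    static Method findByNameAndArity(Class<?> owner, String name, int arity) {
        Method found = null;
        for (Method m : owner.getDeclaredMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == arity) {
                if (found != null) {
                    throw new IllegalStateException(
                        "Ambiguous overloads of " + name + " with " + arity + " parameters");
                }
                found = m;
            }
        }
        if (found == null) {
            // An arity change lands here, with a message that names the problem.
            throw new IllegalStateException(
                owner.getName() + "." + name + " with " + arity
                    + " parameters not found; incompatible library version?");
        }
        found.setAccessible(true);
        return found;
    }
}
```

Looking up computeColumnStats with arity 2 fails with the named IllegalStateException. Invoking the arity-3 method with an Integer where a String is expected instead surfaces as a bare IllegalArgumentException from Method.invoke, which is the "raw reflective error" case.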

Key classes

  • read/LanceStatsKeys.java: wire-format spec, key constants, SparkConf constants, schema hash.
  • v2/LanceColumnStatCodec.scala: read-side bridge: CatalogColumnStat.fromMap/toPlanStat → V2 ColumnStatistics; histogram strip, value-length cap, version gate, fail-safe decode.
  • v2/LanceNativeColumnStats.scala: reflective bridge to CommandUtils.computeColumnStats (3.x/4.x SparkSession split).
  • v2/LanceAnalyzeTableResolution.scala: analyzer rule rewriting native AnalyzeColumn/AnalyzeTable → LanceAnalyzeTable for Lance tables.
  • v2/LanceAnalyzeTableExec.scala: ANALYZE executor (writer, single-commit ordering, prefix-based orphan-key GC, histogram strip).
  • read/LanceScanBuilder.java: loadPersistedColumnStats fast path.

Observability

  • LanceScanBuilder logs at INFO when the fast path engages.
  • Schema-hash mismatch logs at INFO; schema-hash absence and stale-version fallback log at DEBUG.
  • The ANALYZE writer logs analyzed column count + post-write manifest version, and a
    per-orphan-key removal count when schema columns were dropped between runs.

Test plan

  • Unit: LanceStatsKeysTest, LanceAnalyzeTableSchemaHashTest, LoadPersistedColumnStatsTest,
    LanceSparkReadOptionsSerializationTest.
  • End-to-end: BaseAnalyzeTableTest (run per Spark version via AnalyzeTableTest) —
    FOR ALL COLUMNS / FOR COLUMNS, decimal / empty / all-null / re-analyze / partial,
    stats-reach-scan, TimestampNTZ-skip, case-insensitivity, NOSCAN-falls-through-to-Spark,
    and kill-switch paths.
  • Coverage: the Spark 3.5 module suite is green locally; Spark 4.0/4.1 reuse the 3.5 test
    sources via build-helper and run them in CI; 3.4 compiles. The full four-version +
    integration matrix runs via CI when the PR opens.

Feedback is welcome on the design, especially the two tradeoffs in Risks & known
limitations, before the PR is opened.
