Summary
Add ANALYZE TABLE … COMPUTE STATISTICS [FOR ALL COLUMNS | FOR COLUMNS …] support for
Lance tables, persisting per-column statistics into the Lance manifest so Spark's
cost-based optimizer (CBO) can consume them on subsequent scans — O(1) per column, with no
per-scan re-aggregation.
The proposed approach reuses Spark's own statistics infrastructure rather than
hand-rolling aggregation or a custom serialization format, and adds no new SQL grammar
(Spark already parses ANALYZE; an injected resolution rule rewrites it for Lance tables).
It complements the connector's existing zonemap-derived runtime statistics (e.g. Dynamic
File Pruning) and is independent of them — no zonemap or btree index is required for the
persisted-stats path.
A prototype implementing this exists on a branch; a PR will follow. This issue is intended to
capture the design and invite feedback before that PR lands — in particular on the two
deliberate tradeoffs called out under Risks & known limitations (coupling to Spark
internals, and storing stats in manifest TBLPROPERTIES).
Motivation
Spark's CBO needs column-level statistics (min, max, null count, distinct count, avg/max
length) to produce good join orders and selectivity estimates. Without persisted stats every
scan must either re-aggregate or fall back to coarse heuristics. ANALYZE TABLE is the
standard Spark entry point for computing and storing these — but Spark rejects it for V2
tables, and Lance has no place to put the results. This change closes both gaps.
Approach
1. Reuse Spark's stats engine instead of hand-rolling aggregation or a codec.
ANALYZE delegates to Spark's own CommandUtils.computeColumnStats — one aggregation job
producing min, max, nullCount, HLL-approximate distinctCount, avgLen, maxLen — and persists
each column serialized with Spark's CatalogColumnStat.toMap. The read path reads the
payload back via CatalogColumnStat.fromMap / toPlanStat and feeds Spark's CBO directly.
Using Spark's serialization (rather than a bespoke codec) keeps the connector's stored form
in lockstep with Spark's own semantics — at the cost of coupling to a Spark-internal format
(see Risks). Distinct count is always HLL-approximate (there is no EXACT mode), matching
native ANALYZE.
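Because the stored form is Spark's own map, most of the connector's work on each side is key translation. The sketch below assumes CatalogColumnStat.toMap(colName) emits keys of the form <col>.<field> (for example price.min or price.histogram), which is also the shape CatalogColumnStat.fromMap reads back. ColumnStatRekey and its methods are hypothetical and operate on plain Java maps rather than Spark's Scala types. The writer side also drops any histogram entry so that none is persisted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: translates between Spark's per-column stat map ("<col>.<field>")
// and the Lance manifest namespace ("lance.stats.column.<col>.<field>").
// Assumes col contains no '.', which the connector rejects for other reasons.
final class ColumnStatRekey {
    static final String COLUMN_PREFIX = "lance.stats.column.";
    static final String HISTOGRAM_FIELD = "histogram";

    private ColumnStatRekey() {}

    // Write side: prefix each of this column's keys and strip the histogram entry.
    static Map<String, String> toLanceKeys(String col, Map<String, String> sparkMap) {
        String sparkPrefix = col + ".";
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : sparkMap.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(sparkPrefix)) {
                continue; // belongs to another column; ignore defensively
            }
            String field = key.substring(sparkPrefix.length());
            if (field.equals(HISTOGRAM_FIELD)) {
                continue; // never written to the manifest
            }
            out.put(COLUMN_PREFIX + col + "." + field, e.getValue());
        }
        return out;
    }

    // Read side: rebuild the "<col>.<field>" map that a fromMap-style decoder expects.
    static Map<String, String> toSparkKeys(String col, Map<String, String> props) {
        String lancePrefix = COLUMN_PREFIX + col + ".";
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(lancePrefix)) {
                continue; // envelope keys and other columns
            }
            String field = key.substring(lancePrefix.length());
            if (field.equals(HISTOGRAM_FIELD)) {
                continue; // ignored even if present
            }
            out.put(col + "." + field, e.getValue());
        }
        return out;
    }
}
```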
2. No new grammar — an injected resolution rule.
Spark parses ANALYZE natively. A LanceAnalyzeTableResolution analyzer rule rewrites the
native AnalyzeColumn / AnalyzeTable logical plans into LanceAnalyzeTable when the
target resolves to a Lance dataset, pre-empting the V2-table rejection Spark would otherwise
throw at planning time (DataSourceV2Strategy / NOT_SUPPORTED_COMMAND_FOR_V2_TABLE).
Non-Lance ANALYZE is intentionally left untouched for Spark to handle. The connector's
custom grammar is retained for the 8 other Lance commands that have no native equivalent;
only the ANALYZE rule is moved to native interception.
NOSCAN, and a partition spec on the bare COMPUTE STATISTICS form, are not supported:
they fall through to Spark's native V2 rejection rather than being silently accepted. A
partition spec combined with FOR COLUMNS / FOR ALL COLUMNS is dropped by Spark's parser
with a warning (matching native ANALYZE column-stats semantics, where column stats are not
partition-scoped) and the full table is analyzed. A bare COMPUTE STATISTICS with no FOR
clause is intercepted and treated as FOR ALL COLUMNS — a deliberate divergence from native
Spark, where the bare form computes only table-level row/size stats and runs no per-column
aggregation.
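The interception rules in this subsection reduce to a small decision over the two native plan shapes. In the sketch below, AnalyzeTablePlan, AnalyzeColumnPlan, Decision, and resolve are hypothetical stand-ins, not the fields of Spark's AnalyzeTable / AnalyzeColumn or the prototype's rule, and "the target resolves to a Lance dataset" is reduced to a boolean.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the rewrite decision made by a rule like LanceAnalyzeTableResolution.
final class AnalyzeRewrite {
    private AnalyzeRewrite() {}

    // ANALYZE TABLE t COMPUTE STATISTICS [NOSCAN], optionally with a PARTITION spec.
    static final class AnalyzeTablePlan {
        final boolean lanceTarget;
        final boolean noScan;
        final boolean hasPartitionSpec;

        AnalyzeTablePlan(boolean lanceTarget, boolean noScan, boolean hasPartitionSpec) {
            this.lanceTarget = lanceTarget;
            this.noScan = noScan;
            this.hasPartitionSpec = hasPartitionSpec;
        }
    }

    // ... FOR COLUMNS a, b | FOR ALL COLUMNS. Any partition spec has already been
    // dropped (with a warning) by Spark's parser before this plan exists.
    static final class AnalyzeColumnPlan {
        final boolean lanceTarget;
        final boolean allColumns;
        final List<String> columns;

        AnalyzeColumnPlan(boolean lanceTarget, boolean allColumns, List<String> columns) {
            this.lanceTarget = lanceTarget;
            this.allColumns = allColumns;
            this.columns = columns;
        }
    }

    static final class Decision {
        static final Decision FALL_THROUGH = new Decision(false, false, Collections.emptyList());

        final boolean rewrite;      // true: the plan becomes LanceAnalyzeTable
        final boolean allColumns;   // analyze every supported column
        final List<String> columns; // explicit FOR COLUMNS list, otherwise empty

        private Decision(boolean rewrite, boolean allColumns, List<String> columns) {
            this.rewrite = rewrite;
            this.allColumns = allColumns;
            this.columns = columns;
        }

        static Decision forAllColumns() {
            return new Decision(true, true, Collections.emptyList());
        }

        static Decision forColumns(List<String> cols) {
            return new Decision(true, false, Collections.unmodifiableList(new ArrayList<>(cols)));
        }
    }

    static Decision resolve(AnalyzeTablePlan p) {
        if (!p.lanceTarget) {
            return Decision.FALL_THROUGH; // non-Lance tables are left to Spark
        }
        if (p.noScan || p.hasPartitionSpec) {
            return Decision.FALL_THROUGH; // unsupported: Spark's own V2 rejection applies
        }
        return Decision.forAllColumns();  // bare form is treated as FOR ALL COLUMNS
    }

    static Decision resolve(AnalyzeColumnPlan p) {
        if (!p.lanceTarget) {
            return Decision.FALL_THROUGH;
        }
        return p.allColumns ? Decision.forAllColumns() : Decision.forColumns(p.columns);
    }
}
```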
Wire format (lance.stats.version=1)
Stored in Lance manifest TBLPROPERTIES. Envelope plus one block per analyzed column:
lance.stats.{version, complete, computedAtVersion, computedAt, numRows, schemaHash}
lance.stats.column.<name>.{version, distinctCount, min, max, nullCount, avgLen, maxLen}
Per-column values use Spark's own CatalogColumnStat map form. Spark can also emit a
histogram entry; the writer strips it before persisting and the reader ignores it, so
no histogram key is ever written to disk. A reader that encounters an unrecognized
lance.stats.version ignores the entire payload and falls back to live aggregation. A
per-column version newer than the reader's Spark CatalogColumnStat version drops only
that column (it is omitted from the reported stats) while the remaining columns are still
served. Either way the format is forward-compatible by degradation, not by hard error.
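A minimal sketch of the two degradation levels above, written as a filter over the raw manifest properties. StatsVersionGate and usableColumns are hypothetical names. The highest per-column version the reader accepts is passed in rather than hard-coded, since in the real connector it would come from the running Spark's CatalogColumnStat.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the read path's two-level version gate.
final class StatsVersionGate {
    static final String ENVELOPE_VERSION_KEY = "lance.stats.version";
    static final String SUPPORTED_ENVELOPE_VERSION = "1";
    static final String COLUMN_PREFIX = "lance.stats.column.";
    static final String VERSION_SUFFIX = ".version";

    private StatsVersionGate() {}

    // Empty result: ignore the whole payload (unknown or missing envelope version)
    // and fall back to live aggregation. Otherwise: the columns that can be served.
    static Optional<Set<String>> usableColumns(Map<String, String> props, int maxColumnVersion) {
        if (!SUPPORTED_ENVELOPE_VERSION.equals(props.get(ENVELOPE_VERSION_KEY))) {
            return Optional.empty();
        }
        Set<String> usable = new TreeSet<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            String key = e.getKey();
            boolean perColumnVersion = key.startsWith(COLUMN_PREFIX)
                && key.endsWith(VERSION_SUFFIX)
                && key.length() > COLUMN_PREFIX.length() + VERSION_SUFFIX.length();
            if (!perColumnVersion) {
                continue;
            }
            String column = key.substring(COLUMN_PREFIX.length(), key.length() - VERSION_SUFFIX.length());
            try {
                if (Integer.parseInt(e.getValue().trim()) <= maxColumnVersion) {
                    usable.add(column);
                }
                // A newer per-column version drops only this column.
            } catch (NumberFormatException ignored) {
                // An unparseable version is treated like an unknown one.
            }
        }
        return Optional.of(usable);
    }
}
```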
Correctness guarantees
- Single-write commit. The writer assembles one TableChange[] ending in
lance.stats.complete=true and submits it via a single catalog.alterTable →
one Dataset.updateConfig, so no half-written stats become visible (the read path refuses
to consume stats unless complete="true"). Note this is a read-merge-write of manifest
properties, not a connector-level compare-and-swap; isolation from other concurrent
property writers relies on Lance's own commit-conflict handling, not on this connector.
- Schema-drift guard. A SHA-256 fingerprint of the schema's (name, dataType, nullable) triples is
recorded under lance.stats.schemaHash; the read path recomputes it and falls back to
live aggregation on mismatch or absence (fail-safe, not silent pass-through).
- Invalidation on data change. Stats are tagged with the manifest version they describe
(computedAtVersion). Any subsequent commit (append/overwrite/delete/optimize) bumps the
manifest version, so the read path's exact computedAtVersion == currentVersion check no
longer matches and it falls back to live aggregation — unless allow.stale is set.
- Concurrency (read-side staleness guard). ANALYZE captures the manifest version
before the aggregation read and tags stats with readVersion + 1. This guards the
reader: a writer commit that interleaves between the capture and alterTable pushes the
stats commit to a later manifest version than the tag, so the read path rejects them
("safer-stale-than-wrong"). It does not add isolation between two concurrent
ANALYZE/writer commits (those are subject to the read-merge-write above); the guard's
purpose is solely to keep a reader from consuming stats that don't describe the live data.
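The read-path acceptance rules and the writer's version tag above can be condensed into one check. This is a sketch under stated assumptions: the exact bytes fed to SHA-256 (one name|dataType|nullable line per field, types given as strings) and all class and method names are hypothetical. The property keys, the complete flag, the computedAtVersion == currentVersion comparison, the allow.stale escape hatch, and the readVersion + 1 tag come from the design.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the read-side acceptance gate and the writer's version tag.
final class StatsAcceptance {
    private StatsAcceptance() {}

    // A schema field reduced to the triple that the fingerprint covers.
    static final class Field {
        final String name;
        final String dataType; // simplified: a type name such as "bigint"
        final boolean nullable;

        Field(String name, String dataType, boolean nullable) {
            this.name = name;
            this.dataType = dataType;
            this.nullable = nullable;
        }
    }

    // Assumed encoding: "name|dataType|nullable\n" per field, in schema order, hex output.
    static String schemaHash(List<Field> schema) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            for (Field f : schema) {
                String line = f.name + "|" + f.dataType + "|" + f.nullable + "\n";
                md.update(line.getBytes(StandardCharsets.UTF_8));
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    // Writer: tag with the version the single alterTable commit is expected to produce.
    static long computedAtVersion(long readVersion) {
        return readVersion + 1;
    }

    // Reader: every check must pass; any failure means "fall back to live aggregation".
    static boolean accept(Map<String, String> props, List<Field> liveSchema,
                          long currentVersion, boolean allowStale) {
        if (!"true".equals(props.get("lance.stats.complete"))) {
            return false; // missing or incomplete write
        }
        String storedHash = props.get("lance.stats.schemaHash");
        if (storedHash == null || !storedHash.equals(schemaHash(liveSchema))) {
            return false; // hash absent or schema drifted
        }
        String tagged = props.get("lance.stats.computedAtVersion");
        if (tagged == null) {
            return false;
        }
        long computedAt;
        try {
            computedAt = Long.parseLong(tagged);
        } catch (NumberFormatException e) {
            return false;
        }
        return allowStale || computedAt == currentVersion;
    }
}
```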
Configuration
| SparkConf | Per-scan option | Default | Purpose |
|---|---|---|---|
| spark.lance.cbo.column.stats.enabled | cbo.column.stats.enabled | true | Master kill-switch for the persisted-stats fast path on the READ side (ANALYZE still writes). The SparkConf key overrides the per-scan option. |
| spark.lance.cbo.column.stats.allow.stale | (none) | false | Accept stats whose computedAtVersion differs from the current manifest version (trades correctness for hit rate). |
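The precedence in the table can be sketched as a plain lookup over two string maps. StatsConfig and its methods are hypothetical. The sketch also ignores that Spark scan options are matched case-insensitively, and it treats any value other than "true" as false.

```java
import java.util.Map;

// Hypothetical sketch of how the two settings in the table could be resolved.
final class StatsConfig {
    static final String ENABLED_CONF = "spark.lance.cbo.column.stats.enabled";
    static final String ENABLED_OPTION = "cbo.column.stats.enabled";
    static final String ALLOW_STALE_CONF = "spark.lance.cbo.column.stats.allow.stale";

    private StatsConfig() {}

    // Read-side kill-switch: the SparkConf key wins over the per-scan option,
    // and the default (true) applies only when neither is set.
    static boolean fastPathEnabled(Map<String, String> sparkConf, Map<String, String> scanOptions) {
        String value = sparkConf.get(ENABLED_CONF);
        if (value == null) {
            value = scanOptions.get(ENABLED_OPTION);
        }
        return value == null || Boolean.parseBoolean(value.trim());
    }

    // allow.stale has no per-scan option, so only SparkConf is consulted (default false).
    static boolean allowStale(Map<String, String> sparkConf) {
        return Boolean.parseBoolean(sparkConf.getOrDefault(ALLOW_STALE_CONF, "false").trim());
    }
}
```

Since the kill-switch only gates the read side, an ANALYZE writer would not consult fastPathEnabled at all.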
Skipped / rejected types
Complex containers (Struct/Array/Map/UDT/Null), interval types (Calendar / AnsiInterval),
and TimestampNTZ are unsupported. TimestampNTZ is excluded specifically because a filtered
min/max comparison on it throws inside Spark's CBO FilterEstimation (EstimationUtils
has no toDouble case for TimestampNTZType → MatchError).
- Under FOR ALL COLUMNS, unsupported columns are skipped (logged at INFO).
- Under FOR COLUMNS …, naming an unsupported column fails fast with a clear message.
Column names containing . cannot be targeted by FOR COLUMNS (they would produce
ambiguous keys under the lance.stats.column.<name>.<suffix> scheme); FOR ALL COLUMNS
skips them. This is a known limitation — see below.
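The skip-versus-fail rules above amount to a small column-selection step. In this sketch the type labels are simplified placeholders rather than Spark DataType names, and AnalyzeColumnSelection is a hypothetical class. FOR COLUMNS names are matched case-insensitively to mimic Spark's default spark.sql.caseSensitive=false, and the INFO logging of skipped columns is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of choosing which columns ANALYZE will aggregate.
final class AnalyzeColumnSelection {
    // Placeholder labels for the rejected type families (not Spark's DataType names).
    static final Set<String> UNSUPPORTED = Set.of(
        "struct", "array", "map", "udt", "null",
        "calendar_interval", "ansi_interval", "timestamp_ntz");

    static final class Column {
        final String name;
        final String type;

        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    private AnalyzeColumnSelection() {}

    static boolean supported(Column c) {
        return !UNSUPPORTED.contains(c.type) && !c.name.contains(".");
    }

    // FOR ALL COLUMNS: unsupported types and dotted names are skipped, not errors.
    static List<String> forAllColumns(List<Column> schema) {
        List<String> out = new ArrayList<>();
        for (Column c : schema) {
            if (supported(c)) {
                out.add(c.name);
            }
        }
        return out;
    }

    // FOR COLUMNS ...: the first unknown, dotted, or unsupported column fails fast.
    static List<String> forColumns(List<Column> schema, List<String> requested) {
        List<String> out = new ArrayList<>();
        for (String r : requested) {
            Column match = null;
            for (Column c : schema) {
                if (c.name.equalsIgnoreCase(r)) {
                    match = c;
                    break;
                }
            }
            if (match == null) {
                throw new IllegalArgumentException("Column not found: " + r);
            }
            if (match.name.contains(".")) {
                throw new IllegalArgumentException("Dotted column names cannot be analyzed: " + match.name);
            }
            if (UNSUPPORTED.contains(match.type)) {
                throw new IllegalArgumentException(
                    "Unsupported type " + match.type + " for column " + match.name);
            }
            out.add(match.name);
        }
        return out;
    }
}
```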
Risks & known limitations
- Coupling to Spark internals. CommandUtils.computeColumnStats is reached by reflective
name/arity lookup (its first parameter type differs between Spark 3.x and 4.x), and the
on-disk per-column payload is exactly CatalogColumnStat's internal map encoding. Both are
non-public Spark APIs. The reflective lookup matches by method name and arity, so an arity
change produces a clear diagnostic, but a same-arity parameter-type or return-type drift
surfaces as a raw reflective error rather than a named one. Either way, a Spark upgrade that
changes these will require a corresponding connector change, and the lance.stats.version
envelope does not by itself protect against Spark silently reformatting the per-column map.
- Manifest size on wide tables. Stats live in manifest TBLPROPERTIES, which is read on
every scan. For tables with many columns this grows the manifest with each ANALYZE. A
sidecar-file storage medium is a possible alternative and is open for discussion.
- Dotted column names. ANALYZE … FOR COLUMNS cannot target a top-level column whose
name contains .; the workaround is to omit it (or rename). FOR ALL COLUMNS skips such
columns rather than failing. This is not merely the connector's flat
lance.stats.column.<name>.<suffix> key scheme: Spark's V2 CBO matches column stats to plan
attributes via attr.name().equals(ref.describe()) (DataSourceV2Relation.transformV2Stats),
and ref.describe() backtick-quotes a dotted name (`a.b` ≠ a.b), so the optimizer
cannot consume such stats for any V2 source — connector-side key encoding alone would not
make them usable.
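The name-and-arity binding described in the first bullet can be sketched with plain java.lang.reflect. ReflectiveLookup and FakeCommandUtils are hypothetical: the stand-in class only mimics the shape of a Spark-internal utility so the sketch runs without Spark on the classpath, and the prototype's actual bridge (LanceNativeColumnStats) may differ.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

final class ReflectiveLookup {
    private ReflectiveLookup() {}

    // Binds to exactly one public static method by name and parameter count only.
    // Parameter types are deliberately not compared, so a parameter whose class differs
    // between major versions still binds; an arity change yields the named error below.
    static Method findByNameAndArity(Class<?> owner, String name, int arity) {
        List<Method> candidates = new ArrayList<>();
        for (Method m : owner.getMethods()) {
            if (m.getName().equals(name)
                    && m.getParameterCount() == arity
                    && Modifier.isStatic(m.getModifiers())) {
                candidates.add(m);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException("Expected exactly one static " + owner.getName() + "."
                + name + " with " + arity + " parameters but found " + candidates.size()
                + "; this Spark version may not be supported by the connector build");
        }
        return candidates.get(0);
    }
}

// Hypothetical stand-in so the sketch runs without Spark; not Spark's CommandUtils.
final class FakeCommandUtils {
    private FakeCommandUtils() {}

    public static String computeColumnStats(Object session, String table) {
        return "stats for " + table;
    }
}
```

Because parameter types are not compared, a same-arity signature change passes the lookup and only fails later, when Method.invoke throws a raw IllegalArgumentException for the mismatched argument. That is the weaker failure mode the first bullet describes.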
Key classes
read/LanceStatsKeys.java — wire-format spec, key constants, SparkConf constants, schema hash.
v2/LanceColumnStatCodec.scala — read-side bridge: CatalogColumnStat.fromMap/toPlanStat → V2 ColumnStatistics; histogram strip, value-length cap, version gate, fail-safe decode.
v2/LanceNativeColumnStats.scala — reflective bridge to CommandUtils.computeColumnStats (3.x/4.x SparkSession split).
v2/LanceAnalyzeTableResolution.scala — analyzer rule rewriting native AnalyzeColumn/AnalyzeTable → LanceAnalyzeTable for Lance tables.
v2/LanceAnalyzeTableExec.scala — ANALYZE executor (writer, single-commit ordering, prefix-based orphan-key GC, histogram strip).
read/LanceScanBuilder.java — loadPersistedColumnStats fast path.
Observability
- LanceScanBuilder logs at INFO when the fast path engages.
- Schema-hash mismatch logs at INFO; schema-hash absence and stale-version fallback log at DEBUG.
- The ANALYZE writer logs the analyzed column count and the post-write manifest version and,
when schema columns were dropped between runs, the number of orphaned stats keys it removed.
Test plan
- Unit: LanceStatsKeysTest, LanceAnalyzeTableSchemaHashTest, LoadPersistedColumnStatsTest,
LanceSparkReadOptionsSerializationTest.
- End-to-end: BaseAnalyzeTableTest (run per Spark version via AnalyzeTableTest), covering
FOR ALL COLUMNS / FOR COLUMNS, decimal / empty / all-null / re-analyze / partial,
stats-reach-scan, TimestampNTZ-skip, case-insensitivity, NOSCAN-falls-through-to-Spark,
and kill-switch paths.
- Coverage: the Spark 3.5 module suite is green locally; Spark 4.0/4.1 reuse the 3.5 test
sources via build-helper and run them in CI; 3.4 compiles. The full four-version +
integration matrix runs via CI when the PR opens.
Feedback welcome on the design — especially the two tradeoffs in Risks & known
limitations — before the PR is opened.