[GLUTEN-11605][VL] Write per-block column statistics in shuffle writer#11769
[GLUTEN-11605][VL] Write per-block column statistics in shuffle writer#11769acvictor wants to merge 4 commits intoapache:mainfrom
Conversation
cb073fd to
19b8d5a
Compare
19b8d5a to
50e0444
Compare
|
@marin-ma @zhztheplayer this is ready for review. I will push one more commit to again disable by default. |
cpp/core/shuffle/BlockStatistics.cc
Outdated
| } | ||
| // Check each bit — return early on first null found. | ||
| for (uint32_t i = 0; i < numRows; ++i) { | ||
| if (!arrow::bit_util::GetBit(validityBuffer->data(), i)) { |
There was a problem hiding this comment.
Perhaps check each bytes by comparing with 0xff can be faster
cpp/core/jni/JniWrapper.cc
Outdated
| // Reuse the dynamic filter config to also enable block statistics collection, | ||
| // since stats are only useful when dynamic filter pushdown is active. | ||
| const auto& confMap = ctx->getConfMap(); | ||
| auto it = confMap.find("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled"); |
There was a problem hiding this comment.
Other option values are passing through function args. Can you add a new arg enableBlockStatistics?
cpp/core/shuffle/Payload.cc
Outdated
| mergedStats.merge(*append->blockStats_); | ||
| result->setBlockStats(std::move(mergedStats)); | ||
| } else if (source->hasBlockStats()) { | ||
| result->setBlockStats(*source->blockStats_); |
There was a problem hiding this comment.
If only source or append has blockStats, should we either discard it or compute for the missing side and merge them?
There was a problem hiding this comment.
Done - stats are only kept when both payloads have them and if one side is missing stats are discarded
| if (!isRowValid(validityBuffer, i)) { | ||
| continue; | ||
| } | ||
| T val = values[i]; |
There was a problem hiding this comment.
Check if require to consider NaN cases,
NaN may be silently skipped by the comparisons (neither updates min nor max) and making minVal = maxVal = NaN.
See if require to add a NaN check:
if constexpr (std::is_floating_point_v) {
if (std::isnan(val)) continue; // Skip NaN values
}
| case Type::kCompressed: { | ||
| int64_t size = sizeof(Type) + sizeof(uint32_t) + sizeof(uint32_t); // type + numRows + numBuffers | ||
| if (!buffers_.empty() && buffers_[0]) { | ||
| size += buffers_[0]->size(); |
There was a problem hiding this comment.
Missing buffer size field?
size += sizeof(int64_t) + buffers_[0]->size(); // buffer size field + data
Will revert on this soon |
What changes are proposed in this pull request?
This PR adds per-block column statistics (min/max/hasNull) to the shuffle writer pipeline as a prerequisite for block-level pruning using dynamic filters at the shuffle reader. When
spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabledis true, the shuffle writer computes per-column min/max statistics from raw Arrow buffers duringevictBuffers()and serializes them as akStatisticsPayloadblock before each non-dictionary payload in the output file. This mirrors how parquet row group statistics enable predicate pushdown.How was this patch tested?
Added new tests and also ran the CI with config set to true.
Was this patch authored or co-authored using generative AI tooling?
No
Related issue: #11605