Describe the bug
An aggregate whose intermediate buffer format is incompatible between Spark and Comet (CometAggregateExpressionSerde.supportsMixedPartialFinal == false) can be split across Comet and Spark within a distinct-aggregate rewrite, so a Comet-encoded partial buffer is handed to a Spark aggregate (or vice versa). Depending on the aggregate this produces a hard crash or silently wrong results.
This is a pre-existing gap in the mixed-execution guards added for #1389. Those guards only handle a direct single-stage Partial → Final pair:
CometExecRule.tagUnsafePartialAggregates / findPartialAggInPlan only looked for a Partial aggregate feeding directly into a non-convertible Final, and stopped at intermediate aggregate stages.
CometBaseAggregate.doConvert only guarded the Final mode (sparkFinalMode), not intermediate PartialMerge stages.
Spark's single-distinct rewrite (AggUtils.planAggregateWithOneDistinct) produces a four-stage plan where the non-distinct aggregate's Partial is separated from its Final by intermediate PartialMerge stages:
Final(pa, count)                                  <- consumes buffers
  PartialMerge(pa) + Partial(count distinct)
    PartialMerge(pa)                              <- consumes buffers, keyed by the distinct column
      Partial(pa)                                 <- produces the incompatible buffer
If any stage of this chain falls back to Spark while another converts to Comet (e.g. because the distinct aggregate or its grouping key is unsupported by Comet, or via the comet.exec.*HashAggregate.enabled test configs), the incompatible buffer crosses the engine boundary.
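The hand-offs in this chain can be modeled in a few lines. This is a minimal sketch, not Comet code: `Stage` and `boundary_crossings` are hypothetical names introduced here for illustration, and the engine assignment in the example mirrors the `finalHashAggregate.enabled=false` scenario described in this issue.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Stage:
    """Hypothetical model of one aggregate stage in the distinct-rewrite chain."""
    name: str
    engine: str  # "comet" or "spark"


def boundary_crossings(chain):
    """Return (producer, consumer) pairs where pa's buffer changes engine.

    `chain` is ordered bottom-up: chain[0] produces the buffer and every
    later stage consumes the buffer emitted by the stage before it.
    """
    return [
        (producer.name, consumer.name)
        for producer, consumer in zip(chain, chain[1:])
        if producer.engine != consumer.engine
    ]


# Comet runs the partial and merge stages, Spark runs the final stage.
chain = [
    Stage("Partial(pa)", "comet"),
    Stage("PartialMerge(pa)", "comet"),
    Stage("PartialMerge(pa) + Partial(count distinct)", "comet"),
    Stage("Final(pa, count)", "spark"),
]

crossings = boundary_crossings(chain)
```

Any non-empty result means an incompatible buffer would be decoded by the wrong engine. A guard that inspects only the bottom `Partial` and its immediate parent would miss the crossing in this example, because it sits three stages above the `Partial`.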
Steps to reproduce
Run any incompatible-buffer aggregate together with a distinct aggregate, and force the stages to split across engines. For example, with a TypedImperativeAggregate:
SELECT collect_set(v), count(DISTINCT k) FROM t GROUP BY g
Forcing the split with spark.comet.exec.finalHashAggregate.enabled=false (Comet does the partial/merge, Spark does the final) crashes, because Spark's aggregate tries to deserialize a Comet-encoded buffer.
Expected behavior
When an incompatible-buffer aggregate cannot run entirely in one engine across the whole distinct-rewrite chain, the entire chain for that aggregate must fall back to Spark, as already happens for the simple Partial → Final case.
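The expected rule can be stated as an all-or-nothing decision over the chain. The sketch below is an illustration of that rule only, not the actual `CometExecRule` logic: `plan_engines` and its parameters are hypothetical, and `convertible` stands in for whatever per-stage check Comet performs.

```python
def plan_engines(convertible, supports_mixed_partial_final):
    """Choose an engine for each stage of one aggregate's chain (bottom-up).

    convertible: list of bools, True where Comet could convert that stage.
    supports_mixed_partial_final: True if the buffer format is shared by
    both engines, so stages may be split freely.
    """
    if supports_mixed_partial_final or all(convertible):
        # Either the buffer is portable, or no split happens at all.
        return ["comet" if ok else "spark" for ok in convertible]
    # Incompatible buffer and at least one stage cannot convert:
    # the whole chain for this aggregate stays on Spark.
    return ["spark"] * len(convertible)


# Four-stage distinct rewrite where only the final stage cannot convert.
incompatible = plan_engines([True, True, True, False], supports_mixed_partial_final=False)
compatible = plan_engines([True, True, True, False], supports_mixed_partial_final=True)
```

With an incompatible buffer, a single non-convertible stage anywhere in the chain forces all four stages back to Spark, which is the same outcome the existing guard gives for the two-stage case.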
Additional context
Surfaced by the approx_percentile / percentile_approx work (#4801): Spark's ObjectHashAggregateSuite "randomized aggregation test - [typed, with distinct]" crashed with a native panic:
quantile_summaries.rs: range end index 100 out of range for slice of length 96
The panic occurs because Comet's Greenwald-Khanna digest is laid out very differently from Spark's PercentileDigest, so decoding fails loudly at the engine boundary. Other affected aggregates (collect_set, collect_list, exact percentile, and the declarative avg / decimal sum / variance family) have structurally similar buffers and instead risk silent wrong results, which is why this went unnoticed.
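The difference between the loud and the silent failure can be shown with toy buffers. The layouts below are invented for illustration and are not the real Spark or Comet formats; `struct.error` plays the role of the Rust slice panic.

```python
import struct


def encode_sum_count(total, count):
    """Hypothetical engine A layout: (sum: f64, count: i64), little-endian."""
    return struct.pack("<dq", total, count)


def decode_sum_count(buf):
    """Matching decoder for engine A's layout; returns the average."""
    total, count = struct.unpack("<dq", buf)
    return total / count


def decode_count_sum(buf):
    """Hypothetical engine B layout: (count: i64, sum: f64).

    Same 16 bytes, different field meaning, so decoding succeeds.
    """
    count, total = struct.unpack("<qd", buf)
    return total / count


def read_fixed_summary(buf):
    """Hypothetical reader that expects a 100-byte summary block."""
    (summary,) = struct.unpack("<100s", buf)
    return summary


buf = encode_sum_count(10.0, 4)
right = decode_sum_count(buf)   # decoded by the engine that wrote it
wrong = decode_count_sum(buf)   # decoded by the other engine: no error raised

try:
    read_fixed_summary(bytes(96))  # size mismatch is detected immediately
    loud_failure = False
except struct.error:
    loud_failure = True
```

The size mismatch is caught on the first read, while the same-size buffer decodes without complaint and yields a different average, which only a result comparison against Spark would reveal.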
Affected: any aggregate with supportsMixedPartialFinal == false used together with a distinct aggregate.