Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,29 @@ import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType,

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withFallbackReason}
import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported}
import org.apache.comet.serde.{CodegenDispatchFallback, CometExpressionSerde, Compatible, ExprOuterClass, Incompatible, SupportLevel, Unsupported}
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProtoInternal, serializeDataType}
import org.apache.comet.shims.CometExprShim

object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
object CometCast
extends CometExpressionSerde[Cast]
with CometExprShim
with CodegenDispatchFallback {

// Shared with CometCastSuite so the asserted reason cannot drift from production.
private[comet] val negativeScaleDecimalToStringReason: String =
"Negative-scale decimal requires spark.sql.legacy.allowNegativeScaleOfDecimal=true"

// When `spark.sql.legacy.castComplexTypesToString.enabled` is true, Spark wraps maps and
// structs with `[]` (instead of `{}`) when casting to string, and omits NULL elements of
// structs/maps/arrays (instead of rendering them as the literal "null"). Comet only
// implements the default formatting, so fall back to Spark for any array/map/struct to-string
// cast when the flag is enabled. The flag is internal in Spark 4.0 and defaults to false.
// structs/maps/arrays (instead of rendering them as the literal "null"). Comet's native cast
// only implements the default formatting, so when the flag is on we mark the cast Incompatible
// and let the [[CodegenDispatchFallback]] trait route it through the JVM codegen dispatcher
// (Spark's own `doGenCode` inside the Comet kernel) so results still match Spark exactly. The
// flag is internal in Spark 4.0 and defaults to false.
private[comet] val legacyCastComplexTypesToStringReason: String =
"spark.sql.legacy.castComplexTypesToString.enabled=true is not supported"
"spark.sql.legacy.castComplexTypesToString.enabled=true is not supported natively"

private def legacyCastComplexTypesToString: Boolean =
SQLConf.get
Expand Down Expand Up @@ -166,7 +171,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
if (toType == DataTypes.StringType && legacyCastComplexTypesToString && (fromType
.isInstanceOf[ArrayType] || fromType.isInstanceOf[StructType] ||
fromType.isInstanceOf[MapType])) {
return Unsupported(Some(legacyCastComplexTypesToStringReason))
return Incompatible(Some(legacyCastComplexTypesToStringReason))
}

(fromType, toType) match {
Expand Down
22 changes: 22 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/aggregates.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ object CometMax extends CometAggregateExpressionSerde[Max] {
}

object CometCount extends CometAggregateExpressionSerde[Count] {

// When `spark.sql.legacy.allowParameterlessCount=true`, Spark allows `count()` with no
// arguments and treats it as `count(*)`. Comet's native planner asserts on non-empty children
// and would panic on such an expression, so mark it Unsupported here and let the aggregate fall
// back to Spark. Aggregate serdes have no [[CodegenDispatchFallback]] path (aggregates cannot
// be routed through the JVM codegen dispatcher), so a clean Spark fallback is the appropriate
// outcome. Under the default config value, Spark's analyzer rejects parameterless `count()` so
// this branch is unreachable.
private val legacyAllowParameterlessCountReason: String =
"`spark.sql.legacy.allowParameterlessCount=true` produces `count()` with no children, which " +
"the native planner does not support"

// The single reason this serde reports for an Unsupported verdict: the parameterless `count()`
// message, i.e. the same string `getSupportLevel` attaches when the expression has no children.
override def getUnsupportedReasons(): Seq[String] =
  legacyAllowParameterlessCountReason :: Nil

// A `Count` with at least one child is Compatible. A `Count` with no children (parameterless
// `count()`) is reported as Unsupported, carrying `legacyAllowParameterlessCountReason`.
override def getSupportLevel(expr: Count): SupportLevel =
  if (expr.children.nonEmpty) Compatible()
  else Unsupported(Some(legacyAllowParameterlessCountReason))

override def convert(
aggExpr: AggregateExpression,
expr: Count,
Expand Down
29 changes: 26 additions & 3 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,28 @@ object CometArrayJoin
}
}

object CometArrayInsert extends CometExpressionSerde[ArrayInsert] {
object CometArrayInsert extends CometExpressionSerde[ArrayInsert] with CodegenDispatchFallback {

override def getSupportLevel(expr: ArrayInsert): SupportLevel = Compatible()
// Spark's `spark.sql.legacy.negativeIndexInArrayInsert=true` restores the legacy behavior in
// which negative positions are 0-based rather than 1-based. Rather than maintain a parallel
// native code path for the legacy semantics, mark `array_insert` Incompatible when the flag
// is on so [[CodegenDispatchFallback]] routes the expression through the JVM codegen
// dispatcher (Spark's own `doGenCode` inside the Comet kernel) — that gives Spark-exact
// results without duplicating the legacy branch natively.
private val legacyNegativeIndexConfig = "spark.sql.legacy.negativeIndexInArrayInsert"

private val legacyNegativeIndexReason =
s"`$legacyNegativeIndexConfig=true` legacy negative-index semantics are not implemented natively"

// The single reason this serde reports for an Incompatible verdict: the legacy negative-index
// message, i.e. the same string `getSupportLevel` attaches when the legacy conf reads true.
override def getIncompatibleReasons(): Seq[String] =
  legacyNegativeIndexReason :: Nil

// Reads the legacy negative-index conf by string key, defaulting to "false" when it is unset.
// When it parses to true the expression is reported as Incompatible with
// `legacyNegativeIndexReason`; otherwise it is Compatible. The `expr` argument is not inspected.
override def getSupportLevel(expr: ArrayInsert): SupportLevel = {
  val legacyEnabled = SQLConf.get.getConfString(legacyNegativeIndexConfig, "false").toBoolean
  if (legacyEnabled) Incompatible(Some(legacyNegativeIndexReason)) else Compatible()
}

override def convert(
expr: ArrayInsert,
Expand All @@ -460,8 +479,12 @@ object CometArrayInsert extends CometExpressionSerde[ArrayInsert] {
val srcExprProto = exprToProtoInternal(expr.children.head, inputs, binding)
val posExprProto = exprToProtoInternal(expr.children(1), inputs, binding)
val itemExprProto = exprToProtoInternal(expr.children(2), inputs, binding)
// Reached in two cases:
// 1. Legacy conf is false → getSupportLevel returned Compatible → run native.
// 2. Legacy conf is true AND user set allowIncompatible=true → opt in to native.
// In case (2) the native impl honors the legacy semantics directly so we forward the flag.
val legacyNegativeIndex =
SQLConf.get.getConfString("spark.sql.legacy.negativeIndexInArrayInsert").toBoolean
SQLConf.get.getConfString(legacyNegativeIndexConfig, "false").toBoolean
if (srcExprProto.isDefined && posExprProto.isDefined && itemExprProto.isDefined) {
val arrayInsertBuilder = ExprOuterClass.ArrayInsert
.newBuilder()
Expand Down
11 changes: 8 additions & 3 deletions spark/src/main/scala/org/apache/comet/serde/maps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,17 @@ object CometMapFromEntries
}
}

object CometStrToMap extends CometScalarFunction[StringToMap]("str_to_map") with CometTypeShim {
object CometStrToMap
extends CometScalarFunction[StringToMap]("str_to_map")
with CometTypeShim
with CodegenDispatchFallback {

// Spark 4.1.1+ honours spark.sql.legacy.truncateForEmptyRegexSplit by truncating trailing
// empty entries from the split result. Comet's native str_to_map always behaves as if the flag
// were false, so it is incompatible when legacy truncation is enabled. Read by string key so it
// resolves on older Spark versions where the config is not registered.
// were false. When the flag is true, mark this Incompatible so the CodegenDispatchFallback
// trait routes the expression through the JVM codegen dispatcher (Spark's own doGenCode inside
// the Comet kernel) rather than falling the entire projection back to Spark. Read by string
// key so it resolves on older Spark versions where the config is not registered.
private val legacyTruncateConfig = "spark.sql.legacy.truncateForEmptyRegexSplit"

private val legacyTruncateReason =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- When `spark.sql.legacy.allowParameterlessCount=true`, Spark accepts `count()` (no arguments)
-- and treats it as `count(*)`. Comet's native planner asserts non-empty children on Count, so
-- `CometCount.getSupportLevel` marks parameterless Count `Unsupported` and lets the aggregate
-- fall back to Spark. Aggregate serdes do not have a JVM codegen dispatcher path, so the
-- Spark fallback is the correct outcome.

-- Config: spark.sql.legacy.allowParameterlessCount=true

statement
CREATE TABLE test_count_parameterless(i int, grp string) USING parquet

statement
INSERT INTO test_count_parameterless VALUES (1, 'x'), (2, 'x'), (NULL, 'y'), (3, 'y'), (NULL, 'y')

-- Parameterless count() falls back to Spark; the aggregate result must still match Spark.
query expect_fallback(spark.sql.legacy.allowParameterlessCount=true)
SELECT count() FROM test_count_parameterless

-- Parameterless count() with GROUP BY.
query expect_fallback(spark.sql.legacy.allowParameterlessCount=true)
SELECT grp, count() FROM test_count_parameterless GROUP BY grp ORDER BY grp

-- Parameterless count() on empty table.
statement
CREATE TABLE test_count_empty(i int) USING parquet

query expect_fallback(spark.sql.legacy.allowParameterlessCount=true)
SELECT count() FROM test_count_empty
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Tests array_insert with legacy negative index mode enabled but without opting into the
-- native (incompatible) path. `CometArrayInsert` mixes in [[CodegenDispatchFallback]] so with
-- spark.sql.legacy.negativeIndexInArrayInsert=true the expression is routed through the JVM
-- codegen dispatcher (Spark's own `doGenCode` inside the Comet kernel), producing Spark-exact
-- results without a Spark fallback and without touching the native legacy branch.
-- The companion file array_insert_legacy.sql covers the allowIncompatible=true opt-in path.

-- ConfigMatrix: parquet.enable.dictionary=false,true
-- Config: spark.sql.legacy.negativeIndexInArrayInsert=true

-- -1 inserts before last element in legacy mode
query
SELECT array_insert(array(1, 2, 3), -1, 10)

-- -2 inserts before second-to-last
query
SELECT array_insert(array(1, 2, 3), -2, 10)

-- -3 inserts before first element
query
SELECT array_insert(array(1, 2, 3), -3, 10)

-- negative beyond start with null padding (legacy mode pads differently)
query
SELECT array_insert(array(1, 2, 3), -5, 10)

-- in-bounds negative index on a different array: -2 inserts before second-to-last
query
SELECT array_insert(array(1, 3, 4), -2, 2)

-- column-based test
statement
CREATE TABLE test_ai_legacy_dispatch(arr array<int>, pos int, val int) USING parquet

statement
INSERT INTO test_ai_legacy_dispatch VALUES
(array(1, 2, 3), -1, 10),
(array(4, 5), -1, 20),
(array(1, 2, 3), -4, 10),
(NULL, -1, 10)

query
SELECT array_insert(arr, pos, val) FROM test_ai_legacy_dispatch
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,26 @@

-- When `spark.sql.legacy.castComplexTypesToString.enabled` is true Spark wraps maps and
-- structs with `[...]` (instead of `{...}`) and omits NULL elements of structs/maps/arrays
-- (instead of rendering them as the literal "null"). Comet only implements the default
-- formatting, so any array/map/struct → string cast must fall back to Spark.
-- (instead of rendering them as the literal "null"). Comet's native cast does not implement
-- the legacy formatting; the [[CodegenDispatchFallback]] mixin on `CometCast` routes these
-- casts through the JVM codegen dispatcher (Spark's own `doGenCode` inside the Comet kernel)
-- so results match Spark exactly without a Spark fallback.
-- The flag is internal in Spark 4.0 and defaults to false.

-- Config: spark.sql.legacy.castComplexTypesToString.enabled=true

-- Struct → string falls back.
query expect_fallback(spark.sql.legacy.castComplexTypesToString.enabled=true is not supported)
-- Struct → string routed through the codegen dispatcher.
query
SELECT CAST(struct(1, 2, null) AS STRING)

-- Array → string falls back (NULL elements rendered differently between modes).
query expect_fallback(spark.sql.legacy.castComplexTypesToString.enabled=true is not supported)
-- Array → string routed through the codegen dispatcher.
query
SELECT CAST(array(1, 2, null) AS STRING)

-- Map → string falls back (`[]` vs `{}` wrapping differs between modes).
query expect_fallback(spark.sql.legacy.castComplexTypesToString.enabled=true is not supported)
-- Map → string routed through the codegen dispatcher.
query
SELECT CAST(map('a', 1, 'b', null) AS STRING)

-- Nested complex types still fall back through the outer type.
query expect_fallback(spark.sql.legacy.castComplexTypesToString.enabled=true is not supported)
-- Nested complex types also routed through the codegen dispatcher via the outer type.
query
SELECT CAST(struct(array(1, null), map('k', null)) AS STRING)
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
-- specific language governing permissions and limitations
-- under the License.

-- Tests that str_to_map falls back to Spark when
-- spark.sql.legacy.truncateForEmptyRegexSplit is enabled. In legacy mode Spark truncates trailing
-- empty entries from the split result, which Comet's native str_to_map does not honour.
-- Tests that str_to_map routes through the JVM codegen dispatcher when
-- spark.sql.legacy.truncateForEmptyRegexSplit is enabled. In legacy mode Spark truncates
-- trailing empty entries from the split result, which Comet's native str_to_map does not
-- honour. `CometStrToMap` marks the expression Incompatible when the flag is on and mixes in
-- [[CodegenDispatchFallback]], so the projection stays native (Spark's own `doGenCode` runs
-- inside the Comet kernel) while producing Spark-exact results.
-- See https://github.com/apache/datafusion-comet/issues/4477

-- Config: spark.sql.legacy.truncateForEmptyRegexSplit=true

-- trailing pair delimiter: legacy mode truncates the trailing empty entry, so Comet must fall
-- back to Spark
query expect_fallback(truncateForEmptyRegexSplit)
-- trailing pair delimiter: legacy mode truncates the trailing empty entry; Comet delegates to
-- the codegen dispatcher.
query
SELECT str_to_map('a:1,b:2,', ',', ':')

-- column input also falls back
-- column input is also handled via the codegen dispatcher
statement
CREATE TABLE test_str_to_map_legacy(s STRING, pair_delim STRING, key_value_delim STRING) USING parquet

Expand All @@ -37,5 +40,5 @@ INSERT INTO test_str_to_map_legacy VALUES
('x:1;y:2;', ';', ':'),
(NULL, ',', ':')

query expect_fallback(truncateForEmptyRegexSplit)
query
SELECT str_to_map(s, pair_delim, key_value_delim) FROM test_str_to_map_legacy
Loading