[SPARK-55224][PYTHON] Use Spark DataType as ground truth in Pandas-Arrow serialization#53992
[SPARK-55224][PYTHON] Use Spark DataType as ground truth in Pandas-Arrow serialization#53992Yicong-Huang wants to merge 33 commits intoapache:masterfrom
Conversation
JIRA Issue Information=== Improvement SPARK-55224 === This comment was automatically generated by GitHub Actions |
…tor/spark-type-ground-truth
…tor/spark-type-ground-truth # Conflicts: # python/pyspark/sql/pandas/serializers.py
| spark_type : DataType, optional | ||
| If None, spark type converted from arrow_type will be used | ||
| arrow_cast: bool, optional | ||
| The Spark type to use. If None, pyarrow's inferred type will be used. |
There was a problem hiding this comment.
wait, the spark type here is the return type?
I think it should never be None?
There was a problem hiding this comment.
updated. there are some cases where spark type is not available, using from_arrow_type to get it.
There was a problem hiding this comment.
createDataFrame (conversion.py, connect/session.py): spark_type can be None for non-timestamp columns when user doesn't provide a schema. This is existing behavior on master:
spark_types = [
TimestampType() if is_datetime64_dtype(t) ...
else None # Non-timestamp columns get None
for t in data.dtypes
]
And later when the type is None (in this case, both spark type and arrow type will be None in master), pyarrow will try to infer.
return pa.Array.from_pandas(
series, mask=mask, type=None, safe=self._safecheck
)
> type : pyarrow.DataType, optional
> If not provided, the Arrow type is inferred from the pandas dtype.
There was a problem hiding this comment.
can we factor out the usage of createDataFrame first?
There was a problem hiding this comment.
That makes sense, thanks for the suggestion.
I want to make sure I understand what you mean by factoring out the createDataFrame usage.
In this PR I am not changing the createDataFrame behavior. Spark still allows users to pass an optional schema (see doc here), and when it is not provided, we can end up without a Spark type at this stage. That is already part of the current behavior. So even if we refactor and isolate the createDataFrame-related logic, we would still have cases where the Spark type is None before Arrow conversion.
Are you suggesting that instead we should make createDataFrame always let Arrow infer the type first, and then convert that inferred Arrow type back into a Spark type, so that downstream we can assume the Spark type is always defined?
There was a problem hiding this comment.
If so, maybe we can do it after this PR, and then change the optional spark type to be required?
There was a problem hiding this comment.
one major goal of this refactoring is to eliminate confusion in UDF.
Are you suggesting that instead we should make createDataFrame always let Arrow infer the type first, and then convert that inferred Arrow type back into a Spark type, so that downstream we can assume the Spark type is always defined?
this can be a solution if it works. Otherwise, we should copy the necessary code to createDataFrame
There was a problem hiding this comment.
got it. I went ahead implemented the above logic. PR is open here #54092!
|
@zhengruifeng this is ready as well! |
Combine prefers_large_types from HEAD with struct_in_pandas, ndarray_as_list, df_for_struct parameters from upstream/master.
|
merged to master |
| "row" | ||
| if ( | ||
| eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF | ||
| or runner_conf.use_legacy_pandas_udf_conversion |
There was a problem hiding this comment.
@Yicong-Huang why adding or runner_conf.use_legacy_pandas_udf_conversion here?
the use_legacy_pandas_udf_conversion is supposed to only take effect in SQL_ARROW_BATCHED_UDF.
Suppose the eval type is SQL_SCALAR_PANDAS_UDF and the config use_legacy_pandas_udf_conversion is true,
the struct_in_pandas was changed from dict -> row?
There was a problem hiding this comment.
Good catch! You're right.. the or runner_conf.use_legacy_pandas_udf_conversion is redundant here. When use_legacy_pandas_udf_conversion=True and eval_type=SQL_ARROW_BATCHED_UDF, the earlier elif condition (line 2759) doesn't match, so it falls through to the else branch and the or clause is unnecessary for that case.
And as you pointed out, it has an unintended side effect: if use_legacy_pandas_udf_conversion=True while eval_type is something else (e.g., SQL_SCALAR_PANDAS_UDF), it would incorrectly change struct_in_pandas from "dict" to "row".
I've created a follow-up PR to fix this: #54212
| ) | ||
| ndarray_as_list = ( | ||
| eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF | ||
| or runner_conf.use_legacy_pandas_udf_conversion |
What changes were proposed in this pull request?
Let
_create_batchand_create_arrayin PySpark's Pandas serializers to use Spark'sDataTypeas the single source of truth, deriving Arrow types internally when needed.Before: Callers in
worker.pypre-computedarrow_return_type = to_arrow_type(return_type, ...)and passed botharrow_typeandspark_typethrough the serialization pipeline.After: Callers pass only
spark_type(Spark DataType). The serializers derivearrow_typeinternally viato_arrow_type().Key changes:
worker.pyupdated to yieldreturn_typeinstead ofarrow_return_typeArrowStreamArrowUDFSerializer) unchanged - they still passarrow_typedirectlyWhy are the changes needed?
spark_typeis the canonical type representation defined by usersarrow_type_create_batchand_create_arraynow follow the same patternDoes this PR introduce any user-facing change?
No. This is an internal refactoring with no user-facing API changes.
How was this patch tested?
Existing tests.
Was this patch authored or co-authored using generative AI tooling?
No.