
[SPARK-55224][PYTHON] Use Spark DataType as ground truth in Pandas-Arrow serialization#53992

Closed
Yicong-Huang wants to merge 33 commits into apache:master from Yicong-Huang:SPARK-55224/refactor/spark-type-ground-truth

Conversation

@Yicong-Huang (Contributor) commented Jan 27, 2026

What changes were proposed in this pull request?

Make _create_batch and _create_array in PySpark's Pandas serializers use Spark's DataType as the single source of truth, deriving Arrow types internally when needed.

Before: Callers in worker.py pre-computed arrow_return_type = to_arrow_type(return_type, ...) and passed both arrow_type and spark_type through the serialization pipeline.

After: Callers pass only spark_type (Spark DataType). The serializers derive arrow_type internally via to_arrow_type().

Key changes:

  • ~15 Pandas-based wrapper functions in worker.py updated to yield return_type instead of arrow_return_type
  • Arrow UDF functions (which use ArrowStreamArrowUDFSerializer) are unchanged; they still pass arrow_type directly
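The call-site change can be sketched as follows. This is a minimal stdlib-only sketch: the to_arrow_type mapping, the Serializer class, and the type names are simplified stand-ins for the PySpark internals, not the actual API.

```python
# Hypothetical sketch of the refactoring: the serializer derives the Arrow
# type from the Spark type itself, instead of receiving both from the caller.

# Stand-in for pyspark.sql.pandas.types.to_arrow_type (simplified mapping).
def to_arrow_type(spark_type: str) -> str:
    return {"IntegerType": "int32", "DoubleType": "float64",
            "StringType": "string"}[spark_type]

class Serializer:
    # Before: callers pre-computed the Arrow type and passed both through.
    def create_array_before(self, series, arrow_type, spark_type):
        return (arrow_type, spark_type)

    # After: callers pass only the Spark type; the Arrow type is derived
    # internally, keeping the Spark DataType as the single source of truth.
    def create_array_after(self, series, spark_type):
        arrow_type = to_arrow_type(spark_type)
        return (arrow_type, spark_type)

print(Serializer().create_array_after([1, 2], "IntegerType"))
# ('int32', 'IntegerType')
```

The point of the sketch is the narrower signature: once the serializer owns the Spark-to-Arrow derivation, the two types can never disagree at a call site.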

Why are the changes needed?

  1. Single source of truth: spark_type is the canonical type representation defined by users
  2. Simplified API: Callers no longer need to pre-compute arrow_type
  3. Consistency: Both _create_batch and _create_array now follow the same pattern

Does this PR introduce any user-facing change?

No. This is an internal refactoring with no user-facing API changes.

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions (bot)
JIRA Issue Information

=== Improvement SPARK-55224 ===
Summary: Use Spark DataType as ground truth in Pandas-Arrow serialization
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

spark_type : DataType, optional
    If None, spark type converted from arrow_type will be used
arrow_cast : bool, optional
    The Spark type to use. If None, pyarrow's inferred type will be used.
Contributor:

wait, the spark type here is the return type?
I think it should never be None?

Contributor (Author):

Updated. There are some cases where the Spark type is not available; we use from_arrow_type to derive it.

Contributor:

what cases?

Contributor (Author), Feb 1, 2026:

createDataFrame (conversion.py, connect/session.py): spark_type can be None for non-timestamp columns when the user doesn't provide a schema. This is existing behavior on master:

spark_types = [
    TimestampType() if is_datetime64_dtype(t) ...
    else None  # Non-timestamp columns get None
    for t in data.dtypes
]

Later, when the type is None (in this case, both the Spark type and the Arrow type will be None on master), pyarrow will try to infer it:

return pa.Array.from_pandas(
    series, mask=mask, type=None, safe=self._safecheck
)
> type : pyarrow.DataType, optional
> If not provided, the Arrow type is inferred from the pandas dtype.
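The fallback being discussed can be illustrated with a small stdlib-only sketch. The infer_arrow_type and from_arrow_type functions here are hypothetical stand-ins for pyarrow's dtype inference and PySpark's from_arrow_type, with simplified type names.

```python
# Hypothetical sketch: when no schema is given, spark_type is None for a
# column; the Arrow type is inferred from the data, and a Spark type can
# then be recovered for downstream use.

def infer_arrow_type(values):
    # Stand-in for pyarrow's inference in pa.Array.from_pandas(type=None).
    return "int64" if all(isinstance(v, int) for v in values) else "string"

def from_arrow_type(arrow_type):
    # Stand-in for pyspark.sql.pandas.types.from_arrow_type.
    return {"int64": "LongType", "string": "StringType"}[arrow_type]

def resolve_spark_type(values, spark_type=None):
    # No user-provided schema for this column: infer, then convert back,
    # so downstream code can always assume a defined Spark type.
    if spark_type is None:
        spark_type = from_arrow_type(infer_arrow_type(values))
    return spark_type

print(resolve_spark_type([1, 2, 3]))  # LongType
```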

Contributor:

can we factor out the usage of createDataFrame first?

Contributor (Author):

That makes sense, thanks for the suggestion.

I want to make sure I understand what you mean by factoring out the createDataFrame usage.

In this PR I am not changing the createDataFrame behavior. Spark still allows users to pass an optional schema (see doc here), and when it is not provided, we can end up without a Spark type at this stage. That is already part of the current behavior. So even if we refactor and isolate the createDataFrame-related logic, we would still have cases where the Spark type is None before Arrow conversion.

Are you suggesting that instead we should make createDataFrame always let Arrow infer the type first, and then convert that inferred Arrow type back into a Spark type, so that downstream we can assume the Spark type is always defined?

Contributor (Author):

If so, maybe we can do it after this PR, and then change the optional spark type to be required?

Contributor:

One major goal of this refactoring is to eliminate confusion in UDFs.

> Are you suggesting that instead we should make createDataFrame always let Arrow infer the type first, and then convert that inferred Arrow type back into a Spark type, so that downstream we can assume the Spark type is always defined?

this can be a solution if it works. Otherwise, we should copy the necessary code to createDataFrame

Contributor (Author):

Got it. I went ahead and implemented the above logic. The PR is open here: #54092!

@Yicong-Huang (Contributor, Author):

@zhengruifeng this is ready as well!

Combine prefers_large_types from HEAD with struct_in_pandas,
ndarray_as_list, df_for_struct parameters from upstream/master.
@zhengruifeng (Contributor):

merged to master

"row"
if (
    eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
    or runner_conf.use_legacy_pandas_udf_conversion
Contributor:

@Yicong-Huang why add or runner_conf.use_legacy_pandas_udf_conversion here?

The use_legacy_pandas_udf_conversion config is supposed to take effect only in SQL_ARROW_BATCHED_UDF.

Suppose the eval type is SQL_SCALAR_PANDAS_UDF and the config use_legacy_pandas_udf_conversion is true: would struct_in_pandas then be changed from dict to row?

Contributor (Author):

Good catch! You're right: the or runner_conf.use_legacy_pandas_udf_conversion is redundant here. When use_legacy_pandas_udf_conversion=True and eval_type=SQL_ARROW_BATCHED_UDF, the earlier elif condition (line 2759) doesn't match, so execution falls through to the else branch, and the or clause is unnecessary for that case.
And as you pointed out, it has an unintended side effect: if use_legacy_pandas_udf_conversion=True while eval_type is something else (e.g., SQL_SCALAR_PANDAS_UDF), it would incorrectly change struct_in_pandas from "dict" to "row".
I've created a follow-up PR to fix this: #54212
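The intended scoping can be sketched as follows. This is a hedged stdlib-only sketch of the behavior described in the thread, not the actual worker.py code: the eval-type constants are simplified stand-ins, and the real fix lives in #54212.

```python
# Hypothetical sketch: the legacy-conversion config must only take effect
# for SQL_ARROW_BATCHED_UDF, so other eval types always keep "dict".
SQL_ARROW_BATCHED_UDF = 101   # stand-in value, not the real constant
SQL_SCALAR_PANDAS_UDF = 200   # stand-in value, not the real constant

def struct_in_pandas(eval_type, use_legacy_pandas_udf_conversion):
    # With the redundant "or" clause removed, the config no longer leaks
    # into the choice for non-Arrow-batched eval types.
    if eval_type == SQL_ARROW_BATCHED_UDF:
        return "row"
    return "dict"

# A pandas UDF keeps "dict" even when the legacy config is enabled.
print(struct_in_pandas(SQL_SCALAR_PANDAS_UDF, True))   # dict
print(struct_in_pandas(SQL_ARROW_BATCHED_UDF, False))  # row
```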

)
ndarray_as_list = (
    eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
    or runner_conf.use_legacy_pandas_udf_conversion
Contributor:

ditto?
