-
Notifications
You must be signed in to change notification settings - Fork 29k
[WIP] POC for serializer changes #54075
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Yicong-Huang
wants to merge
37
commits into
apache:master
Choose a base branch
from
Yicong-Huang:SPARK-55175/poc/simplify-serializers
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
[WIP] POC for serializer changes #54075
Yicong-Huang
wants to merge
37
commits into
apache:master
from
Yicong-Huang:SPARK-55175/poc/simplify-serializers
+1,752
−1,425
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Extract the struct flattening/wrapping logic from ArrowStreamUDFSerializer into reusable transformers in a new transformers.py module.
…use-flatten-struct' into SPARK-55176/refactor/extract-arrow-to-pandas-converter
Resolved conflicts by: - Adopting create_converter architecture while keeping arrow_to_pandas compatibility - Using _load_group_dataframes helper from upstream - Removing GroupArrowUDFSerializer (not in upstream) - Keeping input_type (singular) parameter name
…er/mapper layers
This refactoring separates concerns between serialization and data transformation:
- Serializers now only handle data serialization/deserialization
- Wrappers handle UDF wrapping and data format conversion
- Mappers handle UDF result aggregation using transformer utilities
Key changes:
1. Simplified serializer hierarchy by removing redundant classes:
- Removed ArrowStreamPandasUDFSerializer, ArrowStreamGroupUDFSerializer,
ArrowStreamArrowUDFSerializer, ArrowStreamUDTFSerializer
- Renamed ArrowStreamMapIterSerializer to ArrowStreamGroupSerializer
- Unified grouped/non-grouped UDF handling in ArrowStreamGroupSerializer
2. Introduced transformer utility classes:
- ArrowBatchTransformer: Arrow batch operations (wrap_struct, flatten_struct,
partial_batch, partial_table, concat_batches, merge_batches, reorder_columns)
- PandasBatchTransformer: Pandas/Arrow conversions (to_arrow, concat_series_batches)
3. Moved data transformations from serializers to wrappers/mappers:
- Moved to_arrow conversion from mappers to wrappers for Pandas agg UDFs
- Wrappers now return RecordBatch directly instead of (result, arrow_type) tuples
- Mappers simplified to use transformer methods for common operations
Benefits:
- Clearer separation of concerns
- Reduced code duplication through transformer utilities
- Easier to maintain and extend
- Consistent data format handling across UDF types
- Use ArrowBatchTransformer.zip_batches for type coercion instead of manual loop - Simplify error handling logic - Handle empty struct case properly - Unwrap wrapped batches from worker before type coercion, then wrap back for JVM - All 46/47 UDTF tests pass (1 known failure unrelated to this change)
|
This commit fixes several issues with pandas UDF handling after the
serializer refactoring:
1. Fix parameter initialization order in read_udfs():
- Move pandas_udf_* parameter defaults BEFORE the if-elif chain
- Previously they were reset AFTER being set, causing df_for_struct
to always be False for scalar pandas UDFs
2. Add struct_in_pandas="dict" to scalar UDF wrappers:
- wrap_scalar_pandas_udf: enables DataFrame→struct array conversion
- wrap_pandas_batch_iter_udf: same fix for iter variant
3. Fix grouped map UDF column matching:
- Use assign_cols_by_name to match DataFrame columns by name when
available, otherwise by position
- Handle empty DataFrame (0 columns) by creating empty struct array
4. Fix Arrow batch handling:
- zip_batches: convert items to list for pa.RecordBatch.from_arrays
- mapper: special handling for SQL_ARROW_BATCHED_UDF to return raw
result instead of calling zip_batches
5. Fix error handling in create_array:
- Only catch ArrowInvalid for arrow_cast retry (not ArrowTypeError)
- Add ArrowTypeError to TypeError handler for proper error messages
- Update error message format to match expected test output
All 264+ pandas UDF tests pass including struct type tests.
Remove timezone, int_to_decimal_coercion_enabled, and assign_cols_by_name parameters that were stored but never used by the serializer or subclasses.
…to-pandas conversion - Extract common verification functions: verify_result_length, verify_result_type, verify_is_iterable, verify_element_type - Simplify wrapper functions using common verification utilities - Centralize Arrow-to-pandas conversion in read_udfs mapper/func - Remove unused pandas_udf_* variables from read_udfs - Fix is_scalar_pandas_iter to not convert Arrow iter UDFs to pandas
- Separate iter UDF branches by type for clarity (scalar pandas/arrow, map pandas, map arrow) - Wrapper functions now return (result, arrow_return_type) tuples, with output conversion centralized in mapper/func - Grouped map pandas UDFs now receive Iterator[DataFrame] directly instead of Iterator[List[Series]] - Inline concat_series_batches and series_batches_to_dataframe methods
…implify-serializers
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
To be filled