@Yicong-Huang
Contributor

To be filled

Extract the struct flattening/wrapping logic from ArrowStreamUDFSerializer into reusable transformers in a new transformers.py module.
…use-flatten-struct' into SPARK-55176/refactor/extract-arrow-to-pandas-converter
Resolved conflicts by:
- Adopting create_converter architecture while keeping arrow_to_pandas compatibility
- Using _load_group_dataframes helper from upstream
- Removing GroupArrowUDFSerializer (not in upstream)
- Keeping input_type (singular) parameter name
…er/mapper layers

This refactoring separates concerns between serialization and data transformation:
- Serializers now only handle data serialization/deserialization
- Wrappers handle UDF wrapping and data format conversion
- Mappers handle UDF result aggregation using transformer utilities

Key changes:
1. Simplified serializer hierarchy by removing redundant classes:
   - Removed ArrowStreamPandasUDFSerializer, ArrowStreamGroupUDFSerializer,
     ArrowStreamArrowUDFSerializer, ArrowStreamUDTFSerializer
   - Renamed ArrowStreamMapIterSerializer to ArrowStreamGroupSerializer
   - Unified grouped/non-grouped UDF handling in ArrowStreamGroupSerializer

2. Introduced transformer utility classes:
   - ArrowBatchTransformer: Arrow batch operations (wrap_struct, flatten_struct,
     partial_batch, partial_table, concat_batches, merge_batches, reorder_columns)
   - PandasBatchTransformer: Pandas/Arrow conversions (to_arrow, concat_series_batches)

3. Moved data transformations from serializers to wrappers/mappers:
   - Moved to_arrow conversion from mappers to wrappers for Pandas agg UDFs
   - Wrappers now return RecordBatch directly instead of (result, arrow_type) tuples
   - Mappers simplified to use transformer methods for common operations

Benefits:
- Clearer separation of concerns
- Reduced code duplication through transformer utilities
- Easier to maintain and extend
- Consistent data format handling across UDF types

- Use ArrowBatchTransformer.zip_batches for type coercion instead of manual loop
- Simplify error handling logic
- Handle empty struct case properly
- Unwrap wrapped batches from worker before type coercion, then wrap back for JVM
- 46 of 47 UDTF tests pass (1 known failure unrelated to this change)
@Yicong-Huang Yicong-Huang marked this pull request as draft January 30, 2026 21:47
@github-actions

⚠️ Pull Request Title Validation

This pull request title does not contain a JIRA issue ID.

Please update the title to either:

  • Include a JIRA ID: [SPARK-12345] Your description
  • Mark as minor change: [MINOR] Your description

For minor changes that don't require a JIRA ticket (e.g., typo fixes), please prefix the title with [MINOR].


This comment was automatically generated by GitHub Actions

This commit fixes several issues with pandas UDF handling after the
serializer refactoring:

1. Fix parameter initialization order in read_udfs():
   - Move pandas_udf_* parameter defaults BEFORE the if-elif chain
   - Previously they were reset AFTER being set, causing df_for_struct
     to always be False for scalar pandas UDFs

2. Add struct_in_pandas="dict" to scalar UDF wrappers:
   - wrap_scalar_pandas_udf: enables DataFrame→struct array conversion
   - wrap_pandas_batch_iter_udf: same fix for iter variant

3. Fix grouped map UDF column matching:
   - Use assign_cols_by_name to match DataFrame columns by name when
     available, otherwise by position
   - Handle empty DataFrame (0 columns) by creating empty struct array

4. Fix Arrow batch handling:
   - zip_batches: convert items to list for pa.RecordBatch.from_arrays
   - mapper: special handling for SQL_ARROW_BATCHED_UDF to return raw
     result instead of calling zip_batches

5. Fix error handling in create_array:
   - Only catch ArrowInvalid for arrow_cast retry (not ArrowTypeError)
   - Add ArrowTypeError to TypeError handler for proper error messages
   - Update error message format to match expected test output
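
The ordering problem in item 1 is easier to see in isolation. The sketch below is hypothetical: the function names, the flag dictionary, and the eval type strings are made up for illustration and are not taken from read_udfs(). It only shows why a default assigned after an if-elif chain overwrites the value chosen by a branch.

```python
def resolve_flags_buggy(eval_type: str) -> dict:
    """Defaults are assigned AFTER the branch, so the branch's choice is lost."""
    flags = {}
    if eval_type == "scalar_pandas":  # hypothetical eval type name
        flags["df_for_struct"] = True
    elif eval_type == "grouped_map":  # hypothetical eval type name
        flags["df_for_struct"] = False
    # Bug: this "default" runs unconditionally and resets the flag.
    flags["df_for_struct"] = False
    return flags


def resolve_flags_fixed(eval_type: str) -> dict:
    """Defaults are assigned BEFORE the branch, so a branch can override them."""
    flags = {"df_for_struct": False}
    if eval_type == "scalar_pandas":
        flags["df_for_struct"] = True
    return flags


buggy = resolve_flags_buggy("scalar_pandas")
fixed = resolve_flags_fixed("scalar_pandas")
```

With the buggy ordering the scalar case never sees df_for_struct set to True, which matches the symptom described in item 1.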

All 264+ pandas UDF tests pass, including the struct type tests.

Remove the timezone, int_to_decimal_coercion_enabled, and assign_cols_by_name
parameters, which were stored but never used by the serializer or its subclasses.
…to-pandas conversion

- Extract common verification functions: verify_result_length,
  verify_result_type, verify_is_iterable, verify_element_type
- Simplify wrapper functions using common verification utilities
- Centralize Arrow-to-pandas conversion in read_udfs mapper/func
- Remove unused pandas_udf_* variables from read_udfs
- Fix is_scalar_pandas_iter to not convert Arrow iter UDFs to pandas
- Separate iter UDF branches by type for clarity (scalar pandas/arrow,
  map pandas, map arrow)
- Wrapper functions now return (result, arrow_return_type) tuples,
  with output conversion centralized in mapper/func
- Grouped map pandas UDFs now receive Iterator[DataFrame] directly
  instead of Iterator[List[Series]]
- Inline concat_series_batches and series_batches_to_dataframe methods
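
One possible shape for the shared verification helpers is sketched below. Only the four function names come from the commit message. The signatures, the `what` label, the use of built-in TypeError/ValueError, and the wrap_scalar_udf helper are assumptions made for illustration; the real helpers may differ.

```python
from collections.abc import Iterable, Iterator
from typing import Any, Callable


def verify_result_type(result: Any, expected_type: type, what: str = "UDF result") -> Any:
    """Return `result` unchanged if it is an instance of `expected_type`."""
    if not isinstance(result, expected_type):
        raise TypeError(
            f"{what} should be {expected_type.__name__}, got {type(result).__name__}"
        )
    return result


def verify_result_length(result: Any, expected_length: int, what: str = "UDF result") -> Any:
    """Return `result` unchanged if it has exactly `expected_length` items."""
    if len(result) != expected_length:
        raise ValueError(
            f"{what} should have length {expected_length}, got {len(result)}"
        )
    return result


def verify_is_iterable(result: Any, what: str = "UDF result") -> Iterable:
    """Return `result` unchanged if it can be iterated over."""
    if not isinstance(result, Iterable):
        raise TypeError(f"{what} should be iterable, got {type(result).__name__}")
    return result


def verify_element_type(items: Iterable, expected_type: type, what: str = "element") -> Iterator:
    """Lazily check each element as it is consumed, so iterators are not drained early."""
    for item in items:
        yield verify_result_type(item, expected_type, what)


def wrap_scalar_udf(func: Callable[[list], Any]) -> Callable[[list], list]:
    """Hypothetical wrapper: run `func` and check its output against the input batch."""
    def wrapped(batch: list) -> list:
        result = verify_result_type(func(batch), list)
        return verify_result_length(result, len(batch))
    return wrapped


double = wrap_scalar_udf(lambda xs: [x * 2 for x in xs])
doubled = double([1, 2, 3])
```

Having each helper return its input lets a wrapper chain the checks in one expression, and the generator form of verify_element_type keeps iterator UDFs streaming.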