Skip to content

Conversation

@omatthew98
Copy link
Contributor

Why are these changes needed?

If we had an execution where we needed to concatenate native pyarrow types and pyarrow extension types, we would get errors like the following:

⚠️  Dataset dataset_5_0 execution failed: : 0.00 row [00:00, ? row/s]
- Repartition 1: 0.00 row [00:00, ? row/s]
  *- Split Repartition: : 0.00 row [00:00, ? row/s]
2025-09-22 17:21:34,068 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2025-09-22 17:21:34,068	ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/Users/mowen/code/ray/python/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/plan.py", line 533, in execute
    blocks = execute_to_legacy_block_list(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 127, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 175, in _bundles_to_block_list
    bundle_list = list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
    return self.get_next()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 680, in get_next
    bundle = state.get_output_blocking(output_split_idx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 373, in get_output_blocking
    raise self._exception
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 331, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 475, in _scheduling_loop_step
    update_operator_states(topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 586, in update_operator_states
    op.all_inputs_done()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/operators/base_physical_operator.py", line 122, in all_inputs_done
    self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/repartition.py", line 84, in split_repartition_fn
    return scheduler.execute(refs, num_outputs, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py", line 106, in execute
    ] = reduce_bar.fetch_until_complete(list(reduce_metadata_schema))
  File "/Users/mowen/code/ray/python/ray/data/_internal/progress_bar.py", line 166, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
  File "/Users/mowen/code/ray/python/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2952, in get
    values, debugger_breakpoint = worker.get_objects(
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 1025, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double
2025-09-22 17:21:34,069	ERROR worker.py:429 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double

This PR adds a test that replicates this and fixes the underlying issue by concatenating extension types and native types separately before rejoining them.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
@omatthew98 omatthew98 requested a review from a team as a code owner September 23, 2025 00:23
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the concat logic in transform_pyarrow.py to correctly handle concatenation of tables with a mix of native pyarrow types and extension types, fixing a type mismatch error. The approach is to separate columns by type (native vs. extension), concatenate each group using the appropriate method, and then rejoin them. A new test case is added to validate this fix.

The refactoring is a significant improvement in correctness and structure. I've made a few suggestions to further improve efficiency and code clarity by optimizing the column processing loop, removing an obsolete TODO, and ensuring type hint consistency.

Comment on lines 737 to 758
for col_name in schema.names:
col_type = schema.field(col_name).type

col_chunked_arrays = []
for block in blocks:
col_chunked_arrays.append(block.column(col_name))

if col_name in cols_with_null_list:
concatenated_cols[col_name] = _concat_cols_with_null_list(
col_chunked_arrays
)
elif isinstance(col_type, tensor_types):
concatenated_cols[col_name] = _concat_cols_with_extension_tensor_types(
col_chunked_arrays
)
elif isinstance(col_type, ArrowPythonObjectType):
concatenated_cols[col_name] = _concat_cols_with_extension_object_types(
col_chunked_arrays
)
else:
# Add to the list of native pyarrow columns, these will be concatenated after the loop using pyarrow.concat_tables
native_pyarrow_cols.add(col_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This loop is a bit inefficient. For every column, it constructs col_chunked_arrays by iterating over all blocks, but this list is only used for extension-type columns. For native pyarrow columns, this work is discarded.

To improve performance and clarity, you could first iterate through the schema to categorize all column names into native or different extension types. Then, process the native columns in a single batch call, and iterate through the extension columns to process them individually. This avoids redundantly creating col_chunked_arrays for native columns.

Here's a sketch of the proposed refactoring:

# Concatenate the columns according to their type
concatenated_cols = {}
native_pyarrow_cols = set()
extension_cols_to_process = {}

# First, categorize columns
for col_name in schema.names:
    col_type = schema.field(col_name).type
    if col_name in cols_with_null_list:
        extension_cols_to_process[col_name] = "null_list"
    elif isinstance(col_type, tensor_types):
        extension_cols_to_process[col_name] = "tensor"
    elif isinstance(col_type, ArrowPythonObjectType):
        extension_cols_to_process[col_name] = "object"
    else:
        native_pyarrow_cols.add(col_name)

# Process native columns in one batch
if native_pyarrow_cols:
    concatenated_cols.update(
        _concat_cols_with_native_pyarrow_types(
            list(native_pyarrow_cols), blocks, promote_types
        )
    )

# Process extension columns individually
for col_name, ext_type in extension_cols_to_process.items():
    col_chunked_arrays = [block.column(col_name) for block in blocks]
    if ext_type == "null_list":
        concatenated_cols[col_name] = _concat_cols_with_null_list(
            col_chunked_arrays
        )
    elif ext_type == "tensor":
        concatenated_cols[col_name] = _concat_cols_with_extension_tensor_types(
            col_chunked_arrays
        )
    elif ext_type == "object":
        concatenated_cols[col_name] = _concat_cols_with_extension_object_types(
            col_chunked_arrays
        )

cursor[bot]

This comment was marked as outdated.

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Sep 23, 2025
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Copy link
Contributor

@alexeykudinkin alexeykudinkin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, minor comments

Comment on lines +606 to +630
# For each opaque list column, iterate through all schemas until
# we find a valid value_type that can be used to override the
# column types in the following for-loop.
scalar_type = None
for arr in col_chunked_arrays:
if not pa.types.is_list(arr.type) or not pa.types.is_null(arr.type.value_type):
scalar_type = arr.type
break

if scalar_type is not None:
for c_idx in range(len(col_chunked_arrays)):
c = col_chunked_arrays[c_idx]
if pa.types.is_list(c.type) and pa.types.is_null(c.type.value_type):
if pa.types.is_list(scalar_type):
# If we are dealing with a list input,
# cast the array to the scalar_type found above.
col_chunked_arrays[c_idx] = c.cast(scalar_type)
else:
# If we are dealing with a single value, construct
# a new array with null values filled.
col_chunked_arrays[c_idx] = pa.chunked_array(
[pa.nulls(c.length(), type=scalar_type)]
)

return _concatenate_chunked_arrays(col_chunked_arrays)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you're just moving this code around and not adding it, but it looks dubious to me and so i'd very much like to delete it, unless there are tests that would fail.

Let's delete this code and see if there are any tests covering it

@omatthew98 omatthew98 added the go add ONLY when ready to merge, run all tests label Sep 29, 2025
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) October 8, 2025 19:22
@alexeykudinkin alexeykudinkin merged commit 5e6e586 into ray-project:master Oct 8, 2025
7 checks passed
omatthew98 added a commit to omatthew98/ray that referenced this pull request Oct 8, 2025
aslonnie pushed a commit that referenced this pull request Oct 8, 2025
…tension types (#57566)

## Why are these changes needed?
Cherry-pick #56811 

Original description:
If we had an execution where we needed to concatenate native pyarrow
types and pyarrow extension types, we would get errors like the
following:
```
⚠️  Dataset dataset_5_0 execution failed: : 0.00 row [00:00, ? row/s]
- Repartition 1: 0.00 row [00:00, ? row/s]
  *- Split Repartition: : 0.00 row [00:00, ? row/s]
2025-09-22 17:21:34,068 ERROR exceptions.py:73 -- Exception occurred in Ray Data or Ray Core internal code. If you continue to see this error, please open an issue on the Ray project GitHub page with the full stack trace below: https://github.com/ray-project/ray/issues/new/choose
2025-09-22 17:21:34,068	ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/Users/mowen/code/ray/python/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/data/_internal/plan.py", line 533, in execute
    blocks = execute_to_legacy_block_list(
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 127, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 175, in _bundles_to_block_list
    bundle_list = list(bundles)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
    return self.get_next()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 680, in get_next
    bundle = state.get_output_blocking(output_split_idx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 373, in get_output_blocking
    raise self._exception
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 331, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 475, in _scheduling_loop_step
    update_operator_states(topology)
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 586, in update_operator_states
    op.all_inputs_done()
  File "/Users/mowen/code/ray/python/ray/data/_internal/execution/operators/base_physical_operator.py", line 122, in all_inputs_done
    self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/repartition.py", line 84, in split_repartition_fn
    return scheduler.execute(refs, num_outputs, ctx)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py", line 106, in execute
    ] = reduce_bar.fetch_until_complete(list(reduce_metadata_schema))
  File "/Users/mowen/code/ray/python/ray/data/_internal/progress_bar.py", line 166, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
  File "/Users/mowen/code/ray/python/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 2952, in get
    values, debugger_breakpoint = worker.get_objects(
  File "/Users/mowen/code/ray/python/ray/_private/worker.py", line 1025, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double
2025-09-22 17:21:34,069	ERROR worker.py:429 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::reduce() (pid=7442, ip=127.0.0.1)
  File "/Users/mowen/code/ray/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 128, in reduce
    new_block = builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/delegating_block_builder.py", line 68, in build
    return self._builder.build()
  File "/Users/mowen/code/ray/python/ray/data/_internal/table_block.py", line 144, in build
    return self._concat_tables(tables)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_block.py", line 161, in _concat_tables
    return transform_pyarrow.concat(tables, promote_types=True)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 706, in concat
    col = _concatenate_chunked_arrays(col_chunked_arrays)
  File "/Users/mowen/code/ray/python/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 397, in _concatenate_chunked_arrays
    raise RuntimeError(f"Types mismatch: {type_} != {arr.type}")
RuntimeError: Types mismatch: uint64 != double
```
This PR adds a test that replicates this and fixes the underlying issue
by concatenating extension types and native types separately before
rejoining them.

## Related issue number

<!-- For example: "Closes #1234" -->

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR.
([pre-commit
setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
liulehui pushed a commit to liulehui/ray that referenced this pull request Oct 9, 2025
joshkodi pushed a commit to joshkodi/ray that referenced this pull request Oct 13, 2025
…tension types (ray-project#56811)

Signed-off-by: Josh Kodi <joshkodi@gmail.com>
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
xinyuangui2 pushed a commit to xinyuangui2/ray that referenced this pull request Oct 22, 2025
elliot-barn pushed a commit that referenced this pull request Oct 23, 2025
…tension types (#56811)

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
…tension types (ray-project#56811)

Signed-off-by: Aydin Abiar <aydin@anyscale.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
…tension types (ray-project#56811)

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants