
[Ray data] readwebdataset2lance failed #48672

Closed
Jay-ju opened this issue Nov 9, 2024 · 0 comments · Fixed by #48674
Labels
bug Something that is supposed to be working; but isn't data Ray Data-related issues triage Needs triage (eg: priority, bug/not-bug, and owning component)

Comments

@Jay-ju
Contributor

Jay-ju commented Nov 9, 2024

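What happened + What you expected to happen

Writing a dataset read with `ray.data.read_webdataset` to Lance through `LanceDatasink` fails. While converting a pandas block to Arrow, pyarrow raises `ArrowTypeError` because the `img_filename` column contains Python lists where a bytes/string value is expected. I expected the write to succeed. Full log: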

2024-11-09 16:06:15,377 ERROR exceptions.py:81 -- Full stack trace:
Traceback (most recent call last):
  File "/data00/code/ray/python/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/plan.py", line 522, in execute
    blocks = execute_to_legacy_block_list(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 123, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 169, in _bundles_to_block_list
    for ref_bundle in bundles:
  File "/data00/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
           ^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 153, in get_next
    item = self._outer._output_node.get_output_blocking(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 308, in get_output_blocking
    raise self._exception
  File "/data00/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 232, in run
    continue_sched = self._scheduling_loop_step(self._topology)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 287, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
                         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 503, in process_completed_tasks
    raise e from None
  File "/data00/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 465, in process_completed_tasks
    bytes_read = task.on_data_ready(
                 ^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 112, in on_data_ready
    raise ex from None
  File "/data00/code/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 108, in on_data_ready
    ray.get(block_ref)
  File "/data00/code/ray/python/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/_private/worker.py", line 2664, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(OSError): ray::Write() (pid=118762, ip=10.37.65.107)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/data00/code/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/_internal/planner/plan_write_op.py", line 26, in fn
    write_result = datasink_or_legacy_datasource.write(blocks, ctx)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/lance/ray/sink.py", line 245, in write
    fragments_and_schema = _write_fragment(
                           ^^^^^^^^^^^^^^^^
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/lance/ray/sink.py", line 87, in _write_fragment
    fragments = write_fragments(
                ^^^^^^^^^^^^^^^^
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/lance/fragment.py", line 614, in write_fragments
    fragments = _write_fragments(
                ^^^^^^^^^^^^^^^^^
OSError: LanceError(Arrow): C Data interface error: Type error: ("Expected bytes, got a 'list' object", 'Conversion failed for column img_filename with type object'). Detail: Python exception: Traceback (most recent call last):
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/lance/ray/sink.py", line 78, in record_batch_converter
    tbl = _pd_to_arrow(block, schema)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/lance/ray/sink.py", line 44, in _pd_to_arrow
    tbl = pa.Table.from_pandas(df, schema=schema)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/table.pxi", line 4623, in pyarrow.lib.Table.from_pandas
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/pyarrow/pandas_compat.py", line 616, in dataframe_to_arrays
    arrays = [convert_column(c, f)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/pyarrow/pandas_compat.py", line 616, in <listcomp>
    arrays = [convert_column(c, f)
              ^^^^^^^^^^^^^^^^^^^^
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/pyarrow/pandas_compat.py", line 603, in convert_column
    raise e
  File "/data00/miniconda3/envs/ray_311/lib/python3.11/site-packages/pyarrow/pandas_compat.py", line 597, in convert_column
    result = pa.array(col, type=type_, from_pandas=True, safe=safe)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/array.pxi", line 358, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 85, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: ("Expected bytes, got a 'list' object", 'Conversion failed for column img_filename with type object')
, /data00/code/lance/rust/lance-datafusion/src/utils.rs:41:28
ray.data.exceptions.SystemException

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/data00/code/dataset_identity/learn/webdataset/webdataset2lance_json.py", line 209, in <module>
    ds.write_datasink(sink, concurrency=1)
  File "/data00/code/ray/python/ray/data/dataset.py", line 3614, in write_datasink
    self._write_ds = Dataset(plan, logical_plan).materialize()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data00/code/ray/python/ray/data/dataset.py", line 4642, in materialize
    copy._plan.execute()
  File "/data00/code/ray/python/ray/data/exceptions.py", line 89, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(OSError): ray::Write() (pid=118762, ip=10.37.65.107)
    [... remainder identical to the RayTaskError(OSError) traceback above; omitted ...]
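The failure is raised by `pa.Table.from_pandas(df, schema=schema)` inside `lance/ray/sink.py`. The message "Expected bytes, got a 'list' object" suggests the target schema gives `img_filename` a string or binary type, while at least one cell of that pandas `object` column is a Python `list`. A minimal, stdlib-only sketch for locating such cells in a batch before writing follows; the helper name `find_non_text_cells`, the dict-of-lists batch layout, and the sample data are assumptions for illustration, not part of Ray or Lance.

```python
from typing import Any, Dict, List, Tuple

# Types this sketch treats as acceptable for a string/binary Arrow column
# (None becomes a null). This is an assumption, not pyarrow's full rule set.
SCALAR_TEXT_TYPES = (str, bytes, bytearray, memoryview, type(None))


def find_non_text_cells(
    batch: Dict[str, List[Any]], text_columns: List[str]
) -> List[Tuple[str, int, str]]:
    """Hypothetical helper: return (column, row index, Python type name)
    for every cell in `text_columns` that is not a scalar text value."""
    problems = []
    for column in text_columns:
        # Columns missing from the batch are skipped rather than reported.
        for row, value in enumerate(batch.get(column, [])):
            if not isinstance(value, SCALAR_TEXT_TYPES):
                problems.append((column, row, type(value).__name__))
    return problems


if __name__ == "__main__":
    # Made-up sample: the second row holds a list, like the failing column.
    sample = {"img_filename": ["a.jpg", ["b.jpg"], None]}
    print(find_non_text_cells(sample, ["img_filename"]))
```

Running it on the sample reports `('img_filename', 1, 'list')`, pointing at the row that would make the Arrow conversion fail.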

Versions / Dependencies

Ray: main

Reproduction script

import pyarrow as pa
import ray
from lance.ray.sink import LanceDatasink

# input_dir, output_dir, TOS_FS, FILE_TYPES, REQUIRED_SCHEMA and storage_options
# are defined elsewhere in the original script and are not shown here.

# Read the WebDataset
ds = ray.data.read_webdataset(
    paths=input_dir,
    filesystem=TOS_FS,
    suffixes=FILE_TYPES,
    concurrency=1,
    output_format='pandas',
    decoder=None,
    expand_json=True
)
print(
    f"=== {ds.count()}, schema : {ds.materialize()}, ds take {ds.take(1)[0]['watermark']}")
# Convert columns to the types the target schema requires
# Original types
# <class 'object'>  -->  pa.string()
# dict key --> pa.int()
# dict --> pa.int()
# <numpy.ndarray(shape=(320, 480, 3), dtype=uint8)> --> pa.binary()

required_schema = pa.schema(REQUIRED_SCHEMA)

# # Type conversion
# ds = ds.map_batches(TypeConverter,  fn_constructor_args=(
#     ds.schema(), required_schema), batch_size=10, num_cpus=0.5, concurrency=1)

# # Write the data to Lance
sink = LanceDatasink(
    output_dir,
    schema=required_schema,
    max_rows_per_file=1000,
    storage_options=storage_options,
    mode="overwrite")
ds.write_datasink(sink, concurrency=1)
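In the script above the `TypeConverter` step is commented out, so blocks reach `LanceDatasink` unconverted. One possible workaround, sketched here under assumptions, is a `map_batches`-style callable that normalizes list-valued cells to strings first. `FlattenListColumns` is a hypothetical name; it takes batches as plain dicts of Python lists (Ray's default batch format is a dict of NumPy arrays, so it would need adapting), and the policy of unwrapping one-element lists and JSON-encoding longer ones is a design choice, not the behavior of Ray, Lance, or the fix in #48674.

```python
import json
from typing import Any, Dict, List


class FlattenListColumns:
    """Hypothetical stand-in for the commented-out TypeConverter step.

    For each configured column, one-element lists are unwrapped and longer
    lists are serialized to a JSON string, so every cell ends up as str or
    None before the batch meets a string-typed Arrow schema.
    """

    def __init__(self, columns: List[str]):
        self.columns = columns

    @staticmethod
    def _to_text(value: Any) -> Any:
        if isinstance(value, (list, tuple)):
            if len(value) == 1:
                # Recurse so nested one-element lists are unwrapped too.
                return FlattenListColumns._to_text(value[0])
            # Keep every element instead of silently dropping some.
            return json.dumps(list(value), default=str)
        if value is None or isinstance(value, str):
            return value
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return str(value)

    def __call__(self, batch: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
        out = dict(batch)  # columns not listed are passed through unchanged
        for column in self.columns:
            if column in out:
                out[column] = [self._to_text(v) for v in out[column]]
        return out


if __name__ == "__main__":
    conv = FlattenListColumns(["img_filename"])
    print(conv({"img_filename": [["a.jpg"], ["b.jpg", "c.jpg"], None]}))
```

Unwrapping single-element lists keeps the common case (one filename per sample) as a plain string, while JSON-encoding longer lists avoids losing values. If `img_filename` genuinely holds several names per sample, declaring it as `pa.list_(pa.string())` in the target schema is likely the cleaner fix.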

Issue Severity

High: It blocks me from completing my task.

@Jay-ju Jay-ju added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Nov 9, 2024
@richardliaw richardliaw added the data Ray Data-related issues label Nov 10, 2024
richardliaw pushed a commit that referenced this issue Dec 3, 2024
## Why are these changes needed?

Closes #48672

## Related issue number


## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: jukejian <jukejian@bytedance.com>
jecsand838 pushed a commit to jecsand838/ray that referenced this issue Dec 4, 2024
dentiny pushed a commit to dentiny/ray that referenced this issue Dec 7, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this issue Dec 17, 2024