Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Ordering of blocks after map and map_batches #50890

Closed
jakac opened this issue Feb 25, 2025 · 9 comments · Fixed by #50986
Closed

[Data] Ordering of blocks after map and map_batches #50890

jakac opened this issue Feb 25, 2025 · 9 comments · Fixed by #50986
Labels
bug Something that is supposed to be working; but isn't

Comments

@jakac
Copy link
Contributor

jakac commented Feb 25, 2025

What happened + What you expected to happen

I have a dataset in files, rows are sorted within files and files are sorted by name in the same folder. After applying map or map_batches, the order of blocks is different than before applying map or map_batches.

Expected behavior: map and map_batches doesn't change the ordering of rows.

Versions / Dependencies

ray==2.42.1

Reproduction script

import time

import ray

ray.init()

dataset = ray.data.from_items(
        [
            {"time_to_sleep": 3},
            {"time_to_sleep": 2},
            {"time_to_sleep": 1},
        ],
    override_num_blocks=3
)

print(dataset.take_all())
# output: [{'time_to_sleep': 3}, {'time_to_sleep': 2}, {'time_to_sleep': 1}]

def map_simple(x):
    time.sleep(x['time_to_sleep'])
    return x

print(dataset.map(map_simple).take_all())
# output: [{'time_to_sleep': 1}, {'time_to_sleep': 2}, {'time_to_sleep': 3}]

def my_map_batches(x):
    time.sleep(x['time_to_sleep'][0])
    yield {'result': [x['time_to_sleep'][0]]}

mapped = dataset.map_batches(my_map_batches)

print(mapped.take_all())
# output: [{'result': 1}, {'result': 2}, {'result': 3}]

Issue Severity

High: It blocks me from completing my task.

@jakac jakac added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Feb 25, 2025
@jakac
Copy link
Contributor Author

jakac commented Feb 25, 2025

Interestingly, it works as expected if I sort the dataset first:

dataset = dataset.sort("time_to_sleep", descending=True)

But it doesn't solve the issue because sorting key might not be present in the data.

@jakac jakac changed the title [Data] Ordering of blocks and map_batches [Data] Ordering of blocks after map and map_batches Feb 25, 2025
@richardliaw
Copy link
Contributor

You can try setting preserve_order to get what you're looking for -- https://docs.ray.io/en/latest/data/api/doc/ray.data.ExecutionOptions.html#ray.data.ExecutionOptions.preserve_order

@richardliaw
Copy link
Contributor

Overall this shouldn't be default behavior as it has significant implications at scale

@gvspraveen gvspraveen removed the triage Needs triage (eg: priority, bug/not-bug, and owning component) label Feb 25, 2025
@jakac
Copy link
Contributor Author

jakac commented Feb 26, 2025

Indeed setting

ray.data.DataContext.get_current().execution_options.preserve_order = True

resolves the issue. Reducing severity to Low.

Now the issue is about documentation, I believe this behavior deserves a warning block somewhere visible. What do you think about https://docs.ray.io/en/latest/data/transforming-data.html below the "Transformations are lazy" note?

@richardliaw
Copy link
Contributor

How about just adding a section about "Ordering rows" in transforming data, and describing:

  1. Generally order is not preserved
  2. You can sort the dataset in order to get a deterministic order, but this adds an extra sort step
  3. You can also specify the above preserve_order flag.

I don't think it needs to be a top level warning block, since i think many other systems (distributed) have this behavior

@TechShivvy
Copy link

TechShivvy commented Feb 27, 2025

Is setting the preserve_order flag the only efficient solution, or is there another function that inherently preserves order instead of map_batches? Sorting the results afterward is one option, but since it requires an explicit step, I’d rather not consider that.

Also, you mentioned:

Overall, this shouldn’t be the default behavior as it has significant implications at scale.

Could you elaborate on what those implications are?
@richardliaw

Edit: In my case, I’m splitting a sequential data into smaller parts using map_batches and running predictions on them. When I visualized the results, I noticed that the predicted segments were shuffled/shifted/jumbled clearly. So I want to maintain the order in the predictions.

Edit 2: I also tried handling the same task using ray.remote instead of map_batchesfor small sets of data. I manually split the batches, ran predictions, and used ray.get, which maintained the order correctly. However, I’m not sure about the trade-offs between ray.remote and map_batches. I checked with Chrome Tracing, and both seemed to take a similar amount of time. Given that I’m working with a large dataset, I’d like to understand the pros and cons of each approach.

TIA!

@richardliaw
Copy link
Contributor

Is setting the preserve_order flag the only efficient solution, or is there another function that inherently preserves order instead of map_batches?

Setting the flag is recommended. I think in your case you won't see much difference if your workload is small. However if you have a ton of nodes (say, 20+) and a lot of stragglers, you can end up stalling the dataset processing if you enforce ordering.

@TechShivvy
Copy link

TechShivvy commented Mar 4, 2025

Thanks for the reply @richardliaw.

That's disappointing to hear, as my workload/data will be large. Should I conclude that ray.remote would be more suitable in my case, or is there an optimal saturation point for map_batches that can be achieved by adjusting batch size and concurrency settings? I'm basically trying to do timeseries forecasting.

@richardliaw
Copy link
Contributor

I think it's worthwhile just trying the preserve_order option for now -- there's a reasonable chance it is not performance-impacting for your use case.

abrarsheikh pushed a commit that referenced this issue Mar 8, 2025
## Why are these changes needed?

Adds a section about impact of operators on ordering of rows to docs.

## Related issue number

Closes #50890

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
   - [x] Built docs locally and verified the format and the links

---------

Signed-off-by: jakac <matej.jakimov@gmail.com>
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Signed-off-by: Abrar Sheikh <abrar@anyscale.com>
elimelt pushed a commit to elimelt/ray that referenced this issue Mar 9, 2025
## Why are these changes needed?

Adds a section about impact of operators on ordering of rows to docs.

## Related issue number

Closes ray-project#50890 

## Checks

- [ ] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
   - [x] Built docs locally and verified the format and the links

---------

Signed-off-by: jakac <matej.jakimov@gmail.com>
Signed-off-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something that is supposed to be working; but isn't
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants