Make write an operator as part of the execution plan #32015

Merged
merged 51 commits into from
Feb 8, 2023

@jianoaix jianoaix commented Jan 28, 2023

Why are these changes needed?

Today, "write" is not captured as an operator in the execution plan: the plan is fully executed first, and then Ray tasks are launched to perform the writing.

Making "write" an operator gives us a cleaner, unified model for how writes are supported, and it enables query optimization for writes (e.g. read->map->write can be fused into one operator node) for better performance.

Note: supporting this write operator in the new query plan will be in a follow-up PR.
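To illustrate why fusing read->map->write can help, here is a toy sketch in plain Python. It does not use Ray and is not the implementation in this PR: `fuse`, `run`, the stage functions, and the "one task per block per stage" cost model are all assumptions made for illustration.

```python
from typing import Callable, List, Tuple

Block = List[int]
Stage = Callable[[Block], Block]


def fuse(*stages: Stage) -> Stage:
    """Compose several per-block stages into one stage (hypothetical helper)."""
    def fused(block: Block) -> Block:
        for stage in stages:
            block = stage(block)
        return block
    return fused


def run(blocks: List[Block], stages: List[Stage]) -> Tuple[List[Block], int]:
    """Apply stages in order; count one simulated task per (stage, block) pair."""
    tasks = 0
    for stage in stages:
        blocks = [stage(b) for b in blocks]
        tasks += len(blocks)
    return blocks, tasks


def read_stage(block: Block) -> Block:
    # Stand-in for reading a block from storage.
    return list(block)


def map_stage(block: Block) -> Block:
    # Stand-in for a user-supplied transformation.
    return [x * 2 for x in block]


def make_write_stage(sink: List[Block]) -> Stage:
    # Stand-in for a write: appends each block to an in-memory sink.
    def write_stage(block: Block) -> Block:
        sink.append(list(block))
        return block
    return write_stage


blocks = [[1, 2], [3]]
unfused_sink: List[Block] = []
fused_sink: List[Block] = []

# Three separate operators: each block passes through three simulated tasks.
unfused_out, unfused_tasks = run(
    blocks, [read_stage, map_stage, make_write_stage(unfused_sink)]
)
# One fused operator: each block passes through a single simulated task.
fused_out, fused_tasks = run(
    blocks, [fuse(read_stage, map_stage, make_write_stage(fused_sink))]
)
print(unfused_tasks, fused_tasks)
```

In this simplified model both variants write the same data, but the fused variant launches one task per block instead of three and never materializes the intermediate results between stages.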

Benchmarked on a single node:

import ray
import time

def make_ds(size_gb: int, parallelism: int):
    # Dataset of 10KiB tensor records.
    total_size = 1024 * 1024 * 1024 * size_gb
    record_dim = 1280
    record_size = record_dim * 8
    num_records = int(total_size / record_size)
    dataset = ray.data.range_tensor(num_records, shape=(record_dim,), parallelism=parallelism)
    print("Created dataset", dataset, "of size", dataset.size_bytes())
    return dataset

sizes = [1, 5, 10]
parallelism = [1, 5, 10, 20, 100]

perf = {}

for sz in sizes:
    for p in parallelism:
        path = f"/tmp/write/test-{sz}-{p}"
        time_start = time.perf_counter()
        # write_numpy() returns None, so the result is not assigned.
        make_ds(sz, p).map_batches(lambda x: x).write_numpy(path)
        perf[path] = time.perf_counter() - time_start

print(perf)

Before:

{'/tmp/write/test-1-1': 9.953020322000157, '/tmp/write/test-1-5': 3.428301415000533, '/tmp/write/test-1-10': 4.723238538999794, '/tmp/write/test-1-20': 5.163609835999523, '/tmp/write/test-1-100': 4.33488237399979, '/tmp/write/test-5-1': 47.80200573299953, '/tmp/write/test-5-5': 57.3792794459996, '/tmp/write/test-5-10': 21.089490562999345, '/tmp/write/test-5-20': 49.24927898700025, '/tmp/write/test-5-100': 47.518018443000074, '/tmp/write/test-10-1': 182.66500996800005, '/tmp/write/test-10-5': 139.72414283000035, '/tmp/write/test-10-10': 129.0919650550004, '/tmp/write/test-10-20': 109.1896361629997, '/tmp/write/test-10-100': 61.62153517600018}

real    14m35.661s
user    0m13.973s
sys     0m2.241s

After:

{'/tmp/write/test-1-1': 14.615472456999669, '/tmp/write/test-1-5': 3.4766328169998815, '/tmp/write/test-1-10': 2.675365926000268, '/tmp/write/test-1-20': 2.3434579160002613, '/tmp/write/test-1-100': 2.7332164729996293, '/tmp/write/test-5-1': 29.679511864000233, '/tmp/write/test-5-5': 18.722395105999567, '/tmp/write/test-5-10': 18.884924998000315, '/tmp/write/test-5-20': 20.06004047999977, '/tmp/write/test-5-100': 18.469305754000743, '/tmp/write/test-10-1': 160.16473960199983, '/tmp/write/test-10-5': 47.103051861000495, '/tmp/write/test-10-10': 39.20541486499951, '/tmp/write/test-10-20': 38.90085198700035, '/tmp/write/test-10-100': 40.142852582999694}

real    7m39.632s
user    0m9.006s
sys     0m2.634s

Write fusion makes most configurations faster, though not all (test-1-1 went from about 10.0s to 14.6s). The entire benchmark completed in roughly half the wall-clock time with fusion (7m39s vs. 14m35s).
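As a quick check of the "half of the time" claim, the sketch below computes speedups from the numbers reported above. The helper `parse_real_seconds` is a hypothetical name introduced here, and the per-run values are copied from the Before/After output, rounded to three decimals.

```python
import re


def parse_real_seconds(text: str) -> float:
    """Convert a `time`-style duration such as '14m35.661s' to seconds."""
    match = re.fullmatch(r"(\d+)m(\d+(?:\.\d+)?)s", text.strip())
    if match is None:
        raise ValueError(f"unrecognized duration: {text!r}")
    return int(match.group(1)) * 60 + float(match.group(2))


# Wall-clock ("real") times for the whole benchmark, from the output above.
before_total = parse_real_seconds("14m35.661s")
after_total = parse_real_seconds("7m39.632s")
total_speedup = before_total / after_total

# A few individual runs (seconds, rounded from the dictionaries above).
before_runs = {"test-1-1": 9.953, "test-10-5": 139.724, "test-10-100": 61.622}
after_runs = {"test-1-1": 14.615, "test-10-5": 47.103, "test-10-100": 40.143}
run_speedups = {name: before_runs[name] / after_runs[name] for name in before_runs}

print(f"total speedup: {total_speedup:.2f}x")
for name, ratio in run_speedups.items():
    print(f"{name}: {ratio:.2f}x")
```

A ratio above 1 means the run was faster after the change; a ratio below 1 (as for test-1-1) means it was slower.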

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: jianoaix <iamjianxiao@gmail.com>
@jianoaix jianoaix added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 28, 2023
@jianoaix jianoaix removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Feb 7, 2023
@ericl ericl merged commit aa504ae into ray-project:master Feb 8, 2023
ericl pushed a commit that referenced this pull request Feb 14, 2023
@matthew29tang matthew29tang mentioned this pull request Feb 17, 2023
7 tasks
ericl pushed a commit that referenced this pull request Feb 21, 2023
With the new `write` method added (in #32015 and #32440), Ray Data intends to support both the `write` and `do_write` functions for now. The check currently uses `hasattr()` to ensure the datasource object has a `write` method before using it.

However, this is insufficient for a custom datasource that inherits from `Datasource`, because `Datasource` itself implements `write`. If the custom datasource implements only `do_write`, `hasattr(datasource, "write")` still returns True, since `hasattr()` also finds inherited methods.

The solution is to check whether the `write` method has been overridden, by comparing it with `Datasource.write`. For any class that has not implemented its own `write`, the equality check returns True.
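The difference between the two checks can be sketched as follows. The `Datasource` class here is a minimal stand-in, not Ray's actual class, and `LegacySource`, `NewSource`, `overrides_write`, and `dispatch_write` are hypothetical names used only for illustration.

```python
class Datasource:
    """Minimal stand-in for a base class that defines both methods."""

    def write(self, blocks):
        raise NotImplementedError

    def do_write(self, blocks):
        raise NotImplementedError


class LegacySource(Datasource):
    """Custom datasource that only implements the older `do_write`."""

    def do_write(self, blocks):
        return "do_write"


class NewSource(Datasource):
    """Custom datasource that overrides the newer `write`."""

    def write(self, blocks):
        return "write"


def overrides_write(datasource: Datasource) -> bool:
    # Compare the function found on the instance's class with the base
    # class's function; they are the same object unless a subclass
    # defines its own `write`.
    return type(datasource).write is not Datasource.write


def dispatch_write(datasource: Datasource, blocks):
    # Prefer `write` only when the subclass actually overrides it.
    if overrides_write(datasource):
        return datasource.write(blocks)
    return datasource.do_write(blocks)


legacy, new = LegacySource(), NewSource()
print(hasattr(legacy, "write"), overrides_write(legacy))
print(hasattr(new, "write"), overrides_write(new))
```

With a plain `hasattr()` check, `LegacySource` would be routed to the inherited `write` and hit `NotImplementedError`; the override check routes it to `do_write` instead.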
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
Follow up to ray-project#32015.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
peytondmurray pushed a commit to peytondmurray/ray that referenced this pull request Mar 22, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
Follow up to ray-project#32015.

Signed-off-by: elliottower <elliot@elliottower.com>
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
Signed-off-by: elliottower <elliot@elliottower.com>