read_csv fails with multi-node Ray Client #3179

Open
amogkam opened this issue Jun 23, 2021 · 12 comments
Labels
External Pull requests and issues from people who do not regularly contribute to modin Needs more information ❔ Issues that require more information from the reporter new feature/request 💬 Requests and pull requests for new features P2 Minor bugs or low-priority feature requests

Comments

@amogkam

amogkam commented Jun 23, 2021

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04):
  • Modin version (modin.__version__): 0.10
  • Python version: 3.8
  • Code we can use to reproduce:
    I create a sample test.csv file in the current working directory, then start a remote Ray cluster.

I then run this script from my laptop:

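```python
import ray
import modin.pandas as pd

runtime_env = {"working_dir": ".", "pip": ["modin"]}  # uploads this directory, including test.csv
ray.client("<head_node_host>:10001").env(runtime_env).connect()  # connect through Ray Client

df = pd.read_csv("test.csv")  # raises FileNotFoundError on the cluster workers
print(df.head())  # Modin DataFrames have no .show() method
```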

And it fails with

Traceback (most recent call last):
  File "test.py", line 7, in <module>
    df = pd.read_csv("test.csv")
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/pandas/io.py", line 133, in read_csv
    return _read(**kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/pandas/io.py", line 59, in _read
    pd_obj = FactoryDispatcher.read_csv(**kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/data_management/factories/dispatcher.py", line 172, in read_csv
    return cls.__factory._read_csv(**kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/data_management/factories/factories.py", line 206, in _read_csv
    return cls.io_cls.read_csv(**kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/engines/base/io/file_dispatcher.py", line 67, in read
    query_compiler = cls._read(*args, **kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/engines/base/io/text/csv_dispatcher.py", line 160, in _read
    new_query_compiler = cls._get_new_qc(
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/engines/base/io/text/csv_dispatcher.py", line 302, in _get_new_qc
    new_index, row_lengths = cls._define_index(index_ids, index_col_md, index_name)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/engines/base/io/text/csv_dispatcher.py", line 252, in _define_index
    row_lengths = cls.materialize(index_ids)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/engines/ray/task_wrapper.py", line 82, in materialize
    return ray.get(obj_id)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 61, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/ray/util/client/worker.py", line 202, in get
    res = self._get(obj_ref, op_timeout)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/ray/util/client/worker.py", line 225, in _get
    raise err
types.RayTaskError(FileNotFoundError): ray::deploy_ray_func() (pid=689, ip=172.31.51.109)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/tmp/ray/session_2021-06-22_17-00-38_682792_154/runtime_resources/conda/ray-39cb33edecee2d16bb9f77300d24774637f45439/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/engines/ray/task_wrapper.py", line 40, in deploy_ray_func
  File "/tmp/ray/session_2021-06-22_17-00-38_682792_154/runtime_resources/conda/ray-39cb33edecee2d16bb9f77300d24774637f45439/lib/python3.8/site-packages/modin/backends/pandas/parsers.py", line 216, in parse
    bio = FileDispatcher.file_open(
  File "/tmp/ray/session_2021-06-22_17-00-38_682792_154/runtime_resources/conda/ray-39cb33edecee2d16bb9f77300d24774637f45439/lib/python3.8/site-packages/modin/engines/base/io/file_dispatcher.py", line 199, in file_open
    return open(file_path, mode=mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/amog/dev/test_project/test.csv'
Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::deploy_ray_func() (pid=689, ip=172.31.51.109)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "/tmp/ray/session_2021-06-22_17-00-38_682792_154/runtime_resources/conda/ray-39cb33edecee2d16bb9f77300d24774637f45439/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "/Users/amog/dev/product/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
  File "/Users/amog/dev/product/lib/python3.8/site-packages/modin/engines/ray/task_wrapper.py", line 40, in deploy_ray_func
  File "/tmp/ray/session_2021-06-22_17-00-38_682792_154/runtime_resources/conda/ray-39cb33edecee2d16bb9f77300d24774637f45439/lib/python3.8/site-packages/modin/backends/pandas/parsers.py", line 216, in parse
    bio = FileDispatcher.file_open(
  File "/tmp/ray/session_2021-06-22_17-00-38_682792_154/runtime_resources/conda/ray-39cb33edecee2d16bb9f77300d24774637f45439/lib/python3.8/site-packages/modin/engines/base/io/file_dispatcher.py", line 199, in file_open
    return open(file_path, mode=mode)
FileNotFoundError: [Errno 2] No such file or directory: '/Users/amog/dev/test_project/test.csv'

Describe the problem

Modin appears to expect the CSV file to exist at the same absolute path on both the client and the cluster workers. With Ray Client this is not the case, because the working_dir is uploaded to a different location on the cluster, so the read fails.

Possible solutions to this:

  • Require the user to use cloud storage or shared file system that's accessible from both the client and the server (too much usability overhead)
  • Read the csv entirely on the client side and require the csv to only exist on the client side (inefficient for large datasets)
  • Read the csv entirely on the server side and require the user to pass in the path of the csv file on the server (unintuitive for users)
  • Have Modin support Ray runtime envs, where the path to a file differs between the client and the server
  • Have Ray runtime envs automatically reroute client file paths to server file paths
@amogkam amogkam added the bug 🦗 Something isn't working label Jun 23, 2021
@amogkam
Author

amogkam commented Jun 23, 2021

@wuisawesome
Collaborator

@devin-petersohn for some context, this is mostly a problem when using Modin with distributed Ray, where we have decent support for uploading files to the cluster, but their absolute path will be different (locally, the file would be under /users/Alex/path/to/data.csv but on the cluster it will be at /home/ray/project/data.csv).

@devin-petersohn
Collaborator

devin-petersohn commented Jun 25, 2021

Thanks @amogkam and @wuisawesome for the context! You are correct that the current readers assume the provided path is accessible to all workers at that same path.

Modin doesn't keep the relative path (which would work); it converts it to an absolute path here:

```python
return os.path.abspath(file_path)
```

This was originally added because users can call os.chdir on the driver, after which the workers no longer share its working directory.

Edit: #882 was the original issue for the os.chdir.
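A small standalone illustration of why freezing the path with os.path.abspath on the driver breaks here while a relative path would survive. The two temporary directories only stand in for the client's project directory and the worker's uploaded copy of it (no Ray is involved), and resolve_in is a hypothetical helper that mirrors os.path.abspath for an arbitrary working directory:

```python
import os
import tempfile


def resolve_in(cwd: str, path: str) -> str:
    """Resolve `path` as a process whose working directory is `cwd` would.

    Mirrors os.path.abspath, documented as normpath(join(os.getcwd(), path)).
    """
    return os.path.normpath(os.path.join(cwd, path))


with tempfile.TemporaryDirectory() as client_dir, tempfile.TemporaryDirectory() as worker_dir:
    # Only the uploaded copy of the file exists where the worker looks.
    open(os.path.join(worker_dir, "test.csv"), "w").close()

    # Current behaviour: the driver freezes the path against its own CWD.
    frozen = resolve_in(client_dir, "test.csv")
    # Alternative: ship the relative path and let the worker resolve it.
    relative = resolve_in(worker_dir, "test.csv")

    frozen_found = os.path.exists(frozen)
    relative_found = os.path.exists(relative)

print(frozen_found)    # False: nothing is at the client-side path
print(relative_found)  # True
```

This is also why reverting to relative paths alone would reintroduce #882: after an os.chdir on the driver, the relative path would be resolved against a directory the workers never saw.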

@wuisawesome
Collaborator

Oh interesting... One possibility here could be to make paths relative to some sort of ray base directory instead of relative to CWD. I'm not 100% sure how difficult this would be for either Modin or Ray though...

Let me ask around on the Ray side.

@architkulkarni

@devin-petersohn Would it work on Modin's side if we provided an API ray.get_runtime_context().get_working_dir(): str that returns the current working directory? (If called on the client side, it would return /users/Alex/path/, and if called in a Ray worker on the server side, it would return /home/ray/project/, to use Alex's example above.) That way, appending the relative path always yields a valid absolute path, whether on the client side or on the server side. cc @iycheng
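A minimal sketch of how a reader could use such an API. get_working_dir is only the name proposed above and is not an existing Ray function, so the sketch takes the working directory as a plain argument; join_with_working_dir is a hypothetical helper:

```python
from pathlib import PurePosixPath


def join_with_working_dir(relative_path: str, working_dir: str) -> str:
    """Append a user-supplied relative path to the caller's working directory.

    `working_dir` stands in for the proposed (hypothetical)
    ray.get_runtime_context().get_working_dir(): the local project directory
    on the client, the uploaded copy of it on a worker.
    """
    path = PurePosixPath(relative_path)
    if path.is_absolute():
        # An absolute path would silently override working_dir, which is the bug here.
        raise ValueError(f"expected a relative path, got {relative_path!r}")
    return str(PurePosixPath(working_dir) / path)


# The same relative path is valid on both sides.
print(join_with_working_dir("to/data.csv", "/users/Alex/path"))   # /users/Alex/path/to/data.csv
print(join_with_working_dir("to/data.csv", "/home/ray/project"))  # /home/ray/project/to/data.csv
```

The design relies on the driver keeping the path relative until each process joins it onto its own directory.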

@wuisawesome
Copy link
Collaborator

nit: maybe it should be a pathlib.Path?

@architkulkarni

Another idea suggested by @amogkam that we're thinking about is to provide an API of the form server_absolute_path = ray.get_runtime_context().convert_path(client_absolute_or_relative_path). Then, on the server side, Modin could pass /users/Alex/path/to/data.csv to this function and get /home/ray/project/data.csv as output. Would this be better?
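The convert_path idea can be sketched as a lexical prefix rewrite, assuming both the client's working directory and its location on the cluster are known. Here they are passed in explicitly, since convert_path is only a proposal and not a Ray API; the directories match the example above:

```python
from pathlib import PurePosixPath


def convert_path(client_path: str, client_working_dir: str, server_working_dir: str) -> str:
    """Map a client-side path (absolute, or relative to the client working dir)
    onto the server's copy of that working dir.

    Purely lexical: '..' components and symlinks are not resolved.
    """
    client_root = PurePosixPath(client_working_dir)
    path = PurePosixPath(client_path)
    if not path.is_absolute():
        path = client_root / path
    # Raises ValueError if the file is not under the uploaded directory,
    # since there is then no server-side copy to point at.
    relative = path.relative_to(client_root)
    return str(PurePosixPath(server_working_dir) / relative)


print(convert_path("/users/Alex/path/to/data.csv", "/users/Alex/path/to", "/home/ray/project"))
# /home/ray/project/data.csv
```

Rejecting paths outside the uploaded directory, rather than passing them through, is a choice; a shared-filesystem path could reasonably be returned unchanged instead.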

@fishbone

I think this should be fine. But could we rename the function to get_absolute_path or get_relative_path, so the type of path it returns is clear from the name?

@devin-petersohn
Collaborator

From the Modin side, we try to keep the parsers engine-agnostic. Right now there is no Ray-specific code in file parsing. I would prefer not to convert the path on the worker side.

One option is to do it right before task submission in the deploy classmethod:

```python
@classmethod
def deploy(cls, func, num_returns, kwargs):
    """
    Run local `func` remotely.

    Parameters
    ----------
    func : callable
        A function to call.
    num_returns : int
        Amount of return values expected from `func`.
    kwargs : dict
        Keyword arguments to pass to remote instance of `func`.

    Returns
    -------
    ray.ObjectRef or list
        Ray identifier of the result being put to Plasma store.
    """
    return deploy_ray_func.options(num_returns=num_returns).remote(func, kwargs)
```

This is not only going to be a problem for CSV, so with a small refactor it might be possible to handle it here for every format. I would prefer to just pass the path to a Ray utility and have Ray tell me the correct path for the workers, whether or not it's running in Client mode.
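Handling it at submission time could look roughly like the hook below, which rewrites path-valued keyword arguments just before deploy hands them to the remote function, so the parsers stay engine-agnostic. Which keys carry file paths ("fname" here) and the mapper are assumptions for illustration; the real mapper would be whatever Ray utility ends up returning the worker-side path:

```python
from typing import Any, Callable, Dict, Iterable


def rewrite_path_kwargs(
    kwargs: Dict[str, Any],
    path_mapper: Callable[[str], str],
    path_keys: Iterable[str] = ("fname",),
) -> Dict[str, Any]:
    """Return a copy of `kwargs` in which every string stored under one of
    `path_keys` is replaced by `path_mapper(value)`; all other entries pass
    through unchanged."""
    rewritten = dict(kwargs)  # do not mutate the caller's dict
    for key in path_keys:
        value = rewritten.get(key)
        if isinstance(value, str):
            rewritten[key] = path_mapper(value)
    return rewritten


def to_worker(path: str) -> str:
    # Hypothetical stand-in mapper using the directories from earlier in this thread.
    client, worker = "/users/Alex/path/to", "/home/ray/project"
    return worker + path[len(client):] if path.startswith(client + "/") else path


# In deploy(), the submission would then become something like:
#   deploy_ray_func.options(num_returns=num_returns).remote(
#       func, rewrite_path_kwargs(kwargs, to_worker))
print(rewrite_path_kwargs({"fname": "/users/Alex/path/to/data.csv", "start": 0}, to_worker))
# {'fname': '/home/ray/project/data.csv', 'start': 0}
```

The catch is that deploy is shared by every task, so it has to know which arguments are paths; the path_keys parameter makes that explicit rather than guessing from string contents.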

@wuisawesome
Collaborator

that would look something like:

```python
>>> ray.util.relative_dir("/users/Alex/path/to/data.csv")
'$RAY_JOB_BASE_DIR/to/data.csv'
```

and Ray sets that env var correctly in all modes?

In the driver it would be /users/Alex/path, and on the cluster it might be /home/ray for example.
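The placeholder variant can be prototyped with os.path.expandvars, assuming every process had the variable set to its own node-specific value. RAY_JOB_BASE_DIR is only the name floated in this thread, not a variable Ray actually sets, and to_placeholder and expand are hypothetical helpers:

```python
import os
from pathlib import PurePosixPath

BASE_VAR = "RAY_JOB_BASE_DIR"  # hypothetical; proposed in this thread only


def to_placeholder(path: str, driver_base: str) -> str:
    """Rewrite a driver-side absolute path relative to the job base directory."""
    relative = PurePosixPath(path).relative_to(driver_base)
    return f"${BASE_VAR}/{relative}"


def expand(placeholder_path: str) -> str:
    """Resolve the placeholder using this process's environment."""
    return os.path.expandvars(placeholder_path)


portable = to_placeholder("/users/Alex/path/to/data.csv", "/users/Alex/path")
print(portable)  # $RAY_JOB_BASE_DIR/to/data.csv

# Simulate a worker where the job directory was unpacked under /home/ray.
os.environ[BASE_VAR] = "/home/ray"
print(expand(portable))  # /home/ray/to/data.csv
```

Note that os.path.expandvars leaves the placeholder untouched when the variable is unset, so a process missing it would fail with an obviously unresolved path rather than a silently wrong one.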

I don't know how you would implement this at the task_wrapper level, since it's such a general function, but I'm wondering if we could do it behind the engine abstraction?

For example BaseIO.resolve_path would return the absolute path, and RayIO.resolve_path could call ray.util.relative_dir?
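Putting the resolution behind the engine abstraction might look like the following: the base class keeps today's os.path.abspath behaviour and a Ray-specific subclass overrides it. BaseIO.resolve_path and RayIO.resolve_path are the hypothetical names from the comment above; since ray.util.relative_dir does not exist, the Ray variant here rewrites a fixed client directory to a fixed worker directory (both class attributes are assumptions):

```python
import os
from pathlib import PurePosixPath


class BaseIO:
    @classmethod
    def resolve_path(cls, file_path: str) -> str:
        # Current behaviour: pin the path to the driver's working directory.
        return os.path.abspath(file_path)


class RayIO(BaseIO):
    # Hypothetical configuration; a real version would ask Ray for these.
    client_working_dir = "/users/Alex/path/to"
    worker_working_dir = "/home/ray/project"

    @classmethod
    def resolve_path(cls, file_path: str) -> str:
        absolute = PurePosixPath(super().resolve_path(file_path))
        try:
            relative = absolute.relative_to(cls.client_working_dir)
        except ValueError:
            # Not inside the uploaded directory: assume a shared filesystem path.
            return str(absolute)
        return str(PurePosixPath(cls.worker_working_dir) / relative)


print(RayIO.resolve_path("/users/Alex/path/to/data.csv"))  # /home/ray/project/data.csv
```

Because the override calls the base implementation first, the os.chdir fix from #882 still applies before the Ray-specific mapping.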

@devin-petersohn
Collaborator

There are two issues here:
1.) The metadata is calculated on the driver, where the correct filepath is the absolute path
2.) The workers need a different path than the driver for the ray workspace

Solving both is only possible if:
(1) we change the path as we deploy the task by modifying args,
(2) we change the path in the task,
(3) we copy/paste the code for each reader to create a custom Ray Client reader for each file type (remember it's not just CSV that's the issue) and insert a line after the metadata calculation to get the correct path, or
(4) we create an object that users must use with Ray Client paths to ensure they resolve correctly.
(5) Others?

I don't think 2 or 3 will be good from a maintainability standpoint.
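Option (4) could be prototyped as a small value type that users pass instead of a bare string. It keeps the path relative to the workspace and is only turned into a concrete path where it is used, which addresses both issues above: the driver resolves it for metadata, each worker for parsing. WorkspacePath and its resolve method are hypothetical, and the workspace directories are passed explicitly because no Ray API for them is assumed:

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class WorkspacePath:
    """A path relative to the job workspace, resolved late on each node."""

    relative: str

    def __post_init__(self) -> None:
        if PurePosixPath(self.relative).is_absolute():
            raise ValueError("WorkspacePath must be relative to the workspace")

    def resolve(self, workspace_dir: str) -> str:
        """Concrete path for a process whose workspace is `workspace_dir`."""
        return str(PurePosixPath(workspace_dir) / self.relative)


p = WorkspacePath("to/data.csv")
print(p.resolve("/users/Alex/path"))   # driver: metadata is computed here
print(p.resolve("/home/ray/project"))  # worker: parsing happens here
```

The cost, as noted, is that users must opt in explicitly whenever they use Ray Client.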

@RehanSD
Collaborator

RehanSD commented Oct 12, 2022

This issue is closely related to #4479, except that here we may need a feature to support reading a file that Ray has already distributed to the cluster. @devin-petersohn, do we plan to add support for this?

@RehanSD RehanSD added new feature/request 💬 Requests and pull requests for new features Needs more information ❔ Issues that require more information from the reporter P2 Minor bugs or low-priority feature requests and removed bug 🦗 Something isn't working labels Oct 12, 2022
@anmyachev anmyachev added the External Pull requests and issues from people who do not regularly contribute to modin label Apr 19, 2023
Projects
None yet
Development

No branches or pull requests

7 participants