Use Ray Datasets to read binary files in parallel #2241

Merged
merged 112 commits into from
Aug 8, 2022
Commits
112 commits
9411a71
Use Ray Datasets to read binary files in parallel
tgaddair Jul 7, 2022
f2fad56
Merge branch 'master' into fast-im-read
geoffreyangus Jul 14, 2022
0b894b0
working version (without NaNs)
geoffreyangus Jul 14, 2022
c137cd9
Works with NaNs
geoffreyangus Jul 15, 2022
f5b75dd
remove TODO item
geoffreyangus Jul 15, 2022
e59f268
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 15, 2022
1c424af
wip– cannot find files on remote filesystems
geoffreyangus Jul 15, 2022
f4c9177
hack that handles NaNs
geoffreyangus Jul 15, 2022
e3d928e
starting work on custom datasource starting from file data source
geoffreyangus Jul 16, 2022
0c72668
NaN handling using custom data source
geoffreyangus Jul 16, 2022
d824093
http handling
geoffreyangus Jul 16, 2022
521367b
flakiness in roc
geoffreyangus Jul 16, 2022
437ffd7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 16, 2022
85f1c4c
revert audio feature
geoffreyangus Jul 16, 2022
47c254d
Merge branch 'fast-im-read' of https://github.com/ludwig-ai/ludwig in…
geoffreyangus Jul 16, 2022
1e854c1
normalize NaNs to None
geoffreyangus Jul 16, 2022
1674fbf
wip: tests flaky
geoffreyangus Jul 18, 2022
fee99da
Merge branch 'master' into fast-im-read
geoffreyangus Jul 18, 2022
7957705
fixed flakiness by removing forced nan in generate data
geoffreyangus Jul 18, 2022
728ecfe
cleanup
geoffreyangus Jul 18, 2022
b4f2f97
cleanup
geoffreyangus Jul 18, 2022
ae4026f
cleanup from benchmarking
geoffreyangus Jul 18, 2022
f240421
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 18, 2022
b6551af
add ray test for pre-loaded numpy images
geoffreyangus Jul 18, 2022
4d4a60a
Merge branch 'fast-im-read' of https://github.com/ludwig-ai/ludwig in…
geoffreyangus Jul 18, 2022
493cb65
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 18, 2022
c9405c5
added modin support
geoffreyangus Jul 18, 2022
888b528
use indices in new data source
geoffreyangus Jul 19, 2022
4865804
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 19, 2022
ea03172
modin workaround
geoffreyangus Jul 19, 2022
b0ac02f
Merge branch 'fast-im-read' of https://github.com/ludwig-ai/ludwig in…
geoffreyangus Jul 19, 2022
dab50cf
cleanup
geoffreyangus Jul 19, 2022
2ef18f7
add multiple image feature test and reset_index for concatenated spli…
geoffreyangus Jul 19, 2022
699ff81
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 19, 2022
6793d1a
adding distributed mark to new test
geoffreyangus Jul 19, 2022
8a8754b
move reset_index to maintain old performance
geoffreyangus Jul 19, 2022
65d35a6
revert change to df_like df index constructor
geoffreyangus Jul 19, 2022
46137a2
update imports for tests
geoffreyangus Jul 19, 2022
41185a6
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 19, 2022
d433836
refactored to have forward looking interface
geoffreyangus Jul 20, 2022
e7c96a7
Merge branch 'fast-im-read' of https://github.com/ludwig-ai/ludwig in…
geoffreyangus Jul 20, 2022
aca98ca
attempt to fix weird seg fault in unit test
geoffreyangus Jul 20, 2022
6033579
limit generated files in test_ray and increase unit test time for now…
geoffreyangus Jul 20, 2022
2df314a
Merge branch 'master' into fast-im-read
geoffreyangus Jul 20, 2022
f579233
added more samples to test_preprocessing to overcome inexactness of p…
geoffreyangus Jul 20, 2022
1e7211d
stripped down tests to improve efficiency
geoffreyangus Jul 20, 2022
d68e822
fix audio bug
geoffreyangus Jul 21, 2022
1ea37ec
further streamlining tests
geoffreyangus Jul 21, 2022
c3617b4
revert num_examples change to ensure no metric nans (causes hanging(?…
geoffreyangus Jul 21, 2022
d7b3ea9
refine check
geoffreyangus Jul 22, 2022
e7cc0ff
add df_engine conditional to ensure df methods are compatible
geoffreyangus Jul 22, 2022
91c0ca4
add auto-parallelism and reset_index fix for large dask-read CSVs
geoffreyangus Jul 22, 2022
898f034
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 22, 2022
1606d4b
Merge branch 'master' into fast-im-read
geoffreyangus Jul 22, 2022
5fe09ec
add bug fix for num bytes and set parallelism floor to 200
geoffreyangus Jul 22, 2022
5f541d0
Merge branch 'fast-im-read' of https://github.com/ludwig-ai/ludwig in…
geoffreyangus Jul 22, 2022
a43a30e
added better comment and constant to limit reset_index calls
geoffreyangus Jul 23, 2022
db753ca
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 23, 2022
0d484ae
refined conditions and comments
geoffreyangus Jul 23, 2022
04e5701
Merge branch 'fast-im-read' of https://github.com/ludwig-ai/ludwig in…
geoffreyangus Jul 23, 2022
36e270c
refined comment
geoffreyangus Jul 23, 2022
8cf1aeb
refined comment
geoffreyangus Jul 23, 2022
5c9b456
Reduced pytest timeout time
geoffreyangus Jul 25, 2022
8861c92
increase num_examples to 100 to prevent NaNs in validation
geoffreyangus Jul 25, 2022
892b26b
num_examples=100 to prevent hanging
geoffreyangus Jul 25, 2022
dd83127
Custom to_dask() implementation that also passes meta during dd.DataF…
arnavgarg1 Jul 27, 2022
56c6e21
Merge branch 'master' into fast-im-read
geoffreyangus Jul 27, 2022
1a188b0
Merge branch 'master' into fast-im-read
geoffreyangus Jul 28, 2022
3ba39cd
Using 100 rows instead
arnavgarg1 Jul 28, 2022
55d3019
Pin Ray nightly version
geoffreyangus Jul 28, 2022
0a1477e
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Jul 28, 2022
05fb78a
fix link
geoffreyangus Jul 28, 2022
ddb4ada
pin torch to 07/26
geoffreyangus Jul 28, 2022
adea85c
cleanup
geoffreyangus Jul 28, 2022
9506182
Removes empty partitions after dropping rows and splitting datasets
geoffreyangus Jul 28, 2022
590cddb
remove extraneous comment in known_divisions
geoffreyangus Jul 28, 2022
9a81617
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 28, 2022
d5402f5
add unit test
geoffreyangus Jul 29, 2022
d94ad85
Merge branch 'remove-empty-partitions' of https://github.com/ludwig-a…
geoffreyangus Jul 29, 2022
83ea53b
upgrade ray pinned version to enable parquet partition filtering
geoffreyangus Jul 29, 2022
0f712f4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 29, 2022
6f0ac1d
Merge branch 'custom_to_dask' of https://github.com/ludwig-ai/ludwig
geoffreyangus Jul 29, 2022
dad7605
added preliminary check for empty partitions to improve speed
geoffreyangus Jul 29, 2022
d3c2a5a
downgrade Ray to ensure TensorDtypes are not inferred during Ray Data…
geoffreyangus Jul 29, 2022
8586c70
merge
geoffreyangus Jul 29, 2022
51dfa85
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 29, 2022
6116a71
Merge branch 'pin-ray-nightly' into remove-empty-partitions
geoffreyangus Jul 29, 2022
566b755
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 29, 2022
00ccfea
Merge branch 'remove-empty-partitions' into fast-im-read
geoffreyangus Jul 29, 2022
a13316b
add persist to save time
geoffreyangus Jul 29, 2022
883d008
add NoneType check to split
geoffreyangus Jul 29, 2022
06d1731
Merge branch 'remove-empty-partitions' into fast-im-read
geoffreyangus Jul 29, 2022
c03bf4f
unpin torch
geoffreyangus Aug 1, 2022
289e974
Merge branch 'pin-ray-nightly' into remove-empty-partitions
geoffreyangus Aug 1, 2022
459255a
move persist call to this PR
geoffreyangus Aug 1, 2022
b538f78
Merge branch 'remove-empty-partitions' into fast-im-read
geoffreyangus Aug 1, 2022
825fe72
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 2, 2022
523a9af
fix merge conflicts
geoffreyangus Aug 2, 2022
f9d715e
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 3, 2022
d77ee41
Merge branch 'master' into remove-empty-partitions
geoffreyangus Aug 3, 2022
aa221b0
revert to to_dask()
geoffreyangus Aug 3, 2022
f578bf3
Merge branch 'remove-empty-partitions' into fast-im-read
geoffreyangus Aug 3, 2022
724d202
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 3, 2022
fb53d72
Merge branch 'master' into remove-empty-partitions
geoffreyangus Aug 3, 2022
4d90905
Merge branch 'remove-empty-partitions' into fast-im-read
geoffreyangus Aug 3, 2022
d079969
reverted custom to_dask and isolated ray into DaskEngine methods
geoffreyangus Aug 3, 2022
ced5310
Merge branch 'remove-empty-partitions' into fast-im-read
geoffreyangus Aug 3, 2022
6efa162
Merge branch 'master' of https://github.com/ludwig-ai/ludwig
geoffreyangus Aug 3, 2022
5cd4d49
Merge branch 'master' into remove-empty-partitions
geoffreyangus Aug 3, 2022
b572f1c
Merge branch 'remove-empty-partitions' into fast-im-read
geoffreyangus Aug 3, 2022
e9774d1
merge with master
geoffreyangus Aug 4, 2022
e8b0160
Merge branch 'master' into fast-im-read
geoffreyangus Aug 4, 2022
21 changes: 16 additions & 5 deletions ludwig/backend/base.py
@@ -19,6 +19,7 @@
from contextlib import contextmanager
from typing import Callable, Optional, Union

import numpy as np
import pandas as pd

from ludwig.data.cache.manager import CacheManager
@@ -27,7 +28,7 @@
from ludwig.data.dataset.pandas import PandasDatasetManager
from ludwig.models.base import BaseModel
from ludwig.schema.trainer import ECDTrainerConfig, GBMTrainerConfig
from ludwig.utils.fs_utils import get_bytes_obj_if_path
from ludwig.utils.fs_utils import get_bytes_obj_from_path
from ludwig.utils.misc_utils import get_from_registry
from ludwig.utils.torch_utils import initialize_pytorch
from ludwig.utils.types import Series
@@ -112,15 +113,25 @@ def supports_multiprocessing(self):
def check_lazy_load_supported(self, feature):
pass

def read_binary_files(self, column: pd.Series, map_fn: Optional[Callable] = None) -> pd.Series:
df = column.to_frame(name=column.name)
def read_binary_files(
self, column: pd.Series, map_fn: Optional[Callable] = None, file_size: Optional[int] = None
) -> pd.Series:
column = column.fillna(np.nan).replace([np.nan], [None]) # normalize NaNs to None

sample_fname = column.head(1).values[0]
with ThreadPoolExecutor() as executor: # number of threads is inferred
result = executor.map(lambda idx_and_row: get_bytes_obj_if_path(idx_and_row[1][column.name]), df.iterrows())
if isinstance(sample_fname, str):
result = executor.map(
lambda path: get_bytes_obj_from_path(path) if path is not None else path, column.values
)
else:
# If the sample path is not a string, assume the paths have already been read in
result = column.values

if map_fn is not None:
result = executor.map(map_fn, result)

return pd.Series(result, index=df.index, name=column.name)
return pd.Series(result, index=column.index, name=column.name)


class LocalTrainingMixin:
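
For context on the local-backend change above: read_binary_files now normalizes missing paths (NaN to None) and fans the file reads out across a thread pool, only calling the filesystem helper when the column actually holds path strings. Below is a minimal standalone sketch of the same pattern, assuming a hypothetical read_bytes helper in place of ludwig.utils.fs_utils.get_bytes_obj_from_path; everything else mirrors the diff above.

from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import numpy as np
import pandas as pd


def read_bytes(path: str) -> Optional[bytes]:
    # Hypothetical stand-in for ludwig.utils.fs_utils.get_bytes_obj_from_path.
    with open(path, "rb") as f:
        return f.read()


def read_binary_column(column: pd.Series) -> pd.Series:
    # Normalize NaNs to None so missing entries stay None instead of float("nan").
    column = column.fillna(np.nan).replace([np.nan], [None])

    sample = column.head(1).values[0]
    with ThreadPoolExecutor() as executor:  # number of threads is inferred
        if isinstance(sample, str):
            # Column holds paths: read each file, passing None entries through untouched.
            result = list(executor.map(lambda p: read_bytes(p) if p is not None else None, column.values))
        else:
            # Column already holds raw bytes/arrays (e.g. pre-loaded numpy images).
            result = list(column.values)

    return pd.Series(result, index=column.index, name=column.name)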
273 changes: 273 additions & 0 deletions ludwig/backend/datasource.py
@@ -0,0 +1,273 @@
import contextlib
import logging
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TYPE_CHECKING, Union

import ray
import urllib3
from packaging import version
from ray.data.block import Block
from ray.data.context import DatasetContext
from ray.data.datasource.binary_datasource import BinaryDatasource
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.data.datasource.file_based_datasource import (
_check_pyarrow_version,
_resolve_paths_and_filesystem,
_S3FileSystemWrapper,
_wrap_s3_serialization_workaround,
BaseFileMetadataProvider,
BlockOutputBuffer,
DefaultFileMetadataProvider,
)

from ludwig.utils.fs_utils import get_bytes_obj_from_http_path, is_http

_ray113 = version.parse(ray.__version__) >= version.parse("1.13.0")

if TYPE_CHECKING:
import pyarrow

if _ray113:
# Only implemented starting in Ray 1.13
from ray.data.datasource.partitioning import PathPartitionFilter

logger = logging.getLogger(__name__)


class BinaryIgnoreNoneTypeDatasource(BinaryDatasource):
"""Binary datasource, for reading and writing binary files. Ignores None values.

Examples:
>>> import ray
>>> from ray.data.datasource import BinaryDatasource
>>> source = BinaryDatasource() # doctest: +SKIP
>>> ray.data.read_datasource( # doctest: +SKIP
... source, paths=["/path/to/dir", None]).take()
[b"file_data", ...]
"""

def create_reader(self, **kwargs):
return _BinaryIgnoreNoneTypeDatasourceReader(self, **kwargs)

def prepare_read(
self,
parallelism: int,
paths: Union[str, List[str], Tuple[str, int], List[Tuple[str, int]]],
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
open_stream_args: Optional[Dict[str, Any]] = None,
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
partition_filter: "PathPartitionFilter" = None,
# TODO(ekl) deprecate this once read fusion is available.
_block_udf: Optional[Callable[[Block], Block]] = None,
**reader_args,
) -> List[ReadTask]:
"""Creates and returns read tasks for a file-based datasource.

If `paths` is a tuple (or a list of tuples), the resulting dataset will have an `idx` key containing the second item
in each tuple, which is useful for tracking the order of files in the dataset.
"""
reader = self.create_reader(
paths=paths,
filesystem=filesystem,
schema=schema,
open_stream_args=open_stream_args,
meta_provider=meta_provider,
partition_filter=partition_filter,
_block_udf=_block_udf,
**reader_args,
)
return reader.get_read_tasks(parallelism)

def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
"""Opens a source path for reading and returns the associated Arrow NativeFile.

The default implementation opens the source path as a sequential input stream.

Implementations that do not support streaming reads (e.g. that require random
access) should override this method.
"""
if path is None or is_http(path):
return contextlib.nullcontext()
return filesystem.open_input_stream(path, **open_args)

def _read_file(
self,
f: Union["pyarrow.NativeFile", contextlib.nullcontext],
path_and_idx: Tuple[str, int] = None,
**reader_args,
):
include_paths = reader_args.get("include_paths", False)

path, idx = path_and_idx
if path is None:
data = None
elif is_http(path):
try:
data = get_bytes_obj_from_http_path(path)
except urllib3.exceptions.HTTPError as e:
logging.warning(e)
data = None
else:
super_result = super()._read_file(f, path, **reader_args)[0]
if include_paths:
_, data = super_result
else:
data = super_result

result = {"data": data}
if include_paths:
result["path"] = path
if idx is not None:
result["idx"] = idx
return [result]


# TODO(geoffrey): ensure this subclasses ray.data.datasource.Reader in ray 1.14
class _BinaryIgnoreNoneTypeDatasourceReader:
def __init__(
self,
delegate: Datasource,
paths: Union[str, List[str], Tuple[str, int], List[Tuple[str, int]]],
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
open_stream_args: Optional[Dict[str, Any]] = None,
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
partition_filter: "PathPartitionFilter" = None,
# TODO(ekl) deprecate this once read fusion is available.
_block_udf: Optional[Callable[[Block], Block]] = None,
**reader_args,
):
_check_pyarrow_version()
self._delegate = delegate
self._schema = schema
self._open_stream_args = open_stream_args
self._meta_provider = meta_provider
self._partition_filter = partition_filter
self._block_udf = _block_udf
self._reader_args = reader_args

has_idx = isinstance(paths[0], tuple) # include idx if paths is a list of Tuple[str, int]
raw_paths_and_idxs = paths if has_idx else [(path, None) for path in paths]

self._paths = []
self._file_sizes = []
for raw_path, idx in raw_paths_and_idxs:
# Paths must be resolved and expanded
if raw_path is None or is_http(raw_path):
read_path = raw_path
file_size = None # unknown file size is None
else:
resolved_path, filesystem = _resolve_paths_and_filesystem([raw_path], filesystem)
read_path, file_size = meta_provider.expand_paths(resolved_path, filesystem)
# expand_paths returns two lists, so get the first element of each
read_path = read_path[0]
file_size = file_size[0]

self._paths.append((read_path, idx))
self._file_sizes.append(file_size)
self._filesystem = filesystem

def estimate_inmemory_data_size(self) -> Optional[int]:
total_size = 0
for sz in self._file_sizes:
if sz is not None:
total_size += sz
return total_size

def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
import numpy as np

open_stream_args = self._open_stream_args
reader_args = self._reader_args
_block_udf = self._block_udf

paths, file_sizes = self._paths, self._file_sizes
if self._partition_filter is not None:
raise ValueError("partition_filter is not currently supported by this class")

read_stream = self._delegate._read_stream
filesystem = _wrap_s3_serialization_workaround(self._filesystem)

if open_stream_args is None:
open_stream_args = {}

open_input_source = self._delegate._open_input_source

def read_files(
read_paths_and_idxs: List[Tuple[str, int]],
fs: Union["pyarrow.fs.FileSystem", _S3FileSystemWrapper],
) -> Iterable[Block]:
logger.debug(f"Reading {len(read_paths_and_idxs)} files.")
if isinstance(fs, _S3FileSystemWrapper):
fs = fs.unwrap()
ctx = DatasetContext.get_current()
output_buffer = BlockOutputBuffer(block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size)
for read_path_and_idx in read_paths_and_idxs:
read_path, _ = read_path_and_idx
# Get reader_args and open_stream_args only if valid path.
if read_path is not None:
compression = open_stream_args.pop("compression", None)
if compression is None:
import pyarrow as pa

try:
# If no compression manually given, try to detect
# compression codec from path.
compression = pa.Codec.detect(read_path).name
except (ValueError, TypeError):
# Arrow's compression inference on the file path
# doesn't work for Snappy, so we double-check ourselves.
import pathlib

suffix = pathlib.Path(read_path).suffix
if suffix and suffix[1:] == "snappy":
compression = "snappy"
else:
compression = None
if compression == "snappy":
# Pass Snappy compression as a reader arg, so datasource subclasses
# can manually handle streaming decompression in
# self._read_stream().
reader_args["compression"] = compression
reader_args["filesystem"] = fs
elif compression is not None:
# Non-Snappy compression, pass as open_input_stream() arg so Arrow
# can take care of streaming decompression for us.
open_stream_args["compression"] = compression

with open_input_source(fs, read_path, **open_stream_args) as f:
for data in read_stream(f, read_path_and_idx, **reader_args):
output_buffer.add_block(data)
if output_buffer.has_next():
yield output_buffer.next()
output_buffer.finalize()
if output_buffer.has_next():
yield output_buffer.next()

# fix https://github.com/ray-project/ray/issues/24296
parallelism = min(parallelism, len(paths))

read_tasks = []
for read_paths_and_idxs in np.array_split(paths, parallelism):
if len(read_paths_and_idxs) <= 0:
continue

read_paths, _ = zip(*read_paths_and_idxs)
meta = self._meta_provider(
read_paths,
self._schema,
rows_per_file=self._delegate._rows_per_file(),
file_sizes=file_sizes,
)

read_task = ReadTask(
lambda read_paths_and_idxs=read_paths_and_idxs: read_files(read_paths_and_idxs, filesystem), meta
)
read_tasks.append(read_task)

return read_tasks
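
The new ludwig/backend/datasource.py above is meant to be consumed through ray.data.read_datasource, as in the class docstring. A hedged usage sketch follows (the image paths are hypothetical, and the actual call site in Ludwig's Ray backend is not part of this file): passing (path, idx) tuples makes each record carry an idx field so the original row order can be restored after the parallel read, and None paths come back as records with data=None rather than raising.

import ray

from ludwig.backend.datasource import BinaryIgnoreNoneTypeDatasource

ray.init(ignore_reinit_error=True)

# (path, idx) tuples: idx preserves the original row order; None marks a missing path.
paths_and_idxs = [
    ("/data/images/cat.png", 0),  # hypothetical example paths
    (None, 1),
    ("/data/images/dog.png", 2),
]

ds = ray.data.read_datasource(
    BinaryIgnoreNoneTypeDatasource(),
    paths=paths_and_idxs,
)

# Each record is {"data": <bytes or None>, "idx": <original row index>}.
df = ds.to_pandas().sort_values("idx")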