3 changes: 3 additions & 0 deletions python/ray/data/_internal/datasource/parquet_datasink.py
@@ -4,6 +4,7 @@

from ray.data._internal.arrow_ops.transform_pyarrow import concat
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.savemode import SaveMode
from ray.data._internal.util import call_with_retry
from ray.data.block import Block, BlockAccessor
from ray.data.datasource.file_based_datasource import _resolve_kwargs
@@ -33,6 +34,7 @@ def __init__(
open_stream_args: Optional[Dict[str, Any]] = None,
filename_provider: Optional[FilenameProvider] = None,
dataset_uuid: Optional[str] = None,
mode: SaveMode = SaveMode.APPEND,
):
if arrow_parquet_args_fn is None:
arrow_parquet_args_fn = lambda: {} # noqa: E731
@@ -53,6 +55,7 @@
filename_provider=filename_provider,
dataset_uuid=dataset_uuid,
file_format="parquet",
mode=mode,
)

def write(
11 changes: 11 additions & 0 deletions python/ray/data/_internal/savemode.py
@@ -0,0 +1,11 @@
from enum import Enum

from ray.util.annotations import PublicAPI


@PublicAPI(stability="alpha")
class SaveMode(str, Enum):
APPEND = "append"
OVERWRITE = "overwrite"
IGNORE = "ignore"
ERROR = "error"
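A minimal usage sketch of the new enum with one of the write APIs touched below; the output path is hypothetical, and the import relies on the re-export added to ray.data.datasource further down:

import ray
from ray.data.datasource import SaveMode

ds = ray.data.range(3)
# Overwrite any previous contents of the (hypothetical) output directory.
ds.write_parquet("/tmp/save_mode_demo", mode=SaveMode.OVERWRITE)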
48 changes: 47 additions & 1 deletion python/ray/data/dataset.py
@@ -108,7 +108,8 @@
_apply_batch_format,
)
from ray.data.context import DataContext
from ray.data.datasource import Connection, Datasink, FilenameProvider
from ray.data.datasource import Connection, Datasink, FilenameProvider, SaveMode
from ray.data.datasource.file_datasink import _FileDatasink
from ray.data.iterator import DataIterator
from ray.data.random_access_dataset import RandomAccessDataset
from ray.types import ObjectRef
@@ -3316,6 +3317,7 @@ def write_parquet(
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
num_rows_per_file: Optional[int] = None,
mode: SaveMode = SaveMode.APPEND,
**arrow_parquet_args,
) -> None:
"""Writes the :class:`~ray.data.Dataset` to parquet files under the provided ``path``.
@@ -3393,6 +3395,10 @@ def write_parquet(
/arrow.apache.org/docs/python/generated/\
pyarrow.parquet.ParquetWriter.html>`_, which is used to write
out each block to a file. See `arrow_parquet_args_fn` for more detail.
mode: Determines how to handle existing files. Valid modes are "overwrite",
"error", "ignore", and "append". Defaults to "append".
NOTE: This method isn't atomic; "overwrite" first deletes all existing
data under `path` before writing new files.
""" # noqa: E501
if arrow_parquet_args_fn is None:
arrow_parquet_args_fn = lambda: {} # noqa: E731
@@ -3418,6 +3424,7 @@
open_stream_args=arrow_open_stream_args,
filename_provider=filename_provider,
dataset_uuid=self._uuid,
mode=mode,
)
self.write_datasink(
datasink,
@@ -3440,6 +3447,7 @@ def write_json(
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
num_rows_per_file: Optional[int] = None,
mode: SaveMode = SaveMode.APPEND,
**pandas_json_args,
) -> None:
"""Writes the :class:`~ray.data.Dataset` to JSON and JSONL files.
@@ -3520,6 +3528,10 @@ def write_json(
which is used under the hood to write out each
:class:`~ray.data.Dataset` block. These
are dict(orient="records", lines=True) by default.
mode: Determines how to handle existing files. Valid modes are "overwrite",
"error", "ignore", and "append". Defaults to "append".
NOTE: This method isn't atomic; "overwrite" first deletes all existing
data under `path` before writing new files.
"""
if pandas_json_args_fn is None:
pandas_json_args_fn = lambda: {} # noqa: E731
@@ -3538,6 +3550,7 @@
open_stream_args=arrow_open_stream_args,
filename_provider=filename_provider,
dataset_uuid=self._uuid,
mode=mode,
)
self.write_datasink(
datasink,
@@ -3614,6 +3627,7 @@ def write_images(
filename_provider: Optional[FilenameProvider] = None,
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
mode: SaveMode = SaveMode.APPEND,
) -> None:
"""Writes the :class:`~ray.data.Dataset` to images.

@@ -3655,6 +3669,10 @@ def write_images(
to control number of tasks to run concurrently. This doesn't change the
total number of tasks run. By default, concurrency is dynamically
decided based on the available resources.
mode: Determines how to handle existing files. Valid modes are "overwrite",
"error", "ignore", and "append". Defaults to "append".
NOTE: This method isn't atomic; "overwrite" first deletes all existing
data under `path` before writing new files.
""" # noqa: E501
datasink = ImageDatasink(
path,
@@ -3665,6 +3683,7 @@
open_stream_args=arrow_open_stream_args,
filename_provider=filename_provider,
dataset_uuid=self._uuid,
mode=mode,
)
self.write_datasink(
datasink,
@@ -3687,6 +3706,7 @@ def write_csv(
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
num_rows_per_file: Optional[int] = None,
mode: SaveMode = SaveMode.APPEND,
**arrow_csv_args,
) -> None:
"""Writes the :class:`~ray.data.Dataset` to CSV files.
@@ -3765,6 +3785,10 @@ def write_csv(
arrow.apache.org/docs/python/generated/pyarrow.csv.write_csv.html\
#pyarrow.csv.write_csv>`_
when writing each block to a file.
mode: Determines how to handle existing files. Valid modes are "overwrite",
"error", "ignore", and "append". Defaults to "append".
NOTE: This method isn't atomic; "overwrite" first deletes all existing
data under `path` before writing new files.
"""
if arrow_csv_args_fn is None:
arrow_csv_args_fn = lambda: {} # noqa: E731
@@ -3783,6 +3807,7 @@
open_stream_args=arrow_open_stream_args,
filename_provider=filename_provider,
dataset_uuid=self._uuid,
mode=mode,
)
self.write_datasink(
datasink,
@@ -3805,6 +3830,7 @@ def write_tfrecords(
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
num_rows_per_file: Optional[int] = None,
mode: SaveMode = SaveMode.APPEND,
) -> None:
"""Write the :class:`~ray.data.Dataset` to TFRecord files.

@@ -3872,6 +3898,10 @@ def write_tfrecords(
total number of tasks run. By default, concurrency is dynamically
decided based on the available resources.
num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
mode: Determines how to handle existing files. Valid modes are "overwrite",
"error", "ignore", and "append". Defaults to "append".
NOTE: This method isn't atomic; "overwrite" first deletes all existing
data under `path` before writing new files.
"""
effective_min_rows = _validate_rows_per_file_args(
num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
@@ -3908,6 +3938,7 @@ def write_webdataset(
encoder: Optional[Union[bool, str, callable, list]] = True,
concurrency: Optional[int] = None,
num_rows_per_file: Optional[int] = None,
mode: SaveMode = SaveMode.APPEND,
) -> None:
"""Writes the dataset to `WebDataset <https://github.com/webdataset/webdataset>`_ files.

@@ -3964,6 +3995,10 @@ def write_webdataset(
total number of tasks run. By default, concurrency is dynamically
decided based on the available resources.
num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
mode: Determines how to handle existing files. Valid modes are "overwrite",
"error", "ignore", and "append". Defaults to "append".
NOTE: This method isn't atomic; "overwrite" first deletes all existing
data under `path` before writing new files.
"""
effective_min_rows = _validate_rows_per_file_args(
num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
@@ -4000,6 +4035,7 @@ def write_numpy(
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
num_rows_per_file: Optional[int] = None,
mode: SaveMode = SaveMode.APPEND,
) -> None:
"""Writes a column of the :class:`~ray.data.Dataset` to .npy files.

@@ -4059,6 +4095,10 @@ def write_numpy(
total number of tasks run. By default, concurrency is dynamically
decided based on the available resources.
num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
mode: Determines how to handle existing files. Valid modes are "overwrite",
"error", "ignore", and "append". Defaults to "append".
NOTE: This method isn't atomic; "overwrite" first deletes all existing
data under `path` before writing new files.
"""
effective_min_rows = _validate_rows_per_file_args(
num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
@@ -4528,6 +4568,12 @@ def write_datasink(

try:
datasink.on_write_start()
if isinstance(datasink, _FileDatasink):
if not datasink.has_created_dir and datasink.mode == SaveMode.IGNORE:
logger.info(
f"Ignoring write because {datasink.path} already exists"
)
return

self._write_ds = Dataset(plan, logical_plan).materialize()
# TODO: Get and handle the blocks with an iterator instead of getting
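With the early return above, an "ignore" write against an existing directory skips materializing the dataset entirely, and an "error" write propagates the ValueError raised by the datasink. A hedged sketch of the caller-visible behavior, using write_csv and a hypothetical path:

import ray

ds = ray.data.range(3)
ds.write_csv("/tmp/csv_out", mode="append")     # creates the directory and writes
try:
    ds.write_csv("/tmp/csv_out", mode="error")  # directory now exists
except ValueError:
    pass                                        # refused: destination already exists
ds.write_csv("/tmp/csv_out", mode="ignore")     # logged and skipped; nothing is written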
2 changes: 2 additions & 0 deletions python/ray/data/datasource/__init__.py
@@ -1,4 +1,5 @@
from ray.data._internal.datasource.sql_datasource import Connection
from ray.data._internal.savemode import SaveMode
from ray.data.datasource.datasink import (
Datasink,
DummyOutputDatasink,
@@ -64,4 +65,5 @@
"_S3FileSystemWrapper",
"WriteResult",
"WriteReturnType",
"SaveMode",
]
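Because SaveMode subclasses str, the members exported here compare equal to their lowercase string values, which is why the write_* calls and the tests in this PR can pass plain strings such as mode="ignore". A small sketch of that equivalence:

from ray.data.datasource import SaveMode

assert SaveMode.OVERWRITE == "overwrite"       # str-backed members equal their values
assert SaveMode("append") is SaveMode.APPEND   # round-trips from the raw string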
20 changes: 19 additions & 1 deletion python/ray/data/datasource/file_datasink.py
@@ -6,6 +6,7 @@
from ray._private.arrow_utils import add_creatable_buckets_param_if_s3_uri
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.savemode import SaveMode
from ray.data._internal.util import (
RetryingPyFileSystem,
_is_local_scheme,
@@ -38,6 +39,7 @@ def __init__(
filename_provider: Optional[FilenameProvider] = None,
dataset_uuid: Optional[str] = None,
file_format: Optional[str] = None,
mode: SaveMode = SaveMode.APPEND,
):
"""Initialize this datasink.

@@ -76,13 +78,29 @@ def __init__(
self.filename_provider = filename_provider
self.dataset_uuid = dataset_uuid
self.file_format = file_format

self.mode = mode
self.has_created_dir = False

def open_output_stream(self, path: str) -> "pyarrow.NativeFile":
return self.filesystem.open_output_stream(path, **self.open_stream_args)

def on_write_start(self) -> None:
from pyarrow.fs import FileType

dir_exists = (
self.filesystem.get_file_info(self.path).type is not FileType.NotFound
)
if dir_exists:
if self.mode == SaveMode.ERROR:
raise ValueError(
f"Path {self.path} already exists. Use mode='overwrite' to replace it, "
"mode='append' to add files, or mode='ignore' to skip this write."
)
if self.mode == SaveMode.IGNORE:
logger.warning(f"[SaveMode={self.mode}] Skipping {self.path}")
return
if self.mode == SaveMode.OVERWRITE:
logger.warning(f"[SaveMode={self.mode}] Replacing contents of {self.path}")
self.filesystem.delete_dir_contents(self.path)
self.has_created_dir = self._create_dir(self.path)

def _create_dir(self, dest) -> bool:
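For reference, a standalone sketch of the decision table that on_write_start applies; this is not Ray's internal API, just the same logic restated over a bare pyarrow filesystem:

from pyarrow import fs as pafs


def resolve_save_mode(filesystem: pafs.FileSystem, path: str, mode: str) -> str:
    """Return "write" or "skip" for an output directory, mirroring the modes above."""
    exists = filesystem.get_file_info(path).type is not pafs.FileType.NotFound
    if not exists:
        return "write"                            # nothing there yet: every mode writes
    if mode == "error":
        raise ValueError(f"Path {path} already exists")
    if mode == "ignore":
        return "skip"                             # keep existing files, write nothing
    if mode == "overwrite":
        filesystem.delete_dir_contents(path)      # non-atomic: delete first, then write
    return "write"                                # overwrite and append both proceed

In the real datasink, has_created_dir is set after this check, and write_datasink uses that flag together with the mode to tell a skipped "ignore" write apart from a normal one.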
102 changes: 86 additions & 16 deletions python/ray/data/tests/test_parquet.py
@@ -909,24 +909,94 @@ def test_parquet_write(ray_start_regular_shared, fs, data_path, endpoint_url):
fs.delete_dir(_unwrap_protocol(path))


def test_parquet_write_multiple_blocks(ray_start_regular_shared, tmp_path):
df = pd.DataFrame(
{"one": [1, 1, 1, 3, 3, 3], "two": ["a", "a", "b", "b", "c", "c"]}
)
table = pa.Table.from_pandas(df)
pq.write_to_dataset(
table,
root_path=str(tmp_path),
partition_cols=["one"],
)
# 2 partitions, 1 empty partition, 3 block/read tasks, 1 empty block
ds = ray.data.read_parquet(
str(tmp_path), override_num_blocks=3, filter=(pa.dataset.field("two") == "a")
)

parquet_output_path = os.path.join(tmp_path, "parquet")
ds.write_parquet(parquet_output_path, num_rows_per_file=6)


def test_parquet_write_ignore_save_mode(ray_start_regular_shared, local_path):
data_path = local_path
path = os.path.join(data_path, "test_parquet_dir")
os.mkdir(path)
in_memory_table = pa.Table.from_pydict({"one": [1]})
ds = ray.data.from_arrow(in_memory_table)
ds.write_parquet(path, filesystem=None, mode="ignore")

# directory already exists, so the write should be ignored
with os.scandir(path) as file_paths:
count_of_files = sum(1 for path in file_paths)
assert count_of_files == 0

# now remove dir
shutil.rmtree(path)

# should write
ds.write_parquet(path, filesystem=None, mode="ignore")
on_disk_table = pq.read_table(path)

assert in_memory_table.equals(on_disk_table)

def test_parquet_write_error_save_mode(ray_start_regular_shared, local_path):
data_path = local_path
path = os.path.join(data_path, "test_parquet_dir")
os.mkdir(path)
in_memory_table = pa.Table.from_pydict({"one": [1]})
ds = ray.data.from_arrow(in_memory_table)

with pytest.raises(ValueError):
ds.write_parquet(path, filesystem=None, mode="error")

# now remove dir
shutil.rmtree(path)

# should write
ds.write_parquet(path, filesystem=None, mode="error")
on_disk_table = pq.read_table(path)

assert in_memory_table.equals(on_disk_table)


def test_parquet_write_append_save_mode(ray_start_regular_shared, local_path):
data_path = local_path
path = os.path.join(data_path, "test_parquet_dir")
in_memory_table = pa.Table.from_pydict({"one": [1]})
ds = ray.data.from_arrow(in_memory_table)
ds.write_parquet(path, filesystem=None, mode="append")

# one file should be added
with os.scandir(path) as file_paths:
count_of_files = sum(1 for path in file_paths)
assert count_of_files == 1

appended_in_memory_table = pa.Table.from_pydict({"two": [2]})
ds = ray.data.from_arrow(appended_in_memory_table)
ds.write_parquet(path, filesystem=None, mode="append")

# another file should be added
with os.scandir(path) as file_paths:
count_of_files = sum(1 for path in file_paths)
assert count_of_files == 2


def test_parquet_write_overwrite_save_mode(ray_start_regular_shared, local_path):
data_path = local_path
path = os.path.join(data_path, "test_parquet_dir")
in_memory_table = pa.Table.from_pydict({"one": [1]})
ds = ray.data.from_arrow(in_memory_table)
ds.write_parquet(path, filesystem=None, mode="overwrite")

# one file should be added
with os.scandir(path) as file_paths:
count_of_files = sum(1 for path in file_paths)
assert count_of_files == 1

overwritten_in_memory_table = pa.Table.from_pydict({"two": [2]})
ds = ray.data.from_arrow(overwritten_in_memory_table)
ds.write_parquet(path, filesystem=None, mode="overwrite")

# another file should NOT be added
with os.scandir(path) as file_paths:
count_of_files = sum(1 for path in file_paths)
assert count_of_files == 1

on_disk_table = pq.read_table(path)
assert on_disk_table.equals(overwritten_in_memory_table)


def test_parquet_file_extensions(ray_start_regular_shared, tmp_path):