diff --git a/python/ray/data/_internal/datasource/parquet_datasink.py b/python/ray/data/_internal/datasource/parquet_datasink.py
index e1512e012fc6..593f1faddd43 100644
--- a/python/ray/data/_internal/datasource/parquet_datasink.py
+++ b/python/ray/data/_internal/datasource/parquet_datasink.py
@@ -4,6 +4,7 @@
 
 from ray.data._internal.arrow_ops.transform_pyarrow import concat
 from ray.data._internal.execution.interfaces import TaskContext
+from ray.data._internal.savemode import SaveMode
 from ray.data._internal.util import call_with_retry
 from ray.data.block import Block, BlockAccessor
 from ray.data.datasource.file_based_datasource import _resolve_kwargs
@@ -33,6 +34,7 @@ def __init__(
         open_stream_args: Optional[Dict[str, Any]] = None,
         filename_provider: Optional[FilenameProvider] = None,
         dataset_uuid: Optional[str] = None,
+        mode: SaveMode = SaveMode.APPEND,
     ):
         if arrow_parquet_args_fn is None:
             arrow_parquet_args_fn = lambda: {}  # noqa: E731
@@ -53,6 +55,7 @@ def __init__(
             filename_provider=filename_provider,
             dataset_uuid=dataset_uuid,
             file_format="parquet",
+            mode=mode,
         )
 
     def write(
diff --git a/python/ray/data/_internal/savemode.py b/python/ray/data/_internal/savemode.py
new file mode 100644
index 000000000000..e4f9703be09b
--- /dev/null
+++ b/python/ray/data/_internal/savemode.py
@@ -0,0 +1,11 @@
+from enum import Enum
+
+from ray.util.annotations import PublicAPI
+
+
+@PublicAPI(stability="alpha")
+class SaveMode(str, Enum):
+    APPEND = "append"
+    OVERWRITE = "overwrite"
+    IGNORE = "ignore"
+    ERROR = "error"
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 4127a045e6d8..c1d5c4720c4b 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -108,7 +108,8 @@
     _apply_batch_format,
 )
 from ray.data.context import DataContext
-from ray.data.datasource import Connection, Datasink, FilenameProvider
+from ray.data.datasource import Connection, Datasink, FilenameProvider, SaveMode
+from ray.data.datasource.file_datasink import _FileDatasink
 from ray.data.iterator import DataIterator
 from ray.data.random_access_dataset import RandomAccessDataset
 from ray.types import ObjectRef
@@ -3316,6 +3317,7 @@ def write_parquet(
         ray_remote_args: Dict[str, Any] = None,
         concurrency: Optional[int] = None,
         num_rows_per_file: Optional[int] = None,
+        mode: SaveMode = SaveMode.APPEND,
         **arrow_parquet_args,
     ) -> None:
         """Writes the :class:`~ray.data.Dataset` to parquet files under the provided ``path``.
@@ -3393,6 +3395,10 @@ def write_parquet(
                 /arrow.apache.org/docs/python/generated/\
                 pyarrow.parquet.ParquetWriter.html>`_, which is used to write
                 out each block to a file. See `arrow_parquet_args_fn` for more detail.
+            mode: Determines how to handle existing files. Valid modes are
+                "overwrite", "error", "ignore", and "append". Defaults to "append".
+                NOTE: This method isn't atomic: "overwrite" first deletes all
+                existing data under ``path`` before writing.
         """  # noqa: E501
         if arrow_parquet_args_fn is None:
             arrow_parquet_args_fn = lambda: {}  # noqa: E731
@@ -3418,6 +3424,7 @@ def write_parquet(
             open_stream_args=arrow_open_stream_args,
             filename_provider=filename_provider,
             dataset_uuid=self._uuid,
+            mode=mode,
         )
         self.write_datasink(
             datasink,
@@ -3440,6 +3447,7 @@ def write_json(
         ray_remote_args: Dict[str, Any] = None,
         concurrency: Optional[int] = None,
         num_rows_per_file: Optional[int] = None,
+        mode: SaveMode = SaveMode.APPEND,
         **pandas_json_args,
     ) -> None:
         """Writes the :class:`~ray.data.Dataset` to JSON and JSONL files.
@@ -3520,6 +3528,10 @@ def write_json(
                 which is used under the hood to write out each
                 :class:`~ray.data.Dataset` block. These
                 are dict(orient="records", lines=True) by default.
+            mode: Determines how to handle existing files. Valid modes are
+                "overwrite", "error", "ignore", and "append". Defaults to "append".
+                NOTE: This method isn't atomic: "overwrite" first deletes all
+                existing data under ``path`` before writing.
         """
         if pandas_json_args_fn is None:
             pandas_json_args_fn = lambda: {}  # noqa: E731
@@ -3538,6 +3550,7 @@ def write_json(
             open_stream_args=arrow_open_stream_args,
             filename_provider=filename_provider,
             dataset_uuid=self._uuid,
+            mode=mode,
         )
         self.write_datasink(
             datasink,
@@ -3614,6 +3627,7 @@ def write_images(
         filename_provider: Optional[FilenameProvider] = None,
         ray_remote_args: Dict[str, Any] = None,
         concurrency: Optional[int] = None,
+        mode: SaveMode = SaveMode.APPEND,
     ) -> None:
         """Writes the :class:`~ray.data.Dataset` to images.
 
@@ -3655,6 +3669,10 @@ def write_images(
                 to control number of tasks to run concurrently. This doesn't change the
                 total number of tasks run. By default, concurrency is dynamically
                 decided based on the available resources.
+            mode: Determines how to handle existing files. Valid modes are
+                "overwrite", "error", "ignore", and "append". Defaults to "append".
+                NOTE: This method isn't atomic: "overwrite" first deletes all
+                existing data under ``path`` before writing.
         """  # noqa: E501
         datasink = ImageDatasink(
             path,
@@ -3665,6 +3683,7 @@ def write_images(
             open_stream_args=arrow_open_stream_args,
             filename_provider=filename_provider,
             dataset_uuid=self._uuid,
+            mode=mode,
         )
         self.write_datasink(
             datasink,
@@ -3687,6 +3706,7 @@ def write_csv(
         ray_remote_args: Dict[str, Any] = None,
         concurrency: Optional[int] = None,
         num_rows_per_file: Optional[int] = None,
+        mode: SaveMode = SaveMode.APPEND,
         **arrow_csv_args,
     ) -> None:
         """Writes the :class:`~ray.data.Dataset` to CSV files.
@@ -3765,6 +3785,10 @@ def write_csv(
                 arrow.apache.org/docs/python/generated/pyarrow.csv.write_csv.html\
                 #pyarrow.csv.write_csv>`_
                 when writing each block to a file.
+            mode: Determines how to handle existing files. Valid modes are
+                "overwrite", "error", "ignore", and "append". Defaults to "append".
+                NOTE: This method isn't atomic: "overwrite" first deletes all
+                existing data under ``path`` before writing.
         """
         if arrow_csv_args_fn is None:
             arrow_csv_args_fn = lambda: {}  # noqa: E731
@@ -3783,6 +3807,7 @@ def write_csv(
             open_stream_args=arrow_open_stream_args,
             filename_provider=filename_provider,
             dataset_uuid=self._uuid,
+            mode=mode,
         )
         self.write_datasink(
             datasink,
@@ -3805,6 +3830,7 @@ def write_tfrecords(
         ray_remote_args: Dict[str, Any] = None,
         concurrency: Optional[int] = None,
         num_rows_per_file: Optional[int] = None,
+        mode: SaveMode = SaveMode.APPEND,
     ) -> None:
         """Write the :class:`~ray.data.Dataset` to TFRecord files.
 
@@ -3872,6 +3898,10 @@ def write_tfrecords(
                 total number of tasks run. By default, concurrency is dynamically
                 decided based on the available resources.
             num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
+            mode: Determines how to handle existing files. Valid modes are
+                "overwrite", "error", "ignore", and "append". Defaults to "append".
+                NOTE: This method isn't atomic: "overwrite" first deletes all
+                existing data under ``path`` before writing.
         """
         effective_min_rows = _validate_rows_per_file_args(
             num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
         )
@@ -3908,6 +3938,7 @@ def write_webdataset(
         encoder: Optional[Union[bool, str, callable, list]] = True,
         concurrency: Optional[int] = None,
         num_rows_per_file: Optional[int] = None,
+        mode: SaveMode = SaveMode.APPEND,
     ) -> None:
         """Writes the dataset to `WebDataset `_ files.
 
@@ -3964,6 +3995,10 @@ def write_webdataset(
                 total number of tasks run. By default, concurrency is dynamically
                 decided based on the available resources.
             num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
+            mode: Determines how to handle existing files. Valid modes are
+                "overwrite", "error", "ignore", and "append". Defaults to "append".
+                NOTE: This method isn't atomic: "overwrite" first deletes all
+                existing data under ``path`` before writing.
         """
         effective_min_rows = _validate_rows_per_file_args(
             num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
         )
@@ -4000,6 +4035,7 @@ def write_numpy(
         ray_remote_args: Dict[str, Any] = None,
         concurrency: Optional[int] = None,
         num_rows_per_file: Optional[int] = None,
+        mode: SaveMode = SaveMode.APPEND,
     ) -> None:
         """Writes a column of the :class:`~ray.data.Dataset` to .npy files.
 
@@ -4059,6 +4095,10 @@ def write_numpy(
                 total number of tasks run. By default, concurrency is dynamically
                 decided based on the available resources.
             num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
+            mode: Determines how to handle existing files. Valid modes are
+                "overwrite", "error", "ignore", and "append". Defaults to "append".
+                NOTE: This method isn't atomic: "overwrite" first deletes all
+                existing data under ``path`` before writing.
         """
         effective_min_rows = _validate_rows_per_file_args(
             num_rows_per_file=num_rows_per_file, min_rows_per_file=min_rows_per_file
         )
@@ -4528,6 +4568,12 @@ def write_datasink(
         try:
             datasink.on_write_start()
+            if isinstance(datasink, _FileDatasink):
+                if not datasink.has_created_dir and datasink.mode == SaveMode.IGNORE:
+                    logger.info(
+                        f"Ignoring write because {datasink.path} already exists"
+                    )
+                    return
 
             self._write_ds = Dataset(plan, logical_plan).materialize()
 
             # TODO: Get and handle the blocks with an iterator instead of getting
diff --git a/python/ray/data/datasource/__init__.py b/python/ray/data/datasource/__init__.py
index 9c825b2026af..ef2eca5977ed 100644
--- a/python/ray/data/datasource/__init__.py
+++ b/python/ray/data/datasource/__init__.py
@@ -1,4 +1,5 @@
 from ray.data._internal.datasource.sql_datasource import Connection
+from ray.data._internal.savemode import SaveMode
 from ray.data.datasource.datasink import (
     Datasink,
     DummyOutputDatasink,
@@ -64,4 +65,5 @@
     "_S3FileSystemWrapper",
     "WriteResult",
     "WriteReturnType",
+    "SaveMode",
 ]
diff --git a/python/ray/data/datasource/file_datasink.py b/python/ray/data/datasource/file_datasink.py
index 10d9549c52ec..0e9eada1ced3 100644
--- a/python/ray/data/datasource/file_datasink.py
+++ b/python/ray/data/datasource/file_datasink.py
@@ -6,6 +6,7 @@
 from ray._private.arrow_utils import add_creatable_buckets_param_if_s3_uri
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.execution.interfaces import TaskContext
+from ray.data._internal.savemode import SaveMode
 from ray.data._internal.util import (
     RetryingPyFileSystem,
     _is_local_scheme,
@@ -38,6 +39,7 @@ def __init__(
         filename_provider: Optional[FilenameProvider] = None,
         dataset_uuid: Optional[str] = None,
         file_format: Optional[str] = None,
+        mode: SaveMode = SaveMode.APPEND,
     ):
         """Initialize this datasink.
 
@@ -76,13 +78,29 @@ def __init__(
         self.filename_provider = filename_provider
         self.dataset_uuid = dataset_uuid
         self.file_format = file_format
-
+        self.mode = mode
         self.has_created_dir = False
 
     def open_output_stream(self, path: str) -> "pyarrow.NativeFile":
         return self.filesystem.open_output_stream(path, **self.open_stream_args)
 
     def on_write_start(self) -> None:
+        from pyarrow.fs import FileType
+
+        dir_exists = (
+            self.filesystem.get_file_info(self.path).type is not FileType.NotFound
+        )
+        if dir_exists:
+            if self.mode == SaveMode.ERROR:
+                raise ValueError(
+                    f"Path {self.path} already exists. Use mode='overwrite' to replace its contents or mode='ignore' to skip writing."
+                )
+            if self.mode == SaveMode.IGNORE:
+                logger.warning(f"[SaveMode={self.mode}] Skipping write to {self.path}")
+                return
+            if self.mode == SaveMode.OVERWRITE:
+                logger.warning(f"[SaveMode={self.mode}] Replacing contents of {self.path}")
+                self.filesystem.delete_dir_contents(self.path)
         self.has_created_dir = self._create_dir(self.path)
 
     def _create_dir(self, dest) -> bool:
diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py
index 58a27831811f..74f55e7fe600 100644
--- a/python/ray/data/tests/test_parquet.py
+++ b/python/ray/data/tests/test_parquet.py
@@ -909,24 +909,94 @@ def test_parquet_write(ray_start_regular_shared, fs, data_path, endpoint_url):
     fs.delete_dir(_unwrap_protocol(path))
 
 
-def test_parquet_write_multiple_blocks(ray_start_regular_shared, tmp_path):
-    df = pd.DataFrame(
-        {"one": [1, 1, 1, 3, 3, 3], "two": ["a", "a", "b", "b", "c", "c"]}
-    )
-    table = pa.Table.from_pandas(df)
-    pq.write_to_dataset(
-        table,
-        root_path=str(tmp_path),
-        partition_cols=["one"],
-    )
-    # 2 partitions, 1 empty partition, 3 block/read tasks, 1 empty block
+def test_parquet_write_ignore_save_mode(ray_start_regular_shared, local_path):
+    data_path = local_path
+    path = os.path.join(data_path, "test_parquet_dir")
+    os.mkdir(path)
+    in_memory_table = pa.Table.from_pydict({"one": [1]})
+    ds = ray.data.from_arrow(in_memory_table)
+    ds.write_parquet(path, filesystem=None, mode="ignore")
+
+    # The directory already exists, so the write should be ignored.
+    with os.scandir(path) as file_paths:
+        count_of_files = sum(1 for _ in file_paths)
+        assert count_of_files == 0
+
+    # Now remove the directory.
+    shutil.rmtree(path)
+
+    # The write should succeed.
+    ds.write_parquet(path, filesystem=None, mode="ignore")
+    on_disk_table = pq.read_table(path)
+
+    assert in_memory_table.equals(on_disk_table)
 
-    ds = ray.data.read_parquet(
-        str(tmp_path), override_num_blocks=3, filter=(pa.dataset.field("two") == "a")
-    )
-    parquet_output_path = os.path.join(tmp_path, "parquet")
-    ds.write_parquet(parquet_output_path, num_rows_per_file=6)
 
+def test_parquet_write_error_save_mode(ray_start_regular_shared, local_path):
+    data_path = local_path
+    path = os.path.join(data_path, "test_parquet_dir")
+    os.mkdir(path)
+    in_memory_table = pa.Table.from_pydict({"one": [1]})
+    ds = ray.data.from_arrow(in_memory_table)
+
+    with pytest.raises(ValueError):
+        ds.write_parquet(path, filesystem=None, mode="error")
+
+    # Now remove the directory.
+    shutil.rmtree(path)
+
+    # The write should succeed.
+    ds.write_parquet(path, filesystem=None, mode="error")
+    on_disk_table = pq.read_table(path)
+
+    assert in_memory_table.equals(on_disk_table)
+
+
+def test_parquet_write_append_save_mode(ray_start_regular_shared, local_path):
+    data_path = local_path
+    path = os.path.join(data_path, "test_parquet_dir")
+    in_memory_table = pa.Table.from_pydict({"one": [1]})
+    ds = ray.data.from_arrow(in_memory_table)
+
+    ds.write_parquet(path, filesystem=None, mode="append")
+
+    # One file should be added.
+    with os.scandir(path) as file_paths:
+        count_of_files = sum(1 for _ in file_paths)
+        assert count_of_files == 1
+
+    appended_in_memory_table = pa.Table.from_pydict({"two": [2]})
+    ds = ray.data.from_arrow(appended_in_memory_table)
+    ds.write_parquet(path, filesystem=None, mode="append")
+
+    # Another file should be added.
+    with os.scandir(path) as file_paths:
+        count_of_files = sum(1 for _ in file_paths)
+        assert count_of_files == 2
+
+
+def test_parquet_write_overwrite_save_mode(ray_start_regular_shared, local_path):
+    data_path = local_path
+    path = os.path.join(data_path, "test_parquet_dir")
+    in_memory_table = pa.Table.from_pydict({"one": [1]})
+    ds = ray.data.from_arrow(in_memory_table)
+    ds.write_parquet(path, filesystem=None, mode="overwrite")
+
+    # One file should be added.
+    with os.scandir(path) as file_paths:
+        count_of_files = sum(1 for _ in file_paths)
+        assert count_of_files == 1
+
+    overwritten_in_memory_table = pa.Table.from_pydict({"two": [2]})
+    ds = ray.data.from_arrow(overwritten_in_memory_table)
+    ds.write_parquet(path, filesystem=None, mode="overwrite")
+
+    # Another file should NOT be added.
+    with os.scandir(path) as file_paths:
+        count_of_files = sum(1 for _ in file_paths)
+        assert count_of_files == 1
+
+    on_disk_table = pq.read_table(path)
+    assert on_disk_table.equals(overwritten_in_memory_table)
 
 
 def test_parquet_file_extensions(ray_start_regular_shared, tmp_path):
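Reviewer note: the minimal sketch below is not part of the patch. It shows how the new mode argument is expected to behave end to end, based on the docstrings and tests above. The output directory, row values, and use of ray.data.from_items are illustrative assumptions, not code from this change.

    import os
    import tempfile

    import pyarrow.parquet as pq

    import ray

    # Illustrative output directory (assumption; any writable location works).
    out_dir = os.path.join(tempfile.mkdtemp(), "events")

    ds = ray.data.from_items([{"id": i} for i in range(3)])

    # The default stays "append": new files are added next to any existing ones.
    ds.write_parquet(out_dir, mode="append")

    # "overwrite" deletes the existing contents of out_dir before writing (not atomic).
    ds.write_parquet(out_dir, mode="overwrite")

    # "ignore" skips the write entirely because out_dir already exists.
    ds.write_parquet(out_dir, mode="ignore")

    # "error" raises ValueError because out_dir already exists.
    try:
        ds.write_parquet(out_dir, mode="error")
    except ValueError as exc:
        print(exc)

    # Only the rows from the "overwrite" write remain on disk.
    print(pq.read_table(out_dir).num_rows)  # -> 3

Because SaveMode subclasses str, either the plain strings shown here or the enum members (for example SaveMode.OVERWRITE, importable from ray.data.datasource per this patch) should be accepted.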