
Commit 6dd3776

[data] Fix reading from zipped json (#58214)
## Description

### Status Quo

PR #54667 addressed OOM issues by sampling a few lines of the file. However, that code assumes the input file is seekable (i.e., not compressed), so zipped files break, as reported in #55356.

### Potential Workaround

- Refactor the code shared between JsonDatasource and FileDatasource.
- Default the chunk size to 10000 when a zipped (non-seekable) file is found.

## Related issues

#55356

---------
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
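For context, a minimal sketch of the scenario this change fixes; the file path below is hypothetical, and the exact internal datasource exercised depends on the Ray Data configuration.

import gzip

import ray

# Write a small gzipped, newline-delimited JSON file (hypothetical path).
with gzip.open("/tmp/example.json.gz", "wt") as f:
    f.write('{"a": 1}\n{"a": 2}\n')

# Before this change, chunk-size estimation assumed a seekable (uncompressed)
# input, so reading a compressed file like this could fail (#55356). With the
# change, a default chunk size of 10000 is used for non-seekable inputs.
ds = ray.data.read_json("/tmp/example.json.gz")
print(ds.take_all())  # [{'a': 1}, {'a': 2}]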
1 parent 92d8471 commit 6dd3776

File tree: 4 files changed (+98 -46 lines)


python/ray/data/_internal/datasource/json_datasource.py

Lines changed: 14 additions & 4 deletions
@@ -171,6 +171,9 @@ class PandasJSONDatasource(FileBasedDatasource):
     # reads bigger blocks at once.
     _BUFFER_SIZE = 1024**2
 
+    # In the case of zipped json files, we cannot infer the chunk_size.
+    _DEFAULT_CHUNK_SIZE = 10000
+
     def __init__(
         self,
         paths: Union[str, List[str]],
@@ -200,6 +203,9 @@ def _estimate_chunksize(self, f: "pyarrow.NativeFile") -> Optional[int]:
 
         This is necessary to avoid OOMs while reading the file.
         """
+
+        if not f.seekable():
+            return self._DEFAULT_CHUNK_SIZE
         assert f.tell() == 0, "File pointer must be at the beginning"
 
         if self._target_output_size_bytes is None:
@@ -230,10 +236,14 @@ def _open_input_source(
         path: str,
         **open_args,
     ) -> "pyarrow.NativeFile":
-        # Use seekable file so we can reset the file after sampling the first row.
-        file = filesystem.open_input_file(path, **open_args)
-        assert file.seekable(), "File must be seekable"
-        return file
+
+        compression = self.resolve_compression(path, open_args)
+
+        if compression is None:
+            # We use a seekable file to estimate chunksize.
+            return filesystem.open_input_file(path)
+
+        return super()._open_input_source(filesystem, path, **open_args)
 
 
 def _cast_range_index_to_string(df: pd.DataFrame):
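As a standalone sketch (not the datasource code itself) of why the seekability check above is needed: chunk-size estimation samples the head of the file and then rewinds, which only works on seekable handles. Assuming a PyArrow compressed input stream reports itself as non-seekable, the new code falls back to the fixed default.

import gzip

import pyarrow as pa

DEFAULT_CHUNK_SIZE = 10000  # mirrors _DEFAULT_CHUNK_SIZE above

payload = gzip.compress(b'{"a": 1}\n{"a": 2}\n')
stream = pa.CompressedInputStream(pa.BufferReader(payload), "gzip")

# Sampling the head and then calling seek(0) only works on seekable handles.
if stream.seekable():
    chunksize = 1000  # placeholder for the sampled estimate
else:
    chunksize = DEFAULT_CHUNK_SIZE

print(chunksize)  # 10000 for the compressed stream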

python/ray/data/_internal/util.py

Lines changed: 19 additions & 0 deletions
@@ -36,6 +36,7 @@
 import ray
 from ray._common.retry import call_with_retry
 from ray.data.context import DEFAULT_READ_OP_MIN_NUM_BLOCKS, WARN_PREFIX, DataContext
+from ray.util.annotations import DeveloperAPI
 
 import psutil
 
@@ -1712,3 +1713,21 @@ def merge_resources_to_ray_remote_args(
     if memory is not None:
         ray_remote_args["memory"] = memory
     return ray_remote_args
+
+
+@DeveloperAPI
+def infer_compression(path: str) -> Optional[str]:
+    import pyarrow as pa
+
+    compression = None
+    try:
+        # Try to detect compression codec from path.
+        compression = pa.Codec.detect(path).name
+    except (ValueError, TypeError):
+        # Arrow's compression inference on the file path doesn't work for
+        # Snappy, so we double-check ourselves.
+        import pathlib
+
+        suffix = pathlib.Path(path).suffix
+        if suffix and suffix[1:] == "snappy":
+            compression = "snappy"
+    return compression
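A rough usage sketch of the new helper; the paths are hypothetical, the non-Snappy codec names come from pyarrow.Codec.detect, and the commented return values are illustrative.

from ray.data._internal.util import infer_compression

print(infer_compression("s3://bucket/data.json.gz"))  # e.g. "gzip" (via pyarrow.Codec.detect)
print(infer_compression("data.json.snappy"))          # "snappy" (via the suffix fallback)
print(infer_compression("data.json"))                 # None (no compression detected)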

python/ray/data/datasource/file_based_datasource.py

Lines changed: 53 additions & 39 deletions
@@ -22,6 +22,7 @@
     RetryingPyFileSystem,
     _check_pyarrow_version,
     _is_local_scheme,
+    infer_compression,
     iterate_with_retry,
     make_async_gen,
 )
@@ -321,6 +322,50 @@ def read_task_fn():
 
         return read_tasks
 
+    def resolve_compression(
+        self, path: str, open_args: Dict[str, Any]
+    ) -> Optional[str]:
+        """Resolves the compression format for a stream.
+
+        Args:
+            path: The file path to resolve compression for.
+            open_args: kwargs passed to
+                `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.open_input_stream>`_
+                when opening input files to read.
+
+        Returns:
+            The compression format (e.g., "gzip", "snappy", "bz2") or None if
+            no compression is detected or specified.
+        """
+        compression = open_args.get("compression", None)
+        if compression is None:
+            compression = infer_compression(path)
+        return compression
+
+    def _resolve_buffer_size(self, open_args: Dict[str, Any]) -> Optional[int]:
+        buffer_size = open_args.pop("buffer_size", None)
+        if buffer_size is None:
+            buffer_size = self._data_context.streaming_read_buffer_size
+        return buffer_size
+
+    def _file_to_snappy_stream(
+        self,
+        file: "pyarrow.NativeFile",
+        filesystem: "RetryingPyFileSystem",
+    ) -> "pyarrow.PythonFile":
+        import pyarrow as pa
+        import snappy
+        from pyarrow.fs import HadoopFileSystem
+
+        stream = io.BytesIO()
+        if isinstance(filesystem.unwrap(), HadoopFileSystem):
+            snappy.hadoop_snappy.stream_decompress(src=file, dst=stream)
+        else:
+            snappy.stream_decompress(src=file, dst=stream)
+        stream.seek(0)
+
+        return pa.PythonFile(stream, mode="r")
+
     def _open_input_source(
         self,
         filesystem: "RetryingPyFileSystem",
@@ -336,53 +381,22 @@ def _open_input_source(
         Implementations that do not support streaming reads (e.g. that require random
         access) should override this method.
         """
-        import pyarrow as pa
-        from pyarrow.fs import HadoopFileSystem
 
-        compression = open_args.get("compression", None)
-        if compression is None:
-            try:
-                # If no compression manually given, try to detect
-                # compression codec from path.
-                compression = pa.Codec.detect(path).name
-            except (ValueError, TypeError):
-                # Arrow's compression inference on the file path
-                # doesn't work for Snappy, so we double-check ourselves.
-                import pathlib
-
-                suffix = pathlib.Path(path).suffix
-                if suffix and suffix[1:] == "snappy":
-                    compression = "snappy"
-                else:
-                    compression = None
-
-        buffer_size = open_args.pop("buffer_size", None)
-        if buffer_size is None:
-            buffer_size = self._data_context.streaming_read_buffer_size
+        compression = self.resolve_compression(path, open_args)
+        buffer_size = self._resolve_buffer_size(open_args)
 
         if compression == "snappy":
             # Arrow doesn't support streaming Snappy decompression since the canonical
             # C++ Snappy library doesn't natively support streaming decompression. We
             # works around this by manually decompressing the file with python-snappy.
             open_args["compression"] = None
-        else:
-            open_args["compression"] = compression
-
-        file = filesystem.open_input_stream(path, buffer_size=buffer_size, **open_args)
-
-        if compression == "snappy":
-            import snappy
-
-            stream = io.BytesIO()
-            if isinstance(filesystem.unwrap(), HadoopFileSystem):
-                snappy.hadoop_snappy.stream_decompress(src=file, dst=stream)
-            else:
-                snappy.stream_decompress(src=file, dst=stream)
-            stream.seek(0)
-
-            file = pa.PythonFile(stream, mode="r")
+            file = filesystem.open_input_stream(
+                path, buffer_size=buffer_size, **open_args
+            )
+            return self._file_to_snappy_stream(file, filesystem)
 
-        return file
+        open_args["compression"] = compression
+        return filesystem.open_input_stream(path, buffer_size=buffer_size, **open_args)
 
     def _rows_per_file(self):
         """Returns the number of rows per file, or None if unknown."""

python/ray/data/tests/test_json.py

Lines changed: 12 additions & 3 deletions
@@ -528,13 +528,22 @@ class TestPandasJSONDatasource:
         [{"a": []}, {"a": [1]}, {"a": [1, 2, 3]}],
         ids=["empty", "single", "multiple"],
     )
+    @pytest.mark.parametrize(
+        "compression,filename",
+        [("gzip", "test.json.gz"), ("infer", "test.json")],  # infer = default
+    )
     def test_read_stream(
-        self, data, tmp_path, target_max_block_size_infinite_or_default
+        self,
+        data,
+        tmp_path,
+        compression,
+        filename,
+        target_max_block_size_infinite_or_default,
     ):
         # Setup test file.
         df = pd.DataFrame(data)
-        path = os.path.join(tmp_path, "test.json")
-        df.to_json(path, orient="records", lines=True)
+        path = os.path.join(tmp_path, filename)
+        df.to_json(path, orient="records", lines=True, compression=compression)
 
         # Setup datasource.
         local_filesystem = fs.LocalFileSystem()