71 changes: 60 additions & 11 deletions python/ray/data/_internal/datasource/json_datasource.py
@@ -1,5 +1,5 @@
+import io
 import logging
-from io import BytesIO
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 
 import pandas as pd
@@ -82,7 +82,7 @@ def _read_with_pyarrow_read_json(self, buffer: "pyarrow.lib.Buffer"):
         while True:
             try:
                 yield pajson.read_json(
-                    BytesIO(buffer),
+                    io.BytesIO(buffer),
                     read_options=self.read_options,
                     **self.arrow_json_args,
                 )
@@ -124,7 +124,7 @@ def _read_with_python_json(self, buffer: "pyarrow.lib.Buffer"):
         if buffer.size == 0:
             return
 
-        parsed_json = json.load(BytesIO(buffer))
+        parsed_json = json.load(io.BytesIO(buffer))
         try:
             yield pa.Table.from_pylist(parsed_json)
         except AttributeError as e:
@@ -159,6 +159,15 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
 
 
 class PandasJSONDatasource(FileBasedDatasource):
+
+    # Buffer size in bytes for reading files. Default is 1MB.
+    #
+    # pandas reads data in small chunks (~8 KiB), which leads to many costly
+    # small read requests when accessing cloud storage. To reduce overhead and
+    # improve performance, we wrap the file in a larger buffered reader that
+    # reads bigger blocks at once.
+    _BUFFER_SIZE = 1024**2
+
     def __init__(
         self,
         paths: Union[str, List[str]],
@@ -171,22 +180,31 @@ def __init__(
 
     def _read_stream(self, f: "pyarrow.NativeFile", path: str):
         chunksize = self._estimate_chunksize(f)
-        with pd.read_json(f, chunksize=chunksize, lines=True) as reader:
+        stream = StrictBufferedReader(f, buffer_size=self._BUFFER_SIZE)
+        with pd.read_json(stream, chunksize=chunksize, lines=True) as reader:
             for df in reader:
                 yield _cast_range_index_to_string(df)
 
     def _estimate_chunksize(self, f: "pyarrow.NativeFile") -> int:
         """Estimate the chunksize by sampling the first row.
 
         This is necessary to avoid OOMs while reading the file.
         """
         assert f.tell() == 0, "File pointer must be at the beginning"
 
-        with pd.read_json(f, chunksize=1, lines=True) as reader:
-            df = _cast_range_index_to_string(next(reader))
+        stream = StrictBufferedReader(f, buffer_size=self._BUFFER_SIZE)
+        with pd.read_json(stream, chunksize=1, lines=True) as reader:
+            try:
+                df = _cast_range_index_to_string(next(reader))
+            except StopIteration:
+                return 1
 
         block_accessor = PandasBlockAccessor.for_block(df)
         if block_accessor.num_rows() == 0:
-            return 1
-
-        bytes_per_row = block_accessor.size_bytes() / block_accessor.num_rows()
-        chunksize = max(round(self._target_output_size_bytes / bytes_per_row), 1)
+            chunksize = 1
+        else:
+            bytes_per_row = block_accessor.size_bytes() / block_accessor.num_rows()
+            chunksize = max(round(self._target_output_size_bytes / bytes_per_row), 1)
 
         # Reset file pointer to the beginning.
         f.seek(0)
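
For intuition, the chunk-size estimate above works out as follows. The numbers are made up for illustration only; the real _target_output_size_bytes comes from the datasource's configuration and is not shown in this diff:

    # Hypothetical values, purely to illustrate the formula used above.
    target_output_size_bytes = 128 * 1024**2  # suppose the target output block is 128 MiB
    bytes_per_row = 512                       # suppose the sampled row occupies ~512 bytes

    chunksize = max(round(target_output_size_bytes / bytes_per_row), 1)
    print(chunksize)  # 262144 rows per pandas chunk
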
@@ -199,7 +217,7 @@ def _open_input_source(
         path: str,
         **open_args,
     ) -> "pyarrow.NativeFile":
-        # Use seekable file to ensure we can correctly sample the first row.
+        # Use seekable file so we can reset the file after sampling the first row.
         file = filesystem.open_input_file(path, **open_args)
         assert file.seekable(), "File must be seekable"
         return file
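
The seekable requirement is what makes the f.seek(0) reset after sampling possible. A minimal sketch of that sample-then-rewind pattern, using a hypothetical local path in place of the cloud storage path the datasource normally receives:

    import pyarrow.fs as fs

    filesystem = fs.LocalFileSystem()

    # open_input_file returns a random-access handle, so the caller can peek at
    # the beginning of the file and then rewind to read the whole thing.
    f = filesystem.open_input_file("/tmp/rows.jsonl")  # hypothetical path
    assert f.seekable(), "File must be seekable"

    sample = f.read(8 * 1024)  # sample the start of the file
    f.seek(0)                  # reset, as the datasource does after sampling
    everything = f.read()
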
@@ -211,3 +229,34 @@ def _cast_range_index_to_string(df: pd.DataFrame):
     if isinstance(df.columns, pd.RangeIndex):
         df.columns = df.columns.astype(str)
     return df
+
+
+class StrictBufferedReader(io.RawIOBase):
+    """Wrapper that prevents premature file closure and ensures full-buffered reads.
+
+    This is necessary for two reasons:
+    1. The datasource reads the file twice -- first to sample and determine the chunk
+       size, and again to load the actual data. Since pandas assumes ownership of the
+       file and may close it, we prevent that by explicitly detaching the underlying
+       file before closing the buffer.
+
+    2. pandas wraps the file in a TextIOWrapper to decode bytes into text. TextIOWrapper
+       prefers calling read1(), which doesn't prefetch for random-access files
+       (e.g., from PyArrow). This wrapper forces all reads through the full buffer to
+       avoid inefficient small-range S3 GETs.
+    """
+
+    def __init__(self, file: io.RawIOBase, buffer_size: int):
+        self._file = io.BufferedReader(file, buffer_size=buffer_size)
+
+    def read(self, size=-1, /):
+        return self._file.read(size)
+
+    def readable(self) -> bool:
+        return True
+
+    def close(self):
+        if not self.closed:
+            self._file.detach()
+            self._file.close()
+        super().close()

Review thread on the StrictBufferedReader class definition:

Contributor: curious, why call it Strict?

Member (author): I observed that the built-in io.BufferedReader implementation doesn't do a good job of actually buffering the data. I think it's because pandas calls BufferedReader.read1, and read1 doesn't prefill the 1 MiB buffer. The name "Strict" is used to denote that it always performs the buffering.

Review thread on the docstring line "prefers calling read1(), which doesn't prefetch for random-access files":

Contributor: is read1 supposed to be read()?

Member (author): No, TextIOWrapper calls read1 I think.
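
To make the read1() point above concrete, here is a small self-contained sketch (not part of the PR) that records raw read sizes. CountingRaw is a made-up helper; StrictBufferedReader is the class added in this diff. With a plain io.BufferedReader, repeated read1() calls hit the raw stream in ~8 KiB requests, while StrictBufferedReader's read() refills its buffer in full buffer_size blocks:

    import io

    class CountingRaw(io.RawIOBase):
        # Hypothetical raw stream that records the size of every raw read request.
        def __init__(self, data: bytes):
            self._data = io.BytesIO(data)
            self.raw_read_sizes = []

        def readable(self) -> bool:
            return True

        def readinto(self, b) -> int:
            n = self._data.readinto(b)
            self.raw_read_sizes.append(n)
            return n

    data = b"x" * (4 * 1024**2)  # 4 MiB of dummy bytes

    # Plain BufferedReader + read1(): each call issues one small raw read,
    # roughly what pandas' TextIOWrapper does with an unwrapped file.
    plain_raw = CountingRaw(data)
    plain = io.BufferedReader(plain_raw, buffer_size=1024**2)
    while plain.read1(8 * 1024):
        pass
    print(max(plain_raw.raw_read_sizes))  # ~8 KiB per raw read

    # StrictBufferedReader + read(): small reads are served from a buffer that
    # is refilled in large blocks, so the raw stream sees ~1 MiB requests.
    strict_raw = CountingRaw(data)
    strict = StrictBufferedReader(strict_raw, buffer_size=1024**2)
    while strict.read(8 * 1024):
        pass
    print(max(strict_raw.raw_read_sizes))  # ~1 MiB per raw read

This also explains the choice of base class: io.RawIOBase doesn't define read1(), so a TextIOWrapper built on top of StrictBufferedReader falls back to read(), and every read is routed through the full buffer as the docstring describes.
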