Skip to content

fix: adapt parquet datasource to ray 2.20 #2809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,38 @@ def _deserialize_fragments(
return [p.deserialize() for p in serialized_fragments]


class _ParquetFileFragmentMetaData:
"""Class to store metadata of a Parquet file fragment.

This includes all attributes from `pyarrow.parquet.FileMetaData` except for `schema`,
which is stored in `self.schema_pickled` as a pickled object from
`cloudpickle.loads()`, used in deduplicating schemas across multiple fragments.
"""

def __init__(self, fragment_metadata: "pyarrow.parquet.FileMetaData"):
self.created_by = fragment_metadata.created_by
self.format_version = fragment_metadata.format_version
self.num_columns = fragment_metadata.num_columns
self.num_row_groups = fragment_metadata.num_row_groups
self.num_rows = fragment_metadata.num_rows
self.serialized_size = fragment_metadata.serialized_size
# This is a pickled schema object, to be set later with
# `self.set_schema_pickled()`. To get the underlying schema, use
# `cloudpickle.loads(self.schema_pickled)`.
self.schema_pickled: bytes | None = None

# Calculate the total byte size of the file fragment using the original
# object, as it is not possible to access row groups from this class.
self.total_byte_size = 0
for row_group_idx in range(fragment_metadata.num_row_groups):
row_group_metadata = fragment_metadata.row_group(row_group_idx)
self.total_byte_size += row_group_metadata.total_byte_size

def set_schema_pickled(self, schema_pickled: bytes) -> None:
"""Note: to get the underlying schema, use `cloudpickle.loads(self.schema_pickled)`."""
self.schema_pickled = schema_pickled


# This retry helps when the upstream datasource is not able to handle
# overloaded read request or failed with some retriable failures.
# For example when reading data from HA hdfs service, hdfs might
Expand Down Expand Up @@ -267,7 +299,8 @@ def __init__( # noqa: PLR0912,PLR0915
prefetch_remote_args = {}
if self._local_scheduling:
prefetch_remote_args["scheduling_strategy"] = self._local_scheduling
self._metadata = meta_provider.prefetch_file_metadata(pq_ds.fragments, **prefetch_remote_args) or []
raw_metadata = meta_provider.prefetch_file_metadata(pq_ds.fragments, **prefetch_remote_args) or []
self._metadata = self._dedupe_metadata(raw_metadata)
except OSError as e:
_handle_read_os_error(e, paths)
except pa.ArrowInvalid as ex:
Expand All @@ -293,16 +326,45 @@ def __init__( # noqa: PLR0912,PLR0915
if shuffle == "files":
self._file_metadata_shuffler = np.random.default_rng()

def _dedupe_metadata(
self,
raw_metadatas: list["pyarrow.parquet.FileMetaData"],
) -> list[_ParquetFileFragmentMetaData]:
"""Deduplicate schemas to reduce memory usage.

For datasets with a large number of columns, the FileMetaData
(in particular the schema) can be very large. We can reduce the
memory usage by only keeping unique schema objects across all
file fragments. This method deduplicates the schemas and returns
a list of `_ParquetFileFragmentMetaData` objects.
"""
schema_to_id: dict[int, Any] = {} # schema_id -> serialized_schema
id_to_schema: dict[Any, bytes] = {} # serialized_schema -> schema_id
stripped_metadatas = []
for fragment_metadata in raw_metadatas:
stripped_md = _ParquetFileFragmentMetaData(fragment_metadata)

schema_ser = cloudpickle.dumps(fragment_metadata.schema.to_arrow_schema()) # type: ignore[no-untyped-call]
if schema_ser not in schema_to_id:
schema_id: int | None = len(schema_to_id)
schema_to_id[schema_ser] = schema_id
id_to_schema[schema_id] = schema_ser
stripped_md.set_schema_pickled(schema_ser)
else:
schema_id = schema_to_id.get(schema_ser)
existing_schema_ser = id_to_schema[schema_id]
stripped_md.set_schema_pickled(existing_schema_ser)
stripped_metadatas.append(stripped_md)
return stripped_metadatas

def estimate_inmemory_data_size(self) -> int | None:
"""Return an estimate of the Parquet files encoding ratio.

To avoid OOMs, it is safer to return an over-estimate than an underestimate.
"""
total_size: int = 0
for file_metadata in self._metadata:
for row_group_idx in range(file_metadata.num_row_groups):
row_group_metadata = file_metadata.row_group(row_group_idx)
total_size += row_group_metadata.total_byte_size
total_size += file_metadata.total_byte_size
return total_size * self._encoding_ratio # type: ignore[return-value]

def get_read_tasks(self, parallelism: int) -> list[ReadTask]:
Expand All @@ -315,7 +377,7 @@ def get_read_tasks(self, parallelism: int) -> list[ReadTask]:
if len(pq_metadata) < len(self._pq_fragments):
# Pad `pq_metadata` to be same length of `self._pq_fragments`.
# This can happen when no file metadata being prefetched.
pq_metadata += [None] * (len(self._pq_fragments) - len(pq_metadata))
pq_metadata += [None] * (len(self._pq_fragments) - len(pq_metadata)) # type: ignore[list-item]

if self._file_metadata_shuffler is not None:
files_metadata = list(zip(self._pq_fragments, self._pq_paths, pq_metadata))
Expand All @@ -330,10 +392,10 @@ def get_read_tasks(self, parallelism: int) -> list[ReadTask]:
)

read_tasks = []
for fragments, paths, metadata in zip(
for fragments, paths, metadata in zip( # type: ignore[var-annotated]
np.array_split(pq_fragments, parallelism),
np.array_split(pq_paths, parallelism),
np.array_split(pq_metadata, parallelism),
np.array_split(pq_metadata, parallelism), # type: ignore[arg-type]
):
if len(fragments) <= 0:
continue
Expand Down
Loading
Loading