Skip to content

Commit

Permalink
Python: Add gzip metadata support (#7984)
Browse files Browse the repository at this point in the history
* Python: Add gzip metadata support

Resolves #7977

* Remove `get` verb
  • Loading branch information
Fokko authored Jul 5, 2023
1 parent f3826bd commit c6b64e6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 11 deletions.
70 changes: 65 additions & 5 deletions python/pyiceberg/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,83 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import codecs
import gzip
import json
from abc import ABC, abstractmethod
from typing import Callable

from pyiceberg.io import InputFile, InputStream, OutputFile
from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil

GZIP = "gzip"


class Compressor(ABC):
@staticmethod
def get_compressor(location: str) -> Compressor:
return GzipCompressor() if location.endswith(".gz.metadata.json") else NOOP_COMPRESSOR

@abstractmethod
def stream_decompressor(self, inp: InputStream) -> InputStream:
"""Returns a stream decompressor.
Args:
inp: The input stream that needs decompressing.
Returns:
The wrapped stream
"""

@abstractmethod
def bytes_compressor(self) -> Callable[[bytes], bytes]:
"""Returns a function to compress bytes.
Returns:
A function that can be used to compress bytes.
"""


class NoopCompressor(Compressor):
def stream_decompressor(self, inp: InputStream) -> InputStream:
return inp

def bytes_compressor(self) -> Callable[[bytes], bytes]:
return lambda b: b


NOOP_COMPRESSOR = NoopCompressor()


class GzipCompressor(Compressor):
def stream_decompressor(self, inp: InputStream) -> InputStream:
return gzip.open(inp)

def bytes_compressor(self) -> Callable[[bytes], bytes]:
return gzip.compress


class FromByteStream:
"""A collection of methods that deserialize dictionaries into Iceberg objects."""

@staticmethod
def table_metadata(byte_stream: InputStream, encoding: str = "utf-8") -> TableMetadata:
def table_metadata(
byte_stream: InputStream, encoding: str = "utf-8", compression: Compressor = NOOP_COMPRESSOR
) -> TableMetadata:
"""Instantiate a TableMetadata object from a byte stream.
Args:
byte_stream: A file-like byte stream object.
encoding (default "utf-8"): The byte encoder to use for the reader.
compression: Optional compression method
"""
reader = codecs.getreader(encoding)
metadata = json.load(reader(byte_stream))
with compression.stream_decompressor(byte_stream) as byte_stream:
reader = codecs.getreader(encoding)
json_bytes = reader(byte_stream)
metadata = json.load(json_bytes)

return TableMetadataUtil.parse_obj(metadata)


Expand All @@ -54,7 +110,9 @@ def table_metadata(input_file: InputFile, encoding: str = "utf-8") -> TableMetad
"""
with input_file.open() as input_stream:
return FromByteStream.table_metadata(byte_stream=input_stream, encoding=encoding)
return FromByteStream.table_metadata(
byte_stream=input_stream, encoding=encoding, compression=Compressor.get_compressor(location=input_file.location)
)


class ToOutputFile:
Expand All @@ -69,4 +127,6 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite:
overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
"""
with output_file.create(overwrite=overwrite) as output_stream:
output_stream.write(metadata.json().encode("utf-8"))
json_bytes = metadata.json().encode("utf-8")
json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
output_stream.write(json_bytes)
8 changes: 8 additions & 0 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,14 @@ def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str:
return metadata_location


@pytest.fixture(scope="session")
def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
metadata_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.gz.metadata.json")
metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2)
ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True)
return metadata_location


manifest_entry_records = [
{
"status": 1,
Expand Down
14 changes: 8 additions & 6 deletions python/tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ def table(example_table_metadata_v2: Dict[str, Any]) -> Table:
)


@pytest.fixture
def static_table(metadata_location: str) -> StaticTable:
return StaticTable.from_metadata(metadata_location)


def test_schema(table: Table) -> None:
assert table.schema() == Schema(
NestedField(field_id=1, name="x", field_type=LongType(), required=True),
Expand Down Expand Up @@ -260,7 +255,14 @@ def test_table_scan_projection_unknown_column(table: Table) -> None:
assert "Could not find column: 'a'" in str(exc_info.value)


def test_static_table_same_as_table(table: Table, static_table: StaticTable) -> None:
def test_static_table_same_as_table(table: Table, metadata_location: str) -> None:
static_table = StaticTable.from_metadata(metadata_location)
assert isinstance(static_table, Table)
assert static_table.metadata == table.metadata


def test_static_table_gz_same_as_table(table: Table, metadata_location_gz: str) -> None:
static_table = StaticTable.from_metadata(metadata_location_gz)
assert isinstance(static_table, Table)
assert static_table.metadata == table.metadata

Expand Down

0 comments on commit c6b64e6

Please sign in to comment.