Support nested schemas #45

Open
wants to merge 2 commits into base: master

8 changes: 3 additions & 5 deletions README.rst
@@ -71,13 +71,11 @@ specified columns.
Todos
=====

- Support the deprecated bitpacking
- Fix handling of repetition-levels and definition-levels
- Tests for nested schemas, null data
- Support for arrays and maps.
- Full support of repetition-levels and definition-levels. Some simple
cases work, but most complex cases do not.
- Support reading of data from HDFS via snakebite and/or webhdfs.
- Implement writing
- performance evaluation and optimization (i.e. how does it compare to
the c++, java implementations)

Contributing
============
127 changes: 83 additions & 44 deletions parquet/__init__.py
@@ -270,7 +270,9 @@ def _read_data(file_obj, fo_encoding, value_count, bit_width):
vals += values
seen += len(values)
elif fo_encoding == parquet_thrift.Encoding.BIT_PACKED:
raise NotImplementedError("Bit packing not yet supported")
byte_count = 0 if bit_width == 0 else (value_count * bit_width - 1) // 8 + 1
vals = encoding.read_bitpacked_deprecated(
file_obj, byte_count, value_count, bit_width, logger.isEnabledFor(logging.DEBUG))

return vals
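
A quick note on the byte_count expression added for BIT_PACKED above: it is just
ceil(value_count * bit_width / 8) written with integer arithmetic, plus a guard for
bit_width == 0. A minimal, self-contained check (not part of the diff; the helper name
is hypothetical):

    def bitpacked_byte_count(value_count, bit_width):
        """Bytes needed to hold value_count bit-packed values of the given width."""
        return 0 if bit_width == 0 else (value_count * bit_width - 1) // 8 + 1

    assert bitpacked_byte_count(8, 1) == 1       # eight 1-bit values fit in one byte
    assert bitpacked_byte_count(9, 1) == 2       # the ninth bit spills into a second byte
    assert bitpacked_byte_count(1000, 3) == 375  # 3000 bits -> 375 bytes
    assert bitpacked_byte_count(10, 0) == 0      # bit_width 0 stores no bytes at all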

@@ -297,11 +299,24 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
_get_name(parquet_thrift.Encoding, daph.repetition_level_encoding))
logger.debug(" encoding: %s", _get_name(parquet_thrift.Encoding, daph.encoding))

# repetition levels are skipped if data is at the first level.
repetition_levels = None # pylint: disable=unused-variable
if len(column_metadata.path_in_schema) > 1:
max_repetition_level = schema_helper.max_repetition_level(
column_metadata.path_in_schema)
bit_width = encoding.width_from_max_int(max_repetition_level)
repetition_levels = _read_data(io_obj,
daph.repetition_level_encoding,
daph.num_values,
bit_width)
if debug_logging:
logger.debug(" Repetition levels: %s", len(repetition_levels))

# definition levels are skipped if data is required.
definition_levels = None
num_nulls = 0
max_definition_level = -1
if not schema_helper.is_required(column_metadata.path_in_schema[-1]):
if not schema_helper.is_required(column_metadata.path_in_schema):
max_definition_level = schema_helper.max_definition_level(
column_metadata.path_in_schema)
bit_width = encoding.width_from_max_int(max_definition_level)
@@ -319,23 +334,12 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
# any thing that isn't at max definition level is a null.
num_nulls = len(definition_levels) - definition_levels.count(max_definition_level)
if debug_logging:
logger.debug(" Definition levels: %s", len(definition_levels))

# repetition levels are skipped if data is at the first level.
repetition_levels = None # pylint: disable=unused-variable
if len(column_metadata.path_in_schema) > 1:
max_repetition_level = schema_helper.max_repetition_level(
column_metadata.path_in_schema)
bit_width = encoding.width_from_max_int(max_repetition_level)
repetition_levels = _read_data(io_obj,
daph.repetition_level_encoding,
daph.num_values,
bit_width)
logger.debug(" Definition levels: %d, nulls: %d", len(definition_levels), num_nulls)

# NOTE: The repetition levels aren't yet used.
if daph.encoding == parquet_thrift.Encoding.PLAIN:
read_values = encoding.read_plain(io_obj, column_metadata.type, daph.num_values - num_nulls)
schema_element = schema_helper.schema_element(column_metadata.path_in_schema[-1])
schema_element = schema_helper.schema_element(column_metadata.path_in_schema)
read_values = convert_column(read_values, schema_element) \
if schema_element.converted_type is not None else read_values
if definition_levels:
@@ -344,7 +348,8 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
else:
vals.extend(read_values)
if debug_logging:
logger.debug(" Values: %s, nulls: %s", len(vals), num_nulls)
logger.debug(" Read %s values using PLAIN encoding and definition levels show %s nulls",
len(vals), num_nulls)

elif daph.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY:
# bit_width is stored as single byte.
@@ -396,32 +401,11 @@ def _read_dictionary_page(file_obj, schema_helper, page_header, column_metadata)
page_header.dictionary_page_header.num_values
)
# convert the values once, if the dictionary is associated with a converted_type.
schema_element = schema_helper.schema_element(column_metadata.path_in_schema[-1])
schema_element = schema_helper.schema_element(column_metadata.path_in_schema)
return convert_column(values, schema_element) if schema_element.converted_type is not None else values


def DictReader(file_obj, columns=None): # pylint: disable=invalid-name
"""
Reader for a parquet file object.

This function is a generator returning an OrderedDict for each row
of data in the parquet file. Nested values will be flattend into the
top-level dict and can be referenced with '.' notation (e.g. 'foo' -> 'bar'
is referenced as 'foo.bar')

:param file_obj: the file containing parquet data
:param columns: the columns to include. If None (default), all columns
are included. Nested values are referenced with "." notation
"""
footer = _read_footer(file_obj)
keys = columns if columns else [s.name for s in
footer.schema if s.type]

for row in reader(file_obj, columns):
yield OrderedDict(zip(keys, row))


def reader(file_obj, columns=None):
def _row_group_reader(file_obj, columns=None):
"""
Reader for a parquet file object.

@@ -436,8 +420,6 @@ def reader(file_obj, columns=None):
logger.error("parquet.reader requires the fileobj to be opened in binary mode!")
footer = _read_footer(file_obj)
schema_helper = schema.SchemaHelper(footer.schema)
keys = columns if columns else [s.name for s in
footer.schema if s.type]
debug_logging = logger.isEnabledFor(logging.DEBUG)
for row_group in footer.row_groups:
res = defaultdict(list)
@@ -481,8 +463,65 @@ def reader(file_obj, columns=None):
logger.info("Skipping unknown page type=%s",
_get_name(parquet_thrift.PageType, page_header.type))

for i in range(row_group.num_rows):
yield [res[k][i] for k in keys if res[k]]
yield row_group.num_rows, res


def reader(file_obj, columns=None):
"""
Reader for a parquet file object.

This function is a generator returning a list of values for each row
of data in the parquet file.

:param file_obj: the file containing parquet data
:param columns: the columns to include. If None (default), all columns
are included. Nested values are referenced with "." notation
"""
footer = _read_footer(file_obj)
schema_helper = schema.SchemaHelper(footer.schema)
keys = columns if columns else [".".join(path) for path in schema_helper.leaf_node_dict().keys()]

for num_rows, data in _row_group_reader(file_obj, columns):
for i in range(num_rows):
yield [data[k][i] for k in keys if data[k]]


def DictReader(file_obj, columns=None, flatten=False): # pylint: disable=invalid-name
"""
Reader for a parquet file object.

This function is a generator returning an OrderedDict for each row of data in the parquet file. If `flatten` is
True, nested values will be flattened into the top-level dict and can be referenced with '.' notation (e.g.
'foo' -> 'bar' is referenced as 'foo.bar')

:param file_obj: the file containing parquet data
:param columns: the columns to include. If None (default), all columns
are included. Nested values are referenced with "." notation
"""
footer = _read_footer(file_obj)
schema_helper = schema.SchemaHelper(footer.schema)
keys = columns if columns else [".".join(path) for path in schema_helper.leaf_node_dict().keys()]

for num_rows, data in _row_group_reader(file_obj, columns): # pylint: disable=too-many-nested-blocks
for i in range(num_rows):
if flatten:
yield OrderedDict(zip(keys, [data[k][i] for k in keys if data[k]]))
else:
res = OrderedDict()
for key in keys:
parts = key.split(".")
num_keys = len(parts)
curr = res
for idx, part in enumerate(parts):
if part not in curr and idx + 1 < num_keys:
curr[part] = OrderedDict()
curr = curr[part]
continue
if idx + 1 == num_keys:
curr[part] = data[key][i]
else:
curr = curr[part]
yield res


class JsonWriter(object): # pylint: disable=too-few-public-methods
@@ -508,7 +547,7 @@ def _dump(file_obj, options, out=sys.stdout):
total_count = 0
writer = None
keys = None
for row in DictReader(file_obj, options.col):
for row in DictReader(file_obj, options.col, flatten=True if options.format == 'csv' else options.flatten):
if not keys:
keys = row.keys()
if not writer:
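
A note on the refactor above: row-group decoding now lives in _row_group_reader(), which
yields (num_rows, column_data) pairs, while reader() and DictReader() turn those into rows.
A minimal usage sketch of the low-level reader on the nested test file shipped with this PR
(the expected values and column order are inferred from the tests further down, so treat
this as illustrative):

    import parquet

    # reader() yields one list per row, ordered by the schema's leaf columns
    # ("foo.bar" first, then "baz" for test-nested.parquet).
    with open("test-data/test-nested.parquet", "rb") as fo:
        for row in parquet.reader(fo):
            print(row)
    # Expected output, based on the DictReader assertions in test_read_support.py:
    #   [1, None]
    #   [None, 1]
    #   [None, None]
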
2 changes: 2 additions & 0 deletions parquet/__main__.py
@@ -44,6 +44,8 @@ def main(argv=None):
'format=csv)')
parser.add_argument('--format', action='store', type=str, default='csv',
help='format for the output data. can be csv or json.')
parser.add_argument('--flatten', action='store_true',
help='for json format, flatten all data into a single top-level object')
parser.add_argument('--debug', action='store_true',
help='log debug info to stderr')
parser.add_argument('file',
68 changes: 58 additions & 10 deletions parquet/schema.py
@@ -1,50 +1,98 @@
"""Utils for working with the parquet thrift models."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import logging
import os
from collections import OrderedDict

import thriftpy


THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
parquet_thrift = thriftpy.load(THRIFT_FILE, module_name=str("parquet_thrift")) # pylint: disable=invalid-name

logger = logging.getLogger("parquet") # pylint: disable=invalid-name


class SchemaHelper(object):
"""Utility providing convenience methods for schema_elements."""

def __init__(self, schema_elements):
"""Initialize with the specified schema_elements."""
self._se_paths = paths(schema_elements)
self.schema_elements = schema_elements
self.schema_elements_by_name = dict(
[(se.name, se) for se in schema_elements])
assert len(self.schema_elements) == len(self.schema_elements_by_name)
self.schema_elements_by_path = OrderedDict(
[(tuple(self._se_paths[idx]), se) for idx, se in enumerate(schema_elements)])
assert len(self.schema_elements) == len(self.schema_elements_by_path)

def schema_element(self, name):
def schema_element(self, path):
"""Get the schema element with the given name."""
return self.schema_elements_by_name[name]
return self.schema_elements_by_path[tuple(path)]

def leaf_node_dict(self):
"""Get a dict of path -> schema_elements."""
return OrderedDict(
[(tuple(self.path_for_index(idx)), s) for idx, s in enumerate(self.schema_elements) if s.type])

def is_required(self, name):
def path_for_index(self, index):
"""Get the path array for the schema_element at the given index."""
return self._se_paths[index]

def is_required(self, path):
"""Return true iff the schema element with the given name is required."""
return self.schema_element(name).repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED
return self.schema_element(path).repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED

def max_repetition_level(self, path):
"""Get the max repetition level for the given schema path."""
max_level = 0
partial_path = []
for part in path:
element = self.schema_element(part)
if element.repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED:
partial_path += [part]
element = self.schema_element(partial_path)
if element.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED:
max_level += 1
return max_level

def max_definition_level(self, path):
"""Get the max definition level for the given schema path."""
max_level = 0
partial_path = []
for part in path:
element = self.schema_element(part)
partial_path += [part]
element = self.schema_element(partial_path)
if element.repetition_type != parquet_thrift.FieldRepetitionType.REQUIRED:
max_level += 1
return max_level


def paths(elements):
"""Compute the paths for all the elements.

the returned value is a map from index -> list of name parts.
"""
root = elements[0]
idx = 1
p_names = {0: [root.name]}
while idx < len(elements):
idx = _path_names(elements, idx, [], p_names)

return p_names


def _path_names(elements, idx, parents, p_names):
"""Internal recursive function to compute pathnames."""
element = elements[idx]
logger.debug("%s ... %s", parents, element.name)
num_children = element.num_children or 0
p_names[idx] = [s.name for s in parents] + [element.name]
if num_children == 0:
return idx + 1

next_idx = idx + 1
for _ in range(element.num_children):
next_idx = _path_names(elements, next_idx, parents + [element], p_names)
return next_idx
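
To make the new path-based SchemaHelper API concrete, here is a small sketch on a two-level
schema shaped like the nested test data. The stand-in namedtuple elements and the example
schema are illustrative assumptions, not part of the PR; the sketch only relies on the
attributes the helper actually reads (name, num_children, repetition_type, type):

    from collections import namedtuple
    from parquet import schema

    SE = namedtuple("SE", ["name", "num_children", "repetition_type", "type"])
    REQUIRED, OPTIONAL, REPEATED = 0, 1, 2  # FieldRepetitionType values from the parquet format

    elements = [
        SE("root", 2, None, None),     # root of the schema
        SE("foo", 1, OPTIONAL, None),  # optional group "foo"
        SE("bar", None, OPTIONAL, 1),  # optional leaf "foo.bar"
        SE("baz", None, OPTIONAL, 1),  # optional leaf "baz"
    ]

    print(schema.paths(elements))  # {0: ['root'], 1: ['foo'], 2: ['foo', 'bar'], 3: ['baz']}
    helper = schema.SchemaHelper(elements)
    print(list(helper.leaf_node_dict().keys()))          # [('foo', 'bar'), ('baz',)]
    print(helper.max_definition_level(["foo", "bar"]))   # 2: both "foo" and "bar" are OPTIONAL
    print(helper.max_repetition_level(["foo", "bar"]))   # 0: nothing on the path is REPEATED
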
Binary file added test-data/test-nested.parquet
35 changes: 34 additions & 1 deletion test/test_read_support.py
@@ -80,12 +80,13 @@ def test_dump_metadata(self):
class Options(object):
"""Fake Options (a la `__main__.py`)."""

def __init__(self, col=None, format='csv', no_headers=True, limit=-1):
def __init__(self, col=None, format='csv', no_headers=True, limit=-1, flatten=False):
"""Create a fake options."""
self.col = col
self.format = format
self.no_headers = no_headers
self.limit = limit
self.flatten = flatten


class TestReadApi(unittest.TestCase):
@@ -240,3 +241,35 @@ def test_null_plain_dictionary(self):
[{"foo": None}] + [{"foo": "bar"}, {"foo": "baz"}] * 3,
actual_data
)

def test_nested(self):
"""Test reading data for a nested schema."""
with open(os.path.join(TEST_DATA, "test-nested.parquet"), "rb") as parquet_fo:
actual_data = list(parquet.DictReader(parquet_fo))

self.assertListEqual(
# this is the contents of test-nested.parquet. 3 records.
[
{"foo": {"bar": 1}, "baz": None},
{"foo": {"bar": None}, "baz": 1},
{"foo": {"bar": None}, "baz": None},
],
actual_data
)

def test_nested_csv(self):
"""Test the csv dump function for a nested schema."""
actual_raw_data = io.StringIO()
parquet.dump(os.path.join(TEST_DATA, "test-nested.parquet"), Options(no_headers=False), out=actual_raw_data)
actual_raw_data.seek(0, 0)
actual_data = list(csv.reader(actual_raw_data, delimiter=TAB_DELIM))

self.assertListEqual(
[
["foo.bar", "baz"],
["1", ""],
["", "1"],
["", ""],
],
actual_data
)
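
For completeness, the flattened counterpart of test_nested would look roughly like the
following (a sketch of a possible extra test, not one included in this PR):

    def test_nested_flatten(self):
        """Sketch: reading the nested file with flatten=True should yield dotted keys."""
        with open(os.path.join(TEST_DATA, "test-nested.parquet"), "rb") as parquet_fo:
            actual_data = list(parquet.DictReader(parquet_fo, flatten=True))

        self.assertListEqual(
            [
                {"foo.bar": 1, "baz": None},
                {"foo.bar": None, "baz": 1},
                {"foo.bar": None, "baz": None},
            ],
            actual_data
        )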