From c83447abfad7028b3605499bfee10bf7b14edce2 Mon Sep 17 00:00:00 2001
From: Joe Crobak
Date: Sun, 25 Sep 2016 16:31:49 +0000
Subject: [PATCH 1/2] Read repetition levels before definition levels.

Since we didn't support repetition levels previously, this code path had
never been tested, and we were reading the two level streams in the wrong
order: repetition levels come before definition levels in a data page.
---
 parquet/__init__.py | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/parquet/__init__.py b/parquet/__init__.py
index 684f3746..e3e16d8a 100644
--- a/parquet/__init__.py
+++ b/parquet/__init__.py
@@ -297,6 +297,19 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
                      _get_name(parquet_thrift.Encoding, daph.repetition_level_encoding))
         logger.debug("  encoding: %s", _get_name(parquet_thrift.Encoding, daph.encoding))
 
+    # repetition levels are skipped if data is at the first level.
+    repetition_levels = None  # pylint: disable=unused-variable
+    if len(column_metadata.path_in_schema) > 1:
+        max_repetition_level = schema_helper.max_repetition_level(
+            column_metadata.path_in_schema)
+        bit_width = encoding.width_from_max_int(max_repetition_level)
+        repetition_levels = _read_data(io_obj,
+                                       daph.repetition_level_encoding,
+                                       daph.num_values,
+                                       bit_width)
+        if debug_logging:
+            logger.debug("  Repetition levels: %s", len(repetition_levels))
+
     # definition levels are skipped if data is required.
     definition_levels = None
     num_nulls = 0
@@ -319,18 +332,7 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
         # any thing that isn't at max definition level is a null.
         num_nulls = len(definition_levels) - definition_levels.count(max_definition_level)
         if debug_logging:
-            logger.debug("  Definition levels: %s", len(definition_levels))
-
-    # repetition levels are skipped if data is at the first level.
-    repetition_levels = None  # pylint: disable=unused-variable
-    if len(column_metadata.path_in_schema) > 1:
-        max_repetition_level = schema_helper.max_repetition_level(
-            column_metadata.path_in_schema)
-        bit_width = encoding.width_from_max_int(max_repetition_level)
-        repetition_levels = _read_data(io_obj,
-                                       daph.repetition_level_encoding,
-                                       daph.num_values,
-                                       bit_width)
+            logger.debug("  Definition levels: %d, nulls: %d", len(definition_levels), num_nulls)
 
     # NOTE: The repetition levels aren't yet used.
     if daph.encoding == parquet_thrift.Encoding.PLAIN:
@@ -344,7 +346,8 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
             else:
                 vals.extend(read_values)
             if debug_logging:
-                logger.debug("  Values: %s, nulls: %s", len(vals), num_nulls)
+                logger.debug("  Read %s values using PLAIN encoding and definition levels show %s nulls",
+                             len(vals), num_nulls)
 
     elif daph.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY:
         # bit_width is stored as single byte.

From 3c06dbb1fda567a262cfc14139bf959751493628 Mon Sep 17 00:00:00 2001
From: Joe Crobak
Date: Sat, 1 Oct 2016 17:39:14 +0000
Subject: [PATCH 2/2] Add experimental support for nested schemas.

Rather than flattening schemas and adding a '.' between paths in the
schema (e.g. `foo.bar`), support the schema path as a first-class object
(for schema operations, at least). This is an experimental implementation
and likely has bugs, but it supports some simple cases.

This implementation changes behavior. Specifically:

* `DictReader()` now has a `flatten` argument that defaults to `False`.
  If `flatten` is False, DictReader will read nested data as
  `{'foo': {'bar': 1}}` instead of as `{'foo.bar': 1}`.
* Likewise, this is the new default behavior for the command-line tool
  with `--format json`. This can be changed with `--flatten`.

Known issues:

* Repetition levels still aren't supported, so a file containing arrays
  will break.
* Nulls aren't interpreted at the correct level (e.g. `{"foo": null}`
  will be interpreted as `{"foo": {"bar": null}}` if `foo` has a child
  `bar`).

Refs: https://github.com/jcrobak/parquet-python/issues/32
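For illustration, here is the test file added below (test-data/test-nested.parquet,
whose schema nests `bar` under the group `foo`) read both ways. Rows are
OrderedDicts; plain dict literals are shown for brevity:

    import parquet

    with open("test-data/test-nested.parquet", "rb") as file_obj:
        for row in parquet.DictReader(file_obj):
            print(row)  # first row: {'foo': {'bar': 1}, 'baz': None}

    with open("test-data/test-nested.parquet", "rb") as file_obj:
        for row in parquet.DictReader(file_obj, flatten=True):
            print(row)  # first row: {'foo.bar': 1, 'baz': None}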
---
 README.rst                    |  8 ++-
 parquet/__init__.py           | 98 +++++++++++++++++++++++-----------
 parquet/__main__.py           |  2 +
 parquet/schema.py             | 68 +++++++++++++++++++----
 test-data/test-nested.parquet | Bin 0 -> 400 bytes
 test/test_read_support.py     | 35 +++++++++++-
 test/test_schema.py           | 56 +++++++++++++++++++
 7 files changed, 220 insertions(+), 47 deletions(-)
 create mode 100644 test-data/test-nested.parquet
 create mode 100644 test/test_schema.py

diff --git a/README.rst b/README.rst
index 6b7defed..e93f2a65 100644
--- a/README.rst
+++ b/README.rst
@@ -71,13 +71,11 @@ specified columns.
 Todos
 =====
 
-- Support the deprecated bitpacking
-- Fix handling of repetition-levels and definition-levels
-- Tests for nested schemas, null data
+- Support for arrays and maps.
+- Full support of repetition-levels and definition-levels. Some simple
+  cases work, but most complex cases do not.
 - Support reading of data from HDFS via snakebite and/or webhdfs.
 - Implement writing
-- performance evaluation and optimization (i.e. how does it compare to
-  the c++, java implementations)
 
 Contributing
 ============
diff --git a/parquet/__init__.py b/parquet/__init__.py
index e3e16d8a..d1cbd1d5 100644
--- a/parquet/__init__.py
+++ b/parquet/__init__.py
@@ -270,7 +270,9 @@ def _read_data(file_obj, fo_encoding, value_count, bit_width):
             vals += values
             seen += len(values)
     elif fo_encoding == parquet_thrift.Encoding.BIT_PACKED:
-        raise NotImplementedError("Bit packing not yet supported")
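+        # the deprecated bit-packed encoding packs values back to back, so a run
+        # occupies ceil(value_count * bit_width / 8) bytes; e.g. ten 3-bit values
+        # take (10 * 3 - 1) // 8 + 1 == 4 bytes.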
+        byte_count = 0 if bit_width == 0 else (value_count * bit_width - 1) // 8 + 1
+        vals = encoding.read_bitpacked_deprecated(
+            file_obj, byte_count, value_count, bit_width, logger.isEnabledFor(logging.DEBUG))
 
     return vals
 
@@ -314,7 +316,7 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
     definition_levels = None
     num_nulls = 0
     max_definition_level = -1
-    if not schema_helper.is_required(column_metadata.path_in_schema[-1]):
+    if not schema_helper.is_required(column_metadata.path_in_schema):
         max_definition_level = schema_helper.max_definition_level(
             column_metadata.path_in_schema)
         bit_width = encoding.width_from_max_int(max_definition_level)
@@ -337,7 +339,7 @@ def read_data_page(file_obj, schema_helper, page_header, column_metadata,
     # NOTE: The repetition levels aren't yet used.
     if daph.encoding == parquet_thrift.Encoding.PLAIN:
         read_values = encoding.read_plain(io_obj, column_metadata.type, daph.num_values - num_nulls)
-        schema_element = schema_helper.schema_element(column_metadata.path_in_schema[-1])
+        schema_element = schema_helper.schema_element(column_metadata.path_in_schema)
         read_values = convert_column(read_values, schema_element) \
             if schema_element.converted_type is not None else read_values
         if definition_levels:
@@ -399,32 +401,11 @@ def _read_dictionary_page(file_obj, schema_helper, page_header, column_metadata)
         page_header.dictionary_page_header.num_values
     )
     # convert the values once, if the dictionary is associated with a converted_type.
-    schema_element = schema_helper.schema_element(column_metadata.path_in_schema[-1])
+    schema_element = schema_helper.schema_element(column_metadata.path_in_schema)
     return convert_column(values, schema_element) if schema_element.converted_type is not None else values
 
 
-def DictReader(file_obj, columns=None):  # pylint: disable=invalid-name
-    """
-    Reader for a parquet file object.
-
-    This function is a generator returning an OrderedDict for each row
-    of data in the parquet file. Nested values will be flattend into the
-    top-level dict and can be referenced with '.' notation (e.g. 'foo' -> 'bar'
-    is referenced as 'foo.bar')
-
-    :param file_obj: the file containing parquet data
-    :param columns: the columns to include. If None (default), all columns
-        are included. Nested values are referenced with "." notation
-    """
-    footer = _read_footer(file_obj)
-    keys = columns if columns else [s.name for s in
-                                    footer.schema if s.type]
-
-    for row in reader(file_obj, columns):
-        yield OrderedDict(zip(keys, row))
-
-
-def reader(file_obj, columns=None):
+def _row_group_reader(file_obj, columns=None):
     """
     Reader for a parquet file object.
 
@@ -439,8 +420,6 @@ def reader(file_obj, columns=None):
         logger.error("parquet.reader requires the fileobj to be opened in binary mode!")
     footer = _read_footer(file_obj)
     schema_helper = schema.SchemaHelper(footer.schema)
-    keys = columns if columns else [s.name for s in
-                                    footer.schema if s.type]
     debug_logging = logger.isEnabledFor(logging.DEBUG)
     for row_group in footer.row_groups:
         res = defaultdict(list)
@@ -484,8 +463,65 @@ def reader(file_obj, columns=None):
                 logger.info("Skipping unknown page type=%s",
                             _get_name(parquet_thrift.PageType, page_header.type))
 
-        for i in range(row_group.num_rows):
-            yield [res[k][i] for k in keys if res[k]]
+        yield row_group.num_rows, res
+
+
+def reader(file_obj, columns=None):
+    """
+    Reader for a parquet file object.
+
+    This function is a generator returning a list of values for each row
+    of data in the parquet file.
+
+    :param file_obj: the file containing parquet data
+    :param columns: the columns to include. If None (default), all columns
+        are included. Nested values are referenced with "." notation
+    """
+    footer = _read_footer(file_obj)
+    schema_helper = schema.SchemaHelper(footer.schema)
+    keys = columns if columns else [".".join(path) for path in schema_helper.leaf_node_dict().keys()]
+
+    for num_rows, data in _row_group_reader(file_obj, columns):
+        for i in range(num_rows):
+            yield [data[k][i] for k in keys if data[k]]
+
+
+def DictReader(file_obj, columns=None, flatten=False):  # pylint: disable=invalid-name
+    """
+    Reader for a parquet file object.
+
+    This function is a generator returning an OrderedDict for each row of data in the parquet file. If `flatten` is
+    True, nested values will be flattened into the top-level dict and can be referenced with '.' notation (e.g.
+    'foo' -> 'bar' is referenced as 'foo.bar')
+
+    :param file_obj: the file containing parquet data
+    :param columns: the columns to include. If None (default), all columns
+        are included. Nested values are referenced with "." notation
+    """
+    footer = _read_footer(file_obj)
+    schema_helper = schema.SchemaHelper(footer.schema)
+    keys = columns if columns else [".".join(path) for path in schema_helper.leaf_node_dict().keys()]
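+
+    # keys are dotted leaf-node paths, e.g. ['foo.bar', 'baz']. With flatten=True
+    # each row keeps these dotted keys; otherwise each key is split on '.' below
+    # and the nesting is rebuilt with OrderedDicts: {'foo': {'bar': ...}, 'baz': ...}.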
+    for num_rows, data in _row_group_reader(file_obj, columns):  # pylint: disable=too-many-nested-blocks
+        for i in range(num_rows):
+            if flatten:
+                yield OrderedDict(zip(keys, [data[k][i] for k in keys if data[k]]))
+            else:
+                res = OrderedDict()
+                for key in keys:
+                    parts = key.split(".")
+                    num_keys = len(parts)
+                    curr = res
+                    for idx, part in enumerate(parts):
+                        if part not in curr and idx + 1 < num_keys:
+                            curr[part] = OrderedDict()
+                            curr = curr[part]
+                            continue
+                        if idx + 1 == num_keys:
+                            curr[part] = data[key][i]
+                        else:
+                            curr = curr[part]
+                yield res
 
 
 class JsonWriter(object):  # pylint: disable=too-few-public-methods
@@ -511,7 +547,7 @@ def _dump(file_obj, options, out=sys.stdout):
     total_count = 0
     writer = None
     keys = None
-    for row in DictReader(file_obj, options.col):
+    for row in DictReader(file_obj, options.col, flatten=True if options.format == 'csv' else options.flatten):
         if not keys:
             keys = row.keys()
         if not writer:
diff --git a/parquet/__main__.py b/parquet/__main__.py
index 2e74d3ea..149e053b 100644
--- a/parquet/__main__.py
+++ b/parquet/__main__.py
@@ -44,6 +44,8 @@ def main(argv=None):
                              'format=csv)')
     parser.add_argument('--format', action='store', type=str, default='csv',
                         help='format for the output data. can be csv or json.')
+    parser.add_argument('--flatten', action='store_true',
+                        help='for json format, flatten all data into a single top-level object')
     parser.add_argument('--debug', action='store_true',
                         help='log debug info to stderr')
     parser.add_argument('file',
diff --git a/parquet/schema.py b/parquet/schema.py
index 1b6369cd..dff8df7f 100644
--- a/parquet/schema.py
+++ b/parquet/schema.py
@@ -1,10 +1,13 @@
 """Utils for working with the parquet thrift models."""
+
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+import logging
 import os
+from collections import OrderedDict
 
 import thriftpy
 
@@ -12,39 +15,84 @@
 THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
 parquet_thrift = thriftpy.load(THRIFT_FILE, module_name=str("parquet_thrift"))  # pylint: disable=invalid-name
 
+logger = logging.getLogger("parquet")  # pylint: disable=invalid-name
+
 
 class SchemaHelper(object):
     """Utility providing convenience methods for schema_elements."""
 
     def __init__(self, schema_elements):
         """Initialize with the specified schema_elements."""
+        self._se_paths = paths(schema_elements)
         self.schema_elements = schema_elements
-        self.schema_elements_by_name = dict(
-            [(se.name, se) for se in schema_elements])
-        assert len(self.schema_elements) == len(self.schema_elements_by_name)
+        self.schema_elements_by_path = OrderedDict(
+            [(tuple(self._se_paths[idx]), se) for idx, se in enumerate(schema_elements)])
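+        # every element, group or leaf, is keyed by its full path tuple, e.g.
+        # ('geo', 'country_code') for a nested leaf and ('version',) for a
+        # top-level column.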
+        assert len(self.schema_elements) == len(self.schema_elements_by_path)
 
-    def schema_element(self, name):
-        """Get the schema element with the given name."""
-        return self.schema_elements_by_name[name]
+    def schema_element(self, path):
+        """Get the schema element with the given path."""
+        return self.schema_elements_by_path[tuple(path)]
+
+    def leaf_node_dict(self):
+        """Get a dict of path -> schema_element for the leaf nodes."""
+        return OrderedDict(
+            [(tuple(self.path_for_index(idx)), s) for idx, s in enumerate(self.schema_elements) if s.type])
 
-    def is_required(self, name):
-        """Return true iff the schema element with the given name is required."""
-        return self.schema_element(name).repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED
+    def path_for_index(self, index):
+        """Get the path array for the schema_element at the given index."""
+        return self._se_paths[index]
+
+    def is_required(self, path):
+        """Return true iff the schema element with the given path is required."""
+        return self.schema_element(path).repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED
 
     def max_repetition_level(self, path):
         """Get the max repetition level for the given schema path."""
         max_level = 0
+        partial_path = []
         for part in path:
-            element = self.schema_element(part)
-            if element.repetition_type == parquet_thrift.FieldRepetitionType.REQUIRED:
+            partial_path += [part]
+            element = self.schema_element(partial_path)
+            if element.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED:
                 max_level += 1
         return max_level
 
     def max_definition_level(self, path):
         """Get the max definition level for the given schema path."""
         max_level = 0
+        partial_path = []
         for part in path:
-            element = self.schema_element(part)
+            partial_path += [part]
+            element = self.schema_element(partial_path)
             if element.repetition_type != parquet_thrift.FieldRepetitionType.REQUIRED:
                 max_level += 1
         return max_level
+
+
+def paths(elements):
+    """Compute the paths for all the schema elements.
+
+    The returned value is a map from index -> list of name parts.
+    """
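+    # for the root(version, geo(version, country_code)) schema used in the tests
+    # this yields {0: ['root'], 1: ['version'], 2: ['geo'], 3: ['geo', 'version'],
+    # 4: ['geo', 'country_code']}; the root name is not prefixed onto child paths.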
+    root = elements[0]
+    idx = 1
+    p_names = {0: [root.name]}
+    while idx < len(elements):
+        idx = _path_names(elements, idx, [], p_names)
+
+    return p_names
+
+
+def _path_names(elements, idx, parents, p_names):
+    """Internal recursive function to compute pathnames."""
+    element = elements[idx]
+    logger.debug("%s ... %s", parents, element.name)
+    num_children = element.num_children or 0
+    p_names[idx] = [s.name for s in parents] + [element.name]
+    if num_children == 0:
+        return idx + 1
+
+    next_idx = idx + 1
+    for _ in range(element.num_children):
+        next_idx = _path_names(elements, next_idx, parents + [element], p_names)
+    return next_idx
diff --git a/test-data/test-nested.parquet b/test-data/test-nested.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..282099f0035cef64f28cac5976652c738d8bfad6
GIT binary patch
literal 400
zcma)(L2H9B6vxx(f|qUV@PYv;GMIEwMKL-LyY#lh4yE59sVNAq=`y=R>1W+%S+tv;
z2FoG*c<=vvJd*WllL3M`+%JJxAm9bQH$#^*5p`|lwsP$4Q1sikLIB}E%o~n-6OSTh
zuP?6Ax&s##*mrhX14iIJSeC+!bV8J{fGs000GAbrcx=&!`w(DC=z(^T_dG2+(XeuK
z9=(3>X>j(`MCZR9`Zc2jvB1BsP1Cl?r!pUn-uqWs@|2{=-yDQ_KdLT%GvQA!%#td}
z;vpZKG+fAI)7D`r%d`+&iL{VqsWMT@UB&af$c5ZWUgccyt*Wa>bZ1{?ZU5pMd4D=1

literal 0
HcmV?d00001

diff --git a/test/test_read_support.py b/test/test_read_support.py
index 8d7b69f0..f062f3a0 100644
--- a/test/test_read_support.py
+++ b/test/test_read_support.py
@@ -80,12 +80,13 @@ def test_dump_metadata(self):
 class Options(object):
     """Fake Options (a la `__main__.py`)."""
 
-    def __init__(self, col=None, format='csv', no_headers=True, limit=-1):
+    def __init__(self, col=None, format='csv', no_headers=True, limit=-1, flatten=False):
         """Create a fake options."""
         self.col = col
         self.format = format
         self.no_headers = no_headers
         self.limit = limit
+        self.flatten = flatten
 
 
 class TestReadApi(unittest.TestCase):
@@ -240,3 +241,35 @@ def test_null_plain_dictionary(self):
             [{"foo": None}] + [{"foo": "bar"}, {"foo": "baz"}] * 3,
             actual_data
         )
+
+    def test_nested(self):
+        """Test reading data for a nested schema."""
+        with open(os.path.join(TEST_DATA, "test-nested.parquet"), "rb") as parquet_fo:
+            actual_data = list(parquet.DictReader(parquet_fo))
+
+        self.assertListEqual(
+            # This is the contents of test-nested.parquet: 3 records.
+            [
+                {"foo": {"bar": 1}, "baz": None},
+                {"foo": {"bar": None}, "baz": 1},
+                {"foo": {"bar": None}, "baz": None},
+            ],
+            actual_data
+        )
+
+    def test_nested_csv(self):
+        """Test the csv dump function for a nested schema."""
+        actual_raw_data = io.StringIO()
+        parquet.dump(os.path.join(TEST_DATA, "test-nested.parquet"), Options(no_headers=False), out=actual_raw_data)
+        actual_raw_data.seek(0, 0)
+        actual_data = list(csv.reader(actual_raw_data, delimiter=TAB_DELIM))
+
+        self.assertListEqual(
+            [
+                ["foo.bar", "baz"],
+                ["1", ""],
+                ["", "1"],
+                ["", ""],
+            ],
+            actual_data
+        )
diff --git a/test/test_schema.py b/test/test_schema.py
new file mode 100644
index 00000000..57425820
--- /dev/null
+++ b/test/test_schema.py
@@ -0,0 +1,56 @@
+"""Tests for SchemaHelper and related functions."""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import os
+import unittest
+
+from parquet.schema import SchemaHelper
+
+import thriftpy
+
+THRIFT_FILE = os.path.join(os.path.dirname(__file__), "parquet.thrift")
+parquet_thrift = thriftpy.load(THRIFT_FILE, module_name="parquet_thrift")  # pylint: disable=invalid-name
+
+
+class SchemaHelperTest(unittest.TestCase):
+    """Tests for the SchemaHelper class."""
+
+    ELEMENTS = [
+        parquet_thrift.SchemaElement(name='root', type=None, type_length=None, repetition_type=None, num_children=2,
+                                     converted_type=None),
+        parquet_thrift.SchemaElement(name='version', type=parquet_thrift.Type.INT64, type_length=None,
+                                     repetition_type=parquet_thrift.FieldRepetitionType.OPTIONAL, num_children=None,
+                                     converted_type=None),
+        parquet_thrift.SchemaElement(name='geo', type=None, type_length=None,
+                                     repetition_type=parquet_thrift.FieldRepetitionType.OPTIONAL, num_children=2,
+                                     converted_type=None),
+        parquet_thrift.SchemaElement(name='version', type=parquet_thrift.Type.INT64, type_length=None,
+                                     repetition_type=parquet_thrift.FieldRepetitionType.OPTIONAL, num_children=None,
+                                     converted_type=None),
+        parquet_thrift.SchemaElement(name='country_code', type=parquet_thrift.Type.BYTE_ARRAY, type_length=None,
+                                     repetition_type=parquet_thrift.FieldRepetitionType.OPTIONAL, num_children=None,
+                                     converted_type=0)
+    ]
+
+    def test_schema_element_by_path(self):
+        """Test lookup by path as array."""
+        helper = SchemaHelper(SchemaHelperTest.ELEMENTS)
+        self.assertEquals(SchemaHelperTest.ELEMENTS[1], helper.schema_element(['version']))
+        self.assertEquals(SchemaHelperTest.ELEMENTS[3], helper.schema_element(['geo', 'version']))
+        self.assertEquals(SchemaHelperTest.ELEMENTS[4], helper.schema_element(['geo', 'country_code']))
+
+    def test_leaf_node_dict(self):
+        """Test retrieving the leaf nodes for a list of elements."""
+        helper = SchemaHelper(SchemaHelperTest.ELEMENTS)
+        print(helper.leaf_node_dict())
+        self.assertEquals(
+            set({
+                ('version',): SchemaHelperTest.ELEMENTS[1],
+                ('geo', 'version'): SchemaHelperTest.ELEMENTS[3],
+                ('geo', 'country_code'): SchemaHelperTest.ELEMENTS[4]
+            }),
+            set(helper.leaf_node_dict())
+        )
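As a worked example of the level arithmetic against the ELEMENTS schema above
(every field is OPTIONAL and none is REPEATED):

    helper = SchemaHelper(SchemaHelperTest.ELEMENTS)
    helper.path_for_index(4)                              # ['geo', 'country_code']
    helper.max_definition_level(['geo', 'country_code'])  # 2: geo and country_code are both OPTIONAL
    helper.max_repetition_level(['geo', 'country_code'])  # 0: no REPEATED element on the path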