diff --git a/fastparquet/api.py b/fastparquet/api.py
index 4ef8ab78..24456dc7 100644
--- a/fastparquet/api.py
+++ b/fastparquet/api.py
@@ -607,6 +607,7 @@ def to_numpy(self, columns=None, filters=None, worker_threads=MAX_WORKERS):
         else:
             infile = None
         if worker_threads:
+            # TODO: consider the case (tiny data) where it's just not worthwhile
             ex = ThreadPool(num_workers=worker_threads)
         else:
             ex = None
diff --git a/fastparquet/cencoding.pyx b/fastparquet/cencoding.pyx
index 0e29caa1..be96a840 100644
--- a/fastparquet/cencoding.pyx
+++ b/fastparquet/cencoding.pyx
@@ -1203,18 +1203,17 @@ def one_level_optional(
 ):
     # nullable simple type; equivalent:
     # inds[defs == 1] = np.arange(count0, len(values) + count0)
-    # inds[defs == 0] = 0
+    # inds[defs == 0] = -1
     # this can be parallel, by passing count0 (values so far), which we always can know
-    cdef uint64_t i, count
+    cdef uint64_t i
     with nogil:
         for i in range(defs.shape[0]):
             if defs[i] == max_def:
-                inds[count] = count0
+                inds[i] = count0
                 count0 += 1
             else:
-                inds[count] = -1
-            count += 1
-    return count
+                inds[i] = -1
+    return count0
 
 
 def make_offsets_and_masks(
@@ -1222,7 +1221,7 @@
     uint8_t[::1] defs,  # definition levels
     list offsets,  # contains uint64 np arrays
     uint8_t[::1] rep_map,  # rep value -> offset index mapping
-    uint8_t[::1] rep_flags,  # is index (not offset) for each item of offsets list
+    uint8_t[::1] rep_flags,  # of offsets list, 0: offset, 1: is index, 2: not used
     uint64_t[::1] ocounts,  # offsets counter of length offsets, len(offsets) + 1
 ):
     # general case
@@ -1275,6 +1274,23 @@ def parse_plain_strings(uint8_t[::1] data, uint64_t[::1] offsets, uint64_t nvalu
     return out
 
 
+def check_arange(int64_t[::1] arr):
+    """Is this like a range (with NULLs)"""
+    cdef int64_t val
+    cdef int64_t last = -1
+    cdef uint8_t out = 1
+
+    with nogil:
+        for val in arr:
+            if val < 0:
+                continue
+            if val - last != 1:
+                out = 0
+                break
+            last = val
+    return bool(out)
+
+
 def filter_rg_cols(ThriftObject rg, list cols):
     if cols is None:
         return rg.columns
diff --git a/fastparquet/compression.py b/fastparquet/compression.py
index 01188d70..cf23c95b 100644
--- a/fastparquet/compression.py
+++ b/fastparquet/compression.py
@@ -59,12 +59,6 @@ def lz4_decomp(data, size):
 decompressions['LZ4_RAW'] = lz4_decomp
 compressions['ZSTD'] = cramjam.zstd.compress
 decompressions['ZSTD'] = cramjam.zstd.decompress
-decom_into = {
-    "GZIP": cramjam.gzip.decompress_into,
-    "SNAPPY": cramjam.snappy.decompress_raw_into,
-    "ZSTD": cramjam.zstd.decompress_into,
-    "BROTLI": cramjam.brotli.decompress_into
-}
 
 compressions = {k.upper(): v for k, v in compressions.items()}
 decompressions = {k.upper(): v for k, v in decompressions.items()}
@@ -106,9 +100,4 @@ def decompress_data(data, uncompressed_size, algorithm='gzip'):
             "Decompression '%s' not available.  Options: %s" %
Options: %s" % (algorithm.upper(), sorted(decompressions)) ) - if algorithm.upper() in decom_into: - # ensures writable buffer from cramjam - x = np.empty(uncompressed_size, dtype='uint8') - decom_into[algorithm.upper()](np.frombuffer(data, dtype=np.uint8), x) - return x return decompressions[algorithm.upper()](data, uncompressed_size) diff --git a/fastparquet/core.py b/fastparquet/core.py index 83371bf4..6a9c3fbb 100644 --- a/fastparquet/core.py +++ b/fastparquet/core.py @@ -1,11 +1,11 @@ -import concurrent.futures import io import logging import numpy as np +import threading from fastparquet.encoding import read_plain, byte_stream_unsplit4, byte_stream_unsplit8, DECODE_TYPEMAP import fastparquet.cencoding as encoding -from fastparquet.compression import decompress_data, rev_map, decom_into +from fastparquet.compression import decompress_data from fastparquet.converted_types import convert, simple, converts_inplace from fastparquet import parquet_thrift from fastparquet.cencoding import ThriftObject @@ -255,35 +255,17 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, # input and output element sizes match see = se.type_length == assign.dtype.itemsize * 8 or simple.get(se.type).itemsize == assign.dtype.itemsize # can read-into - into0 = ((use_cat or converts_inplace(se) and see) - and data_header2.num_nulls == 0 - and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None - and assign.dtype.kind not in "Mm") # TODO: this can be done in place but is complex if row_filter is None: row_filter = Ellipsis # can decompress-into if data_header2.is_compressed is None: data_header2.is_compressed = True - into = (data_header2.is_compressed and rev_map[cmd.codec] in decom_into - and into0) if nullable: assign = assign._data uncompressed_page_size = (ph.uncompressed_page_size - data_header2.definition_levels_byte_length - data_header2.repetition_levels_byte_length) - if into0 and data_header2.encoding == parquet_thrift.Encoding.PLAIN and ( - not data_header2.is_compressed or cmd.codec == parquet_thrift.CompressionCodec.UNCOMPRESSED - ): - # PLAIN read directly into output (a copy for remote files) - assign[num:num+n_values].view('uint8')[:] = infile.read(size) - convert(assign[num:num+n_values], se) - elif into and data_header2.encoding == parquet_thrift.Encoding.PLAIN: - # PLAIN decompress directly into output - decomp = decom_into[rev_map[cmd.codec]] - decomp(np.frombuffer(infile.read(size), dtype="uint8"), - assign[num:num+data_header2.num_values].view('uint8')) - convert(assign[num:num+n_values], se) - elif data_header2.encoding == parquet_thrift.Encoding.PLAIN: + if data_header2.encoding == parquet_thrift.Encoding.PLAIN: # PLAIN, but with nulls or not in-place conversion codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED" raw_bytes = decompress_data(np.frombuffer(infile.read(size), "uint8"), @@ -409,14 +391,14 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd, return data_header2.num_values -# TODO: this executor should not persist between runs to free up threads. -import threading lock = threading.Lock() def _run(raw: bytes, ph: ThriftObject, dph: ThriftObject, o: int, cmd, rep_width, def_width, assign, colname, se, with_data=True): + # TODO: pass filters to here for specific column, page might + # be skipped. 
     # V1 or dict page, decompress immediately
     io = encoding.NumpyIO(decompress_data(raw, ph.uncompressed_page_size, cmd.codec))
     reps = defs = None
@@ -531,15 +513,14 @@ def read_col(column: ThriftObject, schema_helper: SchemaHelper, infile: io.IOBas
                 assign.setdefault(name, []).append(o)
                 offsets.append(o)
                 rep_flags[i] = 1
-            else:
+            else:  # or stats say there are no NULLs?
                 offsets.append(DUMMY)
                 rep_flags[i] = 2
             i += 1
         elif schema_helper.schema_element(parts).repetition_type == REP:
-            # offset has one extra element for closing last list
            name = f'{".".join(parts)}-offsets'
             if name not in assign:
-                o = np.empty(rows + 1, dtype="int64")
+                o = np.empty(rows, dtype="int64")
                 assign.setdefault(name, []).append(o)
                 offsets.append(o)
             else:
@@ -553,6 +534,7 @@ def read_col(column: ThriftObject, schema_helper: SchemaHelper, infile: io.IOBas
     off = min((cmd.dictionary_page_offset or cmd.data_page_offset,
               cmd.data_page_offset))
     with lock:
+        # If we have pre-fetched data, we don't need a lock
         infile.seek(off)
         column_binary = infile.read(cmd.total_compressed_size)
     infile = encoding.NumpyIO(column_binary)
@@ -649,8 +631,7 @@ def read_col(column: ThriftObject, schema_helper: SchemaHelper, infile: io.IOBas
     if OPT == 1:
         encoding.make_offsets_and_masks_no_nulls(reps, defs, offsets, ocounts)
     elif OPT == 2:
-
-        count = encoding.one_level_optional(defs, offsets[-1], 0, max_def)
+        ocounts[-2:] = len(defs), encoding.one_level_optional(defs, offsets[-1], 0, max_def)
     elif OPT == 3:
         encoding.make_offsets_and_masks_no_reps(defs, offsets, ocounts)
     else:
@@ -658,15 +639,10 @@ def read_col(column: ThriftObject, schema_helper: SchemaHelper, infile: io.IOBas
     # TODO: from [o]counts, we know if an index/offsets array is redundant and
     # can be removed
-    for o, count, flag in zip(offsets, ocounts, rep_flags):
-        if o is not DUMMY:
-            o.resize(count + flag, refcheck=False)
-
-
-def batcher(batch):
-    for func, args, kwargs in batch:
-        func(*args, **kwargs)
+        cnt = int(count + (flag == 2) + 1)
+        if o is not DUMMY and cnt > 0:
+            o.resize(cnt, refcheck=False)
 
 
 def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
@@ -678,9 +654,9 @@
     will be pandas Categorical objects: the codes and the category labels
     are arrays.
     """
-    # TODO: batch reads and submit groups of tasks instead of one task per column
-    #  - where there are many columns, the current causes a lot of thread waiting overhead
-    #  - reads will currently happen haphazardly in the input and not make use of caching
+    # TODO: batch reads, since we know all the offsets, and that this is a single file
+    #  maybe could extend to supra-group too.
+    #  see fsspec.parquet
     for column in encoding.filter_rg_cols(rg, columns):
         if ex is None:
             read_col(column, schema_helper, file, use_cat=False,
@@ -700,8 +676,6 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats,
     Access row-group in a file and read some columns into a data-frame.
""" partition_meta = partition_meta or {} - # TODO: pass through rg.num_rows, which is the number of top-most offsets/index - # whereas the cmd.num_values (also in page headers) gives the number of leaf items read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, selfmade, assign=assign, row_filter=row_filter, ex=ex) @@ -716,4 +690,5 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats, rg.columns[0].file_path.split('/')[:-1])] key, val = [p for p in partitions if p[0] == cat][0] val = val_to_num(val, meta=partition_meta.get(key)) + # TODO: this is a perfect IndexedArray assign[cat][:] = cats[cat].index(val) diff --git a/fastparquet/test/test_nest.py b/fastparquet/test/test_nest.py new file mode 100644 index 00000000..e461abca --- /dev/null +++ b/fastparquet/test/test_nest.py @@ -0,0 +1,29 @@ +import os + +import numpy as np + +import fastparquet + +data = os.path.join( + os.path.dirname(os.path.dirname( + os.path.dirname(__file__)) + ), + "TEST-DATA" +) + + +def test_short(): + pf = fastparquet.ParquetFile(os.path.join(data, "output_table.parquet")) + out = pf.to_numpy() + expected = { + 'foo.with.strings-data': [0, 1, -1], + 'foo.with.strings-cats': ["hey", "there"], + 'foo.with.ints-data': [1, 2, 3], + 'foo.with.lists.list-offsets': [0, 1, 2, 3], + 'foo.with.lists.list.element-data': [0, 0, 0], + 'foo.with.lists.list.element-cats': [0] + } + final = {k: list(v) if isinstance(v, np.ndarray) else v + for k, v in out[0].items()} + + assert final == expected diff --git a/fastparquet/wrappers.py b/fastparquet/wrappers.py index 990d1a59..1f4ad3bc 100644 --- a/fastparquet/wrappers.py +++ b/fastparquet/wrappers.py @@ -55,12 +55,13 @@ def __len__(self): class String: def __init__(self, offsets, data) -> None: + # TODO: make this start/stop so we can take from length-data encoding? self.offsets = offsets self.data = data def __getitem__(self, item): if isinstance(item, int): - return self.data[self.offsets[item]: self.offsets[item + 1]].decode() + return self.data[self.offsets[item]: self.offsets[item + 1]].tobytes().decode() elif isinstance(item, slice): assert item.step is None if item.stop is None or item.stop == -1: diff --git a/test-data/output_table.parquet b/test-data/output_table.parquet new file mode 100644 index 00000000..e5806387 Binary files /dev/null and b/test-data/output_table.parquet differ