Commit a9d3f30
martindurant committed Sep 11, 2024
1 parent 71683ca commit a9d3f30
Showing 7 changed files with 71 additions and 60 deletions.
1 change: 1 addition & 0 deletions fastparquet/api.py
@@ -607,6 +607,7 @@ def to_numpy(self, columns=None, filters=None, worker_threads=MAX_WORKERS):
else:
infile = None
if worker_threads:
# TODO: consider the case (tiny data) where it's just not worthwhile
ex = ThreadPool(num_workers=worker_threads)
else:
ex = None
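The new TODO notes that spinning up a thread pool is wasted overhead for tiny reads. A minimal sketch of that idea, using the standard-library executor and a made-up row-count threshold (neither is fastparquet's actual API):

# --- illustrative sketch, not part of the diff; names and threshold are assumptions ---
from concurrent.futures import ThreadPoolExecutor

TINY_ROW_THRESHOLD = 10_000  # hypothetical cut-off; a real value would need benchmarking

def run_column_tasks(tasks, n_rows, worker_threads=4):
    """Only parallelise when the data is big enough to amortise thread overhead."""
    if worker_threads and n_rows > TINY_ROW_THRESHOLD:
        with ThreadPoolExecutor(max_workers=worker_threads) as ex:
            return list(ex.map(lambda task: task(), tasks))
    # tiny data: thread start-up and locking would dominate, so run serially
    return [task() for task in tasks]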
30 changes: 23 additions & 7 deletions fastparquet/cencoding.pyx
@@ -1203,26 +1203,25 @@ def one_level_optional(
):
# nullable simple type; equivalent:
# inds[defs == 1] = np.arange(count0, len(values) + count0)
# inds[defs == 0] = 0
# inds[defs == 0] = -1
# this can be parallel, by passing count0 (values so far), which we always can know
cdef uint64_t i, count
cdef uint64_t i
with nogil:
for i in range(defs.shape[0]):
if defs[i] == max_def:
inds[count] = count0
inds[i] = count0
count0 += 1
else:
inds[count] = -1
count += 1
return count
inds[i] = -1
return count0


def make_offsets_and_masks(
uint8_t[::1] reps, # repetition levels
uint8_t[::1] defs, # definition levels
list offsets, # contains uint64 np arrays
uint8_t[::1] rep_map, # rep value -> offset index mapping
uint8_t[::1] rep_flags, # is index (not offset) for each item of offsets list
uint8_t[::1] rep_flags, # of offsets list, 0: offset, 1: is index, 2: not used
uint64_t[::1] ocounts, # offsets counter of length offsets, len(offsets) + 1
):
# general case
@@ -1275,6 +1274,23 @@ def parse_plain_strings(uint8_t[::1] data, uint64_t[::1] offsets, uint64_t nvalu
return out


def check_arange(int64_t[::1] arr):
"""Is this like a range (with NULLs)"""
cdef int64_t val
cdef int64_t last = -1
cdef uint8_t out = 1

with nogil:
for val in arr:
if val < 0:
continue
if val - last != 1:
out = 0
break
last = val
return bool(out)


def filter_rg_cols(ThriftObject rg, list cols):
if cols is None:
return rg.columns
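For clarity, a plain-NumPy reading of what the two routines above compute; this is a reference sketch, not the compiled Cython:

# --- reference sketch in plain NumPy, not part of the diff ---
import numpy as np

def one_level_optional_ref(defs, inds, count0, max_def):
    # rows at max definition level get consecutive value indices starting at
    # count0; NULL rows get -1; returns the updated running value count
    present = defs == max_def
    inds[present] = np.arange(count0, count0 + present.sum(), dtype=inds.dtype)
    inds[~present] = -1
    return count0 + int(present.sum())

def check_arange_ref(arr):
    # True when the non-NULL (>= 0) entries are exactly 0, 1, 2, ... in order
    vals = arr[arr >= 0]
    return bool(np.array_equal(vals, np.arange(len(vals))))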
11 changes: 0 additions & 11 deletions fastparquet/compression.py
@@ -59,12 +59,6 @@ def lz4_decomp(data, size):
decompressions['LZ4_RAW'] = lz4_decomp
compressions['ZSTD'] = cramjam.zstd.compress
decompressions['ZSTD'] = cramjam.zstd.decompress
decom_into = {
"GZIP": cramjam.gzip.decompress_into,
"SNAPPY": cramjam.snappy.decompress_raw_into,
"ZSTD": cramjam.zstd.decompress_into,
"BROTLI": cramjam.brotli.decompress_into
}

compressions = {k.upper(): v for k, v in compressions.items()}
decompressions = {k.upper(): v for k, v in decompressions.items()}
@@ -106,9 +100,4 @@ def decompress_data(data, uncompressed_size, algorithm='gzip'):
"Decompression '%s' not available. Options: %s" %
(algorithm.upper(), sorted(decompressions))
)
if algorithm.upper() in decom_into:
# ensures writable buffer from cramjam
x = np.empty(uncompressed_size, dtype='uint8')
decom_into[algorithm.upper()](np.frombuffer(data, dtype=np.uint8), x)
return x
return decompressions[algorithm.upper()](data, uncompressed_size)
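With the decompress-into fast path gone, every codec now goes through the plain decompressions table and returns a fresh buffer. A hedged round-trip example, assuming cramjam's raw snappy codec (which this module already relies on) to produce the compressed input:

# --- usage sketch, not part of the diff ---
import cramjam
import numpy as np
from fastparquet.compression import decompress_data

raw = b"some page bytes" * 100
comp = bytes(cramjam.snappy.compress_raw(raw))
back = decompress_data(np.frombuffer(comp, "uint8"), len(raw), algorithm="SNAPPY")
assert bytes(back) == raw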
57 changes: 16 additions & 41 deletions fastparquet/core.py
@@ -1,11 +1,11 @@
import concurrent.futures
import io
import logging
import numpy as np
import threading

from fastparquet.encoding import read_plain, byte_stream_unsplit4, byte_stream_unsplit8, DECODE_TYPEMAP
import fastparquet.cencoding as encoding
from fastparquet.compression import decompress_data, rev_map, decom_into
from fastparquet.compression import decompress_data
from fastparquet.converted_types import convert, simple, converts_inplace
from fastparquet import parquet_thrift
from fastparquet.cencoding import ThriftObject
@@ -255,35 +255,17 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
# input and output element sizes match
see = se.type_length == assign.dtype.itemsize * 8 or simple.get(se.type).itemsize == assign.dtype.itemsize
# can read-into
into0 = ((use_cat or converts_inplace(se) and see)
and data_header2.num_nulls == 0
and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None
and assign.dtype.kind not in "Mm") # TODO: this can be done in place but is complex
if row_filter is None:
row_filter = Ellipsis
# can decompress-into
if data_header2.is_compressed is None:
data_header2.is_compressed = True
into = (data_header2.is_compressed and rev_map[cmd.codec] in decom_into
and into0)
if nullable:
assign = assign._data

uncompressed_page_size = (ph.uncompressed_page_size - data_header2.definition_levels_byte_length -
data_header2.repetition_levels_byte_length)
if into0 and data_header2.encoding == parquet_thrift.Encoding.PLAIN and (
not data_header2.is_compressed or cmd.codec == parquet_thrift.CompressionCodec.UNCOMPRESSED
):
# PLAIN read directly into output (a copy for remote files)
assign[num:num+n_values].view('uint8')[:] = infile.read(size)
convert(assign[num:num+n_values], se)
elif into and data_header2.encoding == parquet_thrift.Encoding.PLAIN:
# PLAIN decompress directly into output
decomp = decom_into[rev_map[cmd.codec]]
decomp(np.frombuffer(infile.read(size), dtype="uint8"),
assign[num:num+data_header2.num_values].view('uint8'))
convert(assign[num:num+n_values], se)
elif data_header2.encoding == parquet_thrift.Encoding.PLAIN:
if data_header2.encoding == parquet_thrift.Encoding.PLAIN:
# PLAIN, but with nulls or not in-place conversion
codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
raw_bytes = decompress_data(np.frombuffer(infile.read(size), "uint8"),
@@ -409,14 +391,14 @@ def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
return data_header2.num_values


# TODO: this executor should not persist between runs to free up threads.
import threading
lock = threading.Lock()


def _run(raw: bytes, ph: ThriftObject, dph: ThriftObject, o: int,
cmd, rep_width, def_width, assign, colname, se,
with_data=True):
# TODO: pass filters to here for specific column, page might
# be skipped.
# V1 or dict page, decompress immediately
io = encoding.NumpyIO(decompress_data(raw, ph.uncompressed_page_size, cmd.codec))
reps = defs = None
@@ -531,15 +513,14 @@ def read_col(column: ThriftObject, schema_helper: SchemaHelper, infile: io.IOBas
assign.setdefault(name, []).append(o)
offsets.append(o)
rep_flags[i] = 1
else:
else: # or stats say there are no NULLs?
offsets.append(DUMMY)
rep_flags[i] = 2
i += 1
elif schema_helper.schema_element(parts).repetition_type == REP:
# offset has one extra element for closing last list
name = f'{".".join(parts)}-offsets'
if name not in assign:
o = np.empty(rows + 1, dtype="int64")
o = np.empty(rows, dtype="int64")
assign.setdefault(name, []).append(o)
offsets.append(o)
else:
Expand All @@ -553,6 +534,7 @@ def read_col(column: ThriftObject, schema_helper: SchemaHelper, infile: io.IOBas
off = min((cmd.dictionary_page_offset or cmd.data_page_offset,
cmd.data_page_offset))
with lock:
# If we have pre-fetched data, we don't need a lock
infile.seek(off)
column_binary = infile.read(cmd.total_compressed_size)
infile = encoding.NumpyIO(column_binary)
@@ -649,24 +631,18 @@ def read_col(column: ThriftObject, schema_helper: SchemaHelper, infile: io.IOBas
if OPT == 1:
encoding.make_offsets_and_masks_no_nulls(reps, defs, offsets, ocounts)
elif OPT == 2:

count = encoding.one_level_optional(defs, offsets[-1], 0, max_def)
ocounts[-2:] = len(defs), encoding.one_level_optional(defs, offsets[-1], 0, max_def)
elif OPT == 3:
encoding.make_offsets_and_masks_no_reps(defs, offsets, ocounts)
else:
encoding.make_offsets_and_masks(reps, defs, offsets, rep_map, rep_flags, ocounts)

# TODO: from [o]counts, we know if an index/offsets array is redundant and
# can be removed

for o, count, flag in zip(offsets, ocounts, rep_flags):
if o is not DUMMY:
o.resize(count + flag, refcheck=False)


def batcher(batch):
for func, args, kwargs in batch:
func(*args, **kwargs)
cnt = int(count + (flag == 2) + 1)
if o is not DUMMY and cnt > 0:
o.resize(cnt, refcheck=False)


def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
@@ -678,9 +654,9 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
will be pandas Categorical objects: the codes and the category labels
are arrays.
"""
# TODO: batch reads and submit groups of tasks instead of one task per column
# - where there are many columns, the current causes a lot of thread waiting overhead
# - reads will currently happen haphazardly in the input and not make use of caching
# TODO: batch reads, since we know all the offsets, and that this is a single file
# maybe could extend to supra-group too.
# see fsspec.parquet
for column in encoding.filter_rg_cols(rg, columns):
if ex is None:
read_col(column, schema_helper, file, use_cat=False,
@@ -700,8 +676,6 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats,
Access row-group in a file and read some columns into a data-frame.
"""
partition_meta = partition_meta or {}
# TODO: pass through rg.num_rows, which is the number of top-most offsets/index
# whereas the cmd.num_values (also in page headers) gives the number of leaf items
read_row_group_arrays(file, rg, columns, categories, schema_helper,
cats, selfmade, assign=assign, row_filter=row_filter, ex=ex)

@@ -716,4 +690,5 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats,
rg.columns[0].file_path.split('/')[:-1])]
key, val = [p for p in partitions if p[0] == cat][0]
val = val_to_num(val, meta=partition_meta.get(key))
# TODO: this is a perfect IndexedArray
assign[cat][:] = cats[cat].index(val)
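To make the OPT == 2 branch concrete: for a single optional, non-repeated column, ocounts ends up holding the index-array length and the count of stored values. A toy sketch with made-up definition levels, assuming the Cython signature matches the call above:

# --- toy sketch, not part of the diff; definition levels are made up ---
import numpy as np
import fastparquet.cencoding as encoding

defs = np.array([1, 0, 1, 0, 1], dtype="uint8")   # rows 1 and 3 are NULL
inds = np.empty(len(defs), dtype="int64")          # the offsets[-1] index array
ocounts = np.zeros(2, dtype="uint64")

# index-array length, then count of real (non-NULL) values
ocounts[-2:] = len(defs), encoding.one_level_optional(defs, inds, 0, 1)

print(inds.tolist())     # [0, -1, 1, -1, 2] -> -1 marks NULL rows
print(ocounts.tolist())  # [5, 3]            -> 5 index slots, 3 stored values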
29 changes: 29 additions & 0 deletions fastparquet/test/test_nest.py
@@ -0,0 +1,29 @@
import os

import numpy as np

import fastparquet

data = os.path.join(
os.path.dirname(os.path.dirname(
os.path.dirname(__file__))
),
"TEST-DATA"
)


def test_short():
pf = fastparquet.ParquetFile(os.path.join(data, "output_table.parquet"))
out = pf.to_numpy()
expected = {
'foo.with.strings-data': [0, 1, -1],
'foo.with.strings-cats': ["hey", "there"],
'foo.with.ints-data': [1, 2, 3],
'foo.with.lists.list-offsets': [0, 1, 2, 3],
'foo.with.lists.list.element-data': [0, 0, 0],
'foo.with.lists.list.element-cats': [0]
}
final = {k: list(v) if isinstance(v, np.ndarray) else v
for k, v in out[0].items()}

assert final == expected
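For orientation, the flat arrays in the expected dict decode back to per-row values roughly as follows; this is a hedged reading of the naming scheme ("-data" holds codes or values, "-cats" the dictionary, "-offsets" the list boundaries):

# --- decoding sketch, not part of the diff; uses only the test's own expected data ---
exp = {
    'foo.with.strings-data': [0, 1, -1],
    'foo.with.strings-cats': ["hey", "there"],
    'foo.with.ints-data': [1, 2, 3],
    'foo.with.lists.list-offsets': [0, 1, 2, 3],
    'foo.with.lists.list.element-data': [0, 0, 0],
    'foo.with.lists.list.element-cats': [0],
}

# dictionary-encoded strings: -1 codes mark NULLs
strings = [exp['foo.with.strings-cats'][c] if c >= 0 else None
           for c in exp['foo.with.strings-data']]            # ['hey', 'there', None]

# list column: offsets bound each row's slice of the dictionary-encoded elements
off = exp['foo.with.lists.list-offsets']
elems = [exp['foo.with.lists.list.element-cats'][c]
         for c in exp['foo.with.lists.list.element-data']]
lists = [elems[off[i]:off[i + 1]] for i in range(len(off) - 1)]  # [[0], [0], [0]]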
3 changes: 2 additions & 1 deletion fastparquet/wrappers.py
@@ -55,12 +55,13 @@ def __len__(self):

class String:
def __init__(self, offsets, data) -> None:
# TODO: make this start/stop so we can take from length-data encoding?
self.offsets = offsets
self.data = data

def __getitem__(self, item):
if isinstance(item, int):
return self.data[self.offsets[item]: self.offsets[item + 1]].decode()
return self.data[self.offsets[item]: self.offsets[item + 1]].tobytes().decode()
elif isinstance(item, slice):
assert item.step is None
if item.stop is None or item.stop == -1:
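The .tobytes() change is needed because slicing a NumPy uint8 buffer yields an array, not bytes, so the slice has no .decode(). A small usage sketch with made-up offsets and data, assuming the wrapper accepts plain NumPy arrays as the constructor suggests:

# --- usage sketch, not part of the diff; offsets/data values are made up ---
import numpy as np
from fastparquet.wrappers import String

data = np.frombuffer(b"heythere", dtype="uint8")
offsets = np.array([0, 3, 8], dtype="int64")

s = String(offsets, data)
# data[0:3] is an ndarray slice, hence .tobytes() before .decode()
assert s[0] == "hey"
assert s[1] == "there"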
Binary file added test-data/output_table.parquet
