
Implementation of ParquetFile.write_row_groups() & ParquetFile._sort_part_names() #712

Merged: 23 commits, Jan 10, 2022

Changes from 1 commit

Commits (23)
593057c
Re-ordering of 'write' to separate 'append' part and make it availabl…
yohplala Nov 21, 2021
1d24a2b
Reverting formatting changes.
yohplala Dec 3, 2021
6d79021
Reverting some other formatting changes.
yohplala Dec 3, 2021
ea8d8fe
Reverting further formatting changes.
yohplala Dec 3, 2021
2ffc94d
Rephrasing docstring of 'reset_row_idx()' + 'stabilizing' test case '…
yohplala Dec 3, 2021
822fb11
Renaming 'append_as_row_groups()' into 'write_row_groups()' + Pending…
yohplala Dec 8, 2021
b41bd91
Fix conflicts.
yohplala Dec 8, 2021
bc30ba4
Refactoring of 'overwrite' mode in 'write': source has been extracted…
yohplala Dec 12, 2021
3e70a16
'_sort_part_names()' implemented and tested.
yohplala Dec 13, 2021
61b7c8b
'update()' test case with file name shuffling.
yohplala Dec 14, 2021
714d116
Restore 'find_max_part()' in 'writer.py'.
yohplala Dec 14, 2021
550ec06
Same as previous commit, except is working now.
yohplala Dec 14, 2021
511f218
Fix compatibility with python 3.7.
yohplala Dec 14, 2021
b7d88ab
Superthrift conflict fixing - work-in-progress.
yohplala Dec 15, 2021
2307c09
'pf.fmd.row_groups[0].columns[0].file_path' is kept as bytes whenever…
yohplala Dec 15, 2021
c900f3b
Forgotten commented out lines. Now removed.
yohplala Dec 15, 2021
3b29e3a
Fix Dask test.
yohplala Dec 15, 2021
357c507
Fix 2 Dask test.
yohplala Dec 15, 2021
4e7378e
Fix 3 Dask test.
yohplala Dec 15, 2021
786101c
'pf.fmd.row_groups[].columns[].file_path' is string, whatever, whenev…
yohplala Dec 16, 2021
d7140e2
Revert some formatting changes + adds links to some discussions in repo.
yohplala Dec 29, 2021
2bb1a87
Merge branch 'main' of https://github.com/dask/fastparquet into appen…
yohplala Jan 3, 2022
aa44f48
Removed 'rename' parameter + revert 'file_path.decode()' for all colu…
yohplala Jan 3, 2022
101 changes: 93 additions & 8 deletions fastparquet/api.py
@@ -12,9 +12,10 @@
from .core import read_thrift
from .thrift_structures import parquet_thrift
from . import core, schema, converted_types, encoding, dataframe
from .util import (default_open, default_remove, ParquetException, val_to_num, ops,
ensure_bytes, check_column_names, metadata_from_many,
ex_from_sep, json_decoder, _strip_path_tail)
from .util import (default_open, default_remove, ParquetException, val_to_num,
ops, ensure_bytes, check_column_names, metadata_from_many,
ex_from_sep, json_decoder, _strip_path_tail,
reset_row_idx)


class ParquetFile(object):
@@ -342,7 +343,7 @@ def iter_row_groups(self, filters=None, **kwargs):
if not df.empty:
yield df

def remove_row_groups(self, rgs, write_fmd:bool = True,
def remove_row_groups(self, rgs, write_fmd:bool=True,
open_with=default_open, remove_with=None):
"""
Remove list of row groups from disk. `ParquetFile` metadata are
@@ -372,11 +373,11 @@ def remove_row_groups(self, rgs, write_fmd:bool = True,
remove_with = default_remove
if not isinstance(rgs, list):
rgs = [rgs]
rgs_to_remove = row_groups_map(rgs)
rgs_to_remove = map_row_groups(rgs)
if "fastparquet" not in self.created_by or self.file_scheme=='flat':
# Check if some files contain row groups both to be removed and to
# be kept.
all_rgs = row_groups_map(self.row_groups)
all_rgs = map_row_groups(self.row_groups)
for file in rgs_to_remove:
if len(rgs_to_remove[file]) < len(all_rgs[file]):
raise ValueError(f'File {file} contains row groups both \
@@ -391,10 +392,94 @@
except IOError:
pass
self._set_attrs()

if write_fmd:
self._write_common_metadata(open_with)

def append_as_row_groups(self, data, row_group_offsets=None,
compression=None, write_fmd:bool=True,
open_with=default_open, mkdirs=None, append=True,
stats=True):
"""
Append data as new row groups to disk. Updated `ParquetFile` metadata
Member:

Top of docstring should be kept to one line.

Member:

Reminds me: how is this method different from write(..., append=True)?

Author (@yohplala, Dec 3, 2021):
What append_as_row_groups does over write:

  • it is used on an already instantiated ParquetFile (if the instance is already loaded, you don't re-load it, as opposed to write);
  • there are fewer checks on the ParquetFile instance and it requires fewer parameters, as some of them are taken directly from the instance attributes (partition_on for instance);
  • it leaves the option to write or not write common metadata to disk (write_fmd=False);
  • it 'returns' the modified ParquetFile instance (allowing further modifications).

After this PR, I would like to prepare a new one containing documentation and an illustrative test case.
This will hopefully better illustrate the value of completing the set of utilities to manage row groups of a ParquetFile instance.

This documentation would be a kind of 'tutorial' / example about how to use these functions together to update a parquet dataset.

Assuming:

  • the user defines a way to identify overlaps between row groups.
  • the user defines a way to identify duplicates between rows.

Then the following update methodology with fastparquet is possible:

# Retrieve overlaps between existing row group and new data.
rgs_overlaps = filter_row_groups()

# Concat overlapping row groups with new data, and drop duplicates in rows.
df_to_add = pd.concat([new_data, pf[rgs_overlaps].to_pandas()]).drop_duplicates()

# Remove overlaps from disk.
pf.remove_row_groups(rgs_overlaps, write_fmd=False)

# Add new data, along with data from overlapping row groups.
pf.append_as_row_groups(df_to_add, write_fmd=False)

# Sort row groups if relevant.
pf.fmd.row_groups = sorted(pf.row_groups, key=...)

# Finally write common metadata.
pf._write_common_metadata()

I would like to provide 2 application test cases:

  • one with overlaps defined by partition. For instance:
    filter_row_groups(pf, filter=[('name', 'in', ['Yoh', 'Fred'])])
  • the other one with overlaps defined by an ordered column, for instance a time series:
    filter_row_groups(pf, filter=[('timestamp', '>=', pd.Timestamp('2021/01/01')), ('timestamp', '<=', pd.Timestamp('2021/01/04'))])

and then roll out the procedure proposed above (see the sketch just below).
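
For concreteness, here is a minimal, self-contained sketch of this procedure on a tiny time-series dataset (the path, the values and the way overlapping row groups are identified are illustrative; a helper such as filter_row_groups would normally do that selection):

import pandas as pd
from fastparquet import write, ParquetFile

# Tiny dataset: 6 daily rows split into 3 row groups ('hive' scheme, one file each).
ts = pd.date_range("2021/01/01", periods=6, freq="D")
write("dataset_parq", pd.DataFrame({"timestamp": ts, "value": range(6)}),
      file_scheme="hive", row_group_offsets=[0, 2, 4])
pf = ParquetFile("dataset_parq")

# New data overlapping the last row group (2021/01/05 onwards).
new_data = pd.DataFrame(
    {"timestamp": pd.date_range("2021/01/05", periods=3, freq="D"),
     "value": [50, 60, 70]})

# Identify overlapping row groups; here we simply know it is the last one.
# In practice this is where a filter_row_groups(pf, ...) call would be used.
rgs_overlaps = pf.row_groups[-1:]
old = pf[-1:].to_pandas()           # row-group subset, as in the sketch above

# Concat overlapping row groups with the new data; new data wins on duplicates.
df_to_add = (pd.concat([new_data, old])
             .drop_duplicates("timestamp", keep="first"))

pf.remove_row_groups(rgs_overlaps, write_fmd=False)   # remove overlaps from disk
pf.append_as_row_groups(df_to_add, write_fmd=False)   # add refreshed data
pf._write_common_metadata()                           # finally write common metadata
print(len(pf.to_pandas()))                            # 7 rows: Jan 1-7, Jan 5-6 refreshed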

Compared to append='overwrite', this approach:

  • pros:
    • does not force the user to use partitions.
    • in case the user is using partitions, it does not limit them to a single row group per partition.
  • cons:
    • requires more knowledge about these different row group utilities (hence I think a documentation page is worthwhile).
    • (similarly) more lines of code.

I introduced this in #676; I am sorry if I am writing a lot of things that are not necessarily very clear.
Near the end of the 1st post, the bullet points briefly summarize the main requirements of this 'generic' update feature. They correspond to the different utilities I have implemented:

  • remove_row_groups,
  • _write_common_metadata,
  • append_as_row_groups.

[Screenshot attached: 2021-12-05 07-46-38]

Author (@yohplala):

(made some updates to complete the above answer)

Author (@yohplala, Dec 7, 2021):
Hi @martindurant
Replying here to: "By the way, is 'insert' possible (i.e., shuffling the other row-groups and filenames as necessary)?"

I rephrased this question in my mind into:
Could your work be extended so as to make insertion of row groups ('shuffling' other row-groups and filenames as necessary)?

To this question, my answer is yes, and I think it is actually a good idea!
It has not been my target so far (as it is possible to do it by using sorted on the row group list directly), but I think doing insertion directly in append_as_row_groups (which I will rename to write_row_groups as per our discussion above) is a neater solution.

I think I would also change the interface of write_row_groups() so that it accepts an iterable of dataframes (e.g. a list or generator of dataframes, each individual dataframe defining a row group) instead of a single dataframe to be split.
For each of these row groups, the list of indexes at which they are to be inserted could optionally be provided (if not provided, the behaviour is the usual append); see the sketch below.
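
For reference, a sketch of the 'manual' equivalent that the pieces of this PR already allow (append, then reorder the row-group list before persisting metadata); the interface proposed here would fold this into a single call. Path and data are illustrative:

import pandas as pd
from fastparquet import write, ParquetFile
from fastparquet.api import statistics

write("insert_parq", pd.DataFrame({"x": [2, 3]}), file_scheme="hive")
pf = ParquetFile("insert_parq")
pf.append_as_row_groups(pd.DataFrame({"x": [0, 1]}), write_fmd=False)

mins = statistics(pf)["min"]["x"]                       # per-row-group minimum of 'x'
order = sorted(range(len(mins)), key=mins.__getitem__)  # desired row-group order
pf.fmd.row_groups = [pf.fmd.row_groups[i] for i in order]
pf._write_common_metadata()                             # persist the reordered list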

With these changes, we can make the existing overwrite feature a separate function, 'external' in the same way as the existing merge, and as efficient as it is now (its current implementation is entangled with write).

As I see it, this will bring us more modular code, easier to read and maintain, so yes, I am keen on investing time in this!
I will work out something in the coming days, and push updates, probably by the end of this week.
Thanks for your constructive insights!
Best,

are written on disk accordingly (optional).

Parameters
----------
data: pandas dataframe
Data to append.
row_group_offsets: int or list of ints
If int, row-groups will be approximately this many rows, rounded
down to make row groups about the same size;
If a list, the explicit index values to start new row groups;
If `None`, set to 50000000.
compression: str, dict, None
compression to apply to each column, e.g. ``GZIP`` or ``SNAPPY`` or
a ``dict`` like ``{"col1": "SNAPPY", "col2": None}`` to specify per
column compression types.
By default, do not compress.
write_fmd: bool, True
Write updated common metadata to disk.
open_with: function
When called with a f(path, mode), returns an open file-like object.
mkdirs: function
When called with a path/URL, creates any necessary directories to
make that location writable, e.g., ``os.makedirs``. This is not
necessary if using the simple file scheme.
append: bool (True) or 'overwrite'
If False, construct data-set from scratch;
If True, add new row-group(s) to existing data-set. In the latter
case, the data-set must exist, and the schema must match the input
data;
If 'overwrite', existing partitions will be replaced in-place where the
new data has any rows within an existing partition. To enable this, in
addition to appending to a partitioned data-set with 'hive' scheme,
row_group_offsets has to be set to `[0]` (meaning a single row group is
written per partition).
stats: True|False|list(str)
Whether to calculate and write summary statistics.
If True (default), do it for every column;
If False, never do;
If a list of str, do it only for those specified columns.
"""
from .writer import write_simple, write_multi
if self._get_index():
# Adjust index of pandas dataframe.
data = reset_row_idx(data)
if (self.file_scheme == 'simple'
or (self.file_scheme == 'empty' and self.fn[-9:] != '_metadata')):
# Case 'simple'.
if sorted(self.columns) != sorted(data.columns):
raise ValueError('File schema is not compatible with '
'existing file schema.')
if append == 'overwrite':
raise ValueError("Not possible to overwrite with simple file \
scheme.")
write_simple(self.fn, data, self.fmd, row_group_offsets,
compression, open_with, append, stats)
else:
# Case 'hive' or 'drill'.
partition_on = list(self.cats)
if append == 'overwrite':
if (row_group_offsets != [0]) and (row_group_offsets != 0):
raise ValueError("When overwriting partitions, writing \
several row groups per partition is not possible. Please, force writing a \
single row group per partition by setting `row_group_offsets=[0]`.")
if not partition_on:
raise ValueError("No partitioning column has been set in \
existing data-set. Overwrite of partitions is not possible.")
exist_rgps = [rg.columns[0].file_path
for rg in self.row_groups]
if len(exist_rgps) > len(_strip_path_tail(exist_rgps)):
# Some row groups are in the same folder (partition).
raise ValueError("Some partition folders contain several \
row groups. This situation is not allowed with use of `append='overwrite'`.")
write_multi(self.basepath, data, self.fmd, row_group_offsets,
compression, self.file_scheme, write_fmd=write_fmd,
open_with=open_with, mkdirs=mkdirs,
partition_on=partition_on, append=append, stats=stats)
self._set_attrs()
return

def _write_common_metadata(self, open_with=default_open):
"""
Write common metadata to disk.
@@ -1224,7 +1309,7 @@ def filter_not_in(values, vmin=None, vmax=None):
return False


def row_groups_map(rgs: list) -> dict:
def map_row_groups(rgs: list) -> dict:
"""
Returns row group lists sorted by parquet files.

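A minimal usage sketch of the append_as_row_groups method added above (illustrative path and data; per the commit list, the method was later renamed write_row_groups):

import pandas as pd
from fastparquet import write, ParquetFile

df = pd.DataFrame({"a": range(6), "b": ["x", "y", "z"] * 2})
write("example_parq", df[:3], file_scheme="hive")   # initial dataset, one row group
pf = ParquetFile("example_parq")
pf.append_as_row_groups(df[3:])                     # adds a row group, updates metadata on disk
print(len(pf.row_groups))                           # 2 row groups after the append
print(len(pf.to_pandas()))                          # 6 rows in total
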
53 changes: 49 additions & 4 deletions fastparquet/test/test_api.py
@@ -18,7 +18,7 @@
import fastparquet
from fastparquet import write, ParquetFile
from fastparquet.api import (statistics, sorted_partitioned_columns, filter_in,
filter_not_in, row_groups_map)
filter_not_in, map_row_groups)
from fastparquet.util import join_path

TEST_DATA = "test-data"
Expand Down Expand Up @@ -1228,7 +1228,7 @@ def test_remove_rgs_partitioned_pyarrow_multi(tempdir):
with pytest.raises(ValueError, match="^File b=hi/a97cc141d16f4014a59e5b234dddf07c.parquet"):
pf.remove_row_groups(pf.row_groups[0])
# Removing all row groups of a same file is ok.
files_rgs = row_groups_map(pf.row_groups) # sort row groups per file
files_rgs = map_row_groups(pf.row_groups) # sort row groups per file
file = list(files_rgs)[0]
pf.remove_row_groups(files_rgs[file])
assert len(pf.row_groups) == 2 # check row group list updated (4 initially)
@@ -1248,10 +1248,55 @@ def test_remove_rgs_simple_merge(tempdir):
with pytest.raises(ValueError, match="^File fn1.parquet"):
pf.remove_row_groups(pf.row_groups[0])
# Removing all row groups of a same file is ok.
files_rgs = row_groups_map(pf.row_groups) # sort row groups per file
files_rgs = map_row_groups(pf.row_groups) # sort row groups per file
file = list(files_rgs)[0]
pf.remove_row_groups(files_rgs[file])
assert len(pf.row_groups) == 2 # check row group list updated (4 initially)
df_ref = pd.DataFrame({'a':range(4), 'b':['lo']*2+['hi']*2})
assert pf.to_pandas().equals(df_ref)



def test_append_rgs_simple(tempdir):
fn = os.path.join(tempdir, 'test.parq')
write(fn, df_remove_rgs[:2], file_scheme='simple')
pf = ParquetFile(fn)
pf.append_as_row_groups(df_remove_rgs[2:])
pf2 = ParquetFile(fn)
assert pf.fmd == pf2.fmd # metadata are updated in-place.
assert pf.to_pandas().equals(df_remove_rgs)


def test_append_rgs_simple_no_index(tempdir):
fn = os.path.join(tempdir, 'test.parq')
df = df_remove_rgs.reset_index(drop=True)
write(fn, df[:2], file_scheme='simple')
pf = ParquetFile(fn)
pf.append_as_row_groups(df[2:])
pf2 = ParquetFile(fn)
assert pf.fmd == pf2.fmd # metadata are updated in-place.
assert pf.to_pandas().equals(df)


def test_append_rgs_hive(tempdir):
dn = os.path.join(tempdir, 'test_parq')
write(dn, df_remove_rgs[:3], file_scheme='hive', row_group_offsets=[0,2])
pf = ParquetFile(dn)
pf.append_as_row_groups(df_remove_rgs[3:], [0, 1])
assert len(pf.row_groups) == 4
pf2 = ParquetFile(dn)
assert pf.fmd == pf2.fmd # metadata are updated in-place.
assert pf.to_pandas().equals(df_remove_rgs)


def test_append_rgs_hive_partitions(tempdir):
dn = os.path.join(tempdir, 'test_parq')
write(dn, df_remove_rgs[:3], file_scheme='hive', row_group_offsets=[0,2],
partition_on=['country'])
pf = ParquetFile(dn)
pf.append_as_row_groups(df_remove_rgs[3:], [0, 1])
assert len(pf.row_groups) == 4
pf2 = ParquetFile(dn)
assert pf.fmd == pf2.fmd # metadata are updated in-place.
df = df_remove_rgs.sort_index()
df['country'] = df['country'].astype('category')
assert pf.to_pandas().sort_index().equals(df)
40 changes: 28 additions & 12 deletions fastparquet/test/test_overwrite.py
@@ -10,16 +10,14 @@


def test_write_with_rgp_by_date_as_index(tempdir):

# Step 1 - Writing of a 1st df, with `row_group_offsets=0`,
# `file_scheme=hive` and `partition_on=['location', 'color`].
df1 = pd.DataFrame({'humidity': [0.3, 0.8, 0.9],
'pressure': [1e5, 1.1e5, 0.95e5],
'location': ['Paris', 'Paris', 'Milan'],
'color': ['red', 'black', 'blue']})
write(tempdir, df1, row_group_offsets=0, file_scheme='hive',
write(tempdir, df1, row_group_offsets=[0], file_scheme='hive',
partition_on=['location', 'color'])

# Step 2 - Overwriting with a 2nd df having overlapping data, in
# 'overwrite' mode:
# `row_group_offsets=0`, `file_scheme=hive`,
@@ -28,10 +26,8 @@ def test_write_with_rgp_by_date_as_index(tempdir):
'pressure': [9e4, 1e5, 1.1e5, 1.1e5, 0.95e5],
'location': ['Milan', 'Paris', 'Paris', 'Paris', 'Paris'],
'color': ['red', 'black', 'black', 'green', 'green' ]})

write(tempdir, df2, row_group_offsets=0, file_scheme='hive', append='overwrite',
write(tempdir, df2, row_group_offsets=[0], file_scheme='hive', append='overwrite',
partition_on=['location', 'color'])

expected = pd.DataFrame({'humidity': [0.9, 0.5, 0.3, 0.4, 0.8, 1.1, 0.3],
'pressure': [9.5e4, 9e4, 1e5, 1.1e5, 1.1e5, 9.5e4, 1e5],
'location': ['Milan', 'Milan', 'Paris', 'Paris', 'Paris', 'Paris', 'Paris'],
@@ -44,18 +40,38 @@ def test_write_with_rgp_by_date_as_index(tempdir):
# df1. Total resulting number of rows is 7.
assert expected.equals(recorded)

def test_several_existing_parts_in_folder_exception(tempdir):

def test_exception_1(tempdir):
df1 = pd.DataFrame({'humidity': [0.3, 0.8, 0.9, 0.7],
'pressure': [1e5, 1.1e5, 0.95e5, 1e5],
'location': ['Paris', 'Paris', 'Milan', 'Paris'],
'exterior': ['yes', 'no', 'yes', 'yes']})

write(tempdir, df1, row_group_offsets = 1, file_scheme='hive',
# Several existing parts in folder exception.
write(tempdir, df1, row_group_offsets=1, file_scheme='hive',
write_index=False, partition_on=['location', 'exterior'])

with pytest.raises(ValueError, match="^Some partition folders"):
write(tempdir, df1, row_group_offsets = 0, file_scheme='hive',
write(tempdir, df1, row_group_offsets=0, file_scheme='hive',
write_index=False, partition_on=['location', 'exterior'],
append='overwrite')
with pytest.raises(ValueError, match="^Some partition folders"):
write(tempdir, df1, row_group_offsets=[0], file_scheme='hive',
write_index=False, partition_on=['location', 'exterior'],
append='overwrite')
# A non-zero row group offset is not accepted.
with pytest.raises(ValueError, match="^When overwriting"):
write(tempdir, df1, row_group_offsets=1, file_scheme='hive',
write_index=False, partition_on=['location', 'exterior'],
append='overwrite')


def test_exception_2(tempdir):
df1 = pd.DataFrame({'humidity': [0.3, 0.8, 0.9, 0.7],
'pressure': [1e5, 1.1e5, 0.95e5, 1e5],
'location': ['Paris', 'Paris', 'Milan', 'Paris'],
'exterior': ['yes', 'no', 'yes', 'yes']})
# No partitions.
write(tempdir, df1, row_group_offsets=1, file_scheme='hive',
write_index=False)
with pytest.raises(ValueError, match="^No partitioning"):
write(tempdir, df1, row_group_offsets=0, file_scheme='hive',
write_index=False, append='overwrite')

52 changes: 52 additions & 0 deletions fastparquet/util.py
@@ -120,6 +120,58 @@ def check_column_names(columns, *args):
"" % (missing, arg, columns))


def reset_row_idx(data: pd.DataFrame) -> pd.DataFrame:
"""
Shift the row index into columns of the DataFrame, so that it can be stored
to parquet.

Parameters
----------
data: pd.DataFrame

Returns
-------
data: pd.DataFrame
"""
if isinstance(data.index, pd.MultiIndex):
    for name, cats, codes in zip(data.index.names, data.index.levels,
                                 data.index.codes):
        data = data.assign(**{name: pd.Categorical.from_codes(codes,
                                                              cats)})
    data = data.reset_index(drop=True)
else:
    data = data.reset_index()
return data
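
A small illustrative check of the single-index branch, assuming the helper is imported from fastparquet.util as in the api.py import hunk above:

import pandas as pd
from fastparquet.util import reset_row_idx

df = pd.DataFrame({"v": [1, 2]}, index=pd.Index(["a", "b"], name="k"))
print(list(reset_row_idx(df).columns))   # ['k', 'v']: the named index is now a column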


def to_rg_offsets(row_group_offset: int, n_rows: int):
"""
Express 'row_group_offset' as a list of row indexes.

Parameters
----------
row_group_offset: int
Row-groups will be approximately this many rows, rounded down to make
row groups about the same size
n_rows: int
Total number of rows in dataset.

Returns
-------
row_group_offsets: List[int]
List of row indexes marking the start of each row group.
"""
# TODO
# Could be extended so that instead of a target number of rows per
# row group, it accepts a target size (MB or GB) per row group.
if not row_group_offset:  # covers 'row_group_offset' equal to 0 (or None).
row_group_offsets = [0]
else:
nparts = max((n_rows - 1) // row_group_offset + 1, 1)
chunksize = max(min((n_rows - 1) // nparts + 1, n_rows), 1)
row_group_offsets = list(range(0, n_rows, chunksize))
return row_group_offsets
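
A quick check of the arithmetic above, following the expressions in to_rg_offsets for n_rows=10 and a target of roughly 4 rows per row group:

n_rows, row_group_offset = 10, 4
nparts = max((n_rows - 1) // row_group_offset + 1, 1)        # 3 parts
chunksize = max(min((n_rows - 1) // nparts + 1, n_rows), 1)  # 4 rows per part
print(list(range(0, n_rows, chunksize)))                     # [0, 4, 8]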


def metadata_from_many(file_list, verify_schema=False, open_with=default_open,
root=False, fs=None):
"""