Implementation of ParquetFile.write_row_groups() & ParquetFile._sort_part_names() #712

Merged: 23 commits into dask:main on Jan 10, 2022

Conversation

@yohplala (Author) commented Nov 21, 2021

'Re-ordering' of write to separate the 'append' part and make it available in ParquetFile through an append_as_row_groups method.

As per the initial PR:

Implementation of ParquetFile.append_as_row_groups()

This PR completes the proposal made in ticket #676.

The work consisted in extracting, from the current writer.write(), the code dealing:

  • with the 'multi-part' format, into a write_multi() (similar to the existing write_simple()),
  • with data index management (to make it fit the parquet format), into a reset_row_idx(),
  • with some checks related to 'append', into ParquetFile.append_as_row_groups().

In the end, the core work (appending or writing a new file) is achieved by either write_multi or write_simple.

Both writer.write and api.ParquetFile.append_as_row_groups call write_multi or write_simple:

  • writer.write calls them directly when writing a file from scratch,
  • api.ParquetFile.append_as_row_groups calls them when append=True (or 'overwrite').

For appending, writer.write calls api.ParquetFile.append_as_row_groups, bypassing the creation of metadata.
This was not the case before, but it was a useless step, as this metadata was not used afterwards.
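
Below is a minimal sketch of the dispatch described above (simplified names and signatures of my own; this is not the actual fastparquet source):

from fastparquet import ParquetFile
# write_simple already exists; write_multi is the new function described in this PR.
from fastparquet.writer import write_simple, write_multi

def write(filename, data, file_scheme='simple', append=False, **kwargs):
    # Sketch of the reorganised writer.write dispatch.
    if append:
        # Appending is delegated to the ParquetFile method, so no throw-away
        # metadata object is created for the new data beforehand.
        pf = ParquetFile(filename)
        return pf.append_as_row_groups(data, **kwargs)
    if file_scheme == 'simple':
        # Single-file output.
        return write_simple(filename, data, **kwargs)
    # Multi-part output.
    return write_multi(filename, data, **kwargs)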

Among other changes:

  • I added the mention 'Ignored if appending to an existing parquet dataset.' to the writer.write docstring for parameters that are not used when appending.
  • Several test cases have been added in test_api.py to check api.ParquetFile.append_as_row_groups and to complete the cases where append='overwrite'.

@yohplala (Author) commented Dec 1, 2021

Hi @martindurant,
Please, do you have any feedback regarding this PR?

@martindurant (Member) left a review comment:

Thoughts so far, without having yet looked through the big refactor on write()

open_with=default_open, mkdirs=None, append=True,
stats=True):
"""
Append data as new row groups to disk. Updated `ParquetFile` metadata
@martindurant (Member):

Top of docstring should be kept to one line

@martindurant (Member):

Reminds me: how is this method different from write(..., append=True) ?

@yohplala (Author), Dec 3, 2021:

What append_as_row_groups offers over write:

  • it is used on an already instantiated ParquetFile (if the instance is already loaded, you don't reload it, as opposed to write),
  • there are fewer checks on the ParquetFile instance / it requires fewer parameters, as some of them are taken directly from the instance attributes (partition_on for instance),
  • it leaves the option to write or not the common metadata to disk (write_fmd=False),
  • it returns the modified ParquetFile instance (allowing further modifications).

After this PR, I would like to prepare a new one containing documentation and an illustrative test case.
This will hopefully better illustrate the interest of completing the set of utilities for managing the row groups of a ParquetFile instance.

This documentation would be a kind of tutorial / example of how to use these functions together to update a parquet dataset.

Assuming:

  • the user defines a way to identify overlaps between row groups.
  • the user defines a way to identify duplicates between rows.

Then the following update methodology with fastparquet is possible:

# Retrieve overlaps between existing row groups and new data.
rgs_overlaps = filter_row_groups(pf, filter=...)

# Concat overlapping row groups with new data, and drop duplicate rows.
df_to_add = pd.concat([new_data, pf[rgs_overlaps].to_pandas()]).drop_duplicates()

# Remove overlapping row groups from disk.
pf.remove_row_groups(rgs_overlaps, write_fmd=False)

# Add new data, along with data from the overlapping row groups.
pf.append_as_row_groups(df_to_add, write_fmd=False)

# Sort row groups if relevant.
pf.fmd.row_groups = sorted(pf.row_groups, key=...)

# Finally, write the common metadata.
pf._write_common_metadata()

I would like to provide 2 application test cases:

  • one with overlaps defined by partition, for instance:
    filter_row_groups(pf, filter=[('name', 'in', ['Yoh', 'Fred'])])
  • the other with overlaps defined by an ordered column, for instance a time series:
    filter_row_groups(pf, filter=[('timestamp', '>=', pd.Timestamp('2021/01/01')), ('timestamp', '<=', pd.Timestamp('2021/01/04'))])

and then roll out the procedure proposed above.

Compared to append='overwrite', this approach:

  • pros:
    • does not force the user to use partitions,
    • if the user does use partitions, it does not limit them to a single row group per partition;
  • cons:
    • requires more knowledge about these different row-group utilities (hence I think a documentation page is worthwhile),
    • (relatedly) more lines of code.

I introduced this in #676; I am sorry if I am writing a lot of things that are not necessarily very clear.
Near the end of the 1st post, with bullet points, I briefly summarize the main requirements of this 'generic' update feature. They correspond to the different utilities I have implemented:

  • remove_row_groups,
  • _write_common_metadata,
  • append_as_row_groups.


@yohplala (Author):

(made some updates to complete the above answer)

@yohplala (Author), Dec 7, 2021:

Hi @martindurant,
Replying here to:

By the way, is "insert" possible (i.e., shuffling the other row-groups and filenames as necessary)?

I rephrased this question in my mind as: could your work be extended to make insertion of row groups possible ('shuffling' other row groups and filenames as necessary)?

To this question, my answer is yes, and I think it is actually a good idea!
It has not been my target so far (as it is possible to do it by sorting the row-group list directly), but I think doing the insertion directly in append_as_row_groups (which I will rename to write_row_groups as per our discussion above) is a neater solution.

I think I would also change the interface of write_row_groups() so that it accepts an iterable of dataframes (e.g. a list or a generator of dataframes, each individual dataframe defining a row group) instead of a single dataframe to be split.
For each of these row groups, the index at which it has to be inserted could optionally be provided (if not provided, it is a usual append). A sketch of what this could look like follows.
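
A rough usage sketch (the dataset name, chunk sizes and the insertion-position parameter name 'rg_index' are placeholders of mine, not the final API):

import pandas as pd
from fastparquet import ParquetFile

pf = ParquetFile('mydataset')  # hypothetical existing multi-part dataset

def as_row_groups(df, size):
    # Yield consecutive slices of df; each slice becomes one row group.
    for start in range(0, len(df), size):
        yield df.iloc[start:start + size]

new_data = pd.DataFrame({'timestamp': pd.date_range('2021/01/01', periods=200_000, freq='min'),
                         'value': range(200_000)})

# Plain append: no insertion positions given, the new row groups go at the end.
pf.write_row_groups(as_row_groups(new_data, 50_000))

# Insertion: one position per new row group in the existing row-group list
# (parameter name assumed here for illustration only).
# pf.write_row_groups(as_row_groups(new_data, 50_000), rg_index=[2, 3, 4, 5])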

With these changes, we can make the existing overwrite feature a separate function, 'external' in the same way as the existing merge, and as efficient as it is now (its current implementation is intertwined with write).

As I see it, this will bring more modular code, easier to read and maintain, so yes, I am keen on investing in this!
I will work out something in the coming days and push updates, probably by the end of this week.
Thanks for your constructive insights!
Bests,

@yohplala (Author) commented Dec 12, 2021

Hi, here is a status update on the ongoing work.

Content of last commit

The 'row group insertion + sorting' is (obviously) closely connected to the append='overwrite' mode, and it seemed necessary to me to refactor the latter while equipping pf.write_row_groups() with the row-group sorting feature.

It now seems to me that the name of this PR is kind of a tree hiding the forest: it is actually a refactoring of the write function, which has been split between write_multi, pf.write_row_groups, iter_dataframe, and update (along with the already existing write_simple).

Features of pf.write_row_groups over write are now:

  • it takes a ParquetFile instance as input (if the instance is already loaded, you don't reload it, as opposed to write),
  • it returns the modified ParquetFile instance (if you need it for subsequent use, there is no need to reload it),
  • fewer checks on the ParquetFile instance / it requires fewer parameters, as some of them are taken directly from the instance attributes (partition_on for instance),
  • it leaves the option to write or not the common metadata to disk (write_fmd=False),
  • it uses a key to sort row groups (or not),
  • it requires an iterable of dataframes (i.e. row groups as separate dataframes).

Filename shuffling

This PR is not yet complete, as filename shuffling is still missing.
For this feature, my proposal is to optionally rename part files according to the position of the related row group in the row-group list. My target is to finalize this by the end of the week; it should not be too much code.

update()

A comment regarding the new update function: it manages the 'overwrite' mode.
Its name is currently inadequate, but my next step (next PR) will be to make it the skeleton used for updating an existing dataset.
Its content relates to what I was formalizing earlier when speaking about overlaps between row groups and duplicates between rows.

So as not to have to change its name in the next PR, I already named it update.
If this is too inadequate, its name as per its current scope would be overwrite_partitioned.
Please let me know whether I should change it.

Iterable of dataframes for iterating separate row groups

Frameworks other than pandas that feature out-of-core dataframe management (for instance vaex) can generate pandas dataframes in chunks (to handle data that does not fit into RAM).
pf.write_row_groups(), write_multi and write_simple now accept such an iterable directly.
Users can take advantage of this to benefit from fastparquet's efficiency and features when writing a parquet file from a vaex dataframe, the way they want :). A minimal sketch is given below.
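
For illustration, a hedged sketch of that usage, with a plain pandas chunked reader standing in for an out-of-core source (file names are made up):

import pandas as pd
from fastparquet import ParquetFile

pf = ParquetFile('mydataset')  # hypothetical existing dataset

# Any iterable of pandas DataFrames works: here, chunks streamed from a large CSV,
# so the full data never has to fit in RAM at once.
chunk_iter = pd.read_csv('huge_input.csv', chunksize=1_000_000)

# Each chunk becomes one row group; the common metadata is written once at the end.
pf.write_row_groups(chunk_iter, write_fmd=True)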

@yohplala (Author):

@martindurant
I am working on the filename renaming.
I am wondering how rg.columns[0].file_path is set.

Is it set at parquet file discovery, when instantiating a ParquetFile?
If yes, then changing the name of the file is enough.

But if this attribute also persists in the metadata, I need to update it there as well.
In this latter case, I don't know how. Could you please provide some guidance?

Thanks for your help, bests

@yohplala (Author):

But if this attribute also persists in the metadata, I need to update it there as well. In this latter case, I don't know how. Could you please provide some guidance?

Ok, found it:

  • yes, it is persisted,
  • code example in write_multi:
            partname = join_path(dn, part)
            with open_with(partname, 'wb') as f2:
                rg = make_part_file(f2, row_group, fmd.schema,
                                    compression=compression, fmd=fmd,
                                    stats=stats)
            for chunk in rg.columns:
                chunk.file_path = part

@martindurant (Member):

As usual, it isn't simple :)
A _metadata file contains a full thrift description of the dataset, including the file_paths of all of the columns of all of the row-groups. If this special file exists, fastparquet will use it and not search for files or even read their own copies of the metadata. Note that the file_paths in the metadata of the file that contains those columns should be None; they are only used when referencing a column chunk in another file.

If the special file does not exist, fastparquet effectively constructs the corresponding thrift object in memory.

Other frameworks do something different, including fastparquet-on-dask. They may well ignore any _metadata file even if it exists, so here the filenames and their ordering becomes important.

@yohplala (Author):

As usual, it isn't simple :) A _metadata file contains a full thrift description of the dataset, including the file_paths of all of the columns of all of the row-groups. If this special file exists, fastparquet will use it and not search for files or even read their own copies of the metadata. Note that the file_paths in the metadata of the file that contains those columns should be None; they are only used when referencing a column chunk in another file.

Ok, I think it fits with what I am implementing.
I will update file_path in each row group in fmd.row_groups.
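
A minimal sketch of that idea (an assumed helper of mine, not the PR's actual code), to be run after a part file has been renamed on disk:

def rename_part_in_metadata(pf, rg_index, new_path):
    # Reflect a part file's new name in the in-memory metadata; file_path only
    # matters in the common metadata / _metadata file, while in the data file's
    # own footer it is left as None (as explained above).
    for chunk in pf.fmd.row_groups[rg_index].columns:
        chunk.file_path = new_path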

@martindurant (Member):

Sorry, I created some merge conflicts for you

@yohplala (Author) commented Dec 14, 2021

Sorry, I created some merge conflicts for you

Martin, it is I who will be sorry when I finally submit these 800+ lines of code. ;)

@yohplala changed the title from "Implementation of ParquetFile.append_as_row_groups()" to "Implementation of ParquetFile.write_row_groups() & ParquetFile._sort_part_names()" on Dec 15, 2021
@yohplala (Author) commented Dec 15, 2021

Hi @martindurant

Overall, I am done with this PR, except for this file_path that dask wants as a string.
I could put some ifs here and there, but even in dask itself there is a function that I cannot modify:

dask/dask/utils.py:1335: in natural_sort_key
    return [int(part) if part.isdigit() else part for part in re.split(r"(\d+)", s)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

pattern = '(\\d+)', string = b'a=apple/b=large/part.0.parquet', maxsplit = 0
flags = 0
    def split(pattern, string, maxsplit=0, flags=0):
        """Split the source string by the occurrences of the pattern,
        returning a list containing the resulting substrings.  If
        capturing parentheses are used in pattern, then the text of all
        groups in the pattern are also returned as part of the resulting
        list.  If maxsplit is nonzero, at most maxsplit splits occur,
        and the remainder of the string is returned as the final element
        of the list."""
>       return _compile(pattern, flags).split(string, maxsplit)
E       TypeError: cannot use a string pattern on a bytes-like object

Hmmm, I am running out of ideas.
If you have any on what should be done, I would gladly read them.

I had in mind to keep only bytes in the metadata, but maybe we should do the reverse: once file_path is written, we revert it to a string (I have to check where this is done). Then we would keep strings everywhere, except in the function that actually writes the value.
Do you think this could be ok? (stopping here for today)

@yohplala (Author) commented Dec 16, 2021

Hi @martindurant
It now seems ok.
As I could conclude yesterday, I let pf.fmd.row_groups[].columns[].file_path be a string as soon as it is set.

I think that, with everything I have written, everything has now been 'introduced', so to say.

One thing remains regarding update, which is the refactoring of write(..., append='overwrite').
With the formalization I have made, it removes the constraint of a single row group per partition.
The only constraint that remains when using update (or the redirection write(..., append='overwrite')) is that the existing dataset has to be written with partitions (which also implicitly implies the hive format).
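
For illustration, the two entry points as I understand them after this refactor ('mydataset' is assumed to be an existing hive-partitioned dataset written with partition_on=['name']; a hedged sketch, exact parameters may differ):

import pandas as pd
from fastparquet import write
from fastparquet.writer import update  # location per the reviewed diff below

new_df = pd.DataFrame({'name': ['Yoh', 'Fred'], 'value': [1.0, 2.0]})

# Historical entry point, kept for compatibility, now redirecting to update():
write('mydataset', new_df, file_scheme='hive', partition_on=['name'], append='overwrite')

# Equivalent call to the new standalone function:
update('mydataset', new_df)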

Please, let me know your feedback.
Again, I am sorry for such a big PR. The next ones will be much more focused, I will take care of that.

@yohplala (Author):

Hi @martindurant,
Please, do you have any feedback on this PR?

@martindurant (Member) left a review comment:

Reviewed as far as writer.py, which only has a couple of comments. As usual, the extensive refactor there makes it tricky to see what has really changed and what not.

@@ -371,6 +376,15 @@ def remove_row_groups(self, rgs, write_fmd:bool = True,
        (and any contained files). Not required if this ParquetFile has
        a .fs file system attribute
        """
        if not isinstance(rgs, list):
            if not hasattr(rgs, '__iter__'):
@martindurant (Member):

I'm not certain this is comprehensive. The following is probably better

try:
    rgs = list(rgs)
except TypeError:
    rgs = [rgs]

So, I gather it's OK to "remove" an empty list of row-groups, but it has no effect.

@yohplala (Author):

Thanks, I rewrote the code.
I did not keep the try/except though, because the lines of code immediately after take care of an empty row-group list.
This isinstance/hasattr check was there to catch a row-group list given as a generator.

I left a comment explaining why I switched from [] (previous code) to list():

  • on a generator, [] wraps it in a list: when you iterate over the list content, you get the generator itself, not the items of the generator;
  • list() materializes the generator into a list: when you iterate over it, you iterate over the row groups.
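
For instance (a quick illustration, not code from the PR):

gen = (rg for rg in ('rg0', 'rg1', 'rg2'))  # stand-in for a generator of row groups

wrapped = [gen]           # a 1-element list whose only item is the generator object
materialized = list(gen)  # ['rg0', 'rg1', 'rg2'], the items themselves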

@martindurant (Member):

Does the first branch ever get called? A tuple also has __iter__; actually, many things do. If what you really want is to check for a generator, why not do exactly that (inspect.isgenerator)?

If int, row-groups will be approximately this many rows, rounded down
to make row groups about the same size;
If a list, the explicit index values to start new row groups;
If `None`, set to 50_000_000.
@martindurant (Member):

Why not have this as the default, then?

@yohplala (Author):

It is, but it is not set in writer.write().
I moved the code slicing the pandas dataframe into chunks into a separate function, writer.iter_dataframe().
That is where the default value is set (the first two lines of the function). I could make it a 'constant' and set it at the beginning of writer.py though?

import ...

ROW_GROUP_SIZE = 50_000_000

...

def iter_dataframe(data, row_group_offsets=None):
    if row_group_offsets is None:
        row_group_offsets = ROW_GROUP_SIZE
    ...

What do you think?

@martindurant (Member):

Actually yes, I like using a constant. Then a user could change it once and have it be the default for all future writes.
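
For instance, with the module-level constant sketched above, a user could override the default once per session (hypothetical usage, since the constant name comes only from the sketch):

import fastparquet.writer as fpw

# All subsequent writes without explicit row_group_offsets would use this size.
fpw.ROW_GROUP_SIZE = 5_000_000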

# removed:
chunk.file_path = '/'.join(
    [fn, chunk.file_path if isinstance(chunk.file_path, str) else chunk.file_path.decode()]
)
# added:
chunk.file_path = '/'.join([fn, chunk.file_path])
@martindurant (Member):

Why are we now certain that this is always a str, not bytes?

@yohplala (Author):

I changed the decoding-to-string behavior at ParquetFile instantiation.
In the current version of fastparquet, only the file_path of the 1st column is decoded to a string.
This can be seen in _parse_header() (current version below).

        for rg in fmd[4]:
            chunks = rg[1]
            if chunks:
                chunk = chunks[0]
                s = chunk.get(1)
                if s:
                    chunk[1] = s.decode()

Now, included in this PR, all columns are dealt with:

        # for rg in fmd.row_groups:
        for rg in fmd[4]:
            # chunks = rg.columns
            chunks = rg[1]    
            if chunks:
                for chunk in chunks:
                    # s = chunk.file_path
                    s = chunk.get(1)
                    if s:
                        # chunk.file_path = s.decode()
                        chunk[1] = s.decode()

Of note, there was a related TODO in the current code, in writer.find_max_part():
def find_max_part(row_groups):
    """
    Find the highest integer matching "**part.*.parquet" in referenced paths.
    """
    paths = [c.file_path or "" for rg in row_groups for c in rg.columns]
    # TODO: the following line should not be necessary
    paths = [p.decode() if isinstance(p, bytes) else p for p in paths]

I confirm that, after the change in logic, the previously failing related test cases now pass.

@martindurant (Member):

Ah, this might be an unfortunate change then. Decoding the bytes can take a non-trivial amount of time when there are many columns, and in the typical read-only case, it's unnecessary.

def update(dirpath, data, row_group_offsets=None, sort_pnames:bool=True,
           compression=None, open_with=default_open, mkdirs=None, rename=None,
           remove_with=None, stats=True):
    """Merge new data to existing parquet dataset.
@martindurant (Member):

So what exactly is the correspondence between this and the new methods on ParquetFile? Do we need both?

@yohplala (Author), Dec 29, 2021:

ParquetFile.write_row_groups() 'only':

  • adds new data to an existing dataset,
  • defines where to insert the new row groups in the row-group list,
  • can reorder partition files (rename them),
  • modifies the metadata.

update is a 'macro' function that currently rolls out a single logic for updating a partitioned dataset (I identify a different update logic for the case of an ordered dataset without partitions). It is the re-implementation of the append='overwrite' feature:

  • it identifies the partition files to be removed, based on the partition values found in the new data,
  • it calls ParquetFile.remove_row_groups() to remove them,
  • it defines the logic for sorting the row groups that will be added from the new data,
  • it calls ParquetFile.write_row_groups() to add the new data.

I would not say there is a correspondence between the two:

  • update is the implementation of one update logic, for partitioned datasets;
  • ParquetFile.write_row_groups 'only' does the 'add new row groups' part, also applying the row-group sorting logic that is defined in update (and that is specific to this update logic for partitioned datasets).

A minimal sketch of this chaining is given after this message.
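
A minimal sketch of that chaining, with an assumed helper for reading a row group's partition values (illustrative only, not the PR's actual code):

from fastparquet import ParquetFile
from fastparquet.writer import iter_dataframe  # chunking helper described earlier in this PR

def update_sketch(dirpath, data, partition_on):
    pf = ParquetFile(dirpath)
    # Partition values present in the new data.
    new_parts = set(map(tuple, data[partition_on].drop_duplicates().values))
    # Row groups whose partition values are being rewritten
    # ('partition_values_of' is a hypothetical helper, named here for illustration).
    overlapping = [rg for rg in pf.row_groups
                   if partition_values_of(rg, partition_on) in new_parts]
    pf.remove_row_groups(overlapping, write_fmd=False)
    # Append the new data as row groups and write the common metadata
    # (the partition-based sort key described above is omitted from this sketch).
    pf.write_row_groups(iter_dataframe(data), write_fmd=True)
    return pf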

@yohplala (Author) commented Jan 3, 2022

Hello @martindurant,
Wishing you a happy new year!!!
Martin, do you have new feedback? (I don't mean to be a bother, I can wait, but I think that, even in small steps, it is better to keep going than to stop for too long.)
Thanks for your support! Bests,

@martindurant (Member):

The following comment arrived in my inbox apparently without a link to this PR, so pasting here.

OK, got it. If dict is also acceptable for the condition (because a ThriftObject is just a wrapper for dict), please also allow that.

I am sorry, I don't understand. rgs can be:

  • a row group (in which case it is a ThriftObject),
  • or an iterable of row groups (generator, list, tuple...).

I cannot imagine a case where a user would provide a dict, unless fastparquet, elsewhere, also accepts a dict interchangeably with a ThriftObject. Is that the case? If yes, then yes, let's also manage this type correctly with [] and not list() (on a dict, list() would only keep the keys of the dict, not wrap the dict in a list).

As you have noticed, fmd.row_groups returns a list of ThriftObjects of row-group type, but fmd[4] returns a list of dicts. Both hold the same information. The mapping between field names and integer indices is in the specs constant in cencoding.pyx. I don't know if we ever expect the user to manipulate the raw dicts, but the code in fastparquet does, i.e., in those portions where I commented in this PR to keep the index notation rather than attribute access, because it is (much) faster.
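
To illustrate the two access styles with the field numbers used in the _parse_header() snippets above (4 = row_groups, 1 = columns / file_path; pf is assumed to be a loaded ParquetFile):

# ThriftObject attribute access: readable, used in user-facing code.
first_path = pf.fmd.row_groups[0].columns[0].file_path

# Raw index access on the underlying dicts: same information, (much) faster in hot loops.
first_path_raw = pf.fmd[4][0][1][0].get(1)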

@yohplala (Author) commented Jan 3, 2022

@martindurant
I checked the failing dask test cases.
They are all related to a DeprecationWarning:

E       DeprecationWarning: the `interpolation=` argument to percentile was renamed to `method=`, which has additional options.
E       Users of the modes 'nearest', 'lower', 'higher', or 'midpoint' are encouraged to review the method they used. (Deprecated NumPy 1.22)

This seems to be unrelated to this commit.

@martindurant (Member):

dask/dask#8525

@martindurant (Member):

Am I right in thinking that the only thing left here is the question of decoding all of the file paths on open, which is costly in time and not needed for most users?

@yohplala (Author) commented Jan 5, 2022

Am I right in thinking that the only thing left here is the question of decoding all of the file paths on open, which is costly in time and not needed for most users?

@martindurant, I reverted this as well; this is the last message I wrote (decoding all of the file paths occurred in api._parse_header):

Ok, sorry, I understand. I reverted the change, both in util.metadata_from_many() and in api._parse_header.

I took into account all of your feedback (if not, it is a miss on my side, please point out anything you see missing).
For me, 'all' is solved; waiting for new feedback ;)

@yohplala (Author):

Hi @martindurant,
As indicated in my previous message, all your feedback has been taken into account so far.
If you have more to share, I can see to addressing it.
Thanks! Bests,

@martindurant merged commit c9ce8b7 into dask:main on Jan 10, 2022
@yohplala (Author):

Thanks @martindurant!
