
RAM usage and predicate pushdown #3974

Closed
cbilot opened this issue Jul 11, 2022 · 12 comments
Labels
performance Performance issues or improvements

Comments

@cbilot

cbilot commented Jul 11, 2022

What language are you using?

Python

Have you tried latest version of polars?

yes

What version of polars are you using?

0.13.52

What operating system are you using polars on?

Linux Mint 20.3

What language version are you using

3.10.4

My Question

When using filter, fetch, limit, or slice with scan_parquet, it seems that the entire contents (or nearly all) of the file are loaded into RAM before the filter is applied. Should this occur?

I would post this question on Stack Overflow, except that the MWE setup is somewhat complex.

MWE

This MWE is configurable, so anyone should be able to replicate this. We'll also re-use the mem_squash function from #3972 and #3971.

RAM usage and Garbage Collection

One complication with observing RAM usage is garbage collection. At any point, the RAM used by Python/Polars might include objects waiting to be garbage-collected. Thus, merely observing RAM usage using top (or similar tools) may not be representative of the RAM that is actually required by an algorithm.

As such, this MWE is designed to force an OOM situation to demonstrate that Polars is reading the entire contents (or nearly all) of a file into RAM before applying filtering.
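
As an aside, one way to observe the peak (rather than the current) memory footprint of the Python process is to ask the kernel directly. This is only a cross-check; the OOM approach below remains the definitive test. A minimal sketch, assuming Linux, where ru_maxrss is reported in kilobytes:

import resource

def peak_rss_gb() -> float:
    # Peak resident set size of this process; on Linux, ru_maxrss is in kilobytes.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / (2**20)

print(f"peak RSS so far: {peak_rss_gb():.1f} GB")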

Overview

Here's how we'll show that Polars is reading the entire contents of the file into RAM:

  1. We'll create a large DataFrame and save it as a parquet file. The DataFrame when loaded into RAM should occupy a significant share of RAM. In my case, I'll create a DataFrame that consumes 225 GB of RAM when loaded.
  2. We'll restart the Python interpreter, to ensure that all RAM is released.
  3. In the new instance of the Python interpreter, we'll first create a "boulder" - a very large DataFrame whose sole purpose is to consume a significant share of RAM. On my machine, I'll create a "boulder" DataFrame that consumes 300 GB of RAM, leaving only 512 GB - 300 GB ~ 200 GB remaining for the next step.
  4. We'll use scan_parquet and filter for a single record from the parquet file created in step 1 above.

In step 4 above, if Polars needs more than 200 GB of RAM to run the scan_parquet and filter, then an OOM will occur. This should demonstrate that Polars is consuming more than 200 GB to read a single record from the parquet file.

Presumably, Python/Polars will use garbage collection to reclaim as much RAM as possible before allowing an OOM. This sidesteps the issues that occur when merely watching overall RAM usage using top. In essence, we are forcing the issue to reveal itself.

Step 1: Create the Parquet File

I'll create a DataFrame of 225 GB (in RAM) and save it to a parquet file. We'll later use this in step 4 below.

import polars as pl
import math

mem_in_GB = 225
def mem_squash(mem_size_GB: int) -> pl.DataFrame:
    nbr_uint64 = mem_size_GB * (2**30) / 8
    nbr_cols = math.ceil(nbr_uint64 ** (0.15))
    nbr_rows = math.ceil(nbr_uint64 / nbr_cols)

    return pl.DataFrame(
        data={
            "col_" + str(col_nbr): pl.arange(0, nbr_rows, eager=True)
            for col_nbr in range(nbr_cols)
        }
    )


df = mem_squash(mem_in_GB)
df.estimated_size() / (2**30)

df.write_parquet('tmp.parquet')
>>> df.estimated_size() / (2**30)
225.0000001192093

Steps 2 & 3: Restart the Python Interpreter and Create the Boulder

Restart the Python interpreter and use mem_squash once again, this time to create the "boulder".

import polars as pl
import math
def mem_squash(mem_size_GB: int) -> pl.DataFrame:
    nbr_uint64 = mem_size_GB * (2**30) / 8
    nbr_cols = math.ceil(nbr_uint64 ** (0.15))
    nbr_rows = math.ceil(nbr_uint64 / nbr_cols)

    return pl.DataFrame(
        data={
            "col_" + str(col_nbr): pl.arange(0, nbr_rows, eager=True)
            for col_nbr in range(nbr_cols)
        }
    )


boulder_df = mem_squash(300)
boulder_df.estimated_size() / (2**30)
>>> boulder_df.estimated_size() / (2**30)
300.00000013411045

The boulder is now occupying 300 GB of my system RAM and cannot be garbage-collected. top shows that I have only 200 GB of available RAM for the next step.

Step 4: scan_parquet and filter

Now we'll run a set of queries using scan_parquet along with filter, limit, fetch, and slice, and observe what happens. The filter below should return only one record from the parquet file.

ldf = pl.scan_parquet('tmp.parquet')
df = ldf.filter(pl.col('col_1') == 100).collect()
>>> ldf = pl.scan_parquet('tmp.parquet')
>>> df = ldf.filter(pl.col('col_1') == 100).collect()
Killed

slice

Re-running Steps 2, 3, and 4 (showing only the last two lines).

>>> ldf = pl.scan_parquet('tmp.parquet')
>>> df = ldf.slice(100, 1000).collect()
Killed

limit

>>> ldf = pl.scan_parquet('tmp.parquet')
>>> df = ldf.limit().collect()
Killed

fetch

fetch does succeed if there are 200 GB of remaining system RAM.

>>> boulder_df = mem_squash(300)
>>> boulder_df.estimated_size() / (2**30)
300.00000013411045
>>> ldf = pl.scan_parquet('tmp.parquet')
>>> df = ldf.fetch()
>>> df
shape: (500, 38)
┌───────┬───────┬───────┬───────┬─────┬────────┬────────┬────────┬────────┐
│ col_0 ┆ col_1 ┆ col_2 ┆ col_3 ┆ ... ┆ col_34 ┆ col_35 ┆ col_36 ┆ col_37 │
│ ---   ┆ ---   ┆ ---   ┆ ---   ┆     ┆ ---    ┆ ---    ┆ ---    ┆ ---    │
│ i64   ┆ i64   ┆ i64   ┆ i64   ┆     ┆ i64    ┆ i64    ┆ i64    ┆ i64    │
╞═══════╪═══════╪═══════╪═══════╪═════╪════════╪════════╪════════╪════════╡
│ 0     ┆ 0     ┆ 0     ┆ 0     ┆ ... ┆ 0      ┆ 0      ┆ 0      ┆ 0      │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 1     ┆ 1     ┆ 1     ┆ 1     ┆ ... ┆ 1      ┆ 1      ┆ 1      ┆ 1      │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2     ┆ 2     ┆ 2     ┆ 2     ┆ ... ┆ 2      ┆ 2      ┆ 2      ┆ 2      │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 3     ┆ 3     ┆ 3     ┆ 3     ┆ ... ┆ 3      ┆ 3      ┆ 3      ┆ 3      │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ ...   ┆ ...   ┆ ...   ┆ ...   ┆ ... ┆ ...    ┆ ...    ┆ ...    ┆ ...    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 496   ┆ 496   ┆ 496   ┆ 496   ┆ ... ┆ 496    ┆ 496    ┆ 496    ┆ 496    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 497   ┆ 497   ┆ 497   ┆ 497   ┆ ... ┆ 497    ┆ 497    ┆ 497    ┆ 497    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 498   ┆ 498   ┆ 498   ┆ 498   ┆ ... ┆ 498    ┆ 498    ┆ 498    ┆ 498    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 499   ┆ 499   ┆ 499   ┆ 499   ┆ ... ┆ 499    ┆ 499    ┆ 499    ┆ 499    │
└───────┴───────┴───────┴───────┴─────┴────────┴────────┴────────┴────────┘

However, if I increase the "boulder" to 400 GB, leaving a mere 100 GB for fetch to run, we get an OOM.

>>> boulder_df = mem_squash(400)
>>> boulder_df.estimated_size() / (2**30)
400.0000000670552
>>> ldf = pl.scan_parquet('tmp.parquet')
>>> df = ldf.fetch()
Killed

Discussion

The above came about as I was about to propose a solution to a Stack Overflow question. The OP states that memory pressure is an issue, so I wanted to ensure that my solution would work in a situation where a file is too large to load in RAM. (Hence the use of a "boulder" above, as well as the mem_squash function.)

Is the above issue one of row groups? That is, since my saved parquet file is created with a single row group, is Polars required to read the entire file contents in lazy mode?

Is this an issue of parallelism? That is, should I have set the parallel option on scan_parquet to something other than "auto"?
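
For reference, one way to confirm how many row groups the writer actually produced is to inspect the file metadata. A minimal sketch, assuming pyarrow is available:

import pyarrow.parquet as pq

meta = pq.ParquetFile('tmp.parquet').metadata
print(meta.num_row_groups, meta.num_rows)  # a single row group for the default writer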

@ritchie46
Member

I will answer this question in relation to #3971 and #3972 as well. I believe there are a few things at play here.

  1. One is that on Linux, memory used by a process is almost never returned to the OS. Peak memory usage can therefore fill RAM and make it seem that you only have n - x RAM left, while x consists of both a free and an occupied piece of RAM. This doesn't have to be a problem, but it can be: if we have fragmented the heap, it can become increasingly slow and difficult to allocate a slab that fits our requirements.

  2. I think the most important thing here is how we write the parquet files. It is something that we should revisit, especially since "parquet: parallelize over row groups ~3x" #3924, where we seem to be faster if we split into small row groups (even with rechunking).

Currently polars writes a parquet file into a single row group. This has the benefit that we don't have to rechunk when reading, but it also has some downsides:

  • we can only parallelize over columns
  • we need to decompress very large buffers (in parallel) so require lots of memory
  • we apply predicates only after the row_group is read and materialized (this is the entire file if we have a single row group)

I think we should default to writing multiple row groups.

  • This will greatly reduce memory usage when we have low selectivity due to applied predicates.
  • It might even be faster (even though we need to rechunk)
  • Depending on the lazy query, we might not have to rechunk at all. We have optimizations for groupby operations that use vertical partitioning; these benefit from partitioned reading.

Finally, if RAM is still tight, as a last low-memory resort we can turn off parallelism. This leads to reading a single row group/single column at a time and therefore only 1/N_THREADS of the memory overhead (given that the filtered result fits into RAM).
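
A minimal sketch of that low-memory configuration, using the row_group_size and parallel keywords exercised later in this thread (the exact row-group size is only an example):

# write many small row groups instead of one large one
df.write_parquet('tmp.parquet', row_group_size=1_000_000)

# read one row group / one column at a time, trading speed for memory
ldf = pl.scan_parquet('tmp.parquet', parallel="none")
out = ldf.filter(pl.col('col_1') == 100).collect()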

@cbilot
Author

cbilot commented Jul 13, 2022

Using row groups did not affect the result. I still get an OOM in Step 4 (that is, Polars requires more than 200 GB of RAM to fetch one record from a parquet file of approximately 800 million records).

I kept everything else the same, but used the row_group_size keyword on write_parquet in step 1. I also tried different values for the parallel keyword in scan_parquet in step 4.

row_group_size = 1_000_000

It took 358 sec to write the parquet file in step 1.
pl.scan_parquet('tmp.parquet', parallel="auto") : OOM
pl.scan_parquet('tmp.parquet', parallel="row_groups") : OOM
pl.scan_parquet('tmp.parquet', parallel="columns") : OOM
pl.scan_parquet('tmp.parquet', parallel="none") : OOM

row_group_size = 100_000

It took 347 sec to write the parquet file in step 1.
pl.scan_parquet('tmp.parquet', parallel="auto") : OOM
pl.scan_parquet('tmp.parquet', parallel="row_groups") : OOM
pl.scan_parquet('tmp.parquet', parallel="columns") : OOM

row_group_size = 10_000

It took 1883 sec to write the parquet file in step 1.
pl.scan_parquet('tmp.parquet', parallel="auto") : I terminated this. It maxed all 32 cores for 20 minutes, with no end in sight.

row_group_size = 1_000

I terminated the write_parquet. It was over 30 minutes into the write_parquet and Polars had not even started writing to the disk.

@ritchie46
Member

That is interesting (and not what I expected). Do you know what the selectivity of the filter is (e.g. ratio of rows filtered)?

I shall explore this further.

@ritchie46
Member

#4006 reduces peak memory when reading over row groups in parallel.

@daviewales

daviewales commented Jul 14, 2022

I think I'm experiencing this issue.

I'm trying to inspect a large Parquet file (approx 3GB on disk) using the lazy api.
My system has 16 GB RAM.
I can use pl.scan_parquet and df.columns without issue.
However, if I try to use e.g. df.head().collect(), I get a memory allocation error.

import polars as pl
df = pl.scan_parquet('file.parquet')
df.select([
    pl.col('columnA')
    ]).head().collect()
<polars.internals.lazy_frame.LazyFrame at 0x1e1b2cd3100>
# memory allocation of 2952790016 bytes failed

I was hoping that it wouldn't use too much RAM due to using head(), but this is apparently not the case.
The only tool I've found that can successfully operate on the file is dask dataframe, using the split_row_groups=True argument.
(It killed duckdb too!)
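
For reference, a minimal sketch of that dask workaround (split_row_groups=True makes each row group its own partition, so only a partition at a time is materialized); the file and column names mirror the snippet above:

import dask.dataframe as dd

ddf = dd.read_parquet('file.parquet', split_row_groups=True)
print(ddf['columnA'].head())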

@cbilot
Author

cbilot commented Jul 14, 2022

@daviewales try upgrading to 0.13.53 and see if that works. Yesterday, I compiled Polars using the two fixes #4000 and #4006 , and this significantly reduced the RAM needed.

If I have a chance today, I want to benchmark how much RAM Polars uses to read a Parquet file, given the size of the row groups (in RAM), a low-selectivity filter, and the number of parallel threads that are concurrently decompressing and processing row groups. For example, if each row group is 1 GB (in RAM) and I have 8 threads, how much RAM should we predict that Polars will need to process the query on a compressed parquet file?

Of course, this means purposely provoking OOM situations, so this might take some time. (I can’t do anything else on my machine while this is running.)
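
As a rough back-of-envelope target for that benchmark (an assumption to validate, not a measured figure): if N_THREADS row groups are decompressed and materialized concurrently, peak usage should be on the order of the per-row-group size in RAM times the thread count, plus the filtered result.

n_threads = 8            # assumed thread count
row_group_ram_gb = 1.0   # assumed size of one decompressed row group in RAM
expected_peak_gb = n_threads * row_group_ram_gb  # ~8 GB, plus the filtered result and decompression buffers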

@zundertj added the performance label on Jul 16, 2022
@traviscross

@cbilot; you might look into cgroups. You can provoke Linux to OOM kill a process long before it threatens your entire machine. E.g.:

$ umask 022 && cd /sys/fs/cgroup/
$ sudo mkdir pylimit && cd pylimit
$ echo $((20 * 2**20)) | sudo tee memory.max
$ echo $$ | sudo tee cgroup.procs
$ python -c "x = [x for x in range(100 * 2**20)]"
Killed
$ grep oom memory.events
oom 1
oom_kill 1
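
A Python-only alternative, if setting up cgroups is not an option, is an address-space rlimit inside the interpreter; note that RLIMIT_AS caps virtual memory, so oversized allocations raise MemoryError rather than triggering the OOM killer. A sketch:

import resource

limit_bytes = 20 * 2**30  # cap the process at 20 GiB of address space
resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))
# Large allocations beyond the cap now fail with MemoryError instead of an OOM kill.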

@daviewales

daviewales commented Jul 27, 2022

@daviewales try upgrading to 0.13.53 and see if that works. Yesterday, I compiled Polars using the two fixes #4000 and #4006 , and this significantly reduced the RAM needed.

@cbilot Just upgraded to polars 0.13.58, and I can now do all of the following without crashing due to RAM usage:

  • df.head().collect() (all columns)
  • df.select([pl.col('Column name')]).collect() (a whole column (158 million rows))

The following still crashes:

  • df.collect() (all rows and columns at once)

So, a definite improvement.

@traviscross

One other natural and useful place to do predicate pushdown is during cross joins. E.g., this query would return only one row, but it instead runs out of memory unless you have quite a lot of it.

import polars as pl
x = pl.DataFrame([pl.Series(
  "x", pl.arange(0, 2**16 - 1, eager=True) % 2**15
).cast(pl.UInt16)])
x.lazy().join(x.lazy(), how="cross", suffix="_").filter(
  (pl.col("x") & pl.col("x_")) == 0x7fff).collect()

We hit this in an actual use case.
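
For scale (rough arithmetic, assuming no pushdown): the materialized cross product has (2^16 - 1)^2 ≈ 4.3 billion rows of two UInt16 columns, i.e. roughly 4.3e9 × 4 bytes ≈ 16 GiB, all allocated before the filter runs even though only a single row survives it.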

@cbilot
Author

cbilot commented Jul 31, 2022

Unless anyone has any remaining issues, we can close this.

Using Polars 0.13.59, I created a DataFrame that occupies 225 GB of RAM, and stored this DataFrame as a Parquet file split into 10 row groups. (For reference, the saved Parquet file is 120.2 GB on disk.) Thus, each row group of the Parquet file represents (conceptually) a DataFrame that would occupy 22.5 GB of RAM when fully loaded. I then ran a simple filter query against the Parquet file using scan_parquet, and allowed Polars only 50 GB of available RAM to work with. And it ran successfully. That's an enormous improvement.
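
For anyone reproducing this, a minimal sketch of that final configuration; the row_group_size value is an assumption (roughly one tenth of the ~800 million rows, giving the 10 row groups described above), and the 50 GB memory cap was applied externally, e.g. via cgroups as shown earlier in the thread:

df = mem_squash(225)
df.write_parquet('tmp.parquet', row_group_size=80_000_000)  # ~10 row groups (assumed value)

ldf = pl.scan_parquet('tmp.parquet')
out = ldf.filter(pl.col('col_1') == 100).collect()  # completes within the 50 GB cap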

@cbilot
Author

cbilot commented Aug 1, 2022

One other natural and useful place to do predicate pushdown is during cross joins. E.g., this query would return only one row, but it instead runs out of memory unless you have quite a lot of it.

@traviscross This was a great suggestion. (FYI, I think #4194 addresses this.)

@ritchie46
Member

#4194 was similar, but for slices instead of predicates.

I will follow up on @traviscross's suggestion.
