
[Python][Parquet] Memory Leak when writing #44295

Open

folkvir opened this issue Oct 3, 2024 · 2 comments

@folkvir

folkvir commented Oct 3, 2024

Summary

While investigating a uwsgi worker crash, we found that a memory leak in our Parquet export was the cause.
We use Python 3.9 with pyarrow 17.0.0, but the leak is also reproducible with the code below on Python 3.12.
We tracked the problem down to writer.writer.write_table(table, row_group_size=1) (row_group_size=1 is used here only to make the leak easy to observe).

Any advice on how to reduce or fix this memory leak would be much appreciated 🙏

How to reproduce?

Install: pip install pyarrow memory_profiler psutil

Then run the script below, which writes 100 000 single-row tables containing an empty record {"a": ""}. You will see the RSS climb by roughly +90 MB for only 100 000 records before the writer is closed, and it still stays about +75 MB above the baseline after closing it!

import io
import pyarrow as pa
import pyarrow.parquet as pq
import psutil
from memory_profiler import profile

# change it to mimalloc on Mac (this can run in a debian container)
pool = pa.jemalloc_memory_pool()
pa.jemalloc_set_decay_ms(0)
pa.set_memory_pool(pool)

previous_rss = 0
first_rss = 0

@profile
def test(previous_rss, first_rss):

    def yield_batch(count: int, schema):
        for i in range(count):
            yield i, None # pa.RecordBatch.from_pylist([{"a": "1"}], schema=schema)

    buf = io.BytesIO()
    buffer = pa.output_stream(buf, compression=None, buffer_size=None)
    schema = pa.schema([pa.field("a", pa.string())])
    writer = pq.ParquetWriter(
        buffer,
        schema=schema,
        compression='none',
        write_statistics=False,
        write_page_index=False,
        write_page_checksum=False,
        use_dictionary=False,
        store_schema=True,
    )
    bytes_length = 0
    table = pa.Table.from_pylist([{"a": ""}], schema=schema)

    for i, batch in yield_batch(100000, schema):
        writer.writer.write_table(table, row_group_size=1)

        buf.seek(0)
        val = buf.read()
        bytes_length += len(val)
        del val
        buf.flush()
        buf.truncate(0)
        buf.seek(0)

        if i % 1_000 == 0:
            p = psutil.Process().memory_info()
            if previous_rss == 0:
                previous_rss = p.rss
                first_rss = p.rss
            diff = p.rss - previous_rss
            print((
                f"Chunk: {i}; RSS: {p.rss / 1_000_000}; "
                f"(mb) Pool size:{pool.bytes_allocated()}; "
                f"Diff: {diff}; +{(p.rss - first_rss) / 1_000_000} (mb);"
            ))
            previous_rss = p.rss

    writer.close()

    buf.seek(0)
    val = buf.read()
    bytes_length += len(val)
    del val
    buf.flush()
    buf.truncate(0)
    buf.seek(0)

    print(f"Export size: {bytes_length / 1_000_000} mb")
    return previous_rss, first_rss

if __name__=='__main__':
    previous_rss, first_rss = test(previous_rss, first_rss)

    p = psutil.Process().memory_info()
    if previous_rss == 0:
        previous_rss = p.rss
        first_rss = p.rss
    diff = p.rss - previous_rss
    print((
        f"RSS: {p.rss / 1_000_000}; "
        f"(mb) Pool size:{pool.bytes_allocated()}; "
        f"Diff: {diff}; +{(p.rss - first_rss) / 1_000_000} (mb);"
    ))
    previous_rss = p.rss

Output:

Chunk: 0; RSS: 90.619904; (mb) Pool size:384; Diff: 0; +0.0 (mb);
Chunk: 1000; RSS: 91.25888; (mb) Pool size:384; Diff: 638976; +0.638976 (mb);
Chunk: 2000; RSS: 91.930624; (mb) Pool size:384; Diff: 671744; +1.31072 (mb);
Chunk: 3000; RSS: 93.650944; (mb) Pool size:384; Diff: 1720320; +3.03104 (mb);
Chunk: 4000; RSS: 94.388224; (mb) Pool size:384; Diff: 737280; +3.76832 (mb);
Chunk: 5000; RSS: 95.748096; (mb) Pool size:384; Diff: 1359872; +5.128192 (mb);
Chunk: 6000; RSS: 96.534528; (mb) Pool size:384; Diff: 786432; +5.914624 (mb);
Chunk: 7000; RSS: 97.271808; (mb) Pool size:384; Diff: 737280; +6.651904 (mb);
Chunk: 8000; RSS: 98.05824; (mb) Pool size:384; Diff: 786432; +7.438336 (mb);
Chunk: 9000; RSS: 99.61472; (mb) Pool size:384; Diff: 1556480; +8.994816 (mb);
Chunk: 10000; RSS: 100.450304; (mb) Pool size:384; Diff: 835584; +9.8304 (mb);
Chunk: 11000; RSS: 101.220352; (mb) Pool size:384; Diff: 770048; +10.600448 (mb);
Chunk: 12000; RSS: 102.023168; (mb) Pool size:384; Diff: 802816; +11.403264 (mb);
Chunk: 13000; RSS: 102.760448; (mb) Pool size:384; Diff: 737280; +12.140544 (mb);
Chunk: 14000; RSS: 103.514112; (mb) Pool size:384; Diff: 753664; +12.894208 (mb);
Chunk: 15000; RSS: 104.316928; (mb) Pool size:384; Diff: 802816; +13.697024 (mb);
Chunk: 16000; RSS: 105.070592; (mb) Pool size:384; Diff: 753664; +14.450688 (mb);
Chunk: 17000; RSS: 107.413504; (mb) Pool size:384; Diff: 2342912; +16.7936 (mb);
Chunk: 18000; RSS: 108.232704; (mb) Pool size:384; Diff: 819200; +17.6128 (mb);
Chunk: 19000; RSS: 109.002752; (mb) Pool size:384; Diff: 770048; +18.382848 (mb);
Chunk: 20000; RSS: 109.7728; (mb) Pool size:384; Diff: 770048; +19.152896 (mb);
Chunk: 21000; RSS: 110.51008; (mb) Pool size:384; Diff: 737280; +19.890176 (mb);
Chunk: 22000; RSS: 111.312896; (mb) Pool size:384; Diff: 802816; +20.692992 (mb);
Chunk: 23000; RSS: 112.082944; (mb) Pool size:384; Diff: 770048; +21.46304 (mb);
Chunk: 24000; RSS: 112.88576; (mb) Pool size:384; Diff: 802816; +22.265856 (mb);
Chunk: 25000; RSS: 113.639424; (mb) Pool size:384; Diff: 753664; +23.01952 (mb);
Chunk: 26000; RSS: 114.393088; (mb) Pool size:384; Diff: 753664; +23.773184 (mb);
Chunk: 27000; RSS: 115.146752; (mb) Pool size:384; Diff: 753664; +24.526848 (mb);
Chunk: 28000; RSS: 115.982336; (mb) Pool size:384; Diff: 835584; +25.362432 (mb);
Chunk: 29000; RSS: 116.752384; (mb) Pool size:384; Diff: 770048; +26.13248 (mb);
Chunk: 30000; RSS: 117.506048; (mb) Pool size:384; Diff: 753664; +26.886144 (mb);
Chunk: 31000; RSS: 118.276096; (mb) Pool size:384; Diff: 770048; +27.656192 (mb);
Chunk: 32000; RSS: 119.062528; (mb) Pool size:384; Diff: 786432; +28.442624 (mb);
Chunk: 33000; RSS: 122.994688; (mb) Pool size:384; Diff: 3932160; +32.374784 (mb);
Chunk: 34000; RSS: 123.748352; (mb) Pool size:384; Diff: 753664; +33.128448 (mb);
Chunk: 35000; RSS: 124.551168; (mb) Pool size:384; Diff: 802816; +33.931264 (mb);
Chunk: 36000; RSS: 125.304832; (mb) Pool size:384; Diff: 753664; +34.684928 (mb);
Chunk: 37000; RSS: 126.042112; (mb) Pool size:384; Diff: 737280; +35.422208 (mb);
Chunk: 38000; RSS: 126.861312; (mb) Pool size:384; Diff: 819200; +36.241408 (mb);
Chunk: 39000; RSS: 127.647744; (mb) Pool size:384; Diff: 786432; +37.02784 (mb);
Chunk: 40000; RSS: 128.434176; (mb) Pool size:384; Diff: 786432; +37.814272 (mb);
Chunk: 41000; RSS: 129.253376; (mb) Pool size:384; Diff: 819200; +38.633472 (mb);
Chunk: 42000; RSS: 129.990656; (mb) Pool size:384; Diff: 737280; +39.370752 (mb);
Chunk: 43000; RSS: 130.793472; (mb) Pool size:384; Diff: 802816; +40.173568 (mb);
Chunk: 44000; RSS: 131.547136; (mb) Pool size:384; Diff: 753664; +40.927232 (mb);
Chunk: 45000; RSS: 132.317184; (mb) Pool size:384; Diff: 770048; +41.69728 (mb);
Chunk: 46000; RSS: 133.103616; (mb) Pool size:384; Diff: 786432; +42.483712 (mb);
Chunk: 47000; RSS: 133.873664; (mb) Pool size:384; Diff: 770048; +43.25376 (mb);
Chunk: 48000; RSS: 134.643712; (mb) Pool size:384; Diff: 770048; +44.023808 (mb);
Chunk: 49000; RSS: 135.380992; (mb) Pool size:384; Diff: 737280; +44.761088 (mb);
Chunk: 50000; RSS: 136.15104; (mb) Pool size:384; Diff: 770048; +45.531136 (mb);
Chunk: 51000; RSS: 136.97024; (mb) Pool size:384; Diff: 819200; +46.350336 (mb);
Chunk: 52000; RSS: 137.773056; (mb) Pool size:384; Diff: 802816; +47.153152 (mb);
Chunk: 53000; RSS: 138.510336; (mb) Pool size:384; Diff: 737280; +47.890432 (mb);
Chunk: 54000; RSS: 139.296768; (mb) Pool size:384; Diff: 786432; +48.676864 (mb);
Chunk: 55000; RSS: 140.066816; (mb) Pool size:384; Diff: 770048; +49.446912 (mb);
Chunk: 56000; RSS: 140.853248; (mb) Pool size:384; Diff: 786432; +50.233344 (mb);
Chunk: 57000; RSS: 141.606912; (mb) Pool size:384; Diff: 753664; +50.987008 (mb);
Chunk: 58000; RSS: 142.409728; (mb) Pool size:384; Diff: 802816; +51.789824 (mb);
Chunk: 59000; RSS: 143.163392; (mb) Pool size:384; Diff: 753664; +52.543488 (mb);
Chunk: 60000; RSS: 143.93344; (mb) Pool size:384; Diff: 770048; +53.313536 (mb);
Chunk: 61000; RSS: 144.703488; (mb) Pool size:384; Diff: 770048; +54.083584 (mb);
Chunk: 62000; RSS: 145.522688; (mb) Pool size:384; Diff: 819200; +54.902784 (mb);
Chunk: 63000; RSS: 146.276352; (mb) Pool size:384; Diff: 753664; +55.656448 (mb);
Chunk: 64000; RSS: 147.030016; (mb) Pool size:384; Diff: 753664; +56.410112 (mb);
Chunk: 65000; RSS: 147.78368; (mb) Pool size:384; Diff: 753664; +57.163776 (mb);
Chunk: 66000; RSS: 154.91072; (mb) Pool size:384; Diff: 7127040; +64.290816 (mb);
Chunk: 67000; RSS: 155.680768; (mb) Pool size:384; Diff: 770048; +65.060864 (mb);
Chunk: 68000; RSS: 156.483584; (mb) Pool size:384; Diff: 802816; +65.86368 (mb);
Chunk: 69000; RSS: 157.2864; (mb) Pool size:384; Diff: 802816; +66.666496 (mb);
Chunk: 70000; RSS: 158.072832; (mb) Pool size:384; Diff: 786432; +67.452928 (mb);
Chunk: 71000; RSS: 158.826496; (mb) Pool size:384; Diff: 753664; +68.206592 (mb);
Chunk: 72000; RSS: 159.629312; (mb) Pool size:384; Diff: 802816; +69.009408 (mb);
Chunk: 73000; RSS: 160.39936; (mb) Pool size:384; Diff: 770048; +69.779456 (mb);
Chunk: 74000; RSS: 161.13664; (mb) Pool size:384; Diff: 737280; +70.516736 (mb);
Chunk: 75000; RSS: 161.988608; (mb) Pool size:384; Diff: 851968; +71.368704 (mb);
Chunk: 76000; RSS: 162.725888; (mb) Pool size:384; Diff: 737280; +72.105984 (mb);
Chunk: 77000; RSS: 163.495936; (mb) Pool size:384; Diff: 770048; +72.876032 (mb);
Chunk: 78000; RSS: 164.265984; (mb) Pool size:384; Diff: 770048; +73.64608 (mb);
Chunk: 79000; RSS: 165.0688; (mb) Pool size:384; Diff: 802816; +74.448896 (mb);
Chunk: 80000; RSS: 165.855232; (mb) Pool size:384; Diff: 786432; +75.235328 (mb);
Chunk: 81000; RSS: 166.641664; (mb) Pool size:384; Diff: 786432; +76.02176 (mb);
Chunk: 82000; RSS: 167.411712; (mb) Pool size:384; Diff: 770048; +76.791808 (mb);
Chunk: 83000; RSS: 168.214528; (mb) Pool size:384; Diff: 802816; +77.594624 (mb);
Chunk: 84000; RSS: 168.935424; (mb) Pool size:384; Diff: 720896; +78.31552 (mb);
Chunk: 85000; RSS: 169.689088; (mb) Pool size:384; Diff: 753664; +79.069184 (mb);
Chunk: 86000; RSS: 170.459136; (mb) Pool size:384; Diff: 770048; +79.839232 (mb);
Chunk: 87000; RSS: 171.311104; (mb) Pool size:384; Diff: 851968; +80.6912 (mb);
Chunk: 88000; RSS: 172.081152; (mb) Pool size:384; Diff: 770048; +81.461248 (mb);
Chunk: 89000; RSS: 172.867584; (mb) Pool size:384; Diff: 786432; +82.24768 (mb);
Chunk: 90000; RSS: 173.654016; (mb) Pool size:384; Diff: 786432; +83.034112 (mb);
Chunk: 91000; RSS: 174.40768; (mb) Pool size:384; Diff: 753664; +83.787776 (mb);
Chunk: 92000; RSS: 175.177728; (mb) Pool size:384; Diff: 770048; +84.557824 (mb);
Chunk: 93000; RSS: 175.931392; (mb) Pool size:384; Diff: 753664; +85.311488 (mb);
Chunk: 94000; RSS: 176.70144; (mb) Pool size:384; Diff: 770048; +86.081536 (mb);
Chunk: 95000; RSS: 177.487872; (mb) Pool size:384; Diff: 786432; +86.867968 (mb);
Chunk: 96000; RSS: 178.307072; (mb) Pool size:384; Diff: 819200; +87.687168 (mb);
Chunk: 97000; RSS: 179.109888; (mb) Pool size:384; Diff: 802816; +88.489984 (mb);
Chunk: 98000; RSS: 179.879936; (mb) Pool size:384; Diff: 770048; +89.260032 (mb);
Chunk: 99000; RSS: 180.649984; (mb) Pool size:384; Diff: 770048; +90.03008 (mb);
Export size: 12.596596 mb
Filename: /Users/xxxxxxx/Documents/github/xxxxxx/xxxxxxx.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    15     42.9 MiB     42.9 MiB           1   @profile
    16                                         def test(previous_rss, first_rss):
    17                                         
    18     85.9 MiB      0.0 MiB           2       def yield_batch(count: int, schema):
    19    173.0 MiB     -0.2 MiB      100001           for i in range(count):
    20    173.0 MiB     -0.3 MiB      200000               yield i, None # pa.RecordBatch.from_pylist([{"a": "1"}], schema=schema)
    21                                         
    22     42.9 MiB      0.0 MiB           1       buf = io.BytesIO()
    23     42.9 MiB      0.1 MiB           1       buffer = pa.output_stream(buf, compression=None, buffer_size=None)
    24     43.0 MiB      0.1 MiB           1       schema = pa.schema([pa.field("a", pa.string())])
    25     44.0 MiB      1.0 MiB           2       writer = pq.ParquetWriter(
    26     43.0 MiB      0.0 MiB           1           buffer,
    27     43.0 MiB      0.0 MiB           1           schema=schema,
    28     43.0 MiB      0.0 MiB           1           compression='none',
    29     43.0 MiB      0.0 MiB           1           write_statistics=False,
    30     43.0 MiB      0.0 MiB           1           write_page_index=False,
    31     43.0 MiB      0.0 MiB           1           write_page_checksum=False,
    32     43.0 MiB      0.0 MiB           1           use_dictionary=False,
    33     43.0 MiB      0.0 MiB           1           store_schema=True,
    34                                             )
    35     44.0 MiB      0.0 MiB           1       bytes_length = 0
    36     85.9 MiB     41.9 MiB           1       table = pa.Table.from_pylist([{"a": ""}], schema=schema)
    37                                         
    38    173.0 MiB     -0.2 MiB      100001       for i, batch in yield_batch(100000, schema):
    39    173.0 MiB     86.9 MiB      100000           writer.writer.write_table(table, row_group_size=1)
    40                                         
    41    173.0 MiB     -0.8 MiB      100000           buf.seek(0)
    42    173.0 MiB     -0.2 MiB      100000           val = buf.read()
    43    173.0 MiB     -0.2 MiB      100000           bytes_length += len(val)
    44    173.0 MiB     -0.2 MiB      100000           del val
    45    173.0 MiB     -0.2 MiB      100000           buf.flush()
    46    173.0 MiB     -0.2 MiB      100000           buf.truncate(0)
    47    173.0 MiB     -0.1 MiB      100000           buf.seek(0)
    48                                         
    49    173.0 MiB     -0.2 MiB      100000           if i % 1_000 == 0:
    50    172.3 MiB      0.0 MiB         100               p = psutil.Process().memory_info()
    51    172.3 MiB      0.0 MiB         100               if previous_rss == 0:
    52     86.4 MiB      0.0 MiB           1                   previous_rss = p.rss
    53     86.4 MiB      0.0 MiB           1                   first_rss = p.rss
    54    172.3 MiB      0.0 MiB         100               diff = p.rss - previous_rss
    55    172.3 MiB      0.0 MiB         200               print((
    56    172.3 MiB      0.0 MiB         400                   f"Chunk: {i}; RSS: {p.rss / 1_000_000}; "
    57    172.3 MiB      0.0 MiB         100                   f"(mb) Pool size:{pool.bytes_allocated()}; "
    58    172.3 MiB      0.0 MiB         200                   f"Diff: {diff}; +{(p.rss - first_rss) / 1_000_000} (mb);"
    59                                                     ))
    60    172.3 MiB      0.0 MiB         100               previous_rss = p.rss
    61                                         
    62    226.0 MiB     53.0 MiB           1       writer.close()
    63                                         
    64    226.0 MiB      0.0 MiB           1       buf.seek(0)
    65    226.0 MiB      0.0 MiB           1       val = buf.read()
    66    226.0 MiB      0.0 MiB           1       bytes_length += len(val)
    67    226.0 MiB      0.0 MiB           1       del val
    68    226.0 MiB      0.0 MiB           1       buf.flush()
    69    226.0 MiB      0.0 MiB           1       buf.truncate(0)
    70    226.0 MiB      0.0 MiB           1       buf.seek(0)
    71                                         
    72    226.0 MiB      0.0 MiB           1       print(f"Export size: {bytes_length / 1_000_000} mb")
    73    226.0 MiB      0.0 MiB           1       return previous_rss, first_rss


RSS: 164.806656; (mb) Pool size:0; Diff: -15843328; +74.186752 (mb);

Component(s)

Parquet, Python

@kr-hansen

kr-hansen commented Jan 21, 2025

I'm seeing the same issue with Python 3.11 and pyarrow 19.0.0. Is this related to, or another variant of, this older issue that seems to have stalled out? Or are they likely different?

@folkvir
Author

folkvir commented Jan 21, 2025

It could be related to that issue, given the example I use here, but the two issues are not exactly the same.

Here the problem appears to be in how the row-group metadata is handled. With row_group_size set to 1, one metadata entry is collected per row, and all of them are held until they are written into the footer at the very end of the export, so the accumulated metadata becomes huge. That memory also does not seem to be garbage-collected correctly once the export finishes, since the RSS stays elevated after the writer is closed.

To "fix" the issue on our side, we set row_group_size to at least around 10 000, which reduces both the memory usage and the number of collected metadata entries, and therefore also the size of the footer. Memory usage is almost stable with such values. I don't have other advice to give.
