
Unmanaged Memory Leak with Large Parquet Files (Dask + Modin) #8375

Closed
@seydar

Description

Describe the issue:
I am using modin on a dask cluster.

When loading 30 parquet files (each ~150 MB) onto a cluster of 2 workers (32 GB RAM, 500 GB disk, AWS EC2 r5.xlarge), one process (identified below) accumulates runaway unmanaged memory until the nanny restarts the worker, which then causes the rest of the cluster to fail during a shuffle.

I've seen hints in other bug reports that there's a memory leak in pyarrow and that dask can't make the same memory decisions for parquet as it can for CSV (which works flawlessly: 56 GB of CSV, 2 workers, no problem!), so I suspect that's the issue, but I don't know (a) how to find where that leak is in pyarrow, or (b) what dask/distributed can do to handle it.
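
One way to check whether pyarrow's allocator is what's holding the "unmanaged" memory is to query its memory pool on every worker. A minimal sketch, assuming the same client as in the example below (pyarrow.default_memory_pool, pyarrow.total_allocated_bytes, and Client.run are existing APIs):

import pyarrow as pa

def arrow_memory_report():
    # Report which allocator backs the default Arrow pool and how much it
    # currently holds; compare this against the dashboard's "unmanaged" number.
    pool = pa.default_memory_pool()
    return {
        "backend": pool.backend_name,                       # e.g. "jemalloc", "mimalloc", "system"
        "arrow_allocated_bytes": pa.total_allocated_bytes(),
        "pool_peak_bytes": pool.max_memory(),               # high-water mark for this pool
    }

print(client.run(arrow_memory_report))

If the Arrow pool turns out to be the culprit, setting ARROW_DEFAULT_MEMORY_POOL=system in the worker environment before starting dask worker is one experiment: it switches Arrow from jemalloc/mimalloc to the plain system allocator.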

Logs showing how the worker handles the runaway memory:

2023-11-28 16:23:45,096 - distributed.worker - INFO - Run out-of-band function '_disable_warnings'
2023-11-28 16:27:36,595 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.73s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-11-28 16:28:47,525 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.26s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-11-28 16:28:53,487 - distributed.worker.memory - WARNING - Worker is at 83% memory usage. Pausing worker.  Process memory: 25.75 GiB -- Worker memory limit: 30.88 GiB
2023-11-28 16:29:00,198 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.73s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-11-28 16:29:00,943 - distributed.worker.memory - WARNING - Worker is at 53% memory usage. Resuming worker. Process memory: 16.67 GiB -- Worker memory limit: 30.88 GiB
2023-11-28 16:29:13,133 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.51s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-11-28 16:29:42,589 - distributed.nanny.memory - WARNING - Worker tcp://10.0.6.83:39675 (pid=1271) exceeded 95% memory budget. Restarting...
2023-11-28 16:29:42,776 - distributed.nanny - INFO - Worker process 1271 was killed by signal 15
2023-11-28 16:29:42,902 - distributed.nanny - WARNING - Restarting worker
2023-11-28 16:29:44,033 - distributed.worker - INFO -       Start worker at:      tcp://10.0.6.83:32949
2023-11-28 16:29:44,033 - distributed.worker - INFO -          Listening to:      tcp://10.0.6.83:32949
2023-11-28 16:29:44,033 - distributed.worker - INFO -          dashboard at:            10.0.6.83:50001
2023-11-28 16:29:44,033 - distributed.worker - INFO - Waiting to connect to:      tcp://10.0.6.100:8786
2023-11-28 16:29:44,033 - distributed.worker - INFO - -------------------------------------------------
2023-11-28 16:29:44,033 - distributed.worker - INFO -               Threads:                          4
2023-11-28 16:29:44,033 - distributed.worker - INFO -                Memory:                  30.88 GiB
2023-11-28 16:29:44,033 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-er0x7mmt
2023-11-28 16:29:44,033 - distributed.worker - INFO - -------------------------------------------------
2023-11-28 16:29:44,748 - distributed.worker - INFO - Starting Worker plugin shuffle
2023-11-28 16:29:44,749 - distributed.worker - INFO -         Registered to:      tcp://10.0.6.100:8786
2023-11-28 16:29:44,749 - distributed.worker - INFO - -------------------------------------------------
2023-11-28 16:29:44,750 - distributed.core - INFO - Starting established connection to tcp://10.0.6.100:8786
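
The pause at 83% and the kill at 95% in these logs come from distributed's worker memory thresholds, which are ordinary config keys. A minimal sketch with the documented defaults; note that they have to be set in the worker environment (a dask config file or DASK_ environment variables) before dask worker starts, since setting them only in the client process has no effect:

import dask

dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.70,      # spill more aggressively
    "distributed.worker.memory.pause": 0.80,      # pause accepting new tasks (the 83% message above)
    "distributed.worker.memory.terminate": 0.95,  # the nanny kills the worker (the 95% message above)
})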

The runaway process:

/usr/bin/python3 -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=11, pipe_handle=17) --multiprocessing-fork
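
This command line is how the nanny launches the worker child process via multiprocessing, so the process accumulating unmanaged memory appears to be the dask worker itself rather than a stray subprocess. For unmanaged memory that glibc never returns to the OS, the distributed documentation suggests lowering the allocator's trim threshold (MALLOC_TRIM_THRESHOLD_=65536 in the worker environment) or trimming manually. A sketch of the manual trim, assuming the same client as in the example below, so you can watch whether the unmanaged number drops:

import ctypes

def trim_memory() -> int:
    # Ask glibc to return freed-but-retained heap pages to the OS.
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)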

The ultimate error:

Traceback (most recent call last):
  File "/Users/ari/work/haystac/src/ray/modin_test.py", line 63, in <module>
    print(df.count())
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/pandas/base.py", line 1211, in count
    frame._query_compiler.count(axis=axis, numeric_only=numeric_only)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/dataframe/algebra/tree_reduce.py", line 58, in caller
    query_compiler._modin_frame.tree_reduce(
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/dataframe/pandas/dataframe/utils.py", line 501, in run_f_on_minimally_updated_metadata
    result = f(self, *args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/dataframe/pandas/dataframe/dataframe.py", line 2045, in tree_reduce
    reduce_parts = self._partition_mgr_cls.map_axis_partitions(
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 629, in map_axis_partitions
    return cls.broadcast_axis_partitions(
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 62, in wait
    result = func(cls, *args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 481, in broadcast_axis_partitions
    preprocessed_map_func = cls.preprocess_func(apply_func)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 129, in run_and_log
    return obj(*args, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/dataframe/pandas/partitioning/partition_manager.py", line 124, in preprocess_func
    return cls._partition_class.preprocess_func(map_func)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py", line 247, in preprocess_func
    return DaskWrapper.put(func, hash=False, broadcast=True)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/modin/core/execution/dask/common/engine_wrapper.py", line 136, in put
    return client.scatter(data, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/distributed/client.py", line 2604, in scatter
    return self.sync(
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/distributed/utils.py", line 434, in sync
    raise error
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/distributed/utils.py", line 408, in f
    result = yield future
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/distributed/client.py", line 2474, in _scatter
    await self.scheduler.scatter(
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/distributed/core.py", line 1396, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/ari/opt/miniconda3/envs/mamba_oa_env/lib/python3.10/site-packages/distributed/core.py", line 1180, in send_recv
    raise exc.with_traceback(tb)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/distributed/core.py", line 968, in _handle_comm
  File "/home/ubuntu/.local/lib/python3.10/site-packages/distributed/scheduler.py", line 5964, in scatter
  File "/home/ubuntu/.local/lib/python3.10/site-packages/distributed/scheduler.py", line 6769, in replicate
  File "/usr/lib/python3.10/random.py", line 482, in sample
    raise ValueError("Sample larger than population or is negative")
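
The final ValueError is a stdlib one: the traceback shows Modin calling client.scatter(..., broadcast=True), which makes the scheduler replicate the scattered object, and Scheduler.replicate picks target workers with random.sample. That raises exactly this error when it is asked for more workers than are currently available, e.g. in the window after the worker above was killed and before its replacement re-registered. A two-line illustration (the address is hypothetical):

import random

live_workers = ["tcp://10.0.6.100:1234"]  # hypothetical: only one worker left alive
random.sample(live_workers, 2)            # ValueError: Sample larger than population or is negative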

Minimal Complete Verifiable Example:

from   dask.distributed import Client
import dask
import time
import sys

import modin.pandas as pd
import modin.config as cfg

cfg.Engine.put("dask")
client = Client(sys.argv[1])

class Timing:
    """Context manager that prints a description and the elapsed wall-clock time."""

    def __init__(self, desc):
        self.desc = desc

    def __enter__(self):
        self.start = time.time()
        print(self.desc)
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        print(f"\t=> {time.time() - self.start}s")

manifest_files = ['s3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_0934fdc9-1bc7-4dae-a371-c3f58f3b31fc',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_7011b49a-bf6c-42d0-9852-85aa10a3ad37',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_8ca98d70-74c2-4d17-b059-83cd25e398c0',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_61265893-e992-4026-8b69-e60a4641dd10',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_d4318a64-7fc0-4b59-8f85-8b1790e72a70',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_4e2e19a8-b360-49fe-9b82-f66a1e23ad3e',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_c5a77760-f078-432c-aaf5-6d99fb0cee0c',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_db5f90f3-9fc0-4b86-98da-d840bb9b5423',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_2472c560-1113-448b-ae18-e23032d3f3d8',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_551944ef-47f4-475d-a575-e209ca1ee7b4',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_67569166-18f9-40bd-9845-20f069f8dc8a',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_8a81dca0-767d-43bf-b0aa-a087f98d41c5',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_c9dd6a02-0f92-4202-ba70-77fda02d4acf',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_9f094026-03f4-4ec2-a348-0498eeb6b04d',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_2c20c942-1e2c-4f37-86ec-4adce83edbea',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_237d6c26-8372-4584-ae15-9f693d2295a6',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_cf41cbf5-eb46-4bb9-b904-f1518affbefa',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_420f8434-5a71-4af8-91b5-d5364c941ded',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_f155616d-39bc-483e-b19a-56da9fbae685',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_ef8a7770-43b7-44bb-9dde-f92ed8faae9b',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_8aa7de20-91ee-4ada-8f48-c7a5b313572f',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_7c265580-ada5-4b94-b818-5cebdb4bb6c6',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_13562caf-4578-4e36-83fe-8b7a5eabc7e8',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_b0f80cd3-4aa5-40ac-a0c8-632c763f8969',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_2f1e190b-621c-4638-91f1-e961ba674714',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_5dcd43e4-67bd-41ba-b3cd-7e8f8da9b1f9',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_293c6916-4f22-4ca0-a240-078ebe48368b',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_80e8c47e-f254-47c4-80af-d744885e8174',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_6b1ba112-0545-4c3f-9321-dbf597d98bc1',
                  's3://my-athena-output/results/0.12892323498922598/20231127_175506_00007_3735a_0b3dd1a8-72ef-4852-bf1d-f7091629d3b6']

try:
    # Iteratively load files and scatter them to the cluster
    with Timing("reading parquet from athena"):
        df = pd.read_parquet(manifest_files)

    with Timing("count"):
        print(df.count())

    with Timing("latitude mean"):
        print(df.groupby('agent').latitude.mean())

    with Timing("counting again"):
        print(df.count())

    with Timing("counting again"):
        print(df.count())

finally:
    client.close()
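
To see how cluster memory evolves while this script runs (rather than watching the dashboard), distributed ships a MemorySampler diagnostic that polls the scheduler while a block of client code executes. A sketch wrapping the first two steps, assuming the same client and manifest_files as above:

from distributed.diagnostics import MemorySampler

ms = MemorySampler()

with ms.sample("read_parquet"):
    df = pd.read_parquet(manifest_files)

with ms.sample("count"):
    print(df.count())

print(ms.to_pandas())  # per-label time series of cluster-wide memory use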

Anything else we need to know?:
This is running in an AWS environment. I'm booting the EC2 instances via boto3 and starting dask worker and dask scheduler on them, linking them up manually (well, through a script that uses boto3).

As I mentioned above, I'm using modin on top of a dask cluster. (This also fails with regular dask on a dask cluster when memory is too small and spilling can't keep up: the same process as here shows runaway memory, and spilling never goes beyond 2-3 GB.)

Environment:

  • Dask version: 2023.11.0
  • Modin version: 0.25.1
  • Python version: 3.10.12
  • Operating System: Ubuntu 22.04.3 LTS
  • Install method (conda, pip, source): pip
