Out of memory when sorting #5108

Open
andygrove opened this issue Jan 29, 2023 · 20 comments
Labels
bug Something isn't working

Comments

@andygrove
Member

andygrove commented Jan 29, 2023

Describe the bug
Original bug was filed against Python bindings: apache/datafusion-python#157

Describe the bug
Sorting a table and exporting it to a Parquet file in Colab produces an out-of-memory error.

To Reproduce

!curl -L 'https://drive.google.com/uc?export=download&id=18gv0Yd_a-Zc7CSolol8qeYVAAzSthnSN&confirm=t' > lineitem.parquet
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_parquet('lineitem', 'lineitem.parquet')
df = ctx.sql("select * from lineitem order by l_shipdate")
df.write_parquet("lineitem_Datafusion.parquet")

Expected behavior
I expected it to use only the available memory.

Here is a notebook comparing the same operation using Polars and DuckDB:
https://colab.research.google.com/drive/1pfAPpIG7jpvGB_aHj-PXX66vRaRT0xlj#scrollTo=O8-lyg1y6RT2

@comphead
Contributor

@andygrove any idea how this can be reproduced?
DF test is below

#[tokio::test]
async fn test_huge_sort() {
    let ctx = SessionContext::new();
    ctx.register_parquet(
        "lineitem",
        "/Users/a/lineitem.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();
    let sql = "select * from lineitem order by l_shipdate";
    let dataframe = ctx.sql(sql).await.unwrap();
    // Unwrap so a write failure fails the test instead of being silently dropped.
    dataframe.write_parquet("/Users/a/lineitem_sorted.parquet", None).await.unwrap();
}

Any idea how to limit memory for the process? The only ideas I have are to bring up a VM with limited memory or to set a limit with ulimit.

@tustvold
Contributor

tustvold commented Jan 30, 2023

If running on Linux you could use cgroups to artificially limit the memory available to the process.

FWIW I think this relates to the default configuration of the RuntimeEnv, which has no limit. If you configure the session with a limiting MemoryPool, it should enforce a limit. If you additionally configure a DiskManager, it should spill.
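
As a rough illustration of the two ideas above (a limit that is enforced, and an operator that spills when it hits the limit), here is a toy model in plain Rust. It is a sketch under assumptions, not DataFusion's implementation: `ToyPool` and `ToySorter` are hypothetical names invented for this example, and "spilling" is modelled only as releasing the reservation and counting the bytes.

```rust
/// Toy memory pool with a hard byte limit (illustrative only; not DataFusion's code).
pub struct ToyPool {
    limit: usize,
    used: usize,
}

impl ToyPool {
    pub fn new(limit: usize) -> Self {
        ToyPool { limit, used: 0 }
    }

    /// Reserve `bytes`, or return an error instead of exceeding the limit.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let available = self.limit - self.used;
        if bytes > available {
            return Err(format!(
                "Failed to allocate additional {} bytes - maximum available is {}",
                bytes, available
            ));
        }
        self.used += bytes;
        Ok(())
    }

    /// Return `bytes` to the pool.
    pub fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }

    pub fn used(&self) -> usize {
        self.used
    }
}

/// Hypothetical spill-capable operator: it buffers batches in memory and,
/// when the pool refuses a reservation, "spills" its buffer and retries once.
pub struct ToySorter {
    buffered: usize,
    pub spilled: usize,
}

impl ToySorter {
    pub fn new() -> Self {
        ToySorter { buffered: 0, spilled: 0 }
    }

    pub fn insert_batch(&mut self, pool: &mut ToyPool, batch_bytes: usize) -> Result<(), String> {
        if pool.try_grow(batch_bytes).is_err() {
            // Spill: release everything buffered so far, then retry.
            pool.shrink(self.buffered);
            self.spilled += self.buffered;
            self.buffered = 0;
            pool.try_grow(batch_bytes)?;
        }
        self.buffered += batch_bytes;
        Ok(())
    }
}
```

In this model an operator that cannot spill would simply propagate the error from `try_grow`, which is the kind of "Resources exhausted" failure discussed in this thread.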

@comphead
Contributor

comphead commented Jan 31, 2023

Thanks @tustvold, I ran the test with a memory limit configured and spilling enabled.

    #[tokio::test]
    async fn test_huge_sort() -> Result<()> {
        let runtime_config = crate::execution::runtime_env::RuntimeConfig::new()
            .with_memory_pool(Arc::new(crate::execution::memory_pool::GreedyMemoryPool::new(1024*1024*1024)))
            .with_disk_manager(crate::execution::disk_manager::DiskManagerConfig::new_specified(vec!["/Users/a/spill/".into()]));
        let runtime = Arc::new(crate::execution::runtime_env::RuntimeEnv::new(runtime_config).unwrap());
        let ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);
        ctx.register_parquet(
            "lineitem",
            "/Users/a/lineitem.parquet",
            ParquetReadOptions::default(),
        )
        .await
        .unwrap();
        let sql = "select * from lineitem order by l_shipdate";
        let dataframe = ctx.sql(sql).await.unwrap();
        dataframe.show_limit(10).await?;
        //dataframe.write_parquet("/Users/a/lineitem_sorted.parquet", None).await?;
        Ok(())
    }

@andygrove It seems the test now consumes only the memory available to the pool, without exhausting all of the machine's memory:

Error: External(ResourcesExhausted("Failed to allocate additional 1419104 bytes for RepartitionExec[3] with 0 bytes already allocated - maximum available is 496736"))
test dataframe::tests::test_huge_sort ... FAILED

@tustvold However, the DiskManager doesn't spill anything into the folder. Is that expected?

@tustvold
Contributor

You likely need to give it more than 1MB, not all operators can spill

@djouallah

Sorry, how do you pass those configs using the Python API?

@andygrove
Member Author

Sorry, how do you pass those configs using the Python API?

We will need to expose them there. Should be trivial. I will add notes on the Python issue

@DDtKey
Contributor

DDtKey commented Feb 1, 2023

I was able to reproduce similar behaviour against the latest version (17.0.0) but not against 16.0.0, which returned the correct allocation error (Resources exhausted: Failed to allocate additional 12372192 bytes ...).

So it looks like a regression to me. I'm going to use bisect to find the commit that introduced it and report it here with details.

My case in short: with the latest version, the memory limit (FairSpillPool, 4 GiB) was ignored and memory consumption grew until OOM (a join of 4 files with ~1 GB of data each, without explicit ordering).

But when I set prefer_hash_join = false, 17.0.0 also returned a resources-exhausted error, which looked weird 🤔

@DDtKey
Contributor

DDtKey commented Feb 1, 2023

This behavior (at least in my case described above) was introduced here (a9ddcd3, PR link).

Before that commit, it returned Resources exhausted when I used a memory pool; now the memory usage grows until OOM.

It can be reproduced with code like this:
UPD: an easier way to reproduce this is described in #5162

   let ctx = SessionContext::with_config_rt(
        SessionConfig::default(),
        Arc::new(
            RuntimeEnv::new(
                RuntimeConfig::new()
                    .with_memory_pool(Arc::new(FairSpillPool::new(4 * 1024 * 1024 * 1024))),
            )
            .unwrap(),
        ),
    );
    
    // I can share the file (it's more or less random data), but I'm not sure what to use to share it.
    // However, it's reproducible with any files whose joins produce a large result (> memory pool limit).
    ctx.register_csv("hr", file_path, CsvReadOptions::default())
        .await?;
        
    // 4 joins - just to represent a problem
    let data_frame = ctx
        .sql(
            r#"
        SELECT hr1."Emp_ID"
        from hr hr1 
        left join hr hr2 on hr1."Emp_ID" = hr2."Emp_ID" 
        left join hr hr3 on hr2."Emp_ID" = hr3."Emp_ID" 
        left join hr hr4 on hr3."Emp_ID" = hr4."Emp_ID"
    "#,
        )
        .await?;
        
    data_frame.write_csv(output_path).await?;

Additional notes

It may be useful to mention that memory consumption jumps at the 3rd join. With 2 joins it works totally fine and doesn't even return an error.
This query works fine with optimizer.repartition_joins = false; it just doesn't consume as much memory without repartitioning for joins (but it will certainly take much more time).

@comphead
Contributor

comphead commented Feb 1, 2023

Hi @DDtKey, I have checked both implementations (GreedyMemoryPool and FairSpillPool) for the sort problem above (#5108 (comment)).

Both work correctly and return a ResourcesExhausted error. I'll try to reproduce the same for repartition.

In the meantime, you may also want to check the OOM test for repartition: https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/repartition/mod.rs#L1150

@DDtKey
Contributor

DDtKey commented Feb 1, 2023

@comphead these are likely different issues. It probably makes sense to create another issue with this description.

JFYI: in my example there is no explicit ordering at all; it's only about repartitions.

It doesn't respect memory pool for sure in my cases 🤔
And probably related to usage of unbounded channels.

Thanks for the reference, I'll check

@alamb
Contributor

alamb commented Feb 2, 2023

And probably related to usage of unbounded channels.

Interestingly, the point of #4867 was in fact to remove the unbounded channels. I am looking more into this issue

@alamb
Contributor

alamb commented Feb 2, 2023

I made a copy of the colab thing and tried to run the test case in datafusion-0.6.0 to see if this is a regression: https://colab.research.google.com/drive/1RS31HPnkkHoJeshAirZCP4T52Xl0dl9f?usp=sharing

It appears there is no write_parquet functionality in the 0.6.0 release so it isn't a regression from the point of view that this used to work and now doesn't.

@alamb
Contributor

alamb commented Feb 2, 2023

My measurements actually suggest that DataFusion 17.0.0 is better in this regard than DataFusion 16.0.0.

Using this input file:

!curl -L 'https://drive.google.com/uc?export=download&id=18gv0Yd_a-Zc7CSolol8qeYVAAzSthnSN&confirm=t' > lineitem.parquet

Using this program:

// `Arc` is used below, so it needs to be imported as well.
use std::sync::Arc;

use datafusion::{
    error::Result,
    execution::{
        disk_manager::DiskManagerConfig,
        memory_pool::{FairSpillPool, GreedyMemoryPool},
        runtime_env::{RuntimeConfig, RuntimeEnv},
    },
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<()> {

    let runtime_config = RuntimeConfig::new()
    //.with_memory_pool(Arc::new(GreedyMemoryPool::new(1024*1024*1024)))
        .with_memory_pool(Arc::new(FairSpillPool::new(1024*1024*1024)))
        .with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp/".into()]));

    let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);

    ctx.register_parquet("lineitem", "/Users/alamb/Downloads/lineitem.parquet", Default::default())
        .await.unwrap();

    let df = ctx.sql("select * from lineitem order by l_shipdate").await.unwrap();

    df.write_parquet("/Users/alamb/Downloads/lineitem_Datafusion.parquet", None)
        .await
    .unwrap();

    Ok(())
}

I tested with both DataFusion 16.0.0 / 17.0.0 and FairSpillPool / GreedyMemoryPool

datafusion = { version = "16.0.0" }

or

datafusion = { version = "17.0.0" }

And this:

        .with_memory_pool(Arc::new(FairSpillPool::new(1024*1024*1024)))

Or

        .with_memory_pool(Arc::new(GreedyMemoryPool::new(1024*1024*1024)))

DataFusion 16.0.0 with FairSpillPool:

     Running `/Users/alamb/Software/target-df/release/rust_arrow_playground`
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParquetError(ArrowError("underlying Arrow error: External error: Arrow error: External error: Resources exhausted: Failed to allocate additional 1419488 bytes for RepartitionExec[14] with 2837440 bytes already allocated - maximum available is 0"))', src/main.rs:26:6
stack backtrace:

DataFusion 16.0.0 and GreedyMemoryPool

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParquetError(ArrowError("underlying Arrow error: External error: Arrow error: External error: Resources exhausted: Failed to allocate additional 1419168 bytes for RepartitionExec[4] with 0 bytes already allocated - maximum available is 552160"))', src/main.rs:26:6

DataFusion 17.0.0 and FairSpillPool I got:

The program completed successfully 🎉

DataFusion 17.0.0 and GreedyMemoryPool I got:

warning: `rust_arrow_playground` (bin "rust_arrow_playground") generated 1 warning
    Finished release [optimized] target(s) in 3m 35s
     Running `/Users/alamb/Software/target-df/release/rust_arrow_playground`
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParquetError(ArrowError("underlying Arrow error: External error: Arrow error: External error: Resources exhausted: Failed to allocate additional 1419168 bytes for RepartitionExec[4] with 0 bytes already allocated - maximum available is 552160"))', src/main.rs:26:6
stack backtrace:

@comphead
Contributor

comphead commented Feb 2, 2023

@alamb thanks for the analysis. Just to be sure: could there also be an issue with DF 17 and FairSpillPool? If FairSpillPool doesn't respect the memory limit and there is enough memory to complete the sort, then the test will pass even though there is a problem in the memory manager.

@alamb
Contributor

alamb commented Feb 2, 2023

I agree the manager could be improved. Basically I think the issue is that some consumers get an allocation up front and some do it on demand. Thus the on demand operators (like repartition) may end up without any memory budget for some allocation strategies.
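
A toy model of that situation, assuming a simple first-come-first-served pool: one consumer reserves its budget up front, and a second consumer that only asks when data arrives finds little or nothing left. `FirstComePool` and `late_request` are hypothetical names for this sketch; it is not how DataFusion's GreedyMemoryPool or FairSpillPool are actually implemented.

```rust
/// Minimal first-come-first-served pool (a toy, not DataFusion's GreedyMemoryPool).
pub struct FirstComePool {
    limit: usize,
    used: usize,
}

impl FirstComePool {
    pub fn new(limit: usize) -> Self {
        FirstComePool { limit, used: 0 }
    }

    /// Reserve `bytes`; on failure, return how many bytes were still available.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), usize> {
        let available = self.limit - self.used;
        if bytes > available {
            return Err(available);
        }
        self.used += bytes;
        Ok(())
    }
}

/// Hypothetical scenario: one consumer reserves `upfront` bytes when it starts,
/// then a second consumer asks for `on_demand` bytes only once data arrives.
/// Returns the second consumer's result (`Err` carries the bytes still available).
pub fn late_request(limit: usize, upfront: usize, on_demand: usize) -> Result<(), usize> {
    let mut pool = FirstComePool::new(limit);
    pool.try_grow(upfront)
        .expect("up-front reservation should fit in this scenario");
    pool.try_grow(on_demand)
}
```

For example, `late_request(1000, 900, 200)` fails with 100 bytes available even though the pool as a whole is far larger than the second request, which mirrors the shape of the RepartitionExec errors above.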

@DDtKey
Contributor

DDtKey commented Feb 2, 2023

And probably related to usage of unbounded channels.

Interestingly, the point of #4867 was in fact to remove the unbounded channels. I am looking more into this issue

As I already said, my case is not about sorting, so sorry for bringing it up in this issue. It's somehow related to join repartitions and channels (because it started to happen after the unbounded ones were removed). I like that unbounded channels were refactored/removed, but somehow it introduced the issue that I ran into.

It returns the correct error with 16.0.0 and just doesn't respect the memory limit with 17.0.0.
However, I can't reproduce it with small files and lower memory limits (it works correctly for both versions and returns an error), so I think this might be related to buffering.

The code is attached above in my message, and I'm attaching the file (about 1.3 GiB): GDrive link. It's an artificial case, but it shows that some kind of regression exists.

@crepererum
Contributor

@DDtKey I can look into your case next week.

The channel refactoring I did could increase memory usage if some child node of the repartition node starts to buffer data in some uncontrolled manner. Also it changes scheduling a bit so some edge case could now show up that was hidden before.
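
To illustrate the buffering point in general terms, here is a small sketch using Rust's standard-library channels. These are not the channels DataFusion's repartition operator uses, and the function names are made up for this example. With no consumer draining, a bounded channel stops accepting batches at its capacity, while an unbounded one keeps buffering everything it is given.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Count how many 1 KiB batches a producer can enqueue into a bounded channel
/// of capacity `cap` before it pushes back, with no consumer draining it.
pub fn bounded_accepts(cap: usize, attempts: usize) -> usize {
    // `_rx` keeps the receiver alive so sends are not rejected as disconnected.
    let (tx, _rx) = sync_channel::<Vec<u8>>(cap);
    let mut accepted = 0;
    for _ in 0..attempts {
        match tx.try_send(vec![0u8; 1024]) {
            Ok(()) => accepted += 1,
            // Buffer is full: a real producer would have to wait here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Same experiment with an unbounded channel: every send is buffered in memory.
pub fn unbounded_accepts(attempts: usize) -> usize {
    let (tx, _rx) = channel::<Vec<u8>>();
    let mut accepted = 0;
    for _ in 0..attempts {
        if tx.send(vec![0u8; 1024]).is_ok() {
            accepted += 1;
        }
    }
    accepted
}
```

The bounded variant caps what the channel itself holds, but it says nothing about what operators on either side of it choose to buffer, which is where memory accounting has to take over.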

@alamb
Contributor

alamb commented Feb 3, 2023

I think there was significant work done on the join operators themselves between 16.0.0 and 17.0.0 so it may well be the case that the plans or operators that run after the repartition change their behavior

It might help to start by looking at the plans

@crepererum
Contributor

@DDtKey your code includes joins, which are NOT included within the memory manager yet (see #5220). While my work in #4867 changed the buffering and hence can trigger issues with JOIN, I think the fact that JOIN on large datasets doesn't OOM at the moment is luck to begin with. I propose we focus on #5220 and re-check your example afterwards, because if it still OOMs (without a DataFusion error), that would be an actual bug and not just "bad luck".

@DDtKey
Contributor

DDtKey commented Feb 9, 2023

@crepererum yes, that's true. After some investigation (#5162 is also related to this, and reproduces in old versions) I realized that joins didn't respect the MemoryPool even before, so the DataFusionError in the previous version could really have been just luck. The buffering changes just somehow highlighted the problem. My initial assumption came only from searching for the difference in behavior with bisect.

So I totally agree about the priority of #5220
