
[Epic] Generate runtime errors if the memory budget is exceeded #3941

Closed · 4 tasks done · Tracked by #587

alamb opened this issue Oct 24, 2022 · 18 comments
Labels: enhancement (New feature or request)

Comments

@alamb (Contributor) commented Oct 24, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
The basic challenge is that DataFusion can use an unbounded amount of memory while running a plan, which typically results in DataFusion being killed by some system memory protection limit (e.g. the OOM Killer on Linux). See #587 for more details.

As a first step towards supporting larger datasets in DataFusion, if a plan would exceed the overall budget, it should generate a runtime error (resources exhausted) rather than exceeding the budget and risking being killed.

There should also be a way to keep the current behavior (do not error due to resources being exhausted).

Describe the solution you'd like

  1. The user can define a memory limit via MemoryManagerConfig (a configuration sketch follows this list)
  2. All operators that consume significant memory (Hash, Join, Sort) will properly account for and request memory from the MemoryManager via methods like try_grow
  3. If sufficient memory cannot be allocated, the plan should return ResourcesExhausted
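
To make items 1 and 3 concrete, here is a rough sketch of the user-facing side. It is written against the DataFusion 13-era API (the docs link further down this thread); MemoryManagerConfig::try_new_limit, RuntimeConfig::with_memory_manager, and SessionContext::with_config_rt are assumptions based on that version and may differ in other releases:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_manager::MemoryManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

fn limited_context() -> Result<SessionContext> {
    // Cap the memory pool at 1 GiB; memory_fraction = 0.8 leaves 20% slack
    // for allocations the operators do not account for.
    let config = RuntimeConfig::new()
        .with_memory_manager(MemoryManagerConfig::try_new_limit(1 << 30, 0.8)?);
    let runtime = Arc::new(RuntimeEnv::new(config)?);
    Ok(SessionContext::with_config_rt(SessionConfig::new(), runtime))
}
```

A plan whose operators request more than this budget should then fail with a ResourcesExhausted error instead of the process being OOM-killed.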

Needed:

Describe alternatives you've considered
We can always increase the accuracy of the memory allocation accounting (e.g. RecordBatches internal to operators, etc.). However, for this initial epic I would like to get the major consumers of memory instrumented and using the MemoryManager interface. Hopefully this will also allow

Additional context
cc @yjshen @crepererum
related to issues like https://github.com/influxdata/influxdb_iox/issues/5776 (and some internal issues of our own)

alamb added the enhancement (New feature or request) label Oct 24, 2022
@xudong963 (Member) commented

For the join operator, we have sort-merge join.

When the memory budget is exceeded, should we try sort-merge join first?

@alamb (Contributor, Author) commented Oct 24, 2022

When the memory budget is exceeded, should we try sort-merge join first?

Yes, that would be amazing @xudong963 -- I think it is covered by #1599

I think that getting the MemoryManager hooked into the join (so it knows when it would run out of memory) is probably a prerequisite to automatically switching to sort-merge join during execution.

@milenkovicm (Contributor) commented Oct 24, 2022

Should the memory limit be optimistic? What I mean is that, in the case of aggregation, we could first process a record batch, compare memory use before and after the batch is processed, and request the delta from the memory manager. Otherwise we'd need to do two passes over the record batch (one to calculate the required memory, the other to do the actual processing), or request memory for every record, which may lead to contention on the memory manager and trigger a spill in the middle of batch processing.

The end of batch processing would be a "safe point" at which memory usage should be correct, or a spill triggered.
wdyt?
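
To illustrate, a minimal sketch of that end-of-batch safe point; MemoryTracker and its try_grow are made-up stand-ins for the real MemoryConsumer accounting, and the "aggregation" is just an append:

```rust
// Minimal sketch of the "optimistic" scheme: do the work first, then reconcile
// with the memory manager at the end-of-batch safe point.
struct MemoryTracker { used: usize, limit: usize }

impl MemoryTracker {
    // Synchronous reservation; a real manager could spill instead of failing.
    fn try_grow(&mut self, delta: usize) -> Result<(), String> {
        if self.used + delta > self.limit {
            return Err(format!("cannot reserve {delta} more bytes"));
        }
        self.used += delta;
        Ok(())
    }
}

fn process_batch(
    agg_state: &mut Vec<u64>,
    batch: &[u64],
    tracker: &mut MemoryTracker,
) -> Result<(), String> {
    let before = agg_state.capacity() * std::mem::size_of::<u64>();
    agg_state.extend_from_slice(batch); // one pass: just do the aggregation work
    let after = agg_state.capacity() * std::mem::size_of::<u64>();
    // Safe point: request only the delta this batch actually consumed.
    tracker.try_grow(after.saturating_sub(before))
}

fn main() {
    let mut tracker = MemoryTracker { used: 0, limit: 1024 };
    let mut state = Vec::new();
    process_batch(&mut state, &[1, 2, 3, 4], &mut tracker).unwrap();
    println!("accounted {} bytes", tracker.used);
}
```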

@crepererum (Contributor) commented

I think it's OK to have some slack room on top of the limit, which is somewhat controlled by the batch size. It's unlikely that we are going to account for every single byte anyway, since there might be some aux data structures here and there that are heap-allocated. So I would treat this as a "best effort" limit w/o impacting performance (too much).

We could later (if there's demand for it) add a config option "strict memory enforcement" that impacts performance.

@milenkovicm (Contributor) commented

One thing I can't really wrap my head around is the use of async with try_grow.

The issue I have is with the use of try_grow within a struct that impls Stream, e.g. GroupedHashAggregateStreamV2, which actually handles RecordBatches:

```rust
impl Stream for GroupedHashAggregateStreamV2 {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        // error[E0728]: `await` is only allowed inside `async` functions and blocks
        this.mem_manager.try_grow(200).await;
    }
}
```

Does anybody have a better idea of how to integrate it?

@crepererum (Contributor) commented Oct 25, 2022

Does anybody have a better idea of how to integrate it?

https://docs.rs/futures/latest/futures/stream/fn.unfold.html maybe?
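
Roughly like this; a self-contained toy where try_grow is a made-up stand-in for the real memory-manager call, and the async closure passed to unfold is what makes the .await legal:

```rust
use futures::stream::{self, StreamExt};

// Hypothetical async reservation standing in for MemoryConsumer::try_grow;
// the real manager might spill instead of erroring.
async fn try_grow(used: &mut usize, delta: usize, limit: usize) -> Result<(), String> {
    if *used + delta > limit {
        return Err("resources exhausted".into());
    }
    *used += delta;
    Ok(())
}

#[tokio::main]
async fn main() {
    let limit = 1024; // made-up budget: four 256-byte batches fit, the fifth does not
    // unfold threads (batch index, bytes used) through an async closure, so
    // awaiting the reservation is legal even though the result is a Stream.
    let s = stream::unfold((0u32, 0usize), move |(i, mut used)| async move {
        if i >= 5 {
            return None;
        }
        try_grow(&mut used, 256, limit).await.ok()?; // end the stream on exhaustion
        Some((i, (i + 1, used)))
    });
    let items: Vec<u32> = s.collect().await;
    println!("{items:?}"); // [0, 1, 2, 3]
}
```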

@alamb (Contributor, Author) commented Oct 25, 2022

Should the memory limit be optimistic? What I mean is that, in the case of aggregation, we could first process a record batch, compare memory use before and after the batch is processed, and request the delta from the memory manager.

Yes, I agree this would be the best strategy: as memory is needed, we request more memory from the memory manager incrementally.

The end of batch processing would be a "safe point" at which memory usage should be correct, or a spill triggered.
wdyt?

Yes

In general I think upgrading / improving the memory manager would likely be fine.

@alamb (Contributor, Author) commented Oct 25, 2022

I think it's OK to have some slack room on top of the limit, which is somewhat controlled by the batch size.

There is also memory_fraction on MemoryManagerConfig (https://docs.rs/datafusion/13.0.0/datafusion/execution/memory_manager/enum.MemoryManagerConfig.html) to account for some slack in the estimates.

@milenkovicm (Contributor) commented Oct 26, 2022

Thanks @alamb,

As I said in a previous comment, the biggest problem in integrating aggregation with the memory manager is the async on the MemoryConsumer::try_grow function. The async is there to facilitate an async spill(..) when additional memory can't be acquired.

Easy ways to work around the async would be to either:

  • de-couple try_grow and spill, removing async from try_grow, and do a manual spill in case of failure (a sketch follows this list), or
  • have the component use the memory manager directly via memory_manager().can_grow_directly (we would need to make this function public and remove the async, as it is not needed there), and do a manual spill in case of failure. This approach would bypass MemoryConsumer.

wdyt?
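
A sketch of the first option (all names here are hypothetical, not the actual DataFusion API): try_grow becomes synchronous and the operator owns the spill decision, so poll_next never needs to await:

```rust
enum Reservation { Granted, Exhausted }

struct Pool { used: usize, limit: usize }

impl Pool {
    fn try_grow(&mut self, delta: usize) -> Reservation {
        if self.used + delta > self.limit {
            Reservation::Exhausted
        } else {
            self.used += delta;
            Reservation::Granted
        }
    }
    fn shrink(&mut self, bytes: usize) { self.used = self.used.saturating_sub(bytes); }
}

// Operator-side logic: no async needed, because the spill decision lives in
// the operator rather than behind an async memory-manager call.
fn grow_or_spill(pool: &mut Pool, delta: usize, in_mem_bytes: &mut usize) -> Result<(), String> {
    if let Reservation::Exhausted = pool.try_grow(delta) {
        spill_to_disk()?; // write the in-memory state out...
        pool.shrink(*in_mem_bytes); // ...and release its reservation
        *in_mem_bytes = 0;
        if let Reservation::Exhausted = pool.try_grow(delta) {
            return Err("resources exhausted".into()); // surface ResourcesExhausted
        }
    }
    *in_mem_bytes += delta;
    Ok(())
}

fn spill_to_disk() -> Result<(), String> {
    Ok(()) // placeholder for the operator's own spill implementation
}

fn main() {
    let mut pool = Pool { used: 0, limit: 1024 };
    let mut in_mem = 0usize;
    for _ in 0..6 {
        grow_or_spill(&mut pool, 256, &mut in_mem).unwrap(); // spills once on the 5th call
    }
    println!("pool used: {}, operator holds: {}", pool.used, in_mem);
}
```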

@alamb (Contributor, Author) commented Oct 26, 2022

de-couple try_grow and spill, removing async from try_grow, and do a manual spill in case of failure

I think decoupling try_grow and spill would be my preference. In general, if memory is exceeded we may not actually want to spill for all operators (e.g. an initial hash table might simply flush its output if it exceeds memory, rather than trying to spill).

cc @yjshen in case he has some additional thoughts

@yjshen (Member) commented Oct 26, 2022

Hi, sorry to join this party late.

I don't quite get why the async is a problem for the memory manager while implementing aggregation.

We could do memory-limited aggregation as follows:

  1. Try to allocate a fixed-size buffer (a record batch, or a row batch of 8096 rows, for example, if row aggregation is possible) for incoming groups.
  2. Aggregate incoming records, updating existing buffer slots or adding new buffer rows while there is space left in the last pre-allocated buffer.
  3. If the last allocated aggregate buffer is full, try_grow the aggregate's memory by allocating another fixed-size buffer.
     • If more memory quota is granted, go to 2 and continue aggregating.
     • If we fail to get one more aggregation buffer from the memory manager, we spill (see the spill procedure below).
  4. If the input stream is exhausted, we could either:
     • get a sorted iterator over the in-memory aggregation buffers, do a multi-way merge with the spills, produce the final results, and free all memory at the end (if the aggregate is final), or
     • output the in-memory buffers and free all the memory (if the aggregate is partial).

The spill procedure would be:

  • If the aggregate is partial, produce partial output from all the aggregation buffers we have, free all the memory, and go back to step 1 to handle the incoming tuples that follow.
  • If the aggregate is final, sort all the current buffers by group key, spill them to disk, store the result in the spilled-files list, and go back to step 1 to handle the incoming tuples that follow.

Also, one could refer to Apache Spark's hash aggregate impl if interested.
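
A condensed sketch of that loop; Pool, try_grow, and the (key, value) record shape are hypothetical stand-ins for the real MemoryManager types, and the per-slot aggregate update is elided:

```rust
const BUFFER_ROWS: usize = 8096; // fixed-size allocation unit from step 1

struct Pool { used: usize, limit: usize }

impl Pool {
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.limit { self.used += bytes; true } else { false }
    }
    fn free(&mut self, bytes: usize) { self.used = self.used.saturating_sub(bytes); }
}

enum Mode { Partial, Final }

// Returns the "runs": spilled (or emitted) chunks plus the final in-memory one.
fn aggregate(input: Vec<(u64, u64)>, pool: &mut Pool, mode: Mode) -> Vec<Vec<(u64, u64)>> {
    let buf_bytes = BUFFER_ROWS * std::mem::size_of::<(u64, u64)>();
    let mut buffers: Vec<Vec<(u64, u64)>> = vec![];
    let mut runs: Vec<Vec<(u64, u64)>> = vec![]; // stand-in for spill files / partial output

    for rec in input {
        if buffers.last().map_or(true, |b| b.len() == BUFFER_ROWS) {
            // step 3: request one more fixed-size buffer
            if !pool.try_grow(buf_bytes) {
                // spill path: drain every in-memory buffer and release the memory
                let mut drained: Vec<_> = buffers.drain(..).flatten().collect();
                if let Mode::Final = mode {
                    drained.sort_by_key(|r| r.0); // final agg: sort by group key
                }
                runs.push(drained); // a partial agg would emit this downstream instead
                let held = pool.used;
                pool.free(held);
                // retry; a real implementation returns ResourcesExhausted on failure
                assert!(pool.try_grow(buf_bytes), "resources exhausted");
            }
            buffers.push(Vec::with_capacity(BUFFER_ROWS));
        }
        // step 2: update an existing slot or append a new group row
        // (here we just append, skipping the hash lookup)
        buffers.last_mut().expect("just allocated").push(rec);
    }
    // step 4: Final would multi-way merge `buffers` with the spilled runs;
    // Partial simply emits the in-memory buffers.
    runs.push(buffers.into_iter().flatten().collect());
    runs
}

fn main() {
    let mut pool = Pool { used: 0, limit: 4 * BUFFER_ROWS * 16 }; // room for ~4 buffers
    let input: Vec<(u64, u64)> = (0..40_000).map(|i| (i % 100, 1)).collect();
    let runs = aggregate(input, &mut pool, Mode::Final);
    println!("{} runs ({} spilled)", runs.len(), runs.len() - 1);
}
```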

@milenkovicm (Contributor) commented

Thanks for your comment @yjshen.

The concern raised in the previous comments is not the spill algorithm for aggregation; it is the interaction between non-async and async code.

As currently implemented, GroupedHashAggregateStreamV2 lives in the non-async world, but the memory manager / memory consumer exposes async methods which can't be called from poll_next:

```rust
impl Stream for GroupedHashAggregateStreamV2 {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        // error[E0728]: `await` is only allowed inside `async` functions and blocks
        this.mem_manager.try_grow(200).await;
        // rest of the code, which does aggregation and spill
    }
}
```

So the question is how to bridge the gap.

@yjshen (Member) commented Oct 26, 2022

Thanks for the explanation!

Sort in DataFusion is currently memory-limited, and I think we could apply a similar approach in aggregation to the one in sort: https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/sorts/sort.rs#L799-L813.

We could make do_agg async, just as is done for do_sort.
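
Concretely, the pattern in sort.rs boils down to something like this (do_agg and the element types are placeholders): wrap the async driver in a one-element stream and flatten it, so the operator still exposes a plain Stream without hand-writing poll_next:

```rust
use futures::stream::{self, Stream, TryStreamExt};

// Placeholder for an async driver analogous to do_sort: because it is an
// async fn it may freely await memory-manager calls, and it returns the
// output batches as a stream once the memory-limited work is done.
async fn do_agg(input: Vec<u64>) -> Result<impl Stream<Item = Result<u64, String>>, String> {
    // ... try_grow(...).await / spill(...).await would live here ...
    Ok(stream::iter(input.into_iter().map(Ok::<_, String>)))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // stream::once turns the future into a one-element stream, and
    // try_flatten splices in the inner stream of batches.
    let out = stream::once(do_agg(vec![1, 2, 3])).try_flatten();
    let batches: Vec<u64> = out.try_collect().await?;
    println!("{batches:?}");
    Ok(())
}
```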

@alamb (Contributor, Author) commented Oct 27, 2022

I think we should consider reducing the repetition in our hash implementations prior to implementing spilling (#2723).

In general I think @yjshen 's algorithm for grouping in limited memory is 💯

I think we can implement it in stages, however: the first stage is tracking the current memory used and erroring when that is exceeded.

Then, in the second stage, rather than erroring, we can implement the externalized / spilling strategy.

@yjshen (Member) commented Oct 31, 2022

Thanks @alamb for reminding me! I'll start working on #2723 this week.

@alamb (Contributor, Author) commented Oct 31, 2022

Also, as one might imagine given our renewed interest in this ticket, someone from the IOx team may start working on generating runtime errors in the next few days as well.

@alamb (Contributor, Author) commented Nov 28, 2022

An update here: we have added memory limit enforcement for Sort / Grouping. I plan to write some SQL-level tests for this shortly. The only remaining work item is memory-limiting Joins.

Given that joins are currently undergoing some serious rework (@liukun4515 @jackwener @mingmwang and others, e.g. #4377), we don't plan to add memory limits there until that settles down.

alamb changed the title from "Generate runtime errors if the memory budget is exceeded [EPIC]" to "[Epic] Generate runtime errors if the memory budget is exceeded" Mar 5, 2023
@alamb (Contributor, Author) commented Jun 11, 2023

I believe we have completed all the initially planned work for generating runtime errors if the memory budget is exceeded, so closing this issue 🎉

alamb closed this as completed Jun 11, 2023