
Worker can memory overflow by fetching too much data at once #6208

Closed
fjetter opened this issue Apr 26, 2022 · 13 comments · Fixed by #6975

@fjetter
Member

fjetter commented Apr 26, 2022

There is a possibility for a worker to kill itself by fetching too much data at once.

Old version:

while self.data_needed and (
    len(self.in_flight_workers) < self.total_out_connections
    or self.comm_nbytes < self.comm_threshold_bytes
):

New version:

if (
    len(self.in_flight_workers) >= self.total_out_connections
    and self.comm_nbytes >= self.comm_threshold_bytes
):
    return {}, []

Simple example

Let's assume that keys have on average ~500MB
Local memory available is ~10GB
total_out_connections: 50 (default)

That would cause us to fetch up to 25GB at once, which would kill the worker immediately.

This problem is exacerbated if the keys' data sizes are underestimated (e.g. a bias/bug in dask.sizeof).
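
As a rough back-of-the-envelope check of that claim (the numbers below are the ones from this example, not measured defaults):

avg_key_nbytes = 500 * 2**20                # ~500 MiB per key
total_out_connections = 50                  # distributed default
memory_limit = 10 * 2**30                   # ~10 GiB of local memory

worst_case_in_flight = avg_key_nbytes * total_out_connections
print(worst_case_in_flight / 2**30)         # ~24.4 GiB requested at once
print(worst_case_in_flight > memory_limit)  # True -> the worker cannot hold this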

cc @crusaderky

@gjoseph92
Collaborator

That would cause us to fetch up to 25GB at once which would kill the worker immediately.

I think it's also likely this wouldn't always kill the worker, but freeze it like in #6110. If the 25GiB comes somewhat staggered in those 500MB increments, physical memory usage will creep up very close to the 10GB max, without actually requesting more than it. Once it gets close enough, the system will thrash to the point that everything stops, so more memory isn't allocated (which would put it over the limit and kill it). The smaller the key size and the larger the key count, the more likely this is to happen (probably why a shuffle was a good reproducer).

Since we know the (approximate) size of all the keys we're fetching, this seems like something we can avoid doing in the first place (don't fetch more than we have memory capacity for, as a start). Maybe also a use-case for receiving data directly to disk #5996?

@crusaderky
Collaborator

crusaderky commented Apr 27, 2022

pseudocode:

if current managed_in_memory + self.comm_nbytes > threshold% * max_memory:
    stop scheduling further gather_dep's

or with better encapsulation:

    if self.data.would_exceed_threshold(self.comm_nbytes):
        stop scheduling further gather_dep's
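
A minimal, self-contained sketch of that check; managed_in_memory, comm_nbytes, max_memory and the 60% default are stand-ins for the worker's real attributes, not the actual API:

def should_pause_gathers(
    managed_in_memory: int, comm_nbytes: int, max_memory: int, threshold: float = 0.6
) -> bool:
    # Pause scheduling new gather_deps if the data already held in memory plus
    # the data currently in flight would exceed `threshold` of the memory limit.
    return managed_in_memory + comm_nbytes > threshold * max_memory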

There are two ways to resume gathering:

  1. a dependency fails to arrive, thus reducing comm_nbytes
  2. enough dependencies arrive to send the worker over threshold. Older keys are automatically spilled, thus reducing managed_in_memory until it's below threshold.

Note that the above mitigates the issue, but doesn't actually prevent the scenario that @fjetter showed above, in the case of a task that individually requires more memory than is available:

import dask.array as da
from distributed import Client, futures_of

client = Client()  # assumes a running cluster of many small workers
CHUNK_SIZE = 500 // 8 * 2**20  # number of float64 elements in ~500 MiB
a = da.random.random(CHUNK_SIZE * 20, chunks=CHUNK_SIZE)  # ~10 GiB total
a = a.persist()
f = client.submit(lambda _: None, futures_of(a))

What will happen is that whatever worker runs f will start accumulating all chunks of a locally.
In main, if there are 20+ workers in the cluster, this means immediate death from gather_dep.
With the change above, gather_dep will be successful and all the data will be copied onto the worker and progressively spilled.
However, when the time comes to transition f from ready to running, then

  1. chunks of a will be moved from slow to fast
  2. before f can start running, the first chunks are already being sent back from fast to slow; however, they also exist in memory. The worker goes to 100%, and if we're lucky it's terminated; if not, it just freezes due to swap thrashing.

And this is not considering unrelated unmanaged memory - e.g. on a multithreaded worker, there may be other tasks running.

@mrocklin
Member

Two thoughts:

  1. How often does this happen? Presumably this was related to the Florian Shuffle Deadlock (I need a better name for this). Have we seen this happen in other common situations?

Let's assume that keys have on average ~500MB
Local memory available is ~10GB
total_out_connections: 50 (default)

That would cause us to fetch up to 25GB at once which would kill the worker immediately.

If planning for worst case would cripple things (You must have 25GB of memory free before you do any work!), then I would not want to plan for worst case. I think that we could reduce this burden in a few ways:

  1. We could try to learn the size of transfers on the worker (creating something like a worker.py::TaskPrefix)
  2. We could ask the sender's side to cancel for us. "Hey other worker! I want data "x" but I'm running kinda low on memory. I only have 4 GB free. Please only send something if it's less than 2GB? If it's greater, let me know how large it is instead so that I can plan accordingly"
  3. We could start to reduce the number of concurrent inbound connections as our memory starts to dry up.

Just some thoughts to reduce the burden here. Please don't let these distract too much from the main conversation.

@crusaderky
Collaborator

crusaderky commented Apr 27, 2022

  1. We could try to learn the size of transfers on the worker (creating something like a worker.py::TaskPrefix)
  2. We could ask the sender's side to cancel for us. "Hey other worker! I want data "x" but I'm running kinda low on memory. I only have 4 GB free. Please only send something if it's less than 2GB? If it's greater, let me know how large it is instead so that I can plan accordingly"

The receiving worker already knows in advance how large the data will be before it asks for it. That's returned by select_keys_for_gather, which in turn is reflected in Worker.comm_nbytes.
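
The general pattern is to accumulate the known sizes while picking keys and to stop at a byte budget. A sketch of the idea (not the actual select_keys_for_gather implementation):

def pick_keys_to_gather(candidates: dict[str, int], budget_nbytes: int) -> tuple[list[str], int]:
    # candidates maps key -> (estimated) nbytes, known before the fetch starts.
    selected, total = [], 0
    for key, nbytes in candidates.items():
        # Always allow at least one key so oversized keys can still be fetched.
        if selected and total + nbytes > budget_nbytes:
            break
        selected.append(key)
        total += nbytes
    return selected, total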

  3. We could start to reduce the number of concurrent inbound connections as our memory starts to dry up.

I would not want to count workers towards the maximum if they are going to send me only a few kilobytes.

@mrocklin
Member

Ah, great then.

@hendrikmakait
Member

hendrikmakait commented Aug 16, 2022

In coiled/benchmarks#229, we have found a reliable reproducer for a worker getting OOM-killed by fetching too much data. The issue is exacerbated by a misconfigured Worker.memory_limit (coiled/feedback#185), but I am confident that we can also reproduce this issue if that is fixed, by simply scaling test_tensordot_stress to more workers. With a default chunk size of 100 MiB and the default of 50 outgoing and 10 incoming connections per worker, we might use up to ~2.9 GiB for communication, which is ~90% of the memory available to the worker on a t3.medium.

EDIT: Apart from 100 MiB being the default chosen by dask.array itself in coiled/benchmarks#229, we recommend chunk sizes between 10(0) MiB and 1 GiB in several places:

@hendrikmakait
Member

hendrikmakait commented Aug 16, 2022

One quick yet imperfect fix might be to introduce a parameter that limits the amount of memory we want to dedicate to communications, effectively adjusting the above if-clause to

if (
    (
        len(self.in_flight_workers) >= self.total_out_connections
        and self.comm_nbytes >= self.comm_threshold_bytes
    )
    or self.comm_nbytes >= self.comm_memory_limit
):
    return {}, []

This value could be set as a percentage of Worker.memory_limit to account for different machines. As a downside, this might have negative effects in situations where all we want to do for now is fetch an initial set of data to work with.
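
For illustration, such a limit could be derived from the memory limit roughly as follows; the 30% fraction and the comm_memory_limit name are hypothetical, not an existing config value:

memory_limit = 4 * 2**30            # e.g. a worker with a 4 GiB memory limit
comm_memory_fraction = 0.3          # hypothetical: dedicate at most 30% to comms
comm_memory_limit = int(memory_limit * comm_memory_fraction)
print(comm_memory_limit / 2**20)    # ~1228.8 MiB allowed for in-flight transfers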

To better understand the burden comms put onto our workers, we may also want to add metrics about the number of open connections and the size of self.comm_nbytes. XREF: #6892

In addition, we may want to drop self.comm_threshold_bytes as well as the limits on incoming/outgoing connections as parameters, and fully rely on a mechanism that looks at currently used memory to schedule comms, limited by the size of the connection pool.

cc @fjetter

@gjoseph92
Collaborator

It's also worth noting that the same issue exists for workers sending data. There's a limit to the number of incoming connections a worker can have at once (default 10), but not the number of bytes it can be sending at once. If a worker is paused, it abruptly limits itself to 1 connection, but that message could still be large.

Serialization may not be zero-copy. If compression is used, it's never zero-copy. TLS comms will also require a copy in the SSL buffer. So even if the sent data is already in memory, we could potentially be doubling (or more) memory usage of that data until the other worker confirms it received the message.

The worst case is when the data being sent is currently spilled to disk. get_data will then read it into memory (which stores a strong reference to it in the data.fast dict, so even after it's been sent, it will remain in memory at least until the next memory monitor interval, which could be half a second away; months away, compared to the speed of memory allocations). Then, it might have even more copies in serialization, compression, and encryption. Sending a spilled key can be a very costly operation, as well as a slow one, which is why I advocate for #5996 so much.

Ideally, a worker shouldn't be willing to send more than memory_target - current_memory bytes. Of course, current_memory isn't something we can measure all the time, so a fixed threshold like @hendrikmakait suggested above might be a good place to start.
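
A sketch of that sending-side guard; memory_target, current_memory and the requested_sizes mapping are stand-ins, not the real get_data code:

def can_send(requested_sizes: dict[str, int], memory_target: int, current_memory: int) -> bool:
    # Refuse to start a send that would not fit into the memory we believe is
    # still available; the requester can retry later or fetch from a replica.
    headroom = max(memory_target - current_memory, 0)
    return sum(requested_sizes.values()) <= headroom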

@gjoseph92
Collaborator

I wrote this a bit ago, but I'd like to point to #6212 as a more holistic approach to this problem. The overall idea is that:

  • We do many things concurrently that require non-trivial amounts of memory (run user tasks, receive keys, send keys, etc.).
  • If we have 1GB of memory left, and we want to both receive 600MB of data from another worker and load 600MB of spilled data back into memory to run a task, wouldn't it be nice if those operations could coordinate to wait for each other, instead of running concurrently and probably killing the worker?

It's common to use a synchronization primitive like a semaphore or CapacityLimiter to manage concurrent code's access to a limited resource. Maybe we could have a similar structure around memory?
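
A minimal sketch of what such a primitive could look like, as an asyncio-based capacity limiter over bytes; the class and its names are illustrative, not an existing distributed API:

import asyncio
from contextlib import asynccontextmanager

class MemoryLimiter:
    """Let concurrent operations reserve an estimated number of bytes,
    blocking while the total reservation would exceed the capacity."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._reserved = 0
        self._condition = asyncio.Condition()

    @asynccontextmanager
    async def reserve(self, nbytes: int):
        async with self._condition:
            # Wait until the request fits; a single oversized request is still
            # allowed once nothing else is reserved, to avoid deadlock.
            await self._condition.wait_for(
                lambda: self._reserved == 0
                or self._reserved + nbytes <= self._capacity
            )
            self._reserved += nbytes
        try:
            yield
        finally:
            async with self._condition:
                self._reserved -= nbytes
                self._condition.notify_all()

# Usage: a gather and an unspill would each reserve their estimated size, so
# they wait for each other instead of overcommitting memory, e.g.
#     async with limiter.reserve(600 * 2**20):
#         ...  # receive the data / load it from disk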

@crusaderky
Collaborator

crusaderky commented Aug 20, 2022

The worst case is when the data being sent is currently spilled to disk. get_data will then read it into memory (which stores a strong reference to it in the data.fast dict, so even after it's been sent, it will remain in memory at least until the next memory monitor interval, which could be half a second away

Not quite correct. If unspilling a key causes the reported managed memory to get above target (60%), then other keys will immediately be spilled out, until you are again below target.

The mechanism you mention happens specifically and only when managed memory is below 60% and process memory is above spill (70%). In other words, you either have at least 10% unmanaged memory or sizeof() is severely under-reporting memory usage.
The 10% unmanaged memory use case is interesting, as it is a lot easier to reach on smaller workers. A brand new worker (with pandas imported) occupies a baseline of 138 MiB on my machine.
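
For reference, the target/spill thresholds mentioned here are the worker memory-management defaults and can be inspected from the dask config:

import distributed  # noqa: F401  (importing registers the worker defaults)
import dask

for name in ("target", "spill", "pause", "terminate"):
    print(name, dask.config.get(f"distributed.worker.memory.{name}"))
# target 0.6, spill 0.7, pause 0.8, terminate 0.95 (fractions of memory_limit)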

@hendrikmakait hendrikmakait self-assigned this Aug 25, 2022
@fjetter
Member Author

fjetter commented Aug 26, 2022

It's common to use a synchronization primitive like a semaphore or CapacityLimiter to manage concurrent code's access to a limited resource. Maybe we could have a similar structure around memory?

We do not know how much memory sending/receiving data actually requires, nor do we know how much a user task requires. Without reliable measurements I would prefer sticking to a pragmatic, low-complexity approach for now.

I do believe that simply introducing a threshold as proposed by @hendrikmakait in #6208 (comment) will improve things significantly already.

I'm not sure if an absolute threshold, a relative (to memory_limit) threshold or a dynamic threshold (percentage of free/available memory) fits best. My gut feeling tells me to start with a relative threshold (based on memory_limit). Together with the added instrumentation, we can iterate on this choice incrementally, if necessary. If there are still significant shortcomings afterwards, we can think about a more complex solution.

@jrbourbeau
Member

We reverted #6975 in #6994 before releasing. Re-opening this issue as a reminder to restore #6975 post-release.

@jrbourbeau jrbourbeau reopened this Sep 2, 2022
@fjetter
Member Author

fjetter commented Oct 24, 2022

This is in by now.

@fjetter fjetter closed this as completed Oct 24, 2022