
[WIP] Fix For DaskExecutor Bloating Memory #5004

Closed
wants to merge 3 commits

Conversation

kvnkho
Contributor

@kvnkho kvnkho commented Sep 28, 2021

Work in Progress PR for Feedback on Design (No Tests Done Yet)

The Problem

Many users have reported Prefect consuming a lot of memory when using the DaskExecutor. Sample issues can be seen here and here, and it was also hinted at here.

In the example from @sharkinsspatial in the pangeo-forge link above, a simple code snippet such as the one below causes the Dask scheduler to perform poorly (memory issues on an 8 GB scheduler), even though the task does not return a value.

from prefect import Flow, task
from prefect.executors import DaskExecutor
from time import sleep

@task
def map_fn(x):
    sleep(0.25)

task_range = list(range(1, 400000))

with Flow('map_testing') as flow:
    map_fn.map(task_range)

# Executing against a Dask cluster (executor arguments omitted here) is what
# triggers the scheduler memory growth.
flow.run(executor=DaskExecutor())

Yet, an equivalent snippet with pure Dask is able to run in a reasonable amount of time. Prefect is adding a significant amount of overhead to these submit calls.

from dask.distributed import Client
from time import sleep

client = Client(cluster)  # `cluster` is the same Dask cluster as above

def map_fn(x):
    sleep(0.25)

value_range = list(range(1, 400000))

futures = set()
for value in value_range:
    future = client.submit(map_fn, value)
    futures.add(future)

client.gather(futures)

Hypothesis

In order to understand where the overhead comes from, we need to look at the following Dask snippets. Snippet 1 is essentially the pure Dask example from above. The size of x below is roughly 20 KB, and the sum_the_list() function sums the list x and adds i.

# sum_the_list() as described above: sum the list x and add i.
def sum_the_list(x, i):
    return sum(x) + i

x = list(range(1, 2500))
n = 100000

_futures = []
for i in range(n):
    _futures.append(client.submit(sum_the_list, x, i))
result = client.gather(_futures)

Compare this with a second snippet where we send a future for the value of x ahead of time. This way, the workers take care of getting a copy of x among themselves and we reduce data transfer from the client and scheduler. This is done by submitting the dummy() function ahead of time.

def dummy():
    return x

# Submitting dummy() once places x on the cluster; passing the resulting
# future means x is no longer embedded in every task that flows through the
# scheduler.
_futures = []
data_fut = client.submit(dummy)
for i in range(n):
    _futures.append(client.submit(sum_the_list, data_fut, i))
result = client.gather(_futures)

Profiling

We can then profile these two setups using Dask's performance_report context manager. For each of these tests, I spun up a new cluster to prevent any unmanaged memory from affecting the numbers.

from dask.distributed import performance_report

with performance_report(filename="dask-report-efficient.html"):
    ...  # run the snippet being measured here

Scenario 1 - Inefficient Execution

This is the report for the inefficient case. Mean memory usage was 900 MB and total execution time was 640 seconds.

Scenario 2 - Efficient Execution

This is the report for the efficient case. Mean memory usage was around 400 MB and total execution time was 200 seconds.

The Problem in Prefect Terms

In Prefect, the FlowRunner currently submits each mapped child to the executor with the following code.

# this is where each child is submitted for actual work
for idx, states in enumerate(list_of_upstream_states):
    .....
    submitted_states.append(
        executor.submit(
            run_task,
            task=task,
            state=current_state,
            upstream_states=states,
            context=dict(
                prefect.context,
                **task_contexts.get(task, {}),
                map_index=idx,
            ),
            flow_result=self.flow.result,
            task_runner_cls=self.task_runner_cls,
            task_runner_state_handlers=task_runner_state_handlers,
            upstream_mapped_states=upstream_mapped_states,
            extra_context=extra_context(task, task_index=idx),
        )
    )

The context alone here is around 9.8 KB, but we are also repeatedly sending the task, flow_result, task_runner_cls, task_runner_state_handlers, and upstream_mapped_states. It's hard to say what these add up to, but we can definitely ease the burden of the scheduler by moving relevant data to workers ahead of time.
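As a rough way to put a number on that, one could pickle the repeated pieces and measure them. This is only a sketch, assuming a Prefect 1.x environment; cloudpickle is what distributed falls back to for serializing arbitrary Python objects.

import cloudpickle
import prefect

# Approximate the bytes shipped with every mapped submission by pickling the
# flow-run context; the other repeated kwargs (task, flow_result, etc.) could
# be measured the same way.
context_payload = dict(prefect.context)
print(f"pickled context: {len(cloudpickle.dumps(context_payload))} bytes")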

Potential Mechanisms Needed in a Solution

client.scatter()

The Dask client has a method called client.scatter(). This takes a piece of data and moves it to the workers immediately. The immediacy is the main problem: if workers are added later by autoscaling, they don't have the pieces of data that were scattered previously.
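For reference, a minimal sketch of what that would look like, reusing client, x, n, and sum_the_list from the earlier snippets:

# Replicate x to every worker that exists right now; scatter() returns a
# future that can stand in for the raw data in later submit() calls.
[x_fut] = client.scatter([x], broadcast=True)

_futures = [client.submit(sum_the_list, x_fut, i) for i in range(n)]
result = client.gather(_futures)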

Using a future

This leads us to the second option, which is to send these variables as futures. When Dask needs them for an operation, it takes care of getting them to the relevant worker if they are not already present. This is what was shown in one of the earlier code snippets (posted again for quick viewing):

def dummy():
    return x

_futures = []
data_fut = client.submit(dummy)
for i in range(n):
    _futures.append(client.submit(sum_the_list, data_fut, i))
result = client.gather(_futures)

Potential Solutions

There are essentially only two places where the code can be edited to send the task data as futures: the first is the FlowRunner and the second is the DaskExecutor.

Note that the FlowRunner makes 100 submit calls for a mapped task with 100 elements, so at the executor level all that is seen is an individual element of the mapped task.

Editing DaskExecutor

I wanted to edit only the DaskExecutor, but I don't think that approach is reliable. It would mean making the executor's submit call responsible for creating and passing the futures. The problem is that the DaskExecutor would need to check whether the futures already exist in order to know whether to submit them or skip a submission that was already done. Checking for future existence on the scheduler is wonky.

You can try it with

print(client.futures)

which returns:
{'dummy-7c9a8c022791a7950f23fcb1179657a2': <FutureState: finished>}

This means that your functions need to be named carefully in order to distinguish the futures, and you would need to parse these key strings, which could easily lead to false positives. Even then, it's unclear how to turn a matched key back into a reference to the dummy() future so it can be passed to a client.submit() call.
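A sketch of that fragile check, assuming a dummy() future was already submitted on the same client: the only handle available is the key string, so the best one can do is match on the function-name prefix.

# client.futures maps key strings to future states; there is no lookup by
# function object, so we grep the keys and hope no other task name collides.
already_submitted = any(key.startswith("dummy-") for key in client.futures)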

Editing FlowRunner

This made me certain I needed to edit the FlowRunner. Before entering the loop that submits mapped tasks individually, we can already submit a future to the workers and then feed that future into executor.submit(). That is the basis of this PR; the rest of the work is adding the scatter method across all Executors.
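To make the idea concrete, here is a sketch of the pattern in plain dask.distributed terms. This is not the actual FlowRunner code; run_child and the shared dictionary below are hypothetical stand-ins for run_task and the kwargs that are identical across mapped children.

from time import sleep

from dask.distributed import Client

def run_child(shared, map_index):
    # `shared` arrives on the worker already resolved; Dask moves it once per
    # worker rather than shipping a copy with every submission.
    sleep(0.25)
    return shared["task_name"], map_index

if __name__ == "__main__":
    client = Client()

    # Hypothetical stand-in for the kwargs that are identical across children
    # (task, flow_result, task_runner_cls, ...).
    shared = {"task_name": "map_fn", "flow_result": None}

    # Submit the shared data once, before the mapping loop, and reuse the future.
    shared_fut = client.submit(lambda: shared, pure=False)

    futures = [client.submit(run_child, shared_fut, idx) for idx in range(100)]
    results = client.gather(futures)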

To-Do

This standalone PR should already reduce the memory footprint somewhat. The remaining problem is that the context is harder to untangle because it is roughly 90% identical across mapped tasks and 10% different. We need to split it, submit() the shared 90% as a future, and then combine it with the per-task 10% inside the task. That would involve editing the TaskRunner; a sketch of the split follows.
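As a minimal sketch of that split (the context contents below are hypothetical, and this is not the PR's TaskRunner change), the recombination on the worker side could look like:

def recombine_context(shared_context: dict, per_child: dict) -> dict:
    # Merge the shared ~90% (which would arrive as a resolved Dask future)
    # with the small per-child delta shipped with each submission.
    return dict(shared_context, **per_child)

# Example: only {"map_index": idx} travels per child.
context = recombine_context(
    {"flow_name": "map_testing", "task_name": "map_fn"},  # hypothetical shared part
    {"map_index": 3},
)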

This PR:

  • adds new tests (if appropriate)
  • adds a change file in the changes/ directory (if appropriate)
  • updates docstrings for any new functions or function arguments, including docs/outline.toml for API reference docs (if appropriate)

@kvnkho kvnkho changed the title from Dask to Fix For DaskExecutor Bloating Memory on Sep 28, 2021
@kvnkho kvnkho changed the title from Fix For DaskExecutor Bloating Memory to [WIP] Fix For DaskExecutor Bloating Memory on Sep 28, 2021
@zanieb zanieb marked this pull request as draft October 1, 2021 17:04
@kvnkho kvnkho reopened this Nov 1, 2021
@BitTheByte
Contributor

@kvnkho can we get an update on this?

@kvnkho
Contributor Author

kvnkho commented Jan 17, 2022

Yeah, I gave it a shot fairly recently (the last two weeks of 2021) and replaced the task submission with futures (everything except the context, because that is harder). I found no improvement from doing that. It might be that the context is the bulk of the bloat, but the context is quite intertwined in a few places, so it's harder to create a future for it.

If I am unable to get anything working soon, then the focus will probably shift to getting this right in Orion.

@BitTheByte
Contributor

BitTheByte commented Jan 17, 2022

@kvnkho As I understand it, Orion doesn't have this problem currently? I've got a full ecosystem built on top of Prefect and was surprised to find such a problem; it's really hard to replace Prefect at this stage.

@zanieb
Contributor

zanieb commented Jan 21, 2022

@BitTheByte — Orion is naively submitting tasks to Dask right now so it likely has a similar problem. I'll be looking into solving this in Orion in the next few weeks. Hopefully, I'll learn enough to backport a solution to here.

@kvnkho — I'm going to close this for now. Thanks for investigating.

@cisaacstern

@madkinsz, thanks for carrying forward @kvnkho's work on this.

In Pangeo Forge, we still seem to be encountering scheduler memory growth problems when using Prefect's DaskExecutor, e.g.

Any sense of if/when a fix might be available for this? Or is there anything I can do to help the process along, such as testing a branch?
