Tune work stealing to be data size aware #278

Merged (4 commits) on May 25, 2016

Conversation

mrocklin (Member) commented May 17, 2016

Work stealing can be inefficient if it moves large datasets between workers. This PR avoids those cases.

bad = [t for t in tasks
       if sum(self.nbytes.get(d, 1000) for d in
              self.dependencies[t]) > 1000000]
bads = set(bad)
good = [t for t in tasks if t not in bads]
mrocklin (Member, Author):

This was a hack and needs to be replaced.

@mrocklin changed the title from "[WIP] Avoid work stealing large data" to "Work stealing" on May 18, 2016
@mrocklin changed the title from "Work stealing" to "Tune work stealing to be data size aware" on May 18, 2016
mrocklin (Member, Author):

@jcrist @martindurant if either of you is interested in eventually learning more about the distributed scheduler, this PR is somewhat bite-sized and could use a review.

@mrocklin force-pushed the work-steal-big branch 3 times, most recently from 8834d99 to a654da7 on May 18, 2016 at 22:28
mrocklin (Member, Author):

OK, this fails in some workloads (it takes up 80% of our scheduling budget) because its cost is not constant per new task. When we have many idle and saturated workers with tasks that can't easily be shared (too much data transfer to warrant the movement), we end up cycling through all workers over and over again on each update.

I'll change things around to only respond to newly saturated or newly idle workers.
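
As an illustrative sketch of that change (the class and method names here are hypothetical, not the scheduler's actual code), reacting only to state transitions rather than rescanning every worker might look roughly like this:

class StealingSketch(object):
    """Hypothetical sketch: act only on *newly* idle or saturated workers."""

    def __init__(self):
        self.idle = set()        # workers with spare threads
        self.saturated = set()   # workers with a deep backlog

    def update_worker(self, worker, n_queued, nthreads):
        if n_queued < nthreads:
            if worker not in self.idle:          # newly idle: act once
                self.idle.add(worker)
                self.saturated.discard(worker)
                self.balance(thief=worker)
        elif n_queued > 2 * nthreads:
            if worker not in self.saturated:     # newly saturated: remember it
                self.saturated.add(worker)
                self.idle.discard(worker)

    def balance(self, thief):
        # Only the saturated set is scanned, and only on a transition,
        # which avoids cycling through every worker on every update.
        for victim in list(self.saturated):
            pass  # (placeholder) move a cheap-to-transfer task from victim to thief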

mrocklin (Member, Author):

OK, this is efficient for the shuffle problem (which is particularly demanding).

            # self.ensure_occupied(new_worker)
        else:
            self.ready.appendleft(key)
        # self.ensure_idle_ready()

    def should_steal(self, key, bandwidth=100e6):
Member:

Is there any attempt to estimate the bandwidth?

mrocklin (Member, Author):

No

Contributor:

Maybe using a module-level constant and/or an environment variable would allow people with high-end hardware to tune this to better reflect their empirically observed bandwidth.

mrocklin (Member, Author):

Added a module-level constant. I think we should eventually specify these sorts of things with config files, once there are other such parameters (like choice of compression).

At the moment I'm not too concerned with bandwidth for work stealing. For most tasks I've seen, the computation/communication ratio is either very high or very low.
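
For illustration, the kind of cost comparison being discussed might look like the sketch below; the bandwidth value and the ratio threshold are assumptions, not necessarily the exact logic in this PR:

BANDWIDTH = 100e6  # bytes per second; a module-level constant that a config
                   # file or environment variable could eventually override

def should_steal(nbytes, compute_time, bandwidth=BANDWIDTH):
    """Steal only if the task's compute time dwarfs the cost of moving
    its dependencies to another worker."""
    transfer_time = nbytes / bandwidth
    return compute_time > 5 * transfer_time

should_steal(1e9, 0.1)   # 1 GB of inputs, 100 ms task -> False, leave it alone
should_steal(1e6, 1.0)   # 1 MB of inputs, 1 s task    -> True, cheap to move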

martindurant (Member):

I see no check for whether the potential thief is on the same machine as the victim, whereas I would have thought that's important for communication bandwidth.

Do you plan to also steal based on memory exhaustion sometime?

--------------------------

If a task has been specifically restricted to run on particular workers (such
as is the case when special hardware is required) then we do not steal.
Member:

Is stealing amongst the set of allowed workers also prohibited when there is a hard restriction?

mrocklin (Member, Author):

It's not prohibited, just not implemented. Generally there is a high cost to adding any logic here (it's error-prone and very performance-sensitive), so I've tended to avoid some of the more fringe cases. They're definitely valid; they just haven't yet shown themselves to be worth the development and maintenance cost.
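
As a sketch of the behavior described above (the `can_steal` function and the `restrictions` mapping are hypothetical, not the scheduler's internals):

def can_steal(key, restrictions):
    """Never steal a restricted task, even if the thief is in the allowed set;
    that refinement is valid but, as noted above, not implemented."""
    return key not in restrictions

restrictions = {'train-model': {'gpu-worker:8786'}}
can_steal('clean-data', restrictions)   # True: unrestricted, fair game
can_steal('train-model', restrictions)  # False: pinned to special hardware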

mrocklin (Member, Author):

> I see no check for whether the potential thief is on the same machine as the victim, whereas I would have thought that's important for communication bandwidth.

True, although in practice intra-node communication (300MB/s + serialization) is not hugely different from inter-node communication (100MB/s + serialization) so at the moment I'm tempted to avoid caring.

> Do you plan to also steal based on memory exhaustion sometime?

Perhaps, as applications demand.
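
Regarding the bandwidth comparison above, a back-of-envelope calculation with assumed numbers (the 1 s serialization cost is purely illustrative) shows how serialization narrows the gap:

nbytes = 100e6                       # a 100 MB dependency
serialize = 1.0                      # assumed fixed serialization cost, seconds
intra = nbytes / 300e6 + serialize   # ~1.3 s within a node
inter = nbytes / 100e6 + serialize   # ~2.0 s across nodes
# The 3x raw-bandwidth difference shrinks to roughly 1.5x end to end.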

mrocklin (Member, Author):

The general lesson so far has been that stealing work definitely helps the robustness of computations; lots of computations are pretty terrible without it. At the same time, however, it introduces a lot of complexity that is hard to reason about and verify. It's been a bit of a rough ride so far.


@gen_cluster(executor=True, ncores=[('127.0.0.1', 1)] * 10)
def test_worksteal_many_thieves(e, s, *workers):
    np = pytest.importorskip('numpy')
Contributor:

numpy is unused here.

mrocklin (Member, Author):

Fixed

mrocklin (Member, Author):

Future work here I think is to have workers send tasks back to the scheduler once it's clear that they're over-burdened. I've run into cases where this occurs, especially when the length of functions varies significantly. This could be done by maintaining excess tasks on the worker in a queue and running a periodic callback to filter out any task that had not been sent to the thread-pool-executor since the previous iteration.
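
A rough sketch of that idea follows; the worker attributes `buffered_tasks` and `release_to_scheduler` are hypothetical, and this is not code from the PR. Distributed's workers run on a tornado event loop, so a PeriodicCallback is one natural mechanism:

from tornado.ioloop import PeriodicCallback

def install_backpressure(worker, interval_ms=500):
    seen_last_tick = set()

    def shed_stale_tasks():
        nonlocal seen_last_tick
        # Any task still queued that was also queued on the previous tick has
        # waited a full interval without reaching the thread pool executor;
        # hand it back to the scheduler so it can be placed elsewhere.
        waiting = set(worker.buffered_tasks)        # hypothetical attribute
        stale = waiting & seen_last_tick
        for key in stale:
            worker.release_to_scheduler(key)        # hypothetical method
        seen_last_tick = waiting - stale

    pc = PeriodicCallback(shed_stale_tasks, interval_ms)
    pc.start()
    return pc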

@mrocklin force-pushed the work-steal-big branch 2 times, most recently from 5179198 to bdff777 on May 23, 2016 at 21:20
mrocklin (Member, Author):

If there are no other comments then I may merge this tomorrow.

ogrisel (Contributor) commented May 24, 2016

> Future work here I think is to have workers send tasks back to the scheduler once it's clear that they're over-burdened. I've run into cases where this occurs, especially when the length of functions varies significantly. This could be done by maintaining excess tasks on the worker in a queue and running a periodic callback to filter out any task that had not been sent to the thread-pool-executor since the previous iteration.

That's an interesting idea. I think it would be worth maintaining a suite of workloads that highlight both common and pathological scheduling behaviors, to make sure that we do not introduce performance regressions when refactoring the scheduling logic.

``log(n)`` cost to the common case.

Instead we allow Python to iterate through the set of saturated workers however
it finds to be the most efficient.
Contributor:

I don't understand the phrasing of this sentence. Could you please try to make it more explicit?

mrocklin (Member, Author):

Rephrased

mrocklin (Member, Author):

> I think it would be worth maintaining a suite of workloads that highlight both common and pathological scheduling behaviors, to make sure that we do not introduce performance regressions when refactoring the scheduling logic.

This sounds good to me. Can you recommend benchmark suites from a couple of other projects? I'd like to see how other people do this.

mrocklin (Member, Author):

Currently my solution to avoid performance regressions is to add unit tests to ensure specific behavior:

@gen_cluster(executor=True, ncores=[('127.0.0.1', 1)] * 2)
def test_even_load_on_startup(e, s, a, b):
    x, y = e.map(inc, [1, 2])
    yield _wait([x, y])
    assert len(a.data) == len(b.data) == 1

However, I usually add these only after I notice that something is broken. I've been testing applications by hand. It'd be nice to automate alerts here.

minrk (Contributor) commented May 24, 2016

Pandas uses asv, which I've had some positive experiences with, but I haven't managed to fully integrate it into a project. I tried to use it for pyzmq, but I couldn't get around the hangs that occurred at certain non-functional points in the project's history.
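
For context, an asv benchmark suite is a module of `time_*` methods in a `benchmarks/` directory; a minimal scheduling benchmark might look like the sketch below (the workload and the use of the current `Client`/`LocalCluster` names are illustrative):

# benchmarks/scheduling.py
from distributed import Client, LocalCluster

class SchedulingSuite(object):
    """asv runs setup/teardown around each timed method."""

    def setup(self):
        self.cluster = LocalCluster(n_workers=4, threads_per_worker=1,
                                    processes=False)
        self.client = Client(self.cluster)

    def teardown(self):
        self.client.close()
        self.cluster.close()

    def time_many_small_tasks(self):
        # A common-case workload: many cheap, independent tasks.
        futures = self.client.map(lambda x: x + 1, range(1000))
        self.client.gather(futures)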

mrocklin (Member, Author):

I'm planning to handle performance benchmarks after this.

This branch holds some fairly important fixes. I'd like to merge. Any further comments?

ogrisel (Contributor) commented May 24, 2016

LGTM. I have not tested this branch on a real cluster/workload, but I guess we can always fine-tune later.

ogrisel (Contributor) commented May 25, 2016

By the way, +1 for asv as a benchmark framework. While I have never used it myself, it looks really neat.

mrocklin added 4 commits on May 25, 2016 at 07:15, including:

- This was oddly causing a lot of CPU use
- We now inspect the expected communication and computation time before deciding to steal a task.