
Make Worker.delete_data sync #3922

Merged · 6 commits merged into dask:master on Jun 26, 2020
Conversation

pentschev
Member

This should fix issues where Worker.delete_data isn't awaited.

Fixes #3920
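To illustrate the bug class this PR addresses, here is a minimal pure-Python sketch (the `AsyncWorker`/`SyncWorker` classes are hypothetical stand-ins, not the real dask `Worker`): calling an `async def` method without `await` only builds a coroutine object, so the body never runs and CPython emits a "never awaited" `RuntimeWarning` when the coroutine is dropped. Making the method a plain function, as this PR does for `Worker.delete_data`, removes that failure mode.

```python
import gc
import warnings

# Hypothetical stand-ins showing why an async delete_data that is
# called without `await` silently does nothing.

class AsyncWorker:
    def __init__(self):
        self.data = {"x": 1}

    async def delete_data(self, keys):
        for k in keys:
            del self.data[k]

class SyncWorker:
    def __init__(self):
        self.data = {"x": 1}

    def delete_data(self, keys):
        for k in keys:
            del self.data[k]

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    w = AsyncWorker()
    w.delete_data(["x"])  # builds a coroutine object that is never awaited
    gc.collect()          # dropping the coroutine emits a RuntimeWarning

never_awaited = any("never awaited" in str(c.message) for c in caught)
print(never_awaited)  # True  -> Python warned us
print(w.data)         # {'x': 1} -> the delete never actually ran

ws = SyncWorker()
ws.delete_data(["x"])  # a plain method runs immediately
print(ws.data)         # {}
```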

Comment on lines 1358 to 1360
self.scheduler.remove_keys(
address=self.contact_address, keys=list(keys)
)
Member

If we don't await a coroutine, it never actually runs, so this code probably has no effect.

However, the TODO note just above it is interesting. Maybe this code isn't necessary? It might be worth looking at git blame here to see why it was added.

Member Author

It's even harder to await a function that doesn't exist, which is the case here. 😄

With git log -c -S remove_keys distributed/scheduler.py, I managed to find that this function was removed somewhere between dc54748 and 2c175e8. Unfortunately that dates back to 2016; I believe this line was a leftover from some commit around that time. I can remove it if that makes sense, but unfortunately I can't dig any deeper.

Member

Yeah I'd be +1 to just dropping this and the TODO.

Member Author

I tentatively dropped it in the newest commit; if there are any concerns with that, I'll revert it.

Comment on lines 1641 to 1645
df = dask.datasets.timeseries(dtypes={"x": int, "y": float}, freq="1s").reset_index(
drop=True
)

await client.compute(df.map_partitions(lambda df: df.__sizeof__()))
Member

Is everything here necessary?

Member Author

It seems that removing await client.compute(df.map_partitions(lambda df: df.__sizeof__())) works just the same, so the dataframe creation alone suffices.

Member Author

Removed in latest commits.

Comment on lines 1656 to 1660
del df
await client.run(_check_data, False)

del df2
await client.run(_check_data, True)
Member

To simplify this logic I recommend that you just check w.data

Member Author

I assume you're suggesting assert len(w.data) > 0 / assert len(w.data) == 0, is that right? If so, that unfortunately fails on the second assertion. Could this be some async black magic? E.g., does client.run force some syncing while w.data is still outdated?

Member

client.run doesn't force any synchronization, but maybe the GC hasn't run yet at this point. Regardless, we should probably make this robust to GC taking a while (GC delays are a major source of intermittent test failures for us). Most tests of this form look like the following:

start = time()
while w.data:
    await asyncio.sleep(0.01)
    assert time() < start + 2
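The poll-with-timeout idiom above can be demonstrated as a self-contained asyncio sketch; `wait_until_empty` and the simulated delayed cleanup are hypothetical illustrations, not dask code:

```python
import asyncio
import time

async def wait_until_empty(container, timeout=2.0, interval=0.01):
    # Poll until the container drains, failing if the deadline passes.
    start = time.time()
    while container:
        await asyncio.sleep(interval)
        assert time.time() < start + timeout, "container did not empty in time"

async def demo():
    data = {"x": 1}
    # Simulate cleanup that happens a little later (like a delayed GC pass).
    asyncio.get_running_loop().call_later(0.05, data.clear)
    await wait_until_empty(data)
    return data

result = asyncio.run(demo())
print(result)  # {}
```

The assertion inside the loop bounds the wait, so a test using this pattern tolerates slow cleanup without hanging forever.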

Member Author

Thanks for the explanation @mrocklin , your suggestion works and that's fixed in latest commits.

Comment on lines 1641 to 1643
df = dask.datasets.timeseries(dtypes={"x": int, "y": float}, freq="1s").reset_index(
drop=True
)
Member

Do we need a dataset this complex? Do we need these dtypes or the reset_index call? If not, let's remove them to keep things as simple as possible for future reviewers.

Almost all tests in this file intentionally use very simple computations, like future = client.submit(inc, 1). I recommend that we do the same here unless having a dataframe with a reset index is critical to test the delete_data operation.

Member Author

Actually, no. I can simplify that a bit.

@pentschev
Member Author

@mrocklin I dropped the test and left only the actual code changes. From my side it's good to go, let me know if there are other things to be addressed.

@mrocklin mrocklin merged commit 93701f8 into dask:master Jun 26, 2020
@mrocklin
Member

This is in. Thanks @pentschev

@pentschev
Member Author

Thanks @mrocklin for reviewing and merging!

@pentschev pentschev deleted the make-delete_data-sync branch November 11, 2020 17:32
Successfully merging this pull request may close these issues.

Await on handle_stream raises missing delete_data await warning
3 participants