
Rewrite of P2P control flow #7268

Merged · 26 commits · Nov 10, 2022

Conversation

@fjetter (Member) commented Nov 8, 2022

This builds on #7195, which I was not able to finish because I ran into concurrency issues caused by leaky tests. In particular, the global memory_limits on the MultiFile and MultiComm classes caused problems.

I went ahead and rewrote major parts of the concurrency model:

  • MultiComm and MultiFile are now called CommShardsBuffer and DiskShardsBuffer, respectively.
  • Both the disk and comm buffers inherit from a common base class ShardsBuffer that implements the concurrency control and limiting for both classes.
  • Both buffers operate on the event loop, so there is no longer any need for threading synchronization primitives.
  • To enable this, Shuffle needed to be rewritten slightly and now uses another offload (specifically in Shuffle.add_partitions). We're still doing all the compute on a thread, but it's a different thread now; the actual worker thread is mostly idle.
  • There is no need for queues to control concurrency anymore. Instead, the buffers start N coroutines that run until the buffer is closed.
  • Memory limiting is synchronized with a new ResourceLimiter primitive (see the sketch after this list). This is a fairly simple class that lets one acquire as much of a given resource as one wants, but allows us to wait until the acquired amount drops back below a specified level. This represents our buffer usage pretty well: blocking on acquire is not applicable since the data is already in memory, but after acquiring we want to wait for memory usage to calm down again before accepting more. This is effectively the same backpressure protocol as on main. It also allows for more flexible control over the buffer sizes (e.g. the comm and disk buffers could share the same limit if we wanted to) and for much safer testing by no longer leaking global state.
  • Last but not least, concurrency control is mostly implemented using Conditions instead of polling. That may subjectively be a bit harder to read than the polling that was there before, but it allows for significantly faster test runtimes.
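
To make the ResourceLimiter semantics concrete, here is a minimal sketch (illustrative only; the method names and signatures below are assumptions, not the exact API introduced in this PR):

```python
import asyncio


class ResourceLimiter:
    """Sketch of the backpressure primitive described above.

    ``acquire`` never blocks (the memory is already allocated by the time we
    account for it), but ``wait_for_available`` lets a producer pause until
    enough has been released to drop back below the configured limit.
    """

    def __init__(self, limit: int) -> None:
        self._limit = limit
        self._acquired = 0
        self._condition = asyncio.Condition()

    def acquire(self, nbytes: int) -> None:
        # Account for the memory unconditionally; never block here.
        self._acquired += nbytes

    async def release(self, nbytes: int) -> None:
        # Called by the background flush coroutines once shards have been
        # written to disk or sent over the comm.
        async with self._condition:
            self._acquired -= nbytes
            self._condition.notify_all()

    async def wait_for_available(self) -> None:
        # Backpressure: wait until usage has dropped below the limit again.
        async with self._condition:
            await self._condition.wait_for(lambda: self._acquired < self._limit)


async def put_shards(limiter: ResourceLimiter, nbytes: int) -> None:
    # Producer-side protocol (hypothetical helper): account first, hand the
    # shards to the buffer's background coroutines, then wait before
    # accepting more input.
    limiter.acquire(nbytes)
    # ... append shards to the buffer here ...
    await limiter.wait_for_available()
```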

I haven't touched serialization, worker dispatching, etc. This is all merely about control flow and responsibility. In follow-up PRs I intend to modify actual behavior (e.g. failing when workers leave).

cc @hendrikmakait @mrocklin

@github-actions bot (Contributor) commented Nov 8, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0 · 15 suites ±0 · 6h 30m 0s ⏱️ (−5m 44s)

|       | total         | passed ✔️     | skipped 💤 | failed ❌ |
|-------|---------------|---------------|------------|-----------|
| tests | 3 213 (+44)   | 3 128 (+44)   | 83 (−1)    | 2 (+1)    |
| runs  | 23 758 (+310) | 22 856 (+314) | 900 (−5)   | 2 (+1)    |

For more details on these failures, see this check.

Results for commit d5320b9. ± Comparison against base commit 69a5709.

♻️ This comment has been updated with latest results.

@mrocklin (Member) commented Nov 8, 2022

As a heads-up, I may not have time to review this during the next couple of days (prepping presentations, travelling, delivering presentations). The opening comment seems not-scary to me, though. I wouldn't block on my review if you're feeling confident.

@hendrikmakait hendrikmakait self-requested a review November 8, 2022 15:45
@fjetter fjetter self-assigned this Nov 8, 2022
@hendrikmakait (Member) left a review comment

Generally, I like this change; the new separation of concerns looks like a great improvement. I have a few questions about minor parts of this PR, as well as a number of small nits and suggestions.

sizes: defaultdict[str, int]
_exception: None | Exception

_queues: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()
@hendrikmakait (Member): From what I understand, we do not need this anymore.

@property
def _instances(self) -> set:
@hendrikmakait (Member): Should we implement this or mark as # TODO?

@fjetter (Member, Author): Dead code. This should also be removed.

return {
"memory": self.bytes_memory,
"total": self.bytes_total,
"buckets": len(self.shards),
@hendrikmakait (Member): Are self.shards shards or buckets?

@fjetter (Member, Author): shards is a mapping from buckets to shards; buckets are basically the partition IDs.
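
For illustration, the structure being discussed is roughly the following (hypothetical values; the real shard type is whatever the buffer stores):

```python
# Keys are buckets (output partition IDs), values are the shards collected
# for that partition so far.
shards: dict[str, list[bytes]] = {
    "0": [b"<shard>", b"<shard>"],
    "1": [b"<shard>"],
}

# The "buckets" metric above is therefore the number of keys,
# not the number of individual shards.
assert len(shards) == 2
```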

@hendrikmakait (Member): Let's maybe rename self.shards to self.bucketed_shards or self.partitioned_shards to highlight that.

@fjetter (Member, Author): IMO this is OK. We're not naming every single mapping as key_value; context is typically sufficient to infer this. In this case I think the shards mapping is fine, and a more verbose attribute name would make the code harder to read.

@hendrikmakait (Member): Together with the new docstrings I agree this is now fine.

self.bytes_memory -= size

async def _process(self, id: str, shards: list[ShardType]) -> None:
raise NotImplementedError()
@hendrikmakait (Member): For my understanding: Do we (not) want to utilize abc.ABC to mark ShardsBuffer as an abstract base class?

@fjetter (Member, Author): What would be the benefit of this?

@hendrikmakait (Member): Marking ShardsBuffer as an abc.ABC and decorating _process with @abc.abstractmethod feels mildly cleaner, since ShardsBuffer should not be instantiated and any concrete subclass needs to implement _process [1]. This would allow mypy to catch errors such as trying to instantiate ShardsBuffer or a subclass that did not implement _process. Similarly, Python would throw somewhat more informative errors at runtime.

I guess this is mainly a question about conventions: if a class is abstract, should it always be marked as such, or do we not care about that? If the latter, why not? ("We don't see any value in this case" is a perfectly valid answer.)

Feel free to shut this discussion down; the case at hand revolves around an abstract class that will have two concrete subclasses for the foreseeable future and lives in a private module, so I'm mostly interested in whether we have conventions/guidelines around marking ABCs, not in splitting hairs about this specific case.

[1] read not so much, which leads me off-trail to wondering whether read belongs in this class hierarchy at all, but that's a fringe problem that requires more changes and I don't want to deal with it right now.
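
For reference, a minimal sketch of what this suggestion would look like (illustrative only; ShardType is a stand-in here, not the module's real alias):

```python
import abc

ShardType = bytes  # stand-in for the module's actual shard type


class ShardsBuffer(abc.ABC):
    @abc.abstractmethod
    async def _process(self, id: str, shards: list[ShardType]) -> None:
        """Flush one bucket of shards; every concrete buffer must implement this."""


class DiskShardsBuffer(ShardsBuffer):
    async def _process(self, id: str, shards: list[ShardType]) -> None:
        ...  # write the shards for bucket `id` to disk


# Instantiating the base class (or a subclass missing _process) is now
# rejected by mypy and fails at runtime:
# ShardsBuffer()  # TypeError: Can't instantiate abstract class ShardsBuffer ...
```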

@fjetter (Member, Author): This is a first shot. These classes will still change significantly over time, and I'm not worried about anybody inheriting from this class since it's private.
If this is something we want to do, we can do it later; this PR is definitely too big to have this conversation in, and I don't want to block on it.

async def shuffle_inputs_done(self, comm: object, shuffle_id: ShuffleId) -> None:
await shuffle.receive(data)

async def shuffle_inputs_done(self, shuffle_id: ShuffleId) -> None:
"""
Hander: Inform the extension that all input partitions have been handed off to extensions.
@hendrikmakait (Member): Driveby:

Suggested change:
- Hander: Inform the extension that all input partitions have been handed off to extensions.
+ Handler: Inform the extension that all input partitions have been handed off to extensions.

(same in L327)


worker_for_mapping = {}

for part in range(npartitions):
@hendrikmakait (Member): Nit for readability:

Suggested change:
- for part in range(npartitions):
+ for output_partition in range(npartitions):

(requires similar adjustments below)

total_bytes_recvd += metrics["disk"]["total"]
total_bytes_recvd_shuffle += s.total_recvd

assert total_bytes_recvd_shuffle == total_bytes_sent
@hendrikmakait (Member):

Suggested change:
- assert total_bytes_recvd_shuffle == total_bytes_sent
+ assert total_bytes_recvd_shuffle == total_bytes_sent == total_bytes_recvd

From what I understand, we probably want to test that as well.

@fjetter (Member, Author): These are not identical: there appears to be a small drift between sizes when comparing pyarrow buffers and bytes directly, so what the comms measure is slightly different from what is actually received. This is very ugly, but I'm not willing to fix it right now; refactoring the serialization parts is out of scope.
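
A rough illustration of how the two counts can legitimately diverge (hypothetical example; whether framing metadata and alignment padding are the actual source of the drift here is an assumption):

```python
import pyarrow as pa

table = pa.table({"x": [1, 2, 3], "y": ["a", "bb", "ccc"]})

# Size as reported by the Arrow data structures themselves
logical_nbytes = table.nbytes

# Size as seen on the wire after IPC serialization (includes schema,
# framing metadata, and alignment padding)
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
wire_nbytes = sink.getvalue().size

print(logical_nbytes, wire_nbytes)  # the two numbers generally differ
```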

@hendrikmakait (Member):

Let's highlight this in the test then or drop total_bytes_recvd altogether. Right now it looks like we just forgot to do something useful with total_bytes_recvd.

Comment on lines 39 to 40
queue: asyncio.Queue
A queue holding tokens used to limit concurrency
@hendrikmakait (Member):
Suggested change:
- queue: asyncio.Queue
-     A queue holding tokens used to limit concurrency




@hendrikmakait (Member) left a review comment

Thanks for adding the additional documentation, @fjetter. This looks good to me; my remaining questions/nits can be safely ignored.

