
In-process communication transports #887

Merged (12 commits, Feb 27, 2017)
Conversation

@pitrou (Member) commented Feb 21, 2017:

This adds a new communication transport named inproc that is backed by in-process queues. The aim is to later allow local cluster instances to use such communication channels instead of TCP, to reduce the I/O overhead. That will be a bit more involved than I initially assumed, though, due to the notion of host and port being ingrained in some places (especially the scheduler).
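The queue-backed design described above can be sketched roughly as follows. This is an illustrative minimal sketch, not the actual PR code: the class and function names here are invented, and the real transport is asynchronous. Each endpoint reads from its own queue and writes to its peer's queue, so messages never touch a socket:

```python
# Minimal sketch of an in-process comm backed by queues (illustrative
# names; the actual PR implements an async Comm with addresses like
# "inproc://<host>/<pid>/<n>").
from queue import Queue


class InProcComm:
    def __init__(self, read_q, write_q):
        self._read_q = read_q    # messages sent by the peer
        self._write_q = write_q  # messages destined for the peer

    def write(self, msg):
        # No socket and no wire serialization: both endpoints live in
        # the same process, so the object is handed over directly.
        self._write_q.put(msg)

    def read(self):
        return self._read_q.get()


def connect_inproc():
    """Return a connected pair of in-process comms."""
    a_to_b, b_to_a = Queue(), Queue()
    return InProcComm(b_to_a, a_to_b), InProcComm(a_to_b, b_to_a)
```

With such a pair, `c1.write(msg)` makes `msg` available to `c2.read()` with no I/O overhead, which is the point of the transport.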

raise QueueEmpty


def _maybe_deserialize(msg):
pitrou (Member Author) commented:

I would rather avoid such code in the communication layer. @mrocklin, do you think it's ok to put this in distributed.protocol?

pitrou (Member Author) commented:

Or perhaps you want to suggest another approach.

mrocklin (Member) commented:

I don't have a strong opinion on this.

@gen.coroutine
def check_deserialize(addr):
    # Create a valid Serialized object
    # (if using serialize(), it will lack a compression header)
pitrou (Member Author) commented:

Is this by design? Do you think we can change distributed.protocol.loads to assume no compression if the compression header is absent?

mrocklin (Member) commented:

If no compression information is present in the header I would assume that the data is not compressed. Is this not normally default behavior?

pitrou (Member Author) commented:

No, loads() always calls decompress(), which fails with a KeyError if 'compression' is not found in the header.

mrocklin (Member) commented:

Avoiding that behavior seems sensible to me
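The fix being agreed on can be sketched like this. This is illustrative, not the actual distributed.protocol code: the idea is simply to read the 'compression' entry with a default instead of indexing it, so an absent header entry means "not compressed" rather than a KeyError:

```python
# Sketch of "assume no compression when the header entry is absent"
# (illustrative; the real decompress() lives in distributed.protocol).
import zlib

decompressors = {None: lambda b: b, 'zlib': zlib.decompress}


def decompress(header, frames):
    # header.get(...) instead of header[...]: a missing 'compression'
    # key is treated as "no frame is compressed".
    compression = header.get('compression', [None] * len(frames))
    return [decompressors[c](frame)
            for c, frame in zip(compression, frames)]
```

A header produced without serialize() can then be loaded unchanged, e.g. `decompress({}, [b'abc'])` simply returns the frames as-is.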

@pitrou added the 'enhancement' label (Improve existing functionality or make things work better) on Feb 23, 2017
@mrocklin (Member) left a review comment:

It would be useful to include an integration test that actually uses inproc for some trivial dask.array work. It would also be useful to make it easier for naive users to set up a local cluster using inproc. I think that some basic user-focused documentation would go a long way here.



def run_coro(func, *args, **kwargs):
    return func(*args, **kwargs)
mrocklin (Member) commented:

from .compatibility import apply ?

pitrou (Member Author) commented:

Uh, why not indeed. I guess I never use apply :-)
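For context, the helper being suggested has roughly this shape. The exact signature of distributed.compatibility.apply is assumed here for illustration; the point is that it already does what run_coro was reimplementing:

```python
# Approximate shape of the `apply` helper (signature assumed; see
# distributed.compatibility for the real definition).
def apply(func, args, kwargs=None):
    if kwargs:
        return func(*args, **kwargs)
    return func(*args)
```

For example, `apply(int, ['ff'], {'base': 16})` calls `int('ff', base=16)`.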

                other.frames == self.frames)

    def __ne__(self, other):
        return not (self == other)
mrocklin (Member) commented:

Why was defining __eq__ necessary? Should we also now define __hash__?

pitrou (Member Author) commented:

Just for the unit tests. It has no use in the rest of the code base. __hash__ isn't necessary as we don't use those objects as keys in associative containers.

mrocklin (Member) commented:

We don't in this codebase, that's true, but including objects in containers is generally useful. I'm fairly confident that I've put serialized objects in containers before while testing. I'm somewhat confident that this will come up for someone in the future when building something atypical. If it is cheap to keep this door open then I would prefer to do so.

pitrou (Member Author) commented:

I see. For Serialize objects we can easily piggyback on the original object's __hash__ method. For Serialized objects, it's quite dangerous to assume hashability while the original object may not be hashable.

mrocklin (Member) commented:

Concerned about mutation? I'm generally willing to assume that the user won't change values after giving them to us. We don't really have a reasonable way of proceeding correctly if we don't make this assumption.

pitrou (Member Author) commented:

My reasoning was that if a Serialized object is somehow meant to represent the original object, then its hashability should also reflect the original object's hashability.

To be frank, I'm not sure what hashability really brings for Serialize and Serialized objects, but I'm happy to defer to your judgement.

mrocklin (Member) commented:

I don't have a specific example concern in this case, I've just been bitten by projects in the past where people define __eq__ for convenience and then suddenly I can't use those objects with other python containers. I now avoid __eq__ by a strong habit unless there is a good API reason to implement it (such as with dask.array). So this may just be an irrational avoidance on my part, but it's still something I'd prefer to avoid if it's not particularly costly to avoid having an __eq__ method.

pitrou (Member Author) commented:

Ok, so I think I'll remove Serialized.__eq__ (which doesn't seem to be needed actually) and also implement Serialize.__hash__.
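The outcome of this thread can be sketched as follows. The class name mirrors distributed.protocol's Serialize, but the bodies here are illustrative: __hash__ piggybacks on the wrapped object, so a Serialize wrapper is hashable exactly when the wrapped object is:

```python
# Sketch of Serialize keeping __eq__ and gaining __hash__ (illustrative;
# the real class lives in distributed.protocol).
class Serialize:
    def __init__(self, data):
        self.data = data

    def __eq__(self, other):
        return isinstance(other, Serialize) and other.data == self.data

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        # Piggyback on the wrapped object: hashable iff `data` is, so
        # Serialize(obj) can go in sets/dicts whenever obj can.
        return hash(self.data)
```

With this, `{Serialize((1, 2)): 'x'}` works as a dict key, while wrapping an unhashable object (e.g. a list) still raises TypeError on hashing, mirroring the original object's behavior.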

def f():
    server = Server({'ping': pingpong})
    server.listen(8883)
with listen_on(Server, 'inproc://') as server:
mrocklin (Member) commented:

The inproc stuff still seems pretty experimental to me. I hesitate to integrate it into our core tests. It would be nice to make sure that we can extract this cheaply in the future if we decide we don't want to keep it around.

@pitrou (Member Author) commented Feb 23, 2017:

That was meant to verify that the basic core infrastructure works with inproc. I think a grep -r inproc distributed would easily point to the places where it's used :-)

mrocklin (Member) commented:

Ah, I see. I was misreading things because of how the diff was laid out. I thought that you were replacing existing tests with inproc tests. Looking at the flat file makes it more clear that this isn't occurring.

@pitrou (Member Author) commented Feb 23, 2017:

It would be useful to include an integration test that actually uses inproc for some trivial dask.array work.

As stated, it doesn't work yet. The followup PRs start doing the cleanup and refactor work needed to make it happen.

@mrocklin (Member) commented:

As stated, it doesn't work yet. The followup PRs start doing the cleanup and refactor work needed to make it happen.

How would you like to go about this then? Refactor out the small pieces of those PRs into clean commits and merge them before this? Merge all of the PRs into a single PR? Merge this and then merge those later?

@pitrou (Member Author) commented Feb 23, 2017:

Merge this and then merge those later?

This would have my preference. Reordering is not really doable: the refactors are based on this PR.

@mrocklin (Member) commented:

I would also still like to release before merging this if possible. I will try to get that done tonight.

@mrocklin (Member) commented:

This has a merge conflict. I'm +1 otherwise.

@pitrou pitrou merged commit 8f620a1 into dask:master Feb 27, 2017