-
Notifications
You must be signed in to change notification settings - Fork 146
Implement multiplexer #835
Implement multiplexer #835
Conversation
1da082d
to
7943ae9
Compare
7943ae9
to
3e1c980
Compare
@carver maybe hold on the review until the dependent PR's are merged and I can rebase this. |
c0db057
to
38b6185
Compare
38b6185
to
0be597a
Compare
7aad0ea
to
f19f1cb
Compare
Ok, sounds good. I was 😨 looking at the +/- |
@carver I have one more piece I'll split out tomorrow but if you want to look at |
c7afb06
to
eea863d
Compare
Alright @carver . This is as small as it is getting without doing something like committing the |
eea863d
to
27e4920
Compare
27e4920
to
18e3b91
Compare
Okay, great. I am on it. Sorry that I didn't get it done today. I promise, first thing tomorrow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to try something different: I essentially only read the tests, and commented on the API along the way. I said some things that will sound dumb after I read the implementation, I'm sure. :)
But I thought it was a nice way to give a first-impression of the API, in the last moments of being ignorant of how it actually works under the hood... Okay, on to the implementation.
@pytest.mark.asyncio | ||
async def test_multiplexer_properties(): | ||
multiplexer, _ = MultiplexerPairFactory( | ||
protocol_types=(SecondProtocol, ThirdProtocol), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're dropping the second mux here, I was curious what it looks like to build a single Mux. Is this right, below? It could be nice to have this either in the test, or maybe in its own API, like Multiplexer.build()
.
def build(transport, subprotocol_types, base_protocol_type=P2PProtocol, snappy_support=True, cancel_token=None):
noop_transport = NoopTransport()
p2p_protocol = P2PProtocol(noop_transport, snappy_support=snappy_support)
subprotocol_types = (SecondProtocol, ThirdProtocol)
cmd_id_offsets = get_cmd_offsets(subprotocol_types)
sub_protocols = tuple(
protocol_class(noop_transport, offset, snappy_support=snappy_support)
for protocol_class, offset
in zip(subprotocol_types, cmd_id_offsets)
)
return Multiplexer(
transport=noop_transport,
base_protocol=p2p_protocol,
protocols=sub_protocols,
)
tests/p2p/test_multiplexer.py
Outdated
bob_stream = bob_multiplexer.stream_protocol_messages(P2PProtocol) | ||
|
||
alice_p2p_protocol = alice_multiplexer.get_protocol_by_name('p2p') | ||
bob_p2p_protocol = bob_multiplexer.get_protocol_by_name('p2p') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to favor by_name
over by_type
? The by_type
is nicer to constrain at type-checking time.
And if we do prefer by_type
, how bad would it be to drop the by_name
option altogether?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll check the code that's using this in the bigger branch and see if I can remove the by_name
APIs cleanly.
bob_p2p_protocol = bob_multiplexer.get_protocol_by_name('p2p') | ||
|
||
alice_p2p_protocol.send_ping() | ||
cmd, _ = await asyncio.wait_for(bob_stream.asend(None), timeout=0.1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.asend(None)
feels awkward, like it requires you to know something about the internals of how the stream works. Maybe it makes sense to add an alias, so you can do something likebob_stream.get()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are just vanilla python async generators and I'm calling their internal APIs for getting the next value from them where normally you would use these as async for thing in bob_stream
.
I can look into another way to structure the tests but this was the most direct way I could think of to test this code. Any thoughts on how it might be improved?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, that's fine. Since the stream is just a vanilla async generator, I don't have any better suggestions. 👍
bob_p2p_protocol = bob_multiplexer.get_protocol_by_name('p2p') | ||
bob_second_protocol = bob_multiplexer.get_protocol_by_name('second') | ||
|
||
alice_second_protocol.send_cmd(CommandA) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This surprised me: you can send the uninitialized command? I would have expected send_cmd(CommandA())
Is it just because CommandA
happens to have no data? I assume most commands (besides something like Ping
) would include some kind of data. If it usually requires including data, then supporting the API of sending an class may not be worth being able to drop the parentheses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you scroll up you can see that send_cmd
is just a method I implemented on SecondProtocol
to simplify sending different command types and isn't part of the core Protocol
API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, right, I was thinking it would be something on every Protocol
👍
alice_p2p_protocol.send_ping() | ||
alice_second_protocol.send_cmd(CommandB) | ||
cmd, _ = await asyncio.wait_for(bob_p2p_stream.asend(None), timeout=0.1) | ||
assert isinstance(cmd, Ping) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very cool, this test succinctly identifies the point of the mux, and shows it working. 👍
newsfragments/835.feature.rst
Outdated
@@ -0,0 +1 @@ | |||
Add ``p2p.multiplexer.Multiplexer`` for multiplexing the individual protocol messages for a devp2p connection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a little hint for the people who otherwise have to look up multiplexing. Something like:
Add ``p2p.multiplexer.Multiplexer`` for multiplexing the individual protocol messages for a devp2p connection. | |
Add ``p2p.multiplexer.Multiplexer`` for combining the commands from different devp2p sub-protocols into a single network write stream, and split the incoming network stream into individually retrievable sub-protocol commands. |
cmd_id_cache[cmd_id] = base_protocol | ||
else: | ||
for protocol in protocols: | ||
if cmd_id < protocol.cmd_id_offset + protocol.cmd_length: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something still feels off about the protocol knowing its own offset. Like we should have some JointProtocol
that knows the offsets. It could do something like:
class JointProtocol:
def get_protocol_command(self, joint_command_id):
offset = self._get_nearest_offset(joint_command_id)
protocol = self._get_protocol_at_offset(offset)
protocol_command_index = joint_command_id - offset
return protocol.cmd_by_id[protocol_command_index]
Then the protocol itself knows nothing about offsets from combining the protocols. Anyway, just a nice to have, and probably would trigger a big refactor. Also, I remember you saying something about this being especially tricky on the sending side. But it could be nice if you like it, and the change doesn't seem huge.
self._base_protocol = base_protocol | ||
|
||
# the sub-protocol instances | ||
self._protocols = protocols |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're going to keep the lookup by name, we should probably validate that they there are no duplicate names here.
p2p/multiplexer.py
Outdated
raise TypeError("Unknown protocol identifier: {protocol}") | ||
|
||
if not self.has_protocol(protocol_class): | ||
raise Exception(f"TODO: Unknown protocol '{protocol_class}'") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO?
p2p/multiplexer.py
Outdated
return self._transport.is_closing | ||
|
||
def close(self) -> None: | ||
# TODO: maybe graceflly close streams? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe move to an issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The token is actually closing the streams gracefully so I just removed the comment.
Test failures are unrelated. |
Extracted from #684
depends on #836
depends on #838
should be merged after #834
What was wrong?
In order to facilitate speaking multiple devp2p protocols, we need mechanisms for dividing the messages up by their individual protocol streams.
How was it fixed?
This is part of #684 It brings in a single component, the
p2p.multiplexer.Multiplexer
. This class is intentionally not a service. This is because it must be used outside of the service context during the handshake process to facilitate individual protocol handshakes streaming only the messages pertinent to that protocol.This PR only introduces the class and the relevant tests for it in order to keep the PR adequately small.
You can see how this API is used in this commit: c6fb7d8
To-Do
Cute Animal Picture