Use multiplexer in peer class #847
Conversation
da555b4 to c6fb7d8
0f70fd6 to 495ef7a
@@ -300,25 +297,22 @@ def get_protocols(self) -> Tuple[ProtocolAPI, ...]:
        *self._protocols,
        token=token,
    ), token=token)
try:
This try/except doesn't appear to be necessary.
f9c9995 to f34d621
Looks like you're still actively working on this; the code keeps changing during review. Can you ping again when it's ready?
p2p/peer.py
Outdated
except TypeError:
    self.logger.info('Unrecognized reason: %s', msg['reason'])
async for cmd, msg in self.wait_iter(multiplexer.stream_protocol_messages(self.base_protocol)):  # noqa: E501
    if isinstance(cmd, Disconnect):
If you like it better: leave handle_p2p_msg untouched and call it from here. That gives the option of later doing something like #857 cleanly. (Plus it makes the diff smaller here.)
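For context, a minimal sketch of that suggestion: keep `handle_p2p_msg` as the single place that interprets p2p-level commands, and have the stream loop call it. Only `Disconnect` and `handle_p2p_msg` come from the diff above; everything else here is a stand-in, not the real p2p API.

```python
# Sketch: handle_p2p_msg stays the one dispatch point for p2p-level
# commands, so a stream loop (or a later refactor like #857) just calls it.
# Disconnect and Ping are stand-in command classes, not the real ones.

class Disconnect:
    pass


class Ping:
    pass


class Peer:
    def __init__(self):
        self.disconnected = False
        self.pongs_sent = 0

    def handle_p2p_msg(self, cmd, msg):
        if isinstance(cmd, Disconnect):
            self.disconnected = True
        elif isinstance(cmd, Ping):
            self.pongs_sent += 1  # a real peer would send a Pong back


peer = Peer()
for cmd in (Ping(), Disconnect()):
    peer.handle_p2p_msg(cmd, {})
```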
p2p/peer.py
Outdated
self.handle_sub_proto_msg(cmd, msg)

def handle_sub_proto_msg(self, cmd: CommandAPI, msg: Payload) -> None:
    """
    Hook for peer classes to do something on a given message
Why switch from the previous mechanism of sub-classing? Did you just not want to have to call super()? I was going to suggest the same thing as with handle_p2p_msg(), but the new construction makes that more awkward.
Your approach of just calling the handle method from the stream method makes sense. I'll update to use that approach.
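The agreed pattern can be sketched as follows: the stream loop stays thin and delegates each message to the handler method. `handle_sub_proto_msg` and `stream_protocol_messages` come from the diff; `FakeMultiplexer` and the message payloads are stand-ins for illustration.

```python
import asyncio


class FakeMultiplexer:
    """Stand-in that yields pre-baked (cmd, msg) pairs."""

    def __init__(self, messages):
        self._messages = messages

    async def stream_protocol_messages(self, protocol):
        for cmd, msg in self._messages:
            yield cmd, msg


class Peer:
    def __init__(self, multiplexer):
        self.multiplexer = multiplexer
        self.handled = []

    async def _handle_subscriber_messages(self):
        # The loop delegates rather than inlining the logic, so peer
        # subclasses can override one small hook method.
        async for cmd, msg in self.multiplexer.stream_protocol_messages(None):
            self.handle_sub_proto_msg(cmd, msg)

    def handle_sub_proto_msg(self, cmd, msg):
        self.handled.append((cmd, msg))


peer = Peer(FakeMultiplexer([("GetSum", {"a": 7}), ("Broadcast", {"data": b"x"})]))
asyncio.run(peer._handle_subscriber_messages())
```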
try:
    self.process_msg(cmd, msg)
except RemoteDisconnected as e:
    if self.uptime < BLACKLIST_SECONDS_QUICK_DISCONNECT:
Looks like we lost this check altogether. Just checking if that's intentional, or coming back later.
It was intentional. I flip-flopped on the quick-disconnect ban and I no longer think it's right, at least at this stage. I'm not convinced it adds value, and it could easily have unintended negative side effects, like making it harder to develop against a local node that you start and stop a lot.
@carver sorry, yeah, I thought it was done and then realized I had some cleanup. I'll tidy it up and ping you again when it's ready.
Heh, no worries. Sounds good! 👍
p2p/service.py
Outdated
except asyncio.CancelledError:
    pass
await asyncio.wait_for(
    asyncio.gather(*tasks, loop=self.get_event_loop(), return_exceptions=True),
@cburgdorf suggested this back when I originally made the change. I added this to see if it made the shutdown go any smoother but it doesn't appear to help so I'm going to revert it.
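For reference, the pattern in that hunk can be sketched standalone: cancel outstanding tasks, then give them a bounded window to unwind. This is a hedged sketch, not the service code; note that the `loop=` argument shown in the diff was removed from `asyncio.gather` in Python 3.10, so it is omitted here.

```python
import asyncio


async def shutdown(tasks, timeout=1.0):
    # Cancel everything, then wait a bounded time for the tasks to unwind.
    # return_exceptions=True keeps one task's exception from hiding the rest.
    for task in tasks:
        task.cancel()
    try:
        await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        pass  # tasks that ignored cancellation are abandoned


async def main():
    async def worker():
        await asyncio.sleep(60)  # simulates a long-running service task

    tasks = [asyncio.create_task(worker()) for _ in range(3)]
    await asyncio.sleep(0)  # let the tasks start running
    await shutdown(tasks)
    return all(t.cancelled() for t in tasks)


all_cancelled = asyncio.run(main())
```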
f34d621 to ecd0483
@carver this is ready for you (for real this time)
LGTM
@@ -151,7 +151,7 @@ def run_task(self, awaitable: Awaitable[Any]) -> None:
    except OperationCancelled:
        pass
    except Exception as e:
        self.logger.warning("Task %s finished unexpectedly: %s", awaitable, e)
        self.logger.warning("Task %s finished unexpectedly: %r", awaitable, e)
Ah yeah, that looks helpful.
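A quick standalone illustration of why `%r` is more informative than `%s` for exceptions: `str()` of an exception is just its message, which may be empty, while `repr()` also shows the exception type.

```python
# %s uses str(), which for an exception is only its message (possibly empty);
# %r uses repr(), which preserves the exception type.
err = ValueError("")
via_s = "Task finished unexpectedly: %s" % (err,)
via_r = "Task finished unexpectedly: %r" % (err,)
print(via_s)  # Task finished unexpectedly:
print(via_r)  # Task finished unexpectedly: ValueError('')
```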
@@ -22,13 +22,11 @@
    remote.sub_proto.send_broadcast_data(b'broadcast-b')
    remote.sub_proto.send_get_sum(7, 8)
    remote.sub_proto.send_broadcast_data(b'broadcast-c')
    await asyncio.sleep(0.01)
    # yield to let remote and peer transmit.
    await asyncio.sleep(0.1)
Was this something you tinkered with during implementation, or does it need to stay in? 100ms is on the long side for a test, just wondering if we can drop it back down to 0.01.
Ended up needing to be longer but I just pushed a change to see if I can shorten it a little.
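One way to avoid tuning a fixed sleep at all is to poll for the condition with a short interval and an overall deadline: the test returns as soon as the data arrives and only pays the full timeout when something is broken. `wait_until` below is a hypothetical helper, not an asyncio or pytest API.

```python
import asyncio


async def wait_until(predicate, timeout=1.0, interval=0.01):
    # Hypothetical helper: poll `predicate` until it is truthy or the
    # deadline passes, instead of sleeping a fixed amount in the test.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not predicate():
        if loop.time() >= deadline:
            raise TimeoutError("condition not met within %.2fs" % timeout)
        await asyncio.sleep(interval)


async def main():
    received = []

    async def remote():
        # Stands in for the remote/peer transmission in the test above.
        await asyncio.sleep(0.02)
        received.append(b'broadcast-c')

    task = asyncio.create_task(remote())
    await wait_until(lambda: received)  # returns as soon as data arrives
    await task
    return received


result = asyncio.run(main())
```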
ecd0483 to ae1ec67
ae1ec67 to 8f0aa98
Builds from #835

What was wrong?

The p2p.multiplexer.Multiplexer is not actively being used.

How was it fixed?

Integrated it into the BasePeer class to handle the message stream.

To-Do

Cute Animal Picture