Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eth-wire): Implement p2p stream #114

Merged
merged 28 commits into from
Nov 3, 2022
Merged

feat(eth-wire): Implement p2p stream #114

merged 28 commits into from
Nov 3, 2022

Conversation

Rjected
Copy link
Member

@Rjected Rjected commented Oct 21, 2022

WIP meant to implement #64, specifically the p2p connection part

TODO:

  • complete p2p handshake
  • finish Stream and Sink implementations
    • transform message id based on shared capabilities
  • complete p2p state task
  • implement test on top of ECIESStream and another stream type
  • implement EthStream test on top of P2PStream
  • consider moving some types out of p2pstream.rs

Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's allow dead_code for now, otherwise, this is a bit hard to review in the browser.

@onbjerg onbjerg added C-enhancement New feature or request A-devp2p Related to the Ethereum P2P protocol labels Oct 21, 2022
Comment on lines 183 to 186
// snappy::encode(ping): [0x01, 0x00, 0x80]
// snappy::encode(pong): [0x01, 0x00, 0x80]
// snappy::encode(reason): [0x01, 0x00, reason as u8]
// the reason for doing this is so we don't need to run it through the encoder or decoder.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can probably implement it as a constant and add a test that checks they are correct

// complete overhaul of the p2p protocol?
// should we even accept protocol v4?

// how should we accept arbitrary subprotocol messages?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need arbitrary subprotocols and how would we use them?

Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice progress.

I think the main loop is halfway there, but since Ping/Pong are part of the message protocol, we can simplify how they are tracked

@@ -31,7 +34,8 @@ const MAX_P2P_MESSAGE_ID: u8 = P2PMessageID::Pong as u8;
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(15);
const PING_INTERVAL: Duration = Duration::from_secs(60);
const MAX_FAILED_PINGS: usize = 3;
const GRACE_PERIOD: Duration = Duration::from_secs(2);
const MAX_FAILED_PINGS: u8 = 3;

/// A P2PStream wraps over any `Stream` that yields bytes and makes it compatible with `p2p`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should mention what (part of) the protocol this represents.

///
/// This task must be created only after the [`Hello`] handshake has been completed, so this
/// method will return an error if the [`P2PStream`] is not yet authed.
pub fn ping_task(&mut self) -> Result<impl Future<Output = Result<(), P2PStreamError>> + '_, P2PStreamError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like the actual session loop?

There seems to big a big emphasis on ping/pongs here, but I don't think there are that important? aren't they just regular requests that expect a pong response?

// when we are not waiting for a pong, we will only be selecting on the ping interval.
// otherwise, we will be retrying the ping until we either get a pong or we have reached
// the maximum number of retries.
let task = async move {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can simplify this by moving everything in the poll function and looping over messages

fn poll() {
   // here we need to poll the POLL_INTERVAL or `pinger` type to decide if a pong did not arrive, or whether to send a new ping

  while let Some(msg) = this.inner.poll_next(cx) {
    // this is where we handle all messages including pings
  }

}

The pinger looks good, we can add a poll(cx) function that returns Poll<Result<Ping, Err>> where Ok(Ping) should trigger a new ping message and Err(Err) that a pong did not arrive as expected.

@Rjected Rjected force-pushed the dan/p2p-stream branch 3 times, most recently from aed5093 to 78e41ab Compare October 27, 2022 01:37
Copy link
Member

@gakonst gakonst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

love the direction

Comment on lines 32 to 52
/// [`HANDSHAKE_TIMEOUT`] determines the amount of time to wait before determining that a `p2p`
/// handshake has timed out.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(15);
const PING_INTERVAL: Duration = Duration::from_secs(60);
const GRACE_PERIOD: Duration = Duration::from_secs(2);
const MAX_FAILED_PINGS: u8 = 3;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these protocol-specific? Or should they be made configurable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are not protocol specific, they could definitely be made configurable (with these values as defaults)

Comment on lines 47 to 153
/// Whether the `Hello` handshake has been completed
authed: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should refactor the stream to UnauthenticatedStream and P2PStream and remove the auth: bool variable. The UnauthenticatedStream would only contain the handshake method and it'd return a P2PStream. This lets us remove the if self.authed error code path from the start_send function below: https://github.com/foundry-rs/reth/pull/114/files#diff-669cc8ac18463cf275fc4d66d3d4dff60e2ae6d4ff6e634098db97fc83dfdb47R427-R429.

If you think you can do apply this pattern in this PR, and do a follow-up refactor on the EthStream that'd be great.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that makes sense, we can enforce that a stream is authenticated at the type level

Comment on lines +42 to +138
#[pin_project]
pub struct P2PStream<S> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to add examples on how to use these, and their various message types. For a follow-up.

Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great progress.

some comments re Stream+Sink and codec.

Comment on lines 66 to 60
/// An un-authenticated `P2PStream`. This is consumed and returns a [`P2PStream`] after the `Hello`
/// handshake is completed.
#[pin_project]
pub struct UnauthedP2PStream<S> {
#[pin]
stream: P2PStream<S>,
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is great.

This is essentially a future type that does the handshake.

{
/// Consumes the `UnauthedP2PStream` and returns a `P2PStream` after the `Hello` handshake is
/// completed.
pub async fn handshake(mut self, hello: HelloMessage) -> Result<P2PStream<S>, P2PStreamError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this also needs to return the contents of the received hello message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracking in #157

crates/net/eth-wire/src/p2pstream.rs Show resolved Hide resolved
Comment on lines 288 to 190

let send_res = Pin::new(&mut this.inner).send(ping_bytes.into()).poll_unpin(cx)?;
ready!(send_res)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is invalid usage of the Sink API here since this SinkExt::send creates a future that resolves when the item has been sent.

the underlying stream should be Framed, right? so we should use a codec for the stream I think.
But I think this could be tricky for the sending part since we need to handle the offset?

// we should loop here to ensure we don't return Poll::Pending if we have a message to
// return behind any pings we need to respond to
while let Poll::Ready(res) = this.inner.as_mut().poll_next(cx) {
let mut bytes = match res {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this guaranteed to be a framed message?

Comment on lines +323 to +226
let send_res = Pin::new(&mut this.inner).send(pong_bytes.into()).poll_unpin(cx)?;
ready!(send_res)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to use the actual sink api here which is always:

if poll_ready().is_ready() {
   start_send(msg)
} else {
  // need to buffer until sink ready.
}

see Sink trait docs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracking in #156

Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks very good now, some parts are not quite complete yet, but I'd suggest merging this now and do smaller iterations moving forward? Because I think the stream-type interfaces are somewhat stable now.
we only really need Stream+Sinkand handshake function I believe.

wdyt @Rjected

@Rjected
Copy link
Member Author

Rjected commented Nov 1, 2022

@mattsse sure, let me get tests passing with the handshake and we can merge. I'll make some issues for things left to do

 * TODO: make it compile
 * TODO: test ping/pong/disconnect state machine
 * TODO: send subprotocol messages to stream
 * TODO: encode non-hello p2p messages as snappy encoding without using
   an encoder
 * TODO: create test comparing encoder to hand-written snappy encoding
   for ping, pong, disconnect messages
 * restricts S to be Stream+Sink for P2PStream to implement Stream
 * start of a poll-based refactor
 * add tests
 * TODO: make stream/sink types compatible
 * TODO: handshake message ids
 * TODO: inner poll fn
 * TODO: pinger interval
 * TODO: ethstream test
 * TODO: passthrough test
 * it should be much easier to poll for pings and detect timeouts now
 * change item produced by stream so it's compatible with EthStream
   * add note on pros/cons
 * shorten message sends in stream
 * switch to snappy formatting for non-hello p2p messages
 * remove check for `Hello` messages outside of the handshake because
   `P2PStream`s should assume messages sent in the sink are subprotocol
   messages, not `p2p` messages.
 * disallow protocol versions other than v5
 * todo: test shared capabilities
 * todo: determine how / when / why to support multiple capabilities
 * removes obsolete authed and offset fields
 * should add protocols when necessary rather than name unsupported
   protocols
 * add test for p2pstream over a passthrough codec which tests that
   peers agree on a single shared capability
@Rjected
Copy link
Member Author

Rjected commented Nov 3, 2022

OK, I got tests to pass - we can probably merge this now, the TODOs / smaller suggestions can be converted to issues if that sounds good

Copy link
Member

@gakonst gakonst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really clean

use bytes::{Buf, Bytes, BytesMut};
use futures::{ready, FutureExt, Sink, SinkExt, StreamExt};
use pin_project::pin_project;
use reth_primitives::H512 as PeerId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add PeerId as a re export in primitives and note why it's that size? It seems common enough

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a good idea

@gakonst gakonst merged commit 3c72a12 into main Nov 3, 2022
@gakonst gakonst deleted the dan/p2p-stream branch November 3, 2022 07:01
This was referenced Nov 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-devp2p Related to the Ethereum P2P protocol C-enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants