Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce polling-based multiplexer #241

Merged
merged 5 commits into from
Apr 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/block-decode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ publish = false
[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.10.0"
hex = "0.4.3"
2 changes: 0 additions & 2 deletions examples/block-download/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ publish = false
[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.10.0"
hex = "0.4.3"

3 changes: 0 additions & 3 deletions examples/n2c-miniprotocols/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,5 @@ publish = false
[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.10.0"
hex = "0.4.3"
log = "0.4.16"


2 changes: 0 additions & 2 deletions examples/n2n-miniprotocols/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,5 @@ publish = false
[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.10.0"
hex = "0.4.3"
log = "0.4.16"

3 changes: 3 additions & 0 deletions pallas-miniprotocols/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ hex = "0.4.3"
itertools = "0.10.3"
thiserror = "1.0.31"
tracing = "0.1.37"

[dev-dependencies]
tokio = { version = "1.27.0", features = ["macros", "rt"] }
52 changes: 30 additions & 22 deletions pallas-miniprotocols/src/blockfetch/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,32 +99,35 @@ where
}
}

pub fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
pub async fn send_message(&mut self, msg: &Message) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;

Ok(())
}

pub fn recv_message(&mut self) -> Result<Message, Error> {
pub async fn recv_message(&mut self) -> Result<Message, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
self.assert_inbound_state(&msg)?;

Ok(msg)
}

pub fn send_request_range(&mut self, range: (Point, Point)) -> Result<(), Error> {
pub async fn send_request_range(&mut self, range: (Point, Point)) -> Result<(), Error> {
let msg = Message::RequestRange { range };
self.send_message(&msg)?;
self.send_message(&msg).await?;
self.0 = State::Busy;

Ok(())
}

pub fn recv_while_busy(&mut self) -> Result<HasBlocks, Error> {
match self.recv_message()? {
pub async fn recv_while_busy(&mut self) -> Result<HasBlocks, Error> {
match self.recv_message().await? {
Message::StartBatch => {
info!("batch start");
self.0 = State::Streaming;
Expand All @@ -139,14 +142,14 @@ where
}
}

pub fn request_range(&mut self, range: Range) -> Result<HasBlocks, Error> {
self.send_request_range(range)?;
pub async fn request_range(&mut self, range: Range) -> Result<HasBlocks, Error> {
self.send_request_range(range).await?;
debug!("range requested");
self.recv_while_busy()
self.recv_while_busy().await
}

pub fn recv_while_streaming(&mut self) -> Result<Option<Body>, Error> {
match self.recv_message()? {
pub async fn recv_while_streaming(&mut self) -> Result<Option<Body>, Error> {
match self.recv_message().await? {
Message::Block { body } => Ok(Some(body)),
Message::BatchDone => {
self.0 = State::Idle;
Expand All @@ -156,35 +159,40 @@ where
}
}

pub fn fetch_single(&mut self, point: Point) -> Result<Body, Error> {
self.request_range((point.clone(), point))?
pub async fn fetch_single(&mut self, point: Point) -> Result<Body, Error> {
self.request_range((point.clone(), point))
.await?
.ok_or(Error::NoBlocks)?;

let body = self.recv_while_streaming()?.ok_or(Error::InvalidInbound)?;
let body = self
.recv_while_streaming()
.await?
.ok_or(Error::InvalidInbound)?;

debug!("body received");

match self.recv_while_streaming()? {
match self.recv_while_streaming().await? {
Some(_) => Err(Error::InvalidInbound),
None => Ok(body),
}
}

pub fn fetch_range(&mut self, range: Range) -> Result<Vec<Body>, Error> {
self.request_range(range)?.ok_or(Error::NoBlocks)?;
pub async fn fetch_range(&mut self, range: Range) -> Result<Vec<Body>, Error> {
self.request_range(range).await?.ok_or(Error::NoBlocks)?;

let mut all = vec![];

while let Some(block) = self.recv_while_streaming()? {
while let Some(block) = self.recv_while_streaming().await? {
debug!("body received");
all.push(block);
}

Ok(all)
}

pub fn send_done(&mut self) -> Result<(), Error> {
pub async fn send_done(&mut self) -> Result<(), Error> {
let msg = Message::ClientDone;
self.send_message(&msg)?;
self.send_message(&msg).await?;
self.0 = State::Done;

Ok(())
Expand Down
57 changes: 30 additions & 27 deletions pallas-miniprotocols/src/chainsync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,35 +108,38 @@ where
}
}

pub fn send_message(&mut self, msg: &Message<O>) -> Result<(), Error> {
pub async fn send_message(&mut self, msg: &Message<O>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;

self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;

Ok(())
}

pub fn recv_message(&mut self) -> Result<Message<O>, Error> {
pub async fn recv_message(&mut self) -> Result<Message<O>, Error> {
self.assert_agency_is_theirs()?;

let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;

self.assert_inbound_state(&msg)?;

Ok(msg)
}

pub fn send_find_intersect(&mut self, points: Vec<Point>) -> Result<(), Error> {
pub async fn send_find_intersect(&mut self, points: Vec<Point>) -> Result<(), Error> {
let msg = Message::FindIntersect(points);
self.send_message(&msg)?;
self.send_message(&msg).await?;
self.0 = State::Intersect;

Ok(())
}

pub fn recv_intersect_response(&mut self) -> Result<IntersectResponse, Error> {
match self.recv_message()? {
pub async fn recv_intersect_response(&mut self) -> Result<IntersectResponse, Error> {
match self.recv_message().await? {
Message::IntersectFound(point, tip) => {
self.0 = State::Idle;
Ok((Some(point), tip))
Expand All @@ -149,21 +152,21 @@ where
}
}

pub fn find_intersect(&mut self, points: Vec<Point>) -> Result<IntersectResponse, Error> {
self.send_find_intersect(points)?;
self.recv_intersect_response()
pub async fn find_intersect(&mut self, points: Vec<Point>) -> Result<IntersectResponse, Error> {
self.send_find_intersect(points).await?;
self.recv_intersect_response().await
}

pub fn send_request_next(&mut self) -> Result<(), Error> {
pub async fn send_request_next(&mut self) -> Result<(), Error> {
let msg = Message::RequestNext;
self.send_message(&msg)?;
self.send_message(&msg).await?;
self.0 = State::CanAwait;

Ok(())
}

pub fn recv_while_can_await(&mut self) -> Result<NextResponse<O>, Error> {
match self.recv_message()? {
pub async fn recv_while_can_await(&mut self) -> Result<NextResponse<O>, Error> {
match self.recv_message().await? {
Message::AwaitReply => {
self.0 = State::MustReply;
Ok(NextResponse::Await)
Expand All @@ -180,8 +183,8 @@ where
}
}

pub fn recv_while_must_reply(&mut self) -> Result<NextResponse<O>, Error> {
match self.recv_message()? {
pub async fn recv_while_must_reply(&mut self) -> Result<NextResponse<O>, Error> {
match self.recv_message().await? {
Message::RollForward(a, b) => {
self.0 = State::Idle;
Ok(NextResponse::RollForward(a, b))
Expand All @@ -194,35 +197,35 @@ where
}
}

pub fn request_next(&mut self) -> Result<NextResponse<O>, Error> {
pub async fn request_next(&mut self) -> Result<NextResponse<O>, Error> {
debug!("requesting next block");

self.send_request_next()?;
self.send_request_next().await?;

self.recv_while_can_await()
self.recv_while_can_await().await
}

pub fn intersect_origin(&mut self) -> Result<Point, Error> {
pub async fn intersect_origin(&mut self) -> Result<Point, Error> {
debug!("intersecting origin");

let (point, _) = self.find_intersect(vec![Point::Origin])?;
let (point, _) = self.find_intersect(vec![Point::Origin]).await?;

point.ok_or(Error::IntersectionNotFound)
}

pub fn intersect_tip(&mut self) -> Result<Point, Error> {
let (_, Tip(point, _)) = self.find_intersect(vec![Point::Origin])?;
pub async fn intersect_tip(&mut self) -> Result<Point, Error> {
let (_, Tip(point, _)) = self.find_intersect(vec![Point::Origin]).await?;

debug!(?point, "found tip value");

let (point, _) = self.find_intersect(vec![point])?;
let (point, _) = self.find_intersect(vec![point]).await?;

point.ok_or(Error::IntersectionNotFound)
}

pub fn send_done(&mut self) -> Result<(), Error> {
pub async fn send_done(&mut self) -> Result<(), Error> {
let msg = Message::Done;
self.send_message(&msg)?;
self.send_message(&msg).await?;
self.0 = State::Done;

Ok(())
Expand Down
32 changes: 21 additions & 11 deletions pallas-miniprotocols/src/handshake/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use pallas_codec::Fragment;
use pallas_multiplexer::agents::{Channel, ChannelBuffer};
use std::marker::PhantomData;
use tracing::debug;

use super::{Error, Message, RefuseReason, State, VersionNumber, VersionTable};

Expand Down Expand Up @@ -71,47 +72,56 @@ where
}
}

pub fn send_message(&mut self, msg: &Message<D>) -> Result<(), Error> {
pub async fn send_message(&mut self, msg: &Message<D>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;
self.1.send_msg_chunks(msg).map_err(Error::ChannelError)?;
self.1
.send_msg_chunks(msg)
.await
.map_err(Error::ChannelError)?;

Ok(())
}

pub fn recv_message(&mut self) -> Result<Message<D>, Error> {
pub async fn recv_message(&mut self) -> Result<Message<D>, Error> {
self.assert_agency_is_theirs()?;
let msg = self.1.recv_full_msg().map_err(Error::ChannelError)?;
let msg = self.1.recv_full_msg().await.map_err(Error::ChannelError)?;
self.assert_inbound_state(&msg)?;

Ok(msg)
}

pub fn send_propose(&mut self, versions: VersionTable<D>) -> Result<(), Error> {
pub async fn send_propose(&mut self, versions: VersionTable<D>) -> Result<(), Error> {
let msg = Message::Propose(versions);
self.send_message(&msg)?;
self.send_message(&msg).await?;
self.0 = State::Confirm;

debug!("version proposed");

Ok(())
}

pub fn recv_while_confirm(&mut self) -> Result<Confirmation<D>, Error> {
match self.recv_message()? {
pub async fn recv_while_confirm(&mut self) -> Result<Confirmation<D>, Error> {
match self.recv_message().await? {
Message::Accept(v, m) => {
self.0 = State::Done;
debug!("handshake accepted");

Ok(Confirmation::Accepted(v, m))
}
Message::Refuse(r) => {
self.0 = State::Done;
debug!("handshake refused");

Ok(Confirmation::Rejected(r))
}
_ => Err(Error::InvalidInbound),
}
}

pub fn handshake(&mut self, versions: VersionTable<D>) -> Result<Confirmation<D>, Error> {
self.send_propose(versions)?;
self.recv_while_confirm()
pub async fn handshake(&mut self, versions: VersionTable<D>) -> Result<Confirmation<D>, Error> {
self.send_propose(versions).await?;
self.recv_while_confirm().await
}

pub fn unwrap(self) -> H {
Expand Down
Loading