Skip to content

Commit

Permalink
chore: Implement multiplexer stage (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Nov 30, 2022
1 parent 337ee92 commit c03cf33
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 72 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
minicbor = "0.14.1"
prometheus_exporter = { version = "0.8.4", default-features = false }
# gasket = { path = "../../construkts/gasket-rs" }
gasket = { git = "https://github.com/construkts/gasket-rs.git" }
gasket = { path = "../../construkts/gasket-rs" }
# gasket = { git = "https://github.com/construkts/gasket-rs.git" }
thiserror = "1.0.30"
lazy_static = "1.4.0"
rayon = "1.5.3"
Expand Down
117 changes: 48 additions & 69 deletions src/upstream/chainsync.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use gasket::error::AsWorkError;
use pallas::ledger::traverse::MultiEraHeader;
use pallas::network::miniprotocols::chainsync;
use pallas::network::miniprotocols::chainsync::{HeaderContent, N2NClient, NextResponse};
use pallas::network::miniprotocols::{chainsync, handshake};
use pallas::network::multiplexer::{self, StdChannel};
use pallas::network::multiplexer;

use crate::prelude::*;

Expand All @@ -15,83 +15,78 @@ fn to_traverse<'b>(header: &'b chainsync::HeaderContent) -> Result<MultiEraHeade
out.map_err(Error::parse)
}

type OuroborosClient = N2NClient<StdChannel>;
type MuxerPort = gasket::messaging::OutputPort<(u16, multiplexer::Payload)>;
type DemuxerPort = gasket::messaging::InputPort<multiplexer::Payload>;

type OutputPort = gasket::messaging::OutputPort<ChainSyncEvent>;
type DownstreamPort = gasket::messaging::OutputPort<ChainSyncEvent>;

pub struct GasketChannel(u16, MuxerPort, DemuxerPort);

impl multiplexer::agents::Channel for GasketChannel {
fn enqueue_chunk(
&mut self,
payload: multiplexer::Payload,
) -> Result<(), multiplexer::agents::ChannelError> {
match self
.1
.send(gasket::messaging::Message::from((self.0, payload)))
{
Ok(_) => Ok(()),
Err(err) => Err(multiplexer::agents::ChannelError::NotConnected(None)),
}
}

fn dequeue_chunk(&mut self) -> Result<multiplexer::Payload, multiplexer::agents::ChannelError> {
match self.2.recv() {
Ok(msg) => Ok(msg.payload),
Err(_) => Err(multiplexer::agents::ChannelError::NotConnected(None)),
}
}
}

type OuroborosClient = N2NClient<GasketChannel>;

pub struct Worker {
peer_address: String,
network_magic: u64,
chain_cursor: Cursor,
ouroboros_client: Option<OuroborosClient>,
output: OutputPort,
client: OuroborosClient,
downstream: DownstreamPort,
block_count: gasket::metrics::Counter,
chain_tip: gasket::metrics::Gauge,
}

impl Worker {
pub fn new(
peer_address: String,
network_magic: u64,
chain_cursor: Cursor,
output: OutputPort,
muxer: MuxerPort,
demuxer: DemuxerPort,
downstream: DownstreamPort,
) -> Self {
let channel = GasketChannel(5, muxer, demuxer);
let client = OuroborosClient::new(channel);

Self {
peer_address,
network_magic,
chain_cursor,
output,
ouroboros_client: None,
client,
downstream,
block_count: Default::default(),
chain_tip: Default::default(),
}
}

pub fn connect(&self) -> Result<OuroborosClient, Error> {
log::debug!("connecting muxer");

let bearer = multiplexer::bearers::Bearer::connect_unix(&self.peer_address)
.map_err(Error::client)?;
let mut plexer = multiplexer::StdPlexer::new(bearer);

let channel0 = plexer.use_channel(0);
let channel5 = plexer.use_channel(5);

plexer.muxer.spawn();
plexer.demuxer.spawn();

log::debug!("doing handshake");

let versions = handshake::n2n::VersionTable::v7_and_above(self.network_magic);
let mut client = handshake::Client::new(channel0);

let output = client.handshake(versions).map_err(Error::client)?;

log::info!("handshake output: {:?}", output);

match output {
handshake::Confirmation::Accepted(version, _) => {
log::info!("connected to upstream peer using version {}", version);
Ok(OuroborosClient::new(channel5))
}
_ => Err(Error::client("couldn't agree on handshake version")),
}
}

fn process_next(
&mut self,
next: NextResponse<HeaderContent>,
) -> Result<(), gasket::error::Error> {
match next {
chainsync::NextResponse::RollForward(h, t) => {
let h = to_traverse(&h).or_panic()?;
self.output
self.downstream
.send(ChainSyncEvent::RollForward(h.slot(), h.hash()).into())?;
self.chain_tip.set(t.1 as i64);
Ok(())
}
chainsync::NextResponse::RollBackward(p, t) => {
self.output.send(ChainSyncEvent::Rollback(p).into())?;
self.downstream.send(ChainSyncEvent::Rollback(p).into())?;
self.chain_tip.set(t.1 as i64);
Ok(())
}
Expand All @@ -104,27 +99,13 @@ impl Worker {

fn request_next(&mut self) -> Result<(), gasket::error::Error> {
log::info!("requesting next block");

let next = self
.ouroboros_client
.as_mut()
.unwrap()
.request_next()
.or_restart()?;

let next = self.client.request_next().or_restart()?;
self.process_next(next)
}

fn await_next(&mut self) -> Result<(), gasket::error::Error> {
log::info!("awaiting next block (blocking)");

let next = self
.ouroboros_client
.as_mut()
.unwrap()
.recv_while_must_reply()
.or_restart()?;

let next = self.client.recv_while_must_reply().or_restart()?;
self.process_next(next)
}
}
Expand All @@ -138,25 +119,23 @@ impl gasket::runtime::Worker for Worker {
}

fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
let mut client = self.connect().or_retry()?;

let point = self.chain_cursor.last_point().or_panic()?;

log::info!("intersecting chain at point: {:?}", point);

let (point, _) = client
let (point, _) = self
.client
.find_intersect(vec![point])
.map_err(Error::client)
.or_restart()?;

log::info!("chain-sync intersection is {:?}", point);

self.ouroboros_client = Some(client);
Ok(())
}

fn work(&mut self) -> gasket::runtime::WorkResult {
match self.ouroboros_client.as_ref().unwrap().has_agency() {
match self.client.has_agency() {
true => self.request_next()?,
false => self.await_next()?,
};
Expand Down
1 change: 1 addition & 0 deletions src/upstream/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod chainsync;
pub mod plexer;
168 changes: 168 additions & 0 deletions src/upstream/plexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use gasket::error::AsWorkError;
use pallas::network::miniprotocols::handshake;
use pallas::network::multiplexer;
use pallas::network::multiplexer::bearers::Bearer;
use pallas::network::multiplexer::demux::{Demuxer, Egress};
use pallas::network::multiplexer::mux::{Ingress, Muxer};
use pallas::network::multiplexer::sync::SyncPlexer;

type InputPort = gasket::messaging::InputPort<(u16, multiplexer::Payload)>;
type OutputPort = gasket::messaging::OutputPort<multiplexer::Payload>;

struct GasketEgress(OutputPort);

impl Egress for GasketEgress {
fn send(
&mut self,
payload: multiplexer::Payload,
) -> Result<(), multiplexer::demux::EgressError> {
self.0
.send(gasket::messaging::Message::from(payload))
.map_err(|_| multiplexer::demux::EgressError(vec![]))
}
}

struct GasketIngress(InputPort);

impl Ingress for GasketIngress {
fn recv_timeout(
&mut self,
duration: std::time::Duration,
) -> Result<multiplexer::Message, multiplexer::mux::IngressError> {
self.0
.recv_timeout(duration)
.map(|msg| msg.payload)
.map_err(|err| match err {
gasket::error::Error::RecvIdle => multiplexer::mux::IngressError::Empty,
_ => multiplexer::mux::IngressError::Disconnected,
})
}
}

struct Session {
demuxer: Demuxer<GasketEgress>,
muxer: Muxer<GasketIngress>,
}

type IsBusy = bool;

impl Session {
fn demux_tick(&mut self) -> Result<IsBusy, gasket::error::Error> {
match self.demuxer.tick() {
Ok(x) => match x {
multiplexer::demux::TickOutcome::Busy => Ok(true),
multiplexer::demux::TickOutcome::Idle => Ok(false),
},
Err(err) => match err {
multiplexer::demux::DemuxError::BearerError(err) => {
Err(gasket::error::Error::ShouldRestart(err.to_string()))
}
multiplexer::demux::DemuxError::EgressDisconnected(x, _) => Err(
gasket::error::Error::WorkPanic(format!("egress disconnected {}", x)),
),
multiplexer::demux::DemuxError::EgressUnknown(x, _) => Err(
gasket::error::Error::WorkPanic(format!("unknown egress {}", x)),
),
},
}
}

fn mux_tick(&mut self) -> Result<IsBusy, gasket::error::Error> {
match self.muxer.tick() {
multiplexer::mux::TickOutcome::Busy => Ok(true),
multiplexer::mux::TickOutcome::Idle => Ok(false),
multiplexer::mux::TickOutcome::BearerError(err) => {
Err(gasket::error::Error::ShouldRestart(err.to_string()))
}
multiplexer::mux::TickOutcome::IngressDisconnected => Err(
gasket::error::Error::WorkPanic("ingress disconnected".into()),
),
}
}
}

pub struct Worker {
peer_address: String,
network_magic: u64,
input: InputPort,
channel5_out: OutputPort,
session: Option<Session>,
}

impl Worker {
pub fn new(
peer_address: String,
network_magic: u64,
input: InputPort,
channel5_out: OutputPort,
) -> Self {
Self {
peer_address,
network_magic,
input,
channel5_out,
session: None,
}
}

fn handshake(&self, bearer: Bearer) -> Result<Bearer, gasket::error::Error> {
log::debug!("doing handshake");

let plexer = SyncPlexer::new(bearer, 0);
let versions = handshake::n2n::VersionTable::v7_and_above(self.network_magic);
let mut client = handshake::Client::new(plexer);

let output = client.handshake(versions).or_panic()?;
log::info!("handshake output: {:?}", output);

let bearer = client.unwrap().unwrap();

match output {
handshake::Confirmation::Accepted(version, _) => {
log::info!("connected to upstream peer using version {}", version);
Ok(bearer)
}
_ => Err(gasket::error::Error::WorkPanic(
"couldn't agree on handshake version".into(),
)),
}
}

fn connect(&self) -> Result<Session, gasket::error::Error> {
log::debug!("connecting muxer");

let bearer = multiplexer::bearers::Bearer::connect_tcp(&self.peer_address).or_restart()?;

let bearer = self.handshake(bearer)?;

let mut demuxer = Demuxer::new(bearer.clone());
demuxer.register(5, GasketEgress(self.channel5_out.clone()));

let muxer = Muxer::new(bearer, GasketIngress(self.input.clone()));

Ok(Session { demuxer, muxer })
}
}

impl gasket::runtime::Worker for Worker {
fn metrics(&self) -> gasket::metrics::Registry {
// TODO: define networking metrics (bytes in / out, etc)
gasket::metrics::Builder::new().build()
}

fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
let session = self.connect()?;
self.session = Some(session);

Ok(())
}

fn work(&mut self) -> gasket::runtime::WorkResult {
let session = self.session.as_mut().unwrap();

session.demux_tick()?;
session.mux_tick()?;

Ok(gasket::runtime::WorkOutcome::Partial)
}
}

0 comments on commit c03cf33

Please sign in to comment.