diff --git a/Cargo.lock b/Cargo.lock index d0087d98..7d199b90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -300,6 +300,8 @@ dependencies = [ "serde", "serde_json", "thiserror", + "tracing", + "tracing-subscriber", ] [[package]] @@ -341,8 +343,8 @@ name = "gasket" version = "0.1.0" dependencies = [ "crossbeam", - "log", "thiserror", + "tracing", ] [[package]] @@ -587,6 +589,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num_cpus" version = "1.14.0" @@ -609,6 +621,12 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "pallas" version = "0.15.0" @@ -661,10 +679,10 @@ version = "0.15.0" dependencies = [ "hex", "itertools", - "log", "pallas-codec", "pallas-multiplexer", "thiserror", + "tracing", ] [[package]] @@ -677,6 +695,7 @@ dependencies = [ "pallas-codec", "rand", "thiserror", + "tracing", ] [[package]] @@ -746,6 +765,12 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + [[package]] name = "pkg-config" version = "0.3.26" @@ -964,6 +989,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.1.0" @@ -1028,6 +1062,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +dependencies = [ + "once_cell", +] + [[package]] name = "time" version = "0.3.17" @@ -1092,6 +1135,64 @@ dependencies = [ "serde", ] +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if 1.0.0", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-bidi" version = "0.3.8" @@ -1124,6 +1225,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index fc9162d3..31900cd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ authors = ["Santiago Carmuega "] [dependencies] # pallas = "0.14.0" pallas = { path = "../pallas/pallas" } -# pallas = { git = "https://github.com/txpipe/pallas.git" } hex = "0.4.3" net2 = "0.2.37" bech32 = "0.8.1" @@ -35,3 +34,5 @@ thiserror = "1.0.30" lazy_static = "1.4.0" rayon = "1.5.3" rocksdb = "0.19.0" +tracing = "0.1.37" +tracing-subscriber = "0.3.16" diff --git a/src/lib.rs b/src/lib.rs index bb57d1bf..f46fe91c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,3 +5,6 @@ pub mod model; pub mod prelude; pub mod storage; pub mod upstream; + +#[cfg(test)] +mod tests; diff --git a/src/prelude.rs b/src/prelude.rs index 065660d6..431267fa 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,5 +1,6 @@ pub use super::model::*; pub use super::storage::Cursor; +pub use super::upstream::prelude::*; use std::fmt::Display; diff --git a/src/tests/plexing.rs b/src/tests/plexing.rs new file mode 100644 index 00000000..f0f705e6 --- /dev/null +++ b/src/tests/plexing.rs @@ -0,0 +1,43 @@ +use pallas::network::miniprotocols::Point; +use tracing::info; + +use crate::upstream::plexer; +use crate::upstream::prelude::*; + +#[test] +fn connect_to_real_relay() { + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) + .finish(), + ) + .unwrap(); + + let known_point = Point::Specific( + 3866155, + hex::decode("9a5446c4178c708706f8218ee05cec7674396e5f044f911eacc1ad1147cc353e").unwrap(), + ); + + let mut input = MuxInputPort::default(); + let mut output = DemuxOutputPort::default(); + + let client_channel = protocol_channel(2, &mut input, &mut output); + + let worker = plexer::Worker::new( + "preview-node.world.dev.cardano.org:30002".into(), + 2, + input, + output, + ); + + let tether = gasket::runtime::spawn_stage(worker, gasket::runtime::Policy::default(), None); + + let mut client = OuroborosClient::new(client_channel); + let (point, _) = client.find_intersect(vec![known_point.clone()]).unwrap(); + + assert_eq!(point.unwrap(), known_point); + + info!("dismissing"); + tether.dismiss_stage().unwrap(); + tether.join_stage(); +} diff --git a/src/upstream/mod.rs b/src/upstream/mod.rs index 949ffff9..57babe95 100644 --- a/src/upstream/mod.rs +++ b/src/upstream/mod.rs @@ -1,2 +1,3 @@ pub mod chainsync; pub mod plexer; +pub mod prelude; diff --git a/src/upstream/plexer.rs b/src/upstream/plexer.rs index f2990a0a..eb66618e 100644 --- a/src/upstream/plexer.rs +++ b/src/upstream/plexer.rs @@ -5,11 +5,11 @@ use pallas::network::multiplexer::bearers::Bearer; use pallas::network::multiplexer::demux::{Demuxer, Egress}; use pallas::network::multiplexer::mux::{Ingress, Muxer}; use pallas::network::multiplexer::sync::SyncPlexer; +use tracing::{debug, error, info, warn}; -type InputPort = gasket::messaging::InputPort<(u16, multiplexer::Payload)>; -type OutputPort = gasket::messaging::OutputPort; +use super::prelude::*; -struct GasketEgress(OutputPort); +struct GasketEgress(DemuxOutputPort); impl Egress for GasketEgress { fn send( @@ -22,7 +22,7 @@ impl Egress for GasketEgress { } } -struct GasketIngress(InputPort); +struct GasketIngress(MuxInputPort); impl Ingress for GasketIngress { fn recv_timeout( @@ -39,44 +39,46 @@ impl Ingress for GasketIngress { } } -struct Session { - demuxer: Demuxer, - muxer: Muxer, -} - type IsBusy = bool; -impl Session { - fn demux_tick(&mut self) -> Result { - match self.demuxer.tick() { - Ok(x) => match x { - multiplexer::demux::TickOutcome::Busy => Ok(true), - multiplexer::demux::TickOutcome::Idle => Ok(false), - }, - Err(err) => match err { - multiplexer::demux::DemuxError::BearerError(err) => { - Err(gasket::error::Error::ShouldRestart(err.to_string())) - } - multiplexer::demux::DemuxError::EgressDisconnected(x, _) => Err( - gasket::error::Error::WorkPanic(format!("egress disconnected {}", x)), - ), - multiplexer::demux::DemuxError::EgressUnknown(x, _) => Err( - gasket::error::Error::WorkPanic(format!("unknown egress {}", x)), - ), - }, - } +fn handle_demux_outcome( + outcome: Result, +) -> Result { + match outcome { + Ok(x) => match x { + multiplexer::demux::TickOutcome::Busy => Ok(true), + multiplexer::demux::TickOutcome::Idle => Ok(false), + }, + Err(err) => match err { + multiplexer::demux::DemuxError::BearerError(err) => { + error!("{}", err.kind()); + Err(gasket::error::Error::ShouldRestart) + } + multiplexer::demux::DemuxError::EgressDisconnected(x, _) => { + error!(protocol = x, "egress disconnected"); + Err(gasket::error::Error::WorkPanic) + } + multiplexer::demux::DemuxError::EgressUnknown(x, _) => { + error!(protocol = x, "unknown egress"); + Err(gasket::error::Error::WorkPanic) + } + }, } +} - fn mux_tick(&mut self) -> Result { - match self.muxer.tick() { - multiplexer::mux::TickOutcome::Busy => Ok(true), - multiplexer::mux::TickOutcome::Idle => Ok(false), - multiplexer::mux::TickOutcome::BearerError(err) => { - Err(gasket::error::Error::ShouldRestart(err.to_string())) - } - multiplexer::mux::TickOutcome::IngressDisconnected => Err( - gasket::error::Error::WorkPanic("ingress disconnected".into()), - ), +fn handle_mux_outcome( + outcome: multiplexer::mux::TickOutcome, +) -> Result { + match outcome { + multiplexer::mux::TickOutcome::Busy => Ok(true), + multiplexer::mux::TickOutcome::Idle => Ok(false), + multiplexer::mux::TickOutcome::BearerError(err) => { + warn!(%err); + Err(gasket::error::Error::ShouldRestart) + } + multiplexer::mux::TickOutcome::IngressDisconnected => { + error!("ingress disconnected"); + Err(gasket::error::Error::WorkPanic) } } } @@ -84,64 +86,52 @@ impl Session { pub struct Worker { peer_address: String, network_magic: u64, - input: InputPort, - channel5_out: OutputPort, - session: Option, + input: MuxInputPort, + channel2_out: DemuxOutputPort, + demuxer: Option>, + muxer: Option>, } impl Worker { pub fn new( peer_address: String, network_magic: u64, - input: InputPort, - channel5_out: OutputPort, + input: MuxInputPort, + channel2_out: DemuxOutputPort, ) -> Self { Self { peer_address, network_magic, input, - channel5_out, - session: None, + channel2_out, + demuxer: None, + muxer: None, } } fn handshake(&self, bearer: Bearer) -> Result { - log::debug!("doing handshake"); + info!("excuting handshake"); let plexer = SyncPlexer::new(bearer, 0); let versions = handshake::n2n::VersionTable::v7_and_above(self.network_magic); let mut client = handshake::Client::new(plexer); let output = client.handshake(versions).or_panic()?; - log::info!("handshake output: {:?}", output); + debug!("handshake output: {:?}", output); let bearer = client.unwrap().unwrap(); match output { handshake::Confirmation::Accepted(version, _) => { - log::info!("connected to upstream peer using version {}", version); + info!(version, "connected to upstream peer"); Ok(bearer) } - _ => Err(gasket::error::Error::WorkPanic( - "couldn't agree on handshake version".into(), - )), + _ => { + error!("couldn't agree on handshake version"); + Err(gasket::error::Error::WorkPanic) + } } } - - fn connect(&self) -> Result { - log::debug!("connecting muxer"); - - let bearer = multiplexer::bearers::Bearer::connect_tcp(&self.peer_address).or_restart()?; - - let bearer = self.handshake(bearer)?; - - let mut demuxer = Demuxer::new(bearer.clone()); - demuxer.register(5, GasketEgress(self.channel5_out.clone())); - - let muxer = Muxer::new(bearer, GasketIngress(self.input.clone())); - - Ok(Session { demuxer, muxer }) - } } impl gasket::runtime::Worker for Worker { @@ -151,17 +141,48 @@ impl gasket::runtime::Worker for Worker { } fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { - let session = self.connect()?; - self.session = Some(session); + debug!("connecting muxer"); + + let bearer = multiplexer::bearers::Bearer::connect_tcp(&self.peer_address).or_restart()?; + + let bearer = self.handshake(bearer)?; + + let mut demuxer = Demuxer::new(bearer.clone()); + demuxer.register(2, GasketEgress(self.channel2_out.clone())); + self.demuxer = Some(demuxer); + + let muxer = Muxer::new(bearer, GasketIngress(self.input.clone())); + self.muxer = Some(muxer); Ok(()) } fn work(&mut self) -> gasket::runtime::WorkResult { - let session = self.session.as_mut().unwrap(); - - session.demux_tick()?; - session.mux_tick()?; + let muxer = self.muxer.as_mut().unwrap(); + let demuxer = self.demuxer.as_mut().unwrap(); + + let span = tracing::span::Span::current(); + + let mut mux_res = None; + let mut demux_res = None; + + rayon::scope(|s| { + s.spawn(|_| { + let _guard = span.enter(); + info!("mux ticking"); + let outcome = muxer.tick(); + mux_res = Some(handle_mux_outcome(outcome)); + }); + s.spawn(|_| { + let _guard = span.enter(); + info!("demux ticking"); + let outcome = demuxer.tick(); + demux_res = Some(handle_demux_outcome(outcome)); + }); + }); + + mux_res.unwrap()?; + demux_res.unwrap()?; Ok(gasket::runtime::WorkOutcome::Partial) } diff --git a/src/upstream/prelude.rs b/src/upstream/prelude.rs new file mode 100644 index 00000000..8c381534 --- /dev/null +++ b/src/upstream/prelude.rs @@ -0,0 +1,56 @@ +use pallas::network::multiplexer; +use tracing::error; + +// ports used by plexer +pub type MuxOutputPort = gasket::messaging::OutputPort<(u16, multiplexer::Payload)>; +pub type DemuxInputPort = gasket::messaging::InputPort; + +// ports used by mini-protocols +pub type MuxInputPort = gasket::messaging::InputPort<(u16, multiplexer::Payload)>; +pub type DemuxOutputPort = gasket::messaging::OutputPort; + +pub struct ProtocolChannel(u16, MuxOutputPort, DemuxInputPort); + +impl multiplexer::agents::Channel for ProtocolChannel { + fn enqueue_chunk( + &mut self, + payload: multiplexer::Payload, + ) -> Result<(), multiplexer::agents::ChannelError> { + match self + .1 + .send(gasket::messaging::Message::from((self.0, payload))) + { + Ok(_) => Ok(()), + Err(error) => { + error!(?error, "enqueue chunk failed"); + Err(multiplexer::agents::ChannelError::NotConnected(None)) + } + } + } + + fn dequeue_chunk(&mut self) -> Result { + match self.2.recv_or_idle() { + Ok(msg) => Ok(msg.payload), + Err(error) => { + error!(?error, "dequeue chunk failed"); + Err(multiplexer::agents::ChannelError::NotConnected(None)) + } + } + } +} + +pub type OuroborosClient = pallas::network::miniprotocols::chainsync::N2NClient; + +pub fn protocol_channel( + id: u16, + plexer_input: &mut MuxInputPort, + plexer_output: &mut DemuxOutputPort, +) -> ProtocolChannel { + let mut muxer = MuxOutputPort::default(); + let mut demuxed = DemuxInputPort::default(); + + gasket::messaging::connect_ports(&mut muxer, plexer_input, 1000); + gasket::messaging::connect_ports(plexer_output, &mut demuxed, 1000); + + ProtocolChannel(id, muxer, demuxed) +}