From 5a98e118bbf4f85d452beb1baae2e65f77cedb79 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Mon, 14 Feb 2022 18:16:22 -0300 Subject: [PATCH 1/2] chore: Simplify ChainSync library --- pallas-miniprotocols/src/chainsync/clients.rs | 104 ++++++------------ pallas-miniprotocols/src/chainsync/codec.rs | 79 ++++++++++++- .../src/chainsync/protocol.rs | 28 ++++- pallas-miniprotocols/src/payloads.rs | 24 ++-- 4 files changed, 143 insertions(+), 92 deletions(-) diff --git a/pallas-miniprotocols/src/chainsync/clients.rs b/pallas-miniprotocols/src/chainsync/clients.rs index a44526ca..ad6f1dfe 100644 --- a/pallas-miniprotocols/src/chainsync/clients.rs +++ b/pallas-miniprotocols/src/chainsync/clients.rs @@ -1,37 +1,20 @@ use std::fmt::Debug; +use std::marker::PhantomData; -use log::{debug, log_enabled, trace}; +use log::debug; -use crate::machines::{ - Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition, -}; +use crate::machines::{Agent, MachineError, MachineOutput, Transition}; +use crate::{DecodePayload, EncodePayload}; use crate::common::Point; -use crate::payloads::{PayloadDecoder, PayloadEncoder}; -use super::{Message, State, Tip}; - -/// A trait to deal with polymorphic payloads in the ChainSync protocol -/// (WrappedHeader vs BlockBody) -pub trait BlockLike: EncodePayload + DecodePayload + Debug { - fn block_point(&self) -> Result>; -} +use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip}; /// An observer of chain-sync events sent by the state-machine -pub trait Observer -where - C: Debug, -{ - fn on_block( - &self, - cursor: &Option, - content: &C, - ) -> Result<(), Box> { - log::debug!( - "asked to save block content {:?} at cursor {:?}", - content, - cursor - ); +pub trait Observer { + fn on_roll_forward(&self, _content: C, tip: &Tip) -> Result<(), Box> { + log::debug!("asked to roll forward, tip at {:?}", tip); + Ok(()) } @@ -57,43 +40,42 @@ where #[derive(Debug)] pub struct NoopObserver {} -impl Observer for NoopObserver where C: Debug {} +impl Observer for NoopObserver {} #[derive(Debug)] pub struct Consumer where O: Observer, - C: Debug, { pub state: State, pub known_points: Vec, - pub cursor: Option, + pub intersect: Option, pub tip: Option, observer: O, - // as recommended here: https://doc.rust-lang.org/error-index.html#E0207 - _phantom: Option, + _phantom: PhantomData, } impl Consumer where - C: BlockLike + EncodePayload + DecodePayload + Debug, O: Observer, + C: DecodePayload + EncodePayload, { pub fn initial(known_points: Vec, observer: O) -> Self { Self { state: State::Idle, - cursor: None, + intersect: None, tip: None, known_points, observer, - - _phantom: None, + _phantom: PhantomData::default(), } } fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { + debug!("requesting find intersect"); + let msg = Message::::FindIntersect(self.known_points.clone()); tx.send_msg(&msg)?; @@ -105,6 +87,8 @@ where } fn send_request_next(self, tx: &impl MachineOutput) -> Transition { + debug!("requesting next"); + let msg = Message::::RequestNext; tx.send_msg(&msg)?; @@ -122,7 +106,7 @@ where Ok(Self { tip: Some(tip), - cursor: Some(point), + intersect: Some(point), state: State::Idle, ..self }) @@ -133,7 +117,7 @@ where Ok(Self { tip: Some(tip), - cursor: None, + intersect: None, state: State::Done, ..self }) @@ -142,17 +126,9 @@ where fn on_roll_forward(self, content: C, tip: Tip) -> Transition { debug!("rolling forward"); - let point = content.block_point()?; - - if log_enabled!(log::Level::Trace) { - trace!("content: {:?}", content); - } - - debug!("reporting block to observer"); - self.observer.on_block(&self.cursor, &content)?; + self.observer.on_roll_forward(content, &tip)?; Ok(Self { - cursor: Some(point), tip: Some(tip), state: State::Idle, ..self @@ -167,7 +143,7 @@ where Ok(Self { tip: Some(tip), - cursor: Some(point), + intersect: Some(point), state: State::Idle, ..self }) @@ -176,7 +152,6 @@ where fn on_await_reply(self) -> Transition { debug!("reached tip, await reply"); - debug!("reporting tip to observer"); self.observer.on_tip_reached()?; Ok(Self { @@ -188,7 +163,7 @@ where impl Agent for Consumer where - C: BlockLike + EncodePayload + DecodePayload + Debug + 'static, + C: EncodePayload + DecodePayload + Debug + 'static, O: Observer, { type Message = Message; @@ -209,7 +184,7 @@ where fn send_next(self, tx: &impl MachineOutput) -> Transition { match self.state { - State::Idle => match self.cursor { + State::Idle => match self.intersect { Some(_) => self.send_request_next(tx), None => self.send_find_intersect(tx), }, @@ -226,8 +201,8 @@ where self.on_roll_backward(point, tip) } (State::CanAwait, Message::AwaitReply) => self.on_await_reply(), - (State::MustReply, Message::RollForward(header, tip)) => { - self.on_roll_forward(header, tip) + (State::MustReply, Message::RollForward(content, tip)) => { + self.on_roll_forward(content, tip) } (State::MustReply, Message::RollBackward(point, tip)) => { self.on_roll_backward(point, tip) @@ -258,7 +233,7 @@ impl TipFinder { } fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition { - let msg = Message::::FindIntersect(vec![self.wellknown_point.clone()]); + let msg = Message::::FindIntersect(vec![self.wellknown_point.clone()]); tx.send_msg(&msg)?; @@ -289,29 +264,12 @@ impl TipFinder { } } -#[derive(Debug)] -pub struct NoopContent {} - -impl EncodePayload for NoopContent { - fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box> { - todo!() - } -} +pub type HeaderConsumer = Consumer; -impl DecodePayload for NoopContent { - fn decode_payload(_d: &mut PayloadDecoder) -> Result> { - todo!() - } -} - -impl BlockLike for NoopContent { - fn block_point(&self) -> Result> { - todo!() - } -} +pub type BlockConsumer = Consumer; impl Agent for TipFinder { - type Message = Message; + type Message = Message; fn is_done(&self) -> bool { self.state == State::Done diff --git a/pallas-miniprotocols/src/chainsync/codec.rs b/pallas-miniprotocols/src/chainsync/codec.rs index c61ec788..a62db811 100644 --- a/pallas-miniprotocols/src/chainsync/codec.rs +++ b/pallas-miniprotocols/src/chainsync/codec.rs @@ -1,7 +1,7 @@ use crate::common::Point; use crate::machines::{CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder}; -use super::{Message, Tip}; +use super::{BlockContent, HeaderContent, Message, SkippedContent, Tip}; impl EncodePayload for Tip { fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { @@ -25,7 +25,7 @@ impl DecodePayload for Tip { impl EncodePayload for Message where - C: EncodePayload + DecodePayload, + C: EncodePayload, { fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box> { match self { @@ -37,9 +37,9 @@ where e.array(1)?.u16(1)?; Ok(()) } - Message::RollForward(header, tip) => { + Message::RollForward(content, tip) => { e.array(3)?.u16(2)?; - header.encode_payload(e)?; + content.encode_payload(e)?; tip.encode_payload(e)?; Ok(()) } @@ -78,7 +78,7 @@ where impl DecodePayload for Message where - C: EncodePayload + DecodePayload, + C: DecodePayload, { fn decode_payload(d: &mut PayloadDecoder) -> Result> { d.array()?; @@ -115,3 +115,72 @@ where } } } + +impl DecodePayload for HeaderContent { + fn decode_payload(d: &mut crate::PayloadDecoder) -> Result> { + d.array()?; + let variant = d.u32()?; // WTF is this value? + + match variant { + // byron + 0 => { + d.array()?; + + // can't find a reference anywhere about the structure of these values, but they + // seem to provide the Byron-specific variant of the header + let (a, b): (u8, u64) = d.decode()?; + + d.tag()?; + let bytes = d.bytes()?; + + Ok(HeaderContent::Byron(a, b, Vec::from(bytes))) + } + // shelley + _ => { + d.tag()?; + let bytes = d.bytes()?; + Ok(HeaderContent::Shelley(Vec::from(bytes))) + } + } + } +} + +impl EncodePayload for HeaderContent { + fn encode_payload( + &self, + _e: &mut crate::PayloadEncoder, + ) -> Result<(), Box> { + todo!() + } +} + +impl DecodePayload for BlockContent { + fn decode_payload(d: &mut crate::PayloadDecoder) -> Result> { + d.tag()?; + Ok(BlockContent(d.decode()?)) + } +} + +impl EncodePayload for BlockContent { + fn encode_payload( + &self, + _e: &mut crate::PayloadEncoder, + ) -> Result<(), Box> { + todo!() + } +} +impl DecodePayload for SkippedContent { + fn decode_payload(d: &mut crate::PayloadDecoder) -> Result> { + d.skip()?; + Ok(SkippedContent) + } +} + +impl EncodePayload for SkippedContent { + fn encode_payload( + &self, + _e: &mut crate::PayloadEncoder, + ) -> Result<(), Box> { + Ok(()) + } +} diff --git a/pallas-miniprotocols/src/chainsync/protocol.rs b/pallas-miniprotocols/src/chainsync/protocol.rs index c3596969..2f54f717 100644 --- a/pallas-miniprotocols/src/chainsync/protocol.rs +++ b/pallas-miniprotocols/src/chainsync/protocol.rs @@ -1,7 +1,6 @@ -use std::fmt::Debug; +use std::{fmt::Debug, ops::Deref}; use crate::common::Point; -use crate::machines::{DecodePayload, EncodePayload}; #[derive(Debug)] pub struct Tip(pub Point, pub u64); @@ -17,10 +16,7 @@ pub enum State { /// A generic chain-sync message for either header or block content #[derive(Debug)] -pub enum Message -where - C: EncodePayload + DecodePayload + Sized, -{ +pub enum Message { RequestNext, AwaitReply, RollForward(C, Tip), @@ -30,3 +26,23 @@ where IntersectNotFound(Tip), Done, } + +#[derive(Debug)] +pub enum HeaderContent { + Byron(u8, u64, Vec), + Shelley(Vec), +} + +#[derive(Debug)] +pub struct BlockContent(pub Vec); + +impl Deref for BlockContent { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Debug)] +pub struct SkippedContent; diff --git a/pallas-miniprotocols/src/payloads.rs b/pallas-miniprotocols/src/payloads.rs index 20b1ccce..b8b6c432 100644 --- a/pallas-miniprotocols/src/payloads.rs +++ b/pallas-miniprotocols/src/payloads.rs @@ -142,14 +142,22 @@ impl<'a> PayloadDeconstructor<'a> { debug!("consumed {} from payload buffer", new_pos); Ok(t) } - Err(_err) => { - //TODO: we need to match EndOfInput kind of errors - - debug!("payload incomplete, fetching next segment"); - let payload = self.rx.recv()?; - self.remaining.extend(payload); - - self.consume_next_message::() + Err(err) => { + let downcast = err.downcast::(); + + match downcast { + Ok(err) => match err.deref() { + minicbor::decode::Error::EndOfInput => { + debug!("payload incomplete, fetching next segment"); + let payload = self.rx.recv()?; + self.remaining.extend(payload); + + self.consume_next_message::() + } + _ => Err(err), + }, + Err(err) => Err(err), + } } } } From 046e69766939904b8bc2139dd0df83a0045f5fa6 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Mon, 14 Feb 2022 18:23:22 -0300 Subject: [PATCH 2/2] fix: Upgrade examples to new chainsync approach --- pallas-miniprotocols/examples/chainsync-blocks.rs | 12 ++---------- pallas-miniprotocols/examples/chainsync-headers.rs | 11 ++--------- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/pallas-miniprotocols/examples/chainsync-blocks.rs b/pallas-miniprotocols/examples/chainsync-blocks.rs index 426a0bdd..9e344da5 100644 --- a/pallas-miniprotocols/examples/chainsync-blocks.rs +++ b/pallas-miniprotocols/examples/chainsync-blocks.rs @@ -1,7 +1,7 @@ -use pallas_primitives::alonzo::{crypto, Block, BlockWrapper}; +use pallas_primitives::alonzo::{Block, BlockWrapper}; use pallas_primitives::Fragment; -use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver}; +use pallas_miniprotocols::chainsync::{Consumer, NoopObserver}; use pallas_miniprotocols::handshake::n2c::{Client, VersionTable}; use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC}; use pallas_miniprotocols::{DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder}; @@ -25,14 +25,6 @@ impl DecodePayload for Content { Ok(Content(block)) } } - -impl BlockLike for Content { - fn block_point(&self) -> Result> { - let hash = crypto::hash_block_header(&self.0.header); - Ok(Point(self.0.header.header_body.slot, hash.to_vec())) - } -} - fn main() { env_logger::init(); diff --git a/pallas-miniprotocols/examples/chainsync-headers.rs b/pallas-miniprotocols/examples/chainsync-headers.rs index 1d688479..3e97346f 100644 --- a/pallas-miniprotocols/examples/chainsync-headers.rs +++ b/pallas-miniprotocols/examples/chainsync-headers.rs @@ -1,12 +1,12 @@ use minicbor::data::Tag; use net2::TcpStreamExt; -use pallas_primitives::alonzo::{crypto, Header}; +use pallas_primitives::alonzo::Header; use pallas_primitives::Fragment; use pallas_miniprotocols::Point; use std::net::TcpStream; -use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver}; +use pallas_miniprotocols::chainsync::{Consumer, NoopObserver}; use pallas_miniprotocols::handshake::n2n::{Client, VersionTable}; use pallas_miniprotocols::{ run_agent, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder, MAINNET_MAGIC, @@ -38,13 +38,6 @@ impl DecodePayload for Content { } } -impl BlockLike for Content { - fn block_point(&self) -> Result> { - let hash = crypto::hash_block_header(&self.1); - Ok(Point(self.1.header_body.slot, hash.to_vec())) - } -} - fn main() { env_logger::init();