Skip to content

Commit

Permalink
chore: Simplify ChainSync agent logic (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Feb 14, 2022
1 parent c3662e1 commit 630d53d
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 111 deletions.
12 changes: 2 additions & 10 deletions pallas-miniprotocols/examples/chainsync-blocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use pallas_primitives::alonzo::{crypto, Block, BlockWrapper};
use pallas_primitives::alonzo::{Block, BlockWrapper};
use pallas_primitives::Fragment;

use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_miniprotocols::chainsync::{Consumer, NoopObserver};
use pallas_miniprotocols::handshake::n2c::{Client, VersionTable};
use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC};
use pallas_miniprotocols::{DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder};
Expand All @@ -25,14 +25,6 @@ impl DecodePayload for Content {
Ok(Content(block))
}
}

impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.0.header);
Ok(Point(self.0.header.header_body.slot, hash.to_vec()))
}
}

fn main() {
env_logger::init();

Expand Down
11 changes: 2 additions & 9 deletions pallas-miniprotocols/examples/chainsync-headers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use minicbor::data::Tag;
use net2::TcpStreamExt;
use pallas_primitives::alonzo::{crypto, Header};
use pallas_primitives::alonzo::Header;
use pallas_primitives::Fragment;

use pallas_miniprotocols::Point;
use std::net::TcpStream;

use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_miniprotocols::chainsync::{Consumer, NoopObserver};
use pallas_miniprotocols::handshake::n2n::{Client, VersionTable};
use pallas_miniprotocols::{
run_agent, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder, MAINNET_MAGIC,
Expand Down Expand Up @@ -38,13 +38,6 @@ impl DecodePayload for Content {
}
}

impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.1);
Ok(Point(self.1.header_body.slot, hash.to_vec()))
}
}

fn main() {
env_logger::init();

Expand Down
104 changes: 31 additions & 73 deletions pallas-miniprotocols/src/chainsync/clients.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,20 @@
use std::fmt::Debug;
use std::marker::PhantomData;

use log::{debug, log_enabled, trace};
use log::debug;

use crate::machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
};
use crate::machines::{Agent, MachineError, MachineOutput, Transition};
use crate::{DecodePayload, EncodePayload};

use crate::common::Point;
use crate::payloads::{PayloadDecoder, PayloadEncoder};

use super::{Message, State, Tip};

/// A trait to deal with polymorphic payloads in the ChainSync protocol
/// (WrappedHeader vs BlockBody)
pub trait BlockLike: EncodePayload + DecodePayload + Debug {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>>;
}
use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip};

/// An observer of chain-sync events sent by the state-machine
pub trait Observer<C>
where
C: Debug,
{
fn on_block(
&self,
cursor: &Option<Point>,
content: &C,
) -> Result<(), Box<dyn std::error::Error>> {
log::debug!(
"asked to save block content {:?} at cursor {:?}",
content,
cursor
);
pub trait Observer<C> {
fn on_roll_forward(&self, _content: C, tip: &Tip) -> Result<(), Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);

Ok(())
}

Expand All @@ -57,43 +40,42 @@ where
#[derive(Debug)]
pub struct NoopObserver {}

impl<C> Observer<C> for NoopObserver where C: Debug {}
impl<C> Observer<C> for NoopObserver {}

#[derive(Debug)]
pub struct Consumer<C, O>
where
O: Observer<C>,
C: Debug,
{
pub state: State,
pub known_points: Vec<Point>,
pub cursor: Option<Point>,
pub intersect: Option<Point>,
pub tip: Option<Tip>,

observer: O,

// as recommended here: https://doc.rust-lang.org/error-index.html#E0207
_phantom: Option<C>,
_phantom: PhantomData<C>,
}

impl<C, O> Consumer<C, O>
where
C: BlockLike + EncodePayload + DecodePayload + Debug,
O: Observer<C>,
C: DecodePayload + EncodePayload,
{
pub fn initial(known_points: Vec<Point>, observer: O) -> Self {
Self {
state: State::Idle,
cursor: None,
intersect: None,
tip: None,
known_points,
observer,

_phantom: None,
_phantom: PhantomData::default(),
}
}

fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("requesting find intersect");

let msg = Message::<C>::FindIntersect(self.known_points.clone());

tx.send_msg(&msg)?;
Expand All @@ -105,6 +87,8 @@ where
}

fn send_request_next(self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("requesting next");

let msg = Message::<C>::RequestNext;

tx.send_msg(&msg)?;
Expand All @@ -122,7 +106,7 @@ where

Ok(Self {
tip: Some(tip),
cursor: Some(point),
intersect: Some(point),
state: State::Idle,
..self
})
Expand All @@ -133,7 +117,7 @@ where

Ok(Self {
tip: Some(tip),
cursor: None,
intersect: None,
state: State::Done,
..self
})
Expand All @@ -142,17 +126,9 @@ where
fn on_roll_forward(self, content: C, tip: Tip) -> Transition<Self> {
debug!("rolling forward");

let point = content.block_point()?;

if log_enabled!(log::Level::Trace) {
trace!("content: {:?}", content);
}

debug!("reporting block to observer");
self.observer.on_block(&self.cursor, &content)?;
self.observer.on_roll_forward(content, &tip)?;

Ok(Self {
cursor: Some(point),
tip: Some(tip),
state: State::Idle,
..self
Expand All @@ -167,7 +143,7 @@ where

Ok(Self {
tip: Some(tip),
cursor: Some(point),
intersect: Some(point),
state: State::Idle,
..self
})
Expand All @@ -176,7 +152,6 @@ where
fn on_await_reply(self) -> Transition<Self> {
debug!("reached tip, await reply");

debug!("reporting tip to observer");
self.observer.on_tip_reached()?;

Ok(Self {
Expand All @@ -188,7 +163,7 @@ where

impl<C, O> Agent for Consumer<C, O>
where
C: BlockLike + EncodePayload + DecodePayload + Debug + 'static,
C: EncodePayload + DecodePayload + Debug + 'static,
O: Observer<C>,
{
type Message = Message<C>;
Expand All @@ -209,7 +184,7 @@ where

fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::Idle => match self.cursor {
State::Idle => match self.intersect {
Some(_) => self.send_request_next(tx),
None => self.send_find_intersect(tx),
},
Expand All @@ -226,8 +201,8 @@ where
self.on_roll_backward(point, tip)
}
(State::CanAwait, Message::AwaitReply) => self.on_await_reply(),
(State::MustReply, Message::RollForward(header, tip)) => {
self.on_roll_forward(header, tip)
(State::MustReply, Message::RollForward(content, tip)) => {
self.on_roll_forward(content, tip)
}
(State::MustReply, Message::RollBackward(point, tip)) => {
self.on_roll_backward(point, tip)
Expand Down Expand Up @@ -258,7 +233,7 @@ impl TipFinder {
}

fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<NoopContent>::FindIntersect(vec![self.wellknown_point.clone()]);
let msg = Message::<SkippedContent>::FindIntersect(vec![self.wellknown_point.clone()]);

tx.send_msg(&msg)?;

Expand Down Expand Up @@ -289,29 +264,12 @@ impl TipFinder {
}
}

#[derive(Debug)]
pub struct NoopContent {}

impl EncodePayload for NoopContent {
fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
pub type HeaderConsumer<O> = Consumer<HeaderContent, O>;

impl DecodePayload for NoopContent {
fn decode_payload(_d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
todo!()
}
}

impl BlockLike for NoopContent {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
todo!()
}
}
pub type BlockConsumer<O> = Consumer<BlockContent, O>;

impl Agent for TipFinder {
type Message = Message<NoopContent>;
type Message = Message<SkippedContent>;

fn is_done(&self) -> bool {
self.state == State::Done
Expand Down
79 changes: 74 additions & 5 deletions pallas-miniprotocols/src/chainsync/codec.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::Point;
use crate::machines::{CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder};

use super::{Message, Tip};
use super::{BlockContent, HeaderContent, Message, SkippedContent, Tip};

impl EncodePayload for Tip {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -25,7 +25,7 @@ impl DecodePayload for Tip {

impl<C> EncodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
C: EncodePayload,
{
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
match self {
Expand All @@ -37,9 +37,9 @@ where
e.array(1)?.u16(1)?;
Ok(())
}
Message::RollForward(header, tip) => {
Message::RollForward(content, tip) => {
e.array(3)?.u16(2)?;
header.encode_payload(e)?;
content.encode_payload(e)?;
tip.encode_payload(e)?;
Ok(())
}
Expand Down Expand Up @@ -78,7 +78,7 @@ where

impl<C> DecodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
C: DecodePayload,
{
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
Expand Down Expand Up @@ -115,3 +115,72 @@ where
}
}
}

impl DecodePayload for HeaderContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let variant = d.u32()?; // WTF is this value?

match variant {
// byron
0 => {
d.array()?;

// can't find a reference anywhere about the structure of these values, but they
// seem to provide the Byron-specific variant of the header
let (a, b): (u8, u64) = d.decode()?;

d.tag()?;
let bytes = d.bytes()?;

Ok(HeaderContent::Byron(a, b, Vec::from(bytes)))
}
// shelley
_ => {
d.tag()?;
let bytes = d.bytes()?;
Ok(HeaderContent::Shelley(Vec::from(bytes)))
}
}
}
}

impl EncodePayload for HeaderContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}

impl DecodePayload for BlockContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.tag()?;
Ok(BlockContent(d.decode()?))
}
}

impl EncodePayload for BlockContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for SkippedContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.skip()?;
Ok(SkippedContent)
}
}

impl EncodePayload for SkippedContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
}
Loading

0 comments on commit 630d53d

Please sign in to comment.