Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Simplify ChainSync agent logic #48

Merged
merged 2 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions pallas-miniprotocols/examples/chainsync-blocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use pallas_primitives::alonzo::{crypto, Block, BlockWrapper};
use pallas_primitives::alonzo::{Block, BlockWrapper};
use pallas_primitives::Fragment;

use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_miniprotocols::chainsync::{Consumer, NoopObserver};
use pallas_miniprotocols::handshake::n2c::{Client, VersionTable};
use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC};
use pallas_miniprotocols::{DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder};
Expand All @@ -25,14 +25,6 @@ impl DecodePayload for Content {
Ok(Content(block))
}
}

impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.0.header);
Ok(Point(self.0.header.header_body.slot, hash.to_vec()))
}
}

fn main() {
env_logger::init();

Expand Down
11 changes: 2 additions & 9 deletions pallas-miniprotocols/examples/chainsync-headers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use minicbor::data::Tag;
use net2::TcpStreamExt;
use pallas_primitives::alonzo::{crypto, Header};
use pallas_primitives::alonzo::Header;
use pallas_primitives::Fragment;

use pallas_miniprotocols::Point;
use std::net::TcpStream;

use pallas_miniprotocols::chainsync::{BlockLike, Consumer, NoopObserver};
use pallas_miniprotocols::chainsync::{Consumer, NoopObserver};
use pallas_miniprotocols::handshake::n2n::{Client, VersionTable};
use pallas_miniprotocols::{
run_agent, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder, MAINNET_MAGIC,
Expand Down Expand Up @@ -38,13 +38,6 @@ impl DecodePayload for Content {
}
}

impl BlockLike for Content {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
let hash = crypto::hash_block_header(&self.1);
Ok(Point(self.1.header_body.slot, hash.to_vec()))
}
}

fn main() {
env_logger::init();

Expand Down
104 changes: 31 additions & 73 deletions pallas-miniprotocols/src/chainsync/clients.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,20 @@
use std::fmt::Debug;
use std::marker::PhantomData;

use log::{debug, log_enabled, trace};
use log::debug;

use crate::machines::{
Agent, DecodePayload, EncodePayload, MachineError, MachineOutput, Transition,
};
use crate::machines::{Agent, MachineError, MachineOutput, Transition};
use crate::{DecodePayload, EncodePayload};

use crate::common::Point;
use crate::payloads::{PayloadDecoder, PayloadEncoder};

use super::{Message, State, Tip};

/// A trait to deal with polymorphic payloads in the ChainSync protocol
/// (WrappedHeader vs BlockBody)
pub trait BlockLike: EncodePayload + DecodePayload + Debug {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>>;
}
use super::{BlockContent, HeaderContent, Message, SkippedContent, State, Tip};

/// An observer of chain-sync events sent by the state-machine
pub trait Observer<C>
where
C: Debug,
{
fn on_block(
&self,
cursor: &Option<Point>,
content: &C,
) -> Result<(), Box<dyn std::error::Error>> {
log::debug!(
"asked to save block content {:?} at cursor {:?}",
content,
cursor
);
pub trait Observer<C> {
fn on_roll_forward(&self, _content: C, tip: &Tip) -> Result<(), Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);

Ok(())
}

Expand All @@ -57,43 +40,42 @@ where
#[derive(Debug)]
pub struct NoopObserver {}

impl<C> Observer<C> for NoopObserver where C: Debug {}
impl<C> Observer<C> for NoopObserver {}

#[derive(Debug)]
pub struct Consumer<C, O>
where
O: Observer<C>,
C: Debug,
{
pub state: State,
pub known_points: Vec<Point>,
pub cursor: Option<Point>,
pub intersect: Option<Point>,
pub tip: Option<Tip>,

observer: O,

// as recommended here: https://doc.rust-lang.org/error-index.html#E0207
_phantom: Option<C>,
_phantom: PhantomData<C>,
}

impl<C, O> Consumer<C, O>
where
C: BlockLike + EncodePayload + DecodePayload + Debug,
O: Observer<C>,
C: DecodePayload + EncodePayload,
{
pub fn initial(known_points: Vec<Point>, observer: O) -> Self {
Self {
state: State::Idle,
cursor: None,
intersect: None,
tip: None,
known_points,
observer,

_phantom: None,
_phantom: PhantomData::default(),
}
}

fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("requesting find intersect");

let msg = Message::<C>::FindIntersect(self.known_points.clone());

tx.send_msg(&msg)?;
Expand All @@ -105,6 +87,8 @@ where
}

fn send_request_next(self, tx: &impl MachineOutput) -> Transition<Self> {
debug!("requesting next");

let msg = Message::<C>::RequestNext;

tx.send_msg(&msg)?;
Expand All @@ -122,7 +106,7 @@ where

Ok(Self {
tip: Some(tip),
cursor: Some(point),
intersect: Some(point),
state: State::Idle,
..self
})
Expand All @@ -133,7 +117,7 @@ where

Ok(Self {
tip: Some(tip),
cursor: None,
intersect: None,
state: State::Done,
..self
})
Expand All @@ -142,17 +126,9 @@ where
fn on_roll_forward(self, content: C, tip: Tip) -> Transition<Self> {
debug!("rolling forward");

let point = content.block_point()?;

if log_enabled!(log::Level::Trace) {
trace!("content: {:?}", content);
}

debug!("reporting block to observer");
self.observer.on_block(&self.cursor, &content)?;
self.observer.on_roll_forward(content, &tip)?;

Ok(Self {
cursor: Some(point),
tip: Some(tip),
state: State::Idle,
..self
Expand All @@ -167,7 +143,7 @@ where

Ok(Self {
tip: Some(tip),
cursor: Some(point),
intersect: Some(point),
state: State::Idle,
..self
})
Expand All @@ -176,7 +152,6 @@ where
fn on_await_reply(self) -> Transition<Self> {
debug!("reached tip, await reply");

debug!("reporting tip to observer");
self.observer.on_tip_reached()?;

Ok(Self {
Expand All @@ -188,7 +163,7 @@ where

impl<C, O> Agent for Consumer<C, O>
where
C: BlockLike + EncodePayload + DecodePayload + Debug + 'static,
C: EncodePayload + DecodePayload + Debug + 'static,
O: Observer<C>,
{
type Message = Message<C>;
Expand All @@ -209,7 +184,7 @@ where

fn send_next(self, tx: &impl MachineOutput) -> Transition<Self> {
match self.state {
State::Idle => match self.cursor {
State::Idle => match self.intersect {
Some(_) => self.send_request_next(tx),
None => self.send_find_intersect(tx),
},
Expand All @@ -226,8 +201,8 @@ where
self.on_roll_backward(point, tip)
}
(State::CanAwait, Message::AwaitReply) => self.on_await_reply(),
(State::MustReply, Message::RollForward(header, tip)) => {
self.on_roll_forward(header, tip)
(State::MustReply, Message::RollForward(content, tip)) => {
self.on_roll_forward(content, tip)
}
(State::MustReply, Message::RollBackward(point, tip)) => {
self.on_roll_backward(point, tip)
Expand Down Expand Up @@ -258,7 +233,7 @@ impl TipFinder {
}

fn send_find_intersect(self, tx: &impl MachineOutput) -> Transition<Self> {
let msg = Message::<NoopContent>::FindIntersect(vec![self.wellknown_point.clone()]);
let msg = Message::<SkippedContent>::FindIntersect(vec![self.wellknown_point.clone()]);

tx.send_msg(&msg)?;

Expand Down Expand Up @@ -289,29 +264,12 @@ impl TipFinder {
}
}

#[derive(Debug)]
pub struct NoopContent {}

impl EncodePayload for NoopContent {
fn encode_payload(&self, _e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
pub type HeaderConsumer<O> = Consumer<HeaderContent, O>;

impl DecodePayload for NoopContent {
fn decode_payload(_d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
todo!()
}
}

impl BlockLike for NoopContent {
fn block_point(&self) -> Result<Point, Box<dyn std::error::Error>> {
todo!()
}
}
pub type BlockConsumer<O> = Consumer<BlockContent, O>;

impl Agent for TipFinder {
type Message = Message<NoopContent>;
type Message = Message<SkippedContent>;

fn is_done(&self) -> bool {
self.state == State::Done
Expand Down
79 changes: 74 additions & 5 deletions pallas-miniprotocols/src/chainsync/codec.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::Point;
use crate::machines::{CodecError, DecodePayload, EncodePayload, PayloadDecoder, PayloadEncoder};

use super::{Message, Tip};
use super::{BlockContent, HeaderContent, Message, SkippedContent, Tip};

impl EncodePayload for Tip {
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -25,7 +25,7 @@ impl DecodePayload for Tip {

impl<C> EncodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
C: EncodePayload,
{
fn encode_payload(&self, e: &mut PayloadEncoder) -> Result<(), Box<dyn std::error::Error>> {
match self {
Expand All @@ -37,9 +37,9 @@ where
e.array(1)?.u16(1)?;
Ok(())
}
Message::RollForward(header, tip) => {
Message::RollForward(content, tip) => {
e.array(3)?.u16(2)?;
header.encode_payload(e)?;
content.encode_payload(e)?;
tip.encode_payload(e)?;
Ok(())
}
Expand Down Expand Up @@ -78,7 +78,7 @@ where

impl<C> DecodePayload for Message<C>
where
C: EncodePayload + DecodePayload,
C: DecodePayload,
{
fn decode_payload(d: &mut PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
Expand Down Expand Up @@ -115,3 +115,72 @@ where
}
}
}

impl DecodePayload for HeaderContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.array()?;
let variant = d.u32()?; // WTF is this value?

match variant {
// byron
0 => {
d.array()?;

// can't find a reference anywhere about the structure of these values, but they
// seem to provide the Byron-specific variant of the header
let (a, b): (u8, u64) = d.decode()?;

d.tag()?;
let bytes = d.bytes()?;

Ok(HeaderContent::Byron(a, b, Vec::from(bytes)))
}
// shelley
_ => {
d.tag()?;
let bytes = d.bytes()?;
Ok(HeaderContent::Shelley(Vec::from(bytes)))
}
}
}
}

impl EncodePayload for HeaderContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}

impl DecodePayload for BlockContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.tag()?;
Ok(BlockContent(d.decode()?))
}
}

impl EncodePayload for BlockContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
todo!()
}
}
impl DecodePayload for SkippedContent {
fn decode_payload(d: &mut crate::PayloadDecoder) -> Result<Self, Box<dyn std::error::Error>> {
d.skip()?;
Ok(SkippedContent)
}
}

impl EncodePayload for SkippedContent {
fn encode_payload(
&self,
_e: &mut crate::PayloadEncoder,
) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
}
Loading