Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Add chainsync integration test #6

Merged
merged 2 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use pallas::{crypto::hash::Hash, network::miniprotocols::Point};
pub type BlockSlot = u64;
pub type BlockHash = Hash<32>;

#[derive(Debug)]
pub enum ChainSyncEvent {
RollForward(BlockSlot, BlockHash),
Rollback(Point),
Expand Down
10 changes: 7 additions & 3 deletions src/storage/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ use pallas::network::miniprotocols::Point;

use crate::prelude::*;

pub struct Cursor {}
pub enum Cursor {
StaticCursor(Vec<Point>),
}

impl Cursor {
pub fn last_point(&mut self) -> Result<Point, Error> {
todo!();
pub fn intersections(&self) -> Result<Vec<Point>, Error> {
match self {
Cursor::StaticCursor(x) => Ok(x.clone()),
}
}
}
2 changes: 2 additions & 0 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod plexing;
mod syncing;
47 changes: 47 additions & 0 deletions src/tests/syncing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::ops::DerefMut;
use std::time::Duration;

use crate::prelude::Cursor;
use crate::upstream::chainsync;
use crate::upstream::plexer;
use crate::upstream::prelude::*;

#[test]
fn connect_to_real_relay() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE)
.finish(),
)
.unwrap();

let mut input = MuxInputPort::default();
let mut output = DemuxOutputPort::default();

let plexer_2_chainsync = protocol_channel(2, &mut input, &mut output);

let plexer = plexer::Worker::new(
"preview-node.world.dev.cardano.org:30002".into(),
2,
input,
output,
);

let mut downstream = chainsync::DownstreamPort::default();
let mut sink = gasket::messaging::SinkPort::default();
gasket::messaging::connect_ports(&mut downstream, sink.deref_mut(), 20);

let cursor = Cursor::StaticCursor(vec![]);
let chainsync = chainsync::Worker::new(cursor, plexer_2_chainsync, downstream);

let plexer =
gasket::runtime::spawn_stage(plexer, gasket::runtime::Policy::default(), Some("plexer"));

let chainsync = gasket::runtime::spawn_stage(
chainsync,
gasket::runtime::Policy::default(),
Some("chainsync"),
);

let results = sink.drain_at_least::<20>(Duration::from_secs(60)).unwrap();
}
59 changes: 12 additions & 47 deletions src/upstream/chainsync.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use gasket::error::AsWorkError;
use pallas::ledger::traverse::MultiEraHeader;
use pallas::network::miniprotocols::chainsync;
use pallas::network::miniprotocols::chainsync::{HeaderContent, N2NClient, NextResponse};
use pallas::network::multiplexer;
use pallas::network::miniprotocols::chainsync::{HeaderContent, NextResponse};
use tracing::info;

use crate::prelude::*;

Expand All @@ -15,36 +15,7 @@ fn to_traverse<'b>(header: &'b chainsync::HeaderContent) -> Result<MultiEraHeade
out.map_err(Error::parse)
}

type MuxerPort = gasket::messaging::OutputPort<(u16, multiplexer::Payload)>;
type DemuxerPort = gasket::messaging::InputPort<multiplexer::Payload>;

type DownstreamPort = gasket::messaging::OutputPort<ChainSyncEvent>;

pub struct GasketChannel(u16, MuxerPort, DemuxerPort);

impl multiplexer::agents::Channel for GasketChannel {
fn enqueue_chunk(
&mut self,
payload: multiplexer::Payload,
) -> Result<(), multiplexer::agents::ChannelError> {
match self
.1
.send(gasket::messaging::Message::from((self.0, payload)))
{
Ok(_) => Ok(()),
Err(err) => Err(multiplexer::agents::ChannelError::NotConnected(None)),
}
}

fn dequeue_chunk(&mut self) -> Result<multiplexer::Payload, multiplexer::agents::ChannelError> {
match self.2.recv() {
Ok(msg) => Ok(msg.payload),
Err(_) => Err(multiplexer::agents::ChannelError::NotConnected(None)),
}
}
}

type OuroborosClient = N2NClient<GasketChannel>;
pub type DownstreamPort = gasket::messaging::OutputPort<ChainSyncEvent>;

pub struct Worker {
chain_cursor: Cursor,
Expand All @@ -55,14 +26,8 @@ pub struct Worker {
}

impl Worker {
pub fn new(
chain_cursor: Cursor,
muxer: MuxerPort,
demuxer: DemuxerPort,
downstream: DownstreamPort,
) -> Self {
let channel = GasketChannel(5, muxer, demuxer);
let client = OuroborosClient::new(channel);
pub fn new(chain_cursor: Cursor, plexer: ProtocolChannel, downstream: DownstreamPort) -> Self {
let client = OuroborosClient::new(plexer);

Self {
chain_cursor,
Expand Down Expand Up @@ -91,20 +56,20 @@ impl Worker {
Ok(())
}
chainsync::NextResponse::Await => {
log::info!("chain-sync reached the tip of the chain");
info!("chain-sync reached the tip of the chain");
Ok(())
}
}
}

fn request_next(&mut self) -> Result<(), gasket::error::Error> {
log::info!("requesting next block");
info!("requesting next block");
let next = self.client.request_next().or_restart()?;
self.process_next(next)
}

fn await_next(&mut self) -> Result<(), gasket::error::Error> {
log::info!("awaiting next block (blocking)");
info!("awaiting next block (blocking)");
let next = self.client.recv_while_must_reply().or_restart()?;
self.process_next(next)
}
Expand All @@ -119,17 +84,17 @@ impl gasket::runtime::Worker for Worker {
}

fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
let point = self.chain_cursor.last_point().or_panic()?;
let intersects = self.chain_cursor.intersections().or_panic()?;

log::info!("intersecting chain at point: {:?}", point);
info!("intersecting chain at points: {:?}", intersects);

let (point, _) = self
.client
.find_intersect(vec![point])
.find_intersect(intersects)
.map_err(Error::client)
.or_restart()?;

log::info!("chain-sync intersection is {:?}", point);
info!(?point, "chain-sync intersected");

Ok(())
}
Expand Down