From 8f75217197546d2c03ead25b5764a6ffa89bb356 Mon Sep 17 00:00:00 2001 From: card Date: Tue, 14 May 2024 11:53:19 -0400 Subject: [PATCH] initial hydra udp source commit --- src/sources/hydra.rs | 86 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 src/sources/hydra.rs diff --git a/src/sources/hydra.rs b/src/sources/hydra.rs new file mode 100644 index 00000000..6c7fe12a --- /dev/null +++ b/src/sources/hydra.rs @@ -0,0 +1,86 @@ +use std::path::PathBuf; + +use gasket::framework::*; +use serde::Deserialize; +use tracing::{debug, info}; + +use pallas::ledger::traverse::MultiEraBlock; +use pallas::network::miniprotocols::chainsync::{BlockContent, NextResponse}; +use pallas::network::miniprotocols::Point; + +use crate::framework::*; + +pub struct HydraSession { + // we can probably use the udp sink as a source here + udp_url: UdpSocket, +} + +pub struct Worker { + session: HydraSession, +} +pub struct Stage { + config: Config, + + chain: GenesisValues, + intersect: IntersectConfig, // only Origin supported for now, we can maybe do tip later +} + +#[async_trait::async_trait(?Send)] +impl gasket::framework::Worker for Worker { + async fn bootstrap(stage: &Stage) -> Result { + debug!("connecting"); + + let mut peer_session = UdpSocket::bind("127.0.0.1:5678"); + + if stage.breadcrumbs.is_empty() { + intersect_from_config(&mut peer_session, &stage.intersect).await?; + } else { + intersect_from_breadcrumbs(&mut peer_session, &stage.breadcrumbs).await?; + } + + let worker = Self { peer_session }; + + Ok(worker) + } + + async fn schedule( + &mut self, + _stage: &mut Stage, + ) -> Result>, WorkerError> { + let client = self.peer_session.chainsync(); + + let next = match client.has_agency() { + true => { + info!("requesting next block"); + client.request_next().await.or_restart()? + } + false => { + info!("awaiting next block (blocking)"); + client.recv_while_must_reply().await.or_restart()? + } + }; + + Ok(WorkSchedule::Unit(next)) + } + + async fn execute( + &mut self, + unit: &NextResponse, + stage: &mut Stage, + ) -> Result<(), WorkerError> { + self.process_next(stage, unit).await + } +} + +async fn intersect_from_config( + peer: &mut NodeClient, + intersect: &IntersectConfig, +) -> Result<(), WorkerError> { + match intersect { + IntersectConfig::Origin => { + peer.intersect_origin().await.or_retry()?; + } + } + + Ok(()) +} \ No newline at end of file