Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sequencer): cleanup sync logic #122

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/prism/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ sp1-sdk = { workspace = true }
[dev-dependencies]
serial_test = "3.1.1"
criterion = "0.5.1"

[[test]]
name = "integration_tests"
path = "tests/integration_tests.rs"
harness = true
2 changes: 1 addition & 1 deletion crates/prism/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::{
consts::{DA_RETRY_COUNT, DA_RETRY_INTERVAL},
da::memory::InMemoryDataAvailabilityLayer,
};
use prism_errors::{DataAvailabilityError, GeneralError, PrismError};
use anyhow::{anyhow, Context, Result};
use clap::{Parser, Subcommand};
use config::{builder::DefaultState, ConfigBuilder, File};
use dirs::home_dir;
use dotenvy::dotenv;
use prism_errors::{DataAvailabilityError, GeneralError, PrismError};
use serde::{Deserialize, Serialize};
use std::{fs, path::Path, sync::Arc};

Expand Down
140 changes: 57 additions & 83 deletions crates/prism/src/da/celestia.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
use crate::{
cfg::CelestiaConfig,
consts::CHANNEL_BUFFER_SIZE,
da::{DataAvailabilityLayer, FinalizedEpoch},
};
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use celestia_rpc::{BlobClient, Client, HeaderClient};
use celestia_types::{blob::GasPrice, nmt::Namespace, Blob};
use prism_common::operation::Operation;
use prism_errors::{DataAvailabilityError, GeneralError};
use std::{self, sync::Arc};
use tokio::{
use std::{
self,
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
atomic::{AtomicU64, Ordering},
Arc,
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
},
task::spawn,
};
use tokio::{sync::broadcast, task::spawn};

use bincode;

Expand All @@ -35,14 +34,12 @@ pub struct CelestiaConnection {
pub snark_namespace: Namespace,
pub operation_namespace: Namespace,

sync_target_tx: Arc<Sender<u64>>,
sync_target_rx: Arc<Mutex<Receiver<u64>>>,
height_update_tx: broadcast::Sender<u64>,
sync_target: Arc<AtomicU64>,
}

impl CelestiaConnection {
pub async fn new(config: &CelestiaConfig, auth_token: Option<&str>) -> Result<Self> {
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);

let client = Client::new(&config.connection_string, auth_token)
.await
.context("Failed to initialize websocket connection")
Expand All @@ -61,12 +58,14 @@ impl CelestiaConnection {
None => snark_namespace,
};

let (height_update_tx, _) = broadcast::channel(100);

Ok(CelestiaConnection {
client,
snark_namespace,
operation_namespace,
sync_target_tx: Arc::new(tx),
sync_target_rx: Arc::new(Mutex::new(rx)),
height_update_tx,
sync_target: Arc::new(AtomicU64::new(0)),
})
}
}
Expand All @@ -86,41 +85,38 @@ fn create_namespace(namespace_hex: &str) -> Result<Namespace> {
#[async_trait]
impl DataAvailabilityLayer for CelestiaConnection {
async fn get_latest_height(&self) -> Result<u64> {
match self.sync_target_rx.lock().await.recv().await {
Some(height) => Ok(height),
None => Err(anyhow!(DataAvailabilityError::ChannelReceiveError)),
}
Ok(self.sync_target.load(Ordering::Relaxed))
}

async fn initialize_sync_target(&self) -> Result<u64> {
HeaderClient::header_network_head(&self.client)
let height = HeaderClient::header_network_head(&self.client)
.await
.context("Failed to get network head from DA layer")
.map(|extended_header| extended_header.header.height.value())
.map(|extended_header| extended_header.header.height.value())?;

self.sync_target.store(height, Ordering::Relaxed);
Ok(height)
}

async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_finalized_epoch(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
trace!("searching for epoch on da layer at height {}", height);

match BlobClient::blob_get_all(&self.client, height, &[self.snark_namespace]).await {
Ok(blobs) => {
let mut epochs = Vec::new();
for blob in blobs.iter() {
match FinalizedEpoch::try_from(blob) {
Ok(epoch_json) => epochs.push(epoch_json),
Err(_) => {
GeneralError::ParsingError(format!(
"marshalling blob from height {} to epoch json: {:?}",
height, &blob
));
}
}
}
Ok(epochs)
}
Ok(blobs) => blobs
.into_iter()
.next()
.map(|blob| {
FinalizedEpoch::try_from(&blob).map_err(|_| {
anyhow!(GeneralError::ParsingError(format!(
"marshalling blob from height {} to epoch json: {:?}",
height, &blob
)))
})
})
.transpose(),
Err(err) => {
// todo: this is a hack to handle a retarded error from cel-node that will be fixed in v0.15.0
if err.to_string().contains("blob: not found") {
Ok(vec![])
Ok(None)
} else {
Err(anyhow!(DataAvailabilityError::DataRetrievalError(
height,
Expand All @@ -131,38 +127,22 @@ impl DataAvailabilityLayer for CelestiaConnection {
}
}

async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
if epochs.is_empty() {
bail!("no epochs provided for submission");
}
async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result<u64> {
debug!("posting {}th epoch to da layer", epoch.height);

debug!("posting {} epochs to da layer", epochs.len());
let data = bincode::serialize(&epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;

let blobs: Result<Vec<Blob>, DataAvailabilityError> = epochs
.iter()
.map(|epoch| {
let data = bincode::serialize(epoch).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing epoch {}: {}",
epoch.height, e
)))
})?;
Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

let blobs = blobs?;

for (i, blob) in blobs.iter().enumerate() {
trace!("blob {}: {:?}", i, blob);
}
let blob = Blob::new(self.snark_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(e.to_string()))
})?;

self.client
.blob_submit(&blobs, GasPrice::from(-1.0))
.blob_submit(&[blob], GasPrice::from(-1.0))
.await
.map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
}
Expand Down Expand Up @@ -230,35 +210,29 @@ impl DataAvailabilityLayer for CelestiaConnection {
.map_err(|e| anyhow!(DataAvailabilityError::SubmissionError(e.to_string())))
}

fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
self.height_update_tx.subscribe()
}

async fn start(&self) -> Result<()> {
let mut header_sub = HeaderClient::header_subscribe(&self.client)
.await
.context("Failed to subscribe to headers from DA layer")
.map_err(|e| DataAvailabilityError::NetworkError(e.to_string()))?;
.context("Failed to subscribe to headers from DA layer")?;

let sync_target = self.sync_target.clone();
let height_update_tx = self.height_update_tx.clone();

let synctarget_buffer = self.sync_target_tx.clone();
spawn(async move {
while let Some(extended_header_result) = header_sub.next().await {
match extended_header_result {
Ok(extended_header) => {
let height = extended_header.header.height.value();
match synctarget_buffer.send(height).await {
Ok(_) => {
debug!("sent sync target update for height {}", height);
}
Err(_) => {
DataAvailabilityError::SyncTargetError(format!(
"sending sync target update message for height {}",
height
));
}
}
sync_target.store(height, Ordering::Relaxed);
let _ = height_update_tx.send(height);
debug!("updated sync target for height {}", height);
}
Err(e) => {
DataAvailabilityError::NetworkError(format!(
"retrieving header from da layer: {}",
e
));
error!("Error retrieving header from DA layer: {}", e);
}
}
}
Expand Down
25 changes: 14 additions & 11 deletions crates/prism/src/da/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::da::{DataAvailabilityLayer, FinalizedEpoch};
use anyhow::Result;
use async_trait::async_trait;
use prism_common::operation::Operation;
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};
use tokio::{
sync::{broadcast, RwLock},
time::{interval, Duration},
Expand All @@ -12,14 +12,14 @@ use tokio::{
pub struct Block {
pub height: u64,
pub operations: Vec<Operation>,
pub epochs: Vec<FinalizedEpoch>,
pub epoch: Option<FinalizedEpoch>,
}

#[derive(Clone)]
pub struct InMemoryDataAvailabilityLayer {
blocks: Arc<RwLock<Vec<Block>>>,
pending_operations: Arc<RwLock<Vec<Operation>>>,
pending_epochs: Arc<RwLock<Vec<FinalizedEpoch>>>,
pending_epochs: Arc<RwLock<VecDeque<FinalizedEpoch>>>,
latest_height: Arc<RwLock<u64>>,
height_update_tx: broadcast::Sender<u64>,
block_update_tx: broadcast::Sender<Block>,
Expand All @@ -34,7 +34,7 @@ impl InMemoryDataAvailabilityLayer {
Self {
blocks: Arc::new(RwLock::new(Vec::new())),
pending_operations: Arc::new(RwLock::new(Vec::new())),
pending_epochs: Arc::new(RwLock::new(Vec::new())),
pending_epochs: Arc::new(RwLock::new(VecDeque::new())),
latest_height: Arc::new(RwLock::new(0)),
height_update_tx: height_tx,
block_update_tx: block_tx,
Expand All @@ -58,13 +58,12 @@ impl InMemoryDataAvailabilityLayer {
let new_block = Block {
height: *latest_height,
operations: std::mem::take(&mut *pending_operations),
epochs: std::mem::take(&mut *pending_epochs),
epoch: pending_epochs.pop_front(),
};
debug!(
"new block produced at height {} with {} operations and {} snarks",
"new block produced at height {} with {} operations",
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
new_block.height,
new_block.operations.len(),
new_block.epochs.len()
);
blocks.push(new_block.clone());

Expand All @@ -81,6 +80,10 @@ impl InMemoryDataAvailabilityLayer {

#[async_trait]
impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
fn subscribe_to_heights(&self) -> broadcast::Receiver<u64> {
self.height_update_tx.subscribe()
}

async fn get_latest_height(&self) -> Result<u64> {
Ok(*self.latest_height.read().await)
}
Expand All @@ -89,18 +92,18 @@ impl DataAvailabilityLayer for InMemoryDataAvailabilityLayer {
self.get_latest_height().await
}

async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>> {
async fn get_finalized_epoch(&self, height: u64) -> Result<Option<FinalizedEpoch>> {
let blocks = self.blocks.read().await;
Ok(blocks
.iter()
.find(|block| block.height == height)
.map(|block| block.epochs.clone())
.map(|block| block.epoch.clone())
.unwrap_or_default())
}

async fn submit_snarks(&self, epochs: Vec<FinalizedEpoch>) -> Result<u64> {
async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result<u64> {
let mut pending_epochs = self.pending_epochs.write().await;
pending_epochs.extend(epochs);
pending_epochs.push_back(epoch);
self.get_latest_height().await
}

Expand Down
6 changes: 4 additions & 2 deletions crates/prism/src/da/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use prism_errors::GeneralError;
use serde::{Deserialize, Serialize};
use sp1_sdk::SP1ProofWithPublicValues;
use std::{self, str::FromStr};
use tokio::sync::broadcast;

pub mod celestia;
pub mod memory;
Expand Down Expand Up @@ -50,9 +51,10 @@ impl SignedContent for FinalizedEpoch {
pub trait DataAvailabilityLayer: Send + Sync {
async fn get_latest_height(&self) -> Result<u64>;
async fn initialize_sync_target(&self) -> Result<u64>;
async fn get_snarks(&self, height: u64) -> Result<Vec<FinalizedEpoch>>;
async fn submit_snarks(&self, epoch: Vec<FinalizedEpoch>) -> Result<u64>;
async fn get_finalized_epoch(&self, height: u64) -> Result<Option<FinalizedEpoch>>;
async fn submit_finalized_epoch(&self, epoch: FinalizedEpoch) -> Result<u64>;
async fn get_operations(&self, height: u64) -> Result<Vec<Operation>>;
async fn submit_operations(&self, operations: Vec<Operation>) -> Result<u64>;
async fn start(&self) -> Result<()>;
fn subscribe_to_heights(&self) -> broadcast::Receiver<u64>;
}
Loading
Loading