diff --git a/crates/subspace-farmer/src/farming.rs b/crates/subspace-farmer/src/farming.rs index 9c7d6f528d03b..695528942f658 100644 --- a/crates/subspace-farmer/src/farming.rs +++ b/crates/subspace-farmer/src/farming.rs @@ -13,7 +13,7 @@ use tokio::task::JoinHandle; #[derive(Debug, Error)] pub enum FarmingError { #[error("jsonrpsee error: {0}")] - Rpc(jsonrpsee::types::Error), + RpcError(Box), #[error("Error joining task: {0}")] JoinTask(tokio::task::JoinError), #[error("Plot read error: {0}")] @@ -113,11 +113,12 @@ async fn subscribe_to_slot_info( let mut new_slots = client .subscribe_slot_info() .await - .map_err(FarmingError::Rpc)?; + .map_err(FarmingError::RpcError)?; let mut salts = Salts::default(); - while let Some(slot_info) = new_slots.next().await.map_err(FarmingError::Rpc)? { + // could also import futures mpsc, and call `try_next`, but without await. I think this is more robust + while let Some(slot_info) = new_slots.recv().await { debug!("New slot: {:?}", slot_info); update_commitments(plot, commitments, &mut salts, &slot_info); @@ -163,7 +164,7 @@ async fn subscribe_to_slot_info( secret_key: identity.secret_key().to_bytes().into(), }) .await - .map_err(FarmingError::Rpc)?; + .map_err(FarmingError::RpcError)?; } Ok(()) diff --git a/crates/subspace-farmer/src/plotting.rs b/crates/subspace-farmer/src/plotting.rs index 7dce2a1a6be03..5a8bd30741eee 100644 --- a/crates/subspace-farmer/src/plotting.rs +++ b/crates/subspace-farmer/src/plotting.rs @@ -23,7 +23,7 @@ pub enum PlottingError { #[error("Failed to get block {0} from the chain, probably need to erase existing plot")] GetBlockError(u32), #[error("jsonrpsee error: {0}")] - Rpc(jsonrpsee::types::Error), + RpcError(Box), #[error("Last block retrieval from plot, rocksdb error: {0}")] LastBlock(rocksdb::Error), #[error("Error joining task: {0}")] @@ -85,7 +85,6 @@ impl Drop for Plotting { // we don't have to do anything in here // these are for clarity and verbosity let _ = self.sender.take().unwrap().send(()); - info!("Plotting stopped!"); } } @@ -108,7 +107,10 @@ async fn background_plotting, T: RpcClient + Clone + Send + 'stat pre_genesis_object_size, pre_genesis_object_count, pre_genesis_object_seed, - } = client.farmer_metadata().await.map_err(PlottingError::Rpc)?; + } = client + .farmer_metadata() + .await + .map_err(PlottingError::RpcError)?; // TODO: This assumes fixed size segments, which might not be the case let merkle_num_leaves = u64::from(recorded_history_segment_size / record_size * 2); @@ -133,7 +135,7 @@ async fn background_plotting, T: RpcClient + Clone + Send + 'stat let maybe_last_archived_block = client .block_by_number(last_archived_block_number) .await - .map_err(PlottingError::Rpc)?; + .map_err(PlottingError::RpcError)?; match maybe_last_archived_block { Some(EncodedBlockWithObjectMapping { @@ -372,7 +374,7 @@ async fn background_plotting, T: RpcClient + Clone + Send + 'stat let mut new_head = client .subscribe_new_head() .await - .map_err(PlottingError::Rpc)?; + .map_err(PlottingError::RpcError)?; let block_to_archive = Arc::new(AtomicU32::default()); @@ -381,10 +383,10 @@ async fn background_plotting, T: RpcClient + Clone + Send + 'stat loop { tokio::select! { _ = &mut receiver => { - info!("plotting stopped!"); + info!("Plotting stopped!"); break; } - Ok(Some(head)) = new_head.next() => { + Some(head) = new_head.recv() => { let block_number = u32::from_str_radix(&head.number[2..], 16).unwrap(); debug!("Last block number: {:#?}", block_number); diff --git a/crates/subspace-farmer/src/rpc.rs b/crates/subspace-farmer/src/rpc.rs index 3865457b7582d..030350ebbef59 100644 --- a/crates/subspace-farmer/src/rpc.rs +++ b/crates/subspace-farmer/src/rpc.rs @@ -1,9 +1,9 @@ use async_trait::async_trait; -use jsonrpsee::types::{Error, Subscription}; use serde::Deserialize; use subspace_rpc_primitives::{ EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse, }; +use tokio::sync::mpsc::Receiver; // There are more fields in this struct, but we only care about one #[derive(Debug, Deserialize)] @@ -11,20 +11,27 @@ pub struct NewHead { pub number: String, } +/// To become error type agnostic +pub type Error = Box; + #[async_trait] pub trait RpcClient { /// Get farmer metadata. async fn farmer_metadata(&self) -> Result; + /// Get a block by number. async fn block_by_number( &self, block_number: u32, ) -> Result, Error>; - async fn subscribe_new_head(&self) -> Result, Error>; + /// Subscribe to chain head. + async fn subscribe_new_head(&self) -> Result, Error>; - async fn subscribe_slot_info(&self) -> Result, Error>; + /// Subscribe to slot. + async fn subscribe_slot_info(&self) -> Result, Error>; + /// Submit a slot solution. async fn submit_solution_response( &self, solution_response: SolutionResponse, diff --git a/crates/subspace-farmer/src/ws_rpc.rs b/crates/subspace-farmer/src/ws_rpc.rs index 0fe9842ec9f61..cfa1c6e375bcb 100644 --- a/crates/subspace-farmer/src/ws_rpc.rs +++ b/crates/subspace-farmer/src/ws_rpc.rs @@ -1,13 +1,14 @@ -use crate::rpc::{NewHead, RpcClient}; +use crate::rpc::{Error as RpcError, NewHead, RpcClient}; use async_trait::async_trait; use jsonrpsee::types::traits::{Client, SubscriptionClient}; use jsonrpsee::types::v2::params::JsonRpcParams; -use jsonrpsee::types::{Error, Subscription}; +use jsonrpsee::types::Error as JsonError; use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use std::sync::Arc; use subspace_rpc_primitives::{ EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse, }; +use tokio::sync::mpsc; /// `WsClient` wrapper. #[derive(Clone, Debug)] @@ -17,7 +18,7 @@ pub struct WsRpc { impl WsRpc { /// Create a new instance of [`RpcClient`]. - pub async fn new(url: &str) -> Result { + pub async fn new(url: &str) -> Result { let client = Arc::new(WsClientBuilder::default().build(url).await?); Ok(Self { client }) } @@ -26,57 +27,82 @@ impl WsRpc { #[async_trait] impl RpcClient for WsRpc { /// Get farmer metadata. - async fn farmer_metadata(&self) -> Result { - self.client + async fn farmer_metadata(&self) -> Result { + Ok(self + .client .request("subspace_getFarmerMetadata", JsonRpcParams::NoParams) - .await + .await?) } /// Get a block by number. async fn block_by_number( &self, block_number: u32, - ) -> Result, Error> { - self.client + ) -> Result, RpcError> { + Ok(self + .client .request( "subspace_getBlockByNumber", JsonRpcParams::Array(vec![serde_json::to_value(block_number)?]), ) - .await + .await?) } /// Subscribe to chain head. - async fn subscribe_new_head(&self) -> Result, Error> { - self.client + async fn subscribe_new_head(&self) -> Result, RpcError> { + let mut subscription = self + .client .subscribe( "chain_subscribeNewHead", JsonRpcParams::NoParams, "chain_unsubscribeNewHead", ) - .await + .await?; + + let (sender, receiver) = mpsc::channel(1); + + tokio::spawn(async move { + while let Ok(Some(notification)) = subscription.next().await { + let _ = sender.send(notification).await; + } + }); + + Ok(receiver) } /// Subscribe to slot. - async fn subscribe_slot_info(&self) -> Result, Error> { - self.client + async fn subscribe_slot_info(&self) -> Result, RpcError> { + let mut subscription = self + .client .subscribe( "subspace_subscribeSlotInfo", JsonRpcParams::NoParams, "subspace_unsubscribeSlotInfo", ) - .await + .await?; + + let (sender, receiver) = mpsc::channel(1); + + tokio::spawn(async move { + while let Ok(Some(notification)) = subscription.next().await { + let _ = sender.send(notification).await; + } + }); + + Ok(receiver) } /// Submit a slot solution. async fn submit_solution_response( &self, solution_response: SolutionResponse, - ) -> Result<(), Error> { - self.client + ) -> Result<(), RpcError> { + Ok(self + .client .request( "subspace_submitSolutionResponse", JsonRpcParams::Array(vec![serde_json::to_value(&solution_response)?]), ) - .await + .await?) } }