Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
subscription and error are abstracted from rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
ozgunozerk committed Nov 15, 2021
1 parent 268a996 commit 4223ac2
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 32 deletions.
9 changes: 5 additions & 4 deletions crates/subspace-farmer/src/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::task::JoinHandle;
#[derive(Debug, Error)]
pub enum FarmingError {
#[error("jsonrpsee error: {0}")]
Rpc(jsonrpsee::types::Error),
RpcError(Box<dyn std::error::Error + Send + Sync>),
#[error("Error joining task: {0}")]
JoinTask(tokio::task::JoinError),
#[error("Plot read error: {0}")]
Expand Down Expand Up @@ -113,11 +113,12 @@ async fn subscribe_to_slot_info<T: RpcClient>(
let mut new_slots = client
.subscribe_slot_info()
.await
.map_err(FarmingError::Rpc)?;
.map_err(FarmingError::RpcError)?;

let mut salts = Salts::default();

while let Some(slot_info) = new_slots.next().await.map_err(FarmingError::Rpc)? {
// could also import futures mpsc, and call `try_next`, but without await. I think this is more robust
while let Some(slot_info) = new_slots.recv().await {
debug!("New slot: {:?}", slot_info);

update_commitments(plot, commitments, &mut salts, &slot_info);
Expand Down Expand Up @@ -163,7 +164,7 @@ async fn subscribe_to_slot_info<T: RpcClient>(
secret_key: identity.secret_key().to_bytes().into(),
})
.await
.map_err(FarmingError::Rpc)?;
.map_err(FarmingError::RpcError)?;
}

Ok(())
Expand Down
16 changes: 9 additions & 7 deletions crates/subspace-farmer/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum PlottingError {
#[error("Failed to get block {0} from the chain, probably need to erase existing plot")]
GetBlockError(u32),
#[error("jsonrpsee error: {0}")]
Rpc(jsonrpsee::types::Error),
RpcError(Box<dyn std::error::Error + Send + Sync>),
#[error("Last block retrieval from plot, rocksdb error: {0}")]
LastBlock(rocksdb::Error),
#[error("Error joining task: {0}")]
Expand Down Expand Up @@ -85,7 +85,6 @@ impl Drop for Plotting {
// we don't have to do anything in here
// these are for clarity and verbosity
let _ = self.sender.take().unwrap().send(());
info!("Plotting stopped!");
}
}

Expand All @@ -108,7 +107,10 @@ async fn background_plotting<P: AsRef<[u8]>, T: RpcClient + Clone + Send + 'stat
pre_genesis_object_size,
pre_genesis_object_count,
pre_genesis_object_seed,
} = client.farmer_metadata().await.map_err(PlottingError::Rpc)?;
} = client
.farmer_metadata()
.await
.map_err(PlottingError::RpcError)?;

// TODO: This assumes fixed size segments, which might not be the case
let merkle_num_leaves = u64::from(recorded_history_segment_size / record_size * 2);
Expand All @@ -133,7 +135,7 @@ async fn background_plotting<P: AsRef<[u8]>, T: RpcClient + Clone + Send + 'stat
let maybe_last_archived_block = client
.block_by_number(last_archived_block_number)
.await
.map_err(PlottingError::Rpc)?;
.map_err(PlottingError::RpcError)?;

match maybe_last_archived_block {
Some(EncodedBlockWithObjectMapping {
Expand Down Expand Up @@ -372,7 +374,7 @@ async fn background_plotting<P: AsRef<[u8]>, T: RpcClient + Clone + Send + 'stat
let mut new_head = client
.subscribe_new_head()
.await
.map_err(PlottingError::Rpc)?;
.map_err(PlottingError::RpcError)?;

let block_to_archive = Arc::new(AtomicU32::default());

Expand All @@ -381,10 +383,10 @@ async fn background_plotting<P: AsRef<[u8]>, T: RpcClient + Clone + Send + 'stat
loop {
tokio::select! {
_ = &mut receiver => {
info!("plotting stopped!");
info!("Plotting stopped!");
break;
}
Ok(Some(head)) = new_head.next() => {
Some(head) = new_head.recv() => {
let block_number = u32::from_str_radix(&head.number[2..], 16).unwrap();
debug!("Last block number: {:#?}", block_number);

Expand Down
13 changes: 10 additions & 3 deletions crates/subspace-farmer/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,37 @@
use async_trait::async_trait;
use jsonrpsee::types::{Error, Subscription};
use serde::Deserialize;
use subspace_rpc_primitives::{
EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse,
};
use tokio::sync::mpsc::Receiver;

// There are more fields in this struct, but we only care about one
#[derive(Debug, Deserialize)]
pub struct NewHead {
pub number: String,
}

/// To become error type agnostic
pub type Error = Box<dyn std::error::Error + Send + Sync>;

#[async_trait]
pub trait RpcClient {
/// Get farmer metadata.
async fn farmer_metadata(&self) -> Result<FarmerMetadata, Error>;

/// Get a block by number.
async fn block_by_number(
&self,
block_number: u32,
) -> Result<Option<EncodedBlockWithObjectMapping>, Error>;

async fn subscribe_new_head(&self) -> Result<Subscription<NewHead>, Error>;
/// Subscribe to chain head.
async fn subscribe_new_head(&self) -> Result<Receiver<NewHead>, Error>;

async fn subscribe_slot_info(&self) -> Result<Subscription<SlotInfo>, Error>;
/// Subscribe to slot.
async fn subscribe_slot_info(&self) -> Result<Receiver<SlotInfo>, Error>;

/// Submit a slot solution.
async fn submit_solution_response(
&self,
solution_response: SolutionResponse,
Expand Down
62 changes: 44 additions & 18 deletions crates/subspace-farmer/src/ws_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::rpc::{NewHead, RpcClient};
use crate::rpc::{Error as RpcError, NewHead, RpcClient};
use async_trait::async_trait;
use jsonrpsee::types::traits::{Client, SubscriptionClient};
use jsonrpsee::types::v2::params::JsonRpcParams;
use jsonrpsee::types::{Error, Subscription};
use jsonrpsee::types::Error as JsonError;
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use std::sync::Arc;
use subspace_rpc_primitives::{
EncodedBlockWithObjectMapping, FarmerMetadata, SlotInfo, SolutionResponse,
};
use tokio::sync::mpsc;

/// `WsClient` wrapper.
#[derive(Clone, Debug)]
Expand All @@ -17,7 +18,7 @@ pub struct WsRpc {

impl WsRpc {
/// Create a new instance of [`RpcClient`].
pub async fn new(url: &str) -> Result<Self, Error> {
pub async fn new(url: &str) -> Result<Self, JsonError> {
let client = Arc::new(WsClientBuilder::default().build(url).await?);
Ok(Self { client })
}
Expand All @@ -26,57 +27,82 @@ impl WsRpc {
#[async_trait]
impl RpcClient for WsRpc {
/// Get farmer metadata.
async fn farmer_metadata(&self) -> Result<FarmerMetadata, Error> {
self.client
async fn farmer_metadata(&self) -> Result<FarmerMetadata, RpcError> {
Ok(self
.client
.request("subspace_getFarmerMetadata", JsonRpcParams::NoParams)
.await
.await?)
}

/// Get a block by number.
async fn block_by_number(
&self,
block_number: u32,
) -> Result<Option<EncodedBlockWithObjectMapping>, Error> {
self.client
) -> Result<Option<EncodedBlockWithObjectMapping>, RpcError> {
Ok(self
.client
.request(
"subspace_getBlockByNumber",
JsonRpcParams::Array(vec![serde_json::to_value(block_number)?]),
)
.await
.await?)
}

/// Subscribe to chain head.
async fn subscribe_new_head(&self) -> Result<Subscription<NewHead>, Error> {
self.client
async fn subscribe_new_head(&self) -> Result<mpsc::Receiver<NewHead>, RpcError> {
let mut subscription = self
.client
.subscribe(
"chain_subscribeNewHead",
JsonRpcParams::NoParams,
"chain_unsubscribeNewHead",
)
.await
.await?;

let (sender, receiver) = mpsc::channel(1);

tokio::spawn(async move {
while let Ok(Some(notification)) = subscription.next().await {
let _ = sender.send(notification).await;
}
});

Ok(receiver)
}

/// Subscribe to slot.
async fn subscribe_slot_info(&self) -> Result<Subscription<SlotInfo>, Error> {
self.client
async fn subscribe_slot_info(&self) -> Result<mpsc::Receiver<SlotInfo>, RpcError> {
let mut subscription = self
.client
.subscribe(
"subspace_subscribeSlotInfo",
JsonRpcParams::NoParams,
"subspace_unsubscribeSlotInfo",
)
.await
.await?;

let (sender, receiver) = mpsc::channel(1);

tokio::spawn(async move {
while let Ok(Some(notification)) = subscription.next().await {
let _ = sender.send(notification).await;
}
});

Ok(receiver)
}

/// Submit a slot solution.
async fn submit_solution_response(
&self,
solution_response: SolutionResponse,
) -> Result<(), Error> {
self.client
) -> Result<(), RpcError> {
Ok(self
.client
.request(
"subspace_submitSolutionResponse",
JsonRpcParams::Array(vec![serde_json::to_value(&solution_response)?]),
)
.await
.await?)
}
}

0 comments on commit 4223ac2

Please sign in to comment.