Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request paritytech#77 from subspace/extract-rpc-module
Browse files Browse the repository at this point in the history
Introduce RpcClient for abstracting all rpc related functions
  • Loading branch information
liuchengxu authored Oct 23, 2021
2 parents 38ae4c9 + c2c6955 commit dc117fd
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 154 deletions.
9 changes: 4 additions & 5 deletions crates/sc-consensus-subspace/src/verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,11 @@ pub(crate) fn verify_solution<B: BlockT + Sized>(
let subspace_solving = SubspaceCodec::new(&solution.public_key);

let mut piece = solution.encoding.clone();
if subspace_solving

// Ensure piece is decodable.
subspace_solving
.decode(solution.piece_index, &mut piece)
.is_err()
{
return Err(Error::InvalidEncoding(slot));
}
.map_err(|_| Error::InvalidEncoding(slot))?;

if !archiver::is_piece_valid(
&piece,
Expand Down
191 changes: 42 additions & 149 deletions crates/subspace-farmer/src/commands/farm.rs
Original file line number Diff line number Diff line change
@@ -1,127 +1,52 @@
use crate::commitments::Commitments;
use crate::common::{Salt, Tag};
use crate::common::Salt;
use crate::identity::Identity;
use crate::object_mappings::ObjectMappings;
use crate::plot::Plot;
use crate::rpc::{
EncodedBlockWithObjectMapping, FarmerMetadata, ProposedProofOfReplicationResponse, RpcClient,
SlotInfo, Solution,
};
use anyhow::{anyhow, Result};
use futures::future;
use futures::future::Either;
use jsonrpsee::types::traits::{Client, SubscriptionClient};
use jsonrpsee::types::v2::params::JsonRpcParams;
use jsonrpsee::types::Subscription;
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use log::{debug, error, info, trace};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
use subspace_archiving::archiver::{ArchivedSegment, BlockArchiver, ObjectArchiver};
use subspace_archiving::pre_genesis_data;
use subspace_core_primitives::objects::{
BlockObjectMapping, GlobalObject, PieceObject, PieceObjectMapping,
};
use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping};
use subspace_core_primitives::{crypto, Sha256Hash};
use subspace_solving::SubspaceCodec;

type SlotNumber = u64;

/// Metadata necessary for farmer operation
#[derive(Debug, Deserialize)]
struct FarmerMetadata {
/// Depth `K` after which a block enters the recorded history (a global constant, as opposed
/// to the client-dependent transaction confirmation depth `k`).
confirmation_depth_k: u32,
/// The size of data in one piece (in bytes).
record_size: u32,
/// Recorded history is encoded and plotted in segments of this size (in bytes).
recorded_history_segment_size: u32,
/// This constant defines the size (in bytes) of one pre-genesis object.
pre_genesis_object_size: u32,
/// This constant defines the number of a pre-genesis objects that will bootstrap the
/// history.
pre_genesis_object_count: u32,
/// This constant defines the seed used for deriving pre-genesis objects that will bootstrap
/// the history.
pre_genesis_object_seed: Vec<u8>,
}

/// Encoded block with mapping of objects that it contains
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EncodedBlockWithObjectMapping {
/// Encoded block
block: Vec<u8>,
/// Mapping of objects inside of the block
object_mapping: BlockObjectMapping,
}

// There are more fields in this struct, but we only care about one
#[derive(Debug, Deserialize)]
struct NewHead {
number: String,
}

#[derive(Debug, Serialize)]
struct Solution {
public_key: [u8; 32],
piece_index: u64,
encoding: Vec<u8>,
signature: Vec<u8>,
tag: Tag,
}

/// Proposed proof of space consisting of solution and farmer's secret key for block signing
#[derive(Debug, Serialize)]
struct ProposedProofOfReplicationResponse {
/// Slot number
slot_number: SlotNumber,
/// Solution (if present) from farmer's plot corresponding to slot number above
solution: Option<Solution>,
// Secret key, used for signing blocks on the client node
secret_key: Vec<u8>,
}

/// Information about new slot that just arrived
#[derive(Debug, Deserialize)]
struct SlotInfo {
/// Slot number
slot_number: SlotNumber,
/// Slot challenge
challenge: [u8; 8],
/// Salt
salt: Salt,
/// Salt for the next eon
next_salt: Option<Salt>,
/// Acceptable solution range
solution_range: u64,
}

/// Start farming by using plot in specified path and connecting to WebSocket server at specified
/// address.
pub(crate) async fn farm(base_directory: PathBuf, ws_server: &str) -> Result<()> {
info!("Connecting to RPC server");
let client = Arc::new(WsClientBuilder::default().build(ws_server).await?);

let identity = Identity::open_or_create(&base_directory)?;

// TODO: This doesn't account for the fact that node can have a completely different history to
// what farmer expects
// TODO: This doesn't account for the fact that node can
// have a completely different history to what farmer expects
info!("Opening plot");
let plot = Plot::open_or_create(&base_directory.clone().into()).await?;

info!("Opening commitments");
let commitments = Commitments::new(base_directory.join("commitments").into()).await?;

info!("Opening object mapping");
let object_mappings = tokio::task::spawn_blocking({
let path = base_directory.join("object-mappings");

move || ObjectMappings::new(&path)
})
.await
.unwrap()?;
.await??;

info!("Connecting to RPC server: {}", ws_server);
let client = RpcClient::new(ws_server).await?;

let identity = Identity::open_or_create(&base_directory)?;

match future::select(
{
let client = Arc::clone(&client);
let client = client.clone();
let plot = plot.clone();
let commitments = commitments.clone();
let public_key = identity.public_key();
Expand Down Expand Up @@ -152,7 +77,7 @@ pub(crate) async fn farm(base_directory: PathBuf, ws_server: &str) -> Result<()>
// don't want eventually
/// Maintains plot in up to date state plotting new pieces as they are produced on the network.
async fn background_plotting<P: AsRef<[u8]>>(
client: Arc<WsClient>,
client: RpcClient,
plot: Plot,
commitments: Commitments,
object_mappings: ObjectMappings,
Expand All @@ -166,9 +91,7 @@ async fn background_plotting<P: AsRef<[u8]>>(
pre_genesis_object_size,
pre_genesis_object_count,
pre_genesis_object_seed,
} = client
.request("subspace_getFarmerMetadata", JsonRpcParams::NoParams)
.await?;
} = client.farmer_metadata().await?;

// TODO: This assumes fixed size segments, which might not be the case
let merkle_num_leaves = u64::from(recorded_history_segment_size / record_size * 2);
Expand All @@ -186,12 +109,7 @@ async fn background_plotting<P: AsRef<[u8]>>(
let last_archived_block_number = last_root_block.last_archived_block().number;
info!("Last archived block {}", last_archived_block_number);

let maybe_last_archived_block = client
.request(
"subspace_getBlockByNumber",
JsonRpcParams::Array(vec![serde_json::to_value(last_archived_block_number)?]),
)
.await?;
let maybe_last_archived_block = client.block_by_number(last_archived_block_number).await?;

match maybe_last_archived_block {
Some(EncodedBlockWithObjectMapping {
Expand Down Expand Up @@ -309,9 +227,9 @@ async fn background_plotting<P: AsRef<[u8]>>(
.map(|n| n + 1)
.unwrap_or_default();

// Erasure coding in archiver and piece encoding are a CPU-intensive operations
// Erasure coding in archiver and piece encoding are CPU-intensive operations.
tokio::task::spawn_blocking({
let client = Arc::clone(&client);
let client = client.clone();
let weak_plot = weak_plot.clone();

#[allow(clippy::mut_range_bound)]
Expand All @@ -330,17 +248,10 @@ async fn background_plotting<P: AsRef<[u8]>>(

let mut last_root_block = None;
for block_to_archive in blocks_to_archive_from..=blocks_to_archive_to {
let block_fut = client
.request::<'_, '_, '_, Option<EncodedBlockWithObjectMapping>>(
"subspace_getBlockByNumber",
JsonRpcParams::Array(vec![
serde_json::to_value(block_to_archive).unwrap()
]),
);
let EncodedBlockWithObjectMapping {
block,
object_mapping,
} = match runtime_handle.block_on(block_fut) {
} = match runtime_handle.block_on(client.block_by_number(block_to_archive)) {
Ok(Some(block)) => block,
Ok(None) => {
error!(
Expand Down Expand Up @@ -427,22 +338,16 @@ async fn background_plotting<P: AsRef<[u8]>>(
}
});

info!("Subscribing to new heads notifications");

let mut subscription: Subscription<NewHead> = client
.subscribe(
"chain_subscribeNewHead",
JsonRpcParams::NoParams,
"chain_unsubscribeNewHead",
)
.await?;
info!("Subscribing to new heads");
let mut new_head = client.subscribe_new_head().await?;

let block_to_archive = Arc::new(AtomicU32::default());

// Listen for new blocks produced on the network
while let Some(new_head) = subscription.next().await? {
while let Some(head) = new_head.next().await? {
// Numbers are in the format `0xabcd`, so strip `0x` prefix and interpret the rest as an
// integer in hex
let block_number = u32::from_str_radix(&new_head.number[2..], 16).unwrap();
let block_number = u32::from_str_radix(&head.number[2..], 16).unwrap();
debug!("Last block number: {:#?}", block_number);

if let Some(block) = block_number.checked_sub(confirmation_depth_k) {
Expand Down Expand Up @@ -479,25 +384,19 @@ fn create_global_object_mapping(
}

async fn subscribe_to_slot_info(
client: &WsClient,
client: &RpcClient,
plot: &Plot,
commitments: &Commitments,
identity: &Identity,
) -> Result<()> {
let farmer_public_key_hash = crypto::sha256_hash(&identity.public_key());

info!("Subscribing to slot info notifications");
let mut subscription: Subscription<SlotInfo> = client
.subscribe(
"subspace_subscribeSlotInfo",
JsonRpcParams::NoParams,
"subspace_unsubscribeSlotInfo",
)
.await?;
info!("Subscribing to slot info");
let mut new_slots = client.subscribe_slot_info().await?;

let mut salts = Salts::default();

while let Some(slot_info) = subscription.next().await? {
while let Some(slot_info) = new_slots.next().await? {
debug!("New slot: {:?}", slot_info);

update_commitments(plot, commitments, &mut salts, &slot_info);
Expand All @@ -511,14 +410,13 @@ async fn subscribe_to_slot_info(
{
Some((tag, piece_index)) => {
let encoding = plot.read(piece_index).await?;
let solution = Solution {
public_key: identity.public_key().to_bytes(),
let solution = Solution::new(
identity.public_key().to_bytes(),
piece_index,
encoding: encoding.to_vec(),
signature: identity.sign(&tag).to_bytes().to_vec(),
encoding.to_vec(),
identity.sign(&tag).to_bytes().to_vec(),
tag,
};

);
debug!("Solution found");
trace!("Solution found: {:?}", solution);

Expand All @@ -531,16 +429,11 @@ async fn subscribe_to_slot_info(
};

client
.request(
"subspace_proposeProofOfReplication",
JsonRpcParams::Array(vec![serde_json::to_value(
&ProposedProofOfReplicationResponse {
slot_number: slot_info.slot_number,
solution,
secret_key: identity.secret_key().to_bytes().into(),
},
)?]),
)
.propose_proof_of_replication(ProposedProofOfReplicationResponse {
slot_number: slot_info.slot_number,
solution,
secret_key: identity.secret_key().to_bytes().into(),
})
.await?;
}

Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod common;
mod identity;
mod object_mappings;
mod plot;
mod rpc;
mod utils;

use anyhow::Result;
Expand Down
Loading

0 comments on commit dc117fd

Please sign in to comment.