Merge pull request #2758 from subspace/farming-cluster
Farming cluster
nazar-pc authored May 21, 2024
2 parents 51ceae7 + 116fa68 · commit 873e5e8
Showing 19 changed files with 5,462 additions and 22 deletions.
31 changes: 28 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

18 changes: 12 additions & 6 deletions crates/subspace-farmer-components/src/segment_reconstruction.rs
@@ -7,11 +7,9 @@ use subspace_core_primitives::crypto::kzg::Kzg;
 use subspace_core_primitives::{ArchivedHistorySegment, Piece, PieceIndex, RecordedHistorySegment};
 use thiserror::Error;
 use tokio::sync::Semaphore;
+use tokio::task::JoinError;
 use tracing::{debug, error, info, trace, warn};
 
-// TODO: Probably should be made configurable
-const PARALLELISM_LEVEL: usize = 20;
-
 #[derive(Debug, Error)]
 pub(crate) enum SegmentReconstructionError {
     /// Not enough pieces to reconstruct a segment
@@ -21,6 +19,10 @@ pub(crate) enum SegmentReconstructionError {
     /// Internal piece retrieval process failed
     #[error("Pieces retrieval failed")]
     PieceRetrievalFailed(#[from] ReconstructorError),
+
+    /// Join error
+    #[error("Join error: {0}")]
+    JoinError(#[from] JoinError),
 }
 
 pub(crate) async fn recover_missing_piece<PG: PieceGetter>(
@@ -32,7 +34,7 @@
     let segment_index = missing_piece_index.segment_index();
     let position = missing_piece_index.position();
 
-    let semaphore = &Semaphore::new(PARALLELISM_LEVEL);
+    let semaphore = &Semaphore::new(RecordedHistorySegment::NUM_RAW_RECORDS);
     let acquired_pieces_counter = &AtomicUsize::default();
     let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;
 
@@ -100,9 +102,13 @@
         return Err(SegmentReconstructionError::NotEnoughPiecesAcquired);
     }
 
-    let archiver = PiecesReconstructor::new(kzg).expect("Internal constructor call must succeed.");
+    let result = tokio::task::spawn_blocking(move || {
+        let reconstructor =
+            PiecesReconstructor::new(kzg).expect("Internal constructor call must succeed.");
 
-    let result = archiver.reconstruct_piece(&segment_pieces, position as usize)?;
+        reconstructor.reconstruct_piece(&segment_pieces, position as usize)
+    })
+    .await??;
 
     info!(%missing_piece_index, "Recovering missing piece succeeded.");
 
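The change above moves the CPU-heavy KZG piece reconstruction onto Tokio's blocking thread pool, which is why `SegmentReconstructionError` gains a `JoinError` variant: in `.await??`, the first `?` surfaces a panicked or cancelled blocking task as a `JoinError`, and the second `?` propagates the reconstruction error returned from inside the closure. Below is a minimal, self-contained sketch of the same pattern; the `RecoveryError` and `recover` names and the dummy computation are illustrative only, not part of this PR.

use thiserror::Error;
use tokio::task::JoinError;

#[derive(Debug, Error)]
enum RecoveryError {
    /// The CPU-bound work itself failed
    #[error("Reconstruction failed: {0}")]
    Reconstruction(String),
    /// The blocking task panicked or was cancelled
    #[error("Join error: {0}")]
    Join(#[from] JoinError),
}

async fn recover() -> Result<u64, RecoveryError> {
    // Offload CPU-heavy work so it does not stall the async executor.
    let value = tokio::task::spawn_blocking(|| {
        // Stand-in for an expensive reconstruction computation.
        (0..1_000_000u64)
            .sum::<u64>()
            .checked_mul(2)
            .ok_or_else(|| RecoveryError::Reconstruction("overflow".to_string()))
    })
    // First `?` handles the `JoinError`, second `?` the inner `Result`.
    .await??;

    Ok(value)
}

#[tokio::main]
async fn main() -> Result<(), RecoveryError> {
    println!("recovered value: {}", recover().await?);
    Ok(())
}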
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
@@ -14,7 +14,7 @@ include = [
 [dependencies]
 anyhow = "1.0.82"
 async-lock = "3.3.0"
-async-nats = "0.34.0"
+async-nats = "0.35.0"
 async-trait = "0.1.80"
 backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 base58 = "0.2.0"
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands.rs
@@ -1,4 +1,5 @@
 pub(crate) mod benchmark;
+pub(crate) mod cluster;
 pub(crate) mod farm;
 mod info;
 mod scrub;
181 changes: 181 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
@@ -0,0 +1,181 @@
mod cache;
mod controller;
mod farmer;
mod plotter;

use crate::commands::cluster::cache::{cache, CacheArgs};
use crate::commands::cluster::controller::{controller, ControllerArgs};
use crate::commands::cluster::farmer::{farmer, FarmerArgs};
use crate::commands::cluster::plotter::{plotter, PlotterArgs};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use async_nats::ServerAddr;
use backoff::ExponentialBackoff;
use clap::{Parser, Subcommand};
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use prometheus_client::registry::Registry;
use std::env::current_exe;
use std::mem;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::utils::AsyncJoinOnDrop;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_proof_of_space::Table;

/// Arguments for cluster
#[derive(Debug, Parser)]
pub(crate) struct ClusterArgs {
    /// Shared arguments for all subcommands
    #[clap(flatten)]
    shared_args: SharedArgs,
    /// Cluster subcommands
    #[clap(flatten)]
    subcommands: ClusterSubcommands,
}

/// Recursive cluster subcommands
#[derive(Debug, Parser)]
struct ClusterSubcommands {
    /// Cluster subcommands
    #[clap(subcommand)]
    subcommand: ClusterSubcommand,
}

/// Shared arguments
#[derive(Debug, Parser)]
struct SharedArgs {
    /// NATS server address, typically in `nats://server1:port1` format, can be specified multiple
    /// times.
    ///
    /// NOTE: NATS must be configured for message sizes of 2MiB or larger (1MiB is the default),
    /// which can be done by starting NATS server with config file containing `max_payload = 2MB`.
    #[arg(long, alias = "nats-server", required = true)]
    nats_servers: Vec<ServerAddr>,
    /// Size of connection pool of NATS clients.
    ///
    /// Pool size can be increased in case of large number of farms or high plotting capacity of
    /// this instance.
    #[arg(long, default_value = "8")]
    nats_pool_size: NonZeroUsize,
    /// Defines endpoints for the prometheus metrics server. It doesn't start without at least
    /// one specified endpoint. Format: 127.0.0.1:8080
    #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
    prometheus_listen_on: Vec<SocketAddr>,
}

/// Cluster subcommands
#[derive(Debug, Subcommand)]
enum ClusterSubcommand {
    /// Farming cluster controller
    Controller(ControllerArgs),
    /// Farming cluster farmer
    Farmer(FarmerArgs),
    /// Farming cluster plotter
    Plotter(PlotterArgs),
    /// Farming cluster cache
    Cache(CacheArgs),
}

impl ClusterSubcommand {
    fn extract_additional_components(&mut self) -> Vec<String> {
        match self {
            ClusterSubcommand::Controller(args) => mem::take(&mut args.additional_components),
            ClusterSubcommand::Farmer(args) => mem::take(&mut args.additional_components),
            ClusterSubcommand::Plotter(args) => mem::take(&mut args.additional_components),
            ClusterSubcommand::Cache(args) => mem::take(&mut args.additional_components),
        }
    }
}

pub(crate) async fn cluster<PosTable>(cluster_args: ClusterArgs) -> anyhow::Result<()>
where
    PosTable: Table,
{
    let signal = shutdown_signal();

    let ClusterArgs {
        shared_args,
        subcommands,
    } = cluster_args;
    let SharedArgs {
        nats_servers,
        nats_pool_size,
        prometheus_listen_on,
    } = shared_args;
    let ClusterSubcommands { mut subcommand } = subcommands;

    let nats_client = NatsClient::new(
        nats_servers,
        ExponentialBackoff {
            max_elapsed_time: None,
            ..ExponentialBackoff::default()
        },
        nats_pool_size,
    )
    .await
    .map_err(|error| anyhow!("Failed to connect to NATS server: {error}"))?;
    let mut registry = Registry::default();

    let mut tasks = FuturesUnordered::new();

    loop {
        let nats_client = nats_client.clone();
        let additional_components = subcommand.extract_additional_components();

        tasks.push(match subcommand {
            ClusterSubcommand::Controller(controller_args) => {
                controller(nats_client, &mut registry, controller_args).await?
            }
            ClusterSubcommand::Farmer(farmer_args) => {
                farmer::<PosTable>(nats_client, &mut registry, farmer_args).await?
            }
            ClusterSubcommand::Plotter(plotter_args) => {
                plotter::<PosTable>(nats_client, &mut registry, plotter_args).await?
            }
            ClusterSubcommand::Cache(cache_args) => {
                cache(nats_client, &mut registry, cache_args).await?
            }
        });

        if additional_components.is_empty() {
            break;
        }

        let binary_name = current_exe()
            .ok()
            .and_then(|path| {
                path.file_name()
                    .and_then(|file_name| file_name.to_str())
                    .map(str::to_string)
            })
            .unwrap_or_else(|| "subspace-farmer".to_string());
        ClusterSubcommands { subcommand } =
            ClusterSubcommands::parse_from([binary_name].into_iter().chain(additional_components));
    }

    if !prometheus_listen_on.is_empty() {
        let prometheus_task = start_prometheus_metrics_server(
            prometheus_listen_on,
            RegistryAdapter::PrometheusClient(registry),
        )?;

        let join_handle = tokio::spawn(prometheus_task);
        tasks.push(Box::pin(async move {
            Ok(AsyncJoinOnDrop::new(join_handle, true).await??)
        }));
    }

    select! {
        // Signal future
        _ = signal.fuse() => {
            Ok(())
        },

        // Run future
        result = tasks.next() => {
            result.expect("List of tasks is not empty; qed")
        },
    }
}
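One detail worth calling out in the loop above: every subcommand carries an `additional_components` list, `extract_additional_components` takes it out with `mem::take` (so the leftover arguments change hands without cloning the rest of the subcommand), and the leftovers are re-parsed via `ClusterSubcommands::parse_from` as the next subcommand. A single `cluster` process can therefore run, say, a controller, a cache and a farmer side by side while sharing one NATS connection pool (note the `nats_client.clone()` at the top of the loop). The following is a minimal, self-contained sketch of that chaining pattern; the `Chain`/`Component` types, the `demo` binary name and the `last = true` argument definition are assumptions made for illustration and do not reflect how the real `ControllerArgs` and friends declare `additional_components`.

use clap::{Parser, Subcommand};

#[derive(Debug, Parser)]
struct Chain {
    #[clap(subcommand)]
    component: Component,
}

#[derive(Debug, Subcommand)]
enum Component {
    /// First component; everything after `--` is handed to the next one
    Controller {
        #[arg(last = true)]
        additional_components: Vec<String>,
    },
    /// Second component
    Cache {
        #[arg(last = true)]
        additional_components: Vec<String>,
    },
}

fn main() {
    // Simulated command line: `demo controller -- cache`
    let mut args = vec!["demo", "controller", "--", "cache"]
        .into_iter()
        .map(String::from)
        .collect::<Vec<_>>();

    loop {
        let Chain { component } = Chain::parse_from(&args);
        let additional_components = match component {
            Component::Controller { additional_components } => {
                println!("starting controller");
                additional_components
            }
            Component::Cache { additional_components } => {
                println!("starting cache");
                additional_components
            }
        };

        if additional_components.is_empty() {
            break;
        }

        // Re-parse the leftover arguments as the next component, mirroring the
        // `ClusterSubcommands::parse_from` call in `cluster()` above.
        args = std::iter::once("demo".to_string())
            .chain(additional_components)
            .collect();
    }
}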