Farming cluster #2758

Merged
5 commits merged on May 21, 2024
31 changes: 28 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

18 changes: 12 additions & 6 deletions crates/subspace-farmer-components/src/segment_reconstruction.rs
@@ -7,11 +7,9 @@ use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{ArchivedHistorySegment, Piece, PieceIndex, RecordedHistorySegment};
use thiserror::Error;
use tokio::sync::Semaphore;
use tokio::task::JoinError;
use tracing::{debug, error, info, trace, warn};

// TODO: Probably should be made configurable
const PARALLELISM_LEVEL: usize = 20;

#[derive(Debug, Error)]
pub(crate) enum SegmentReconstructionError {
/// Not enough pieces to reconstruct a segment
@@ -21,6 +19,10 @@ pub(crate) enum SegmentReconstructionError {
/// Internal piece retrieval process failed
#[error("Pieces retrieval failed")]
PieceRetrievalFailed(#[from] ReconstructorError),

/// Join error
#[error("Join error: {0}")]
JoinError(#[from] JoinError),
}

pub(crate) async fn recover_missing_piece<PG: PieceGetter>(
@@ -32,7 +34,7 @@
let segment_index = missing_piece_index.segment_index();
let position = missing_piece_index.position();

let semaphore = &Semaphore::new(PARALLELISM_LEVEL);
let semaphore = &Semaphore::new(RecordedHistorySegment::NUM_RAW_RECORDS);
let acquired_pieces_counter = &AtomicUsize::default();
let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;

@@ -100,9 +102,13 @@
return Err(SegmentReconstructionError::NotEnoughPiecesAcquired);
}

let archiver = PiecesReconstructor::new(kzg).expect("Internal constructor call must succeed.");
let result = tokio::task::spawn_blocking(move || {
let reconstructor =
PiecesReconstructor::new(kzg).expect("Internal constructor call must succeed.");

let result = archiver.reconstruct_piece(&segment_pieces, position as usize)?;
reconstructor.reconstruct_piece(&segment_pieces, position as usize)
})
.await??;

info!(%missing_piece_index, "Recovering missing piece succeeded.");

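The hunks above move piece reconstruction onto Tokio's blocking thread pool and add a `JoinError` variant so a join failure propagates with `?` alongside the reconstruction error. A minimal sketch of that pattern, assuming the same `tokio` and `thiserror` dependencies the crate already uses and a stand-in error type and workload rather than the crate's actual `PiecesReconstructor`:

```rust
use thiserror::Error;
use tokio::task::JoinError;

#[derive(Debug, Error)]
enum ExampleError {
    /// Stand-in for the reconstruction error
    #[error("Work failed")]
    WorkFailed,
    /// Mirrors the new `JoinError(#[from] JoinError)` variant in the diff
    #[error("Join error: {0}")]
    Join(#[from] JoinError),
}

async fn recover(succeed: bool) -> Result<u64, ExampleError> {
    // CPU-heavy work runs off the async worker threads
    let value = tokio::task::spawn_blocking(move || {
        // Stand-in for `reconstructor.reconstruct_piece(...)`
        if succeed {
            Ok(42)
        } else {
            Err(ExampleError::WorkFailed)
        }
    })
    // First `?` converts a `JoinError` via `#[from]`, second propagates the inner error
    .await??;

    Ok(value)
}
```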
2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
@@ -14,7 +14,7 @@ include = [
[dependencies]
anyhow = "1.0.82"
async-lock = "3.3.0"
async-nats = "0.34.0"
async-nats = "0.35.0"
async-trait = "0.1.80"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
base58 = "0.2.0"
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/mod.rs
@@ -1,4 +1,5 @@
pub(crate) mod benchmark;
pub(crate) mod cluster;
pub(crate) mod farm;
mod info;
mod scrub;
181 changes: 181 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
@@ -0,0 +1,181 @@
mod cache;
mod controller;
mod farmer;
mod plotter;

use crate::commands::cluster::cache::{cache, CacheArgs};
use crate::commands::cluster::controller::{controller, ControllerArgs};
use crate::commands::cluster::farmer::{farmer, FarmerArgs};
use crate::commands::cluster::plotter::{plotter, PlotterArgs};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use async_nats::ServerAddr;
use backoff::ExponentialBackoff;
use clap::{Parser, Subcommand};
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use prometheus_client::registry::Registry;
use std::env::current_exe;
use std::mem;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::utils::AsyncJoinOnDrop;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_proof_of_space::Table;

/// Arguments for cluster
#[derive(Debug, Parser)]
pub(crate) struct ClusterArgs {
/// Shared arguments for all subcommands
#[clap(flatten)]
shared_args: SharedArgs,
/// Cluster subcommands
#[clap(flatten)]
subcommands: ClusterSubcommands,
}

/// Recursive cluster subcommands
#[derive(Debug, Parser)]
struct ClusterSubcommands {
/// Cluster subcommands
#[clap(subcommand)]
subcommand: ClusterSubcommand,
}

/// Shared arguments
#[derive(Debug, Parser)]
struct SharedArgs {
/// NATS server address, typically in `nats://server1:port1` format; can be specified multiple
/// times.
///
/// NOTE: NATS must be configured for message sizes of 2MiB or larger (1MiB is the default),
/// which can be done by starting the NATS server with a config file containing `max_payload = 2MB`.
#[arg(long, alias = "nats-server", required = true)]
nats_servers: Vec<ServerAddr>,
/// Size of connection pool of NATS clients.
///
/// Pool size can be increased in case of a large number of farms or high plotting capacity of
/// this instance.
#[arg(long, default_value = "8")]
nats_pool_size: NonZeroUsize,
/// Defines endpoints for the Prometheus metrics server. The server doesn't start unless at
/// least one endpoint is specified. Format: 127.0.0.1:8080
#[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
prometheus_listen_on: Vec<SocketAddr>,
}

/// Cluster subcommands
#[derive(Debug, Subcommand)]
enum ClusterSubcommand {
/// Farming cluster controller
Controller(ControllerArgs),
/// Farming cluster farmer
Farmer(FarmerArgs),
/// Farming cluster plotter
Plotter(PlotterArgs),
/// Farming cluster cache
Cache(CacheArgs),
}

impl ClusterSubcommand {
fn extract_additional_components(&mut self) -> Vec<String> {
match self {
ClusterSubcommand::Controller(args) => mem::take(&mut args.additional_components),
ClusterSubcommand::Farmer(args) => mem::take(&mut args.additional_components),
ClusterSubcommand::Plotter(args) => mem::take(&mut args.additional_components),
ClusterSubcommand::Cache(args) => mem::take(&mut args.additional_components),
}
}
}

pub(crate) async fn cluster<PosTable>(cluster_args: ClusterArgs) -> anyhow::Result<()>
where
PosTable: Table,
{
let signal = shutdown_signal();

let ClusterArgs {
shared_args,
subcommands,
} = cluster_args;
let SharedArgs {
nats_servers,
nats_pool_size,
prometheus_listen_on,
} = shared_args;
let ClusterSubcommands { mut subcommand } = subcommands;

let nats_client = NatsClient::new(
nats_servers,
ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
},
nats_pool_size,
)
.await
.map_err(|error| anyhow!("Failed to connect to NATS server: {error}"))?;
let mut registry = Registry::default();

let mut tasks = FuturesUnordered::new();

loop {
let nats_client = nats_client.clone();
let additional_components = subcommand.extract_additional_components();

tasks.push(match subcommand {
ClusterSubcommand::Controller(controller_args) => {
controller(nats_client, &mut registry, controller_args).await?
}
ClusterSubcommand::Farmer(farmer_args) => {
farmer::<PosTable>(nats_client, &mut registry, farmer_args).await?
}
ClusterSubcommand::Plotter(plotter_args) => {
plotter::<PosTable>(nats_client, &mut registry, plotter_args).await?
}
ClusterSubcommand::Cache(cache_args) => {
cache(nats_client, &mut registry, cache_args).await?
}
});

if additional_components.is_empty() {
break;
}

let binary_name = current_exe()
.ok()
.and_then(|path| {
path.file_name()
.and_then(|file_name| file_name.to_str())
.map(str::to_string)
})
.unwrap_or_else(|| "subspace-farmer".to_string());
ClusterSubcommands { subcommand } =
ClusterSubcommands::parse_from([binary_name].into_iter().chain(additional_components));
}

if !prometheus_listen_on.is_empty() {
let prometheus_task = start_prometheus_metrics_server(
prometheus_listen_on,
RegistryAdapter::PrometheusClient(registry),
)?;

let join_handle = tokio::spawn(prometheus_task);
tasks.push(Box::pin(async move {
Ok(AsyncJoinOnDrop::new(join_handle, true).await??)
}));
}

select! {
// Signal future
_ = signal.fuse() => {
Ok(())
},

// Run future
result = tasks.next() => {
result.expect("List of tasks is not empty; qed")
},
}
}
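The loop above lets several cluster components run from a single invocation: each subcommand's args expose `additional_components` (consumed with `mem::take`), and whatever remains is prepended with the binary name and parsed again as the next subcommand. A minimal, self-contained sketch of that re-parsing step, with illustrative types rather than the real `ControllerArgs`/`FarmerArgs`:

```rust
use clap::{Parser, Subcommand};

#[derive(Debug, Parser)]
struct DemoCli {
    /// Subcommand standing in for `ClusterSubcommand`
    #[clap(subcommand)]
    subcommand: DemoSubcommand,
}

#[derive(Debug, Subcommand)]
enum DemoSubcommand {
    /// Stand-in for the controller component
    Controller,
    /// Stand-in for the farmer component
    Farmer,
}

fn main() {
    // Tokens left over after the previous component parsed its own arguments
    let additional_components = vec!["farmer".to_string()];

    // Prepend a binary name and parse the leftovers as a fresh invocation,
    // mirroring `ClusterSubcommands::parse_from` in the loop above
    let DemoCli { subcommand } = DemoCli::parse_from(
        ["demo-cluster".to_string()]
            .into_iter()
            .chain(additional_components),
    );

    println!("next component to start: {subcommand:?}");
}
```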