Skip to content

Commit

Permalink
feat: ✨ Launch Storage Providers async tasks in Substrate node (#28)
Browse files Browse the repository at this point in the history
* WIP, provider requests protocol setup

* request protocol type changes with proto

* refactor: 🚚 Restructure spawned networking task as service

* fix: 🔥 Remove unimplemented inherent code that was preventing the parachain from running

* test: ✅ Add local testing instructions and configurations with pure zombienet

* refactor: 🚚 Make `service` in `node` use the new architecture with the `file_transfer` service

* revert: ⏪ Remove `pure_zombie` zombienet config file

* chore: 🔪 remove superseded bsp and msp crates

* feat: 🔧 add infrastructure crate and Storage trait

* feat: port Actor and EventBus infra. Update FileTransferService to use them

* fix: Cargo.lock

* style: 🎨 Format Cargo.toml files with zepter

* refactor: ♻️ Make dependencies in `node` all workspace dependencies

* fix FileTransferServive event loop stream merge

* fix: revert proto changes and add file watcher in build.rs

* feat: integrate events & add emit event mock example

---------

Co-authored-by: Michael Assaf <michael.assaf.edge@gmail.com>
Co-authored-by: Alexandru Murtaza <alexandru@moonsonglabs.com>
  • Loading branch information
3 people authored Apr 9, 2024
1 parent 09b4189 commit a5b227a
Show file tree
Hide file tree
Showing 33 changed files with 1,414 additions and 411 deletions.
345 changes: 197 additions & 148 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 17 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,34 @@ homepage = "https://moonsonglabs.com/"
panic = "unwind"

[workspace]
members = [
"runtime",
"pallets/*",
"node",
"bsp",
"msp",
"storage-kit",
"support/*",
]
members = ["runtime", "pallets/*", "node", "support/*"]
resolver = "2"

[workspace.dependencies]
log = "0.4.20"
serde = { version = "1.0.197", features = ["derive"] }
anyhow = "1.0.81"
array-bytes = "6.1"
async-channel = "1.8.0"
codec = { package = "parity-scale-codec", version = "3.0.0", features = [
"derive",
], default-features = false }
color-print = "0.3.4"
futures = "0.3.30"
libp2p-identity = "0.1.3"
log = { version = "0.4.21", default-features = false }
parking_lot = "0.12.1"
prost = "0.12"
prost-build = "0.12.3"
scale-info = { version = "2.11.0", default-features = false, features = [
"derive",
] }
serde = { version = "1.0.197", features = ["derive"] }
thiserror = "1.0.48"
tokio = "1.36.0"

hex-literal = { version = "0.4.1" }
smallvec = "1.11.0"
clap = { version = "4.4.14", features = ["derive"] }
jsonrpsee = { version = "0.20.3", features = ["server"] }
futures = "0.3.28"
serde_json = "1.0.108"

# Substrate
Expand Down Expand Up @@ -67,6 +70,7 @@ sc-telemetry = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "
sc-tracing = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0" }
sc-transaction-pool = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0" }
sc-transaction-pool-api = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0" }
sc-utils = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0" }
sp-blockchain = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0" }
sp-keystore = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0" }
sp-timestamp = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0" }
Expand Down Expand Up @@ -130,13 +134,12 @@ cumulus-primitives-utility = { git = "https://github.com/paritytech/polkadot-sdk
pallet-collator-selection = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0", default-features = false }
parachains-common = { git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0", default-features = false }
parachain-info = { package = "staging-parachain-info", git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-v1.7.0", default-features = false }

# Local Pallets
storage-hub-runtime = { path = "runtime" }
pallet-storage-providers = { path = "pallets/providers", default-features = false }
pallet-file-system = { path = "pallets/file-system", default-features = false }
pallet-proofs-dealer = { path = "pallets/proofs-dealer", default-features = false }
storage-hub-traits = { path = "support/traits", default-features = false }
storage-hub-infra = { path = "support/infra", default-features = false }

[workspace.lints.rust]
suspicious_double_ref_op = { level = "allow", priority = 2 }
Expand Down
8 changes: 0 additions & 8 deletions bsp/Cargo.toml

This file was deleted.

3 changes: 0 additions & 3 deletions bsp/src/main.rs

This file was deleted.

36 changes: 36 additions & 0 deletions client/mapping-sync/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "shc-mapping-sync"
description = "Mapping sync logic for StorageHub."
version = "0.1.0"
homepage = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
edition = { workspace = true }

[lints]
workspace = true

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
futures = { workspace = true }
futures-timer = "3.0.3"
log = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["macros", "sync"], optional = true }

# Substrate
sc-client-api = { workspace = true }
sc-utils = { workspace = true }
sp-api = { workspace = true }
sp-blockchain = { workspace = true }
sp-consensus = { workspace = true, features = ["default"] }
sp-core = { workspace = true, optional = true }
sp-runtime = { workspace = true }

[dev-dependencies]
# Substrate
sp-core = { workspace = true, features = ["default"] }
sp-io = { workspace = true }
10 changes: 10 additions & 0 deletions client/mapping-sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use sp_runtime::traits::Block as BlockT;

pub type StorageHubBlockNotificationSinks<T> =
parking_lot::Mutex<Vec<sc_utils::mpsc::TracingUnboundedSender<T>>>;

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct StorageHubBlockNotification<Block: BlockT> {
pub is_new_best: bool,
pub hash: Block::Hash,
}
8 changes: 0 additions & 8 deletions msp/Cargo.toml

This file was deleted.

3 changes: 0 additions & 3 deletions msp/src/main.rs

This file was deleted.

48 changes: 30 additions & 18 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ publish = false
[lints]
workspace = true

[build-dependencies]
substrate-build-script-utils = { workspace = true }

[dependencies]
anyhow = { workspace = true }
array-bytes = { workspace = true }
async-channel = { workspace = true }
color-print = { workspace = true }
libp2p-identity = { workspace = true, features = ["peerid"] }
prost = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }

# Local
storage-hub-runtime = { workspace = true }
storage-hub-infra = { workspace = true }

# Substrate
clap = { workspace = true }
log = { workspace = true }
codec = { workspace = true }
Expand All @@ -25,9 +36,6 @@ jsonrpsee = { workspace = true }
futures = { workspace = true }
serde_json = { workspace = true }

# Local
storage-hub-runtime = { workspace = true }

# Substrate
frame-benchmarking = { workspace = true }
frame-benchmarking-cli = { workspace = true }
Expand All @@ -48,6 +56,7 @@ sc-telemetry = { workspace = true }
sc-tracing = { workspace = true }
sc-transaction-pool = { workspace = true }
sc-transaction-pool-api = { workspace = true }
sc-utils = { workspace = true }
sp-api = { workspace = true }
sp-block-builder = { workspace = true }
sp-blockchain = { workspace = true }
Expand Down Expand Up @@ -75,22 +84,25 @@ cumulus-client-service = { workspace = true }
cumulus-primitives-core = { workspace = true }
cumulus-primitives-parachain-inherent = { workspace = true }
cumulus-relay-chain-interface = { workspace = true }
color-print = "0.3.4"

[build-dependencies]
prost-build = { workspace = true }
substrate-build-script-utils = { workspace = true }

[features]
default = []
runtime-benchmarks = [
"cumulus-primitives-core/runtime-benchmarks",
"frame-benchmarking-cli/runtime-benchmarks",
"frame-benchmarking/runtime-benchmarks",
"storage-hub-runtime/runtime-benchmarks",
"polkadot-cli/runtime-benchmarks",
"polkadot-primitives/runtime-benchmarks",
"sc-service/runtime-benchmarks",
"sp-runtime/runtime-benchmarks",
"cumulus-primitives-core/runtime-benchmarks",
"frame-benchmarking-cli/runtime-benchmarks",
"frame-benchmarking/runtime-benchmarks",
"polkadot-cli/runtime-benchmarks",
"polkadot-primitives/runtime-benchmarks",
"sc-service/runtime-benchmarks",
"sp-runtime/runtime-benchmarks",
"storage-hub-runtime/runtime-benchmarks",
]
try-runtime = [
"storage-hub-runtime/try-runtime",
"polkadot-cli/try-runtime",
"sp-runtime/try-runtime",
"polkadot-cli/try-runtime",
"sp-runtime/try-runtime",
"storage-hub-runtime/try-runtime",
]
9 changes: 9 additions & 0 deletions node/build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
use substrate_build_script_utils::{generate_cargo_keys, rerun_if_git_head_changed};

const PROTOS: &[&str] = &["src/services/file_transfer/schema/provider.v1.proto"];

fn main() {
generate_cargo_keys();

// Tell Cargo to rerun this build script whenever the proto files change.
PROTOS.iter().for_each(|proto| {
println!("cargo:rerun-if-changed={}", proto);
});

rerun_if_git_head_changed();

prost_build::compile_protos(PROTOS, &["src/services/file_transfer/schema"]).unwrap();
}
2 changes: 2 additions & 0 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod cli;
mod command;
mod rpc;
mod service;
mod services;
mod tasks;

fn main() -> sc_cli::Result<()> {
command::run()
Expand Down
37 changes: 31 additions & 6 deletions node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{sync::Arc, time::Duration};

use cumulus_client_cli::CollatorOptions;
use storage_hub_infra::actor::TaskSpawner;
// Local Runtime Types
use storage_hub_runtime::{
opaque::{Block, Hash},
Expand All @@ -27,7 +28,7 @@ use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};

// Substrate Imports
use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
use sc_client_api::Backend;
use sc_client_api::{Backend, BlockBackend};
use sc_consensus::ImportQueue;
use sc_executor::{
HeapAllocStrategy, NativeElseWasmExecutor, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
Expand All @@ -40,6 +41,8 @@ use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_keystore::KeystorePtr;
use substrate_prometheus_endpoint::Registry;

use crate::services::{file_transfer::spawn_file_transfer_service, StorageHubHandler};

/// Native executor type.
pub struct ParachainNativeExecutor;

Expand All @@ -55,13 +58,14 @@ impl sc_executor::NativeExecutionDispatch for ParachainNativeExecutor {
}
}

type ParachainExecutor = NativeElseWasmExecutor<ParachainNativeExecutor>;
pub(crate) type ParachainExecutor = NativeElseWasmExecutor<ParachainNativeExecutor>;

type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
pub(crate) type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;

type ParachainBackend = TFullBackend<Block>;
pub(crate) type ParachainBackend = TFullBackend<Block>;

type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
pub(crate) type ParachainBlockImport =
TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;

/// Starts a `ServiceBuilder` for a full service.
///
Expand Down Expand Up @@ -173,12 +177,33 @@ async fn start_node_impl(

let params = new_partial(&parachain_config)?;
let (block_import, mut telemetry, telemetry_worker_handle) = params.other;
let net_config = sc_network::config::FullNetworkConfiguration::new(&parachain_config.network);
let mut net_config =
sc_network::config::FullNetworkConfiguration::new(&parachain_config.network);

let client = params.client.clone();
let backend = params.backend.clone();
let mut task_manager = params.task_manager;

let genesis_hash = client
.block_hash(0u32.into())
.ok()
.flatten()
.expect("Genesis block exists; qed");

let task_spawner = TaskSpawner::new(task_manager.spawn_handle(), "generic");

let file_transfer_service = spawn_file_transfer_service(
&task_spawner,
genesis_hash,
&parachain_config,
&mut net_config,
)
.await;

let sh_handler = StorageHubHandler::new(task_spawner, file_transfer_service);

sh_handler.start_bsp_tasks();

let (relay_chain_interface, collator_key) = build_relay_chain_interface(
polkadot_config,
&parachain_config,
Expand Down
27 changes: 27 additions & 0 deletions node/src/services/file_transfer/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use storage_hub_infra::event_bus::{EventBus, EventBusMessage, ProvidesEventBus};

#[derive(Clone, Debug, Default)]
pub struct FileTransferServiceEventBusProvider {
remote_upload_request_event_bus: EventBus<RemoteUploadRequest>,
}

impl FileTransferServiceEventBusProvider {
pub fn new() -> Self {
Self {
remote_upload_request_event_bus: EventBus::new(),
}
}
}

impl ProvidesEventBus<RemoteUploadRequest> for FileTransferServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<RemoteUploadRequest> {
&self.remote_upload_request_event_bus
}
}

#[derive(Debug, Clone)]
pub struct RemoteUploadRequest {
pub location: String,
}

impl EventBusMessage for RemoteUploadRequest {}
Loading

0 comments on commit a5b227a

Please sign in to comment.