Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge branch 'ao-past-session-slashing-runtime' into ao-past-session-…
Browse files Browse the repository at this point in the history
…slashing-client

* ao-past-session-slashing-runtime:
  XCM: Tools for uniquely referencing messages (#7234)
  Companion: Substrate#13869 (#7119)
  Companion for Substrate#14214 (#7283)
  Fix flaky test and error reporting (#7282)
  impl guide: Update Collator Generation (#7250)
  Add staking-miner bin (#7273)
  metrics: tests: Fix flaky runtime_can_publish_metrics (#7279)
  [companion] Fix request-response protocols backpressure mechanism (#7276)
  PVF: instantiate runtime from bytes (#7270)
  • Loading branch information
ordian committed May 25, 2023
2 parents cfe4be4 + f2bcd9b commit 8bb8e07
Show file tree
Hide file tree
Showing 73 changed files with 1,886 additions and 1,037 deletions.
666 changes: 338 additions & 328 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! The collation generation subsystem is the interface between polkadot and the collators.
//!
//! # Protocol
//!
//! On every `ActiveLeavesUpdate`:
//!
//! * If there is no collation generation config, ignore.
//! * Otherwise, for each `activated` head in the update:
//! * Determine if the para is scheduled on any core by fetching the `availability_cores` Runtime API.
//! * Use the Runtime API subsystem to fetch the full validation data.
//! * Invoke the `collator`, and use its outputs to produce a [`CandidateReceipt`], signed with the configuration's `key`.
//! * Dispatch a [`CollatorProtocolMessage::DistributeCollation`](receipt, pov)`.
#![deny(missing_docs)]

Expand Down
101 changes: 50 additions & 51 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,62 +217,61 @@ impl Initialized {
gum::trace!(target: LOG_TARGET, "Waiting for message");
let mut overlay_db = OverlayedBackend::new(backend);
let default_confirm = Box::new(|| Ok(()));
let confirm_write = match MuxedMessage::receive(ctx, &mut self.participation_receiver)
.await?
{
MuxedMessage::Participation(msg) => {
gum::trace!(target: LOG_TARGET, "MuxedMessage::Participation");
let ParticipationStatement {
session,
candidate_hash,
candidate_receipt,
outcome,
} = self.participation.get_participation_result(ctx, msg).await?;
if let Some(valid) = outcome.validity() {
gum::trace!(
target: LOG_TARGET,
?session,
?candidate_hash,
?valid,
"Issuing local statement based on participation outcome."
);
self.issue_local_statement(
ctx,
&mut overlay_db,
let confirm_write =
match MuxedMessage::receive(ctx, &mut self.participation_receiver).await? {
MuxedMessage::Participation(msg) => {
gum::trace!(target: LOG_TARGET, "MuxedMessage::Participation");
let ParticipationStatement {
session,
candidate_hash,
candidate_receipt,
session,
valid,
clock.now(),
)
.await?;
} else {
gum::warn!(target: LOG_TARGET, ?outcome, "Dispute participation failed");
}
default_confirm
},
MuxedMessage::Subsystem(msg) => match msg {
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "OverseerSignal::ActiveLeaves");
self.process_active_leaves_update(
ctx,
&mut overlay_db,
update,
clock.now(),
)
.await?;
outcome,
} = self.participation.get_participation_result(ctx, msg).await?;
if let Some(valid) = outcome.validity() {
gum::trace!(
target: LOG_TARGET,
?session,
?candidate_hash,
?valid,
"Issuing local statement based on participation outcome."
);
self.issue_local_statement(
ctx,
&mut overlay_db,
candidate_hash,
candidate_receipt,
session,
valid,
clock.now(),
)
.await?;
} else {
gum::warn!(target: LOG_TARGET, ?outcome, "Dispute participation failed");
}
default_confirm
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, n)) => {
gum::trace!(target: LOG_TARGET, "OverseerSignal::BlockFinalized");
self.scraper.process_finalized_block(&n);
default_confirm
MuxedMessage::Subsystem(msg) => match msg {
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
gum::trace!(target: LOG_TARGET, "OverseerSignal::ActiveLeaves");
self.process_active_leaves_update(
ctx,
&mut overlay_db,
update,
clock.now(),
)
.await?;
default_confirm
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, n)) => {
gum::trace!(target: LOG_TARGET, "OverseerSignal::BlockFinalized");
self.scraper.process_finalized_block(&n);
default_confirm
},
FromOrchestra::Communication { msg } =>
self.handle_incoming(ctx, &mut overlay_db, msg, clock.now()).await?,
},
FromOrchestra::Communication { msg } =>
self.handle_incoming(ctx, &mut overlay_db, msg, clock.now()).await?,
},
};
};

if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
Expand Down
1 change: 0 additions & 1 deletion node/core/pvf/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ substrate-build-script-utils = { git = "https://github.com/paritytech/substrate"
[dev-dependencies]
adder = { package = "test-parachain-adder", path = "../../../../parachain/test-parachains/adder" }
halt = { package = "test-parachain-halt", path = "../../../../parachain/test-parachains/halt" }
tempfile = "3.3.0"

[features]
jemalloc-allocator = ["dep:tikv-jemalloc-ctl"]
34 changes: 23 additions & 11 deletions node/core/pvf/worker/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use polkadot_node_core_pvf::{
};
use polkadot_parachain::primitives::ValidationResult;
use std::{
path::{Path, PathBuf},
path::PathBuf,
sync::{mpsc::channel, Arc},
time::Duration,
};
Expand Down Expand Up @@ -97,6 +97,20 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
artifact_path.display(),
);

// Get the artifact bytes.
//
// We do this outside the thread so that we can lock down filesystem access there.
let compiled_artifact_blob = match std::fs::read(artifact_path) {
Ok(bytes) => bytes,
Err(err) => {
let response = Response::InternalError(
InternalValidationError::CouldNotOpenFile(err.to_string()),
);
send_response(&mut stream, response).await?;
continue
},
};

// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();

Expand All @@ -116,7 +130,12 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
let execute_thread = thread::spawn_worker_thread_with_stack_size(
"execute thread",
move || {
validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
validate_using_artifact(
&compiled_artifact_blob,
&params,
executor_2,
cpu_time_start,
)
},
Arc::clone(&condvar),
WaitOutcome::Finished,
Expand Down Expand Up @@ -167,23 +186,16 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
}

fn validate_using_artifact(
artifact_path: &Path,
compiled_artifact_blob: &[u8],
params: &[u8],
executor: Executor,
cpu_time_start: ProcessTime,
) -> Response {
// Check here if the file exists, because the error from Substrate is not match-able.
// TODO: Re-evaluate after <https://github.com/paritytech/substrate/issues/13860>.
let file_metadata = std::fs::metadata(artifact_path);
if let Err(err) = file_metadata {
return Response::InternalError(InternalValidationError::CouldNotOpenFile(err.to_string()))
}

let descriptor_bytes = match unsafe {
// SAFETY: this should be safe since the compiled artifact passed here comes from the
// file created by the prepare workers. These files are obtained by calling
// [`executor_intf::prepare`].
executor.execute(artifact_path.as_ref(), params)
executor.execute(compiled_artifact_blob, params)
} {
Err(err) => return Response::format_invalid("execute", &err),
Ok(d) => d,
Expand Down
34 changes: 24 additions & 10 deletions node/core/pvf/worker/src/executor_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
use polkadot_primitives::{ExecutorParam, ExecutorParams};
use sc_executor_common::{
error::WasmError,
runtime_blob::RuntimeBlob,
wasm_runtime::{HeapAllocStrategy, InvokeMethod, WasmModule as _},
};
use sc_executor_wasmtime::{Config, DeterministicStackLimit, Semantics};
use sc_executor_wasmtime::{Config, DeterministicStackLimit, Semantics, WasmtimeRuntime};
use sp_core::storage::{ChildInfo, TrackedStorageKey};
use sp_externalities::MultiRemovalResults;
use std::{
any::{Any, TypeId},
path::Path,
};
use std::any::{Any, TypeId};

// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
Expand Down Expand Up @@ -206,7 +204,7 @@ impl Executor {
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn execute(
&self,
compiled_artifact_path: &Path,
compiled_artifact_blob: &[u8],
params: &[u8],
) -> Result<Vec<u8>, String> {
let mut extensions = sp_externalities::Extensions::new();
Expand All @@ -216,17 +214,33 @@ impl Executor {
let mut ext = ValidationExternalities(extensions);

match sc_executor::with_externalities_safe(&mut ext, || {
let runtime = sc_executor_wasmtime::create_runtime_from_artifact::<HostFunctions>(
compiled_artifact_path,
self.config.clone(),
)?;
let runtime = self.create_runtime_from_bytes(compiled_artifact_blob)?;
runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params)
}) {
Ok(Ok(ok)) => Ok(ok),
Ok(Err(err)) | Err(err) => Err(err),
}
.map_err(|err| format!("execute error: {:?}", err))
}

/// Constructs the runtime for the given PVF, given the artifact bytes.
///
/// # Safety
///
/// The caller must ensure that the compiled artifact passed here was:
/// 1) produced by [`prepare`],
/// 2) was not modified,
///
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn create_runtime_from_bytes(
&self,
compiled_artifact_blob: &[u8],
) -> Result<WasmtimeRuntime, WasmError> {
sc_executor_wasmtime::create_runtime_from_artifact_bytes::<HostFunctions>(
compiled_artifact_blob,
self.config.clone(),
)
}
}

type HostFunctions = (
Expand Down
7 changes: 2 additions & 5 deletions node/core/pvf/worker/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,13 @@ pub fn validate_candidate(
.expect("Decompressing code failed");

let blob = prevalidate(&code)?;
let artifact = prepare(blob, &ExecutorParams::default())?;
let tmpdir = tempfile::tempdir()?;
let artifact_path = tmpdir.path().join("blob");
std::fs::write(&artifact_path, &artifact)?;
let compiled_artifact_blob = prepare(blob, &ExecutorParams::default())?;

let executor = Executor::new(ExecutorParams::default())?;
let result = unsafe {
// SAFETY: This is trivially safe since the artifact is obtained by calling `prepare`
// and is written into a temporary directory in an unmodified state.
executor.execute(&artifact_path, params)?
executor.execute(&compiled_artifact_blob, params)?
};

Ok(result)
Expand Down
4 changes: 2 additions & 2 deletions node/metrics/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ async fn runtime_can_publish_metrics() {
// Start validator Bob.
let _bob = run_validator_node(bob_config, None);

// Wait for Alice to author two blocks.
alice.wait_for_blocks(2).await;
// Wait for Alice to see two finalized blocks.
alice.wait_for_finalized_blocks(2).await;

let metrics_uri = format!("http://localhost:{}/metrics", DEFAULT_PROMETHEUS_PORT);
let metrics = scrape_prometheus_metrics(&metrics_uri).await;
Expand Down
8 changes: 6 additions & 2 deletions node/network/availability-recovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ async fn overseer_signal(
.send(FromOrchestra::Signal(signal))
.timeout(TIMEOUT)
.await
.expect("10ms is more than enough for sending signals.");
.unwrap_or_else(|| {
panic!("{}ms is more than enough for sending signals.", TIMEOUT.as_millis())
});
}

async fn overseer_send(
Expand All @@ -184,7 +186,9 @@ async fn overseer_send(
.send(FromOrchestra::Communication { msg })
.timeout(TIMEOUT)
.await
.expect("10ms is more than enough for sending messages.");
.unwrap_or_else(|| {
panic!("{}ms is more than enough for sending messages.", TIMEOUT.as_millis())
});
}

async fn overseer_recv(
Expand Down
11 changes: 6 additions & 5 deletions node/network/bitfield-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ use sp_keystore::{testing::MemoryKeystore, Keystore, KeystorePtr};

use std::{iter::FromIterator as _, sync::Arc, time::Duration};

const TIMEOUT: Duration = Duration::from_millis(50);
macro_rules! launch {
($fut:expr) => {
$fut.timeout(Duration::from_millis(10))
.await
.expect("10ms is more than enough for sending messages.")
$fut.timeout(TIMEOUT).await.unwrap_or_else(|| {
panic!("{}ms is more than enough for sending messages.", TIMEOUT.as_millis())
});
};
}

Expand Down Expand Up @@ -220,7 +221,7 @@ fn receive_invalid_signature() {
));

// reputation doesn't change due to one_job_per_validator check
assert!(handle.recv().timeout(Duration::from_millis(10)).await.is_none());
assert!(handle.recv().timeout(TIMEOUT).await.is_none());

launch!(handle_network_msg(
&mut ctx,
Expand Down Expand Up @@ -523,7 +524,7 @@ fn do_not_relay_message_twice() {
);

// There shouldn't be any other message
assert!(handle.recv().timeout(Duration::from_millis(10)).await.is_none());
assert!(handle.recv().timeout(TIMEOUT).await.is_none());
});
}

Expand Down
2 changes: 1 addition & 1 deletion node/network/collator-protocol/src/collator_side/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::*;
use std::{collections::HashSet, sync::Arc, time::Duration};

use assert_matches::assert_matches;
use futures::{executor, future, Future, SinkExt};
use futures::{executor, future, Future};
use futures_timer::Delay;

use parity_scale_codec::{Decode, Encode};
Expand Down
1 change: 1 addition & 0 deletions node/network/dispute-distribution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lru = "0.9.0"
indexmap = "1.9.1"

[dev-dependencies]
async-channel = "1.8.0"
async-trait = "0.1.57"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
Loading

0 comments on commit 8bb8e07

Please sign in to comment.