Skip to content

Commit

Permalink
Merge branch 'main' into rustielin/indexer-forge-processor-latency
Browse files Browse the repository at this point in the history
  • Loading branch information
rustielin authored Oct 17, 2024
2 parents b288cb7 + 8a540dc commit 06dd8be
Show file tree
Hide file tree
Showing 119 changed files with 3,734 additions and 1,078 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ members = [
"dkg",
"ecosystem/indexer-grpc/indexer-grpc-cache-worker",
"ecosystem/indexer-grpc/indexer-grpc-data-service",
"ecosystem/indexer-grpc/indexer-grpc-file-checker",
"ecosystem/indexer-grpc/indexer-grpc-file-store",
"ecosystem/indexer-grpc/indexer-grpc-file-store-backfiller",
"ecosystem/indexer-grpc/indexer-grpc-fullnode",
Expand Down Expand Up @@ -360,6 +361,7 @@ aptos-indexer = { path = "crates/indexer" }
aptos-indexer-grpc-cache-worker = { path = "ecosystem/indexer-grpc/indexer-grpc-cache-worker" }
aptos-indexer-grpc-data-service = { path = "ecosystem/indexer-grpc/indexer-grpc-data-service" }
aptos-indexer-grpc-file-store = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store" }
aptos-indexer-grpc-file-checker = { path = "ecosystem/indexer-grpc/indexer-grpc-file-checker" }
aptos-indexer-grpc-file-store-backfiller = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store-backfiller" }
aptos-indexer-grpc-fullnode = { path = "ecosystem/indexer-grpc/indexer-grpc-fullnode" }
aptos-indexer-grpc-in-memory-cache-benchmark = { path = "ecosystem/indexer-grpc/indexer-grpc-in-memory-cache-benchmark" }
Expand Down
2 changes: 1 addition & 1 deletion RUST_SECURE_CODING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ These Rust Secure Coding Guidelines are essential for anyone contributing to Apt

### Rustup

Utilize Rustup for managing Rust toolchains. However, keep in mind that, from a security perspective, Rustup performs all downloads over HTTPS, but it does not yet validate signatures of downloads. Security is shifted to [create.io](http://create.io) and GitHub repository hosting the code [[rustup]](https://www.rust-lang.org/tools/install).
Utilize Rustup for managing Rust toolchains. However, keep in mind that, from a security perspective, Rustup performs all downloads over HTTPS, but it does not yet validate signatures of downloads. Security is shifted to [crates.io](http://crates.io) and GitHub repository hosting the code [[rustup]](https://www.rust-lang.org/tools/install).

### Stable Toolchain

Expand Down
5 changes: 4 additions & 1 deletion aptos-move/aptos-gas-schedule/src/ver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
/// global operations.
/// - V1
/// - TBA
pub const LATEST_GAS_FEATURE_VERSION: u64 = gas_feature_versions::RELEASE_V1_21;
pub const LATEST_GAS_FEATURE_VERSION: u64 = gas_feature_versions::RELEASE_V1_22;

pub mod gas_feature_versions {
pub const RELEASE_V1_8: u64 = 11;
Expand All @@ -86,4 +86,7 @@ pub mod gas_feature_versions {
pub const RELEASE_V1_19: u64 = 23;
pub const RELEASE_V1_20: u64 = 24;
pub const RELEASE_V1_21: u64 = 25;
pub const RELEASE_V1_22: u64 = 26;
pub const RELEASE_V1_23: u64 = 27;
pub const RELEASE_V1_24: u64 = 28;
}
2 changes: 1 addition & 1 deletion config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl Default for ConsensusConfig {
enable_pre_commit: true,
max_pending_rounds_in_commit_vote_cache: 100,
optimistic_sig_verification: false,
enable_round_timeout_msg: false,
enable_round_timeout_msg: true,
}
}
}
Expand Down
34 changes: 21 additions & 13 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ use serde::{Deserialize, Serialize};
use serde_yaml::Value;

// Useful constants for enabling consensus observer on different node types
const ENABLE_ON_VALIDATORS: bool = false;
const ENABLE_ON_VALIDATOR_FULLNODES: bool = false;
const ENABLE_ON_VALIDATORS: bool = true;
const ENABLE_ON_VALIDATOR_FULLNODES: bool = true;
const ENABLE_ON_PUBLIC_FULLNODES: bool = false;

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct ConsensusObserverConfig {
/// Whether the consensus observer is enabled
pub observer_enabled: bool,
/// Whether the consensus observer publisher is enabled
/// Whether the consensus publisher is enabled
pub publisher_enabled: bool,

/// Maximum number of pending network messages
Expand All @@ -30,21 +30,27 @@ pub struct ConsensusObserverConfig {

/// Interval (in milliseconds) to garbage collect peer state
pub garbage_collection_interval_ms: u64,
/// The maximum number of concurrent subscriptions
pub max_concurrent_subscriptions: u64,
/// Maximum number of blocks to keep in memory (e.g., pending blocks, ordered blocks, etc.)
pub max_num_pending_blocks: u64,
/// Maximum timeout (in milliseconds) for active subscriptions
pub max_subscription_timeout_ms: u64,
/// Maximum timeout (in milliseconds) we'll wait for the synced version to
/// increase before terminating the active subscription.
pub max_synced_version_timeout_ms: u64,
/// Interval (in milliseconds) to check progress of the consensus observer
pub progress_check_interval_ms: u64,

/// The maximum number of concurrent subscriptions
pub max_concurrent_subscriptions: u64,
/// Maximum timeout (in milliseconds) we'll wait for the synced version to
/// increase before terminating the active subscription.
pub max_subscription_sync_timeout_ms: u64,
/// Maximum message timeout (in milliseconds) for active subscriptions
pub max_subscription_timeout_ms: u64,
/// Interval (in milliseconds) to check for subscription related peer changes
pub subscription_peer_change_interval_ms: u64,
/// Interval (in milliseconds) to refresh the subscription
pub subscription_refresh_interval_ms: u64,

/// Duration (in milliseconds) to require state sync to synchronize when in fallback mode
pub observer_fallback_duration_ms: u64,
/// Duration (in milliseconds) we'll wait for syncing progress before entering fallback mode
pub observer_fallback_sync_threshold_ms: u64,
}

impl Default for ConsensusObserverConfig {
Expand All @@ -56,13 +62,15 @@ impl Default for ConsensusObserverConfig {
max_parallel_serialization_tasks: num_cpus::get(), // Default to the number of CPUs
network_request_timeout_ms: 5_000, // 5 seconds
garbage_collection_interval_ms: 60_000, // 60 seconds
max_concurrent_subscriptions: 2, // 2 streams should be sufficient
max_num_pending_blocks: 100, // 100 blocks
max_subscription_timeout_ms: 30_000, // 30 seconds
max_synced_version_timeout_ms: 60_000, // 60 seconds
progress_check_interval_ms: 5_000, // 5 seconds
max_concurrent_subscriptions: 2, // 2 streams should be sufficient
max_subscription_sync_timeout_ms: 15_000, // 15 seconds
max_subscription_timeout_ms: 15_000, // 15 seconds
subscription_peer_change_interval_ms: 60_000, // 1 minute
subscription_refresh_interval_ms: 300_000, // 5 minutes
observer_fallback_duration_ms: 600_000, // 10 minutes
observer_fallback_sync_threshold_ms: 30_000, // 30 seconds
}
}
}
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ aptos-runtimes = { workspace = true }
aptos-safety-rules = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-secure-storage = { workspace = true }
aptos-short-hex-str = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-temppath = { workspace = true }
aptos-time-service = { workspace = true }
Expand Down
7 changes: 2 additions & 5 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct PipelinedBlock {
/// The state_compute_result is calculated for all the pending blocks prior to insertion to
/// the tree. The execution results are not persisted: they're recalculated again for the
/// pending blocks upon restart.
#[derivative(PartialEq = "ignore")]
state_compute_result: StateComputeResult,
randomness: OnceCell<Randomness>,
pipeline_insertion_time: OnceCell<Instant>,
Expand All @@ -62,14 +63,12 @@ impl Serialize for PipelinedBlock {
struct SerializedBlock<'a> {
block: &'a Block,
input_transactions: &'a Vec<SignedTransaction>,
state_compute_result: &'a StateComputeResult,
randomness: Option<&'a Randomness>,
}

let serialized = SerializedBlock {
block: &self.block,
input_transactions: &self.input_transactions,
state_compute_result: &self.state_compute_result,
randomness: self.randomness.get(),
};
serialized.serialize(serializer)
Expand All @@ -86,21 +85,19 @@ impl<'de> Deserialize<'de> for PipelinedBlock {
struct SerializedBlock {
block: Block,
input_transactions: Vec<SignedTransaction>,
state_compute_result: StateComputeResult,
randomness: Option<Randomness>,
}

let SerializedBlock {
block,
input_transactions,
state_compute_result,
randomness,
} = SerializedBlock::deserialize(deserializer)?;

let block = PipelinedBlock {
block,
input_transactions,
state_compute_result,
state_compute_result: StateComputeResult::new_dummy(),
randomness: OnceCell::new(),
pipeline_insertion_time: OnceCell::new(),
execution_summary: Arc::new(OnceCell::new()),
Expand Down
24 changes: 11 additions & 13 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
use aptos_executor_types::StateComputeResult;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_types::ledger_info::LedgerInfoWithSignatures;
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, proof::accumulator::InMemoryTransactionAccumulator,
};
use futures::executor::block_on;
#[cfg(test)]
use std::collections::VecDeque;
Expand Down Expand Up @@ -175,18 +177,14 @@ impl BlockStore {
root_metadata.accu_hash,
);

let result = StateComputeResult::new(
root_metadata.accu_hash,
root_metadata.frozen_root_hashes,
root_metadata.num_leaves, /* num_leaves */
vec![], /* parent_root_hashes */
0, /* parent_num_leaves */
None, /* epoch_state */
vec![], /* compute_status */
vec![], /* txn_infos */
vec![], /* reconfig_events */
None, // block end info
);
let result = StateComputeResult::new_empty(Arc::new(
InMemoryTransactionAccumulator::new(
root_metadata.frozen_root_hashes,
root_metadata.num_leaves,
)
.expect("Failed to recover accumulator."),
));
assert_eq!(result.root_hash(), root_metadata.accu_hash);

let pipelined_root_block = PipelinedBlock::new(
*root_block,
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use aptos_types::{
block_info::{BlockInfo, Round},
ledger_info::LedgerInfoWithSignatures,
};
use mirai_annotations::{checked_verify_eq, precondition};
use mirai_annotations::precondition;
use std::{
collections::{vec_deque::VecDeque, BTreeMap, HashMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -249,7 +249,6 @@ impl BlockTree {
existing_block,
block_id,
block);
checked_verify_eq!(existing_block.compute_result(), block.compute_result());
Ok(existing_block)
} else {
match self.get_linkable_block_mut(&block.parent_id()) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl BlockStore {
storage.save_tree(blocks.clone(), quorum_certs.clone())?;

execution_client
.sync_to(highest_commit_cert.ledger_info().clone())
.sync_to_target(highest_commit_cert.ledger_info().clone())
.await?;

// we do not need to update block_tree.highest_commit_decision_ledger_info here
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/consensus_observer/common/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub enum Error {
#[error("Network error: {0}")]
NetworkError(String),

#[error("Consensus observer progress stopped: {0}")]
ObserverProgressStopped(String),

#[error("Aptos network rpc error: {0}")]
RpcError(#[from] RpcError),

Expand Down Expand Up @@ -40,6 +43,7 @@ impl Error {
match self {
Self::InvalidMessageError(_) => "invalid_message_error",
Self::NetworkError(_) => "network_error",
Self::ObserverProgressStopped(_) => "observer_progress_stopped",
Self::RpcError(_) => "rpc_error",
Self::SubscriptionDisconnected(_) => "subscription_disconnected",
Self::SubscriptionProgressStopped(_) => "subscription_progress_stopped",
Expand Down
16 changes: 15 additions & 1 deletion consensus/src/consensus_observer/common/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_metrics_core::{
};
use once_cell::sync::Lazy;

// Useful metric labels
// Useful observer metric labels
pub const BLOCK_PAYLOAD_LABEL: &str = "block_payload";
pub const COMMIT_DECISION_LABEL: &str = "commit_decision";
pub const COMMITTED_BLOCKS_LABEL: &str = "committed_blocks";
Expand All @@ -21,6 +21,10 @@ pub const PENDING_BLOCK_ENTRIES_LABEL: &str = "pending_block_entries";
pub const PENDING_BLOCKS_LABEL: &str = "pending_blocks";
pub const STORED_PAYLOADS_LABEL: &str = "stored_payloads";

// Useful state sync metric labels
pub const STATE_SYNCING_FOR_FALLBACK: &str = "sync_for_fallback";
pub const STATE_SYNCING_TO_COMMIT: &str = "sync_to_commit";

/// Counter for tracking created subscriptions for the consensus observer
pub static OBSERVER_CREATED_SUBSCRIPTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down Expand Up @@ -149,6 +153,16 @@ pub static OBSERVER_SENT_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Gauge for tracking when consensus observer has invoked state sync
pub static OBSERVER_STATE_SYNC_EXECUTING: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"consensus_observer_state_sync_executing",
"Gauge for tracking when consensus observer has invoked state sync",
&["syncing_mode"]
)
.unwrap()
});

/// Counter for tracking terminated subscriptions for the consensus observer
pub static OBSERVER_TERMINATED_SUBSCRIPTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down
Loading

0 comments on commit 06dd8be

Please sign in to comment.