Skip to content

Commit

Permalink
remove global OnceCell ChainIdentifier, and move into AuthorityState.…
Browse files Browse the repository at this point in the history
… When SuiNode is instantiated, have it pass to the SnapshotUploader, so the latter doesnt depend on the global

eagerly set chain_identifier on AuthorityState
  • Loading branch information
wlmyng committed Dec 27, 2024
1 parent aa99382 commit 6828218
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 53 deletions.
25 changes: 8 additions & 17 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use sui_types::layout_resolver::LayoutResolver;
use sui_types::messages_consensus::{AuthorityCapabilitiesV1, AuthorityCapabilitiesV2};
use sui_types::object::bounded_visitor::BoundedVisitor;
use sui_types::transaction_executor::SimulateTransactionResult;
use tap::{TapFallible, TapOptional};
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
Expand All @@ -70,7 +70,6 @@ use mysten_metrics::{monitored_scope, spawn_monitored_task};
use crate::jsonrpc_index::IndexStore;
use crate::jsonrpc_index::{CoinInfo, ObjectIndexChanges};
use mysten_common::debug_fatal;
use once_cell::sync::OnceCell;
use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
use sui_archival::reader::ArchiveReaderBalancer;
use sui_config::genesis::Genesis;
Expand Down Expand Up @@ -216,8 +215,6 @@ pub mod transaction_deferral;
pub(crate) mod authority_store;
pub mod backpressure;

pub static CHAIN_IDENTIFIER: OnceCell<ChainIdentifier> = OnceCell::new();

/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
tx_orders: IntCounter,
Expand Down Expand Up @@ -820,6 +817,9 @@ pub struct AuthorityState {
pub overload_info: AuthorityOverloadInfo,

pub validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,

/// The chain identifier is derived from the digest of the genesis checkpoint.
chain_identifier: ChainIdentifier,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -2854,6 +2854,7 @@ impl AuthorityState {
indirect_objects_threshold: usize,
archive_readers: ArchiveReaderBalancer,
validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
chain_identifier: ChainIdentifier,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

Expand Down Expand Up @@ -2910,6 +2911,7 @@ impl AuthorityState {
config,
overload_info: AuthorityOverloadInfo::default(),
validator_tx_finalizer,
chain_identifier,
});

// Start a task to execute ready certificates.
Expand Down Expand Up @@ -3470,19 +3472,8 @@ impl AuthorityState {
}

/// Chain Identifier is the digest of the genesis checkpoint.
pub fn get_chain_identifier(&self) -> Option<ChainIdentifier> {
if let Some(digest) = CHAIN_IDENTIFIER.get() {
return Some(*digest);
}

let checkpoint = self
.get_checkpoint_by_sequence_number(0)
.tap_err(|e| error!("Failed to get genesis checkpoint: {:?}", e))
.ok()?
.tap_none(|| error!("Genesis checkpoint is missing from DB"))?;
// It's ok if the value is already set due to data races.
let _ = CHAIN_IDENTIFIER.set(ChainIdentifier::from(*checkpoint.digest()));
Some(ChainIdentifier::from(*checkpoint.digest()))
pub fn get_chain_identifier(&self) -> ChainIdentifier {
self.chain_identifier
}

#[instrument(level = "trace", skip_all)]
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ impl<'a> TestAuthorityBuilder<'a> {
config.authority_overload_config = authority_overload_config;
config.authority_store_pruning_config = pruning_config;

let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());

let state = AuthorityState::new(
name,
secret,
Expand All @@ -329,6 +331,7 @@ impl<'a> TestAuthorityBuilder<'a> {
usize::MAX,
ArchiveReaderBalancer::default(),
None,
chain_identifier,
)
.await;

Expand Down
5 changes: 1 addition & 4 deletions crates/sui-core/src/execution_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ pub async fn execution_process(
return;
};

state
.get_chain_identifier()
.map(|chain_id| chain_id.chain())
== Some(Chain::Mainnet)
state.get_chain_identifier().chain() == Chain::Mainnet
};

// Loop whenever there is a signal that a new transactions is ready to process.
Expand Down
8 changes: 2 additions & 6 deletions crates/sui-core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,8 @@ impl RpcStateReader for RestReadStore {
}
}

fn get_chain_identifier(
&self,
) -> sui_types::storage::error::Result<sui_types::digests::ChainIdentifier> {
self.state
.get_chain_identifier()
.ok_or_else(|| StorageError::missing("unable to query chain identifier"))
fn get_chain_identifier(&self) -> Result<sui_types::digests::ChainIdentifier> {
Ok(self.state.get_chain_identifier())
}

fn indexes(&self) -> Option<&dyn RpcIndexes> {
Expand Down
5 changes: 1 addition & 4 deletions crates/sui-json-rpc/src/authority_state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use arc_swap::Guard;
use async_trait::async_trait;
use move_core_types::language_storage::TypeTag;
Expand Down Expand Up @@ -531,9 +530,7 @@ impl StateRead for AuthorityState {
}

fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier> {
Ok(self
.get_chain_identifier()
.ok_or(anyhow!("Chain identifier not found"))?)
Ok(self.get_chain_identifier())
}
}

Expand Down
24 changes: 12 additions & 12 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use sui_core::authority::authority_store_tables::AuthorityPerpetualTablesOptions
use sui_core::authority::backpressure::BackpressureManager;
use sui_core::authority::epoch_start_configuration::EpochFlag;
use sui_core::authority::RandomnessRoundReceiver;
use sui_core::authority::CHAIN_IDENTIFIER;
use sui_core::consensus_adapter::ConsensusClient;
use sui_core::consensus_manager::UpdatableConsensusClient;
use sui_core::epoch::randomness::RandomnessManager;
Expand Down Expand Up @@ -608,8 +607,6 @@ impl SuiNode {
};

let chain_identifier = ChainIdentifier::from(*genesis.checkpoint().digest());
// It's ok if the value is already set due to data races.
let _ = CHAIN_IDENTIFIER.set(chain_identifier);

info!("creating archive reader");
// Create network
Expand Down Expand Up @@ -659,8 +656,12 @@ impl SuiNode {

info!("start snapshot upload");
// Start uploading state snapshot to remote store
let state_snapshot_handle =
Self::start_state_snapshot(&config, &prometheus_registry, checkpoint_store.clone())?;
let state_snapshot_handle = Self::start_state_snapshot(
&config,
&prometheus_registry,
checkpoint_store.clone(),
chain_identifier,
)?;

// Start uploading db checkpoints to remote store
info!("start db checkpoint");
Expand Down Expand Up @@ -709,6 +710,7 @@ impl SuiNode {
config.indirect_objects_threshold,
archive_readers,
validator_tx_finalizer,
chain_identifier,
)
.await;
// ensure genesis txn was executed
Expand Down Expand Up @@ -961,6 +963,7 @@ impl SuiNode {
config: &NodeConfig,
prometheus_registry: &Registry,
checkpoint_store: Arc<CheckpointStore>,
chain_identifier: ChainIdentifier,
) -> Result<Option<tokio::sync::broadcast::Sender<()>>> {
if let Some(remote_store_config) = &config.state_snapshot_write_config.object_store_config {
let snapshot_uploader = StateSnapshotUploader::new(
Expand All @@ -970,6 +973,7 @@ impl SuiNode {
60,
prometheus_registry,
checkpoint_store,
chain_identifier,
)?;
Ok(Some(snapshot_uploader.start()))
} else {
Expand Down Expand Up @@ -1989,8 +1993,8 @@ fn build_kv_store(
)
})?;

let network_str = match state.get_chain_identifier().map(|c| c.chain()) {
Some(Chain::Mainnet) => "/mainnet",
let network_str = match state.get_chain_identifier().chain() {
Chain::Mainnet => "/mainnet",
_ => {
info!("using local db only for kv store");
return Ok(Arc::new(db_store));
Expand Down Expand Up @@ -2078,11 +2082,7 @@ pub async fn build_http_server(
reverse_registry_id,
)
} else {
match CHAIN_IDENTIFIER
.get()
.expect("chain_id should be initialized")
.chain()
{
match state.get_chain_identifier().chain() {
Chain::Mainnet => sui_json_rpc::name_service::NameServiceConfig::mainnet(),
Chain::Testnet => sui_json_rpc::name_service::NameServiceConfig::testnet(),
Chain::Unknown => sui_json_rpc::name_service::NameServiceConfig::default(),
Expand Down
8 changes: 7 additions & 1 deletion crates/sui-snapshot/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use sui_storage::object_store::util::{
run_manifest_update_loop,
};
use sui_storage::FileCompression;
use sui_types::digests::ChainIdentifier;
use sui_types::messages_checkpoint::CheckpointCommitment::ECMHLiveObjectSetDigest;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -66,6 +67,9 @@ pub struct StateSnapshotUploader {
/// Time interval to check for presence of new db checkpoint
interval: Duration,
metrics: Arc<StateSnapshotUploaderMetrics>,
/// The chain identifier is derived from the genesis checkpoint and used to identify the
/// network.
chain_identifier: ChainIdentifier,
}

impl StateSnapshotUploader {
Expand All @@ -76,6 +80,7 @@ impl StateSnapshotUploader {
interval_s: u64,
registry: &Registry,
checkpoint_store: Arc<CheckpointStore>,
chain_identifier: ChainIdentifier,
) -> Result<Arc<Self>> {
let db_checkpoint_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::File),
Expand All @@ -96,6 +101,7 @@ impl StateSnapshotUploader {
snapshot_store: snapshot_store_config.make()?,
interval: Duration::from_secs(interval_s),
metrics: StateSnapshotUploaderMetrics::new(registry),
chain_identifier,
}))
}

Expand Down Expand Up @@ -140,7 +146,7 @@ impl StateSnapshotUploader {
.expect("Expected at least one commitment")
.clone();
state_snapshot_writer
.write(*epoch, db, state_hash_commitment)
.write(*epoch, db, state_hash_commitment, self.chain_identifier)
.await?;
info!("State snapshot creation successful for epoch: {}", *epoch);
// Drop marker in the output directory that upload completed successfully
Expand Down
8 changes: 3 additions & 5 deletions crates/sui-snapshot/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
ManifestV1, FILE_MAX_BYTES, MAGIC_BYTES, MANIFEST_FILE_MAGIC, OBJECT_FILE_MAGIC,
OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC, SEQUENCE_NUM_BYTES,
};
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use byteorder::{BigEndian, ByteOrder};
use fastcrypto::hash::MultisetHash;
use futures::StreamExt;
Expand All @@ -24,13 +24,13 @@ use std::path::PathBuf;
use std::sync::Arc;
use sui_config::object_storage_config::ObjectStoreConfig;
use sui_core::authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject};
use sui_core::authority::CHAIN_IDENTIFIER;
use sui_core::state_accumulator::StateAccumulator;
use sui_protocol_config::{ProtocolConfig, ProtocolVersion};
use sui_storage::blob::{Blob, BlobEncoding, BLOB_ENCODING_BYTES};
use sui_storage::object_store::util::{copy_file, delete_recursively, path_to_filesystem};
use sui_types::accumulator::Accumulator;
use sui_types::base_types::{ObjectID, ObjectRef};
use sui_types::digests::ChainIdentifier;
use sui_types::messages_checkpoint::ECMHLiveObjectSetDigest;
use sui_types::sui_system_state::get_sui_system_state;
use sui_types::sui_system_state::SuiSystemStateTrait;
Expand Down Expand Up @@ -260,13 +260,11 @@ impl StateSnapshotWriterV1 {
epoch: u64,
perpetual_db: Arc<AuthorityPerpetualTables>,
root_state_hash: ECMHLiveObjectSetDigest,
chain_identifier: ChainIdentifier,
) -> Result<()> {
let system_state_object = get_sui_system_state(&perpetual_db)?;

let protocol_version = system_state_object.protocol_version();
let chain_identifier = CHAIN_IDENTIFIER
.get()
.ok_or(anyhow!("No chain identifier found"))?;
let protocol_config = ProtocolConfig::get_for_version(
ProtocolVersion::new(protocol_version),
chain_identifier.chain(),
Expand Down
5 changes: 1 addition & 4 deletions crates/sui-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ struct IpResponse {
pub async fn send_telemetry_event(state: Arc<AuthorityState>, is_validator: bool) {
let git_rev = env!("CARGO_PKG_VERSION").to_string();
let ip_address = get_ip().await;
let chain_identifier = match state.get_chain_identifier() {
Some(chain_identifier) => chain_identifier.to_string(),
None => "Unknown".to_string(),
};
let chain_identifier = state.get_chain_identifier().to_string();
let since_the_epoch = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Now should be later than epoch!");
Expand Down

0 comments on commit 6828218

Please sign in to comment.