
Commit b5fc25e

get headers for all shards
saketh-are committed Nov 22, 2024
1 parent 72e34af commit b5fc25e
Showing 4 changed files with 43 additions and 17 deletions.
chain/chunks/src/logic.rs (11 additions, 7 deletions)
@@ -14,6 +14,7 @@ use near_primitives::{
     },
     types::{AccountId, ShardId},
 };
+use std::collections::HashMap;
 use tracing::{debug, debug_span, error};
 
 pub fn need_receipt(
@@ -54,18 +55,21 @@ pub fn get_shards_cares_about_this_or_next_epoch(
     block_header: &BlockHeader,
     shard_tracker: &ShardTracker,
     epoch_manager: &dyn EpochManagerAdapter,
-) -> Vec<ShardId> {
+) -> HashMap<ShardId, bool> {
     epoch_manager
         .shard_ids(&block_header.epoch_id())
         .unwrap()
         .into_iter()
-        .filter(|&shard_id| {
-            cares_about_shard_this_or_next_epoch(
-                account_id,
-                block_header.prev_hash(),
+        .map(|shard_id| {
+            (
                 shard_id,
-                is_me,
-                shard_tracker,
+                cares_about_shard_this_or_next_epoch(
+                    account_id,
+                    block_header.prev_hash(),
+                    shard_id,
+                    is_me,
+                    shard_tracker,
+                ),
             )
         })
         .collect()
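
A minimal, self-contained sketch (not part of the commit) of the new shape returned by get_shards_cares_about_this_or_next_epoch: every shard of the epoch appears in the map, with a bool saying whether the node tracks it, instead of the previous Vec of only the tracked shards. ShardId and the ShardTracker-backed predicate are replaced by stand-ins here:

use std::collections::HashMap;

// Stand-ins for nearcore's ShardId and the ShardTracker-backed predicate.
type ShardId = u64;

fn cares_about(shard_id: ShardId) -> bool {
    // Placeholder decision; the real code consults the ShardTracker.
    shard_id % 2 == 0
}

// Mirrors the new return type: every shard appears, with a bool marking
// whether the node tracks it, rather than filtering untracked shards out.
fn shards_with_tracking_flag(all_shards: &[ShardId]) -> HashMap<ShardId, bool> {
    all_shards.iter().map(|&shard_id| (shard_id, cares_about(shard_id))).collect()
}

fn main() {
    let flags = shards_with_tracking_flag(&[0, 1, 2, 3]);
    assert_eq!(flags[&2], true);  // tracked shard
    assert_eq!(flags[&1], false); // untracked shard is still present in the map
}
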
chain/client/src/client.rs (14 additions, 7 deletions)
@@ -115,6 +115,8 @@ pub struct CatchupState
     pub sync_status: StateSyncStatus,
     /// Manages going back to apply chunks after state has been downloaded.
     pub catchup: BlocksCatchUpState,
+    /// Determines which shards should be synced
+    pub tracking_shards: HashMap<ShardId, bool>,
 }
 
 pub struct Client {
@@ -2552,11 +2554,20 @@ impl Client {
             let sync_hash = self.get_catchup_sync_hash(&mut state_sync_info, &epoch_first_block)?;
             let Some(sync_hash) = sync_hash else { continue };
 
-            let CatchupState { state_sync, sync_status: status, catchup } = self
+            let CatchupState { state_sync, sync_status: status, catchup, tracking_shards } = self
                 .catchup_state_syncs
                 .entry(sync_hash)
                 .or_insert_with(|| {
                     tracing::debug!(target: "client", ?epoch_first_block, ?sync_hash, "inserting new state sync");
+                    let mut tracking_shards = self.epoch_manager
+                        .shard_ids(&block_header.epoch_id())
+                        .unwrap()
+                        .into_iter()
+                        .map(|shard_id| (shard_id, false))
+                        .collect::<HashMap<ShardId, bool>>();
+                    for shard_id in state_sync_info.shards() {
+                        tracking_shards.insert(*shard_id, true);
+                    }
                     CatchupState {
                         state_sync: StateSync::new(
                             self.clock.clone(),
@@ -2580,6 +2591,7 @@
                             computation_tasks: Vec::new(),
                         },
                         catchup: BlocksCatchUpState::new(sync_hash, *epoch_id),
+                        tracking_shards,
                     }
                 });

@@ -2588,12 +2600,7 @@
             // Initialize the new shard sync to contain the shards to split at
             // first. It will get updated with the shard sync download status
             // for other shards later.
-            match state_sync.run(
-                sync_hash,
-                status,
-                highest_height_peers,
-                state_sync_info.shards(),
-            )? {
+            match state_sync.run(sync_hash, status, highest_height_peers, tracking_shards)? {
                 StateSyncResult::InProgress => {}
                 StateSyncResult::Completed => {
                     debug!(target: "catchup", "state sync completed now catch up blocks");
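
A rough sketch (illustrative, not part of the commit) of how the new tracking_shards map is assembled before it is handed to state_sync.run: every shard of the epoch starts at false (header only), and the shards listed in the catchup's state_sync_info are flipped to true (full state sync). The shard ids and inputs below are simplified stand-ins:

use std::collections::HashMap;

type ShardId = u64;

// Simplified inputs: all shards of the epoch, plus the subset that needs a
// full state sync for catchup.
fn build_tracking_shards(
    all_shards: &[ShardId],
    shards_to_sync: &[ShardId],
) -> HashMap<ShardId, bool> {
    // Start with every shard marked false (header only)...
    let mut tracking_shards: HashMap<ShardId, bool> =
        all_shards.iter().map(|&shard_id| (shard_id, false)).collect();
    // ...then mark the shards whose full state must be downloaded.
    for shard_id in shards_to_sync {
        tracking_shards.insert(*shard_id, true);
    }
    tracking_shards
}

fn main() {
    let flags = build_tracking_shards(&[0, 1, 2, 3], &[2]);
    assert_eq!(flags[&2], true);
    assert_eq!(flags[&0], false);
}
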
chain/client/src/sync/state/mod.rs (5 additions, 3 deletions)
@@ -8,6 +8,7 @@ mod util;

 use crate::metrics;
 use crate::sync::external::{create_bucket_readonly, ExternalConnection};
+use crate::sync::state::shard::StateToSync;
 use chain_requests::ChainSenderForStateSync;
 use downloader::StateSyncDownloader;
 use external::StateSyncDownloadSourceExternal;
@@ -211,7 +212,7 @@ impl StateSync {
         sync_hash: CryptoHash,
         sync_status: &mut StateSyncStatus,
         highest_height_peers: &[HighestHeightPeerInfo],
-        tracking_shards: &[ShardId],
+        tracking_shards: &HashMap<ShardId, bool>,
     ) -> Result<StateSyncResult, near_chain::Error> {
         let _span =
             tracing::debug_span!(target: "sync", "run_sync", sync_type = "StateSync").entered();
@@ -222,7 +223,7 @@
         );
 
         let mut all_done = true;
-        for shard_id in tracking_shards {
+        for (shard_id, is_tracking) in tracking_shards {
             let key = (sync_hash, *shard_id);
             let status = match self.shard_syncs.entry(key) {
                 Entry::Occupied(mut entry) => match entry.get_mut().result.try_recv() {
@@ -255,6 +256,7 @@
                     self.store.clone(),
                     *shard_id,
                     sync_hash,
+                    if *is_tracking { StateToSync::Full } else { StateToSync::HeaderOnly },
                     self.downloader.clone(),
                     self.runtime.clone(),
                     self.epoch_manager.clone(),
@@ -287,7 +289,7 @@
         // If a shard completed syncing, we just remove it. We will not be syncing it again the next time around,
         // because we would've marked it as completed in the status for that shard.
         self.shard_syncs.retain(|(existing_sync_hash, existing_shard_id), _v| {
-            tracking_shards.contains(existing_shard_id) && existing_sync_hash == &sync_hash
+            tracking_shards.contains_key(existing_shard_id) && existing_sync_hash == &sync_hash
         });
 
         sync_status.download_tasks = self.downloading_task_tracker.statuses();
chain/client/src/sync/state/shard.rs (13 additions, 0 deletions)
@@ -24,6 +24,12 @@ use std::sync::{Arc, Mutex};
 use tokio::sync::oneshot;
 use tokio_util::sync::CancellationToken;
 
+#[derive(Debug, Clone)]
+pub enum StateToSync {
+    HeaderOnly,
+    Full,
+}
+
 pub(super) struct StateSyncShardHandle {
     pub status: Arc<Mutex<ShardSyncStatus>>,
     pub result: oneshot::Receiver<Result<(), near_chain::Error>>,
@@ -58,6 +64,7 @@ pub(super) async fn run_state_sync_for_shard(
     store: Store,
     shard_id: ShardId,
     sync_hash: CryptoHash,
+    state_to_sync: StateToSync,
     downloader: Arc<StateSyncDownloader>,
     runtime: Arc<dyn RuntimeAdapter>,
     epoch_manager: Arc<dyn EpochManagerAdapter>,
@@ -70,6 +77,12 @@
     tracing::info!("Running state sync for shard {}", shard_id);
     *status.lock().unwrap() = ShardSyncStatus::StateDownloadHeader;
     let header = downloader.ensure_shard_header(shard_id, sync_hash, cancel.clone()).await?;
+    match state_to_sync {
+        StateToSync::HeaderOnly => {
+            return Ok(());
+        }
+        StateToSync::Full => {}
+    };
     let state_root = header.chunk_prev_state_root();
     let num_parts = header.num_state_parts();
     let block_header =
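
The effect of the new StateToSync argument, sketched with the per-shard sync routine reduced to a stub (everything beyond the enum itself is a simplified stand-in, not the commit's code): the header is fetched for every shard, and only Full continues into the state-part download.

#[derive(Debug, Clone)]
pub enum StateToSync {
    HeaderOnly,
    Full,
}

// Stub of the per-shard sync flow: fetch the header unconditionally, then
// stop early unless the full state is wanted.
fn sync_shard(state_to_sync: StateToSync) -> &'static str {
    let _header = "shard header"; // every shard gets its header downloaded
    match state_to_sync {
        StateToSync::HeaderOnly => return "synced header only",
        StateToSync::Full => {}
    }
    // A full sync would continue here: download state parts, finalize, etc.
    "synced full state"
}

fn main() {
    assert_eq!(sync_shard(StateToSync::HeaderOnly), "synced header only");
    assert_eq!(sync_shard(StateToSync::Full), "synced full state");
}
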
