[ShardsManager] Move ShardsManager to its own actor #8329

Merged 2 commits on Jan 31, 2023
2 changes: 2 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions chain/chunks/Cargo.toml
@@ -24,6 +24,7 @@ near-network = { path = "../network" }
 near-o11y = { path = "../../core/o11y" }
 near-chain = { path = "../chain" }
 near-pool = { path = "../pool" }
+near-performance-metrics = { path = "../../utils/near-performance-metrics" }

 [dev-dependencies]
 assert_matches.workspace = true
133 changes: 133 additions & 0 deletions chain/chunks/src/adapter.rs
@@ -0,0 +1,133 @@
use actix::Message;
use near_chain::types::Tip;
use near_network::types::MsgRecipient;
use near_primitives::{
    hash::CryptoHash,
    merkle::MerklePath,
    receipt::Receipt,
    sharding::{EncodedShardChunk, PartialEncodedChunk, ShardChunkHeader},
    types::EpochId,
};

/// The interface of ShardsManager that faces the Client.
/// It is thread safe (messages are posted to a dedicated thread that runs the
/// ShardsManager).
/// See also ShardsManagerAdapterForNetwork, which is the interface given to
/// the networking component.
/// See also ClientAdapterForShardsManager, which is the other direction - the
/// interface of the Client given to the ShardsManager.
pub trait ShardsManagerAdapterForClient: Send + Sync + 'static {
    /// Processes the header seen from a block we received, if we have not already received the
    /// header earlier from the chunk producer (via PartialEncodedChunk).
    /// This can happen if we are not a validator, or if we are a validator but somehow missed
    /// the chunk producer's message.
    fn process_chunk_header_from_block(&self, chunk_header: &ShardChunkHeader);
    /// Lets the ShardsManager know that the chain heads have been updated.
    /// For a discussion of head vs header_head, see #8154.
    fn update_chain_heads(&self, head: Tip, header_head: Tip);
    /// As a chunk producer, distributes the given chunk to the other validators (by sending
    /// PartialEncodedChunk messages to them).
    /// The partial_chunk and encoded_chunk represent the same data, just in different formats.
    ///
    /// TODO(#8422): the arguments contain redundant information.
    fn distribute_encoded_chunk(
        &self,
        partial_chunk: PartialEncodedChunk,
        encoded_chunk: EncodedShardChunk,
        merkle_paths: Vec<MerklePath>,
        outgoing_receipts: Vec<Receipt>,
    );
    /// Requests the given chunks to be fetched from other nodes.
    /// Only the parts and receipt proofs that this node cares about will be fetched; when
    /// the fetching is complete, a response of ClientAdapterForShardsManager::did_complete_chunk
    /// will be sent back to the client.
    fn request_chunks(&self, chunks_to_request: Vec<ShardChunkHeader>, prev_hash: CryptoHash);
    /// Similar to request_chunks, but for orphan chunks. Since the chunk belongs to an orphan
    /// block, the previous block is not known and thus we cannot derive epoch information from
    /// that block. Therefore, an ancestor_hash must be provided which must correspond to a
    /// block that is an ancestor of these chunks' blocks, and which must be in the same epoch.
    fn request_chunks_for_orphan(
        &self,
        chunks_to_request: Vec<ShardChunkHeader>,
        epoch_id: EpochId,
        ancestor_hash: CryptoHash,
    );
    /// In response to processing a block, checks if there are any chunks that should have been
    /// complete but are just waiting on the previous block to become available (e.g. a chunk
    /// requested by request_chunks_for_orphan, which then received all needed parts and receipt
    /// proofs, but cannot be marked as complete because the previous block isn't available),
    /// and completes them if so.
    fn check_incomplete_chunks(&self, prev_block_hash: CryptoHash);
}

#[derive(Message)]
#[rtype(result = "()")]
pub enum ShardsManagerRequestFromClient {
    ProcessChunkHeaderFromBlock(ShardChunkHeader),
    UpdateChainHeads {
        head: Tip,
        header_head: Tip,
    },
    DistributeEncodedChunk {
        partial_chunk: PartialEncodedChunk,
        encoded_chunk: EncodedShardChunk,
        merkle_paths: Vec<MerklePath>,
        outgoing_receipts: Vec<Receipt>,
    },
    RequestChunks {
        chunks_to_request: Vec<ShardChunkHeader>,
        prev_hash: CryptoHash,
    },
    RequestChunksForOrphan {
        chunks_to_request: Vec<ShardChunkHeader>,
        epoch_id: EpochId,
        ancestor_hash: CryptoHash,
    },
    CheckIncompleteChunks(CryptoHash),
}

impl<A: MsgRecipient<ShardsManagerRequestFromClient>> ShardsManagerAdapterForClient for A {
    fn process_chunk_header_from_block(&self, chunk_header: &ShardChunkHeader) {
        self.do_send(ShardsManagerRequestFromClient::ProcessChunkHeaderFromBlock(
            chunk_header.clone(),
        ));
    }
    fn update_chain_heads(&self, head: Tip, header_head: Tip) {
        self.do_send(ShardsManagerRequestFromClient::UpdateChainHeads { head, header_head });
    }
    fn distribute_encoded_chunk(
        &self,
        partial_chunk: PartialEncodedChunk,
        encoded_chunk: EncodedShardChunk,
        merkle_paths: Vec<MerklePath>,
        outgoing_receipts: Vec<Receipt>,
    ) {
        self.do_send(ShardsManagerRequestFromClient::DistributeEncodedChunk {
            partial_chunk,
            encoded_chunk,
            merkle_paths,
            outgoing_receipts,
        });
    }
    fn request_chunks(&self, chunks_to_request: Vec<ShardChunkHeader>, prev_hash: CryptoHash) {
        self.do_send(ShardsManagerRequestFromClient::RequestChunks {
            chunks_to_request,
            prev_hash,
        });
    }
    fn request_chunks_for_orphan(
        &self,
        chunks_to_request: Vec<ShardChunkHeader>,
        epoch_id: EpochId,
        ancestor_hash: CryptoHash,
    ) {
        self.do_send(ShardsManagerRequestFromClient::RequestChunksForOrphan {
            chunks_to_request,
            epoch_id,
            ancestor_hash,
        });
    }
    fn check_incomplete_chunks(&self, prev_block_hash: CryptoHash) {
        self.do_send(ShardsManagerRequestFromClient::CheckIncompleteChunks(prev_block_hash));
    }
}
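The core trick in adapter.rs is the blanket impl: any mailbox that can receive `ShardsManagerRequestFromClient` automatically gets the typed `ShardsManagerAdapterForClient` interface, so the client calls ordinary methods while every call becomes a message posted to the ShardsManager's own thread. The following is a minimal, self-contained sketch of that pattern with hypothetical stand-in types (`u64` heads instead of `Tip`, an mpsc channel instead of an actix mailbox); it is not the real near-network API, just the shape of it.

```rust
use std::sync::mpsc::{channel, Sender};

// Stand-in for near_network's MsgRecipient: anything that can accept a message.
trait MsgRecipient<M> {
    fn do_send(&self, msg: M);
}

// Stand-in for ShardsManagerRequestFromClient.
#[derive(Debug, PartialEq)]
enum Request {
    UpdateChainHeads { head: u64, header_head: u64 },
}

// Stand-in for ShardsManagerAdapterForClient: the typed interface the client sees.
trait AdapterForClient {
    fn update_chain_heads(&self, head: u64, header_head: u64);
}

// Blanket impl, mirroring
// `impl<A: MsgRecipient<ShardsManagerRequestFromClient>> ShardsManagerAdapterForClient for A`:
// each method call is turned into a message and posted to the recipient.
impl<A: MsgRecipient<Request>> AdapterForClient for A {
    fn update_chain_heads(&self, head: u64, header_head: u64) {
        self.do_send(Request::UpdateChainHeads { head, header_head });
    }
}

// A channel sender works as a recipient; the "actor" drains the other end.
impl MsgRecipient<Request> for Sender<Request> {
    fn do_send(&self, msg: Request) {
        self.send(msg).expect("receiver dropped");
    }
}

fn main() {
    let (tx, rx) = channel::<Request>();
    // Client-side call: looks synchronous, but only enqueues a message.
    tx.update_chain_heads(10, 12);
    // Actor-side: the message arrives intact on the dedicated thread's queue.
    assert_eq!(rx.recv().unwrap(), Request::UpdateChainHeads { head: 10, header_head: 12 });
}
```

The payoff of this design is that tests can substitute any `MsgRecipient` (a channel, a recording mock) for the real actor address without changing client code.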
14 changes: 13 additions & 1 deletion chain/chunks/src/client.rs
@@ -11,13 +11,25 @@ use near_primitives::{
     types::{AccountId, ShardId},
 };

-pub trait ClientAdapterForShardsManager {
+pub trait ClientAdapterForShardsManager: Send + Sync + 'static {
+    /// Notifies the client that the ShardsManager has collected a complete chunk.
+    /// Note that this does NOT mean that the chunk is fully constructed. If we are
+    /// not tracking the shard this chunk is in, then being complete only means that
+    /// we have received the parts we own, and the receipt proofs corresponding to
+    /// shards that we do track. On the other hand if we are tracking the shard this
+    /// chunk is in, then being complete does mean having the full chunk, in which
+    /// case the shard_chunk is also provided.
     fn did_complete_chunk(
         &self,
         partial_chunk: PartialEncodedChunk,
         shard_chunk: Option<ShardChunk>,
     );
+    /// Notifies the client that we have collected a full chunk but the chunk cannot
+    /// be properly decoded.
     fn saw_invalid_chunk(&self, chunk: EncodedShardChunk);
+    /// Notifies the client that the chunk header is ready for inclusion into a new
+    /// block, so that if we are a block producer, we may create a block that contains
+    /// this chunk now. The producer of this chunk is also provided.
     fn chunk_header_ready_for_inclusion(
         &self,
         chunk_header: ShardChunkHeader,
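The new doc comment on `did_complete_chunk` encodes a subtle contract: "complete" does not imply "fully constructed", and `shard_chunk` is `Some` only when this node tracks the chunk's shard. A minimal sketch of how a test double on the client side might exercise both cases, using hypothetical stand-in types (`PartialChunk`/`FullChunk` wrappers instead of the real `PartialEncodedChunk`/`ShardChunk`):

```rust
use std::sync::Mutex;

// Hypothetical stand-ins for PartialEncodedChunk and ShardChunk.
#[derive(Debug, Clone, PartialEq)]
struct PartialChunk(u64);
#[derive(Debug, Clone, PartialEq)]
struct FullChunk(u64);

// Stand-in for the client-facing half of ClientAdapterForShardsManager.
trait ClientAdapter: Send + Sync {
    // Mirrors did_complete_chunk: the full chunk is present only when we track the shard.
    fn did_complete_chunk(&self, partial_chunk: PartialChunk, shard_chunk: Option<FullChunk>);
}

// A recording mock: stores every notification for later inspection in tests.
#[derive(Default)]
struct RecordingClient {
    completed: Mutex<Vec<(PartialChunk, Option<FullChunk>)>>,
}

impl ClientAdapter for RecordingClient {
    fn did_complete_chunk(&self, partial_chunk: PartialChunk, shard_chunk: Option<FullChunk>) {
        self.completed.lock().unwrap().push((partial_chunk, shard_chunk));
    }
}

fn main() {
    let client = RecordingClient::default();
    // Not tracking the shard: only our parts and relevant receipt proofs arrive.
    client.did_complete_chunk(PartialChunk(1), None);
    // Tracking the shard: the fully decoded chunk comes along too.
    client.did_complete_chunk(PartialChunk(2), Some(FullChunk(2)));

    let seen = client.completed.lock().unwrap();
    assert_eq!(seen.len(), 2);
    assert_eq!(seen[0].1, None);
    assert_eq!(seen[1].1, Some(FullChunk(2)));
}
```

The `Mutex` around the log matches the `Send + Sync` bound the PR adds to the trait: the ShardsManager calls back into the client from its own thread, so the adapter must be safe to share across threads.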