Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Prepare for network protocol version upgrades #5084

Merged
merged 26 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2bd9cb1
explicitly tag network requests with version
rphmeier Mar 11, 2022
2f4464d
fmt
rphmeier Mar 11, 2022
b1ed697
make PeerSet more aware of versioning
rphmeier Mar 11, 2022
2f61e49
some generalization of the network bridge to support upgrades
rphmeier Mar 11, 2022
0593b0c
walk back some renaming
rphmeier Mar 11, 2022
5ffc622
walk back some version stuff
rphmeier Mar 11, 2022
2e4f7b0
extract version from fallback
rphmeier Mar 11, 2022
f63da6a
remove V1 from NetworkBridgeUpdate
rphmeier Mar 11, 2022
98ee4da
Merge branch 'master' into rh-network-protocol-upgrades
rphmeier Apr 14, 2022
ebb52bb
add accidentally-removed timer
rphmeier Apr 14, 2022
812da02
implement focusing for versioned messages
rphmeier Apr 14, 2022
5f3de34
fmt
rphmeier Apr 14, 2022
d84984e
fix up network bridge & tests
rphmeier Apr 15, 2022
5cbd94b
remove inaccurate version check in bridge
rphmeier Apr 15, 2022
da8f234
remove some TODO [now]s
rphmeier Apr 15, 2022
91067d6
fix fallout in statement distribution
rphmeier Apr 15, 2022
708a6d6
fmt
rphmeier Apr 15, 2022
d46a3a8
fallout in gossip-support
rphmeier Apr 15, 2022
95d4463
fix fallout in collator-protocol
rphmeier Apr 15, 2022
5c811ec
fix fallout in bitfield-distribution
rphmeier Apr 15, 2022
5fc3b9c
fix fallout in approval-distribution
rphmeier Apr 15, 2022
3e08c99
Merge branch 'master' into rh-network-protocol-upgrades
rphmeier Apr 20, 2022
f30ead9
fmt
rphmeier Apr 20, 2022
382e45e
Merge branch 'master' into rh-network-protocol-upgrades
rphmeier Apr 21, 2022
ac07199
use never!
rphmeier Apr 21, 2022
3bc077e
fmt
rphmeier Apr 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions bridges/bin/rialto/node/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ where
/// Underlying authority discovery service.
pub authority_discovery_service: AuthorityDiscoveryService,
/// POV request receiver
pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingRequest>,
pub chunk_req_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingRequest>,
pub collation_req_receiver: IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
pub pov_req_receiver: IncomingRequestReceiver<request_v1::PoVFetchingV1Request>,
rphmeier marked this conversation as resolved.
Show resolved Hide resolved
pub chunk_req_receiver: IncomingRequestReceiver<request_v1::ChunkFetchingV1Request>,
pub collation_req_receiver: IncomingRequestReceiver<request_v1::CollationFetchingV1Request>,
pub available_data_req_receiver:
IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
pub statement_req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingRequest>,
IncomingRequestReceiver<request_v1::AvailableDataFetchingV1Request>,
pub statement_req_receiver: IncomingRequestReceiver<request_v1::StatementFetchingV1Request>,
pub dispute_req_receiver: IncomingRequestReceiver<request_v1::DisputeRequest>,
/// Prometheus registry, commonly used for production systems, less so for test.
pub registry: Option<&'a Registry>,
Expand Down
4 changes: 2 additions & 2 deletions node/network/availability-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ pub struct AvailabilityDistributionSubsystem {
/// Receivers to be passed into availability distribution.
pub struct IncomingRequestReceivers {
/// Receiver for incoming PoV requests.
pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingV1Request>,
/// Receiver for incoming availability chunk requests.
pub chunk_req_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
pub chunk_req_receiver: IncomingRequestReceiver<v1::ChunkFetchingV1Request>,
}

impl<Context> overseer::Subsystem<Context, SubsystemError> for AvailabilityDistributionSubsystem
Expand Down
18 changes: 9 additions & 9 deletions node/network/availability-distribution/src/pov_requester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::{channel::oneshot, future::BoxFuture, FutureExt};

use polkadot_node_network_protocol::request_response::{
outgoing::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse},
v1::{PoVFetchingV1Request, PoVFetchingV1Response},
OutgoingRequest, Recipient,
};
use polkadot_node_primitives::PoV;
Expand Down Expand Up @@ -60,9 +60,9 @@ where
.clone();
let (req, pending_response) = OutgoingRequest::new(
Recipient::Authority(authority_id.clone()),
PoVFetchingRequest { candidate_hash },
PoVFetchingV1Request { candidate_hash },
);
let full_req = Requests::PoVFetching(req);
let full_req = Requests::PoVFetchingV1(req);

ctx.send_message(NetworkBridgeMessage::SendRequests(
vec![full_req],
Expand All @@ -85,7 +85,7 @@ where
async fn fetch_pov_job(
pov_hash: Hash,
authority_id: AuthorityDiscoveryId,
pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
pending_response: BoxFuture<'static, std::result::Result<PoVFetchingV1Response, RequestError>>,
span: jaeger::Span,
tx: oneshot::Sender<PoV>,
metrics: Metrics,
Expand All @@ -98,15 +98,15 @@ async fn fetch_pov_job(
/// Do the actual work of waiting for the response.
async fn do_fetch_pov(
pov_hash: Hash,
pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
pending_response: BoxFuture<'static, std::result::Result<PoVFetchingV1Response, RequestError>>,
_span: jaeger::Span,
tx: oneshot::Sender<PoV>,
metrics: Metrics,
) -> Result<()> {
let response = pending_response.await.map_err(Error::FetchPoV);
let pov = match response {
Ok(PoVFetchingResponse::PoV(pov)) => pov,
Ok(PoVFetchingResponse::NoSuchPoV) => {
Ok(PoVFetchingV1Response::PoV(pov)) => pov,
Ok(PoVFetchingV1Response::NoSuchPoV) => {
metrics.on_fetched_pov(NOT_FOUND);
return Err(Error::NoSuchPoV)
},
Expand Down Expand Up @@ -200,10 +200,10 @@ mod tests {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(mut reqs, _)) => {
let req = assert_matches!(
reqs.pop(),
Some(Requests::PoVFetching(outgoing)) => {outgoing}
Some(Requests::PoVFetchingV1(outgoing)) => {outgoing}
);
req.pending_response
.send(Ok(PoVFetchingResponse::PoV(pov.clone()).encode()))
.send(Ok(PoVFetchingV1Response::PoV(pov.clone()).encode()))
.unwrap();
break
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use futures::{
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
v1::{ChunkFetchingRequest, ChunkFetchingResponse},
v1::{ChunkFetchingV1Request, ChunkFetchingV1Response},
};
use polkadot_node_primitives::ErasureChunk;
use polkadot_primitives::v2::{
Expand Down Expand Up @@ -112,7 +112,7 @@ struct RunningTask {
group: Vec<AuthorityDiscoveryId>,

/// The request to send.
request: ChunkFetchingRequest,
request: ChunkFetchingV1Request,

/// Root hash, for verifying the chunks validity.
erasure_root: Hash,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl FetchTaskConfig {
group: session_info.validator_groups.get(core.group_responsible.0 as usize)
.expect("The responsible group of a candidate should be available in the corresponding session. qed.")
.clone(),
request: ChunkFetchingRequest {
request: ChunkFetchingV1Request {
candidate_hash: core.candidate_hash,
index: session_info.our_index,
},
Expand Down Expand Up @@ -284,8 +284,8 @@ impl RunningTask {
},
};
let chunk = match resp {
ChunkFetchingResponse::Chunk(resp) => resp.recombine_into_chunk(&self.request),
ChunkFetchingResponse::NoSuchChunk => {
ChunkFetchingV1Response::Chunk(resp) => resp.recombine_into_chunk(&self.request),
ChunkFetchingV1Response::NoSuchChunk => {
tracing::debug!(
target: LOG_TARGET,
validator = ?validator,
Expand Down Expand Up @@ -322,10 +322,10 @@ impl RunningTask {
async fn do_request(
&mut self,
validator: &AuthorityDiscoveryId,
) -> std::result::Result<ChunkFetchingResponse, TaskError> {
) -> std::result::Result<ChunkFetchingV1Response, TaskError> {
let (full_request, response_recv) =
OutgoingRequest::new(Recipient::Authority(validator.clone()), self.request);
let requests = Requests::ChunkFetching(full_request);
let requests = Requests::ChunkFetchingV1(full_request);

self.sender
.send(FromFetchTask::Message(AllMessages::NetworkBridge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ fn task_does_not_accept_invalid_chunk() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
ChunkFetchingResponse::Chunk(v1::ChunkResponse {
ChunkFetchingV1Response::Chunk(v1::ChunkResponse {
chunk: vec![1, 2, 3],
proof: Proof::try_from(vec![vec![9, 8, 2], vec![2, 3, 4]]).unwrap(),
}),
Expand Down Expand Up @@ -86,7 +86,7 @@ fn task_stores_valid_chunk() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
ChunkFetchingResponse::Chunk(v1::ChunkResponse {
ChunkFetchingV1Response::Chunk(v1::ChunkResponse {
chunk: chunk.chunk.clone(),
proof: chunk.proof,
}),
Expand Down Expand Up @@ -118,7 +118,7 @@ fn task_does_not_accept_wrongly_indexed_chunk() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
ChunkFetchingResponse::Chunk(v1::ChunkResponse {
ChunkFetchingV1Response::Chunk(v1::ChunkResponse {
chunk: chunk.chunk.clone(),
proof: chunk.proof,
}),
Expand Down Expand Up @@ -157,18 +157,18 @@ fn task_stores_valid_chunk_if_there_is_one() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
ChunkFetchingResponse::Chunk(v1::ChunkResponse {
ChunkFetchingV1Response::Chunk(v1::ChunkResponse {
chunk: chunk.chunk.clone(),
proof: chunk.proof,
}),
);
m.insert(
Recipient::Authority(Sr25519Keyring::Bob.public().into()),
ChunkFetchingResponse::NoSuchChunk,
ChunkFetchingV1Response::NoSuchChunk,
);
m.insert(
Recipient::Authority(Sr25519Keyring::Charlie.public().into()),
ChunkFetchingResponse::Chunk(v1::ChunkResponse {
ChunkFetchingV1Response::Chunk(v1::ChunkResponse {
chunk: vec![1, 2, 3],
proof: Proof::try_from(vec![vec![9, 8, 2], vec![2, 3, 4]]).unwrap(),
}),
Expand All @@ -188,7 +188,7 @@ fn task_stores_valid_chunk_if_there_is_one() {
struct TestRun {
/// Response to deliver for a given validator index.
/// None means, answer with `NetworkError`.
chunk_responses: HashMap<Recipient, ChunkFetchingResponse>,
chunk_responses: HashMap<Recipient, ChunkFetchingV1Response>,
/// Set of chunks that should be considered valid:
valid_chunks: HashSet<Vec<u8>>,
}
Expand Down Expand Up @@ -235,13 +235,13 @@ impl TestRun {
let mut valid_responses = 0;
for req in reqs {
let req = match req {
Requests::ChunkFetching(req) => req,
Requests::ChunkFetchingV1(req) => req,
_ => panic!("Unexpected request"),
};
let response =
self.chunk_responses.get(&req.peer).ok_or(network::RequestFailure::Refused);

if let Ok(ChunkFetchingResponse::Chunk(resp)) = &response {
if let Ok(ChunkFetchingV1Response::Chunk(resp)) = &response {
if self.valid_chunks.contains(&resp.chunk) {
valid_responses += 1;
}
Expand Down Expand Up @@ -278,7 +278,7 @@ fn get_test_running_task() -> (RunningTask, mpsc::Receiver<FromFetchTask>) {
session_index: 0,
group_index: GroupIndex(0),
group: Vec::new(),
request: ChunkFetchingRequest {
request: ChunkFetchingV1Request {
candidate_hash: CandidateHash([43u8; 32].into()),
index: ValidatorIndex(0),
},
Expand Down
20 changes: 10 additions & 10 deletions node/network/availability-distribution/src/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be
/// Receiver task to be forked as a separate task to handle PoV requests.
pub async fn run_pov_receiver<Sender>(
mut sender: Sender,
mut receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
mut receiver: IncomingRequestReceiver<v1::PoVFetchingV1Request>,
metrics: Metrics,
) where
Sender: SubsystemSender,
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn run_pov_receiver<Sender>(
/// Receiver task to be forked as a separate task to handle chunk requests.
pub async fn run_chunk_receiver<Sender>(
mut sender: Sender,
mut receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
mut receiver: IncomingRequestReceiver<v1::ChunkFetchingV1Request>,
metrics: Metrics,
) where
Sender: SubsystemSender,
Expand Down Expand Up @@ -102,7 +102,7 @@ pub async fn run_chunk_receiver<Sender>(
/// Any errors of `answer_pov_request` will simply be logged.
pub async fn answer_pov_request_log<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::PoVFetchingRequest>,
req: IncomingRequest<v1::PoVFetchingV1Request>,
metrics: &Metrics,
) where
Sender: SubsystemSender,
Expand All @@ -126,7 +126,7 @@ pub async fn answer_pov_request_log<Sender>(
/// Any errors of `answer_request` will simply be logged.
pub async fn answer_chunk_request_log<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::ChunkFetchingRequest>,
req: IncomingRequest<v1::ChunkFetchingV1Request>,
metrics: &Metrics,
) -> ()
where
Expand All @@ -151,7 +151,7 @@ where
/// Returns: `Ok(true)` if chunk was found and served.
pub async fn answer_pov_request<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::PoVFetchingRequest>,
req: IncomingRequest<v1::PoVFetchingV1Request>,
) -> Result<bool>
where
Sender: SubsystemSender,
Expand All @@ -163,10 +163,10 @@ where
let result = av_data.is_some();

let response = match av_data {
None => v1::PoVFetchingResponse::NoSuchPoV,
None => v1::PoVFetchingV1Response::NoSuchPoV,
Some(av_data) => {
let pov = Arc::try_unwrap(av_data.pov).unwrap_or_else(|a| (&*a).clone());
v1::PoVFetchingResponse::PoV(pov)
v1::PoVFetchingV1Response::PoV(pov)
},
};

Expand All @@ -179,7 +179,7 @@ where
/// Returns: `Ok(true)` if chunk was found and served.
pub async fn answer_chunk_request<Sender>(
sender: &mut Sender,
req: IncomingRequest<v1::ChunkFetchingRequest>,
req: IncomingRequest<v1::ChunkFetchingV1Request>,
) -> Result<bool>
where
Sender: SubsystemSender,
Expand All @@ -202,8 +202,8 @@ where
);

let response = match chunk {
None => v1::ChunkFetchingResponse::NoSuchChunk,
Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()),
None => v1::ChunkFetchingV1Response::NoSuchChunk,
Some(chunk) => v1::ChunkFetchingV1Response::Chunk(chunk.into()),
};

req.send_response(response).map_err(|_| JfyiError::SendResponse)?;
Expand Down
4 changes: 2 additions & 2 deletions node/network/availability-distribution/src/tests/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ async fn overseer_recv(rx: &mut mpsc::UnboundedReceiver<AllMessages>) -> AllMess
fn to_incoming_req(
executor: &TaskExecutor,
outgoing: Requests,
) -> IncomingRequest<v1::ChunkFetchingRequest> {
) -> IncomingRequest<v1::ChunkFetchingV1Request> {
match outgoing {
Requests::ChunkFetching(OutgoingRequest { payload, pending_response, .. }) => {
Requests::ChunkFetchingV1(OutgoingRequest { payload, pending_response, .. }) => {
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>) =
oneshot::channel();
executor.spawn(
Expand Down
Loading