From c3f59a1df4ff7c1fbf027bcbeb1ee0df995aea63 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 19 Aug 2024 12:12:28 +1000 Subject: [PATCH] Fix range sync never evaluating request as finished, causing it to get stuck. --- .../src/sync/block_sidecar_coupling.rs | 71 +++++++++++++++++-- beacon_node/network/src/sync/manager.rs | 1 + .../network/src/sync/network_context.rs | 64 +++++++++-------- 3 files changed, 102 insertions(+), 34 deletions(-) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 0666a267173..966ce55fabe 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -26,6 +26,9 @@ pub struct RangeBlockComponentsRequest { /// Used to determine if this accumulator should wait for a sidecars stream termination expects_blobs: bool, expects_custody_columns: Option>, + /// Used to determine the number of data column stream terminations this accumulator should + /// wait for. This may be less than the number of `expects_custody_columns` due to request batching. + num_custody_column_requests: Option, /// The peers the request was made to. 
pub(crate) peer_ids: Vec, } @@ -34,6 +37,7 @@ impl RangeBlockComponentsRequest { pub fn new( expects_blobs: bool, expects_custody_columns: Option>, + num_custody_column_requests: Option, peer_ids: Vec, ) -> Self { Self { @@ -45,6 +49,7 @@ impl RangeBlockComponentsRequest { custody_columns_streams_terminated: 0, expects_blobs, expects_custody_columns, + num_custody_column_requests, peer_ids, } } @@ -219,8 +224,8 @@ impl RangeBlockComponentsRequest { if self.expects_blobs && !self.is_sidecars_stream_terminated { return false; } - if let Some(expects_custody_columns) = &self.expects_custody_columns { - if self.custody_columns_streams_terminated < expects_custody_columns.len() { + if let Some(expects_custody_column_responses) = self.num_custody_column_requests { + if self.custody_columns_streams_terminated < expects_custody_column_responses { return false; } } @@ -241,7 +246,7 @@ mod tests { #[test] fn no_blobs_into_responses() { let peer_id = PeerId::random(); - let mut info = RangeBlockComponentsRequest::::new(false, None, vec![peer_id]); + let mut info = RangeBlockComponentsRequest::::new(false, None, None, vec![peer_id]); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng).0) @@ -261,7 +266,7 @@ mod tests { #[test] fn empty_blobs_into_responses() { let peer_id = PeerId::random(); - let mut info = RangeBlockComponentsRequest::::new(true, None, vec![peer_id]); + let mut info = RangeBlockComponentsRequest::::new(true, None, None, vec![peer_id]); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -292,6 +297,7 @@ mod tests { let mut info = RangeBlockComponentsRequest::::new( false, Some(expects_custody_columns.clone()), + Some(expects_custody_columns.len()), vec![PeerId::random()], ); let mut rng = XorShiftRng::from_seed([42; 16]); @@ -343,4 +349,61 @@ mod tests { // All completed construct response info.into_responses(&spec).unwrap(); } + + 
#[test] + fn rpc_block_with_custody_columns_batched() { + let spec = test_spec::(); + let expects_custody_columns = vec![1, 2, 3, 4]; + let num_of_data_column_requests = 2; + let mut info = RangeBlockComponentsRequest::::new( + false, + Some(expects_custody_columns.clone()), + Some(num_of_data_column_requests), + vec![PeerId::random()], + ); + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..4) + .map(|_| { + generate_rand_block_and_data_columns::( + ForkName::Deneb, + NumBlobs::Number(1), + &mut rng, + &spec, + ) + }) + .collect::>(); + + // Send blocks and complete terminate response + for block in &blocks { + info.add_block_response(Some(block.0.clone().into())); + } + info.add_block_response(None); + // Assert response is not finished + assert!(!info.is_finished()); + + // Send data columns interleaved + for block in &blocks { + for column in &block.1 { + if expects_custody_columns.contains(&column.index) { + info.add_data_column(Some(column.clone())); + } + } + } + + // Terminate the requests + for i in 0..num_of_data_column_requests { + info.add_data_column(None); + if i < num_of_data_column_requests - 1 { + assert!( + !info.is_finished(), + "request should not be finished at loop {i}" + ); + } else { + assert!(info.is_finished(), "request should be finished at loop {i}"); + } + } + + // All completed construct response + info.into_responses(&spec).unwrap(); + } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 60e5699e1a0..d078ec52a9e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1161,6 +1161,7 @@ impl SyncManager { RangeBlockComponentsRequest::new( resp.expects_blobs, resp.expects_custody_columns, + None, vec![], ), ); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index f35ee145b19..8a75f7be448 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ 
b/beacon_node/network/src/sync/network_context.rs @@ -380,41 +380,45 @@ impl SyncNetworkContext { false }; - let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) - { - let custody_indexes = self.network_globals().custody_columns(epoch); - - for (peer_id, columns_by_range_request) in - self.make_columns_by_range_requests(epoch, request, &custody_indexes)? - { - requested_peers.push(peer_id); - - debug!( - self.log, - "Sending DataColumnsByRange requests"; - "method" => "DataColumnsByRange", - "count" => columns_by_range_request.count, - "epoch" => epoch, - "columns" => ?columns_by_range_request.columns, - "peer" => %peer_id, - ); - - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::DataColumnsByRange(columns_by_range_request), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; - } + let (expects_custody_columns, num_of_custody_column_req) = + if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { + let custody_indexes = self.network_globals().custody_columns(epoch); + let mut num_of_custody_column_req = 0; + + for (peer_id, columns_by_range_request) in + self.make_columns_by_range_requests(epoch, request, &custody_indexes)? 
+ { + requested_peers.push(peer_id); + + debug!( + self.log, + "Sending DataColumnsByRange requests"; + "method" => "DataColumnsByRange", + "count" => columns_by_range_request.count, + "epoch" => epoch, + "columns" => ?columns_by_range_request.columns, + "peer" => %peer_id, + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: Request::DataColumnsByRange(columns_by_range_request), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; + + num_of_custody_column_req += 1; + } - Some(custody_indexes) - } else { - None - }; + (Some(custody_indexes), Some(num_of_custody_column_req)) + } else { + (None, None) + }; let info = RangeBlockComponentsRequest::new( expected_blobs, expects_custody_columns, + num_of_custody_column_req, requested_peers, ); self.range_block_components_requests