diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 255ce5ca7a5..07ecdd281ac 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -392,13 +392,14 @@ impl PeerActor { self.send_message_with_encoding(msg, Encoding::Borsh); } + #[tracing::instrument( + level = "trace", + target = "network", + "send_message_with_encoding", + skip_all, + fields(msg_type = msg.msg_variant()) + )] fn send_message_with_encoding(&self, msg: &PeerMessage, enc: Encoding) { - let msg_type: &str = msg.msg_variant(); - let _span = tracing::trace_span!( - target: "network", - "send_message_with_encoding", - msg_type) - .entered(); // Skip sending block and headers if we received it or header from this peer. // Record block requests in tracker. match msg { @@ -426,6 +427,7 @@ impl PeerActor { tracing::trace!(target: "network", msg_len = bytes_len); self.framed.send(stream::Frame(bytes)); metrics::PEER_DATA_SENT_BYTES.inc_by(bytes_len as u64); + let msg_type = msg.msg_variant(); metrics::PEER_MESSAGE_SENT_BY_TYPE_TOTAL.with_label_values(&[msg_type]).inc(); metrics::PEER_MESSAGE_SENT_BY_TYPE_BYTES .with_label_values(&[msg_type]) @@ -930,6 +932,13 @@ impl PeerActor { } } + #[tracing::instrument( + level = "trace", + target = "network", + "receive_routed_message", + skip_all, + fields(body_type = <&'static str>::from(&body)), + )] async fn receive_routed_message( clock: &time::Clock, network_state: &NetworkState, @@ -937,101 +946,85 @@ impl PeerActor { msg_hash: CryptoHash, body: RoutedMessageBody, ) -> Result, ReasonForBan> { - let body_type: &'static str = (&body).into(); - let result = async { - Ok(match body { - RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state + Ok(match body { + RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state + .client + .send_async(TxStatusRequest { tx_hash, signer_account_id: account_id }) + .await + .ok() + .flatten() + .map(|response| RoutedMessageBody::TxStatusResponse(*response)), + RoutedMessageBody::TxStatusResponse(tx_result) => { + network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok(); + None + } + RoutedMessageBody::StateResponse(info) => { + network_state .client - .send_async(TxStatusRequest { tx_hash, signer_account_id: account_id }) + .send_async(StateResponse(StateResponseInfo::V1(info).into())) .await - .ok() - .flatten() - .map(|response| RoutedMessageBody::TxStatusResponse(*response)), - RoutedMessageBody::TxStatusResponse(tx_result) => { - network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok(); - None - } - RoutedMessageBody::StateResponse(info) => { - network_state - .client - .send_async(StateResponse(StateResponseInfo::V1(info).into())) - .await - .ok(); - None - } - RoutedMessageBody::BlockApproval(approval) => { - network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok(); - None - } - RoutedMessageBody::ForwardTx(transaction) => { - network_state - .client - .send_async(ProcessTxRequest { - transaction, - is_forwarded: true, - check_only: false, - }) - .await - .ok(); - None - } - RoutedMessageBody::PartialEncodedChunkRequest(request) => { - network_state.shards_manager_adapter.send( - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest { - partial_encoded_chunk_request: request, - route_back: msg_hash, - }, - ); - None - } - RoutedMessageBody::PartialEncodedChunkResponse(response) => { - network_state.shards_manager_adapter.send( - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse { - partial_encoded_chunk_response: response, - received_time: clock.now().into(), - }, - ); - None - } - RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => { - network_state - .shards_manager_adapter - .send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk)); - None - } - RoutedMessageBody::PartialEncodedChunkForward(msg) => { - network_state.shards_manager_adapter.send( - ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg), - ); - None - } - RoutedMessageBody::ChunkStateWitness(witness) => { - network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok(); - None - } - RoutedMessageBody::ChunkEndorsement(endorsement) => { - network_state - .client - .send_async(ChunkEndorsementMessage(endorsement)) - .await - .ok(); - None - } - body => { - tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body); - None - } - }) - }; - // DO NOT turn this into a scoped .entered() span, because that is incompatible - // with async fns. - result - .instrument(tracing::trace_span!( - target: "network", - "receive_routed_message", - "type" = body_type, - )) - .await + .ok(); + None + } + RoutedMessageBody::BlockApproval(approval) => { + network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok(); + None + } + RoutedMessageBody::ForwardTx(transaction) => { + network_state + .client + .send_async(ProcessTxRequest { + transaction, + is_forwarded: true, + check_only: false, + }) + .await + .ok(); + None + } + RoutedMessageBody::PartialEncodedChunkRequest(request) => { + network_state.shards_manager_adapter.send( + ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest { + partial_encoded_chunk_request: request, + route_back: msg_hash, + }, + ); + None + } + RoutedMessageBody::PartialEncodedChunkResponse(response) => { + network_state.shards_manager_adapter.send( + ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse { + partial_encoded_chunk_response: response, + received_time: clock.now().into(), + }, + ); + None + } + RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => { + network_state + .shards_manager_adapter + .send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk)); + None + } + RoutedMessageBody::PartialEncodedChunkForward(msg) => { + network_state + .shards_manager_adapter + .send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg)); + None + } + RoutedMessageBody::ChunkStateWitness(witness) => { + network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok(); + None + } + RoutedMessageBody::ChunkEndorsement(endorsement) => { + network_state.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok(); + None + } + body => { + tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body); + None + } + }) } fn receive_message( @@ -1177,19 +1170,19 @@ impl PeerActor { )); } + #[tracing::instrument( + level = "trace", + target = "network", + "handle_msg_ready", + skip_all, + fields(msg_type = <&'static str>::from(&peer_msg)), + )] fn handle_msg_ready( &mut self, ctx: &mut actix::Context, conn: Arc, peer_msg: PeerMessage, ) { - let _span = tracing::trace_span!( - target: "network", - "handle_msg_ready", - "type" = <&PeerMessage as Into<&'static str>>::into(&peer_msg) - ) - .entered(); - #[cfg(test)] let message_processed_event = { let sink = self.network_state.config.event_sink.clone(); @@ -1486,79 +1479,70 @@ impl PeerActor { } } + #[tracing::instrument( + level = "trace", + target = "network", + "handle_sync_routing_table", + skip_all + )] async fn handle_sync_routing_table( clock: &time::Clock, network_state: &Arc, conn: Arc, rtu: RoutingTableUpdate, ) { - let result = async { - if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await { - conn.stop(Some(ban_reason)); - } - - // Also pass the edges to the V2 routing table - if let Err(ban_reason) = network_state - .update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges)) - .await - { - conn.stop(Some(ban_reason)); - } - - // For every announce we received, we fetch the last announce with the same account_id - // that we already broadcasted. Client actor will both verify signatures of the received announces - // as well as filter out those which are older than the fetched ones (to avoid overriding - // a newer announce with an older one). - let old = network_state - .account_announcements - .get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id)); - let accounts: Vec<(AnnounceAccount, Option)> = rtu - .accounts - .into_iter() - .map(|aa| { - let id = aa.account_id.clone(); - (aa, old.get(&id).map(|old| old.epoch_id.clone())) - }) - .collect(); - match network_state.client.send_async(AnnounceAccountRequest(accounts)).await { - Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)), - Ok(Ok(accounts)) => network_state.add_accounts(accounts).await, - Err(_) => {} - } - }; - // DO NOT turn this into a scoped .entered() span, because that is incompatible + if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await { + conn.stop(Some(ban_reason)); + } - result - .instrument(tracing::trace_span!(target: "network", "handle_sync_routing_table")) + // Also pass the edges to the V2 routing table + if let Err(ban_reason) = network_state + .update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges)) .await + { + conn.stop(Some(ban_reason)); + } + + // For every announce we received, we fetch the last announce with the same account_id + // that we already broadcasted. Client actor will both verify signatures of the received announces + // as well as filter out those which are older than the fetched ones (to avoid overriding + // a newer announce with an older one). + let old = network_state + .account_announcements + .get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id)); + let accounts: Vec<(AnnounceAccount, Option)> = rtu + .accounts + .into_iter() + .map(|aa| { + let id = aa.account_id.clone(); + (aa, old.get(&id).map(|old| old.epoch_id.clone())) + }) + .collect(); + match network_state.client.send_async(AnnounceAccountRequest(accounts)).await { + Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)), + Ok(Ok(accounts)) => network_state.add_accounts(accounts).await, + Err(_) => {} + } } + #[tracing::instrument(level = "trace", target = "network", "handle_distance_vector", skip_all)] async fn handle_distance_vector( clock: &time::Clock, network_state: &Arc, conn: Arc, distance_vector: DistanceVector, ) { - let result = async { - if conn.peer_info.id != distance_vector.root { - conn.stop(Some(ReasonForBan::InvalidDistanceVector)); - return; - } - - if let Err(ban_reason) = network_state - .update_routes( - &clock, - NetworkTopologyChange::PeerAdvertisedDistances(distance_vector), - ) - .await - { - conn.stop(Some(ban_reason)); - } - }; + if conn.peer_info.id != distance_vector.root { + conn.stop(Some(ReasonForBan::InvalidDistanceVector)); + return; + } - // DO NOT turn this into a scoped .entered() span, because that is incompatible - // with async fns. - result.instrument(tracing::trace_span!(target: "network", "handle_distance_vector")).await + if let Err(ban_reason) = network_state + .update_routes(&clock, NetworkTopologyChange::PeerAdvertisedDistances(distance_vector)) + .await + { + conn.stop(Some(ban_reason)); + } } }