Skip to content

Commit

Permalink
refactor: replace explicit spans with #[instrument] in peer_actor (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nagisa authored Mar 14, 2024
1 parent fc8743b commit 984f6ad
Showing 1 changed file with 146 additions and 162 deletions.
308 changes: 146 additions & 162 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,14 @@ impl PeerActor {
self.send_message_with_encoding(msg, Encoding::Borsh);
}

#[tracing::instrument(
level = "trace",
target = "network",
"send_message_with_encoding",
skip_all,
fields(msg_type = msg.msg_variant())
)]
fn send_message_with_encoding(&self, msg: &PeerMessage, enc: Encoding) {
let msg_type: &str = msg.msg_variant();
let _span = tracing::trace_span!(
target: "network",
"send_message_with_encoding",
msg_type)
.entered();
// Skip sending block and headers if we received it or header from this peer.
// Record block requests in tracker.
match msg {
Expand Down Expand Up @@ -426,6 +427,7 @@ impl PeerActor {
tracing::trace!(target: "network", msg_len = bytes_len);
self.framed.send(stream::Frame(bytes));
metrics::PEER_DATA_SENT_BYTES.inc_by(bytes_len as u64);
let msg_type = msg.msg_variant();
metrics::PEER_MESSAGE_SENT_BY_TYPE_TOTAL.with_label_values(&[msg_type]).inc();
metrics::PEER_MESSAGE_SENT_BY_TYPE_BYTES
.with_label_values(&[msg_type])
Expand Down Expand Up @@ -930,108 +932,99 @@ impl PeerActor {
}
}

#[tracing::instrument(
level = "trace",
target = "network",
"receive_routed_message",
skip_all,
fields(body_type = <&'static str>::from(&body)),
)]
async fn receive_routed_message(
clock: &time::Clock,
network_state: &NetworkState,
peer_id: PeerId,
msg_hash: CryptoHash,
body: RoutedMessageBody,
) -> Result<Option<RoutedMessageBody>, ReasonForBan> {
let body_type: &'static str = (&body).into();
let result = async {
Ok(match body {
RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state
Ok(match body {
RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state
.client
.send_async(TxStatusRequest { tx_hash, signer_account_id: account_id })
.await
.ok()
.flatten()
.map(|response| RoutedMessageBody::TxStatusResponse(*response)),
RoutedMessageBody::TxStatusResponse(tx_result) => {
network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok();
None
}
RoutedMessageBody::StateResponse(info) => {
network_state
.client
.send_async(TxStatusRequest { tx_hash, signer_account_id: account_id })
.send_async(StateResponse(StateResponseInfo::V1(info).into()))
.await
.ok()
.flatten()
.map(|response| RoutedMessageBody::TxStatusResponse(*response)),
RoutedMessageBody::TxStatusResponse(tx_result) => {
network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok();
None
}
RoutedMessageBody::StateResponse(info) => {
network_state
.client
.send_async(StateResponse(StateResponseInfo::V1(info).into()))
.await
.ok();
None
}
RoutedMessageBody::BlockApproval(approval) => {
network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok();
None
}
RoutedMessageBody::ForwardTx(transaction) => {
network_state
.client
.send_async(ProcessTxRequest {
transaction,
is_forwarded: true,
check_only: false,
})
.await
.ok();
None
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back: msg_hash,
},
);
None
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: clock.now().into(),
},
);
None
}
RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk));
None
}
RoutedMessageBody::PartialEncodedChunkForward(msg) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg),
);
None
}
RoutedMessageBody::ChunkStateWitness(witness) => {
network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok();
None
}
RoutedMessageBody::ChunkEndorsement(endorsement) => {
network_state
.client
.send_async(ChunkEndorsementMessage(endorsement))
.await
.ok();
None
}
body => {
tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body);
None
}
})
};
// DO NOT turn this into a scoped .entered() span, because that is incompatible
// with async fns.
result
.instrument(tracing::trace_span!(
target: "network",
"receive_routed_message",
"type" = body_type,
))
.await
.ok();
None
}
RoutedMessageBody::BlockApproval(approval) => {
network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok();
None
}
RoutedMessageBody::ForwardTx(transaction) => {
network_state
.client
.send_async(ProcessTxRequest {
transaction,
is_forwarded: true,
check_only: false,
})
.await
.ok();
None
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back: msg_hash,
},
);
None
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: clock.now().into(),
},
);
None
}
RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk));
None
}
RoutedMessageBody::PartialEncodedChunkForward(msg) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg));
None
}
RoutedMessageBody::ChunkStateWitness(witness) => {
network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok();
None
}
RoutedMessageBody::ChunkEndorsement(endorsement) => {
network_state.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok();
None
}
body => {
tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body);
None
}
})
}

fn receive_message(
Expand Down Expand Up @@ -1177,19 +1170,19 @@ impl PeerActor {
));
}

#[tracing::instrument(
level = "trace",
target = "network",
"handle_msg_ready",
skip_all,
fields(msg_type = <&'static str>::from(&peer_msg)),
)]
fn handle_msg_ready(
&mut self,
ctx: &mut actix::Context<Self>,
conn: Arc<connection::Connection>,
peer_msg: PeerMessage,
) {
let _span = tracing::trace_span!(
target: "network",
"handle_msg_ready",
"type" = <&PeerMessage as Into<&'static str>>::into(&peer_msg)
)
.entered();

#[cfg(test)]
let message_processed_event = {
let sink = self.network_state.config.event_sink.clone();
Expand Down Expand Up @@ -1486,79 +1479,70 @@ impl PeerActor {
}
}

#[tracing::instrument(
level = "trace",
target = "network",
"handle_sync_routing_table",
skip_all
)]
async fn handle_sync_routing_table(
clock: &time::Clock,
network_state: &Arc<NetworkState>,
conn: Arc<connection::Connection>,
rtu: RoutingTableUpdate,
) {
let result = async {
if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await {
conn.stop(Some(ban_reason));
}

// Also pass the edges to the V2 routing table
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges))
.await
{
conn.stop(Some(ban_reason));
}

// For every announce we received, we fetch the last announce with the same account_id
// that we already broadcasted. Client actor will both verify signatures of the received announces
// as well as filter out those which are older than the fetched ones (to avoid overriding
// a newer announce with an older one).
let old = network_state
.account_announcements
.get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id));
let accounts: Vec<(AnnounceAccount, Option<EpochId>)> = rtu
.accounts
.into_iter()
.map(|aa| {
let id = aa.account_id.clone();
(aa, old.get(&id).map(|old| old.epoch_id.clone()))
})
.collect();
match network_state.client.send_async(AnnounceAccountRequest(accounts)).await {
Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)),
Ok(Ok(accounts)) => network_state.add_accounts(accounts).await,
Err(_) => {}
}
};
// DO NOT turn this into a scoped .entered() span, because that is incompatible
if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await {
conn.stop(Some(ban_reason));
}

result
.instrument(tracing::trace_span!(target: "network", "handle_sync_routing_table"))
// Also pass the edges to the V2 routing table
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges))
.await
{
conn.stop(Some(ban_reason));
}

// For every announce we received, we fetch the last announce with the same account_id
// that we already broadcasted. Client actor will both verify signatures of the received announces
// as well as filter out those which are older than the fetched ones (to avoid overriding
// a newer announce with an older one).
let old = network_state
.account_announcements
.get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id));
let accounts: Vec<(AnnounceAccount, Option<EpochId>)> = rtu
.accounts
.into_iter()
.map(|aa| {
let id = aa.account_id.clone();
(aa, old.get(&id).map(|old| old.epoch_id.clone()))
})
.collect();
match network_state.client.send_async(AnnounceAccountRequest(accounts)).await {
Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)),
Ok(Ok(accounts)) => network_state.add_accounts(accounts).await,
Err(_) => {}
}
}

#[tracing::instrument(level = "trace", target = "network", "handle_distance_vector", skip_all)]
async fn handle_distance_vector(
clock: &time::Clock,
network_state: &Arc<NetworkState>,
conn: Arc<connection::Connection>,
distance_vector: DistanceVector,
) {
let result = async {
if conn.peer_info.id != distance_vector.root {
conn.stop(Some(ReasonForBan::InvalidDistanceVector));
return;
}

if let Err(ban_reason) = network_state
.update_routes(
&clock,
NetworkTopologyChange::PeerAdvertisedDistances(distance_vector),
)
.await
{
conn.stop(Some(ban_reason));
}
};
if conn.peer_info.id != distance_vector.root {
conn.stop(Some(ReasonForBan::InvalidDistanceVector));
return;
}

// DO NOT turn this into a scoped .entered() span, because that is incompatible
// with async fns.
result.instrument(tracing::trace_span!(target: "network", "handle_distance_vector")).await
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::PeerAdvertisedDistances(distance_vector))
.await
{
conn.stop(Some(ban_reason));
}
}
}

Expand Down

0 comments on commit 984f6ad

Please sign in to comment.