Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: replace explicit spans with #[instrument] in peer_actor #10778

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 146 additions & 162 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,14 @@ impl PeerActor {
self.send_message_with_encoding(msg, Encoding::Borsh);
}

#[tracing::instrument(
level = "trace",
target = "network",
"send_message_with_encoding",
skip_all,
fields(msg_type = msg.msg_variant())
)]
Comment on lines +395 to +401
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give me a quick TLDR of what it does?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty much a different way to write tracing::span! that covers the entire function and happens correctly handle async fn by default. I updated the style guide with some information about it today: https://near.github.io/nearcore/practices/style.html#spans

fn send_message_with_encoding(&self, msg: &PeerMessage, enc: Encoding) {
let msg_type: &str = msg.msg_variant();
let _span = tracing::trace_span!(
target: "network",
"send_message_with_encoding",
msg_type)
.entered();
// Skip sending block and headers if we received it or header from this peer.
// Record block requests in tracker.
match msg {
Expand Down Expand Up @@ -426,6 +427,7 @@ impl PeerActor {
tracing::trace!(target: "network", msg_len = bytes_len);
self.framed.send(stream::Frame(bytes));
metrics::PEER_DATA_SENT_BYTES.inc_by(bytes_len as u64);
let msg_type = msg.msg_variant();
metrics::PEER_MESSAGE_SENT_BY_TYPE_TOTAL.with_label_values(&[msg_type]).inc();
metrics::PEER_MESSAGE_SENT_BY_TYPE_BYTES
.with_label_values(&[msg_type])
Expand Down Expand Up @@ -930,108 +932,99 @@ impl PeerActor {
}
}

#[tracing::instrument(
level = "trace",
target = "network",
"receive_routed_message",
skip_all,
fields(body_type = <&'static str>::from(&body)),
)]
async fn receive_routed_message(
clock: &time::Clock,
network_state: &NetworkState,
peer_id: PeerId,
msg_hash: CryptoHash,
body: RoutedMessageBody,
) -> Result<Option<RoutedMessageBody>, ReasonForBan> {
let body_type: &'static str = (&body).into();
let result = async {
Ok(match body {
RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state
Ok(match body {
RoutedMessageBody::TxStatusRequest(account_id, tx_hash) => network_state
.client
.send_async(TxStatusRequest { tx_hash, signer_account_id: account_id })
.await
.ok()
.flatten()
.map(|response| RoutedMessageBody::TxStatusResponse(*response)),
RoutedMessageBody::TxStatusResponse(tx_result) => {
network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok();
None
}
RoutedMessageBody::StateResponse(info) => {
network_state
.client
.send_async(TxStatusRequest { tx_hash, signer_account_id: account_id })
.send_async(StateResponse(StateResponseInfo::V1(info).into()))
.await
.ok()
.flatten()
.map(|response| RoutedMessageBody::TxStatusResponse(*response)),
RoutedMessageBody::TxStatusResponse(tx_result) => {
network_state.client.send_async(TxStatusResponse(tx_result.into())).await.ok();
None
}
RoutedMessageBody::StateResponse(info) => {
network_state
.client
.send_async(StateResponse(StateResponseInfo::V1(info).into()))
.await
.ok();
None
}
RoutedMessageBody::BlockApproval(approval) => {
network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok();
None
}
RoutedMessageBody::ForwardTx(transaction) => {
network_state
.client
.send_async(ProcessTxRequest {
transaction,
is_forwarded: true,
check_only: false,
})
.await
.ok();
None
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back: msg_hash,
},
);
None
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: clock.now().into(),
},
);
None
}
RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk));
None
}
RoutedMessageBody::PartialEncodedChunkForward(msg) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg),
);
None
}
RoutedMessageBody::ChunkStateWitness(witness) => {
network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok();
None
}
RoutedMessageBody::ChunkEndorsement(endorsement) => {
network_state
.client
.send_async(ChunkEndorsementMessage(endorsement))
.await
.ok();
None
}
body => {
tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body);
None
}
})
};
// DO NOT turn this into a scoped .entered() span, because that is incompatible
// with async fns.
result
.instrument(tracing::trace_span!(
target: "network",
"receive_routed_message",
"type" = body_type,
))
.await
.ok();
None
}
RoutedMessageBody::BlockApproval(approval) => {
network_state.client.send_async(BlockApproval(approval, peer_id)).await.ok();
None
}
RoutedMessageBody::ForwardTx(transaction) => {
network_state
.client
.send_async(ProcessTxRequest {
transaction,
is_forwarded: true,
check_only: false,
})
.await
.ok();
None
}
RoutedMessageBody::PartialEncodedChunkRequest(request) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkRequest {
partial_encoded_chunk_request: request,
route_back: msg_hash,
},
);
None
}
RoutedMessageBody::PartialEncodedChunkResponse(response) => {
network_state.shards_manager_adapter.send(
ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkResponse {
partial_encoded_chunk_response: response,
received_time: clock.now().into(),
},
);
None
}
RoutedMessageBody::VersionedPartialEncodedChunk(chunk) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunk(chunk));
None
}
RoutedMessageBody::PartialEncodedChunkForward(msg) => {
network_state
.shards_manager_adapter
.send(ShardsManagerRequestFromNetwork::ProcessPartialEncodedChunkForward(msg));
None
}
RoutedMessageBody::ChunkStateWitness(witness) => {
network_state.client.send_async(ChunkStateWitnessMessage(witness)).await.ok();
None
}
RoutedMessageBody::ChunkEndorsement(endorsement) => {
network_state.client.send_async(ChunkEndorsementMessage(endorsement)).await.ok();
None
}
body => {
tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body);
None
}
})
}

fn receive_message(
Expand Down Expand Up @@ -1177,19 +1170,19 @@ impl PeerActor {
));
}

#[tracing::instrument(
level = "trace",
target = "network",
"handle_msg_ready",
skip_all,
fields(msg_type = <&'static str>::from(&peer_msg)),
)]
fn handle_msg_ready(
&mut self,
ctx: &mut actix::Context<Self>,
conn: Arc<connection::Connection>,
peer_msg: PeerMessage,
) {
let _span = tracing::trace_span!(
target: "network",
"handle_msg_ready",
"type" = <&PeerMessage as Into<&'static str>>::into(&peer_msg)
)
.entered();

#[cfg(test)]
let message_processed_event = {
let sink = self.network_state.config.event_sink.clone();
Expand Down Expand Up @@ -1486,79 +1479,70 @@ impl PeerActor {
}
}

#[tracing::instrument(
level = "trace",
target = "network",
"handle_sync_routing_table",
skip_all
)]
async fn handle_sync_routing_table(
clock: &time::Clock,
network_state: &Arc<NetworkState>,
conn: Arc<connection::Connection>,
rtu: RoutingTableUpdate,
) {
let result = async {
if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await {
conn.stop(Some(ban_reason));
}

// Also pass the edges to the V2 routing table
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges))
.await
{
conn.stop(Some(ban_reason));
}

// For every announce we received, we fetch the last announce with the same account_id
// that we already broadcasted. Client actor will both verify signatures of the received announces
// as well as filter out those which are older than the fetched ones (to avoid overriding
// a newer announce with an older one).
let old = network_state
.account_announcements
.get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id));
let accounts: Vec<(AnnounceAccount, Option<EpochId>)> = rtu
.accounts
.into_iter()
.map(|aa| {
let id = aa.account_id.clone();
(aa, old.get(&id).map(|old| old.epoch_id.clone()))
})
.collect();
match network_state.client.send_async(AnnounceAccountRequest(accounts)).await {
Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)),
Ok(Ok(accounts)) => network_state.add_accounts(accounts).await,
Err(_) => {}
}
};
// DO NOT turn this into a scoped .entered() span, because that is incompatible
if let Err(ban_reason) = network_state.add_edges(&clock, rtu.edges.clone()).await {
conn.stop(Some(ban_reason));
}

result
.instrument(tracing::trace_span!(target: "network", "handle_sync_routing_table"))
// Also pass the edges to the V2 routing table
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::EdgeNonceRefresh(rtu.edges))
.await
{
conn.stop(Some(ban_reason));
}

// For every announce we received, we fetch the last announce with the same account_id
// that we already broadcasted. Client actor will both verify signatures of the received announces
// as well as filter out those which are older than the fetched ones (to avoid overriding
// a newer announce with an older one).
let old = network_state
.account_announcements
.get_broadcasted_announcements(rtu.accounts.iter().map(|a| &a.account_id));
let accounts: Vec<(AnnounceAccount, Option<EpochId>)> = rtu
.accounts
.into_iter()
.map(|aa| {
let id = aa.account_id.clone();
(aa, old.get(&id).map(|old| old.epoch_id.clone()))
})
.collect();
match network_state.client.send_async(AnnounceAccountRequest(accounts)).await {
Ok(Err(ban_reason)) => conn.stop(Some(ban_reason)),
Ok(Ok(accounts)) => network_state.add_accounts(accounts).await,
Err(_) => {}
}
}

#[tracing::instrument(level = "trace", target = "network", "handle_distance_vector", skip_all)]
async fn handle_distance_vector(
clock: &time::Clock,
network_state: &Arc<NetworkState>,
conn: Arc<connection::Connection>,
distance_vector: DistanceVector,
) {
let result = async {
if conn.peer_info.id != distance_vector.root {
conn.stop(Some(ReasonForBan::InvalidDistanceVector));
return;
}

if let Err(ban_reason) = network_state
.update_routes(
&clock,
NetworkTopologyChange::PeerAdvertisedDistances(distance_vector),
)
.await
{
conn.stop(Some(ban_reason));
}
};
if conn.peer_info.id != distance_vector.root {
conn.stop(Some(ReasonForBan::InvalidDistanceVector));
return;
}

// DO NOT turn this into a scoped .entered() span, because that is incompatible
// with async fns.
result.instrument(tracing::trace_span!(target: "network", "handle_distance_vector")).await
if let Err(ban_reason) = network_state
.update_routes(&clock, NetworkTopologyChange::PeerAdvertisedDistances(distance_vector))
.await
{
conn.stop(Some(ban_reason));
}
}
}

Expand Down
Loading