Skip to content

Commit

Permalink
feat(o11y): Attach tracing context to messages received by PeerManagerActor (#7866)
Browse files Browse the repository at this point in the history

This PR completes tracing of async work across all actix Actors.
https://pagodaplatform.atlassian.net/browse/ND-171
  • Loading branch information
nikurt authored Oct 19, 2022
1 parent f4c7bf1 commit 4b01657
Show file tree
Hide file tree
Showing 25 changed files with 469 additions and 284 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 44 additions & 28 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ use near_network::types::{
AccountIdOrPeerTrackingShard, PartialEncodedChunkForwardMsg, PartialEncodedChunkRequestMsg,
PartialEncodedChunkResponseMsg,
};
use near_o11y::WithSpanContextExt;
use rand::Rng;

mod chunk_cache;
Expand Down Expand Up @@ -666,13 +667,16 @@ impl ShardsManager {
};
debug!(target: "chunks", "Requesting {} parts for shard {} from {:?} prefer {}", parts_count, shard_id, target.account_id, target.prefer_peer);

self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkRequest {
target,
request,
create_time: Clock::instant().into(),
},
));
self.peer_manager_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkRequest {
target,
request,
create_time: Clock::instant().into(),
},
)
.with_span_context(),
);
} else {
warn!(target: "client", "{:?} requests parts {:?} for chunk {:?} from self",
me, part_ords, chunk_hash
Expand Down Expand Up @@ -1007,9 +1011,12 @@ impl ShardsManager {
.observe(elapsed);

if let Some(response) = response {
self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkResponse { route_back, response },
))
self.peer_manager_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkResponse { route_back, response },
)
.with_span_context(),
)
}
}

Expand Down Expand Up @@ -1968,24 +1975,30 @@ impl ShardsManager {
// We don't because with the current implementation, we force all validators to track all
// shards by making their config tracking all shards.
// See https://github.com/near/nearcore/issues/7388
self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkForward {
account_id: bp_account_id,
forward: forward.clone(),
},
));
self.peer_manager_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkForward {
account_id: bp_account_id,
forward: forward.clone(),
},
)
.with_span_context(),
);
}

// We also forward chunk parts to incoming chunk producers because we want them to be able
// to produce the next chunk without delays. For the same reason as above, we don't check if they
// actually track this shard.
for next_chunk_producer in next_chunk_producers {
self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkForward {
account_id: next_chunk_producer,
forward: forward.clone(),
},
));
self.peer_manager_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkForward {
account_id: next_chunk_producer,
forward: forward.clone(),
},
)
.with_span_context(),
);
}

Ok(())
Expand Down Expand Up @@ -2141,12 +2154,15 @@ impl ShardsManager {
);

if Some(&to_whom) != self.me.as_ref() {
self.peer_manager_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkMessage {
account_id: to_whom.clone(),
partial_encoded_chunk,
},
));
self.peer_manager_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedChunkMessage {
account_id: to_whom.clone(),
partial_encoded_chunk,
},
)
.with_span_context(),
);
}
}

Expand Down
84 changes: 51 additions & 33 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::sync::{BlockSync, EpochSync, HeaderSync, StateSync, StateSyncResult};
use crate::{metrics, SyncStatus};
use near_client_primitives::types::{Error, ShardSyncDownload, ShardSyncStatus};
use near_network::types::{AccountKeys, ChainInfo, PeerManagerMessageRequest, SetChainInfo};
use near_o11y::log_assert;
use near_o11y::{log_assert, WithSpanContextExt};
use near_primitives::block_header::ApprovalType;
use near_primitives::epoch_manager::RngSeed;
use near_primitives::version::PROTOCOL_VERSION;
Expand Down Expand Up @@ -269,9 +269,10 @@ impl Client {
&& !self.sync_status.is_syncing()
{
let block = self.chain.get_block(&self.chain.head()?.last_block_hash)?;
self.network_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::Block { block: block },
));
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Block { block: block })
.with_span_context(),
);
self.last_time_head_progress_made = Clock::instant();
}
Ok(())
Expand Down Expand Up @@ -796,9 +797,12 @@ impl Client {
for body in challenges {
let challenge = Challenge::produce(body, &**validator_signer);
self.challenges.insert(challenge.hash, challenge.clone());
self.network_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::Challenge(challenge),
));
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Challenge(
challenge,
))
.with_span_context(),
);
}
}
}
Expand Down Expand Up @@ -854,20 +858,26 @@ impl Client {
if let Err(e) = &result {
match e {
near_chain::Error::InvalidChunkProofs(chunk_proofs) => {
self.network_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::Challenge(Challenge::produce(
ChallengeBody::ChunkProofs(*chunk_proofs.clone()),
&**validator_signer,
)),
));
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Challenge(
Challenge::produce(
ChallengeBody::ChunkProofs(*chunk_proofs.clone()),
&**validator_signer,
),
))
.with_span_context(),
);
}
near_chain::Error::InvalidChunkState(chunk_state) => {
self.network_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::Challenge(Challenge::produce(
ChallengeBody::ChunkState(*chunk_state.clone()),
&**validator_signer,
)),
));
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Challenge(
Challenge::produce(
ChallengeBody::ChunkState(*chunk_state.clone()),
&**validator_signer,
),
))
.with_span_context(),
);
}
_ => {}
}
Expand Down Expand Up @@ -942,9 +952,12 @@ impl Client {

pub fn rebroadcast_block(&mut self, block: &Block) {
if self.rebroadcasted_blocks.get(block.hash()).is_none() {
self.network_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::Block { block: block.clone() },
));
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Block {
block: block.clone(),
})
.with_span_context(),
);
self.rebroadcasted_blocks.put(*block.hash(), ());
}
}
Expand Down Expand Up @@ -1055,9 +1068,12 @@ impl Client {
} else {
debug!(target: "client", "Sending an approval {:?} from {} to {} for {}", approval.inner, approval.account_id, next_block_producer, approval.target_height);
let approval_message = ApprovalMessage::new(approval, next_block_producer);
self.network_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::Approval { approval_message },
));
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::Approval {
approval_message,
})
.with_span_context(),
);
}

Ok(())
Expand Down Expand Up @@ -1541,9 +1557,13 @@ impl Client {
);

// Send message to network to actually forward transaction.
self.network_adapter.do_send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ForwardTx(validator, tx.clone()),
));
self.network_adapter.do_send(
PeerManagerMessageRequest::NetworkRequests(NetworkRequests::ForwardTx(
validator,
tx.clone(),
))
.with_span_context(),
);
}

Ok(())
Expand Down Expand Up @@ -1962,11 +1982,9 @@ impl Client {
let height = tip.height;
#[cfg(feature = "test_features")]
let height = self.adv_sync_height.unwrap_or(height);
self.network_adapter.do_send(SetChainInfo(ChainInfo {
height,
tracked_shards,
tier1_accounts,
}));
self.network_adapter.do_send(
SetChainInfo(ChainInfo { height, tracked_shards, tier1_accounts }).with_span_context(),
);
Ok(())
}
}
Expand Down
Loading

0 comments on commit 4b01657

Please sign in to comment.