-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Network Orchestrator service proposal #402
Conversation
pub async fn run(&mut self) { | ||
loop { | ||
tokio::select! { | ||
p2p_event = self.p2p_service.next_event() => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of nesting all these match patterns, maybe create a single level identity who will check the cases?
4e6a3bb
to
ad0577a
Compare
Most of the reasoning for these changes can be found at: https://fuellabs.slack.com/archives/C01L1HUJSL8/p1656343694067609 It could be argued that it's a bit over-engineered but on the flip side we do not |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tbh it is hard to do indepth review without knowing libp2p lib. Reviewing the code it looked good so I am okay to approve it.
I think a few things will pop out when starting to integrate p2p with the rest of the system. In wiki It was hard to specify everything in one go here so i see there is inconsistencies as block producer broadcasts block (it is used by multiple parties) but p2p has mpsc that wants to receive it.
|
||
db: Arc<dyn RelayerDb>, | ||
|
||
outbound_responses: FuturesUnordered<ResponseFuture>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't know about this, nice :)
tx_transaction: Sender<TransactionBroadcast>, | ||
tx_block: Sender<BlockBroadcast>, | ||
|
||
db: Arc<dyn RelayerDb>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] You can define yourown db trait in fuel-core-interface and implement it inside fuel-core
. Leave if for future work
fuel-core-interfaces/src/p2p.rs
Outdated
@@ -16,15 +17,18 @@ pub enum BlockBroadcast { | |||
NewBlock(FuelBlock), | |||
} | |||
|
|||
pub enum P2pMpsc { | |||
pub enum P2PRequestEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Camel-casing applies to acronyms as well (P2P -> P2p):
In UpperCamelCase, acronyms and contractions of compound words count as one word: use Uuid rather than UUID, Usize rather than USize or Stdin rather than StdIn
https://rust-lang.github.io/api-guidelines/naming.html
pub enum P2PRequestEvent { | |
pub enum P2pRequestEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, but "P2p" feels so wrong 😬
fuel-p2p/src/behavior.rs
Outdated
} else { | ||
debug!("ResponseChannel for {:?} does not exist!", request_id); | ||
return Err(ResponseError::ResponseChannelDoesNotExist); | ||
_ => {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we log anything in the event of an error?
_ => {} | |
(Err(e), _) => { debug!("unexpected error occurred while processing message response: {:?}", e); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logged it, and added another error enum variant
…fuel-core into leviathanbeak/p2p_orchestrator
fuel-p2p/src/orchestrator.rs
Outdated
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
normally cargo fmt
would trim this, but don't think it works inside of select! macros.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes select!
support is terrible
|
||
self.outbound_responses.push( | ||
Box::pin(async move { | ||
db.get_sealed_block(block_height).await.map(|block| (OutboundResponse::ResponseBlock(block), request_id)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may make sense in the future to have some other service/module handle fetching blocks from the DB for the orchestrator. For instance, if there are many outstanding requests (e.g. lots of full-syncs) having a dedicated block-fetching worker on the spawn_blocking
threadpool would help avoid stalling the main async runtime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since the fetching itself is already async, shouldn't it be just fine to spawn another async task with tokio::spawn
instead ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, it should be the responsibility of this async-trait implementor to deal with any potential blocking issues. A regular spawn doesn't really net us anything here since that's more for launching background tasks that don't have an active poller.
|
||
/// Given a `GossipsubBroadcastRequest` retruns a `GossipTopic` | ||
/// which is broadcast over the network with the serialized inner value of `GossipsubBroadcastRequest` | ||
pub fn get_gossipsub_topic(&self, outgoing_request: &GossipsubBroadcastRequest) -> GossipTopic { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have tests to verify all of these lookups and topic encodings are accurate? Since topic setup relies on manual string formatting and setup, it will be harder for the compiler to detect errors during refactoring etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good observation, added a test that checks all the options
fuel-p2p/src/service.rs
Outdated
@@ -412,8 +416,8 @@ mod tests { | |||
// verifies that we've got at least a single peer address to send message to | |||
if !peer_addresses.is_empty() && !message_sent { | |||
message_sent = true; | |||
let default_tx = FuelGossipsubMessage::NewTx(Transaction::default()); | |||
node_a.publish_message(selected_topic.clone(), default_tx).unwrap(); | |||
let default_tx = GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also have tests that ensure message routing and serialization work as expected across all network event types we have. Currently, it seems like we only exercise the NewTx
and RequestBlock
code paths in our tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added tests for all Gossipsub Messages, as for Request-Response, currently we only support RequestBlock anyway
ready to approve once test coverage for new message types is improved. |
did some additional cleaning:
|
|
||
#[tokio::test] | ||
#[instrument] | ||
async fn gossipsub_broadcast_vote() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like these granular clean looking tests
LGTM! Good improvements with the test coverage |
Network Orchestrator is the service that:
tokio::sync::mpsc::Sender
values for propagating p2p messages to different componentstokio::sync::mpsc::Receiver
to get messages from different fuel-core componentsThis initial version is (loosely) based on miro design.
Apart from general design I would need feedback on specific components