diff --git a/Cargo.lock b/Cargo.lock index f440048fc..1b40a3f78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4395,9 +4395,9 @@ dependencies = [ [[package]] name = "tentacle" -version = "0.4.0-beta.4" +version = "0.4.0-beta.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "863528f8370d37ef73e403c59699e0383871f50b464a96262b7320303313255e" +checksum = "fffc3054674621fb5ff1eab60083fe8b8d1e98d241ea23db613b7cb1ab771600" dependencies = [ "async-trait", "bytes", diff --git a/crates/mem-pool/src/sync/p2p.rs b/crates/mem-pool/src/sync/p2p.rs index 7e8063e1b..cc17d5c7d 100644 --- a/crates/mem-pool/src/sync/p2p.rs +++ b/crates/mem-pool/src/sync/p2p.rs @@ -19,7 +19,7 @@ use gw_types::{ use tentacle::{ builder::MetaBuilder, error::SendErrorKind, - service::{ProtocolMeta, ServiceAsyncControl}, + service::{ProtocolMeta, ServiceAsyncControl, TargetSession}, SessionId, SubstreamReadPart, }; use tokio::sync::{broadcast, Mutex}; @@ -111,13 +111,7 @@ impl Publisher { tracing::info!(tip = %HashAndNumber::from(new_tip), "publishing new tip"); let mut shared = self.shared.lock().await; shared.buffer.handle_new_tip(new_tip, &msg); - for s in &shared.subscribers { - warn_result( - self.control - .send_message_to(*s, P2P_MEM_BLOCK_SYNC_PROTOCOL, msg.as_bytes()) - .await, - ); - } + self.broadcast(&shared.subscribers, msg).await; } pub(crate) async fn publish(&mut self, msg: RefreshMemBlockMessageUnion) { @@ -138,13 +132,18 @@ impl Publisher { let msg = P2PSyncMessage::new_builder().set(msg).build(); let mut shared = self.shared.lock().await; shared.buffer.push(msg.clone()); - for s in &shared.subscribers { - warn_result( - self.control - .send_message_to(*s, P2P_MEM_BLOCK_SYNC_PROTOCOL, msg.as_bytes()) - .await, - ); - } + self.broadcast(&shared.subscribers, msg).await; + } + + async fn broadcast(&self, subscribers: &HashSet, msg: P2PSyncMessage) { + let target = TargetSession::Multi(Box::new( + subscribers.iter().cloned().collect::>().into_iter(), + )); + warn_result( + self.control + .filter_broadcast(target, P2P_MEM_BLOCK_SYNC_PROTOCOL, msg.as_bytes()) + .await, + ); } }