Skip to content

Commit

Permalink
Merge branch 'master' into ethereum-client-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
claravanstaden authored May 16, 2024
2 parents 9fdcef3 + 453bb18 commit 4f5ef0d
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 39 deletions.
66 changes: 54 additions & 12 deletions bridges/relays/client-substrate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ pub fn is_ancient_block<N: From<u32> + PartialOrd + Saturating>(block: N, best:
}

/// Opaque justifications subscription type.
pub struct Subscription<T>(pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>);
pub struct Subscription<T>(
pub(crate) Mutex<futures::channel::mpsc::Receiver<Option<T>>>,
// The following field is not explicitly used by the code. But when it is dropped,
// the bakground task receives a shutdown signal.
#[allow(dead_code)] pub(crate) futures::channel::oneshot::Sender<()>,
);

/// Opaque GRANDPA authorities set.
pub type OpaqueGrandpaAuthoritiesSet = Vec<u8>;
Expand Down Expand Up @@ -621,6 +626,7 @@ impl<C: Chain> Client<C> {
e
})??;

let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel();
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
let (tracker, subscription) = self
.jsonrpsee_execute(move |client| async move {
Expand All @@ -639,7 +645,7 @@ impl<C: Chain> Client<C> {
self_clone,
stall_timeout,
tx_hash,
Subscription(Mutex::new(receiver)),
Subscription(Mutex::new(receiver), cancel_sender),
);
Ok((tracker, subscription))
})
Expand All @@ -649,6 +655,7 @@ impl<C: Chain> Client<C> {
"extrinsic".into(),
subscription,
sender,
cancel_receiver,
));
Ok(tracker)
}
Expand Down Expand Up @@ -790,14 +797,16 @@ impl<C: Chain> Client<C> {
Ok(FC::subscribe_justifications(&client).await?)
})
.await?;
let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel();
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"justification".into(),
subscription,
sender,
cancel_receiver,
));
Ok(Subscription(Mutex::new(receiver)))
Ok(Subscription(Mutex::new(receiver), cancel_sender))
}

/// Generates a proof of key ownership for the given authority in the given set.
Expand Down Expand Up @@ -843,9 +852,17 @@ impl<C: Chain> Client<C> {
impl<T: DeserializeOwned> Subscription<T> {
/// Consumes subscription and returns future statuses stream.
pub fn into_stream(self) -> impl futures::Stream<Item = T> {
futures::stream::unfold(self, |this| async {
futures::stream::unfold(Some(self), |mut this| async move {
let Some(this) = this.take() else { return None };
let item = this.0.lock().await.next().await.unwrap_or(None);
item.map(|i| (i, this))
match item {
Some(item) => Some((item, Some(this))),
None => {
// let's make it explicit here
let _ = this.1.send(());
None
},
}
})
}

Expand All @@ -860,36 +877,61 @@ impl<T: DeserializeOwned> Subscription<T> {
async fn background_worker(
chain_name: String,
item_type: String,
mut subscription: jsonrpsee::core::client::Subscription<T>,
subscription: jsonrpsee::core::client::Subscription<T>,
mut sender: futures::channel::mpsc::Sender<Option<T>>,
cancel_receiver: futures::channel::oneshot::Receiver<()>,
) {
log::trace!(
target: "bridge",
"Starting background worker for {} {} subscription stream.",
chain_name,
item_type,
);

futures::pin_mut!(subscription, cancel_receiver);
loop {
match subscription.next().await {
Some(Ok(item)) =>
match futures::future::select(subscription.next(), &mut cancel_receiver).await {
futures::future::Either::Left((Some(Ok(item)), _)) =>
if sender.send(Some(item)).await.is_err() {
log::trace!(
target: "bridge",
"{} {} subscription stream: no listener. Stopping background worker.",
chain_name,
item_type,
);

break
},
Some(Err(e)) => {
futures::future::Either::Left((Some(Err(e)), _)) => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned '{:?}'. Stream needs to be restarted.",
"{} {} subscription stream has returned '{:?}'. Stream needs to be restarted. Stopping background worker.",
chain_name,
item_type,
e,
);
let _ = sender.send(None).await;
break
},
None => {
futures::future::Either::Left((None, _)) => {
log::trace!(
target: "bridge",
"{} {} subscription stream has returned None. Stream needs to be restarted.",
"{} {} subscription stream has returned None. Stream needs to be restarted. Stopping background worker.",
chain_name,
item_type,
);
let _ = sender.send(None).await;
break
},
futures::future::Either::Right((_, _)) => {
log::trace!(
target: "bridge",
"{} {} subscription stream: listener has been dropped. Stopping background worker.",
chain_name,
item_type,
);
break;
},
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions bridges/relays/client-substrate/src/transaction_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,13 @@ mod tests {
TrackedTransactionStatus<HeaderIdOf<TestChain>>,
InvalidationStatus<HeaderIdOf<TestChain>>,
)> {
let (cancel_sender, _cancel_receiver) = futures::channel::oneshot::channel();
let (mut sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
Subscription(async_std::sync::Mutex::new(receiver), cancel_sender),
);

let wait_for_stall_timeout = futures::future::pending();
Expand Down Expand Up @@ -428,12 +429,13 @@ mod tests {

#[async_std::test]
async fn lost_on_timeout_when_waiting_for_invalidation_status() {
let (cancel_sender, _cancel_receiver) = futures::channel::oneshot::channel();
let (_sender, receiver) = futures::channel::mpsc::channel(1);
let tx_tracker = TransactionTracker::<TestChain, TestEnvironment>::new(
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription(async_std::sync::Mutex::new(receiver)),
Subscription(async_std::sync::Mutex::new(receiver), cancel_sender),
);

let wait_for_stall_timeout = futures::future::ready(()).shared();
Expand Down
117 changes: 96 additions & 21 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ pub struct DiscoveryConfig {
discovery_only_if_under_num: u64,
enable_mdns: bool,
kademlia_disjoint_query_paths: bool,
kademlia_protocols: Vec<Vec<u8>>,
kademlia_protocol: Vec<u8>,
kademlia_legacy_protocol: Vec<u8>,
kademlia_replication_factor: NonZeroUsize,
}

Expand All @@ -121,7 +122,8 @@ impl DiscoveryConfig {
discovery_only_if_under_num: std::u64::MAX,
enable_mdns: false,
kademlia_disjoint_query_paths: false,
kademlia_protocols: Vec::new(),
kademlia_protocol: Vec::new(),
kademlia_legacy_protocol: Vec::new(),
kademlia_replication_factor: NonZeroUsize::new(DEFAULT_KADEMLIA_REPLICATION_FACTOR)
.expect("value is a constant; constant is non-zero; qed."),
}
Expand Down Expand Up @@ -177,9 +179,8 @@ impl DiscoveryConfig {
fork_id: Option<&str>,
protocol_id: &ProtocolId,
) -> &mut Self {
self.kademlia_protocols = Vec::new();
self.kademlia_protocols.push(kademlia_protocol_name(genesis_hash, fork_id));
self.kademlia_protocols.push(legacy_kademlia_protocol_name(protocol_id));
self.kademlia_protocol = kademlia_protocol_name(genesis_hash, fork_id);
self.kademlia_legacy_protocol = legacy_kademlia_protocol_name(protocol_id);
self
}

Expand Down Expand Up @@ -207,14 +208,19 @@ impl DiscoveryConfig {
discovery_only_if_under_num,
enable_mdns,
kademlia_disjoint_query_paths,
kademlia_protocols,
kademlia_protocol,
kademlia_legacy_protocol,
kademlia_replication_factor,
} = self;

let kademlia = if !kademlia_protocols.is_empty() {
let kademlia = if !kademlia_protocol.is_empty() {
let mut config = KademliaConfig::default();

config.set_replication_factor(kademlia_replication_factor);
// Populate kad with both the legacy and the new protocol names.
// Remove the legacy protocol:
// https://github.com/paritytech/polkadot-sdk/issues/504
let kademlia_protocols = [kademlia_protocol.clone(), kademlia_legacy_protocol];
config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect());
// By default Kademlia attempts to insert all peers into its routing table once a
// dialing attempt succeeds. In order to control which peer is added, disable the
Expand Down Expand Up @@ -266,6 +272,7 @@ impl DiscoveryConfig {
.expect("value is a constant; constant is non-zero; qed."),
),
records_to_publish: Default::default(),
kademlia_protocol,
}
}
}
Expand Down Expand Up @@ -309,6 +316,11 @@ pub struct DiscoveryBehaviour {
/// did not return the record(in `FinishedWithNoAdditionalRecord`). We will then put the record
/// to these peers.
records_to_publish: HashMap<QueryId, Record>,
/// The chain based kademlia protocol name (including genesis hash and fork id).
///
/// Remove when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
/// <https://github.com/paritytech/polkadot-sdk/issues/504>.
kademlia_protocol: Vec<u8>,
}

impl DiscoveryBehaviour {
Expand Down Expand Up @@ -366,23 +378,29 @@ impl DiscoveryBehaviour {
return
}

if let Some(matching_protocol) = supported_protocols
// The supported protocols must include the chain-based Kademlia protocol.
//
// Extract the chain-based Kademlia protocol from `kademlia.protocol_name()`
// when all nodes are upgraded to genesis hash and fork ID-based Kademlia:
// https://github.com/paritytech/polkadot-sdk/issues/504.
if !supported_protocols
.iter()
.find(|p| kademlia.protocol_names().iter().any(|k| k.as_ref() == p.as_ref()))
.any(|p| p.as_ref() == self.kademlia_protocol.as_slice())
{
trace!(
target: "sub-libp2p",
"Adding self-reported address {} from {} to Kademlia DHT {}.",
addr, peer_id, String::from_utf8_lossy(matching_protocol.as_ref()),
);
kademlia.add_address(peer_id, addr.clone());
} else {
trace!(
target: "sub-libp2p",
"Ignoring self-reported address {} from {} as remote node is not part of the \
Kademlia DHT supported by the local node.", addr, peer_id,
);
return
}

trace!(
target: "sub-libp2p",
"Adding self-reported address {} from {} to Kademlia DHT.",
addr, peer_id
);
kademlia.add_address(peer_id, addr.clone());
}
}

Expand Down Expand Up @@ -1075,17 +1093,20 @@ mod tests {
.unwrap();
// Test both genesis hash-based and legacy
// protocol names.
let protocol_name = if swarm_n % 2 == 0 {
kademlia_protocol_name(genesis_hash, fork_id)
let protocol_names = if swarm_n % 2 == 0 {
vec![kademlia_protocol_name(genesis_hash, fork_id)]
} else {
legacy_kademlia_protocol_name(&protocol_id)
vec![
legacy_kademlia_protocol_name(&protocol_id),
kademlia_protocol_name(genesis_hash, fork_id),
]
};
swarms[swarm_n]
.0
.behaviour_mut()
.add_self_reported_address(
&other,
&[protocol_name],
protocol_names.as_slice(),
addr,
);

Expand Down Expand Up @@ -1181,9 +1202,56 @@ mod tests {
&[kademlia_protocol_name(supported_genesis_hash, None)],
remote_addr.clone(),
);
{
let kademlia = discovery.kademlia.as_mut().unwrap();
assert!(
!kademlia
.kbucket(remote_peer_id)
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expect peer with supported protocol to be added."
);
}

let unsupported_peer_id = predictable_peer_id(b"00000000000000000000000000000002");
let unsupported_peer_addr: Multiaddr = "/memory/2".parse().unwrap();

// Check the unsupported peer is not present before and after the call.
{
let kademlia = discovery.kademlia.as_mut().unwrap();
assert!(
kademlia
.kbucket(unsupported_peer_id)
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expect unsupported peer not to be added."
);
}
// Note: legacy protocol is not supported without genesis hash and fork ID,
// if the legacy is the only protocol supported, then the peer will not be added.
discovery.add_self_reported_address(
&unsupported_peer_id,
&[legacy_kademlia_protocol_name(&supported_protocol_id)],
unsupported_peer_addr.clone(),
);
{
let kademlia = discovery.kademlia.as_mut().unwrap();
assert!(
kademlia
.kbucket(unsupported_peer_id)
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expect unsupported peer not to be added."
);
}

// Supported legacy and genesis based protocols are allowed to be added.
discovery.add_self_reported_address(
&another_peer_id,
&[legacy_kademlia_protocol_name(&supported_protocol_id)],
&[
legacy_kademlia_protocol_name(&supported_protocol_id),
kademlia_protocol_name(supported_genesis_hash, None),
],
another_addr.clone(),
);

Expand All @@ -1194,6 +1262,13 @@ mod tests {
kademlia.kbuckets().fold(0, |acc, bucket| acc + bucket.num_entries()),
"Expect peers with supported protocol to be added."
);
assert!(
!kademlia
.kbucket(another_peer_id)
.expect("Remote peer id not to be equal to local peer id.")
.is_empty(),
"Expect peer with supported protocol to be added."
);
}
}
}
Loading

0 comments on commit 4f5ef0d

Please sign in to comment.