Skip to content

Commit 926389d

Browse files
authored
Merge pull request #3300 from nymtech/bugfix/empty-ack-stream-map
make sure to clear inner 'ack_map' in 'GatewaysReader'
2 parents b55db00 + 7a50f0c commit 926389d

File tree

2 files changed

+32
-13
lines changed

2 files changed

+32
-13
lines changed

nym-api/src/network_monitor/gateways_reader.rs

+24-6
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use futures::Stream;
45
use nym_crypto::asymmetric::identity;
56
use nym_gateway_client::{AcknowledgementReceiver, MixnetMessageReceiver};
7+
use std::pin::Pin;
8+
use std::task::{Context, Poll};
69
use tokio_stream::StreamMap;
710

811
pub(crate) type GatewayMessages = Vec<Vec<u8>>;
@@ -20,11 +23,7 @@ impl GatewaysReader {
2023
}
2124
}
2225

23-
pub fn stream_map(&mut self) -> &mut StreamMap<String, MixnetMessageReceiver> {
24-
&mut self.stream_map
25-
}
26-
27-
pub fn add_recievers(
26+
pub fn add_receivers(
2827
&mut self,
2928
id: identity::PublicKey,
3029
message_receiver: MixnetMessageReceiver,
@@ -35,8 +34,27 @@ impl GatewaysReader {
3534
self.ack_map.insert(channel_id, ack_receiver);
3635
}
3736

38-
pub fn remove_recievers(&mut self, id: &str) {
37+
pub fn remove_receivers(&mut self, id: &str) {
3938
self.stream_map.remove(id);
4039
self.ack_map.remove(id);
4140
}
4241
}
42+
43+
impl Stream for GatewaysReader {
44+
// just return whatever is returned by our main `stream_map`
45+
type Item = <StreamMap<String, MixnetMessageReceiver> as Stream>::Item;
46+
47+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48+
// exhaust the ack map if possible
49+
match Pin::new(&mut self.ack_map).poll_next(cx) {
50+
Poll::Ready(None) => {
51+
// this should have never happened!
52+
return Poll::Ready(None);
53+
}
54+
Poll::Ready(Some(_item)) => (),
55+
Poll::Pending => (),
56+
}
57+
58+
Pin::new(&mut self.stream_map).poll_next(cx)
59+
}
60+
}

nym-api/src/network_monitor/monitor/receiver.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ impl PacketReceiver {
4242
match update {
4343
GatewayClientUpdate::New(id, (message_receiver, ack_receiver)) => {
4444
self.gateways_reader
45-
.add_recievers(id, message_receiver, ack_receiver);
45+
.add_receivers(id, message_receiver, ack_receiver);
4646
}
4747
GatewayClientUpdate::Failure(id) => {
48-
self.gateways_reader.remove_recievers(&id.to_string());
48+
self.gateways_reader.remove_receivers(&id.to_string());
4949
}
5050
}
5151
}
@@ -66,11 +66,12 @@ impl PacketReceiver {
6666
// unwrap here is fine as it can only return a `None` if the PacketSender has died
6767
// and if that was the case, then the entire monitor is already in an undefined state
6868
update = self.clients_updater.next() => self.process_gateway_update(update.unwrap()),
69-
// similarly gateway reader will never return a `None` as it's implemented
70-
// as an infinite stream that returns Poll::Pending if it doesn't have anything
71-
// to return
72-
Some((_gateway_id, message)) = self.gateways_reader.stream_map().next() => {
73-
self.process_gateway_messages(message)
69+
gateway_message = self.gateways_reader.next() => {
70+
let Some((_gateway_id, message)) = gateway_message else {
71+
log::error!("the gateways reader stream has terminated!");
72+
continue
73+
};
74+
self.process_gateway_messages(message)
7475
}
7576
}
7677
}

0 commit comments

Comments
 (0)