Skip to content

Commit 1efff12

Browse files
committed
use try_send on connToMain channel
1 parent b374e07 commit 1efff12

File tree

3 files changed

+60
-69
lines changed

3 files changed

+60
-69
lines changed

trust-quorum/src/connection_manager.rs

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -446,19 +446,16 @@ impl ConnMgr {
446446
);
447447

448448
// Inform the main task that accepted connection is established
449-
if let Err(e) = main_tx
450-
.send(ConnToMainMsg {
451-
task_id: task::id(),
452-
msg: ConnToMainMsgInner::Accepted {
453-
addr,
454-
peer_id: baseboard_id,
455-
},
456-
})
457-
.await
458-
{
459-
// The system is shutting down
449+
if let Err(_) = main_tx.try_send(ConnToMainMsg {
450+
task_id: task::id(),
451+
msg: ConnToMainMsgInner::Accepted {
452+
addr,
453+
peer_id: baseboard_id,
454+
},
455+
}) {
456+
// The system is shutting down or we've overloaded the main channel
460457
// Just bail from this task
461-
warn!(log, "Failed to send 'accepted' msg to main task: {e:?}");
458+
warn!(log, "Failed to send 'accepted' msg to main task");
462459
} else {
463460
conn.run().await;
464461
}
@@ -663,22 +660,16 @@ impl ConnMgr {
663660
);
664661
// Inform the main task that the client connection is
665662
// established.
666-
if let Err(e) = main_tx
667-
.send(ConnToMainMsg {
668-
task_id: task::id(),
669-
msg: ConnToMainMsgInner::Connected {
670-
addr,
671-
peer_id: baseboard_id,
672-
},
673-
})
674-
.await
675-
{
676-
// The system is shutting down
663+
if let Err(_) = main_tx.try_send(ConnToMainMsg {
664+
task_id: task::id(),
665+
msg: ConnToMainMsgInner::Connected {
666+
addr,
667+
peer_id: baseboard_id,
668+
},
669+
}) {
670+
// The system is shutting down or we've overloaded the main channel
677671
// Just bail from this task
678-
error!(
679-
log,
680-
"Failed to send 'connected' msg to main task: {e:?}"
681-
);
672+
error!(log, "Failed to send 'connected' msg to main task");
682673
} else {
683674
conn.run().await;
684675
}

trust-quorum/src/established_conn.rs

Lines changed: 26 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -155,17 +155,13 @@ impl EstablishedConn {
155155
}
156156

157157
async fn close(&mut self) {
158-
if let Err(e) = self
159-
.main_tx
160-
.send(ConnToMainMsg {
161-
task_id: self.task_id,
162-
msg: ConnToMainMsgInner::Disconnected {
163-
peer_id: self.peer_id.clone(),
164-
},
165-
})
166-
.await
167-
{
168-
warn!(self.log, "Failed to send to main task: {e:?}");
158+
if let Err(_) = self.main_tx.try_send(ConnToMainMsg {
159+
task_id: self.task_id,
160+
msg: ConnToMainMsgInner::Disconnected {
161+
peer_id: self.peer_id.clone(),
162+
},
163+
}) {
164+
warn!(self.log, "Failed to send to main task");
169165
}
170166
let _ = self.writer.shutdown().await;
171167
}
@@ -202,21 +198,18 @@ impl EstablishedConn {
202198
debug!(self.log, "Received {msg:?}");
203199
match msg {
204200
WireMsg::Tq(msg) => {
205-
if let Err(e) = self
206-
.main_tx
207-
.send(ConnToMainMsg {
208-
task_id: self.task_id,
209-
msg: ConnToMainMsgInner::Received {
210-
from: self.peer_id.clone(),
211-
msg,
212-
},
213-
})
214-
.await
215-
{
216-
warn!(
201+
if let Err(_) = self.main_tx.try_send(ConnToMainMsg {
202+
task_id: self.task_id,
203+
msg: ConnToMainMsgInner::Received {
204+
from: self.peer_id.clone(),
205+
msg,
206+
},
207+
}) {
208+
error!(
217209
self.log,
218-
"Failed to send received fsm msg to main task: {e:?}"
210+
"Failed to send received fsm msg to main task"
219211
);
212+
panic!("Connection to main task channel full");
220213
}
221214
}
222215
WireMsg::Ping => {
@@ -225,22 +218,19 @@ impl EstablishedConn {
225218
}
226219
WireMsg::NetworkConfig(config) => {
227220
let generation = config.generation;
228-
if let Err(e) = self
229-
.main_tx
230-
.send(ConnToMainMsg {
231-
task_id: self.task_id,
232-
msg: ConnToMainMsgInner::ReceivedNetworkConfig {
233-
from: self.peer_id.clone(),
234-
config,
235-
},
236-
})
237-
.await
238-
{
221+
if let Err(_) = self.main_tx.try_send(ConnToMainMsg {
222+
task_id: self.task_id,
223+
msg: ConnToMainMsgInner::ReceivedNetworkConfig {
224+
from: self.peer_id.clone(),
225+
config,
226+
},
227+
}) {
239228
warn!(
240229
self.log,
241230
"Failed to send received NetworkConfig with
242-
generation {generation} to main task: {e:?}"
231+
generation {generation} to main task"
243232
);
233+
panic!("Connection to main task channnel full");
244234
}
245235
}
246236
}

trust-quorum/src/task.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@ use tokio::sync::oneshot::error::RecvError;
1818
use tokio::sync::{mpsc, oneshot};
1919
use trust_quorum_protocol::{BaseboardId, Node, NodeCtx};
2020

21+
/// We only expect a handful of messages at a time.
22+
const API_CHANNEL_BOUND: usize = 32;
23+
24+
/// We size this bound large enough that it should never be hit. Up to 31
25+
/// `EstablishedConn` tasks can send messages to the main task simultaneously when
26+
/// messages are received.
27+
///
28+
/// We use `try_send.unwrap()` when sending to the main task to prevent deadlock
29+
/// and inform us via panic that something has gone seriously wrong. This is
30+
/// similar to using an unbounded channel but will not use all possible memory.
31+
const CONN_TO_MAIN_CHANNEL_BOUND: usize = 1024;
32+
2133
#[derive(Debug, Clone)]
2234
pub struct Config {
2335
pub baseboard_id: BaseboardId,
@@ -130,13 +142,11 @@ impl NodeTask {
130142
"component" => "trust-quorum",
131143
"baseboard_id" => config.baseboard_id.to_string()
132144
));
133-
// We only expect one outstanding request at a time for `Init_` or
134-
// `LoadRackSecret` requests, We can have one of those requests in
135-
// flight while allowing `PeerAddresses` updates. We also allow status
136-
// requests in parallel. Just leave some room.
137-
let (tx, rx) = mpsc::channel(10);
138145

139-
let (conn_mgr_tx, conn_mgr_rx) = mpsc::channel(100);
146+
let (tx, rx) = mpsc::channel(API_CHANNEL_BOUND);
147+
148+
let (conn_mgr_tx, conn_mgr_rx) =
149+
mpsc::channel(CONN_TO_MAIN_CHANNEL_BOUND);
140150

141151
let baseboard_id = config.baseboard_id.clone();
142152

0 commit comments

Comments
 (0)