Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix: remove an unused channel that was locking the broadcast #433

Merged
merged 2 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ impl DoubleEcho {
pub fn spawn_task_manager(
&mut self,
task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>,
) -> mpsc::Receiver<(CertificateId, TaskStatus)> {
let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048);

) {
let task_manager = crate::task_manager::TaskManager::new(
task_manager_message_receiver,
task_completion_sender,
self.subscriptions.clone(),
self.event_sender.clone(),
self.validator_id,
Expand All @@ -112,8 +109,6 @@ impl DoubleEcho {
);

tokio::spawn(task_manager.run(self.task_manager_cancellation.child_token()));

task_completion_receiver
}

/// DoubleEcho main loop
Expand All @@ -127,7 +122,7 @@ impl DoubleEcho {
mut self,
task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>,
) {
let mut task_completion = self.spawn_task_manager(task_manager_message_receiver);
self.spawn_task_manager(task_manager_message_receiver);

info!("DoubleEcho started");

Expand Down
4 changes: 0 additions & 4 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type RunningTasks =
/// or existing tasks will receive the messages.
pub struct TaskManager {
pub message_receiver: mpsc::Receiver<DoubleEchoCommand>,
pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>,
pub subscriptions: SubscriptionsView,
pub event_sender: mpsc::Sender<ProtocolEvents>,
pub tasks: HashMap<CertificateId, TaskContext>,
Expand All @@ -63,7 +62,6 @@ impl TaskManager {
#[allow(clippy::too_many_arguments)]
pub fn new(
message_receiver: mpsc::Receiver<DoubleEchoCommand>,
task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>,
subscriptions: SubscriptionsView,
event_sender: mpsc::Sender<ProtocolEvents>,
validator_id: ValidatorId,
Expand All @@ -74,7 +72,6 @@ impl TaskManager {
) -> Self {
Self {
message_receiver,
task_completion_sender,
subscriptions,
event_sender,
tasks: HashMap::new(),
Expand Down Expand Up @@ -138,7 +135,6 @@ impl TaskManager {
debug!("Task for certificate {} finished successfully", certificate_id);
self.tasks.remove(&certificate_id);
DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec();
let _ = self.task_completion_sender.send((certificate_id, status)).await;
} else {
debug!("Task for certificate {} finished unsuccessfully", certificate_id);
}
Expand Down
2 changes: 0 additions & 2 deletions crates/topos-tce-broadcast/src/tests/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{sampler::SubscriptionsView, task_manager::TaskManager};
async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {
let validator_store = create_validator_store.await;
let (message_sender, message_receiver) = mpsc::channel(1);
let (task_completion_sender, _) = mpsc::channel(1);
let (event_sender, _) = mpsc::channel(1);
let (broadcast_sender, _) = broadcast::channel(1);
let shutdown = CancellationToken::new();
Expand All @@ -39,7 +38,6 @@ async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {

let mut manager = TaskManager::new(
message_receiver,
task_completion_sender,
SubscriptionsView::default(),
event_sender,
validator_id,
Expand Down
Loading