From 232d72a623494d682282ad81c6c58201a704e0e4 Mon Sep 17 00:00:00 2001 From: Enzo Le Van Date: Wed, 11 Sep 2024 15:42:45 +0200 Subject: [PATCH] Coordinator stopped on bad control command (#650) * fix issue and more --- binaries/coordinator/src/lib.rs | 168 ++++++++++++++++---------------- 1 file changed, 84 insertions(+), 84 deletions(-) diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5f89f8e1..b9aae414 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -390,34 +390,66 @@ async fn start_inner( dataflow_uuid, grace_duration, } => { - stop_dataflow_by_uuid( + if let Some(result) = dataflow_results.get(&dataflow_uuid) { + let reply = ControlRequestReply::DataflowStopped { + uuid: dataflow_uuid, + result: dataflow_result(result, dataflow_uuid, &clock), + }; + let _ = reply_sender.send(Ok(reply)); + + continue; + } + + let dataflow = stop_dataflow( &mut running_dataflows, - &dataflow_results, dataflow_uuid, &mut daemon_connections, - reply_sender, clock.new_timestamp(), grace_duration, - &clock, ) - .await?; + .await; + + match dataflow { + Ok(dataflow) => { + dataflow.reply_senders.push(reply_sender); + } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + } } ControlRequest::StopByName { name, grace_duration, } => match resolve_name(name, &running_dataflows, &archived_dataflows) { - Ok(uuid) => { - stop_dataflow_by_uuid( + Ok(dataflow_uuid) => { + if let Some(result) = dataflow_results.get(&dataflow_uuid) { + let reply = ControlRequestReply::DataflowStopped { + uuid: dataflow_uuid, + result: dataflow_result(result, dataflow_uuid, &clock), + }; + let _ = reply_sender.send(Ok(reply)); + + continue; + } + + let dataflow = stop_dataflow( &mut running_dataflows, - &dataflow_results, - uuid, + dataflow_uuid, &mut daemon_connections, - reply_sender, clock.new_timestamp(), grace_duration, - &clock, ) - .await? + .await; + + match dataflow { + Ok(dataflow) => { + dataflow.reply_senders.push(reply_sender); + } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + } } Err(err) => { let _ = reply_sender.send(Err(err)); @@ -425,30 +457,37 @@ async fn start_inner( }, ControlRequest::Logs { uuid, name, node } => { let dataflow_uuid = if let Some(uuid) = uuid { - uuid + Ok(uuid) } else if let Some(name) = name { - resolve_name(name, &running_dataflows, &archived_dataflows)? + resolve_name(name, &running_dataflows, &archived_dataflows) } else { - bail!("No uuid") + Err(eyre!("No uuid")) }; - let reply = retrieve_logs( - &running_dataflows, - &archived_dataflows, - dataflow_uuid, - node.into(), - &mut daemon_connections, - clock.new_timestamp(), - ) - .await - .map(ControlRequestReply::Logs); - let _ = reply_sender.send(reply); + match dataflow_uuid { + Ok(uuid) => { + let reply = retrieve_logs( + &running_dataflows, + &archived_dataflows, + uuid, + node.into(), + &mut daemon_connections, + clock.new_timestamp(), + ) + .await + .map(ControlRequestReply::Logs); + let _ = reply_sender.send(reply); + } + Err(err) => { + let _ = reply_sender.send(Err(err)); + } + } } ControlRequest::Destroy => { tracing::info!("Received destroy command"); let reply = handle_destroy( - &running_dataflows, + &mut running_dataflows, &mut daemon_connections, &abort_handle, &mut daemon_events_tx, @@ -556,7 +595,7 @@ async fn start_inner( Event::CtrlC => { tracing::info!("Destroying coordinator after receiving Ctrl-C signal"); handle_destroy( - &running_dataflows, + &mut running_dataflows, &mut daemon_connections, &abort_handle, &mut daemon_events_tx, @@ -592,50 +631,6 @@ async fn start_inner( Ok(()) } -#[allow(clippy::too_many_arguments)] -async fn stop_dataflow_by_uuid( - running_dataflows: &mut HashMap, - dataflow_results: &HashMap>, - dataflow_uuid: Uuid, - daemon_connections: &mut HashMap, - reply_sender: tokio::sync::oneshot::Sender>, - timestamp: uhlc::Timestamp, - grace_duration: Option, - clock: &uhlc::HLC, -) -> Result<(), eyre::ErrReport> { - let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { - if let Some(result) = dataflow_results.get(&dataflow_uuid) { - let reply = ControlRequestReply::DataflowStopped { - uuid: dataflow_uuid, - result: dataflow_result(result, dataflow_uuid, clock), - }; - let _ = reply_sender.send(Ok(reply)); - return Ok(()); - } - bail!("no known dataflow found with UUID `{dataflow_uuid}`") - }; - let stop = async { - stop_dataflow( - dataflow, - dataflow_uuid, - daemon_connections, - timestamp, - grace_duration, - ) - .await?; - Result::<_, eyre::Report>::Ok(()) - }; - match stop.await { - Ok(()) => { - dataflow.reply_senders.push(reply_sender); - } - Err(err) => { - let _ = reply_sender.send(Err(err)); - } - }; - Ok(()) -} - fn dataflow_result( results: &BTreeMap, dataflow_uuid: Uuid, @@ -663,17 +658,17 @@ struct DaemonConnection { } async fn handle_destroy( - running_dataflows: &HashMap, + running_dataflows: &mut HashMap, daemon_connections: &mut HashMap, abortable_events: &futures::stream::AbortHandle, daemon_events_tx: &mut Option>, clock: &HLC, ) -> Result<(), eyre::ErrReport> { abortable_events.abort(); - for (&uuid, dataflow) in running_dataflows { - stop_dataflow( - dataflow, - uuid, + for dataflow_uuid in running_dataflows.keys().cloned().collect::>() { + let _ = stop_dataflow( + running_dataflows, + dataflow_uuid, daemon_connections, clock.new_timestamp(), None, @@ -737,16 +732,20 @@ impl PartialEq for RunningDataflow { impl Eq for RunningDataflow {} -async fn stop_dataflow( - dataflow: &RunningDataflow, - uuid: Uuid, +async fn stop_dataflow<'a>( + running_dataflows: &'a mut HashMap, + dataflow_uuid: Uuid, daemon_connections: &mut HashMap, timestamp: uhlc::Timestamp, grace_duration: Option, -) -> eyre::Result<()> { +) -> eyre::Result<&'a mut RunningDataflow> { + let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else { + bail!("no known running dataflow found with UUID `{dataflow_uuid}`") + }; + let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::StopDataflow { - dataflow_id: uuid, + dataflow_id: dataflow_uuid, grace_duration, }, timestamp, @@ -773,9 +772,10 @@ async fn stop_dataflow( other => bail!("unexpected reply after sending stop: {other:?}"), } } - tracing::info!("successfully send stop dataflow `{uuid}` to all daemons"); - Ok(()) + tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons"); + + Ok(dataflow) } async fn reload_dataflow(