Skip to content

Commit

Permalink
Coordinator stopped on bad control command (#650)
Browse files Browse the repository at this point in the history
* fix issue and more
  • Loading branch information
Hennzau authored Sep 11, 2024
1 parent 62d3e38 commit 232d72a
Showing 1 changed file with 84 additions and 84 deletions.
168 changes: 84 additions & 84 deletions binaries/coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,65 +390,104 @@ async fn start_inner(
dataflow_uuid,
grace_duration,
} => {
stop_dataflow_by_uuid(
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid, &clock),
};
let _ = reply_sender.send(Ok(reply));

continue;
}

let dataflow = stop_dataflow(
&mut running_dataflows,
&dataflow_results,
dataflow_uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
&clock,
)
.await?;
.await;

match dataflow {
Ok(dataflow) => {
dataflow.reply_senders.push(reply_sender);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::StopByName {
name,
grace_duration,
} => match resolve_name(name, &running_dataflows, &archived_dataflows) {
Ok(uuid) => {
stop_dataflow_by_uuid(
Ok(dataflow_uuid) => {
if let Some(result) = dataflow_results.get(&dataflow_uuid) {
let reply = ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_result(result, dataflow_uuid, &clock),
};
let _ = reply_sender.send(Ok(reply));

continue;
}

let dataflow = stop_dataflow(
&mut running_dataflows,
&dataflow_results,
uuid,
dataflow_uuid,
&mut daemon_connections,
reply_sender,
clock.new_timestamp(),
grace_duration,
&clock,
)
.await?
.await;

match dataflow {
Ok(dataflow) => {
dataflow.reply_senders.push(reply_sender);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
},
ControlRequest::Logs { uuid, name, node } => {
let dataflow_uuid = if let Some(uuid) = uuid {
uuid
Ok(uuid)
} else if let Some(name) = name {
resolve_name(name, &running_dataflows, &archived_dataflows)?
resolve_name(name, &running_dataflows, &archived_dataflows)
} else {
bail!("No uuid")
Err(eyre!("No uuid"))
};

let reply = retrieve_logs(
&running_dataflows,
&archived_dataflows,
dataflow_uuid,
node.into(),
&mut daemon_connections,
clock.new_timestamp(),
)
.await
.map(ControlRequestReply::Logs);
let _ = reply_sender.send(reply);
match dataflow_uuid {
Ok(uuid) => {
let reply = retrieve_logs(
&running_dataflows,
&archived_dataflows,
uuid,
node.into(),
&mut daemon_connections,
clock.new_timestamp(),
)
.await
.map(ControlRequestReply::Logs);
let _ = reply_sender.send(reply);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::Destroy => {
tracing::info!("Received destroy command");

let reply = handle_destroy(
&running_dataflows,
&mut running_dataflows,
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
Expand Down Expand Up @@ -556,7 +595,7 @@ async fn start_inner(
Event::CtrlC => {
tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
handle_destroy(
&running_dataflows,
&mut running_dataflows,
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
Expand Down Expand Up @@ -592,50 +631,6 @@ async fn start_inner(
Ok(())
}

/// Handles a "stop dataflow" control request for the dataflow with the given UUID.
///
/// The outcome is always reported to the requester through `reply_sender`
/// instead of being returned as an error:
///
/// * If the dataflow is not running but an entry exists in `dataflow_results`,
///   a [`ControlRequestReply::DataflowStopped`] reply built from that entry is
///   sent immediately.
/// * If the UUID is neither running nor present in `dataflow_results`, an error
///   reply is sent.
/// * If the dataflow is running, `stop_dataflow` is invoked. On failure the
///   error is sent as the reply; on success `reply_sender` is appended to the
///   dataflow's `reply_senders` (presumably answered once the dataflow has
///   actually finished -- that happens outside this function).
///
/// # Errors
///
/// This function currently always returns `Ok(())`. The `Result` return type is
/// kept so that existing call sites using `?` continue to compile. A request
/// naming an unknown UUID must not surface as `Err` here, because callers
/// propagate it with `?` and would abort their event loop on a mere bad request.
// The function simply forwards the state it needs from the coordinator loop,
// hence the large parameter list.
#[allow(clippy::too_many_arguments)]
async fn stop_dataflow_by_uuid(
    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
    dataflow_results: &HashMap<Uuid, BTreeMap<String, DataflowDaemonResult>>,
    dataflow_uuid: Uuid,
    daemon_connections: &mut HashMap<String, DaemonConnection>,
    reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
    timestamp: uhlc::Timestamp,
    grace_duration: Option<Duration>,
    clock: &uhlc::HLC,
) -> Result<(), eyre::ErrReport> {
    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
        // Not running: either report the stored result or tell the requester
        // that the UUID is unknown. Both cases are answered via the reply
        // channel so that a bad request never becomes a hard error.
        let reply = match dataflow_results.get(&dataflow_uuid) {
            Some(result) => Ok(ControlRequestReply::DataflowStopped {
                uuid: dataflow_uuid,
                result: dataflow_result(result, dataflow_uuid, clock),
            }),
            None => Err(eyre::eyre!(
                "no known dataflow found with UUID `{dataflow_uuid}`"
            )),
        };
        // The receiver may already be gone; there is nobody left to notify then.
        let _ = reply_sender.send(reply);
        return Ok(());
    };

    let stop_result = stop_dataflow(
        dataflow,
        dataflow_uuid,
        daemon_connections,
        timestamp,
        grace_duration,
    )
    .await;

    match stop_result {
        Ok(()) => {
            // Keep the sender with the dataflow instead of replying right away.
            dataflow.reply_senders.push(reply_sender);
        }
        Err(err) => {
            let _ = reply_sender.send(Err(err));
        }
    }
    Ok(())
}

fn dataflow_result(
results: &BTreeMap<String, DataflowDaemonResult>,
dataflow_uuid: Uuid,
Expand Down Expand Up @@ -663,17 +658,17 @@ struct DaemonConnection {
}

async fn handle_destroy(
running_dataflows: &HashMap<Uuid, RunningDataflow>,
running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
daemon_connections: &mut HashMap<String, DaemonConnection>,
abortable_events: &futures::stream::AbortHandle,
daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
clock: &HLC,
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for (&uuid, dataflow) in running_dataflows {
stop_dataflow(
dataflow,
uuid,
for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
let _ = stop_dataflow(
running_dataflows,
dataflow_uuid,
daemon_connections,
clock.new_timestamp(),
None,
Expand Down Expand Up @@ -737,16 +732,20 @@ impl PartialEq for RunningDataflow {

impl Eq for RunningDataflow {}

async fn stop_dataflow(
dataflow: &RunningDataflow,
uuid: Uuid,
async fn stop_dataflow<'a>(
running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
timestamp: uhlc::Timestamp,
grace_duration: Option<Duration>,
) -> eyre::Result<()> {
) -> eyre::Result<&'a mut RunningDataflow> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
};

let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::StopDataflow {
dataflow_id: uuid,
dataflow_id: dataflow_uuid,
grace_duration,
},
timestamp,
Expand All @@ -773,9 +772,10 @@ async fn stop_dataflow(
other => bail!("unexpected reply after sending stop: {other:?}"),
}
}
tracing::info!("successfully send stop dataflow `{uuid}` to all daemons");

Ok(())
tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");

Ok(dataflow)
}

async fn reload_dataflow(
Expand Down

0 comments on commit 232d72a

Please sign in to comment.