Improvements to node scheduler #213

Merged 1 commit on Jul 21, 2023
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions arroyo-controller/Cargo.toml
@@ -63,6 +63,7 @@ thiserror = "1.0.40"
regex = "1.7.3"
reqwest = { version = "0.11.16", features = ["json"] }
uuid = "1.3.3"
async-stream = "0.3.5"

[build-dependencies]
cornucopia = { version = "0.9" }
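The new async-stream dependency provides the stream! macro that the scheduler changes below use to send the worker binary as a header message followed by fixed-size parts. Here is a minimal standalone sketch of that pattern; the Msg enum, CHUNK_SIZE, and the tokio/futures dependencies are illustrative assumptions, not part of this PR.

```rust
// Sketch only: a stream that yields a header first, then bounded-size chunks,
// mirroring the header-then-data protocol used by the node scheduler below.
use futures::{pin_mut, StreamExt};

#[derive(Debug)]
enum Msg {
    Header { total_size: usize },
    Data { part: u64, bytes: Vec<u8> },
}

const CHUNK_SIZE: usize = 4; // the real code uses NODE_PART_SIZE (2 MiB)

#[tokio::main]
async fn main() {
    let payload = b"hello, node scheduler".to_vec();
    let total_size = payload.len();

    let outbound = async_stream::stream! {
        // The header always goes first so the receiver knows what to expect.
        yield Msg::Header { total_size };
        for (part, chunk) in payload.chunks(CHUNK_SIZE).enumerate() {
            yield Msg::Data { part: part as u64, bytes: chunk.to_vec() };
        }
    };

    pin_mut!(outbound);
    while let Some(msg) = outbound.next().await {
        println!("{:?}", msg);
    }
}
```

Note that stream! takes ownership of what it captures, which is presumably why the scheduler diff clones the Arc-wrapped binary right before building its outbound stream.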
117 changes: 84 additions & 33 deletions arroyo-controller/src/schedulers/mod.rs
@@ -1,8 +1,8 @@
use anyhow::bail;
use arroyo_rpc::grpc::node_grpc_client::NodeGrpcClient;
use arroyo_rpc::grpc::{
HeartbeatNodeReq, RegisterNodeReq, StartWorkerReq, StopWorkerReq, StopWorkerStatus,
WorkerFinishedReq,
HeartbeatNodeReq, RegisterNodeReq, StartWorkerData, StartWorkerHeader, StartWorkerReq,
StopWorkerReq, StopWorkerStatus, WorkerFinishedReq,
};
use arroyo_types::{
NodeId, WorkerId, JOB_ID_ENV, NODE_ID_ENV, RUN_ID_ENV, TASK_SLOTS_ENV, WORKER_ID_ENV,
@@ -43,6 +43,8 @@ lazy_static! {
.unwrap();
}

const NODE_PART_SIZE: usize = 2 * 1024 * 1024;

#[async_trait::async_trait]
pub trait Scheduler: Send + Sync {
async fn start_workers(
@@ -303,10 +305,12 @@ impl NodeStatus {
}
}

#[derive(Clone)]
struct NodeWorker {
job_id: String,
node_id: NodeId,
run_id: i64,
running: bool,
}

#[derive(Default)]
@@ -357,19 +361,23 @@ impl NodeScheduler {
job_id: &str,
worker_id: WorkerId,
force: bool,
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<WorkerId>> {
let state = self.state.lock().await;

let Some(worker) = state.workers.get(&worker_id) else {
// assume it's already finished
return Ok(());
return Ok(Some(worker_id));
};

let Some(node) = state.nodes.get(&worker.node_id) else {
warn!(message = "node not found for stop worker", node_id = worker.node_id.0);
return Ok(());
return Ok(Some(worker_id));
};

let worker = worker.clone();
let node = node.clone();
drop(state);

info!(
message = "stopping worker",
job_id = worker.job_id,
@@ -378,25 +386,28 @@
worker_id = worker_id.0
);

let mut client = NodeGrpcClient::connect(format!("http://{}", node.addr)).await?;

match (
client
.stop_worker(Request::new(StopWorkerReq {
job_id: job_id.to_string(),
worker_id: worker_id.0,
force,
}))
.await?
.get_ref()
.status(),
force,
) {
let Ok(mut client) = NodeGrpcClient::connect(format!("http://{}", node.addr)).await else {
warn!("Failed to connect to worker to stop; this likely means it is dead");
Review comment thread:

Contributor:
If we wanted to be careful about making sure a pipeline was actually dead, we'd coordinate this behavior with timeouts/heartbeats. Might not be worth it for what is likely not the main mode of deployment.

Member Author:
Yeah, I think there are ways of making this more robust, but waiting for a heartbeat does mean taking longer to recover failed pipelines.

There are basically two cases:

  • The node process has died, in which case the workers will also have died (generally) because they are child processes of the node
  • We have a network partition between the controller and the node

In case 1, we're doing the right thing. In case 2, the worker and the node will each die on their own as they fail to talk to the controller.

return Ok(Some(worker_id));
};

let Ok(resp) = client
.stop_worker(Request::new(StopWorkerReq {
job_id: job_id.to_string(),
worker_id: worker_id.0,
force,
}))
.await else {
warn!("Failed to connect to worker to stop; this likely means it is dead");
return Ok(Some(worker_id));
};

match (resp.get_ref().status(), force) {
(StopWorkerStatus::NotFound, false) => {
bail!("couldn't find worker, will only continue if force")
}
(StopWorkerStatus::StopFailed, _) => bail!("tried to kill and couldn't"),
_ => Ok(()),
_ => Ok(None),
}
}
}
@@ -463,7 +474,7 @@ impl Scheduler for NodeScheduler {
.workers
.iter()
.filter(|(_, v)| {
v.job_id == job_id && (run_id.is_none() || v.run_id == run_id.unwrap())
v.job_id == job_id && v.running && (run_id.is_none() || v.run_id == run_id.unwrap())
})
.map(|(w, _)| *w)
.collect())
@@ -477,6 +488,8 @@
.await
.map_err(|_| SchedulerError::CompilationNeeded)?;

let binary = Arc::new(binary);

// TODO: make this locking more fine-grained
let mut state = self.state.lock().await;

@@ -536,18 +549,45 @@ impl Scheduler for NodeScheduler {
))
})?;

let header = StartWorkerReq {
msg: Some(arroyo_rpc::grpc::start_worker_req::Msg::Header(
StartWorkerHeader {
name: start_pipeline_req.name.clone(),
job_id: start_pipeline_req.job_id.clone(),
wasm: wasm.clone(),
slots: slots_for_this_one as u64,
node_id: node.id.0,
run_id: start_pipeline_req.run_id as u64,
env_vars: start_pipeline_req.env_vars.clone(),
binary_size: binary.len() as u64,
},
)),
};

let binary = binary.clone();
let outbound = async_stream::stream! {
yield header;

let mut part = 0;
let mut sent = 0;

for chunk in binary.chunks(NODE_PART_SIZE) {
sent += chunk.len();

yield StartWorkerReq {
msg: Some(arroyo_rpc::grpc::start_worker_req::Msg::Data(StartWorkerData {
part,
data: chunk.to_vec(),
has_more: sent < binary.len(),
}))
};

part += 1;
}
};

let res = client
.start_worker(Request::new(StartWorkerReq {
name: start_pipeline_req.name.clone(),
job_id: start_pipeline_req.job_id.clone(),
binary: binary.clone(),
wasm: wasm.clone(),
job_hash: start_pipeline_req.hash.clone(),
slots: slots_for_this_one as u64,
node_id: node.id.0,
run_id: start_pipeline_req.run_id as u64,
env_vars: start_pipeline_req.env_vars.clone(),
}))
.start_worker(Request::new(outbound))
.await
.map_err(|e| {
// release back slots already scheduled.
Expand Down Expand Up @@ -579,6 +619,7 @@ impl Scheduler for NodeScheduler {
job_id: start_pipeline_req.job_id.clone(),
run_id: start_pipeline_req.run_id,
node_id: node.id,
running: true,
},
);

@@ -603,7 +644,17 @@
}

for f in futures {
f.await?;
match f.await? {
Some(worker_id) => {
let mut state = self.state.lock().await;
if let Some(worker) = state.workers.get_mut(&worker_id) {
worker.running = false;
}
}
None => {
bail!("Failed to stop worker");
}
}
}

Ok(())
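This file only shows the sending side, so here is a hedged sketch of what a receiver of this stream has to do: the first message is a StartWorkerHeader carrying binary_size, followed by StartWorkerData parts of at most NODE_PART_SIZE bytes, with has_more flagging whether more parts remain. The StartWorkerMsg enum and reassemble function below are local stand-ins, not the actual arroyo-rpc types or arroyo-node code.

```rust
// Stand-in for the real protobuf-generated message; fields mirror the diff above.
enum StartWorkerMsg {
    Header { binary_size: u64 },
    Data { part: u64, data: Vec<u8>, has_more: bool },
}

// Reassemble the streamed parts into the full worker binary.
fn reassemble(msgs: impl IntoIterator<Item = StartWorkerMsg>) -> anyhow::Result<Vec<u8>> {
    let mut msgs = msgs.into_iter();

    // The header must arrive first; it tells us how many bytes to expect.
    let Some(StartWorkerMsg::Header { binary_size }) = msgs.next() else {
        anyhow::bail!("expected StartWorkerHeader as the first message");
    };

    let mut binary = Vec::with_capacity(binary_size as usize);
    let mut expected_part = 0u64;

    for msg in msgs {
        let StartWorkerMsg::Data { part, data, has_more } = msg else {
            anyhow::bail!("unexpected second header in stream");
        };
        anyhow::ensure!(part == expected_part, "data parts arrived out of order");
        expected_part += 1;
        binary.extend_from_slice(&data);
        if !has_more {
            break;
        }
    }

    anyhow::ensure!(
        binary.len() as u64 == binary_size,
        "reassembled binary does not match the advertised size"
    );
    Ok(binary)
}
```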
2 changes: 2 additions & 0 deletions arroyo-node/Cargo.toml
@@ -18,3 +18,5 @@ rand = "0.8"
local-ip-address = "0.5"
lazy_static = "1.4.0"
prometheus = "0.13.3"
tokio-stream = "0.1.14"
anyhow = "1.0.72"
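These two additions line up with the controller-side change above: StartWorker is now a streamed request, so the node has to drain a stream of messages rather than read a single one, with anyhow carrying any transport error out of that loop. Below is a small generic sketch under those assumptions; the item and error types stand in for the real gRPC types, which this diff does not show.

```rust
// Sketch only: drain a stream of Result items, converting transport errors
// into anyhow::Error along the way.
use tokio_stream::StreamExt;

async fn drain<S, T, E>(mut inbound: S) -> anyhow::Result<Vec<T>>
where
    S: tokio_stream::Stream<Item = Result<T, E>> + Unpin,
    E: std::error::Error + Send + Sync + 'static,
{
    let mut msgs = Vec::new();
    while let Some(item) = inbound.next().await {
        // `?` converts E into anyhow::Error via the blanket From impl.
        msgs.push(item?);
    }
    Ok(msgs)
}
```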