Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to wait for a worker to drain #126

Merged
merged 2 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ ocluster-admin -c ./capnp-secrets/admin.cap pause linux-x86_64 my-host
A paused worker will not be assigned any more items until it is unpaused, but
it will continue with any jobs it is already running. Use `unpause` to resume it.

Use `pause --wait` if you want to wait until all running jobs have finished.

Instead of specifying a worker, you can also use `--all` to pause or unpause all workers in a pool.

If you want to set the state of a worker that hasn't ever connected to the scheduler, use `--auto-create`.
Expand Down
28 changes: 25 additions & 3 deletions api/pool_admin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ let pp_worker_info f { name; active; connected } =
else
Fmt.pf f "%s (@[<h>%a@])" name Fmt.(list ~sep:comma string) notes

let local ~show ~workers ~worker ~set_active ~update ~forget ~set_rate =
let local ~show ~workers ~worker ~set_active ~drain ~update ~forget ~set_rate =
let module X = Raw.Service.PoolAdmin in
X.local @@ object
inherit X.service
Expand Down Expand Up @@ -67,8 +67,22 @@ let local ~show ~workers ~worker ~set_active ~update ~forget ~set_rate =
method update_impl params release_param_caps =
let open X.Update in
let name = Params.worker_get params in
let progress = Params.progress_get params in
release_param_caps ();
update name
Service.return_lwt @@ fun () ->
Lwt.finalize
(fun () -> update ?progress name)
(fun () -> Option.iter Capability.dec_ref progress; Lwt.return_unit)

method drain_impl params release_param_caps =
let open X.Drain in
let name = Params.worker_get params in
let progress = Params.progress_get params in
release_param_caps ();
Service.return_lwt @@ fun () ->
Lwt.finalize
(fun () -> drain ?progress name)
(fun () -> Option.iter Capability.dec_ref progress; Lwt.return_unit)

method forget_impl params release_param_caps =
let open X.Forget in
Expand Down Expand Up @@ -120,10 +134,18 @@ let set_active ?(auto_create=false) t worker active =
Params.auto_create_set params auto_create;
Capability.call_for_unit_exn t method_id request

let update t worker =
let drain ?progress t worker =
let open X.Drain in
let request, params = Capability.Request.create Params.init_pointer in
Params.worker_set params worker;
Params.progress_set params progress;
Capability.call_for_unit_exn t method_id request

let update ?progress t worker =
let open X.Update in
let request, params = Capability.Request.create Params.init_pointer in
Params.worker_set params worker;
Params.progress_set params progress;
Capability.call_for_unit t method_id request

let forget t worker =
Expand Down
24 changes: 24 additions & 0 deletions api/progress.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
open Capnp_rpc_lwt

let local fn =
let module X = Raw.Service.Progress in
X.local @@ object
inherit X.service

method report_impl params release_param_caps =
let open X.Report in
let msg = Params.status_get params in
release_param_caps ();
fn msg;
Service.return_empty ()
end

module X = Raw.Client.Progress

type t = X.t Capability.t

let report t msg =
let open X.Report in
let request, params = Capability.Request.create Params.init_pointer in
Params.status_set params msg;
Capability.call_for_unit t method_id request
14 changes: 13 additions & 1 deletion api/schema.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ struct WorkerInfo {
connected @2 :Bool;
}

# A callback for receiving progress updates.
interface Progress {
report @0 (status :Text) -> ();
# Called when the status changes.
}

interface PoolAdmin {
show @0 () -> (state :Text);
workers @1 () -> (workers : List(WorkerInfo));
Expand All @@ -156,8 +162,14 @@ interface PoolAdmin {
# If autoCreate is true then this can be used even with an unknown worker,
# which may be useful if you want a new worker to start paused, for example.

update @4 (worker :Text) -> ();
drain @7 (worker :Text, progress :Progress) -> ();
# Mark the worker as paused and wait until no jobs are running.
# Returns immediately if the worker isn't connected.
# If given, [progress] receives one-line progress reports.

update @4 (worker :Text, progress :Progress) -> ();
# Drain worker, ask it to restart with the latest version, and return when it comes back.
# If given, [progress] receives one-line progress reports.

setRate @5 (id :Text, rate :Float64) -> ();
# Set the expected share of the pool for this client.
Expand Down
52 changes: 42 additions & 10 deletions bin/admin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,26 @@ let show cap_path pool =
Cluster_api.Pool_admin.show pool >|= fun status ->
print_endline (String.trim status)

let set_active active all auto_create cap_path pool worker =
let progress label = Cluster_api.Progress.local (Fmt.pr "%s: %s@." label)

let drain pool workers =
Fmt.pr "Waiting for jobs to finish...@.";
let jobs = workers |> List.map (fun w ->
Cluster_api.Pool_admin.drain ~progress:(progress w) pool w
) in
Lwt.join jobs

let set_active active all auto_create wait cap_path pool worker =
run cap_path @@ fun admin_service ->
Capability.with_ref (Cluster_api.Admin.pool admin_service pool) @@ fun pool ->
match worker with
| Some worker ->
Cluster_api.Pool_admin.set_active ~auto_create pool worker active
Cluster_api.Pool_admin.set_active ~auto_create pool worker active >>= fun () ->
begin
if wait then drain pool [worker]
else Lwt.return_unit
end >|= fun () ->
Fmt.pr "Success.@."
| None ->
Cluster_api.Pool_admin.workers pool >>= function
| [] ->
Expand All @@ -77,7 +91,11 @@ let set_active active all auto_create cap_path pool worker =
let set worker =
Cluster_api.Pool_admin.set_active pool worker active
in
Lwt.join (List.map set workers) >|= fun () ->
Lwt.join (List.map set workers) >>= fun () ->
begin
if wait then drain pool workers
else Lwt.return_unit
end >|= fun () ->
Fmt.pr "Success.@."
)
| workers ->
Expand All @@ -94,20 +112,23 @@ let update cap_path pool worker =
match worker with
| Some worker ->
begin
Cluster_api.Pool_admin.update pool worker >|= function
Cluster_api.Pool_admin.update ~progress:(progress worker) pool worker >|= function
| Ok () -> Fmt.pr "Restarted@."
| Error (`Capnp ex) ->
Fmt.pr "%a@." Capnp_rpc.Error.pp ex;
exit 1
end
| None ->
Cluster_api.Pool_admin.workers pool >>= function
let is_connected w = w.Cluster_api.Pool_admin.connected in
Cluster_api.Pool_admin.workers pool >>= fun workers ->
let connected, disconnected = List.partition is_connected workers in
match connected with
| [] ->
Fmt.epr "No workers connected to pool!@.";
exit 1
| w :: ws ->
Fmt.pr "Testing update on first worker in pool: %S@." w.name;
Cluster_api.Pool_admin.update pool w.name >>= function
Cluster_api.Pool_admin.update ~progress:(progress w.name) pool w.name >>= function
| Error (`Capnp ex) ->
Fmt.pr "%a@." Capnp_rpc.Error.pp ex;
exit 1
Expand All @@ -116,14 +137,18 @@ let update cap_path pool worker =
Fmt.(list ~sep:sp pp_worker_name) ws;
ws
|> List.map (fun (w:Cluster_api.Pool_admin.worker_info) ->
Cluster_api.Pool_admin.update pool w.name >|= function
Cluster_api.Pool_admin.update ~progress:(progress w.name) pool w.name >|= function
| Ok () -> Fmt.pr "%S restarted OK.@." w.name
| Error (`Capnp ex) ->
Fmt.pr "%S: %a@." w.name Capnp_rpc.Error.pp ex;
failwith "Failed update(s)"
)
|> Lwt.join >|= fun () ->
Fmt.pr "All pool workers restarted.@."
Fmt.pr "All pool workers restarted.@.";
if disconnected <> [] then (
Fmt.pr "@[<v2>WARNING: disconnected workers not updated:@,%a@]@."
Fmt.(list ~sep:cut pp_worker_name) disconnected
)

let forget cap_path pool worker =
run cap_path @@ fun admin_service ->
Expand Down Expand Up @@ -204,6 +229,13 @@ let auto_create =
~doc:"Create worker first if unknown"
["auto-create"]

let wait =
Arg.value @@
Arg.flag @@
Arg.info
~doc:"Wait until no jobs are running"
["wait"]

let add_client =
let doc = "Create a new client endpoint for submitting jobs" in
Term.(const add_client $ connect_addr $ Arg.required (client_id ~pos:0)),
Expand Down Expand Up @@ -231,12 +263,12 @@ let show =

let pause =
let doc = "Set a worker to be unavailable for further jobs" in
Term.(const (set_active false) $ all $ auto_create $ connect_addr $ Arg.required pool_pos $ worker),
Term.(const (set_active false) $ all $ auto_create $ wait $ connect_addr $ Arg.required pool_pos $ worker),
Term.info "pause" ~doc

let unpause =
let doc = "Resume a paused worker" in
Term.(const (set_active true) $ all $ auto_create $ connect_addr $ Arg.required pool_pos $ worker),
Term.(const (set_active true) $ all $ auto_create $ const false $ connect_addr $ Arg.required pool_pos $ worker),
Term.info "unpause" ~doc

let update =
Expand Down
52 changes: 48 additions & 4 deletions scheduler/cluster_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ module Item = struct
| x -> Fmt.string f x
end

let report_progress ?progress msg =
msg |> Fmt.kstr @@ fun msg ->
match progress with
| None -> Lwt.return_unit
| Some progress ->
Cluster_api.Progress.report progress msg >|= function
| Ok () -> ()
| Error (`Capnp ex) ->
Logs.info (fun f -> f "Failed to send progress report (%s): %a" msg Capnp_rpc.Error.pp ex)

module Pool_api = struct
module Inactive_reasons = Pool.Inactive_reasons
module Pool = Pool.Make(Item)(Unix)
Expand Down Expand Up @@ -94,6 +104,7 @@ module Pool_api = struct
| Ok { set_job; descr } ->
Capability.inc_ref job;
Capability.resolve_ok set_job job;
Lwt.on_termination (Cluster_api.Job.result job) (fun () -> Pool.job_finished q);
Ok descr

let register t ~name ~capacity worker =
Expand Down Expand Up @@ -147,14 +158,47 @@ module Pool_api = struct
| Error `Still_connected -> Service.fail "Worker %S is still connected!" name
| Error `Unknown_worker -> Service.fail "Worker %S not known in this pool" name
in
let update name =
let drain ?progress name =
match Pool.Worker_map.find_opt name (Pool.connected_workers t.pool) with
| None -> Service.fail "Unknown worker"
| None ->
if Pool.worker_known t.pool name then (
(* Disconnected, so already drained. Just make sure it will be paused on reconnect. *)
Pool.with_worker t.pool name (fun worker ->
Pool.set_active worker false ~reason:Inactive_reasons.admin_pause;
);
report_progress ?progress "Disconnected (nothing to drain)" >>= fun () ->
Lwt_result.return (Service.Response.create_empty ())
) else Lwt_result.fail (`Capnp (Capnp_rpc.Error.exn "Unknown worker"))
| Some w ->
(* Drain a connected worker. *)
Pool.set_active w false ~reason:Inactive_reasons.admin_pause;
let rec aux prev =
report_progress ?progress "Running jobs: %d" prev >>= fun () ->
if prev = 0 then Lwt_result.return (Service.Response.create_empty ())
else (
Pool.running_jobs w ~prev >>= aux
)
in
Pool.running_jobs w >>= aux
in
let update ?progress name =
match Pool.Worker_map.find_opt name (Pool.connected_workers t.pool) with
| None -> Lwt_result.fail (`Capnp (Capnp_rpc.Error.exn "Unknown worker"))
| Some w ->
let cap = Option.get (worker t name) in
Pool.shutdown w; (* Prevent any new items being assigned to it. *)
Service.return_lwt @@ fun () ->
Capability.with_ref cap @@ fun worker ->
let rec aux prev =
if prev = 0 then Lwt.return ()
else (
report_progress ?progress "Running jobs: %d" prev >>= fun () ->
Pool.running_jobs w ~prev >>= aux
)
in
(* Drain worker (with progress updates) *)
Pool.running_jobs w >>= aux >>= fun () ->
(* Restart *)
report_progress ?progress "Restarting" >>= fun () ->
Log.info (fun f -> f "Restarting %S" name);
Cluster_api.Worker.self_update worker >>= function
| Error _ as e -> Lwt.return e
Expand All @@ -178,7 +222,7 @@ module Pool_api = struct
Ok ()
) else Error `No_such_user
in
Cluster_api.Pool_admin.local ~show ~workers ~worker:(worker t) ~set_active ~update ~forget ~set_rate
Cluster_api.Pool_admin.local ~show ~workers ~worker:(worker t) ~set_active ~update ~drain ~forget ~set_rate

let remove_client t ~client_id =
Pool.remove_client t.pool ~client_id
Expand Down
20 changes: 19 additions & 1 deletion scheduler/pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ module Make (Item : S.ITEM)(Time : S.TIME) = struct
| `Inactive of Inactive_reasons.t * unit Lwt.t * unit Lwt.u (* ready/set_ready for resume *)
| `Finished ];
mutable workload : int; (* Total cost of items in worker's queue. *)
mutable running : int; (* Number of jobs currently running. *)
job_finished_cond : unit Lwt_condition.t;
} and client_info = {
id : string;
mutable next_fair_start_time : float;
Expand Down Expand Up @@ -400,6 +402,7 @@ module Make (Item : S.ITEM)(Time : S.TIME) = struct
let item = ticket.item in
Log.info (fun f -> f "%S takes %a from its local queue" worker.name Item.pp item);
mark_cached ticket.item worker;
worker.running <- worker.running + 1;
Prometheus.Counter.inc_one (Metrics.jobs_accepted t.pool);
dec_pending_count t ticket;
Lwt_result.return ticket.item
Expand Down Expand Up @@ -427,13 +430,26 @@ module Make (Item : S.ITEM)(Time : S.TIME) = struct
let item = ticket.item in
Log.info (fun f -> f "%S takes %a from the main queue" worker.name Item.pp item);
mark_cached item worker;
worker.running <- worker.running + 1;
Prometheus.Counter.inc_one (Metrics.jobs_accepted t.pool);
dec_pending_count t ticket;
Lwt_result.return item
)
in
aux ()

let job_finished worker =
Log.debug (fun f -> f "%S finishes a job" worker.name);
worker.running <- worker.running - 1;
Lwt_condition.broadcast worker.job_finished_cond ()

let rec running_jobs ?(prev=(-1)) worker =
if worker.running <> prev then Lwt.return worker.running
else (
Lwt_condition.wait worker.job_finished_cond >>= fun () ->
running_jobs ~prev worker
)

(* Worker is leaving and system is backlogged. Move the worker's items to the backlog. *)
let rec push_back worker worker_q q =
match Backlog.dequeue_opt ~pool:worker.parent.pool worker_q with
Expand Down Expand Up @@ -461,6 +477,8 @@ module Make (Item : S.ITEM)(Time : S.TIME) = struct
name;
state = `Inactive (inactive_reasons, ready, set_ready);
workload = 0;
running = 0;
job_finished_cond = Lwt_condition.create ();
capacity;
} in
t.workers <- Worker_map.add name q t.workers;
Expand Down Expand Up @@ -648,7 +666,7 @@ module Make (Item : S.ITEM)(Time : S.TIME) = struct

let dump_workers f x =
let pp_item f (id, w) =
Fmt.pf f "@,%s (%d): @[%a@]" id w.workload pp_state w in
Fmt.pf f "@,%s (%d): @[%a@] (%d running)" id w.workload pp_state w w.running in
Worker_map.bindings x
|> Fmt.(list ~sep:nop) pp_item f

Expand Down
7 changes: 7 additions & 0 deletions scheduler/pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ module Make (Item : S.ITEM) (Time : S.TIME) : sig
val pop : worker -> (Item.t, [> `Finished]) Lwt_result.t
(** [pop worker] gets the next item for [worker]. *)

val job_finished : worker -> unit
(** [job_finished worker] is called when [worker] completes a job. *)

val running_jobs : ?prev:int -> worker -> int Lwt.t
(** [running_jobs worker] returns the number of jobs running on [worker].
@param prev Wait until the number is different to [prev] before returning. *)

val set_active : reason:Inactive_reasons.t -> worker -> bool -> unit
(** [set_active ~reason worker active] sets the worker's active flag for [reason].
A worker is active if it has no reasons to be inactive.
Expand Down
Loading