Skip to content

Commit

Permalink
Update to capnp-rpc 1.2 and fix connection handling
Browse files Browse the repository at this point in the history
PR #130 broke things a bit. Although it caused errors to be reported, it
also prevented retrying jobs if the connection was lost while the job
was queued.
  • Loading branch information
talex5 committed Jun 10, 2021
1 parent 96a05b3 commit 2c39c31
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build
RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 03e325a688740a7b1e6c61892483975480c8e550 && opam update
COPY --chown=opam ocluster-api.opam ocluster.opam /src/
COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/
RUN opam pin -yn /src/obuilder/
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.worker
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build
RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 03e325a688740a7b1e6c61892483975480c8e550 && opam update
COPY --chown=opam ocluster-api.opam ocluster.opam /src/
COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/
RUN opam pin -yn /src/obuilder/
Expand Down
2 changes: 1 addition & 1 deletion current_ocluster.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ depends: [
"current_git" {>= "0.3"}
"current_web" {>= "0.3" & with-test}
"current_github" {>= "0.3" & with-test}
"capnp-rpc-unix" {>= "0.9.0"}
"capnp-rpc-unix" {>= "1.2"}
"duration"
"logs"
"fmt"
Expand Down
6 changes: 3 additions & 3 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
(depends
ppx_deriving
lwt
(capnp-rpc-lwt (>= 0.9.0))
(capnp-rpc-lwt (>= 1.2))
fmt
ppx_deriving_yojson
(ocaml (>= 4.10.0))))
Expand All @@ -30,7 +30,7 @@
lwt
capnp-rpc-lwt
capnp-rpc-net
(capnp-rpc-unix (>= 0.9.0))
(capnp-rpc-unix (>= 1.2))
logs
fmt
(conf-libev (<> :os "win32"))
Expand Down Expand Up @@ -60,7 +60,7 @@
(current_git (>= 0.3))
(current_web (and (>= 0.3) :with-test))
(current_github (and (>= 0.3) :with-test))
(capnp-rpc-unix (>= 0.9.0))
(capnp-rpc-unix (>= 1.2))
duration
logs
fmt
Expand Down
2 changes: 1 addition & 1 deletion ocluster-api.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ depends: [
"dune" {>= "2.8"}
"ppx_deriving"
"lwt"
"capnp-rpc-lwt" {>= "0.9.0"}
"capnp-rpc-lwt" {>= "1.2"}
"fmt"
"ppx_deriving_yojson"
"ocaml" {>= "4.10.0"}
Expand Down
2 changes: 1 addition & 1 deletion ocluster.opam
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ depends: [
"lwt"
"capnp-rpc-lwt"
"capnp-rpc-net"
"capnp-rpc-unix" {>= "0.9.0"}
"capnp-rpc-unix" {>= "1.2"}
"logs"
"fmt"
"conf-libev" {os != "win32"}
Expand Down
39 changes: 20 additions & 19 deletions ocurrent-plugin/connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ type t = {
max_pipeline : int;
}

(* Wait for capability [t] to settle. If [Capability.problem] then reports an
   error for it, fail with [Failure] carrying the pretty-printed exception.
   Replace this with the version in capnp-rpc 1.2, once released. *)
let await_settled_exn t =
  Capability.wait_until_settled t >|= fun () ->
  Capability.problem t
  |> Option.iter (fun e -> Fmt.failwith "%a" Capnp_rpc.Exception.pp e)

(* Return a proxy to the scheduler, starting a new connection if we don't
currently have a working one. *)
let sched ~job t =
Expand All @@ -51,7 +44,7 @@ let sched ~job t =
Lwt.catch
(fun () ->
Sturdy_ref.connect_exn conn.sr >>= fun cap ->
await_settled_exn cap >|= fun () ->
Capability.await_settled_exn cap >|= fun () ->
cap
)
(fun ex ->
Expand All @@ -76,6 +69,14 @@ let urgent_if_high = function
| `High -> true
| `Low -> false

(* [with_state metric fn] counts [fn ()] in the gauge [metric] while it is in
   progress: the gauge is incremented before [fn] is called, and the
   [Lwt.finalize] handler decrements it again once [fn ()] has finished,
   whether it succeeded or failed. The result of [fn ()] is passed through. *)
let with_state metric fn =
  let leave () =
    Prometheus.Gauge.dec_one metric;
    Lwt.return_unit
  in
  Prometheus.Gauge.inc_one metric;
  Lwt.finalize fn leave

(* This is called by [Current.Job] once the confirmation threshold allows the job to be submitted. *)
let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~switch:_ =
let urgent = urgent priority in
Expand Down Expand Up @@ -113,16 +114,12 @@ let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~swi
let ticket = Cluster_api.Submission.submit ~urgent ?src ?secrets sched ~pool ~action ~cache_hint in
let build_job = Cluster_api.Ticket.job ticket in
stage := `Get_ticket ticket; (* Allow the user to cancel it now. *)
Prometheus.Gauge.inc_one Metrics.queue_get_ticket;
await_settled_exn ticket >>= fun () ->
Prometheus.Gauge.dec_one Metrics.queue_get_ticket;
with_state Metrics.queue_get_ticket (fun () -> Capability.await_settled ticket) >>!= fun () ->
Current.Job.log job "Waiting for worker...";
Prometheus.Gauge.inc_one Metrics.queue_get_worker;
await_settled_exn build_job >>= fun () ->
Prometheus.Gauge.dec_one Metrics.queue_get_worker;
with_state Metrics.queue_get_worker (fun () -> Capability.await_settled build_job) >>!= fun () ->
Capability.dec_ref ticket;
stage := `Got_worker;
Lwt.return build_job
Lwt_result.return build_job
)
)
(function
Expand All @@ -136,15 +133,19 @@ let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~swi
in
limiter_thread := Some use_thread;
use_thread >>= fun build_job ->
Lwt.pause () >>= fun () ->
match Capability.problem build_job with
| None -> Lwt.return build_job
| Some err ->
match build_job with
| Ok build_job -> Lwt.return build_job
| Error err ->
Lwt.pause () >>= fun () ->
if Capability.problem sched = None then (
(* The job failed but we're still connected to the scheduler. Report the error. *)
Lwt.fail_with (Fmt.strf "%a" Capnp_rpc.Exception.pp err)
) else (
limiter_thread := None;
begin match !stage with
| `Init | `Got_worker | `Rate_limit -> ()
| `Get_ticket ticket -> Capability.dec_ref ticket
end;
stage := `Init;
aux ()
)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/cluster_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ module Pool_api = struct
let queued_jobs = Metrics.client_queued_jobs ~client_id:(Pool.Client.client_id client) ~pool:(Pool.Client.pool_id client) ~urgent in
Prometheus.Gauge.inc_one queued_jobs;
Lwt.async (fun () ->
Capability.wait_until_settled job >|= fun () ->
Capability.await_settled job >|= fun (_ : _ result) ->
Prometheus.Gauge.dec_one queued_jobs
);
let cancel () =
Expand Down
2 changes: 1 addition & 1 deletion stress/utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ let submit cap ~name ~i ~pool ~urgent ~cache_hint =
let descr = Api.Submission.obuilder_build job_id in
Capability.with_ref (Api.Submission.submit cap ~action:descr ~pool ~urgent ~cache_hint) @@ fun ticket ->
Capability.with_ref (Api.Ticket.job ticket) @@ fun job ->
Capability.wait_until_settled job >>= fun () ->
Capability.await_settled_exn job >>= fun () ->
Logs.info (fun f -> f "%s: job %S running" name job_id);
Api.Job.result job >|= function
| Ok _ -> Logs.info (fun f -> f "%s : job %S finished" name job_id);
Expand Down
10 changes: 5 additions & 5 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ let already_registered () =
with_sched @@ fun ~admin:_ ~registry ->
let api = Cluster_api.Worker.local ~metrics:(fun _ -> assert false) ~self_update:(fun () -> assert false) in
let q1 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in
Capability.wait_until_settled q1 >>= fun () ->
Capability.await_settled q1 >>= fun (_ : _ result) ->
let q2 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in
Capability.wait_until_settled q2 >>= fun () ->
Capability.await_settled q2 >>= fun (_ : _ result) ->
let pp_err = Fmt.(option ~none:(unit "ok")) Capnp_rpc.Exception.pp in
let p1 = Fmt.strf "%a" pp_err (Capability.problem q1) in
let p2 = Fmt.strf "%a" pp_err (Capability.problem q2) in
Expand Down Expand Up @@ -433,16 +433,16 @@ let clients () =
let action = Cluster_api.Submission.docker_build (`Contents "example") in
let ticket1 = Cluster_api.Submission.submit client1 ~pool:"pool" ~action ~cache_hint:"1" in
let ticket2 = Cluster_api.Submission.submit client2 ~pool:"pool" ~action ~cache_hint:"1" in
Capability.wait_until_settled ticket1 >>= fun () ->
Capability.wait_until_settled ticket2 >>= fun () ->
Capability.await_settled ticket1 >>= fun (_ : _ result) ->
Capability.await_settled ticket2 >>= fun (_ : _ result) ->
let pp_err = Fmt.(option ~none:(unit "ok")) Capnp_rpc.Exception.pp in
let problem1 = Fmt.strf "%a" pp_err (Capability.problem ticket1) in
Alcotest.(check string) "Access revoked" "Failed: Access has been revoked" problem1;
let problem2 = Fmt.strf "%a" pp_err (Capability.problem ticket2) in
Alcotest.(check string) "Access OK" "ok" problem2;
Capability.dec_ref ticket2;
Capability.with_ref (Cluster_api.Admin.add_client admin "client2") @@ fun client2b ->
Capability.wait_until_settled client2b >>= fun () ->
Capability.await_settled client2b >>= fun (_ : _ result) ->
let problem = Fmt.strf "%a" pp_err (Capability.problem client2b) in
Alcotest.(check string) "Duplicate user" {|Failed: Client "client2" already registered!|} problem;
Lwt.try_bind
Expand Down
2 changes: 2 additions & 0 deletions test/test_plugin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ let disconnect_while_queued () =
let pipeline t = Current_ocluster.build t spec ~pool:"pool" ~src:(Current.return []) ~options in
setup ~pipeline @@ fun ~registry ~await_result ~break ->
Lwt.pause () >>= fun () ->
Lwt.pause () >>= fun () ->
Lwt.pause () >>= fun () ->
break () >>= fun () ->
(* The plugin will immediately reconnect to the scheduler.
Now add a worker and the job should run. *)
Expand Down

0 comments on commit 2c39c31

Please sign in to comment.