From 2c39c31b574fc87cc96e4f655b8d42368d73abee Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Thu, 10 Jun 2021 10:44:23 +0100 Subject: [PATCH] Update to capnp-rpc 1.2 and fix connection handling PR #130 broke things a bit. Although it caused errors to be reported, it also prevented retrying jobs if the connection was lost while the job was queued. --- Dockerfile | 2 +- Dockerfile.worker | 2 +- current_ocluster.opam | 2 +- dune-project | 6 +++--- ocluster-api.opam | 2 +- ocluster.opam | 2 +- ocurrent-plugin/connection.ml | 39 +++++++++++++++++----------------- scheduler/cluster_scheduler.ml | 2 +- stress/utils.ml | 2 +- test/test.ml | 10 ++++----- test/test_plugin.ml | 2 ++ 11 files changed, 37 insertions(+), 34 deletions(-) diff --git a/Dockerfile b/Dockerfile index 7aa1e1e3..8acec9ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends -RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update +RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 03e325a688740a7b1e6c61892483975480c8e550 && opam update COPY --chown=opam ocluster-api.opam ocluster.opam /src/ COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/ RUN opam pin -yn /src/obuilder/ diff --git a/Dockerfile.worker b/Dockerfile.worker index 602ec676..94541e70 100644 --- a/Dockerfile.worker +++ b/Dockerfile.worker @@ -1,6 +1,6 @@ FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends -RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update +RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 03e325a688740a7b1e6c61892483975480c8e550 && opam update COPY --chown=opam ocluster-api.opam ocluster.opam /src/ COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/ RUN opam pin -yn /src/obuilder/ diff --git a/current_ocluster.opam b/current_ocluster.opam index 6c8f9e0a..7acf1adf 100644 --- a/current_ocluster.opam +++ b/current_ocluster.opam @@ -16,7 +16,7 @@ depends: [ "current_git" {>= "0.3"} "current_web" {>= "0.3" & with-test} "current_github" {>= "0.3" & with-test} - "capnp-rpc-unix" {>= "0.9.0"} + "capnp-rpc-unix" {>= "1.2"} "duration" "logs" "fmt" diff --git a/dune-project b/dune-project index 3769b258..64eb87fd 100644 --- a/dune-project +++ b/dune-project @@ -12,7 +12,7 @@ (depends ppx_deriving lwt - (capnp-rpc-lwt (>= 0.9.0)) + (capnp-rpc-lwt (>= 1.2)) fmt ppx_deriving_yojson (ocaml (>= 4.10.0)))) @@ -30,7 +30,7 @@ lwt capnp-rpc-lwt capnp-rpc-net - (capnp-rpc-unix (>= 0.9.0)) + (capnp-rpc-unix (>= 1.2)) logs fmt (conf-libev (<> :os "win32")) @@ -60,7 +60,7 @@ (current_git (>= 0.3)) (current_web (and (>= 0.3) :with-test)) (current_github (and (>= 0.3) :with-test)) - (capnp-rpc-unix (>= 0.9.0)) + (capnp-rpc-unix (>= 1.2)) duration logs fmt diff --git a/ocluster-api.opam b/ocluster-api.opam index e675e8de..6973a3d8 100644 --- a/ocluster-api.opam +++ b/ocluster-api.opam @@ -10,7 +10,7 @@ depends: [ "dune" {>= "2.8"} "ppx_deriving" "lwt" - "capnp-rpc-lwt" {>= "0.9.0"} + "capnp-rpc-lwt" {>= "1.2"} "fmt" "ppx_deriving_yojson" "ocaml" {>= "4.10.0"} diff --git a/ocluster.opam b/ocluster.opam index 1e6aec5b..8126e0dc 100644 --- a/ocluster.opam +++ b/ocluster.opam @@ -25,7 +25,7 @@ depends: [ "lwt" "capnp-rpc-lwt" "capnp-rpc-net" - "capnp-rpc-unix" {>= "0.9.0"} + "capnp-rpc-unix" {>= "1.2"} "logs" "fmt" "conf-libev" {os != "win32"} diff --git a/ocurrent-plugin/connection.ml b/ocurrent-plugin/connection.ml index 49a5ebcb..a5fbd3ca 100644 --- a/ocurrent-plugin/connection.ml +++ b/ocurrent-plugin/connection.ml @@ -29,13 +29,6 @@ type t = { max_pipeline : int; } -(* Replace this with the version in capnp-rpc 1.2, once released *) -let await_settled_exn t = - Capability.wait_until_settled t >|= fun () -> - match Capability.problem t with - | None -> () - | Some e -> Fmt.failwith "%a" Capnp_rpc.Exception.pp e - (* Return a proxy to the scheduler, starting a new connection if we don't currently have a working one. *) let sched ~job t = @@ -51,7 +44,7 @@ let sched ~job t = Lwt.catch (fun () -> Sturdy_ref.connect_exn conn.sr >>= fun cap -> - await_settled_exn cap >|= fun () -> + Capability.await_settled_exn cap >|= fun () -> cap ) (fun ex -> @@ -76,6 +69,14 @@ let urgent_if_high = function | `High -> true | `Low -> false +let with_state metric fn = + Prometheus.Gauge.inc_one metric; + Lwt.finalize fn + (fun () -> + Prometheus.Gauge.dec_one metric; + Lwt.return_unit + ) + (* This is called by [Current.Job] once the confirmation threshold allows the job to be submitted. *) let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~switch:_ = let urgent = urgent priority in @@ -113,16 +114,12 @@ let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~swi let ticket = Cluster_api.Submission.submit ~urgent ?src ?secrets sched ~pool ~action ~cache_hint in let build_job = Cluster_api.Ticket.job ticket in stage := `Get_ticket ticket; (* Allow the user to cancel it now. *) - Prometheus.Gauge.inc_one Metrics.queue_get_ticket; - await_settled_exn ticket >>= fun () -> - Prometheus.Gauge.dec_one Metrics.queue_get_ticket; + with_state Metrics.queue_get_ticket (fun () -> Capability.await_settled ticket) >>!= fun () -> Current.Job.log job "Waiting for worker..."; - Prometheus.Gauge.inc_one Metrics.queue_get_worker; - await_settled_exn build_job >>= fun () -> - Prometheus.Gauge.dec_one Metrics.queue_get_worker; + with_state Metrics.queue_get_worker (fun () -> Capability.await_settled build_job) >>!= fun () -> Capability.dec_ref ticket; stage := `Got_worker; - Lwt.return build_job + Lwt_result.return build_job ) ) (function @@ -136,15 +133,19 @@ let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~swi in limiter_thread := Some use_thread; use_thread >>= fun build_job -> - Lwt.pause () >>= fun () -> - match Capability.problem build_job with - | None -> Lwt.return build_job - | Some err -> + match build_job with + | Ok build_job -> Lwt.return build_job + | Error err -> + Lwt.pause () >>= fun () -> if Capability.problem sched = None then ( (* The job failed but we're still connected to the scheduler. Report the error. *) Lwt.fail_with (Fmt.strf "%a" Capnp_rpc.Exception.pp err) ) else ( limiter_thread := None; + begin match !stage with + | `Init | `Got_worker | `Rate_limit -> () + | `Get_ticket ticket -> Capability.dec_ref ticket + end; stage := `Init; aux () ) diff --git a/scheduler/cluster_scheduler.ml b/scheduler/cluster_scheduler.ml index 61ecbbb7..7490dbfd 100644 --- a/scheduler/cluster_scheduler.ml +++ b/scheduler/cluster_scheduler.ml @@ -80,7 +80,7 @@ module Pool_api = struct let queued_jobs = Metrics.client_queued_jobs ~client_id:(Pool.Client.client_id client) ~pool:(Pool.Client.pool_id client) ~urgent in Prometheus.Gauge.inc_one queued_jobs; Lwt.async (fun () -> - Capability.wait_until_settled job >|= fun () -> + Capability.await_settled job >|= fun (_ : _ result) -> Prometheus.Gauge.dec_one queued_jobs ); let cancel () = diff --git a/stress/utils.ml b/stress/utils.ml index cdc2d7c7..f99f5f4d 100644 --- a/stress/utils.ml +++ b/stress/utils.ml @@ -14,7 +14,7 @@ let submit cap ~name ~i ~pool ~urgent ~cache_hint = let descr = Api.Submission.obuilder_build job_id in Capability.with_ref (Api.Submission.submit cap ~action:descr ~pool ~urgent ~cache_hint) @@ fun ticket -> Capability.with_ref (Api.Ticket.job ticket) @@ fun job -> - Capability.wait_until_settled job >>= fun () -> + Capability.await_settled_exn job >>= fun () -> Logs.info (fun f -> f "%s: job %S running" name job_id); Api.Job.result job >|= function | Ok _ -> Logs.info (fun f -> f "%s : job %S finished" name job_id); diff --git a/test/test.ml b/test/test.ml index fdd20063..03db4504 100644 --- a/test/test.ml +++ b/test/test.ml @@ -122,9 +122,9 @@ let already_registered () = with_sched @@ fun ~admin:_ ~registry -> let api = Cluster_api.Worker.local ~metrics:(fun _ -> assert false) ~self_update:(fun () -> assert false) in let q1 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in - Capability.wait_until_settled q1 >>= fun () -> + Capability.await_settled q1 >>= fun (_ : _ result) -> let q2 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in - Capability.wait_until_settled q2 >>= fun () -> + Capability.await_settled q2 >>= fun (_ : _ result) -> let pp_err = Fmt.(option ~none:(unit "ok")) Capnp_rpc.Exception.pp in let p1 = Fmt.strf "%a" pp_err (Capability.problem q1) in let p2 = Fmt.strf "%a" pp_err (Capability.problem q2) in @@ -433,8 +433,8 @@ let clients () = let action = Cluster_api.Submission.docker_build (`Contents "example") in let ticket1 = Cluster_api.Submission.submit client1 ~pool:"pool" ~action ~cache_hint:"1" in let ticket2 = Cluster_api.Submission.submit client2 ~pool:"pool" ~action ~cache_hint:"1" in - Capability.wait_until_settled ticket1 >>= fun () -> - Capability.wait_until_settled ticket2 >>= fun () -> + Capability.await_settled ticket1 >>= fun (_ : _ result) -> + Capability.await_settled ticket2 >>= fun (_ : _ result) -> let pp_err = Fmt.(option ~none:(unit "ok")) Capnp_rpc.Exception.pp in let problem1 = Fmt.strf "%a" pp_err (Capability.problem ticket1) in Alcotest.(check string) "Access revoked" "Failed: Access has been revoked" problem1; @@ -442,7 +442,7 @@ let clients () = Alcotest.(check string) "Access OK" "ok" problem2; Capability.dec_ref ticket2; Capability.with_ref (Cluster_api.Admin.add_client admin "client2") @@ fun client2b -> - Capability.wait_until_settled client2b >>= fun () -> + Capability.await_settled client2b >>= fun (_ : _ result) -> let problem = Fmt.strf "%a" pp_err (Capability.problem client2b) in Alcotest.(check string) "Duplicate user" {|Failed: Client "client2" already registered!|} problem; Lwt.try_bind diff --git a/test/test_plugin.ml b/test/test_plugin.ml index 0a7a47bf..249090ce 100644 --- a/test/test_plugin.ml +++ b/test/test_plugin.ml @@ -98,6 +98,8 @@ let disconnect_while_queued () = let pipeline t = Current_ocluster.build t spec ~pool:"pool" ~src:(Current.return []) ~options in setup ~pipeline @@ fun ~registry ~await_result ~break -> Lwt.pause () >>= fun () -> + Lwt.pause () >>= fun () -> + Lwt.pause () >>= fun () -> break () >>= fun () -> (* The plugin will immediately reconnect to the scheduler. Now add a worker and the job should run. *)