Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to capnp-rpc 1.2 and fix connection handling #131

Merged
merged 1 commit into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build
RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 03e325a688740a7b1e6c61892483975480c8e550 && opam update
COPY --chown=opam ocluster-api.opam ocluster.opam /src/
COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/
RUN opam pin -yn /src/obuilder/
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.worker
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ocaml/opam:debian-10-ocaml-4.12@sha256:ba38dbfbfa92b0dca7171ac33d81276e7209912baf26887ecc20c382d15f32e4 AS build
RUN sudo apt-get update && sudo apt-get install libev-dev capnproto m4 pkg-config libsqlite3-dev libgmp-dev -y --no-install-recommends
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 1bd3ea712eef197c62c6ab92b2642cc2cda11be2 && opam update
RUN cd ~/opam-repository && git pull origin -q master && git reset --hard 03e325a688740a7b1e6c61892483975480c8e550 && opam update
COPY --chown=opam ocluster-api.opam ocluster.opam /src/
COPY --chown=opam obuilder/obuilder.opam obuilder/obuilder-spec.opam /src/obuilder/
RUN opam pin -yn /src/obuilder/
Expand Down
2 changes: 1 addition & 1 deletion current_ocluster.opam
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ depends: [
"current_git" {>= "0.3"}
"current_web" {>= "0.3" & with-test}
"current_github" {>= "0.3" & with-test}
"capnp-rpc-unix" {>= "0.9.0"}
"capnp-rpc-unix" {>= "1.2"}
"duration"
"logs"
"fmt"
Expand Down
6 changes: 3 additions & 3 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
(depends
ppx_deriving
lwt
(capnp-rpc-lwt (>= 0.9.0))
(capnp-rpc-lwt (>= 1.2))
fmt
ppx_deriving_yojson
(ocaml (>= 4.10.0))))
Expand All @@ -30,7 +30,7 @@
lwt
capnp-rpc-lwt
capnp-rpc-net
(capnp-rpc-unix (>= 0.9.0))
(capnp-rpc-unix (>= 1.2))
logs
fmt
(conf-libev (<> :os "win32"))
Expand Down Expand Up @@ -60,7 +60,7 @@
(current_git (>= 0.3))
(current_web (and (>= 0.3) :with-test))
(current_github (and (>= 0.3) :with-test))
(capnp-rpc-unix (>= 0.9.0))
(capnp-rpc-unix (>= 1.2))
duration
logs
fmt
Expand Down
2 changes: 1 addition & 1 deletion ocluster-api.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ depends: [
"dune" {>= "2.8"}
"ppx_deriving"
"lwt"
"capnp-rpc-lwt" {>= "0.9.0"}
"capnp-rpc-lwt" {>= "1.2"}
"fmt"
"ppx_deriving_yojson"
"ocaml" {>= "4.10.0"}
Expand Down
2 changes: 1 addition & 1 deletion ocluster.opam
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ depends: [
"lwt"
"capnp-rpc-lwt"
"capnp-rpc-net"
"capnp-rpc-unix" {>= "0.9.0"}
"capnp-rpc-unix" {>= "1.2"}
"logs"
"fmt"
"conf-libev" {os != "win32"}
Expand Down
39 changes: 20 additions & 19 deletions ocurrent-plugin/connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ type t = {
max_pipeline : int;
}

(* Replace this with the version in capnp-rpc 1.2, once released *)
let await_settled_exn t =
Capability.wait_until_settled t >|= fun () ->
match Capability.problem t with
| None -> ()
| Some e -> Fmt.failwith "%a" Capnp_rpc.Exception.pp e

(* Return a proxy to the scheduler, starting a new connection if we don't
currently have a working one. *)
let sched ~job t =
Expand All @@ -51,7 +44,7 @@ let sched ~job t =
Lwt.catch
(fun () ->
Sturdy_ref.connect_exn conn.sr >>= fun cap ->
await_settled_exn cap >|= fun () ->
Capability.await_settled_exn cap >|= fun () ->
cap
)
(fun ex ->
Expand All @@ -76,6 +69,14 @@ let urgent_if_high = function
| `High -> true
| `Low -> false

let with_state metric fn =
Prometheus.Gauge.inc_one metric;
Lwt.finalize fn
(fun () ->
Prometheus.Gauge.dec_one metric;
Lwt.return_unit
)

(* This is called by [Current.Job] once the confirmation threshold allows the job to be submitted. *)
let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~switch:_ =
let urgent = urgent priority in
Expand Down Expand Up @@ -113,16 +114,12 @@ let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~swi
let ticket = Cluster_api.Submission.submit ~urgent ?src ?secrets sched ~pool ~action ~cache_hint in
let build_job = Cluster_api.Ticket.job ticket in
stage := `Get_ticket ticket; (* Allow the user to cancel it now. *)
Prometheus.Gauge.inc_one Metrics.queue_get_ticket;
await_settled_exn ticket >>= fun () ->
Prometheus.Gauge.dec_one Metrics.queue_get_ticket;
with_state Metrics.queue_get_ticket (fun () -> Capability.await_settled ticket) >>!= fun () ->
Current.Job.log job "Waiting for worker...";
Prometheus.Gauge.inc_one Metrics.queue_get_worker;
await_settled_exn build_job >>= fun () ->
Prometheus.Gauge.dec_one Metrics.queue_get_worker;
with_state Metrics.queue_get_worker (fun () -> Capability.await_settled build_job) >>!= fun () ->
Capability.dec_ref ticket;
stage := `Got_worker;
Lwt.return build_job
Lwt_result.return build_job
)
)
(function
Expand All @@ -136,15 +133,19 @@ let submit ~job ~pool ~action ~cache_hint ?src ?secrets ~urgent t ~priority ~swi
in
limiter_thread := Some use_thread;
use_thread >>= fun build_job ->
Lwt.pause () >>= fun () ->
match Capability.problem build_job with
| None -> Lwt.return build_job
| Some err ->
match build_job with
| Ok build_job -> Lwt.return build_job
| Error err ->
Lwt.pause () >>= fun () ->
if Capability.problem sched = None then (
(* The job failed but we're still connected to the scheduler. Report the error. *)
Lwt.fail_with (Fmt.strf "%a" Capnp_rpc.Exception.pp err)
) else (
limiter_thread := None;
begin match !stage with
| `Init | `Got_worker | `Rate_limit -> ()
| `Get_ticket ticket -> Capability.dec_ref ticket
end;
stage := `Init;
aux ()
)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/cluster_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ module Pool_api = struct
let queued_jobs = Metrics.client_queued_jobs ~client_id:(Pool.Client.client_id client) ~pool:(Pool.Client.pool_id client) ~urgent in
Prometheus.Gauge.inc_one queued_jobs;
Lwt.async (fun () ->
Capability.wait_until_settled job >|= fun () ->
Capability.await_settled job >|= fun (_ : _ result) ->
Prometheus.Gauge.dec_one queued_jobs
);
let cancel () =
Expand Down
2 changes: 1 addition & 1 deletion stress/utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ let submit cap ~name ~i ~pool ~urgent ~cache_hint =
let descr = Api.Submission.obuilder_build job_id in
Capability.with_ref (Api.Submission.submit cap ~action:descr ~pool ~urgent ~cache_hint) @@ fun ticket ->
Capability.with_ref (Api.Ticket.job ticket) @@ fun job ->
Capability.wait_until_settled job >>= fun () ->
Capability.await_settled_exn job >>= fun () ->
Logs.info (fun f -> f "%s: job %S running" name job_id);
Api.Job.result job >|= function
| Ok _ -> Logs.info (fun f -> f "%s : job %S finished" name job_id);
Expand Down
10 changes: 5 additions & 5 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ let already_registered () =
with_sched @@ fun ~admin:_ ~registry ->
let api = Cluster_api.Worker.local ~metrics:(fun _ -> assert false) ~self_update:(fun () -> assert false) in
let q1 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in
Capability.wait_until_settled q1 >>= fun () ->
Capability.await_settled q1 >>= fun (_ : _ result) ->
let q2 = Cluster_api.Registration.register registry ~name:"worker-1" ~capacity:1 api in
Capability.wait_until_settled q2 >>= fun () ->
Capability.await_settled q2 >>= fun (_ : _ result) ->
let pp_err = Fmt.(option ~none:(unit "ok")) Capnp_rpc.Exception.pp in
let p1 = Fmt.strf "%a" pp_err (Capability.problem q1) in
let p2 = Fmt.strf "%a" pp_err (Capability.problem q2) in
Expand Down Expand Up @@ -433,16 +433,16 @@ let clients () =
let action = Cluster_api.Submission.docker_build (`Contents "example") in
let ticket1 = Cluster_api.Submission.submit client1 ~pool:"pool" ~action ~cache_hint:"1" in
let ticket2 = Cluster_api.Submission.submit client2 ~pool:"pool" ~action ~cache_hint:"1" in
Capability.wait_until_settled ticket1 >>= fun () ->
Capability.wait_until_settled ticket2 >>= fun () ->
Capability.await_settled ticket1 >>= fun (_ : _ result) ->
Capability.await_settled ticket2 >>= fun (_ : _ result) ->
let pp_err = Fmt.(option ~none:(unit "ok")) Capnp_rpc.Exception.pp in
let problem1 = Fmt.strf "%a" pp_err (Capability.problem ticket1) in
Alcotest.(check string) "Access revoked" "Failed: Access has been revoked" problem1;
let problem2 = Fmt.strf "%a" pp_err (Capability.problem ticket2) in
Alcotest.(check string) "Access OK" "ok" problem2;
Capability.dec_ref ticket2;
Capability.with_ref (Cluster_api.Admin.add_client admin "client2") @@ fun client2b ->
Capability.wait_until_settled client2b >>= fun () ->
Capability.await_settled client2b >>= fun (_ : _ result) ->
let problem = Fmt.strf "%a" pp_err (Capability.problem client2b) in
Alcotest.(check string) "Duplicate user" {|Failed: Client "client2" already registered!|} problem;
Lwt.try_bind
Expand Down
2 changes: 2 additions & 0 deletions test/test_plugin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ let disconnect_while_queued () =
let pipeline t = Current_ocluster.build t spec ~pool:"pool" ~src:(Current.return []) ~options in
setup ~pipeline @@ fun ~registry ~await_result ~break ->
Lwt.pause () >>= fun () ->
Lwt.pause () >>= fun () ->
Lwt.pause () >>= fun () ->
break () >>= fun () ->
(* The plugin will immediately reconnect to the scheduler.
Now add a worker and the job should run. *)
Expand Down