Skip to content

Commit

Permalink
Merge pull request #156 from patricoferris/custom-jobs
Browse files Browse the repository at this point in the history
Support custom jobs
  • Loading branch information
talex5 authored Apr 21, 2022
2 parents 62bc76a + f2c1820 commit 4cb48ea
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 31 deletions.
23 changes: 23 additions & 0 deletions api/custom.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
type payload = Raw.Reader.pointer_t

type 'a t = {
kind : string;
payload : 'a;
}

type send = (Raw.Builder.pointer_t -> unit) t
type recv = Raw.Reader.pointer_t t

let v ~kind payload = { kind; payload }

let kind t = t.kind
let payload t = t.payload

let read (action : Raw.Reader.Custom.t) =
let payload = Raw.Reader.Custom.payload_get action in
let kind = Raw.Reader.Custom.kind_get action in
{ kind; payload }

let init b { kind; payload } =
Raw.Builder.Custom.kind_set b kind;
payload (Raw.Builder.Custom.payload_get b)
23 changes: 23 additions & 0 deletions api/custom.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
type payload = Raw.Reader.pointer_t
(** The custom job specification payload. *)

type 'a t
(** Custom job specifications *)

type send = (Raw.Builder.pointer_t -> unit) t
type recv = Raw.Reader.pointer_t t

val v : kind:string -> (Raw.Builder.pointer_t -> unit) -> send
(** [v ~kind payload] is a custom job specification. *)

val kind : _ t -> string
(** A string describing the kind of custom job. *)

val payload : 'a t -> 'a
(** The dynamic payload of the custom job. *)

val init : Raw.Builder.Custom.t -> send -> unit
(** [init builder t] initialises a fresh builder with the values from [t]. *)

val read : Raw.Reader.Custom.t -> recv
(** [read c] reads the buffer and returns a custom job specification. *)
9 changes: 9 additions & 0 deletions api/schema.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ struct OBuilder {
# The contents of the OBuilder spec to build.
}

struct Custom {
kind @0 :Text;
# A name describing the kind of custom job

payload @1 :AnyPointer;
# A custom job with a dynamic payload
}

struct Secret {
id @0 :Text;
# The secret id.
Expand All @@ -53,6 +61,7 @@ struct JobDescr {
action :union {
dockerBuild @0 :DockerBuild;
obuilder @4 :OBuilder;
custom @6 :Custom;
}

cacheHint @1 :Text;
Expand Down
11 changes: 10 additions & 1 deletion api/submission.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@ module X = Raw.Client.Submission

type t = X.t Capability.t

type action =
type 'a action =
| Docker_build of Docker.Spec.t
| Obuilder_build of Obuilder_job.Spec.t
| Custom_build of 'a Custom.t

type send_action = (Raw.Builder.pointer_t -> unit) action

let docker_build ?push_to ?(options=Docker.Spec.defaults) dockerfile =
Docker_build { Docker.Spec.dockerfile; options; push_to }

let obuilder_build spec =
Obuilder_build { Obuilder_job.Spec.spec = `Contents spec }

let custom_build c = Custom_build c

let get_action descr =
let module JD = Raw.Reader.JobDescr in
match JD.action_get descr |> JD.Action.get with
| DockerBuild action -> Docker_build (Docker.Spec.read action)
| Obuilder action -> Obuilder_build (Obuilder_job.Spec.read action)
| Custom action -> Custom_build (Custom.read action)
| Undefined x -> Fmt.failwith "Unknown action type %d" x

let submit ?src ?(urgent=false) ?(secrets=[]) t ~pool ~action ~cache_hint =
Expand All @@ -55,6 +61,9 @@ let submit ?src ?(urgent=false) ?(secrets=[]) t ~pool ~action ~cache_hint =
| Obuilder_build action ->
let b = JD.Action.obuilder_init act in
Obuilder_job.Spec.init b action
| Custom_build action ->
let b = JD.Action.custom_init act in
Custom.init b action
end;
JD.cache_hint_set b cache_hint;
src |> Option.iter (fun (repo, commits) ->
Expand Down
2 changes: 1 addition & 1 deletion ocurrent-plugin/connection.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ val create :
val pool :
job:Current.Job.t ->
pool:string ->
action:Cluster_api.Submission.action ->
action:Cluster_api.Submission.send_action ->
cache_hint:string ->
?src:string * string list ->
?secrets:(string * string) list ->
Expand Down
3 changes: 2 additions & 1 deletion ocurrent-plugin/current_ocluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ module Op = struct
in
Some (repo, List.map hash commits)

let default_hint (action : Cluster_api.Submission.action) =
let default_hint (action : Cluster_api.Submission.send_action) =
match action with
| Custom_build _ -> ""
| Obuilder_build { spec = `Contents spec } ->
Astring.String.take ~sat:((<>) '\n') spec (* Use the first line *)
| Docker_build { dockerfile; _ } ->
Expand Down
53 changes: 53 additions & 0 deletions test/custom.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
open Lwt.Infix

module Spec = struct
let obuilder_spec_to_custom spec builder =
let open Cluster_api.Raw in
let obuilder = Builder.OBuilder.init_pointer builder in
Builder.OBuilder.spec_set obuilder spec

let dockerfile_to_custom spec =
let open Cluster_api.Raw in
let custom = Builder.Custom.init_root () in
let builder = Builder.Custom.payload_get custom in
let df = Builder.DockerBuild.Dockerfile.init_pointer builder in
Builder.DockerBuild.Dockerfile.contents_set df spec;
let r = Reader.Custom.of_builder custom in
Reader.Custom.payload_get r
end

(* Using the underlying Connection API to submit custom build jobs which are really
just dockerfile builds in disguise. *)
module Build = struct
module Op = struct
type t = Current_ocluster.Connection.t
let ( >>!= ) = Lwt_result.bind
let id = "mock-ocluster-build"

(* Build Pool *)
module Key = Current.String
(* Dockerfile Spec *)
module Value = Current.String

module Outcome = Current.String

let pp = Fmt.(pair string string)

let auto_cancel = true
let latched = true

let run t job pool value =
let action = Cluster_api.Submission.custom_build @@ Cluster_api.Custom.v ~kind:"dockerfile" @@ Spec.obuilder_spec_to_custom value in
let build_pool = Current_ocluster.Connection.pool ~job ~pool ~action ~cache_hint:"" t in
Current.Job.start_with ~pool:build_pool job ~level:Current.Level.Average >>= fun build_job ->
Capnp_rpc_lwt.Capability.with_ref build_job (Current_ocluster.Connection.run_job ~job)
end

module BC = Current_cache.Generic (Op)

let build_dockerfile t pool spec =
let open Current.Syntax in
Current.component "mock custom cluster build" |>
let> () = Current.return () in
BC.run t pool spec
end
2 changes: 1 addition & 1 deletion test/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(test
(name test)
(package ocluster)
(modules test mock_builder mock_network test_plugin)
(modules test custom mock_builder mock_network test_plugin)
(libraries
alcotest-lwt
capnp-rpc-net
Expand Down
43 changes: 38 additions & 5 deletions test/mock_builder.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ let rec await t id =
Lwt_condition.wait t.cond >>= fun () ->
await t id

let docker_build t ~switch ~log ~src:_ ~secrets:_ = function
| `Obuilder _ -> assert false
| `Docker (dockerfile, _options) ->
let build t ~switch ~log ~src:_ ~secrets:_ v =
let docker_build dockerfile =
match dockerfile with
| `Path _ -> assert false
| `Contents dockerfile ->
Expand All @@ -51,20 +50,54 @@ let docker_build t ~switch ~log ~src:_ ~secrets:_ = function
reply >|= fun reply ->
Hashtbl.remove t.replies dockerfile;
reply
in
match v with
| `Obuilder _ -> assert false
| `Docker (v, _) -> docker_build v
| `Custom c ->
let open Cluster_api in
match Custom.kind c with
| "dockerfile" ->
let module Db = Raw.Reader.DockerBuild in
let payload = Custom.payload c in
let r = Raw.Reader.of_pointer payload in
let contents =
match Db.Dockerfile.get (Db.dockerfile_get r) with
| Contents c -> `Contents c
| Path p -> `Path p
| Undefined _ -> Fmt.failwith "Unknown Dockerfile file"
in
docker_build contents
| "obuilder" as kind ->
let payload = Custom.payload c in
let obuilder_spec = Raw.Reader.OBuilder.spec_get @@ Raw.Reader.of_pointer payload in
Logs.info (fun f -> f "Got mock custom build %S" kind);
Cluster_worker.Log_data.write log (Fmt.str "Building job %s@." kind);
Cluster_worker.Log_data.write log (Fmt.str "Got spec %s@." obuilder_spec);
let reply = get t kind in
Lwt_switch.add_hook_or_exec (Some switch) (fun () ->
if Lwt.state reply = Lwt.Sleep then
set t kind @@ Error `Cancelled;
Lwt.return_unit
) >>= fun () ->
reply >|= fun reply ->
Hashtbl.remove t.replies kind;
reply
| v -> invalid_arg ("Unsupported kind of custom build: " ^ v)

let update () =
Logs.info (fun f -> f "Mock download updates...");
Lwt.return (fun () -> failwith "Mock restart")

let run ?(capacity=1) ?(name="worker-1") ~switch t registration_service =
let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(docker_build t) ~update ~state_dir registration_service in
let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service in
Lwt.on_failure thread
(fun ex -> if Lwt_switch.is_on switch then raise ex)

let run_remote ~builder_switch ~network_switch ?(capacity=1) ?(name="worker-1") t registration_service =
let thread =
let registration_service = Mock_network.remote ~switch:network_switch registration_service in
Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(docker_build t) ~update ~state_dir registration_service
Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service
in
Lwt.on_failure thread
(fun ex ->
Expand Down
29 changes: 27 additions & 2 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ let read_log job =
in
aux 0L

let submit service dockerfile =
let action = Cluster_api.Submission.docker_build (`Contents dockerfile) in
let common_submit service action =
Capability.with_ref (Cluster_api.Submission.submit service ~pool:"pool" ~action ~cache_hint:"1" ?src:None) @@ fun ticket ->
Capability.with_ref (Cluster_api.Ticket.job ticket) @@ fun job ->
let result = Cluster_api.Job.result job in
Expand All @@ -51,6 +50,14 @@ let submit service dockerfile =
| Ok x -> Fmt.failwith "Unexpected job output: %S" x
| Error (`Capnp _) -> Fmt.str "%sFAILED@." log

let submit service dockerfile =
let action = Cluster_api.Submission.docker_build (`Contents dockerfile) in
common_submit service action

let custom_submit service c =
let action = Cluster_api.Submission.custom_build c in
common_submit service action

let with_sched fn =
let db = Sqlite3.db_open ":memory:" in
Lwt.finalize
Expand Down Expand Up @@ -89,6 +96,23 @@ let simple () =
Alcotest.(check string) "Check job worked" "Building on worker-1\nBuilding example\nJob succeeded\n" result;
Lwt.return_unit

let simple_custom () =
with_sched @@ fun ~admin ~registry ->
Capability.with_ref (Cluster_api.Admin.add_client admin "client") @@ fun submission_service ->
let builder = Mock_builder.create () in
Lwt_switch.with_switch @@ fun switch ->
Mock_builder.run ~switch builder (Mock_network.sturdy registry);
let kind = "obuilder" in
let spec = "((from ocaml/opam:latest)\n(run (shell \"ls\")))" in
let job = Cluster_api.Custom.v ~kind @@ Custom.Spec.obuilder_spec_to_custom spec in
let result = custom_submit submission_service job in
Mock_builder.set builder "obuilder" @@ Ok "";
result >>= fun result ->
Logs.app (fun f -> f "Result: %S" result);
let expect = "Building on worker-1\nBuilding job obuilder\nGot spec " ^ spec ^ "\nJob succeeded\n" in
Alcotest.(check string) "Check job worked" expect result;
Lwt.return_unit

(* A failing build on a single worker. *)
let fails () =
let builder = Mock_builder.create () in
Expand Down Expand Up @@ -464,6 +488,7 @@ let () =
Lwt_main.run @@ Alcotest_lwt.run ~verbose "build-scheduler" [
"main", [
test_case "simple" simple;
test_case "simple_custom" simple_custom;
test_case "fails" fails;
test_case "await_builder" await_builder;
test_case "already_registered" already_registered ~expected_warnings:1;
Expand Down
32 changes: 27 additions & 5 deletions test/test_plugin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ let engine_cond = Lwt_condition.create () (* Fires after each update *)
let setup ~pipeline fn =
with_sched @@ fun ~submission_service ~registry ->
let submission_service, break = Mock_network.remote_breakable submission_service in
let t = Current_ocluster.v (Current_ocluster.Connection.create submission_service) in
let conn = Current_ocluster.Connection.create submission_service in
let t = Current_ocluster.v conn in
let state = ref (Error (`Msg "(init)")) in
SVar.set selected (Ok (fun () -> pipeline t));
SVar.set selected (Ok (fun () -> pipeline (t, conn)));
let trace ~next:_ (results : Current.Engine.results) =
state := results.value;
Lwt_condition.broadcast engine_cond ();
Expand Down Expand Up @@ -82,7 +83,27 @@ let options = Cluster_api.Docker.Spec.defaults

let simple () =
let spec = `Contents (Current.return "example1") in
let pipeline t = Current_ocluster.build t spec ~pool:"pool" ~src:(Current.return []) ~options in
let pipeline (t, _conn) = Current_ocluster.build t spec ~pool:"pool" ~src:(Current.return []) ~options in
setup ~pipeline @@ fun ~registry ~await_result ~break:_ ->
let builder = Mock_builder.create () in
Lwt_switch.with_switch @@ fun switch ->
Mock_builder.run_remote builder ~network_switch:switch ~builder_switch:switch registry;
Mock_builder.await builder "example1" >>= fun _ ->
Mock_builder.set builder "example1" @@ Ok "hash";
await_result () >>= fun x ->
Alcotest.(check (result pass reject)) "Pipeline successful" (Ok ()) x;
Lwt.return_unit

(* The simple custom test is mostly identical to the simple test except the Dockerfile is serialised
using the Custom API. The mock builder uses the kind argument to deserialise the Dockerfile. After
that, the logic is the same as doing a normal build. It does use the Connection API and the Custom.Build
module. *)
let simple_custom () =
let spec = "example1" in
let pipeline (_t, conn) =
Current.ignore_value @@
Custom.Build.build_dockerfile conn "pool" spec
in
setup ~pipeline @@ fun ~registry ~await_result ~break:_ ->
let builder = Mock_builder.create () in
Lwt_switch.with_switch @@ fun switch ->
Expand All @@ -95,7 +116,7 @@ let simple () =

let disconnect_while_queued () =
let spec = `Contents (Current.return "example2") in
let pipeline t = Current_ocluster.build t spec ~pool:"pool" ~src:(Current.return []) ~options in
let pipeline (t, _conn) = Current_ocluster.build t spec ~pool:"pool" ~src:(Current.return []) ~options in
setup ~pipeline @@ fun ~registry ~await_result ~break ->
Lwt.pause () >>= fun () ->
Lwt.pause () >>= fun () ->
Expand All @@ -114,7 +135,7 @@ let disconnect_while_queued () =
let two_jobs () =
let spec1 = `Contents (Current.return "spec1") in
let spec2 = `Contents (Current.return "spec2") in
let pipeline t =
let pipeline (t, _conn) =
let open Current.Syntax in
let* b1 = Current.state (Current_ocluster.build t spec1 ~pool:"pool" ~src:(Current.return []) ~options)
and* b2 = Current.state (Current_ocluster.build t spec2 ~pool:"pool" ~src:(Current.return []) ~options) in
Expand Down Expand Up @@ -225,6 +246,7 @@ let test_case name fn =

let suite = [
test_case "simple" simple;
test_case "simple_custom" simple_custom;
test_case "disconnect_while_queued" disconnect_while_queued;
test_case "two_jobs" two_jobs;
test_case "cancel_rate_limit" cancel_rate_limit;
Expand Down
Loading

0 comments on commit 4cb48ea

Please sign in to comment.