Skip to content

Commit

Permalink
Merge pull request #860 from Sudha247/lwt_domain
Browse files Browse the repository at this point in the history
Lwt_domain: an interface to multicore parallelism
  • Loading branch information
raphael-proust authored Nov 25, 2021
2 parents d5f8452 + c7bc30a commit d6b223e
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 12 deletions.
52 changes: 40 additions & 12 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,56 +30,78 @@ jobs:
- false
ppx:
- true
domain:
- false
local-packages:
- "*.opam"
- |
*.opam
!lwt_domain.opam
include:
- os: ubuntu-latest
ocaml-compiler: ocaml-variants.4.13.1+options,ocaml-option-flambda,ocaml-option-musl,ocaml-option-static
libev: false
ppx: true
local-packages: "*.opam"
domain: false
local-packages: |
*.opam
!lwt_domain.opam
- os: ubuntu-latest
ocaml-compiler: ocaml-variants.4.12.0+domains
libev: false
ppx: true
domain: true
local-packages: "*.opam"
- os: macos-latest
ocaml-compiler: 4.13.x
libev: true
ppx: true
local-packages: "*.opam"
domain: false
local-packages: |
*.opam
!lwt_domain.opam
- os: windows-latest
ocaml-compiler: 4.13.x
libev: false
ppx: true
local-packages: "*.opam"
domain: false
local-packages: |
*.opam
!lwt_domain.opam
- os: ubuntu-latest
ocaml-compiler: 4.02.x
libev: true
ppx: false
domain: false
local-packages: |
*.opam
!lwt_domain.opam
!lwt_ppx.opam
- os: ubuntu-latest
ocaml-compiler: 4.03.x
libev: true
ppx: false
domain: false
local-packages: |
*.opam
!lwt_domain.opam
!lwt_ppx.opam
- os: macos-latest
ocaml-compiler: 4.02.x
libev: true
ppx: false
domain: false
local-packages: |
*.opam
!lwt_domain.opam
!lwt_ppx.opam
- os: windows-latest
ocaml-compiler: 4.06.x
libev: false
ppx: false
domain: false
local-packages: |
*.opam
!lwt_domain.opam
!lwt_ppx.opam
runs-on: ${{ matrix.os }}
Expand All @@ -98,23 +120,29 @@ jobs:
- run: opam depext conf-libev --install
if: ${{ matrix.libev == true }}

- run: opam install . --deps-only --with-test
if: ${{ matrix.ppx == true }}

- run: opam install lwt_luv lwt_react lwt --deps-only --with-test
if: ${{ matrix.ppx == false }}

- run: opam exec -- make build
- run: opam install lwt_ppx --deps-only --with-test
if: ${{ matrix.ppx == true }}

- run: opam install lwt_domain --deps-only --with-test
if: ${{ matrix.domain == true }}

- run: opam exec -- dune build --only-packages lwt_luv,lwt_react,lwt
if: ${{ matrix.ppx == false }}

- run: opam exec -- make test
- run: opam exec -- dune build --only-packages lwt_ppx
if: ${{ matrix.ppx == true }}

- run: opam exec -- dune build --only-packages lwt_domain
if: ${{ matrix.domain == true }}

- run: opam exec -- dune runtest --only-packages lwt_luv,lwt_react,lwt
if: ${{ matrix.ppx == false }}

- run: opam exec -- dune runtest --only-packages lwt_ppx
if: ${{ matrix.ppx == true }}

- run: opam exec -- dune runtest --only-packages lwt_domain
if: ${{ matrix.domain == true }}

- run: opam exec -- make ppx_let-test-deps ppx_let-test
if: ${{ matrix.ppx == true }}
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

* In the Lwt_unix module, add `?cloexec:bool` optional arguments to functions that create file descriptors (`dup`, `dup2`, `pipe`, `pipe_in`, `pipe_out`, `socket`, `socketpair`, `accept`, `accept_n`). The `?cloexec` argument is simply forwarded to the wrapped Unix function (with OCaml >= 4.05, see PR ocaml/ocaml#650), or emulated as best-effort with `Unix.set_close_on_exec` on older OCaml versions (#327, #847, #872, #901, Antonin Décimo).

* Lwt_domain: helpers for using domainslib from Lwt (#860, Sudha Parimala)

====== Misc ======

* Code quality improvement: remove an uneeded Obj.magic (#844, Benoit Montagu).
Expand Down
28 changes: 28 additions & 0 deletions lwt_domain.opam
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
opam-version: "2.0"

synopsis: "Helpers for using Domainslib with Lwt"

version: "1.1.4"
license: "MIT"
homepage: "https://github.com/ocsigen/lwt"
doc: "https://ocsigen.org/lwt/dev/api/Lwt_domain"
bug-reports: "https://github.com/ocsigen/lwt/issues"

authors: [
"Sudha Parimala"
]
maintainer: [
"Sudha Parimala"
]
dev-repo: "git+https://github.com/ocsigen/lwt.git"

depends: [
"dune" {>= "1.8.0"}
"lwt" {>= "3.0.0"}
"ocaml"
"domainslib" {>= "0.3.2"}
]

build: [
["dune" "build" "-p" name "-j" jobs]
]
19 changes: 19 additions & 0 deletions src/domain/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
(* -*- tuareg -*- *)

let preprocess =
match Sys.getenv "BISECT_ENABLE" with
| "yes" -> "(preprocess (pps bisect_ppx))"
| _ -> ""
| exception _ -> ""

let () = Jbuild_plugin.V1.send @@ {|

(library
(public_name lwt_domain)
(synopsis "Multicore programming helpers for Lwt")
(wrapped false)
(libraries lwt lwt.unix domainslib)
|} ^ preprocess ^ {|
(flags (:standard -w +A)))

|}
66 changes: 66 additions & 0 deletions src/domain/lwt_domain.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
open Lwt.Infix

module C = Domainslib.Chan
module T = Domainslib.Task

type pool = Domainslib.Task.pool

let setup_pool ?name num_additional_domains =
T.setup_pool ?name ~num_additional_domains ()

let teardown_pool = T.teardown_pool

let lookup_pool = T.lookup_pool

let get_num_domains = T.get_num_domains

let init_result = Error (Failure "Lwt_domain.detach")

let detach pool f args =
if (get_num_domains pool = 1) then
Lwt.wrap1 f args
else begin
let result = ref init_result in
let task () =
result := try Ok (f args) with exn -> Error exn
in
let waiter, wakener = Lwt.wait () in
let id =
Lwt_unix.make_notification ~once:true
(fun () -> Lwt.wakeup_result wakener !result)
in
let _ = T.async pool (fun _ -> task ();
Lwt_unix.send_notification id) in
waiter
end

(* +-----------------------------------------------------------------+
| Running Lwt threads in the main domain |
+-----------------------------------------------------------------+ *)

(* Jobs to be run in the main domain *)
let jobs = C.make_unbounded ()
let job_done = C.make_bounded 0
let job_notification =
Lwt_unix.make_notification
(fun () ->
let thunk = C.recv jobs in
ignore (thunk ()))

let run_in_main f =
let res = ref init_result in
let job () =
Lwt.try_bind f
(fun ret -> Lwt.return (Result.Ok ret))
(fun exn -> Lwt.return (Result.Error exn)) >>= fun result ->
res := result;
C.send job_done ();
Lwt.return_unit
in
C.send jobs job;
Lwt_unix.send_notification job_notification;
(* blocks calling domain until the job is executed *)
C.recv job_done;
match !res with
| Result.Ok ret -> ret
| Result.Error exn -> raise exn
98 changes: 98 additions & 0 deletions src/domain/lwt_domain.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)



(** This module provides the necessary function ({!detach}) to schedule some
computations to be ran in parallel in a separate domain. The result of such
a computation is exposed to the caller of {!detach} as a promise. Thus, this
module allows to mix multicore parallelism with the concurrent-only
scheduling of the rest of Lwt. *)

type pool
(** Domainslib Task pool *)

val detach : pool-> ('a -> 'b) -> 'a -> 'b Lwt.t
(** [detach pool f x] runs the computation [f x] in a separate domain in
parallel.
[detach pool f x] evaluates to an Lwt promise which is pending until the
domain completes the execution of [f x] at which point it becomes
resolved. If [f x] raises an exception, then the promise is rejected.
It is recommended you initialise the task pool using
{!setup_pool} with a number of domains equal to the number of
physical cores.
Note that the function [f] passed to [detach] cannot safely use {!Lwt}.
This is true even for implicit callback arguments (i.e.,
{!Lwt.with_value}). If you need to use {!Lwt} or interact with promises,
you must use {!run_in_main}.
In the special case where the task pool has size one (i.e., when there
is no additional domain to detach the computation to), the computation
runs immediately on the main domain. In other words, when the number of
domains is one (1), then [detach f x] is identical to
[Lwt.return (f x)].
@raise [Invalid_argument] if pool is already torn down. *)

val run_in_main : (unit -> 'a Lwt.t) -> 'a
(** [run_in_main f] can be called from a detached computation to execute [f
()] in the parent domain, i.e. the one executing {!Lwt_main.run}.
[run_in_main f] blocks until [f ()] completes, then it returns its
result. If [f ()] raises an exception, [run_in_main f] raises the same
exception. The whole of {!Lwt} can be safely used from within [f].
However, note that implicit callback arguments are local to [f]. I.e.,
{!Lwt.get} can only retrieve values set inside of [f], and not those set
inside the promise that called [detach] that called [run_in_main].
Note that the calling domain will be idle until [f ()] completes
execution and returns the result. Thus, heavy use of [run_in_main] may
lead to most or all domains being frozen. It's also possible to create a
dead-lock when [run_in_main] is called (thus freezing a domain) with a
function that calls [detach] (thus needing a domain). Consequently, it
is recommended to use this function sparingly. *)

val setup_pool : ?name:string -> int -> pool
(** [setup_pool name num_additional_domains] returns a task pool with
[num_additional_domains] domains including the current domain.
It is recommended to use this function to create a pool once before
calling [Lwt_main.run] and to not call it again afterwards. To resize the
pool, call [teardown_pool ()] first before creating a new pool again.
Multiple calls to resize the domain pool are safe but costly.
If [name] is provided, the pool is mapped to name. It can be obtained
later with [lookup_pool name].
For more details about task pool, please refer:
https://github.com/ocaml-multicore/domainslib/blob/master/lib/task.mli
@raise [Invalid_argument] if given number of domains [n] is smaller than
[1].
@raise [Failure] if the pool is already initialised when the function is
called.
*)

val teardown_pool : pool -> unit
(** [teardown_pool ()] shuts down the task pool. It is safe to call
[setup_pool] again after [teardown_pool] returns.
This function is useful if different portions of your program have benefit
from different degree of parallelism.
@raise [TasksActive] if any tasks in the pool are currently active.
@raise [Invalid_argument] if pool is already torn down. *)

val lookup_pool : string -> pool option
(** [lookup_pool name] returns [Some pool] if [pool] is associated to [name]
or returns [None] if no value is associated to it. *)

val get_num_domains : pool -> int
(** [get_num_domains pool] returns the number of domains in [pool]. *)

(**/**)
9 changes: 9 additions & 0 deletions test/domain/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(executable
(name main)
(libraries lwt_domain lwttester tester))

(alias
(name runtest)
(package lwt_domain)
(action (run %{exe:main.exe}))
)
7 changes: 7 additions & 0 deletions test/domain/main.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

let () =
Test.run "domain" [
Test_lwt_domain.suite;
]
Loading

0 comments on commit d6b223e

Please sign in to comment.