Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial windows scheduler #490

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions lib_eio_windows/eio_windows.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
(* Can base this on the eio_posix directory structure.
See HACKING.md for instructions on creating a new backend. *)
let run _main = failwith "TODO: Windows support."
(* WIP backend for Windows using IOCP *)

let run main =
let stdenv = object
method stdin = failwith "Not implemented"
method stdout = failwith "Not implemented"
method stderr = failwith "Not implemented"
method debug = Eio.Private.Debug.v
method clock = failwith "Not implemented"
method mono_clock = failwith "Not implemented"
method net = failwith "Not implemented"
method domain_mgr = failwith "Not implemented"
method cwd = failwith "Not implemented"
method fs = failwith "Not implemented"
method secure_random = failwith "Not implemented"
end
in
let extra_effects : _ Effect.Deep.effect_handler = {
effc = (fun _ -> None)
}
in
Sched.with_sched @@ fun sched ->
Sched.run ~extra_effects sched main stdenv
6 changes: 6 additions & 0 deletions lib_eio_windows/eio_windows.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
(** Eio backend for Windows using IOCP *)

val run : (Eio.Stdenv.t -> 'a) -> 'a
(** [run main] runs an event loop and calls [main stdenv] inside it.

For portable code, you should use {!Eio_main.run} instead, which will call this for you if appropriate. *)
72 changes: 72 additions & 0 deletions lib_eio_windows/sched.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
module Fiber_context = Eio.Private.Fiber_context
module Lf_queue = Eio_utils.Lf_queue

exception Deadlock_detected

(* The scheduler could just return [unit], but this is clearer. *)
type exit = Exit_scheduler

type t = {
(* Suspended fibers waiting to run again.
[Lf_queue] is like [Stdlib.Queue], but is thread-safe (lock-free) and
allows pushing items to the head too, which we need. *)
run_q : (unit -> exit) Lf_queue.t;
}

(* Resume the next runnable fiber, if any. *)
let schedule t : exit =
match Lf_queue.pop t.run_q with
| Some f -> f ()
| None -> Exit_scheduler (* Finished (or deadlocked) *)

let with_sched fn =
let t = { run_q = Lf_queue.create () } in
fn t

(* Run [main] in an Eio main loop. *)
let run ~extra_effects t main arg =
let rec fork ~new_fiber:fiber fn =
(* Create a new fiber and run [fn] in it. *)
Effect.Deep.match_with fn ()
{ retc = (fun () -> Fiber_context.destroy fiber; schedule t);
exnc = (fun ex ->
let bt = Printexc.get_raw_backtrace () in
Fiber_context.destroy fiber;
Printexc.raise_with_backtrace ex bt
);
effc = fun (type a) (e : a Effect.t) : ((a, exit) Effect.Deep.continuation -> exit) option ->
match e with
| Eio.Private.Effects.Suspend f -> Some (fun k ->
(* Ask [f] to register whatever callbacks are needed to resume the fiber.
e.g. it might register a callback with a promise, for when that's resolved. *)
f fiber (fun result ->
(* The fiber is ready to run again. Add it to the queue. *)
Lf_queue.push t.run_q (fun () ->
(* Resume the fiber. *)
Fiber_context.clear_cancel_fn fiber;
match result with
| Ok v -> Effect.Deep.continue k v
| Error ex -> Effect.Deep.discontinue k ex
)
);
(* Switch to the next runnable fiber while this one's blocked. *)
schedule t
)
| Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k ->
(* Arrange for the forking fiber to run immediately after the new one. *)
Lf_queue.push_head t.run_q (Effect.Deep.continue k);
(* Create and run the new fiber (using fiber context [new_fiber]). *)
fork ~new_fiber f
)
| Eio.Private.Effects.Get_context -> Some (fun k ->
Effect.Deep.continue k fiber
)
| eff -> extra_effects.Effect.Deep.effc eff
}
in
let new_fiber = Fiber_context.make_root () in
let result = ref None in
let Exit_scheduler = fork ~new_fiber (fun () -> result := Some (main arg)) in
match !result with
| None -> raise Deadlock_detected
| Some x -> x
23 changes: 23 additions & 0 deletions lib_eio_windows/sched.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
(** The scheduler keeps track of all suspended fibers and resumes them as appropriate.

Each Eio domain has one scheduler, which keeps a queue of runnable
processes plus a record of all fibers waiting for IO operations to complete. *)

type t

type exit
(** This is equivalent to [unit], but indicates that a function returning this will call {!next}
and so does not return until the whole event loop is finished. Such functions should normally
be called in tail position. *)

val with_sched : (t -> 'a) -> 'a
(** [with_sched fn] sets up a scheduler and calls [fn t].
Typically [fn] will call {!run}.
When [fn] returns, the scheduler's resources are freed. *)

val run :
extra_effects:exit Effect.Deep.effect_handler ->
t -> ('a -> 'b) -> 'a -> 'b [@@alert "-unstable"]
(** [run ~extra_effects t f x] starts an event loop using [t] and runs [f x] as the root fiber within it.

Unknown effects are passed to [extra_effects]. *)
4 changes: 4 additions & 0 deletions lib_eio_windows/test/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(executable
(name test)
(enabled_if (= %{os_type} "Win32"))
(libraries eio_windows))
11 changes: 11 additions & 0 deletions lib_eio_windows/test/test.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
open Eio.Std

let () =
Eio_windows.run @@ fun _ ->
let check = ref [] in
Fiber.all [
(fun () -> Fiber.yield (); check := 2 :: !check);
(fun () -> Fiber.yield (); check := 3 :: !check);
(fun () -> check := 1 :: !check)
];
assert (!check = [ 3; 2; 1 ])