Skip to content

Commit

Permalink
Add Time.Mono for monotonic clocks
Browse files Browse the repository at this point in the history
Based on PR ocaml-multicore#308 by Bikal Lem.
  • Loading branch information
talex5 committed Nov 15, 2022
1 parent 11011dd commit 42f2a31
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 102 deletions.
2 changes: 1 addition & 1 deletion lib_eio/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
(name eio)
(public_name eio)
(flags (:standard -open Eio__core -open Eio__core.Private))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf optint))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf optint mtime))
2 changes: 2 additions & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module Stdenv = struct
net : Net.t;
domain_mgr : Domain_manager.t;
clock : Time.clock;
mono_clock : Time.Mono.t;
fs : Fs.dir Path.t;
cwd : Fs.dir Path.t;
secure_random : Flow.source;
Expand All @@ -49,6 +50,7 @@ module Stdenv = struct
let net (t : <net : #Net.t; ..>) = t#net
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : #Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : #Time.Mono.t; ..>) = t#mono_clock
let secure_random (t: <secure_random : #Flow.source; ..>) = t#secure_random
let fs (t : <fs : #Fs.dir Path.t; ..>) = t#fs
let cwd (t : <cwd : #Fs.dir Path.t; ..>) = t#cwd
Expand Down
6 changes: 5 additions & 1 deletion lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ module Stdenv : sig
net : Net.t;
domain_mgr : Domain_manager.t;
clock : Time.clock;
mono_clock : Time.Mono.t;
fs : Fs.dir Path.t;
cwd : Fs.dir Path.t;
secure_random : Flow.source;
Expand Down Expand Up @@ -232,7 +233,10 @@ module Stdenv : sig
*)

val clock : <clock : #Time.clock as 'a; ..> -> 'a
(** [clock t] is the system clock. *)
(** [clock t] is the system clock (used to get the current time and date). *)

val mono_clock : <mono_clock : #Time.Mono.t as 'a; ..> -> 'a
(** [mono_clock t] is a monotonic clock (used for measuring intervals). *)

(** {1 Randomness} *)

Expand Down
152 changes: 98 additions & 54 deletions lib_eio/mock/clock.ml
Original file line number Diff line number Diff line change
@@ -1,67 +1,111 @@
open Eio.Std

type t = <
Eio.Time.clock;
advance : unit;
set_time : float -> unit;
>

module Key = struct
type t = < >
let compare = compare
end
module type S = sig
type time

type t = <
time Eio.Time.clock_base;
advance : unit;
set_time : time -> unit;
>

module Job = struct
type t = {
time : float;
resolver : unit Promise.u;
}
val make : unit -> t
val advance : t -> unit
val set_time : t -> time -> unit
end

let compare a b = Float.compare a.time b.time
module type TIME = sig
type t
val zero : t
val compare : t -> t -> int
val pp : t Fmt.t
end

module Q = Psq.Make(Key)(Job)
module Make(T : TIME) : S with type time := T.t = struct
type t = <
T.t Eio.Time.clock_base;
advance : unit;
set_time : T.t -> unit;
>

module Key = struct
type t = < >
let compare = compare
end

module Job = struct
type t = {
time : T.t;
resolver : unit Promise.u;
}

let compare a b = T.compare a.time b.time
end

module Q = Psq.Make(Key)(Job)

let make () =
object (self)
inherit [T.t] Eio.Time.clock_base

let make () =
object (self)
inherit Eio.Time.clock
val mutable now = T.zero
val mutable q = Q.empty

val mutable now = 0.0
val mutable q = Q.empty
method now = now

method now = now
method sleep_until time =
if T.compare time now <= 0 then Fiber.yield ()
else (
let p, r = Promise.create () in
let k = object end in
q <- Q.add k { time; resolver = r } q;
try
Promise.await p
with Eio.Cancel.Cancelled _ as ex ->
q <- Q.remove k q;
raise ex
)

method sleep_until time =
if time <= now then Fiber.yield ()
else (
let p, r = Promise.create () in
let k = object end in
q <- Q.add k { time; resolver = r } q;
try
Promise.await p
with Eio.Cancel.Cancelled _ as ex ->
q <- Q.remove k q;
raise ex
)
method set_time time =
let rec drain () =
match Q.min q with
| Some (_, v) when T.compare v.time time <= 0 ->
Promise.resolve v.resolver ();
q <- Option.get (Q.rest q);
drain ()
| _ -> ()
in
drain ();
now <- time;
traceln "mock time is now %a" T.pp now

method set_time time =
let rec drain () =
method advance =
match Q.min q with
| Some (_, v) when v.time <= time ->
Promise.resolve v.resolver ();
q <- Option.get (Q.rest q);
drain ()
| _ -> ()
in
drain ();
now <- time;
traceln "mock time is now %g" now

method advance =
match Q.min q with
| None -> invalid_arg "No further events scheduled on mock clock"
| Some (_, v) -> self#set_time v.time
end
| None -> invalid_arg "No further events scheduled on mock clock"
| Some (_, v) -> self#set_time v.time
end

let set_time (t:t) time = t#set_time time
let advance (t:t) = t#advance
end

module Old_time = struct
type t = float
let compare = Float.compare
let pp f x = Fmt.pf f "%g" x
let zero = 0.0
end

module Mono_time = struct
type t = Mtime.t
let compare = Mtime.compare
let zero = Mtime.of_uint64_ns 0L

let pp f t =
let s = Int64.to_float (Mtime.to_uint64_ns t) /. 1e9 in
Fmt.pf f "%g" s
end

module Mono = Make(Mono_time)

let set_time (t:t) time = t#set_time time
let advance (t:t) = t#advance
include Make(Old_time)
34 changes: 21 additions & 13 deletions lib_eio/mock/clock.mli
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
type t = <
Eio.Time.clock;
advance : unit;
set_time : float -> unit;
>
module type S = sig
type time

val make : unit -> t
(** [make ()] is a new clock.
type t = <
time Eio.Time.clock_base;
advance : unit;
set_time : time -> unit;
>

The time is initially set to 0.0 and doesn't change except when you call {!advance} or {!set_time}. *)
val make : unit -> t
(** [make ()] is a new clock.
val advance : t -> unit
(** [advance t] sets the time to the next scheduled event (adding any due fibers to the run queue).
@raise Invalid_argument if nothing is scheduled. *)
The time is initially set to 0.0 and doesn't change except when you call {!advance} or {!set_time}. *)

val set_time : t -> float -> unit
(** [set_time t time] sets the time to [time] (adding any due fibers to the run queue). *)
val advance : t -> unit
(** [advance t] sets the time to the next scheduled event (adding any due fibers to the run queue).
@raise Invalid_argument if nothing is scheduled. *)

val set_time : t -> time -> unit
(** [set_time t time] sets the time to [time] (adding any due fibers to the run queue). *)
end

include S with type time := float

module Mono : S with type time := Mtime.t
73 changes: 61 additions & 12 deletions lib_eio/time.ml
Original file line number Diff line number Diff line change
@@ -1,46 +1,95 @@
exception Timeout

class virtual ['a] clock_base = object
method virtual now : 'a
method virtual sleep_until : 'a -> unit
end

class virtual clock = object
method virtual now : float
method virtual sleep_until : float -> unit
inherit [float] clock_base
end

let now (t : #clock) = t#now
let now (t : _ #clock_base) = t#now

let sleep_until (t : #clock) time = t#sleep_until time
let sleep_until (t : _ #clock_base) time = t#sleep_until time

let sleep t d = sleep_until t (now t +. d)

module Mono = struct
class virtual t = object
inherit [Mtime.t] clock_base
end

let now = now
let sleep_until = sleep_until

let sleep_span t span =
match Mtime.add_span (now t) span with
| Some time -> sleep_until t time
| None -> Fiber.await_cancel ()

(* Converting floats via int64 is tricky when things overflow or go negative.
Since we don't need to wait for more than 100 years, limit it to this: *)
let too_many_ns = 0x8000000000000000.

let span_of_s s =
if s >= 0.0 then (
let ns = s *. 1e9 in
if ns >= too_many_ns then Mtime.Span.max_span
else Mtime.Span.of_uint64_ns (Int64.of_float ns)
) else Mtime.Span.zero (* Also happens for NaN and negative infinity *)

let sleep (t : #t) s =
sleep_span t (span_of_s s)
end

let with_timeout t d = Fiber.first (fun () -> sleep t d; Error `Timeout)
let with_timeout_exn t d = Fiber.first (fun () -> sleep t d; raise Timeout)

module Timeout = struct
type t =
| Timeout of clock * float
| Timeout of Mono.t * Mtime.Span.t
| Deprecated of clock * float
| Unlimited

let none = Unlimited
let of_s clock time = Timeout ((clock :> clock), time)
let v clock time = Timeout ((clock :> Mono.t), time)

let seconds clock time =
v clock (Mono.span_of_s time)

let of_s clock time =
Deprecated ((clock :> clock), time)

let run t fn =
match t with
| Unlimited -> fn ()
| Timeout (clock, d) ->
Fiber.first (fun () -> Mono.sleep_span clock d; Error `Timeout) fn
| Deprecated (clock, d) ->
Fiber.first (fun () -> sleep clock d; Error `Timeout) fn

let run_exn t fn =
match t with
| Unlimited -> fn ()
| Timeout (clock, d) ->
Fiber.first (fun () -> Mono.sleep_span clock d; raise Timeout) fn
| Deprecated (clock, d) ->
Fiber.first (fun () -> sleep clock d; raise Timeout) fn

let pp_duration f d =
if d >= 0.001 && d < 0.1 then
Fmt.pf f "%.2gms" (d *. 1000.)
else if d < 120. then
Fmt.pf f "%.2gs" d
else
Fmt.pf f "%.2gm" (d /. 60.)

let pp f = function
| Unlimited -> Fmt.string f "(no timeout)"
| Timeout (_clock, d) ->
if d >= 0.001 && d < 0.1 then
Fmt.pf f "%.2gms" (d *. 1000.)
else if d < 120. then
Fmt.pf f "%.2gs" d
else
Fmt.pf f "%.2gm" (d /. 60.)
let d = Mtime.Span.to_s d in
pp_duration f d
| Deprecated (_clock, d) ->
pp_duration f d
end
Loading

0 comments on commit 42f2a31

Please sign in to comment.