Skip to content

Commit

Permalink
Add Time.Mono for monotonic clocks
Browse files Browse the repository at this point in the history
Based on PR ocaml-multicore#308 by Bikal Lem.
  • Loading branch information
talex5 committed Oct 4, 2022
1 parent 4c743ba commit 59c5118
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 98 deletions.
2 changes: 1 addition & 1 deletion lib_eio/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
(name eio)
(public_name eio)
(flags (:standard -open Eio__core -open Eio__core.Private))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf optint))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf optint mtime))
2 changes: 2 additions & 0 deletions lib_eio/eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module Stdenv = struct
net : Net.t;
domain_mgr : Domain_manager.t;
clock : Time.clock;
mono_clock : Time.Mono.t;
fs : Fs.dir Path.t;
cwd : Fs.dir Path.t;
secure_random : Flow.source;
Expand All @@ -49,6 +50,7 @@ module Stdenv = struct
let net (t : <net : #Net.t; ..>) = t#net
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : #Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : #Time.Mono.t; ..>) = t#mono_clock
let secure_random (t: <secure_random : #Flow.source; ..>) = t#secure_random
let fs (t : <fs : #Fs.dir Path.t; ..>) = t#fs
let cwd (t : <cwd : #Fs.dir Path.t; ..>) = t#cwd
Expand Down
6 changes: 5 additions & 1 deletion lib_eio/eio.mli
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ module Stdenv : sig
net : Net.t;
domain_mgr : Domain_manager.t;
clock : Time.clock;
mono_clock : Time.Mono.t;
fs : Fs.dir Path.t;
cwd : Fs.dir Path.t;
secure_random : Flow.source;
Expand Down Expand Up @@ -232,7 +233,10 @@ module Stdenv : sig
*)

val clock : <clock : #Time.clock as 'a; ..> -> 'a
(** [clock t] is the system clock. *)
(** [clock t] is the system clock (used to get the current time and date). *)

val mono_clock : <mono_clock : #Time.Mono.t as 'a; ..> -> 'a
(** [mono_clock t] is a monotonic clock (used for measuring intervals). *)

(** {1 Randomness} *)

Expand Down
152 changes: 98 additions & 54 deletions lib_eio/mock/clock.ml
Original file line number Diff line number Diff line change
@@ -1,67 +1,111 @@
open Eio.Std

type t = <
Eio.Time.clock;
advance : unit;
set_time : float -> unit;
>

module Key = struct
type t = < >
let compare = compare
end
module type S = sig
type time

type t = <
time Eio.Time.clock_base;
advance : unit;
set_time : time -> unit;
>

module Job = struct
type t = {
time : float;
resolver : unit Promise.u;
}
val make : unit -> t
val advance : t -> unit
val set_time : t -> time -> unit
end

let compare a b = Float.compare a.time b.time
module type TIME = sig
type t
val zero : t
val compare : t -> t -> int
val pp : t Fmt.t
end

module Q = Psq.Make(Key)(Job)
module Make(T : TIME) : S with type time := T.t = struct
type t = <
T.t Eio.Time.clock_base;
advance : unit;
set_time : T.t -> unit;
>

module Key = struct
type t = < >
let compare = compare
end

module Job = struct
type t = {
time : T.t;
resolver : unit Promise.u;
}

let compare a b = T.compare a.time b.time
end

module Q = Psq.Make(Key)(Job)

let make () =
object (self)
inherit [T.t] Eio.Time.clock_base

let make () =
object (self)
inherit Eio.Time.clock
val mutable now = T.zero
val mutable q = Q.empty

val mutable now = 0.0
val mutable q = Q.empty
method now = now

method now = now
method sleep_until time =
if T.compare time now <= 0 then Fiber.yield ()
else (
let p, r = Promise.create () in
let k = object end in
q <- Q.add k { time; resolver = r } q;
try
Promise.await p
with Eio.Cancel.Cancelled _ as ex ->
q <- Q.remove k q;
raise ex
)

method sleep_until time =
if time <= now then Fiber.yield ()
else (
let p, r = Promise.create () in
let k = object end in
q <- Q.add k { time; resolver = r } q;
try
Promise.await p
with Eio.Cancel.Cancelled _ as ex ->
q <- Q.remove k q;
raise ex
)
method set_time time =
let rec drain () =
match Q.min q with
| Some (_, v) when T.compare v.time time <= 0 ->
Promise.resolve v.resolver ();
q <- Option.get (Q.rest q);
drain ()
| _ -> ()
in
drain ();
now <- time;
traceln "mock time is now %a" T.pp now

method set_time time =
let rec drain () =
method advance =
match Q.min q with
| Some (_, v) when v.time <= time ->
Promise.resolve v.resolver ();
q <- Option.get (Q.rest q);
drain ()
| _ -> ()
in
drain ();
now <- time;
traceln "mock time is now %g" now

method advance =
match Q.min q with
| None -> invalid_arg "No further events scheduled on mock clock"
| Some (_, v) -> self#set_time v.time
end
| None -> invalid_arg "No further events scheduled on mock clock"
| Some (_, v) -> self#set_time v.time
end

let set_time (t:t) time = t#set_time time
let advance (t:t) = t#advance
end

module Old_time = struct
type t = float
let compare = Float.compare
let pp f x = Fmt.pf f "%g" x
let zero = 0.0
end

module Mono_time = struct
type t = Mtime.t
let compare = Mtime.compare
let zero = Mtime.of_uint64_ns 0L

let pp f t =
let s = Int64.to_float (Mtime.to_uint64_ns t) /. 1e9 in
Fmt.pf f "%g" s
end

module Mono = Make(Mono_time)

let set_time (t:t) time = t#set_time time
let advance (t:t) = t#advance
include Make(Old_time)
34 changes: 21 additions & 13 deletions lib_eio/mock/clock.mli
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
type t = <
Eio.Time.clock;
advance : unit;
set_time : float -> unit;
>
module type S = sig
type time

val make : unit -> t
(** [make ()] is a new clock.
type t = <
time Eio.Time.clock_base;
advance : unit;
set_time : time -> unit;
>

The time is initially set to 0.0 and doesn't change except when you call {!advance} or {!set_time}. *)
val make : unit -> t
(** [make ()] is a new clock.
val advance : t -> unit
(** [advance t] sets the time to the next scheduled event (adding any due fibers to the run queue).
@raise Invalid_argument if nothing is scheduled. *)
The time is initially set to 0.0 and doesn't change except when you call {!advance} or {!set_time}. *)

val set_time : t -> float -> unit
(** [set_time t time] sets the time to [time] (adding any due fibers to the run queue). *)
val advance : t -> unit
(** [advance t] sets the time to the next scheduled event (adding any due fibers to the run queue).
@raise Invalid_argument if nothing is scheduled. *)

val set_time : t -> time -> unit
(** [set_time t time] sets the time to [time] (adding any due fibers to the run queue). *)
end

include S with type time := float

module Mono : S with type time := Mtime.t
50 changes: 44 additions & 6 deletions lib_eio/time.ml
Original file line number Diff line number Diff line change
@@ -1,42 +1,80 @@
exception Timeout

class virtual ['a] clock_base = object
method virtual now : 'a
method virtual sleep_until : 'a -> unit
end

class virtual clock = object
inherit [float] clock_base
method virtual now : float
method virtual sleep_until : float -> unit
end

let now (t : #clock) = t#now
let now (t : _ #clock_base) = t#now

let sleep_until (t : #clock) time = t#sleep_until time
let sleep_until (t : _ #clock_base) time = t#sleep_until time

let sleep t d = sleep_until t (now t +. d)

module Mono = struct
class virtual t = object
inherit [Mtime.t] clock_base
end

let now = now
let sleep_until = sleep_until

let sleep_span t span =
match Mtime.add_span (now t) span with
| Some time -> sleep_until t time
| None -> Fiber.await_cancel ()

(* Converting floats via int64 is tricky when things overflow or go negative.
Since we don't need to wait for more than 100 years, limit it to this: *)
let too_many_ns = 0x8000000000000000.

let span_of_s s =
if s >= 0.0 then (
let ns = s *. 1e9 in
if ns >= too_many_ns then Mtime.Span.max_span
else Mtime.Span.of_uint64_ns (Int64.of_float ns)
) else Mtime.Span.zero (* Also happens for NaN and negative infinity *)

let sleep (t : #t) s =
sleep_span t (span_of_s s)
end

let with_timeout t d = Fiber.first (fun () -> sleep t d; Error `Timeout)
let with_timeout_exn t d = Fiber.first (fun () -> sleep t d; raise Timeout)

module Timeout = struct
type t =
| Timeout of clock * float
| Timeout of Mono.t * Mtime.Span.t
| Unlimited

let none = Unlimited
let of_s clock time = Timeout ((clock :> clock), time)
let v clock time = Timeout ((clock :> Mono.t), time)

let of_s clock time =
v clock (Mono.span_of_s time)

let run t fn =
match t with
| Unlimited -> fn ()
| Timeout (clock, d) ->
Fiber.first (fun () -> sleep clock d; Error `Timeout) fn
Fiber.first (fun () -> Mono.sleep_span clock d; Error `Timeout) fn

let run_exn t fn =
match t with
| Unlimited -> fn ()
| Timeout (clock, d) ->
Fiber.first (fun () -> sleep clock d; raise Timeout) fn
Fiber.first (fun () -> Mono.sleep_span clock d; raise Timeout) fn

let pp f = function
| Unlimited -> Fmt.string f "(no timeout)"
| Timeout (_clock, d) ->
let d = Mtime.Span.to_s d in
if d >= 0.001 && d < 0.1 then
Fmt.pf f "%.2gms" (d *. 1000.)
else if d < 120. then
Expand Down
40 changes: 36 additions & 4 deletions lib_eio/time.mli
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
class virtual ['a] clock_base : object
method virtual now : 'a
method virtual sleep_until : 'a -> unit
end

class virtual clock : object
method virtual now : float
method virtual sleep_until : float -> unit
inherit [float] clock_base
end

val now : #clock -> float
Expand All @@ -12,6 +16,31 @@ val sleep_until : #clock -> float -> unit
val sleep : #clock -> float -> unit
(** [sleep t d] waits for [d] seconds. *)

(** Monotonic clocks. *)
module Mono : sig
(** Monotonic clocks are unaffected by corrections to the real-time clock,
and so are a better choice for timeouts or measuring intervals,
where the absolute time doesn't matter.
A monotonic clock may or may not include time while the computer is suspended. *)

class virtual t : object
inherit [Mtime.t] clock_base
end

val now : #t -> Mtime.t
(** [now t] is the current time according to [t]. *)

val sleep_until : #t -> Mtime.t -> unit
(** [sleep_until t time] waits until [time] before returning. *)

val sleep : #t -> float -> unit
(** [sleep t d] waits for [d] seconds. *)

val sleep_span : #t -> Mtime.span -> unit
(** [sleep_span t d] waits for duration [d]. *)
end

(** {2 Timeouts} *)

exception Timeout
Expand All @@ -27,10 +56,13 @@ val with_timeout_exn : #clock -> float -> (unit -> 'a) -> 'a
module Timeout : sig
type t

val of_s : #clock -> float -> t
(** [of_s clock duration] is a timeout of [duration] seconds, as measured by [clock].
val v : #Mono.t -> Mtime.Span.t -> t
(** [v clock duration] is a timeout of [duration], as measured by [clock].
Internally, this is just the tuple [(clock, duration)]. *)

val of_s : #Mono.t -> float -> t
(** [of_s clock duration] is a timeout of [duration] seconds, as measured by [clock]. *)

val none : t
(** [none] is an infinite timeout. *)

Expand Down
Loading

0 comments on commit 59c5118

Please sign in to comment.