diff --git a/lib_eio/dune b/lib_eio/dune index 139259d5b..cadb88f8b 100644 --- a/lib_eio/dune +++ b/lib_eio/dune @@ -2,4 +2,4 @@ (name eio) (public_name eio) (flags (:standard -open Eio__core -open Eio__core.Private)) - (libraries eio__core cstruct lwt-dllist fmt bigstringaf optint)) + (libraries eio__core cstruct lwt-dllist fmt bigstringaf optint mtime)) diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml index 6e28c206d..be9d97915 100644 --- a/lib_eio/eio.ml +++ b/lib_eio/eio.ml @@ -37,6 +37,7 @@ module Stdenv = struct net : Net.t; domain_mgr : Domain_manager.t; clock : Time.clock; + mono_clock : Time.Mono.t; fs : Fs.dir Path.t; cwd : Fs.dir Path.t; secure_random : Flow.source; @@ -49,6 +50,7 @@ module Stdenv = struct let net (t : ) = t#net let domain_mgr (t : ) = t#domain_mgr let clock (t : ) = t#clock + let mono_clock (t : ) = t#mono_clock let secure_random (t: ) = t#secure_random let fs (t : ) = t#fs let cwd (t : ) = t#cwd diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index 951fc8b86..e46e2177e 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -179,6 +179,7 @@ module Stdenv : sig net : Net.t; domain_mgr : Domain_manager.t; clock : Time.clock; + mono_clock : Time.Mono.t; fs : Fs.dir Path.t; cwd : Fs.dir Path.t; secure_random : Flow.source; @@ -232,7 +233,10 @@ module Stdenv : sig *) val clock : -> 'a - (** [clock t] is the system clock. *) + (** [clock t] is the system clock (used to get the current time and date). *) + + val mono_clock : -> 'a + (** [mono_clock t] is a monotonic clock (used for measuring intervals). *) (** {1 Randomness} *) diff --git a/lib_eio/mock/clock.ml b/lib_eio/mock/clock.ml index 708180d5e..e65e8c9bd 100644 --- a/lib_eio/mock/clock.ml +++ b/lib_eio/mock/clock.ml @@ -1,67 +1,111 @@ open Eio.Std -type t = < - Eio.Time.clock; - advance : unit; - set_time : float -> unit; -> - -module Key = struct - type t = < > - let compare = compare -end +module type S = sig + type time + + type t = < + time Eio.Time.clock_base; + advance : unit; + set_time : time -> unit; + > -module Job = struct - type t = { - time : float; - resolver : unit Promise.u; - } + val make : unit -> t + val advance : t -> unit + val set_time : t -> time -> unit +end - let compare a b = Float.compare a.time b.time +module type TIME = sig + type t + val zero : t + val compare : t -> t -> int + val pp : t Fmt.t end -module Q = Psq.Make(Key)(Job) +module Make(T : TIME) : S with type time := T.t = struct + type t = < + T.t Eio.Time.clock_base; + advance : unit; + set_time : T.t -> unit; + > + + module Key = struct + type t = < > + let compare = compare + end + + module Job = struct + type t = { + time : T.t; + resolver : unit Promise.u; + } + + let compare a b = T.compare a.time b.time + end + + module Q = Psq.Make(Key)(Job) + + let make () = + object (self) + inherit [T.t] Eio.Time.clock_base -let make () = - object (self) - inherit Eio.Time.clock + val mutable now = T.zero + val mutable q = Q.empty - val mutable now = 0.0 - val mutable q = Q.empty + method now = now - method now = now + method sleep_until time = + if T.compare time now <= 0 then Fiber.yield () + else ( + let p, r = Promise.create () in + let k = object end in + q <- Q.add k { time; resolver = r } q; + try + Promise.await p + with Eio.Cancel.Cancelled _ as ex -> + q <- Q.remove k q; + raise ex + ) - method sleep_until time = - if time <= now then Fiber.yield () - else ( - let p, r = Promise.create () in - let k = object end in - q <- Q.add k { time; resolver = r } q; - try - Promise.await p - with Eio.Cancel.Cancelled _ as ex -> - q <- Q.remove k q; - raise ex - ) + method set_time time = + let rec drain () = + match Q.min q with + | Some (_, v) when T.compare v.time time <= 0 -> + Promise.resolve v.resolver (); + q <- Option.get (Q.rest q); + drain () + | _ -> () + in + drain (); + now <- time; + traceln "mock time is now %a" T.pp now - method set_time time = - let rec drain () = + method advance = match Q.min q with - | Some (_, v) when v.time <= time -> - Promise.resolve v.resolver (); - q <- Option.get (Q.rest q); - drain () - | _ -> () - in - drain (); - now <- time; - traceln "mock time is now %g" now - - method advance = - match Q.min q with - | None -> invalid_arg "No further events scheduled on mock clock" - | Some (_, v) -> self#set_time v.time - end + | None -> invalid_arg "No further events scheduled on mock clock" + | Some (_, v) -> self#set_time v.time + end + + let set_time (t:t) time = t#set_time time + let advance (t:t) = t#advance +end + +module Old_time = struct + type t = float + let compare = Float.compare + let pp f x = Fmt.pf f "%g" x + let zero = 0.0 +end + +module Mono_time = struct + type t = Mtime.t + let compare = Mtime.compare + let zero = Mtime.of_uint64_ns 0L + + let pp f t = + let s = Int64.to_float (Mtime.to_uint64_ns t) /. 1e9 in + Fmt.pf f "%g" s +end + +module Mono = Make(Mono_time) -let set_time (t:t) time = t#set_time time -let advance (t:t) = t#advance +include Make(Old_time) diff --git a/lib_eio/mock/clock.mli b/lib_eio/mock/clock.mli index 9d11b276e..3d4bd449f 100644 --- a/lib_eio/mock/clock.mli +++ b/lib_eio/mock/clock.mli @@ -1,17 +1,25 @@ -type t = < - Eio.Time.clock; - advance : unit; - set_time : float -> unit; -> +module type S = sig + type time -val make : unit -> t -(** [make ()] is a new clock. + type t = < + time Eio.Time.clock_base; + advance : unit; + set_time : time -> unit; + > - The time is initially set to 0.0 and doesn't change except when you call {!advance} or {!set_time}. *) + val make : unit -> t + (** [make ()] is a new clock. -val advance : t -> unit -(** [advance t] sets the time to the next scheduled event (adding any due fibers to the run queue). - @raise Invalid_argument if nothing is scheduled. *) + The time is initially set to 0.0 and doesn't change except when you call {!advance} or {!set_time}. *) -val set_time : t -> float -> unit -(** [set_time t time] sets the time to [time] (adding any due fibers to the run queue). *) + val advance : t -> unit + (** [advance t] sets the time to the next scheduled event (adding any due fibers to the run queue). + @raise Invalid_argument if nothing is scheduled. *) + + val set_time : t -> time -> unit + (** [set_time t time] sets the time to [time] (adding any due fibers to the run queue). *) +end + +include S with type time := float + +module Mono : S with type time := Mtime.t diff --git a/lib_eio/time.ml b/lib_eio/time.ml index 3ebe52db0..0eb7ed370 100644 --- a/lib_eio/time.ml +++ b/lib_eio/time.ml @@ -1,42 +1,80 @@ exception Timeout +class virtual ['a] clock_base = object + method virtual now : 'a + method virtual sleep_until : 'a -> unit +end + class virtual clock = object + inherit [float] clock_base method virtual now : float method virtual sleep_until : float -> unit end -let now (t : #clock) = t#now +let now (t : _ #clock_base) = t#now -let sleep_until (t : #clock) time = t#sleep_until time +let sleep_until (t : _ #clock_base) time = t#sleep_until time let sleep t d = sleep_until t (now t +. d) +module Mono = struct + class virtual t = object + inherit [Mtime.t] clock_base + end + + let now = now + let sleep_until = sleep_until + + let sleep_span t span = + match Mtime.add_span (now t) span with + | Some time -> sleep_until t time + | None -> Fiber.await_cancel () + + (* Converting floats via int64 is tricky when things overflow or go negative. + Since we don't need to wait for more than 100 years, limit it to this: *) + let too_many_ns = 0x8000000000000000. + + let span_of_s s = + if s >= 0.0 then ( + let ns = s *. 1e9 in + if ns >= too_many_ns then Mtime.Span.max_span + else Mtime.Span.of_uint64_ns (Int64.of_float ns) + ) else Mtime.Span.zero (* Also happens for NaN and negative infinity *) + + let sleep (t : #t) s = + sleep_span t (span_of_s s) +end + let with_timeout t d = Fiber.first (fun () -> sleep t d; Error `Timeout) let with_timeout_exn t d = Fiber.first (fun () -> sleep t d; raise Timeout) module Timeout = struct type t = - | Timeout of clock * float + | Timeout of Mono.t * Mtime.Span.t | Unlimited let none = Unlimited - let of_s clock time = Timeout ((clock :> clock), time) + let v clock time = Timeout ((clock :> Mono.t), time) + + let of_s clock time = + v clock (Mono.span_of_s time) let run t fn = match t with | Unlimited -> fn () | Timeout (clock, d) -> - Fiber.first (fun () -> sleep clock d; Error `Timeout) fn + Fiber.first (fun () -> Mono.sleep_span clock d; Error `Timeout) fn let run_exn t fn = match t with | Unlimited -> fn () | Timeout (clock, d) -> - Fiber.first (fun () -> sleep clock d; raise Timeout) fn + Fiber.first (fun () -> Mono.sleep_span clock d; raise Timeout) fn let pp f = function | Unlimited -> Fmt.string f "(no timeout)" | Timeout (_clock, d) -> + let d = Mtime.Span.to_s d in if d >= 0.001 && d < 0.1 then Fmt.pf f "%.2gms" (d *. 1000.) else if d < 120. then diff --git a/lib_eio/time.mli b/lib_eio/time.mli index 235e08ccf..36ceeef3e 100644 --- a/lib_eio/time.mli +++ b/lib_eio/time.mli @@ -1,6 +1,10 @@ +class virtual ['a] clock_base : object + method virtual now : 'a + method virtual sleep_until : 'a -> unit +end + class virtual clock : object - method virtual now : float - method virtual sleep_until : float -> unit + inherit [float] clock_base end val now : #clock -> float @@ -12,6 +16,31 @@ val sleep_until : #clock -> float -> unit val sleep : #clock -> float -> unit (** [sleep t d] waits for [d] seconds. *) +(** Monotonic clocks. *) +module Mono : sig + (** Monotonic clocks are unaffected by corrections to the real-time clock, + and so are a better choice for timeouts or measuring intervals, + where the absolute time doesn't matter. + + A monotonic clock may or may not include time while the computer is suspended. *) + + class virtual t : object + inherit [Mtime.t] clock_base + end + + val now : #t -> Mtime.t + (** [now t] is the current time according to [t]. *) + + val sleep_until : #t -> Mtime.t -> unit + (** [sleep_until t time] waits until [time] before returning. *) + + val sleep : #t -> float -> unit + (** [sleep t d] waits for [d] seconds. *) + + val sleep_span : #t -> Mtime.span -> unit + (** [sleep_span t d] waits for duration [d]. *) +end + (** {2 Timeouts} *) exception Timeout @@ -27,10 +56,13 @@ val with_timeout_exn : #clock -> float -> (unit -> 'a) -> 'a module Timeout : sig type t - val of_s : #clock -> float -> t - (** [of_s clock duration] is a timeout of [duration] seconds, as measured by [clock]. + val v : #Mono.t -> Mtime.Span.t -> t + (** [v clock duration] is a timeout of [duration], as measured by [clock]. Internally, this is just the tuple [(clock, duration)]. *) + val of_s : #Mono.t -> float -> t + (** [of_s clock duration] is a timeout of [duration] seconds, as measured by [clock]. *) + val none : t (** [none] is an infinite timeout. *) diff --git a/lib_eio/utils/zzz.ml b/lib_eio/utils/zzz.ml index f2db9f586..d8c67213a 100644 --- a/lib_eio/utils/zzz.ml +++ b/lib_eio/utils/zzz.ml @@ -5,11 +5,11 @@ end module Job = struct type t = { - time : float; + time : Mtime.t; thread : unit Suspended.t; } - let compare a b = Float.compare a.time b.time + let compare a b = Mtime.compare a.time b.time end module Q = Psq.Make(Key)(Job) diff --git a/lib_eio/utils/zzz.mli b/lib_eio/utils/zzz.mli index 9d260df6a..4005085e6 100644 --- a/lib_eio/utils/zzz.mli +++ b/lib_eio/utils/zzz.mli @@ -11,7 +11,7 @@ type t val create : unit -> t (** [create ()] is a fresh empty queue. *) -val add : t -> float -> unit Suspended.t -> Key.t +val add : t -> Mtime.t -> unit Suspended.t -> Key.t (** [add t time thread] adds a new event, due at [time], and returns its ID. You must use {!Eio.Private.Fiber_context.set_cancel_fn} on [thread] before calling {!pop}. @@ -20,7 +20,7 @@ val add : t -> float -> unit Suspended.t -> Key.t val remove : t -> Key.t -> unit (** [remove t key] removes an event previously added with [add]. *) -val pop : t -> now:float -> [`Due of unit Suspended.t | `Wait_until of float | `Nothing] +val pop : t -> now:Mtime.t -> [`Due of unit Suspended.t | `Wait_until of Mtime.t | `Nothing] (** [pop ~now t] removes and returns the earliest thread due by [now]. It also clears the thread's cancel function. If no thread is due yet, it returns the time the earliest thread becomes due. *) diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index c6821e905..a71acff38 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -514,7 +514,7 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = | Some Failed_thread (k, ex) -> Suspended.discontinue k ex | Some IO -> (* Note: be sure to re-inject the IO task before continuing! *) (* This is not a fair scheduler: timers always run before all other IO *) - let now = Unix.gettimeofday () in + let now = Mtime_clock.now () in match Zzz.pop ~now sleep_q with | `Due k -> Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) @@ -529,7 +529,11 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = ignore (Uring.submit uring : int); let timeout = match next_due with - | `Wait_until time -> Some (time -. now) + | `Wait_until time -> + let time = Mtime.to_uint64_ns time in + let now = Mtime.to_uint64_ns now in + let diff_ns = Int64.sub time now |> Int64.to_float in + Some (diff_ns /. 1e9) | `Nothing -> None in Log.debug (fun l -> l "@[scheduler out of jobs, next timeout %s:@,%a@]" @@ -644,7 +648,7 @@ module Low_level = struct Log.debug (fun l -> l "noop returned"); if result <> 0 then raise (Unix.Unix_error (Uring.error_of_errno result, "noop", "")) - type _ Effect.t += Sleep_until : float -> unit Effect.t + type _ Effect.t += Sleep_until : Mtime.t -> unit Effect.t let sleep_until d = Effect.perform (Sleep_until d) @@ -1167,6 +1171,7 @@ type stdenv = < net : Eio.Net.t; domain_mgr : Eio.Domain_manager.t; clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; fs : Eio.Fs.dir Eio.Path.t; cwd : Eio.Fs.dir Eio.Path.t; secure_random : Eio.Flow.source; @@ -1191,11 +1196,24 @@ let domain_mgr ~run_event_loop = object (self) ) end +let mono_clock = object + inherit Eio.Time.Mono.t + + method now = Mtime_clock.now () + + method sleep_until = Low_level.sleep_until +end + let clock = object inherit Eio.Time.clock method now = Unix.gettimeofday () - method sleep_until = Low_level.sleep_until + + method sleep_until time = + (* todo: use the realtime clock directly instead of converting to monotonic time. + That is needed to handle adjustments to the system clock correctly. *) + let d = time -. Unix.gettimeofday () in + Eio.Time.Mono.sleep mono_clock d end class dir ~label (fd : dir_fd) = object @@ -1280,6 +1298,7 @@ let stdenv ~run_event_loop = method net = net method domain_mgr = domain_mgr ~run_event_loop method clock = clock + method mono_clock = mono_clock method fs = (fs :> Eio.Fs.dir Eio.Path.t) method cwd = (cwd :> Eio.Fs.dir Eio.Path.t) method secure_random = secure_random diff --git a/lib_eio_linux/eio_linux.mli b/lib_eio_linux/eio_linux.mli index 70dca0de4..e83677e37 100644 --- a/lib_eio_linux/eio_linux.mli +++ b/lib_eio_linux/eio_linux.mli @@ -65,6 +65,7 @@ type stdenv = < net : Eio.Net.t; domain_mgr : Eio.Domain_manager.t; clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; fs : Eio.Fs.dir Eio.Path.t; cwd : Eio.Fs.dir Eio.Path.t; secure_random : Eio.Flow.source; @@ -111,7 +112,7 @@ module Low_level : sig (** {1 Time functions} *) - val sleep_until : float -> unit + val sleep_until : Mtime.t -> unit (** [sleep_until time] blocks until the current time is [time]. *) (** {1 Fixed-buffer memory allocation functions} diff --git a/lib_eio_linux/tests/basic_eio_linux.ml b/lib_eio_linux/tests/basic_eio_linux.ml index 3ead3b887..335d2fbd8 100644 --- a/lib_eio_linux/tests/basic_eio_linux.ml +++ b/lib_eio_linux/tests/basic_eio_linux.ml @@ -24,7 +24,7 @@ let () = let buf = alloc_fixed_or_wait () in let _ = read_exactly fd buf 5 in Logs.debug (fun l -> l "sleeping at %f" (Unix.gettimeofday ())); - sleep_until (Unix.gettimeofday () +. 1.0); + sleep_until (Mtime.add_span (Mtime_clock.now ()) Mtime.Span.s |> Option.get); print_endline (Uring.Region.to_string ~len:5 buf); let _ = read_exactly fd ~file_offset:(Int63.of_int 3) buf 3 in print_endline (Uring.Region.to_string ~len:3 buf); diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index a562ed3ce..44a573e92 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -520,8 +520,7 @@ module Low_level = struct module Poll = Poll - let sleep_until due = - let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in + let sleep_ms delay = enter @@ fun st k -> let timer = Luv.Timer.init ~loop:st.loop () |> or_raise in Fiber_context.set_cancel_fn k.fiber (fun ex -> @@ -533,6 +532,10 @@ module Low_level = struct if Fiber_context.clear_cancel_fn k.fiber then enqueue_thread st k () ) |> or_raise + let sleep_until due = + let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in + sleep_ms delay + (* https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml *) let getaddrinfo ~service node = let ( let* ) o f = Option.bind o f in @@ -825,6 +828,7 @@ type stdenv = < net : Eio.Net.t; domain_mgr : Eio.Domain_manager.t; clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; fs : Eio.Fs.dir Eio.Path.t; cwd : Eio.Fs.dir Eio.Path.t; secure_random : Eio.Flow.source; @@ -875,6 +879,22 @@ let clock = object method sleep_until = sleep_until end +let mono_clock = object + inherit Eio.Time.Mono.t + + method now = Mtime_clock.now () + + method sleep_until time = + let now = Mtime.to_uint64_ns (Mtime_clock.now ()) in + let time = Mtime.to_uint64_ns time in + if Int64.unsigned_compare now time >= 0 then Fiber.yield () + else ( + let delay = Int64.sub time now |> Int64.to_float in + let delay = 1000. *. delay |> ceil |> truncate |> max 0 in + Low_level.sleep_ms delay + ) +end + type _ Eio.Generic.ty += Dir_resolve_new : (string -> string) Eio.Generic.ty let dir_resolve_new x = Eio.Generic.probe x Dir_resolve_new @@ -1007,6 +1027,7 @@ let stdenv ~run_event_loop = method net = net method domain_mgr = domain_mgr ~run_event_loop method clock = clock + method mono_clock = mono_clock method fs = (fs :> Eio.Fs.dir), "." method cwd = (cwd :> Eio.Fs.dir), "." method secure_random = secure_random diff --git a/lib_eio_luv/eio_luv.mli b/lib_eio_luv/eio_luv.mli index 3f19416e9..50f00908c 100644 --- a/lib_eio_luv/eio_luv.mli +++ b/lib_eio_luv/eio_luv.mli @@ -129,6 +129,7 @@ type stdenv = < net : Eio.Net.t; domain_mgr : Eio.Domain_manager.t; clock : Eio.Time.clock; + mono_clock : Eio.Time.Mono.t; fs : Eio.Fs.dir Eio.Path.t; cwd : Eio.Fs.dir Eio.Path.t; secure_random : Eio.Flow.source; diff --git a/tests/network.md b/tests/network.md index aceae0495..3af7371e3 100644 --- a/tests/network.md +++ b/tests/network.md @@ -642,7 +642,7 @@ First attempt times out: ```ocaml # Eio_mock.Backend.run @@ fun () -> - let clock = Eio_mock.Clock.make () in + let clock = Eio_mock.Clock.Mono.make () in let timeout = Eio.Time.Timeout.of_s clock 10. in Eio_mock.Net.on_getaddrinfo net [`Return [addr1; addr2]]; let mock_flow = Eio_mock.Flow.make "flow" in @@ -655,7 +655,7 @@ First attempt times out: ) ) (fun () -> - Eio_mock.Clock.advance clock + Eio_mock.Clock.Mono.advance clock );; +mock-net: getaddrinfo ~service:http www.example.com +mock-net: connect to tcp:127.0.0.1:80 @@ -672,7 +672,7 @@ Both attempts time out: ```ocaml # Eio_mock.Backend.run @@ fun () -> - let clock = Eio_mock.Clock.make () in + let clock = Eio_mock.Clock.Mono.make () in let timeout = Eio.Time.Timeout.of_s clock 10. in Eio_mock.Net.on_getaddrinfo net [`Return [addr1; addr2]]; Eio_mock.Net.on_connect net [`Run Fiber.await_cancel; `Run Fiber.await_cancel]; @@ -683,10 +683,10 @@ Both attempts time out: ) ) (fun () -> - Eio_mock.Clock.advance clock; + Eio_mock.Clock.Mono.advance clock; Fiber.yield (); Fiber.yield (); - Eio_mock.Clock.advance clock + Eio_mock.Clock.Mono.advance clock );; +mock-net: getaddrinfo ~service:http www.example.com +mock-net: connect to tcp:127.0.0.1:80 diff --git a/tests/time.md b/tests/time.md index af160f046..a7554321a 100644 --- a/tests/time.md +++ b/tests/time.md @@ -113,7 +113,8 @@ let rec loop () = ### Timeouts ```ocaml -# run @@ fun ~clock -> +# Eio_main.run @@ fun env -> + let clock = Eio.Stdenv.mono_clock env in Eio.Time.Timeout.(run_exn none) (fun () -> ()); let t = Eio.Time.Timeout.of_s clock 0.0001 in Eio.Time.Timeout.run_exn t (fun () -> Fiber.await_cancel ());; @@ -121,7 +122,8 @@ Exception: Eio__Time.Timeout. ``` ```ocaml -# run @@ fun ~clock -> +# Eio_main.run @@ fun env -> + let clock = Eio.Stdenv.mono_clock env in let show d = let t = Eio.Time.Timeout.of_s clock d in traceln "%g -> %a" d Eio.Time.Timeout.pp t