diff --git a/lib_eio/unix/eio_unix.ml b/lib_eio/unix/eio_unix.ml index 74a2a4dbb..9873d1395 100644 --- a/lib_eio/unix/eio_unix.ml +++ b/lib_eio/unix/eio_unix.ml @@ -14,6 +14,7 @@ module Private = struct type _ Effect.t += | Await_readable : Unix.file_descr -> unit Effect.t | Await_writable : Unix.file_descr -> unit Effect.t + | Sleep_until : float -> unit Effect.t | Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> socket Effect.t | Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int -> (socket * socket) Effect.t end @@ -25,22 +26,36 @@ let real_clock = object inherit [Ptime.t] Eio.Time.clock method now = Ptime_clock.now () - method sleep_until = failwith "sleep_until not implemented" + + method sleep_until time = + let time = Ptime.to_float_s time in + Effect.perform (Private.Sleep_until time) + method add_seconds t d = let span = Ptime.Span.of_float_s d in Option.bind span (Ptime.add_span t) |> Option.get + + method to_seconds time = Ptime.to_float_s time end let mono_clock = object inherit [Mtime.t] Eio.Time.clock method now = Mtime_clock.now () - method sleep_until = failwith "sleep_until not implemented" + + method sleep_until time = + let time = Mtime.to_uint64_ns time |> Int64.to_float in + Effect.perform (Private.Sleep_until (time /. 1e9)) + method add_seconds t d = let span = (d *. 1e9) |> Int64.of_float |> Mtime.Span.of_uint64_ns in Mtime.add_span t span |> Option.get + + method to_seconds time = + let time = Mtime.to_uint64_ns time |> Int64.to_float in + (time /. 1e9) end let sleep d = Eio.Time.sleep mono_clock d diff --git a/lib_eio/unix/eio_unix.mli b/lib_eio/unix/eio_unix.mli index f0793bdec..6570d7bfd 100644 --- a/lib_eio/unix/eio_unix.mli +++ b/lib_eio/unix/eio_unix.mli @@ -87,6 +87,7 @@ module Private : sig type _ Effect.t += | Await_readable : Unix.file_descr -> unit Effect.t (** See {!await_readable} *) | Await_writable : Unix.file_descr -> unit Effect.t (** See {!await_writable} *) + | Sleep_until : float -> unit Effect.t | Socket_of_fd : Switch.t * bool * Unix.file_descr -> socket Effect.t (** See {!FD.as_socket} *) | Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int -> diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index b39566e3b..7209ff5dc 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -514,7 +514,7 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = | Some Failed_thread (k, ex) -> Suspended.discontinue k ex | Some IO -> (* Note: be sure to re-inject the IO task before continuing! *) (* This is not a fair scheduler: timers always run before all other IO *) - let now = Unix.gettimeofday () in + let now = Eio_unix.(mono_clock#now |> mono_clock#to_seconds) in match Zzz.pop ~now sleep_q with | `Due k -> Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *) @@ -644,9 +644,8 @@ module Low_level = struct Log.debug (fun l -> l "noop returned"); if result <> 0 then raise (Unix.Unix_error (Uring.error_of_errno result, "noop", "")) - type _ Effect.t += Sleep_until : float -> unit Effect.t let sleep_until d = - Effect.perform (Sleep_until d) + Effect.perform (Eio_unix.Private.Sleep_until d) type _ Effect.t += ERead : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int Effect.t @@ -1379,7 +1378,7 @@ let rec run : type a. enqueue_write st k args; schedule st ) - | Low_level.Sleep_until time -> Some (fun k -> + | Eio_unix.Private.Sleep_until time -> Some (fun k -> let k = { Suspended.k; fiber } in match Fiber_context.get_error fiber with | Some ex -> Suspended.discontinue k ex diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index 29254890e..7f4085989 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -519,9 +519,9 @@ module Low_level = struct module Poll = Poll - let sleep_until due = - let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in - enter @@ fun st k -> + let sleep_until due st (k: unit Suspended.t) = + let now = Eio_unix.(mono_clock#now |> mono_clock#to_seconds) in + let delay = 1000. *. (due -. now) |> ceil |> truncate |> max 0 in let timer = Luv.Timer.init ~loop:st.loop () |> or_raise in Fiber_context.set_cancel_fn k.fiber (fun ex -> Luv.Timer.stop timer |> or_raise; @@ -1070,6 +1070,11 @@ let rec run : type a. (_ -> a) -> a = fun main -> let k = { Suspended.k; fiber } in fn fiber (enqueue_result_thread st k) ) + | Eio_unix.Private.Sleep_until due -> Some (fun k -> + match Fiber_context.get_error fiber with + | Some e -> discontinue k e + | None -> sleep_until due st {Suspended.k; fiber} + ) | Eio_unix.Private.Await_readable fd -> Some (fun k -> match Fiber_context.get_error fiber with | Some e -> discontinue k e diff --git a/lib_eio_luv/eio_luv.mli b/lib_eio_luv/eio_luv.mli index a6df37d15..5b79969d9 100644 --- a/lib_eio_luv/eio_luv.mli +++ b/lib_eio_luv/eio_luv.mli @@ -21,11 +21,6 @@ module Low_level : sig (** [await_with_cancel ~request fn] converts a function using a luv-style callback to one using effects. It sets the fiber's cancel function to cancel [request], and clears it when the operation completes. *) - (** {1 Time functions} *) - - val sleep_until : float -> unit - (** [sleep_until time] blocks until the current time is [time]. *) - (** {1 DNS functions} *) val getaddrinfo : service:string -> string -> Eio.Net.Sockaddr.t list