Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[epoll]: replace duplicate Delay modules with ThreadExt.Delay #5861

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions message-switch-unix.opam
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ depends: [
"base-threads"
"message-switch-core"
"ppx_deriving_rpc"
"xapi-stdext-unix"
]
synopsis: "A simple store-and-forward message switch"
description: """
Expand Down
1 change: 1 addition & 0 deletions ocaml/message-switch/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
rpclib.core
rpclib.json
threads.posix
xapi-stdext-threads
edwintorok marked this conversation as resolved.
Show resolved Hide resolved
)
(preprocess (pps ppx_deriving_rpc))
)
Expand Down
66 changes: 1 addition & 65 deletions ocaml/message-switch/unix/protocol_unix_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,71 +34,7 @@ module Int64Map = Map.Make (struct
let compare = compare
end)

module Delay = struct
(* Concrete type is the ends of a pipe *)
type t = {
(* A pipe is used to wake up a thread blocked in wait: *)
mutable pipe_out: Unix.file_descr option
; mutable pipe_in: Unix.file_descr option
; (* Indicates that a signal arrived before a wait: *)
mutable signalled: bool
; m: Mutex.t
}

let make () =
{pipe_out= None; pipe_in= None; signalled= false; m= Mutex.create ()}

exception Pre_signalled

let wait (x : t) (seconds : float) =
let to_close = ref [] in
let close' fd =
if List.mem fd !to_close then Unix.close fd ;
to_close := List.filter (fun x -> fd <> x) !to_close
in
finally'
(fun () ->
try
let pipe_out =
Mutex.execute x.m (fun () ->
if x.signalled then (
x.signalled <- false ;
raise Pre_signalled
) ;
let pipe_out, pipe_in = Unix.pipe () in
(* these will be unconditionally closed on exit *)
to_close := [pipe_out; pipe_in] ;
x.pipe_out <- Some pipe_out ;
x.pipe_in <- Some pipe_in ;
x.signalled <- false ;
pipe_out
)
in
let r, _, _ = Unix.select [pipe_out] [] [] seconds in
(* flush the single byte from the pipe *)
if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ;
(* return true if we waited the full length of time, false if we were woken *)
r = []
with Pre_signalled -> false
)
(fun () ->
Mutex.execute x.m (fun () ->
x.pipe_out <- None ;
x.pipe_in <- None ;
List.iter close' !to_close
)
)

let signal (x : t) =
Mutex.execute x.m (fun () ->
match x.pipe_in with
| Some fd ->
ignore (Unix.write fd (Bytes.of_string "X") 0 1)
| None ->
x.signalled <- true
(* If the wait hasn't happened yet then store up the signal *)
)
end
module Delay = Xapi_stdext_threads.Threadext.Delay

type item = {id: int; name: string; fn: unit -> unit}

Expand Down
28 changes: 1 addition & 27 deletions ocaml/xapi-idl/lib/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,7 @@ open D

let with_lock = Xapi_stdext_threads.Threadext.Mutex.execute

module PipeDelay = struct
(* Concrete type is the ends of a pipe *)
type t = {
(* A pipe is used to wake up a thread blocked in wait: *)
pipe_out: Unix.file_descr
; pipe_in: Unix.file_descr
}

let make () =
let pipe_out, pipe_in = Unix.pipe () in
{pipe_out; pipe_in}

let wait (x : t) (seconds : float) =
let timeout = if seconds < 0.0 then 0.0 else seconds in
if Thread.wait_timed_read x.pipe_out timeout then
(* flush the single byte from the pipe *)
let (_ : int) = Unix.read x.pipe_out (Bytes.create 1) 0 1 in
(* return false if we were woken *)
false
else
(* return true if we waited the full length of time, false if we were woken *)
true

let signal (x : t) =
let (_ : int) = Unix.write x.pipe_in (Bytes.of_string "X") 0 1 in
()
end
module PipeDelay = Xapi_stdext_threads.Threadext.Delay

type handle = Mtime.span * int

Expand Down
Loading