Skip to content

Commit

Permalink
Merge pull request ocaml-multicore#661 from talex5/trace-switch
Browse files Browse the repository at this point in the history
Tracing: add labels to switches
  • Loading branch information
talex5 authored Dec 22, 2023
2 parents 4db993d + c88c58e commit 5e014fc
Show file tree
Hide file tree
Showing 21 changed files with 50 additions and 40 deletions.
4 changes: 2 additions & 2 deletions bench/bench_http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let run_client ~n_requests id conn =
let main net domain_mgr ~n_client_domains ~n_server_domains ~n_connections_per_domain ~n_requests_per_connection =
let total = Atomic.make 0 in
let t0 = Unix.gettimeofday () in
Switch.run (fun sw ->
Switch.run ~name:"main" (fun sw ->
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8085) in
let backlog = n_connections_per_domain * n_client_domains in
let server_socket = Eio.Net.listen ~reuse_addr:true ~backlog ~sw net addr in
Expand All @@ -74,7 +74,7 @@ let main net domain_mgr ~n_client_domains ~n_server_domains ~n_connections_per_d
for domain = 1 to n_client_domains do
Fiber.fork ~sw (fun () ->
Eio.Domain_manager.run domain_mgr (fun () ->
Switch.run @@ fun sw ->
Switch.run ~name:"client-domain" @@ fun sw ->
for i = 1 to n_connections_per_domain do
Fiber.fork ~sw (fun () ->
let id = Printf.sprintf "domain %d / conn %d" domain i in
Expand Down
2 changes: 1 addition & 1 deletion examples/net/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ module Write = Eio.Buf_write

(* Connect to [addr] on [net], send a message and then read the reply. *)
let run ~net ~addr =
Switch.run ~name:"client" @@ fun sw ->
traceln "Connecting to server at %a..." Eio.Net.Sockaddr.pp addr;
Switch.run @@ fun sw ->
let flow = Eio.Net.connect ~sw net addr in
(* We use a buffered writer here so we can create the message in multiple
steps but still send it efficiently as a single packet: *)
Expand Down
2 changes: 1 addition & 1 deletion examples/net/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8080)

(* Run a server and a test client, communicating using [net]. *)
let main ~net =
Switch.run @@ fun sw ->
Switch.run ~name:"main" @@ fun sw ->
(* We create the listening socket first so that we can be sure it is ready
as soon as the client wants to use it. *)
let listening_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
Expand Down
2 changes: 1 addition & 1 deletion examples/trace/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ let trace ~finished (clock, delay) cursor =

(* The program to be traced. *)
let main net =
Switch.run @@ fun sw ->
Switch.run ~name:"main" @@ fun sw ->
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8123) in
let s = Eio.Net.listen ~sw ~backlog:1 ~reuse_addr:true net addr in
Fiber.both
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/buf_write.ml
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ let copy t flow =
with End_of_file -> ()

let with_flow ?(initial_size=0x1000) flow fn =
Switch.run @@ fun sw ->
Switch.run ~name:"Buf_write.with_flow" @@ fun sw ->
let t = create ~sw initial_size in
Fiber.fork ~sw (fun () -> copy t flow);
match fn t with
Expand Down
3 changes: 2 additions & 1 deletion lib_eio/core/cancel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,11 @@ let cancel t ex =
Printexc.raise_with_backtrace ex bt
)

let sub_checked purpose fn =
let sub_checked ?name purpose fn =
let ctx = Effect.perform Get_context in
let parent = ctx.cancel_context in
with_cc ~ctx ~parent ~protected:false purpose @@ fun t ->
Option.iter (Trace.name t.id) name;
fn t

let sub fn =
Expand Down
8 changes: 5 additions & 3 deletions lib_eio/core/eio__core.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ module Switch : sig

(** {2 Switch creation} *)

val run : (t -> 'a) -> 'a
val run : ?name:string -> (t -> 'a) -> 'a
(** [run fn] runs [fn] with a fresh switch (initially on).
When [fn] finishes, [run] waits for all fibers registered with the switch to finish,
and then releases all attached resources.
If {!fail} is called, [run] will re-raise the exception (after everything is cleaned up).
If [fn] raises an exception, it is passed to {!fail}. *)
If [fn] raises an exception, it is passed to {!fail}.
@param name Used to name the switch when tracing. *)

val run_protected : (t -> 'a) -> 'a
val run_protected : ?name:string -> (t -> 'a) -> 'a
(** [run_protected fn] is like [run] but ignores cancellation requests from the parent context. *)

(** {2 Cancellation and failure} *)
Expand Down
8 changes: 4 additions & 4 deletions lib_eio/core/fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ let fork_promise_exn ~sw f =
p

let all xs =
Switch.run @@ fun sw ->
Switch.run ~name:"all" @@ fun sw ->
List.iter (fork ~sw) xs

let both f g = all [f; g]

let pair f g =
Switch.run @@ fun sw ->
Switch.run ~name:"pair" @@ fun sw ->
let x = fork_promise ~sw f in
let y = g () in
(Promise.await_exn x, y)
Expand Down Expand Up @@ -225,7 +225,7 @@ module List = struct
match items with
| [] -> [] (* Avoid creating a switch in the simple case *)
| items ->
Switch.run @@ fun sw ->
Switch.run ~name:"filter_map" @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> []
Expand All @@ -244,7 +244,7 @@ module List = struct
match items with
| [] -> () (* Avoid creating a switch in the simple case *)
| items ->
Switch.run @@ fun sw ->
Switch.run ~name:"iter" @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> ()
Expand Down
5 changes: 3 additions & 2 deletions lib_eio/core/switch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,12 @@ let run_internal t fn =
maybe_raise_exs t;
assert false

let run fn = Cancel.sub_checked Switch (fun cc -> run_internal (create cc) fn)
let run ?name fn = Cancel.sub_checked ?name Switch (fun cc -> run_internal (create cc) fn)

let run_protected fn =
let run_protected ?name fn =
let ctx = Effect.perform Cancel.Get_context in
Cancel.with_cc ~ctx ~parent:ctx.cancel_context ~protected:true Switch @@ fun cancel ->
Option.iter (Trace.name cancel.id) name;
run_internal (create cancel) fn

(* Run [fn ()] in [t]'s cancellation context.
Expand Down
1 change: 1 addition & 0 deletions lib_eio/core/trace.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ let create_fiber ~cc id =
add_event RE.create_fiber (id, cc)

let log = add_event RE.log
let name id x = add_event RE.name (id, x)
let enter_span = add_event RE.enter_span
let exit_span = add_event RE.exit_span
let fiber = add_event RE.fiber
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/core/trace.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ val mint_id : unit -> id
val log : string -> unit
(** [log msg] attaches text [msg] to the current fiber. *)

val name : id -> string -> unit
(** [name id label] sets [label] as the name for [id]. *)

val with_span : string -> (unit -> 'a) -> 'a
(** [with_span op fn] runs [fn ()], labelling the timespan during which it runs with [op]. *)

Expand Down
2 changes: 1 addition & 1 deletion lib_eio/executor_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ let max_capacity_f = float max_capacity
Each worker runs in its own domain,
taking jobs from [queue] whenever it has spare capacity. *)
let run_worker { queue } =
Switch.run @@ fun sw ->
Switch.run ~name:"run_worker" @@ fun sw ->
let capacity = ref 0 in
let condition = Condition.create () in
(* The main worker loop. *)
Expand Down
16 changes: 9 additions & 7 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ let getnameinfo (type tag) (t:[> tag ty] r) sockaddr =
let close = Resource.close

let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
Switch.run @@ fun sw ->
Switch.run ~name:"with_tcp_connect" @@ fun sw ->
match
let rec aux = function
| [] -> raise @@ err (Connection_failure No_matching_addresses)
Expand All @@ -355,8 +355,7 @@ let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
Exn.reraise_with_context ex bt "connecting to %S:%s" host service

(* Run a server loop in a single domain. *)
let run_server_loop ~connections ~on_error ~stop listening_socket connection_handler =
Switch.run @@ fun sw ->
let run_server_loop ~sw ~connections ~on_error ~stop listening_socket connection_handler =
let rec accept () =
Semaphore.acquire connections;
accept_fork ~sw ~on_error listening_socket (fun conn addr ->
Expand All @@ -371,13 +370,16 @@ let run_server_loop ~connections ~on_error ~stop listening_socket connection_han

let run_server ?(max_connections=Int.max_int) ?(additional_domains) ?stop ~on_error listening_socket connection_handler : 'a =
if max_connections <= 0 then invalid_arg "max_connections";
Switch.run @@ fun sw ->
Switch.run ~name:"run_server" @@ fun sw ->
let connections = Semaphore.make max_connections in
let run_server_loop () = run_server_loop ~connections ~on_error ~stop listening_socket connection_handler in
let run_server_loop sw = run_server_loop ~sw ~connections ~on_error ~stop listening_socket connection_handler in
additional_domains |> Option.iter (fun (domain_mgr, domains) ->
if domains < 0 then invalid_arg "additional_domains";
for _ = 1 to domains do
Fiber.fork ~sw (fun () -> Domain_manager.run domain_mgr (fun () -> ignore (run_server_loop () : 'a)))
Fiber.fork ~sw (fun () -> Domain_manager.run domain_mgr (fun () ->
Switch.run ~name:"run_server" @@ fun sw ->
ignore (run_server_loop sw : 'a)
))
done;
);
run_server_loop ()
run_server_loop sw
8 changes: 4 additions & 4 deletions lib_eio/path.ml
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ let is_directory t =
kind ~follow:true t = `Directory

let with_open_in path fn =
Switch.run @@ fun sw -> fn (open_in ~sw path)
Switch.run ~name:"with_open_in" @@ fun sw -> fn (open_in ~sw path)

let with_open_out ?append ~create path fn =
Switch.run @@ fun sw -> fn (open_out ~sw ?append ~create path)
Switch.run ~name:"with_open_out" @@ fun sw -> fn (open_out ~sw ?append ~create path)

let with_open_dir path fn =
Switch.run @@ fun sw -> fn (open_dir ~sw path)
Switch.run ~name:"with_open_dir" @@ fun sw -> fn (open_dir ~sw path)

let with_lines path fn =
with_open_in path @@ fun flow ->
Expand Down Expand Up @@ -174,7 +174,7 @@ let catch_missing ~missing_ok fn x =
let rec rmtree ~missing_ok t =
match kind ~follow:false t with
| `Directory ->
Switch.run (fun sw ->
Switch.run ~name:"rmtree" (fun sw ->
match
let t = open_dir ~sw t in
t, read_dir t
Expand Down
4 changes: 2 additions & 2 deletions lib_eio/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ let spawn (type tag) ~sw (t : [> tag mgr_ty] r) ?cwd ?stdin ?stdout ?stderr ?env
?stderr:(stderr :> Flow.sink_ty r option)

let run t ?cwd ?stdin ?stdout ?stderr ?(is_success = Int.equal 0) ?env ?executable args =
Switch.run @@ fun sw ->
Switch.run ~name:"Process.run" @@ fun sw ->
let child = spawn ~sw t ?cwd ?stdin ?stdout ?stderr ?env ?executable args in
match await child with
| `Exited code when is_success code -> ()
Expand All @@ -146,7 +146,7 @@ let pipe (type tag) ~sw ((Resource.T (v, ops)) : [> tag mgr_ty] r) =
X.pipe v ~sw

let parse_out (type tag) (t : [> tag mgr_ty] r) parse ?cwd ?stdin ?stderr ?is_success ?env ?executable args =
Switch.run @@ fun sw ->
Switch.run ~name:"Process.parse_out" @@ fun sw ->
let r, w = pipe t ~sw in
try
let child = spawn ~sw t ?cwd ?stdin ~stdout:w ?stderr ?env ?executable args in
Expand Down
6 changes: 3 additions & 3 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ let process =

(* fchdir wants just a directory FD, not an FD and a path like the *at functions. *)
let with_dir dir_fd path fn =
Switch.run @@ fun sw ->
Switch.run ~name:"with_dir" @@ fun sw ->
Low_level.openat ~sw
~seekable:false
~access:`R
Expand Down Expand Up @@ -527,7 +527,7 @@ end = struct
let mkdir t ~perm path = Low_level.mkdir_beneath ~perm t.fd path

let read_dir t path =
Switch.run @@ fun sw ->
Switch.run ~name:"read_dir" @@ fun sw ->
let fd = Low_level.open_dir ~sw t.fd (if path = "" then "." else path) in
Low_level.read_dir fd

Expand Down Expand Up @@ -569,7 +569,7 @@ end = struct
}
) else (
(* Linux < 5.18 *)
Switch.run @@ fun sw ->
Switch.run ~name:"stat" @@ fun sw ->
let fd = Low_level.openat ~sw ~seekable:false t.fd (if path = "" then "." else path)
~access:`R
~flags:Uring.Open_flags.(cloexec + path + (if follow then empty else nofollow))
Expand Down
6 changes: 3 additions & 3 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ let getrandom { Cstruct.buffer; off; len } =
let with_parent_dir_fd dir path fn =
let dir_path = Filename.dirname path in
let leaf = Filename.basename path in
Switch.run (fun sw ->
Switch.run ~name:"with_parent_dir" (fun sw ->
match dir with
| _ when leaf = ".." ->
let fd =
Expand Down Expand Up @@ -414,7 +414,7 @@ let statx_confined ~mask ~follow fd path buf =
with_parent_dir_fd fd path @@ fun parent leaf ->
statx ~mask ~fd:parent leaf buf flags
| Cwd | FD _ ->
Switch.run @@ fun sw ->
Switch.run ~name:"statx" @@ fun sw ->
let fd = openat ~sw ~seekable:false fd (if path = "" then "." else path)
~access:`R
~flags:Uring.Open_flags.(cloexec + path)
Expand Down Expand Up @@ -508,7 +508,7 @@ let pipe ~sw =
(r, w)

let with_pipe fn =
Switch.run @@ fun sw ->
Switch.run ~name:"with_pipe" @@ fun sw ->
let r, w = pipe ~sw in
fn r w

Expand Down
2 changes: 1 addition & 1 deletion lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ let run ~extra_effects st main arg =
~prepare_for_await:Eio.Private.Dla.prepare_for_await
~while_running:(fun () ->
fork ~new_fiber (fun () ->
Switch.run_protected (fun sw ->
Switch.run_protected ~name:"eio_linux" (fun sw ->
Fiber.fork_daemon ~sw (fun () -> monitor_event_fd st);
match main arg with
| x -> result := Some (Ok x)
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_posix/fs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ end = struct
else dir, leaf
in
let dir = resolve t dir in
Switch.run @@ fun sw ->
Switch.run ~name:"with_parent_dir" @@ fun sw ->
let dirfd = Low_level.openat ~sw ~mode:0 dir Low_level.Open_flags.(directory + rdonly + nofollow) in
fn (Some dirfd) leaf
) else fn None path
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_posix/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ module Impl = struct
| None -> Fmt.invalid_arg "cwd is not an OS directory!"
| Some posix ->
Fs.Dir.with_parent_dir posix path @@ fun dirfd s ->
Switch.run @@ fun launch_sw ->
Switch.run ~name:"spawn_unix" @@ fun launch_sw ->
let cwd = Low_level.openat ?dirfd ~sw:launch_sw ~mode:0 s Low_level.Open_flags.(rdonly + directory) in
fn (Low_level.Process.Fork_action.fchdir cwd :: actions)
in
Expand Down
2 changes: 1 addition & 1 deletion stress/stress_proc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ let run_in_domain mgr =
let main ~dm mgr =
let t0 = Unix.gettimeofday () in
for i = 1 to n_rounds do
Switch.run (fun sw ->
Switch.run ~name:"round" (fun sw ->
for _ = 1 to n_domains - 1 do
Fiber.fork ~sw (fun () -> Eio.Domain_manager.run dm (fun () -> run_in_domain mgr))
done;
Expand Down

0 comments on commit 5e014fc

Please sign in to comment.