diff --git a/README.md b/README.md index d7e851e28..b75c57a62 100644 --- a/README.md +++ b/README.md @@ -1140,13 +1140,11 @@ let handle_job request = Fiber.yield (); (* (simulated work) *) Printf.sprintf "Processed:%d" request -let run_worker id stream = - traceln "Worker %s ready" id; - while true do - let request, reply = Eio.Stream.take stream in - traceln "Worker %s processing request %d" id request; - Promise.resolve reply (handle_job request) - done +let rec run_worker id stream = + let request, reply = Eio.Stream.take stream in + traceln "Worker %s processing request %d" id request; + Promise.resolve reply (handle_job request); + run_worker id stream let submit stream request = let reply, resolve_reply = Promise.create () in @@ -1160,10 +1158,13 @@ Each item in the stream is a request payload and a resolver for the reply promis # Eio_main.run @@ fun env -> let domain_mgr = Eio.Stdenv.domain_mgr env in Switch.run @@ fun sw -> - let stream = Eio.Stream.create 100 in + let stream = Eio.Stream.create 0 in let spawn_worker name = - Fiber.fork ~sw (fun () -> - Eio.Domain_manager.run domain_mgr (fun () -> run_worker name stream) + Fiber.fork_daemon ~sw (fun () -> + Eio.Domain_manager.run domain_mgr (fun () -> + traceln "Worker %s ready" name; + run_worker name stream + ) ) in spawn_worker "A"; @@ -1175,9 +1176,8 @@ Each item in the stream is a request payload and a resolver for the reply promis traceln "Client %d got %s" i (submit stream i) ); Fiber.yield () - done; - ); - raise Exit;; + done + );; +Worker A ready +Worker B ready +Client 1 submitting job... @@ -1189,22 +1189,27 @@ Each item in the stream is a request payload and a resolver for the reply promis +Worker A processing request 3 +Client 2 got Processed:2 +Client 3 got Processed:3 -Exception: Stdlib.Exit. +- : unit = () ``` +We use a zero-capacity stream here, which means that the `Stream.add` doesn't succeed until a worker accepts the job. +This is a good choice for a worker pool because it means that if the client fiber gets cancelled while waiting for a worker +then the job will never be run. It's also more efficient, as 0-capacity streams use a lock-free algorithm that is faster +when there are multiple domains. +Note that, while the stream itself is 0-capacity, clients still queue up waiting to use it. + In the code above, any exception raised while processing a job will exit the whole program. We might prefer to handle exceptions by sending them back to the client and continuing: ```ocaml -let run_worker id stream = - traceln "Worker %s ready" id; - while true do - let request, reply = Eio.Stream.take stream in - traceln "Worker %s processing request %d" id request; - match handle_job request with +let rec run_worker id stream = + let request, reply = Eio.Stream.take stream in + traceln "Worker %s processing request %d" id request; + begin match handle_job request with | result -> Promise.resolve_ok reply result | exception ex -> Promise.resolve_error reply ex; Fiber.check () - done + end; + run_worker id stream ``` The `Fiber.check ()` checks whether the worker itself has been cancelled, and exits the loop if so.