Skip to content

Commit

Permalink
Merge pull request #454 from talex5/pool-example
Browse files Browse the repository at this point in the history
Improve worker pool example
  • Loading branch information
talex5 authored Mar 7, 2023
2 parents 08edcc1 + 35f4be5 commit 0a4b3f9
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1140,13 +1140,11 @@ let handle_job request =
Fiber.yield (); (* (simulated work) *)
Printf.sprintf "Processed:%d" request
let run_worker id stream =
traceln "Worker %s ready" id;
while true do
let request, reply = Eio.Stream.take stream in
traceln "Worker %s processing request %d" id request;
Promise.resolve reply (handle_job request)
done
let rec run_worker id stream =
let request, reply = Eio.Stream.take stream in
traceln "Worker %s processing request %d" id request;
Promise.resolve reply (handle_job request);
run_worker id stream
let submit stream request =
let reply, resolve_reply = Promise.create () in
Expand All @@ -1160,10 +1158,13 @@ Each item in the stream is a request payload and a resolver for the reply promis
# Eio_main.run @@ fun env ->
let domain_mgr = Eio.Stdenv.domain_mgr env in
Switch.run @@ fun sw ->
let stream = Eio.Stream.create 100 in
let stream = Eio.Stream.create 0 in
let spawn_worker name =
Fiber.fork ~sw (fun () ->
Eio.Domain_manager.run domain_mgr (fun () -> run_worker name stream)
Fiber.fork_daemon ~sw (fun () ->
Eio.Domain_manager.run domain_mgr (fun () ->
traceln "Worker %s ready" name;
run_worker name stream
)
)
in
spawn_worker "A";
Expand All @@ -1175,9 +1176,8 @@ Each item in the stream is a request payload and a resolver for the reply promis
traceln "Client %d got %s" i (submit stream i)
);
Fiber.yield ()
done;
);
raise Exit;;
done
);;
+Worker A ready
+Worker B ready
+Client 1 submitting job...
Expand All @@ -1189,22 +1189,27 @@ Each item in the stream is a request payload and a resolver for the reply promis
+Worker A processing request 3
+Client 2 got Processed:2
+Client 3 got Processed:3
Exception: Stdlib.Exit.
- : unit = ()
```

We use a zero-capacity stream here, which means that the `Stream.add` doesn't succeed until a worker accepts the job.
This is a good choice for a worker pool because it means that if the client fiber gets cancelled while waiting for a worker
then the job will never be run. It's also more efficient, as 0-capacity streams use a lock-free algorithm that is faster
when there are multiple domains.
Note that, while the stream itself is 0-capacity, clients still queue up waiting to use it.

In the code above, any exception raised while processing a job will exit the whole program.
We might prefer to handle exceptions by sending them back to the client and continuing:

```ocaml
let run_worker id stream =
traceln "Worker %s ready" id;
while true do
let request, reply = Eio.Stream.take stream in
traceln "Worker %s processing request %d" id request;
match handle_job request with
let rec run_worker id stream =
let request, reply = Eio.Stream.take stream in
traceln "Worker %s processing request %d" id request;
begin match handle_job request with
| result -> Promise.resolve_ok reply result
| exception ex -> Promise.resolve_error reply ex; Fiber.check ()
done
end;
run_worker id stream
```

The `Fiber.check ()` checks whether the worker itself has been cancelled, and exits the loop if so.
Expand Down

0 comments on commit 0a4b3f9

Please sign in to comment.