Skip to content

Commit

Permalink
feat: implement terminate_pool/2 callback (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah authored Nov 11, 2023
1 parent a33ec58 commit a0aa89d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
28 changes: 26 additions & 2 deletions lib/nimble_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,33 @@ defmodule NimblePool do
) ::
{:ok, worker_state} | {:remove, user_reason()} | {:stop, user_reason()}

@doc """
Handle pool termination.
The `reason` argmument is the same given to GenServer's terminate/2 callback.
It is not necessary to terminate workers here because the
`terminate_worker/3` callback has already been invoked.
This should be used only for clean up extra resources that can not be
handled by `terminate_worker/3` callback.
This callback is optional.
"""
@doc callback: :pool
@callback terminate_pool(
reason :: :DOWN | :timeout | :throw | :error | :exit | user_reason,
pool_state
) :: :ok

@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
handle_update: 3,
handle_ping: 2,
terminate_worker: 3
terminate_worker: 3,
terminate_pool: 2

@doc """
Defines a pool to be started under the supervision tree.
Expand Down Expand Up @@ -679,11 +699,15 @@ defmodule NimblePool do
end

@impl true
def terminate(reason, %{resources: resources} = state) do
def terminate(reason, %{worker: worker, resources: resources} = state) do
for {worker_server_state, _} <- :queue.to_list(resources) do
maybe_terminate_worker(reason, worker_server_state, state)
end

if function_exported?(worker, :terminate_pool, 2) do
worker.terminate_pool(reason, state)
end

:ok
end

Expand Down
39 changes: 39 additions & 0 deletions test/nimble_pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ defmodule NimblePoolTest do
[{^instruction, return} | instructions] when is_function(return) ->
{return, instructions}

# Always accept terminate_pool as a valid instruction when there is no more instructions
[] = state ->
if instruction == :terminate_pool,
do: {fn _, _ -> :ok end, []},
else: raise("expected #{inspect(instruction)}, state was #{inspect(state)}")

state ->
raise "expected #{inspect(instruction)}, state was #{inspect(state)}"
end)
Expand Down Expand Up @@ -97,6 +103,10 @@ defmodule NimblePoolTest do
def terminate_worker(reason, worker_state, pid) do
TestAgent.next(pid, :terminate_worker, [reason, worker_state, pid])
end

def terminate_pool(reason, pool_state) do
TestAgent.next(pool_state.state, :terminate_pool, [reason, pool_state])
end
end

defp stateless_pool!(instructions, opts \\ []) do
Expand Down Expand Up @@ -1641,4 +1651,33 @@ defmodule NimblePoolTest do
assert_receive {:DOWN, _, :process, ^pool, {:shutdown, :some_reason}}
end
end

describe "terminate_pool" do
test "should terminate workers and call parent when terminating" do
parent = self()

{_, pool} =
stateful_pool!(
[
init_worker: fn next -> {:ok, :worker1, next} end,
terminate_worker: fn reason, worker, state ->
send(parent, {:terminated_worker, worker, reason, System.monotonic_time()})
{:ok, state}
end,
terminate_pool: fn reason, _state ->
send(parent, {:terminated_pool, reason, System.monotonic_time()})
:ok
end
],
pool_size: 1
)

NimblePool.stop(pool, :shutdown)

assert_receive {:terminated_worker, :worker1, :shutdown, termination_time_worker}
assert_receive {:terminated_pool, :shutdown, termination_time_pool}

assert termination_time_pool > termination_time_worker
end
end
end

0 comments on commit a0aa89d

Please sign in to comment.