Skip to content

Commit

Permalink
Merge branch 'release/v1.1.0' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
general-CbIC committed Dec 8, 2024
2 parents f99a939 + cb574ac commit 63a0596
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
{Credo.Check.Readability.AliasOrder, []},
{Credo.Check.Readability.FunctionNames, []},
{Credo.Check.Readability.LargeNumbers, []},
{Credo.Check.Readability.MaxLineLength, [priority: :low, max_length: 120]},
{Credo.Check.Readability.MaxLineLength, [priority: :low, max_length: 140]},
{Credo.Check.Readability.ModuleAttributeNames, []},
{Credo.Check.Readability.ModuleDoc, []},
{Credo.Check.Readability.ModuleNames, []},
Expand Down
3 changes: 2 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
plugins: [Styler]
]
19 changes: 18 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.1.0] - 2024-12-08

### Added

- Added [adobe/elixir-styler](https://github.com/adobe/elixir-styler) to project.

### Changed

- Monitoring implemetation has been optimized by using plain map instead of `Agent` process.
- Refactored `State` struct by adding list of `@enforced_keys`. ([Details](https://hexdocs.pm/elixir/structs.html#default-values-and-required-keys))
- Poolex processes now have higher priority. ([Details](https://www.erlang.org/doc/apps/erts/erlang.html#process_flag_priority))

### Deprecated

- `Poolex.get_state/1` deprecated in favor `:sys.get_state/1`.

## [1.0.0] - 2024-09-23

### Changed
Expand Down Expand Up @@ -257,7 +273,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Supported main interface `Poolex.run/3` with `:timeout` option.
[unreleased]: https://github.com/general-CbIC/poolex/compare/v1.0.0...HEAD
[unreleased]: https://github.com/general-CbIC/poolex/compare/v1.1.0...HEAD
[1.1.0]: https://github.com/general-CbIC/poolex/compare/v1.0.0...v1.1.0
[1.0.0]: https://github.com/general-CbIC/poolex/compare/v0.10.0...v1.0.0
[0.10.0]: https://github.com/general-CbIC/poolex/compare/v0.9.0...v0.10.0
[0.9.0]: https://github.com/general-CbIC/poolex/compare/v0.8.0...v0.9.0
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[![Hex Docs](https://img.shields.io/badge/hex-docs-lightgreen.svg?style=flat)](https://hexdocs.pm/poolex/)
[![License](https://img.shields.io/hexpm/l/poolex.svg?style=flat)](https://github.com/general-CbIC/poolex/blob/main/LICENSE)
[![Total Download](https://img.shields.io/hexpm/dt/poolex.svg?style=flat)](https://hex.pm/packages/poolex)
[![Chat on Matrix](https://matrix.to/img/matrix-badge.svg?style=flat)](https://matrix.to/#/#poolex:gitter.im)

Poolex is a library for managing pools of workers. Inspired by [poolboy](https://github.com/devinus/poolboy).

Expand Down
116 changes: 46 additions & 70 deletions lib/poolex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ defmodule Poolex do
"""
@type run_option() :: {:checkout_timeout, timeout()}

@spawn_opts [priority: :high]

@doc """
Starts a Poolex process without links (outside of a supervision tree).
Expand All @@ -93,14 +95,14 @@ defmodule Poolex do
## Examples
iex> Poolex.start(pool_id: :my_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
iex> %Poolex.Private.State{worker_module: worker_module} = Poolex.get_state(:my_pool)
iex> %Poolex.Private.State{worker_module: worker_module} = :sys.get_state(:my_pool)
iex> worker_module
Agent
"""
@spec start(list(poolex_option())) :: GenServer.on_start()
def start(opts) do
pool_id = Keyword.fetch!(opts, :pool_id)
GenServer.start(__MODULE__, opts, name: pool_id)
GenServer.start(__MODULE__, opts, name: pool_id, spawn_opt: @spawn_opts)
end

@doc """
Expand All @@ -117,14 +119,14 @@ defmodule Poolex do
## Examples
iex> Poolex.start_link(pool_id: :other_pool, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
iex> %Poolex.Private.State{worker_module: worker_module} = Poolex.get_state(:other_pool)
iex> %Poolex.Private.State{worker_module: worker_module} = :sys.get_state(:other_pool)
iex> worker_module
Agent
"""
@spec start_link(list(poolex_option())) :: GenServer.on_start()
def start_link(opts) do
pool_id = Keyword.fetch!(opts, :pool_id)
GenServer.start_link(__MODULE__, opts, name: pool_id)
GenServer.start_link(__MODULE__, opts, name: pool_id, spawn_opt: @spawn_opts)
end

@doc """
Expand Down Expand Up @@ -195,31 +197,17 @@ defmodule Poolex do
try do
GenServer.call(pool_id, {:get_idle_worker, caller_reference}, checkout_timeout)
catch
:exit,
{:timeout, {GenServer, :call, [_pool_id, {:get_idle_worker, ^caller_reference}, _timeout]}} ->
:exit, {:timeout, {GenServer, :call, [_pool_id, {:get_idle_worker, ^caller_reference}, _timeout]}} ->
{:error, :checkout_timeout}
after
GenServer.cast(pool_id, {:cancel_waiting, caller_reference})
end
end

@doc """
Returns current state of started pool.
Primarily needed to help with debugging. **Avoid using this function in production.**
## Examples
iex> Poolex.start(pool_id: :my_pool_2, worker_module: Agent, worker_args: [fn -> 0 end], workers_count: 5)
iex> state = %Poolex.Private.State{} = Poolex.get_state(:my_pool_2)
iex> state.worker_module
Agent
iex> is_pid(state.supervisor)
true
"""
@spec get_state(pool_id()) :: State.t()
def get_state(pool_id) do
GenServer.call(pool_id, :get_state)
@deprecated "Use :sys.get_state/1 instead"
@doc false
def get_state(name) do
:sys.get_state(name)
end

@doc """
Expand Down Expand Up @@ -264,8 +252,7 @@ defmodule Poolex do
raise ArgumentError, message
end

def add_idle_workers!(pool_id, workers_count)
when is_atom(pool_id) and is_integer(workers_count) do
def add_idle_workers!(pool_id, workers_count) when is_atom(pool_id) and is_integer(workers_count) do
GenServer.call(pool_id, {:add_idle_workers, workers_count})
end

Expand All @@ -280,8 +267,7 @@ defmodule Poolex do
raise ArgumentError, message
end

def remove_idle_workers!(pool_id, workers_count)
when is_atom(pool_id) and is_integer(workers_count) do
def remove_idle_workers!(pool_id, workers_count) when is_atom(pool_id) and is_integer(workers_count) do
GenServer.call(pool_id, {:remove_idle_workers, workers_count})
end

Expand All @@ -303,21 +289,19 @@ defmodule Poolex do
waiting_callers_impl =
Keyword.get(opts, :waiting_callers_impl, Poolex.Callers.Impl.ErlangQueue)

{:ok, monitor_pid} = Monitoring.init()
{:ok, supervisor} = Poolex.Private.Supervisor.start_link()

state =
%State{
max_overflow: max_overflow,
monitor_pid: monitor_pid,
pool_id: pool_id,
supervisor: supervisor,
worker_args: worker_args,
worker_module: worker_module,
worker_start_fun: worker_start_fun
}

initial_workers_pids = start_workers(workers_count, state)
{initial_workers_pids, state} = start_workers(workers_count, state)

state =
state
Expand All @@ -335,25 +319,24 @@ defmodule Poolex do
{:noreply, state}
end

@spec start_workers(non_neg_integer(), State.t()) :: [pid]
defp start_workers(0, _state) do
[]
@spec start_workers(non_neg_integer(), State.t()) :: {[pid], State.t()}
defp start_workers(0, state) do
{[], state}
end

defp start_workers(workers_count, _state) when workers_count < 0 do
msg = "workers_count must be non negative number, received: #{inspect(workers_count)}"
raise ArgumentError, msg
end

defp start_workers(workers_count, state) do
Enum.map(1..workers_count, fn _ ->
defp start_workers(workers_count, state) when is_integer(workers_count) and workers_count >= 1 do
Enum.map_reduce(1..workers_count, state, fn _iterator, state ->
{:ok, worker_pid} = start_worker(state)
Monitoring.add(state.monitor_pid, worker_pid, :worker)

worker_pid
state = Monitoring.add(state, worker_pid, :worker)
{worker_pid, state}
end)
end

defp start_workers(workers_count, _state) do
msg = "workers_count must be non negative integer, received: #{inspect(workers_count)}"
raise ArgumentError, msg
end

@spec start_worker(State.t()) :: {:ok, pid()}
defp start_worker(%State{} = state) do
DynamicSupervisor.start_child(state.supervisor, %{
Expand All @@ -374,16 +357,17 @@ defmodule Poolex do
if state.overflow < state.max_overflow do
{:ok, new_worker} = start_worker(state)

Monitoring.add(state.monitor_pid, new_worker, :worker)

state = BusyWorkers.add(state, new_worker)
state =
state
|> Monitoring.add(new_worker, :worker)
|> BusyWorkers.add(new_worker)

{:reply, {:ok, new_worker}, %State{state | overflow: state.overflow + 1}}
else
Monitoring.add(state.monitor_pid, from_pid, :waiting_caller)

state =
WaitingCallers.add(state, %Poolex.Caller{reference: caller_reference, from: caller})
state
|> Monitoring.add(from_pid, :waiting_caller)
|> WaitingCallers.add(%Poolex.Caller{reference: caller_reference, from: caller})

{:noreply, state}
end
Expand All @@ -395,10 +379,6 @@ defmodule Poolex do
end
end

def handle_call(:get_state, _from, state) do
{:reply, state, state}
end

def handle_call(:get_debug_info, _from, %State{} = state) do
debug_info = %DebugInfo{
busy_workers_count: BusyWorkers.count(state),
Expand All @@ -421,14 +401,14 @@ defmodule Poolex do

@impl GenServer
def handle_call({:add_idle_workers, workers_count}, _from, %State{} = state) do
new_state =
workers_count
|> start_workers(state)
|> Enum.reduce(state, fn worker, acc_state ->
{workers, state} = start_workers(workers_count, state)

state =
Enum.reduce(workers, state, fn worker, acc_state ->
IdleWorkers.add(acc_state, worker)
end)

{:reply, :ok, new_state}
{:reply, :ok, state}
end

@impl GenServer
Expand Down Expand Up @@ -461,15 +441,12 @@ defmodule Poolex do
end

@impl GenServer
def handle_info(
{:DOWN, monitoring_reference, _process, dead_process_pid, _reason},
%State{} = state
) do
case Monitoring.remove(state.monitor_pid, monitoring_reference) do
:worker ->
def handle_info({:DOWN, monitoring_reference, _process, dead_process_pid, _reason}, %State{} = state) do
case Monitoring.remove(state, monitoring_reference) do
{:worker, state} ->
{:noreply, handle_down_worker(state, dead_process_pid)}

:waiting_caller ->
{:waiting_caller, state} ->
{:noreply, handle_down_waiting_caller(state, dead_process_pid)}
end
end
Expand Down Expand Up @@ -513,15 +490,15 @@ defmodule Poolex do
else
{:ok, new_worker} = start_worker(state)

Monitoring.add(state.monitor_pid, new_worker, :worker)

IdleWorkers.add(state, new_worker)
state
|> Monitoring.add(new_worker, :worker)
|> IdleWorkers.add(new_worker)
end
else
{:ok, new_worker} = start_worker(state)
Monitoring.add(state.monitor_pid, new_worker, :worker)

state
|> Monitoring.add(new_worker, :worker)
|> BusyWorkers.add(new_worker)
|> provide_worker_to_waiting_caller(new_worker)
end
Expand All @@ -535,7 +512,6 @@ defmodule Poolex do
@impl GenServer
def terminate(reason, %State{} = state) do
DynamicSupervisor.stop(state.supervisor, reason)
Monitoring.stop(state.monitor_pid)

:ok
end
Expand Down
5 changes: 1 addition & 4 deletions lib/poolex/private/busy_workers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ defmodule Poolex.Private.BusyWorkers do

@doc false
@spec remove(State.t(), Poolex.worker()) :: State.t()
def remove(
%State{busy_workers_impl: impl, busy_workers_state: busy_workers_state} = state,
worker
) do
def remove(%State{busy_workers_impl: impl, busy_workers_state: busy_workers_state} = state, worker) do
%State{state | busy_workers_state: impl.remove(busy_workers_state, worker)}
end

Expand Down
5 changes: 1 addition & 4 deletions lib/poolex/private/idle_workers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ defmodule Poolex.Private.IdleWorkers do

@doc false
@spec remove(State.t(), Poolex.worker()) :: State.t()
def remove(
%State{idle_workers_impl: impl, idle_workers_state: idle_workers_state} = state,
worker
) do
def remove(%State{idle_workers_impl: impl, idle_workers_state: idle_workers_state} = state, worker) do
%State{state | idle_workers_state: impl.remove(idle_workers_state, worker)}
end

Expand Down
36 changes: 10 additions & 26 deletions lib/poolex/private/monitoring.ex
Original file line number Diff line number Diff line change
@@ -1,42 +1,26 @@
defmodule Poolex.Private.Monitoring do
@moduledoc false
@type kind_of_process() :: :worker | :waiting_caller

@spec init() :: {:ok, pid()}
@doc """
Create new monitoring references storage.
"""
def init do
Agent.start_link(fn -> %{} end)
end
alias Poolex.Private.State

@spec stop(pid()) :: :ok
@doc """
Delete storage.
"""
def stop(pid) do
Agent.stop(pid)
end
@type kind_of_process() :: :worker | :waiting_caller

@spec add(monitor_pid :: pid(), worker_pid :: pid(), kind_of_process()) :: :ok
@spec add(State.t(), worker_pid :: pid(), kind_of_process()) :: State.t()
@doc """
Start monitoring given worker or caller process.
"""
def add(monitor_pid, process_pid, kind_of_process) do
def add(%{monitors: monitors} = state, process_pid, kind_of_process) do
reference = Process.monitor(process_pid)

Agent.update(monitor_pid, fn state -> Map.put(state, reference, kind_of_process) end)
%{state | monitors: Map.put(monitors, reference, kind_of_process)}
end

@spec remove(monitor_pid :: pid(), reference()) :: kind_of_process()
@spec remove(State.t(), reference()) :: {kind_of_process(), State.t()}
@doc """
Stop monitoring given worker or caller process and return kind of it.
"""
def remove(monitor_pid, monitoring_reference) do
def remove(%{monitors: monitors} = state, monitoring_reference) do
true = Process.demonitor(monitoring_reference)

Agent.get_and_update(monitor_pid, fn state ->
{Map.get(state, monitoring_reference), Map.delete(state, monitoring_reference)}
end)
kind_of_process = Map.get(monitors, monitoring_reference)
state = %{state | monitors: Map.delete(monitors, monitoring_reference)}
{kind_of_process, state}
end
end
Loading

0 comments on commit 63a0596

Please sign in to comment.