Skip to content

Commit

Permalink
Merge pull request #248 from oliveigah/main
Browse files Browse the repository at this point in the history
Implement pool telemetry
  • Loading branch information
sneako committed Jan 7, 2024
2 parents 109c3d2 + b443c64 commit 3946acb
Show file tree
Hide file tree
Showing 15 changed files with 743 additions and 47 deletions.
64 changes: 63 additions & 1 deletion lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ defmodule Finch do
before being closed during a checkout attempt.
""",
default: :infinity
],
start_pool_metrics?: [
type: :boolean,
doc: "When true, pool metrics will be collected and avaiable through Finch.pool_status/2",
default: false
]
]

Expand All @@ -95,6 +100,10 @@ defmodule Finch do
"""
@type name() :: atom()

@type scheme() :: :http | :https

@type scheme_host_port() :: {scheme(), host :: String.t(), port :: :inet.port_number()}

@type request_opt() :: {:pool_timeout, pos_integer()} | {:receive_timeout, pos_integer()}

@typedoc """
Expand Down Expand Up @@ -271,7 +280,8 @@ defmodule Finch do
count: valid[:count],
conn_opts: conn_opts,
conn_max_idle_time: to_native(valid[:max_idle_time] || valid[:conn_max_idle_time]),
pool_max_idle_time: valid[:pool_max_idle_time]
pool_max_idle_time: valid[:pool_max_idle_time],
start_pool_metrics?: valid[:start_pool_metrics?]
}
end

Expand Down Expand Up @@ -570,4 +580,56 @@ defmodule Finch do
defp get_pool(%Request{scheme: scheme, host: host, port: port}, name) do
PoolManager.get_pool(name, {scheme, host, port})
end

@doc """
Get pool metrics list.
The number of items present on the metrics list depends on the `:count` option
each metric will have a `pool_index` going from 1 to `:count`.
The metrics struct depends on the pool scheme defined on the `:protocols` option
`Finch.HTTP1.PoolMetrics` for `:http1` and `Finch.HTTP2.PoolMetrics` for `:http2`.
See the `Finch.HTTP1.PoolMetrics` and `Finch.HTTP2.PoolMetrics` for more details.
`{:error, :not_found}` may return on 2 scenarios:
- There is no pool registered for the given pair finch instance and url
- The pool is configured with `start_pool_metrics?` option false (default)
## Example
iex> Finch.get_pool_status(MyFinch, "https://httpbin.org")
{:ok, [
%Finch.HTTP1.PoolMetrics{
pool_index: 1,
pool_size: 50,
available_connections: 43,
in_use_connections: 7
},
%Finch.HTTP1.PoolMetrics{
pool_index: 2,
pool_size: 50,
available_connections: 37,
in_use_connections: 13
}]
}
"""
@spec get_pool_status(name(), url :: String.t() | scheme_host_port()) ::
{:ok, list(Finch.HTTP1.PoolMetrics.t())}
| {:ok, list(Finch.HTTP2.PoolMetrics.t())}
| {:error, :not_found}
def get_pool_status(finch_name, url) when is_binary(url) do
{s, h, p, _, _} = Request.parse_url(url)
get_pool_status(finch_name, {s, h, p})
end

def get_pool_status(finch_name, shp) when is_tuple(shp) do
case PoolManager.get_pool(finch_name, shp, auto_start?: false) do
{_pool, pool_mod} ->
pool_mod.get_pool_status(finch_name, shp)

:not_found ->
{:error, :not_found}
end
end
end
69 changes: 58 additions & 11 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,19 @@ defmodule Finch.HTTP1.Pool do

alias Finch.HTTP1.Conn
alias Finch.Telemetry
alias Finch.HTTP1.PoolMetrics

def child_spec(opts) do
{_shp, _registry_name, _pool_size, _conn_opts, pool_max_idle_time} = opts
{
_shp,
_registry_name,
_pool_size,
_conn_opts,
pool_max_idle_time,
_start_pool_metrics?,
_pool_idx
} =
opts

%{
id: __MODULE__,
Expand All @@ -16,9 +26,13 @@ defmodule Finch.HTTP1.Pool do
}
end

def start_link({shp, registry_name, pool_size, conn_opts, pool_max_idle_time}) do
def start_link(
{shp, registry_name, pool_size, conn_opts, pool_max_idle_time, start_pool_metrics?,
pool_idx}
) do
NimblePool.start_link(
worker: {__MODULE__, {registry_name, shp, conn_opts}},
worker:
{__MODULE__, {registry_name, shp, pool_idx, pool_size, start_pool_metrics?, conn_opts}},
pool_size: pool_size,
lazy: true,
worker_idle_timeout: pool_idle_timeout(pool_max_idle_time)
Expand All @@ -32,7 +46,6 @@ defmodule Finch.HTTP1.Pool do
request_timeout = Keyword.get(opts, :request_timeout, :infinity)

metadata = %{request: req, pool: pool}

start_time = Telemetry.start(:queue, metadata)

try do
Expand Down Expand Up @@ -85,7 +98,13 @@ defmodule Finch.HTTP1.Pool do
monitor = Process.monitor(owner)
request_ref = {__MODULE__, self()}

case request(pool, req, {owner, monitor, request_ref}, &send_async_response/2, opts) do
case request(
pool,
req,
{owner, monitor, request_ref},
&send_async_response/2,
opts
) do
{:ok, _} -> send(owner, {request_ref, :done})
{:error, error} -> send(owner, {request_ref, {:error, error}})
end
Expand Down Expand Up @@ -118,35 +137,60 @@ defmodule Finch.HTTP1.Pool do
:ok
end

@impl Finch.Pool
def get_pool_status(finch_name, shp) do
case Finch.PoolManager.get_pool_count(finch_name, shp) do
nil ->
{:error, :not_found}

count ->
1..count
|> Enum.map(&PoolMetrics.get_pool_status(finch_name, shp, &1))
|> Enum.filter(&match?({:ok, _}, &1))
|> Enum.map(&elem(&1, 1))
|> case do
[] -> {:error, :not_found}
result -> {:ok, result}
end
end
end

@impl NimblePool
def init_pool({registry, shp, opts}) do
def init_pool({registry, shp, pool_idx, pool_size, start_pool_metrics?, opts}) do
{:ok, metric_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(registry, shp, pool_idx, pool_size),
else: {:ok, nil}

# Register our pool with our module name as the key. This allows the caller
# to determine the correct pool module to use to make the request
{:ok, _} = Registry.register(registry, shp, __MODULE__)
{:ok, {shp, opts}}
{:ok, {registry, shp, pool_idx, metric_ref, opts}}
end

@impl NimblePool
def init_worker({{scheme, host, port}, opts} = pool_state) do
def init_worker({_name, {scheme, host, port}, _pool_idx, _metric_ref, opts} = pool_state) do
{:ok, Conn.new(scheme, host, port, opts, self()), pool_state}
end

@impl NimblePool
def handle_checkout(:checkout, _, %{mint: nil} = conn, pool_state) do
{_name, _shp, _pool_idx, metric_ref, _opts} = pool_state
idle_time = System.monotonic_time() - conn.last_checkin
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
{:ok, {:fresh, conn, idle_time}, conn, pool_state}
end

def handle_checkout(:checkout, _from, conn, pool_state) do
idle_time = System.monotonic_time() - conn.last_checkin
{_name, {scheme, host, port}, _pool_idx, metric_ref, _opts} = pool_state

with true <- Conn.reusable?(conn, idle_time),
{:ok, conn} <- Conn.set_mode(conn, :passive) do
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
{:ok, {:reuse, conn, idle_time}, conn, pool_state}
else
false ->
{{scheme, host, port}, _opts} = pool_state

meta = %{
scheme: scheme,
host: host,
Expand All @@ -167,6 +211,9 @@ defmodule Finch.HTTP1.Pool do

@impl NimblePool
def handle_checkin(checkin, _from, _old_conn, pool_state) do
{_name, _shp, _pool_idx, metric_ref, _opts} = pool_state
PoolMetrics.maybe_add(metric_ref, in_use_connections: -1)

with {:ok, conn} <- checkin,
{:ok, conn} <- Conn.set_mode(conn, :active) do
{:ok, %{conn | last_checkin: System.monotonic_time()}, pool_state}
Expand All @@ -192,7 +239,7 @@ defmodule Finch.HTTP1.Pool do

@impl NimblePool
def handle_ping(_conn, pool_state) do
{{scheme, host, port}, _opts} = pool_state
{_name, {scheme, host, port}, _pool_idx, _metric_ref, _opts} = pool_state

meta = %{
scheme: scheme,
Expand Down
81 changes: 81 additions & 0 deletions lib/finch/http1/pool_metrics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Finch.HTTP1.PoolMetrics do
@moduledoc """
HTTP1 Pool metrics.
Available metrics:
* `:pool_index` - Index of the pool
* `:pool_size` - Total number of connections of the pool
* `:available_connections` - Number of avaialable connections
* `:in_use_connections` - Number of connections currently in use
Caveats:
* A given number X of `available_connections` does not mean that currently
exists X connections to the server sitting on the pool. Because Finch uses
a lazy strategy for workers initialization, every pool starts with it's
size as available connections even if they are not started yet. In practice
this means that `available_connections` may be connections sitting on the pool
or available space on the pool for a new one if required.
"""
@type t :: %__MODULE__{}

defstruct [
:pool_index,
:pool_size,
:available_connections,
:in_use_connections
]

@atomic_idx [
pool_idx: 1,
pool_size: 2,
in_use_connections: 3
]

def init(registry, shp, pool_idx, pool_size) do
ref = :atomics.new(length(@atomic_idx), [])
:atomics.add(ref, @atomic_idx[:pool_idx], pool_idx)
:atomics.add(ref, @atomic_idx[:pool_size], pool_size)

:persistent_term.put({__MODULE__, registry, shp, pool_idx}, ref)
{:ok, ref}
end

def maybe_add(nil, _metrics_list), do: :ok

def maybe_add(ref, metrics_list) when is_reference(ref) do
Enum.each(metrics_list, fn {metric_name, val} ->
:atomics.add(ref, @atomic_idx[metric_name], val)
end)
end

def get_pool_status(name, shp, pool_idx) do
{__MODULE__, name, shp, pool_idx}
|> :persistent_term.get(nil)
|> get_pool_status()
end

def get_pool_status(ref) when is_reference(ref) do
%{
pool_idx: pool_idx,
pool_size: pool_size,
in_use_connections: in_use_connections
} =
@atomic_idx
|> Enum.map(fn {k, idx} -> {k, :atomics.get(ref, idx)} end)
|> Map.new()

result = %__MODULE__{
pool_index: pool_idx,
pool_size: pool_size,
available_connections: pool_size - in_use_connections,
in_use_connections: in_use_connections
}

{:ok, result}
end

def get_pool_status(nil), do: {:error, :not_found}
end
Loading

0 comments on commit 3946acb

Please sign in to comment.