Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pool telemetry #248

Merged
merged 15 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ defmodule Finch do
before being closed during a checkout attempt.
""",
default: :infinity
],
start_pool_metrics?: [
type: :boolean,
doc: "When true, pool metrics will be collected and avaiable through Finch.pool_status/2",
default: false
]
]

Expand All @@ -95,6 +100,10 @@ defmodule Finch do
"""
@type name() :: atom()

@type scheme() :: :http | :https

@type scheme_host_port() :: {scheme(), host :: String.t(), port :: :inet.port_number()}

@type request_opt() :: {:pool_timeout, pos_integer()} | {:receive_timeout, pos_integer()}

@typedoc """
Expand Down Expand Up @@ -271,7 +280,8 @@ defmodule Finch do
count: valid[:count],
conn_opts: conn_opts,
conn_max_idle_time: to_native(valid[:max_idle_time] || valid[:conn_max_idle_time]),
pool_max_idle_time: valid[:pool_max_idle_time]
pool_max_idle_time: valid[:pool_max_idle_time],
start_pool_metrics?: valid[:start_pool_metrics?]
}
end

Expand Down Expand Up @@ -570,4 +580,56 @@ defmodule Finch do
defp get_pool(%Request{scheme: scheme, host: host, port: port}, name) do
PoolManager.get_pool(name, {scheme, host, port})
end

@doc """
Get pool metrics list.

The number of items present on the metrics list depends on the `:count` option
each metric will have a `pool_index` going from 1 to `:count`.

The metrics struct depends on the pool scheme defined on the `:protocols` option
`Finch.HTTP1.PoolMetrics` for `:http1` and `Finch.HTTP2.PoolMetrics` for `:http2`.

See the `Finch.HTTP1.PoolMetrics` and `Finch.HTTP2.PoolMetrics` for more details.

`{:error, :not_found}` may return on 2 scenarios:
- There is no pool registered for the given pair finch instance and url
- The pool is configured with `start_pool_metrics?` option false (default)

## Example

iex> Finch.get_pool_status(MyFinch, "https://httpbin.org")
{:ok, [
%Finch.HTTP1.PoolMetrics{
pool_index: 1,
pool_size: 50,
available_connections: 43,
in_use_connections: 7
},
%Finch.HTTP1.PoolMetrics{
pool_index: 2,
pool_size: 50,
available_connections: 37,
in_use_connections: 13
}]
}
"""
@spec get_pool_status(name(), url :: String.t() | scheme_host_port()) ::
{:ok, list(Finch.HTTP1.PoolMetrics.t())}
| {:ok, list(Finch.HTTP2.PoolMetrics.t())}
| {:error, :not_found}
def get_pool_status(finch_name, url) when is_binary(url) do
{s, h, p, _, _} = Request.parse_url(url)
get_pool_status(finch_name, {s, h, p})
end

def get_pool_status(finch_name, shp) when is_tuple(shp) do
case PoolManager.get_pool(finch_name, shp, auto_start?: false) do
{_pool, pool_mod} ->
pool_mod.get_pool_status(finch_name, shp)

:not_found ->
{:error, :not_found}
end
end
end
69 changes: 58 additions & 11 deletions lib/finch/http1/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,19 @@ defmodule Finch.HTTP1.Pool do

alias Finch.HTTP1.Conn
alias Finch.Telemetry
alias Finch.HTTP1.PoolMetrics

def child_spec(opts) do
{_shp, _registry_name, _pool_size, _conn_opts, pool_max_idle_time} = opts
{
_shp,
_registry_name,
_pool_size,
_conn_opts,
pool_max_idle_time,
_start_pool_metrics?,
_pool_idx
} =
opts

%{
id: __MODULE__,
Expand All @@ -16,9 +26,13 @@ defmodule Finch.HTTP1.Pool do
}
end

def start_link({shp, registry_name, pool_size, conn_opts, pool_max_idle_time}) do
def start_link(
{shp, registry_name, pool_size, conn_opts, pool_max_idle_time, start_pool_metrics?,
pool_idx}
) do
NimblePool.start_link(
worker: {__MODULE__, {registry_name, shp, conn_opts}},
worker:
{__MODULE__, {registry_name, shp, pool_idx, pool_size, start_pool_metrics?, conn_opts}},
pool_size: pool_size,
lazy: true,
worker_idle_timeout: pool_idle_timeout(pool_max_idle_time)
Expand All @@ -32,7 +46,6 @@ defmodule Finch.HTTP1.Pool do
request_timeout = Keyword.get(opts, :request_timeout, :infinity)

metadata = %{request: req, pool: pool}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're already passing the Finch name in, could we add it to the metadata so it's available in telemetry? It would be really useful in monitoring wait time per Finch instance?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly @aselder I was thinking these changes enable #199 . I think that should be implemented in a new PR once this is complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good idea indeed. But just to be clear, I've only added the finch_name on the Finch.Pool request callback because of averages and maxes metrics.

If we gonna move on only with available and in use connections this is not necessary.

Should I keep it here unused anyway in order to enable this change? What you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering the modifications on #252 I think would be better to remove these changes from this PR in order to avoid conflicts.


start_time = Telemetry.start(:queue, metadata)

try do
Expand Down Expand Up @@ -85,7 +98,13 @@ defmodule Finch.HTTP1.Pool do
monitor = Process.monitor(owner)
request_ref = {__MODULE__, self()}

case request(pool, req, {owner, monitor, request_ref}, &send_async_response/2, opts) do
case request(
pool,
req,
{owner, monitor, request_ref},
&send_async_response/2,
opts
) do
{:ok, _} -> send(owner, {request_ref, :done})
{:error, error} -> send(owner, {request_ref, {:error, error}})
end
Expand Down Expand Up @@ -118,35 +137,60 @@ defmodule Finch.HTTP1.Pool do
:ok
end

@impl Finch.Pool
def get_pool_status(finch_name, shp) do
case Finch.PoolManager.get_pool_count(finch_name, shp) do
nil ->
{:error, :not_found}

count ->
1..count
|> Enum.map(&PoolMetrics.get_pool_status(finch_name, shp, &1))
|> Enum.filter(&match?({:ok, _}, &1))
|> Enum.map(&elem(&1, 1))
|> case do
[] -> {:error, :not_found}
result -> {:ok, result}
end
end
end

@impl NimblePool
def init_pool({registry, shp, opts}) do
def init_pool({registry, shp, pool_idx, pool_size, start_pool_metrics?, opts}) do
{:ok, metric_ref} =
if start_pool_metrics?,
do: PoolMetrics.init(registry, shp, pool_idx, pool_size),
else: {:ok, nil}

# Register our pool with our module name as the key. This allows the caller
# to determine the correct pool module to use to make the request
{:ok, _} = Registry.register(registry, shp, __MODULE__)
{:ok, {shp, opts}}
{:ok, {registry, shp, pool_idx, metric_ref, opts}}
end

@impl NimblePool
def init_worker({{scheme, host, port}, opts} = pool_state) do
def init_worker({_name, {scheme, host, port}, _pool_idx, _metric_ref, opts} = pool_state) do
{:ok, Conn.new(scheme, host, port, opts, self()), pool_state}
end

@impl NimblePool
def handle_checkout(:checkout, _, %{mint: nil} = conn, pool_state) do
{_name, _shp, _pool_idx, metric_ref, _opts} = pool_state
idle_time = System.monotonic_time() - conn.last_checkin
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
{:ok, {:fresh, conn, idle_time}, conn, pool_state}
end

def handle_checkout(:checkout, _from, conn, pool_state) do
idle_time = System.monotonic_time() - conn.last_checkin
{_name, {scheme, host, port}, _pool_idx, metric_ref, _opts} = pool_state

with true <- Conn.reusable?(conn, idle_time),
{:ok, conn} <- Conn.set_mode(conn, :passive) do
PoolMetrics.maybe_add(metric_ref, in_use_connections: 1)
{:ok, {:reuse, conn, idle_time}, conn, pool_state}
else
false ->
{{scheme, host, port}, _opts} = pool_state

meta = %{
scheme: scheme,
host: host,
Expand All @@ -167,6 +211,9 @@ defmodule Finch.HTTP1.Pool do

@impl NimblePool
def handle_checkin(checkin, _from, _old_conn, pool_state) do
{_name, _shp, _pool_idx, metric_ref, _opts} = pool_state
PoolMetrics.maybe_add(metric_ref, in_use_connections: -1)

with {:ok, conn} <- checkin,
{:ok, conn} <- Conn.set_mode(conn, :active) do
{:ok, %{conn | last_checkin: System.monotonic_time()}, pool_state}
Expand All @@ -192,7 +239,7 @@ defmodule Finch.HTTP1.Pool do

@impl NimblePool
def handle_ping(_conn, pool_state) do
{{scheme, host, port}, _opts} = pool_state
{_name, {scheme, host, port}, _pool_idx, _metric_ref, _opts} = pool_state

meta = %{
scheme: scheme,
Expand Down
81 changes: 81 additions & 0 deletions lib/finch/http1/pool_metrics.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Finch.HTTP1.PoolMetrics do
@moduledoc """
HTTP1 Pool metrics.

Available metrics:

* `:pool_index` - Index of the pool
* `:pool_size` - Total number of connections of the pool
* `:available_connections` - Number of avaialable connections
* `:in_use_connections` - Number of connections currently in use

Caveats:

* A given number X of `available_connections` does not mean that currently
exists X connections to the server sitting on the pool. Because Finch uses
a lazy strategy for workers initialization, every pool starts with it's
size as available connections even if they are not started yet. In practice
this means that `available_connections` may be connections sitting on the pool
or available space on the pool for a new one if required.

"""
@type t :: %__MODULE__{}

defstruct [
:pool_index,
:pool_size,
:available_connections,
oliveigah marked this conversation as resolved.
Show resolved Hide resolved
:in_use_connections
]

@atomic_idx [
pool_idx: 1,
pool_size: 2,
in_use_connections: 3
]

def init(registry, shp, pool_idx, pool_size) do
ref = :atomics.new(length(@atomic_idx), [])
:atomics.add(ref, @atomic_idx[:pool_idx], pool_idx)
:atomics.add(ref, @atomic_idx[:pool_size], pool_size)

:persistent_term.put({__MODULE__, registry, shp, pool_idx}, ref)
{:ok, ref}
end

def maybe_add(nil, _metrics_list), do: :ok

def maybe_add(ref, metrics_list) when is_reference(ref) do
Enum.each(metrics_list, fn {metric_name, val} ->
:atomics.add(ref, @atomic_idx[metric_name], val)
end)
end

def get_pool_status(name, shp, pool_idx) do
{__MODULE__, name, shp, pool_idx}
|> :persistent_term.get(nil)
|> get_pool_status()
end

def get_pool_status(ref) when is_reference(ref) do
%{
pool_idx: pool_idx,
pool_size: pool_size,
in_use_connections: in_use_connections
} =
@atomic_idx
|> Enum.map(fn {k, idx} -> {k, :atomics.get(ref, idx)} end)
|> Map.new()

result = %__MODULE__{
pool_index: pool_idx,
pool_size: pool_size,
available_connections: pool_size - in_use_connections,
in_use_connections: in_use_connections
}

{:ok, result}
end

def get_pool_status(nil), do: {:error, :not_found}
end
Loading
Loading