diff --git a/docker-compose.yml b/docker-compose.yml index c6251d6f..748c2322 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -94,10 +94,3 @@ services: ports: - "8474:8474" - "19052:19052" - environment: - LOG_LEVEL: trace - healthcheck: - test: [ "CMD-SHELL", "echo healthy" ] - interval: 2s - timeout: 4s - retries: 5 diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 8c349ace..90c43562 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -12,6 +12,10 @@ defmodule Xandra.Cluster.Pool do alias Xandra.Cluster.{ConnectionPool, Host, LoadBalancingPolicy} alias Xandra.GenStatemHelpers + require Record + + Record.defrecordp(:checkout_queue, [:max_size, :queue]) + ## Public API @spec start_link(keyword(), keyword()) :: :gen_statem.start_ret() @@ -107,7 +111,7 @@ defmodule Xandra.Cluster.Pool do # A queue of requests that were received by this process *before* connecting # to *any* node. We "buffer" these for a while until we establish a connection. - reqs_before_connecting: %{ + checkout_queue: %{ queue: :queue.new(), max_size: nil }, @@ -142,8 +146,9 @@ defmodule Xandra.Cluster.Pool do {mod, opts} -> {mod, opts} end - queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting) - queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout) + checkout_queue_opts = Keyword.fetch!(cluster_opts, :queue_checkouts_before_connecting) + checkout_queue_timeout = Keyword.fetch!(checkout_queue_opts, :timeout) + checkout_queue_max_size = Keyword.fetch!(checkout_queue_opts, :max_size) data = %__MODULE__{ connection_options: pool_opts, @@ -158,16 +163,13 @@ defmodule Xandra.Cluster.Pool do pool_size: Keyword.fetch!(cluster_opts, :pool_size), pool_supervisor: pool_sup, refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval), - reqs_before_connecting: %{ - queue: :queue.new(), - max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) - }, + checkout_queue: checkout_queue(queue: :queue.new(), max_size: checkout_queue_max_size), sync_connect_alias: sync_connect_alias_or_nil } actions = [ {:next_event, :internal, :start_control_connection}, - {{:timeout, :flush_queue_before_connecting}, queue_before_connecting_timeout, nil} + timeout_action(:flush_checkout_queue, checkout_queue_timeout) ] {:ok, :never_connected, data, actions} @@ -178,67 +180,53 @@ defmodule Xandra.Cluster.Pool do def handle_event(:internal, :start_control_connection, _state, data) do case start_control_connection(data) do - {:ok, data} -> - {:keep_state, data} - - :error -> - {:keep_state, data, {{:timeout, :reconnect_control_connection}, 1000, nil}} + {:ok, data} -> {:keep_state, data} + :error -> {:keep_state_and_data, timeout_action(:reconnect_control_connection, 1000)} end end - def handle_event( - :internal, - :flush_queue_before_connecting, - _state = :has_connected_once, - %__MODULE__{reqs_before_connecting: %{queue: queue}} = data - ) do + def handle_event(:internal, :flush_checkout_queue, :has_connected_once, %__MODULE__{} = data) do + checkout_queue(queue: queue) = data.checkout_queue + {reply_actions, data} = Enum.map_reduce(:queue.to_list(queue), data, fn from, data -> {data, reply_action} = checkout_connection(data, from) {reply_action, data} end) - {:keep_state, data, - reply_actions ++ [timeout_action(:flush_queue_before_connecting, :infinity)]} + cancel_timeout_action = timeout_action(:flush_checkout_queue, :infinity) + {:keep_state, data, [cancel_timeout_action] ++ reply_actions} end - def handle_event( - {:timeout, :flush_queue_before_connecting}, - nil, - _state = :never_connected, - %__MODULE__{} = data - ) do - actions = - for from <- :queue.to_list(data.reqs_before_connecting.queue), - do: {:reply, from, {:error, :empty}} - - data = put_in(data.reqs_before_connecting, nil) - - {:keep_state, data, actions} + def handle_event({:timeout, :flush_checkout_queue}, nil, :never_connected, %__MODULE__{} = data) do + {checkout_queue(queue: queue), data} = get_and_update_in(data.checkout_queue, &{&1, nil}) + reply_actions = for from <- :queue.to_list(queue), do: {:reply, from, {:error, :empty}} + {:keep_state, data, reply_actions} end - # We already flushed once, so we won't keep adding requests to the queue. - def handle_event( - {:call, from}, - :checkout, - _state = :never_connected, - %__MODULE__{reqs_before_connecting: nil} - ) do + # We have never connected, but we already flushed once, so we won't keep adding requests to + # the queue. + def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{checkout_queue: nil}) do {:keep_state_and_data, {:reply, from, {:error, :empty}}} end - def handle_event({:call, from}, :checkout, _state = :never_connected, %__MODULE__{} = data) do - %{queue: queue, max_size: max_size} = data.reqs_before_connecting + def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{} = data) do + checkout_queue(queue: queue, max_size: max_size) = data.checkout_queue if :queue.len(queue) == max_size do {:keep_state_and_data, {:reply, from, {:error, :empty}}} else - data = update_in(data.reqs_before_connecting.queue, &:queue.in(from, &1)) + data = + put_in( + data.checkout_queue, + checkout_queue(data.checkout_queue, queue: :queue.in(from, queue)) + ) + {:keep_state, data} end end - def handle_event({:call, from}, :checkout, _state = :has_connected_once, %__MODULE__{} = data) do + def handle_event({:call, from}, :checkout, :has_connected_once, %__MODULE__{} = data) do {data, reply_action} = checkout_connection(data, from) {:keep_state, data, reply_action} end @@ -327,7 +315,7 @@ defmodule Xandra.Cluster.Pool do send(alias, {alias, :connected}) end - actions = [{:next_event, :internal, :flush_queue_before_connecting}] + actions = [{:next_event, :internal, :flush_checkout_queue}] {:next_state, :has_connected_once, data, actions} end