Skip to content

Commit

Permalink
FIXUP
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Oct 12, 2023
1 parent a1c4d8b commit 3abfba3
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 53 deletions.
7 changes: 0 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,3 @@ services:
ports:
- "8474:8474"
- "19052:19052"
environment:
LOG_LEVEL: trace
healthcheck:
test: [ "CMD-SHELL", "echo healthy" ]
interval: 2s
timeout: 4s
retries: 5
80 changes: 34 additions & 46 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ defmodule Xandra.Cluster.Pool do
alias Xandra.Cluster.{ConnectionPool, Host, LoadBalancingPolicy}
alias Xandra.GenStatemHelpers

require Record

Record.defrecordp(:checkout_queue, [:max_size, :queue])

## Public API

@spec start_link(keyword(), keyword()) :: :gen_statem.start_ret()
Expand Down Expand Up @@ -107,7 +111,7 @@ defmodule Xandra.Cluster.Pool do

# A queue of requests that were received by this process *before* connecting
# to *any* node. We "buffer" these for a while until we establish a connection.
reqs_before_connecting: %{
checkout_queue: %{
queue: :queue.new(),
max_size: nil
},
Expand Down Expand Up @@ -142,8 +146,9 @@ defmodule Xandra.Cluster.Pool do
{mod, opts} -> {mod, opts}
end

queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting)
queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout)
checkout_queue_opts = Keyword.fetch!(cluster_opts, :queue_checkouts_before_connecting)
checkout_queue_timeout = Keyword.fetch!(checkout_queue_opts, :timeout)
checkout_queue_max_size = Keyword.fetch!(checkout_queue_opts, :max_size)

data = %__MODULE__{
connection_options: pool_opts,
Expand All @@ -158,16 +163,13 @@ defmodule Xandra.Cluster.Pool do
pool_size: Keyword.fetch!(cluster_opts, :pool_size),
pool_supervisor: pool_sup,
refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
reqs_before_connecting: %{
queue: :queue.new(),
max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size)
},
checkout_queue: checkout_queue(queue: :queue.new(), max_size: checkout_queue_max_size),
sync_connect_alias: sync_connect_alias_or_nil
}

actions = [
{:next_event, :internal, :start_control_connection},
{{:timeout, :flush_queue_before_connecting}, queue_before_connecting_timeout, nil}
timeout_action(:flush_checkout_queue, checkout_queue_timeout)
]

{:ok, :never_connected, data, actions}
Expand All @@ -178,67 +180,53 @@ defmodule Xandra.Cluster.Pool do

def handle_event(:internal, :start_control_connection, _state, data) do
case start_control_connection(data) do
{:ok, data} ->
{:keep_state, data}

:error ->
{:keep_state, data, {{:timeout, :reconnect_control_connection}, 1000, nil}}
{:ok, data} -> {:keep_state, data}
:error -> {:keep_state_and_data, timeout_action(:reconnect_control_connection, 1000)}
end
end

def handle_event(
:internal,
:flush_queue_before_connecting,
_state = :has_connected_once,
%__MODULE__{reqs_before_connecting: %{queue: queue}} = data
) do
def handle_event(:internal, :flush_checkout_queue, :has_connected_once, %__MODULE__{} = data) do
checkout_queue(queue: queue) = data.checkout_queue

{reply_actions, data} =
Enum.map_reduce(:queue.to_list(queue), data, fn from, data ->
{data, reply_action} = checkout_connection(data, from)
{reply_action, data}
end)

{:keep_state, data,
reply_actions ++ [timeout_action(:flush_queue_before_connecting, :infinity)]}
cancel_timeout_action = timeout_action(:flush_checkout_queue, :infinity)
{:keep_state, data, [cancel_timeout_action] ++ reply_actions}
end

def handle_event(
{:timeout, :flush_queue_before_connecting},
nil,
_state = :never_connected,
%__MODULE__{} = data
) do
actions =
for from <- :queue.to_list(data.reqs_before_connecting.queue),
do: {:reply, from, {:error, :empty}}

data = put_in(data.reqs_before_connecting, nil)

{:keep_state, data, actions}
def handle_event({:timeout, :flush_checkout_queue}, nil, :never_connected, %__MODULE__{} = data) do
{checkout_queue(queue: queue), data} = get_and_update_in(data.checkout_queue, &{&1, nil})
reply_actions = for from <- :queue.to_list(queue), do: {:reply, from, {:error, :empty}}
{:keep_state, data, reply_actions}
end

# We already flushed once, so we won't keep adding requests to the queue.
def handle_event(
{:call, from},
:checkout,
_state = :never_connected,
%__MODULE__{reqs_before_connecting: nil}
) do
# We have never connected, but we already flushed once, so we won't keep adding requests to
# the queue.
def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{checkout_queue: nil}) do
{:keep_state_and_data, {:reply, from, {:error, :empty}}}
end

def handle_event({:call, from}, :checkout, _state = :never_connected, %__MODULE__{} = data) do
%{queue: queue, max_size: max_size} = data.reqs_before_connecting
def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{} = data) do
checkout_queue(queue: queue, max_size: max_size) = data.checkout_queue

if :queue.len(queue) == max_size do
{:keep_state_and_data, {:reply, from, {:error, :empty}}}
else
data = update_in(data.reqs_before_connecting.queue, &:queue.in(from, &1))
data =
put_in(
data.checkout_queue,
checkout_queue(data.checkout_queue, queue: :queue.in(from, queue))
)

{:keep_state, data}
end
end

def handle_event({:call, from}, :checkout, _state = :has_connected_once, %__MODULE__{} = data) do
def handle_event({:call, from}, :checkout, :has_connected_once, %__MODULE__{} = data) do
{data, reply_action} = checkout_connection(data, from)
{:keep_state, data, reply_action}
end
Expand Down Expand Up @@ -327,7 +315,7 @@ defmodule Xandra.Cluster.Pool do
send(alias, {alias, :connected})
end

actions = [{:next_event, :internal, :flush_queue_before_connecting}]
actions = [{:next_event, :internal, :flush_checkout_queue}]
{:next_state, :has_connected_once, data, actions}
end

Expand Down

0 comments on commit 3abfba3

Please sign in to comment.