Skip to content

Commit

Permalink
Introduce Toxiproxy (#340)
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide authored Oct 12, 2023
1 parent fc57b95 commit d571329
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 67 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ This change is breaking, and affect Xandra pretty significantly. The user-facing

* Make retry strategies **cluster aware**, by adding the `{:retry, new_options, new_state, conn_pid}` return value to the `retry/3` callback. See the updated documentation for `Xandra.RetryStrategy`.
* Support `GenServer.start_link/3` options in `Xandra.Cluster.start_link/1` (like `:spawn_opt` and friends).
* Add the `:queue_before_connecting` option to `Xandra.Cluster.start_link/1` to queue requests in the cluster until at least one connection to one node is established.
* Add the `:queue_checkouts_before_connecting` option to `Xandra.Cluster.start_link/1` to queue checkout requests in the cluster until at least one connection to one node is established.
* Fix a few bugs with rogue data in the native protocol parser.
* Fix a small bug when negotiating the native protocol version.
* Fix IPv6 support in `Xandra.Cluster`.
Expand Down
6 changes: 6 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ services:
driver: "json-file"
options:
max-size: 50m

toxiproxy:
image: ghcr.io/shopify/toxiproxy
ports:
- "8474:8474"
- "19052:19052"
33 changes: 18 additions & 15 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,37 +201,40 @@ defmodule Xandra.Cluster do
the cluster could connect once and then drop connections right away, so this doesn't
mean that the cluster is connected, but rather that it *connected at least once*.
This is useful, for example, in test suites where you're not worried about
resiliency but rather race conditions. In most cases, the `:queue_before_connecting`
option is what you want.
resiliency but rather race conditions. In most cases, the
`:queue_checkouts_before_connecting` option is what you want.
"""
],
queue_before_connecting: [
queue_checkouts_before_connecting: [
type: :keyword_list,
doc: """
Controls how to handle requests that go through the cluster *before* the cluster
is able to establish a connection to **any node**. This is useful because if you
try to use the Xandra cluster right away after starting it, you'll likely get errors
since the cluster needs to establish at least a connection to at least one node first.
The strategy that `Xandra.Cluster` uses is to queue requests until a connection
is established, and at that point flush those requests. If you don't want this behaviour,
you can set `:buffer_size` to `0`. *Available since v0.18.0*. This option supports these
keys:
Controls how to handle checkouts that go through the cluster *before* the cluster
is able to establish a connection to **any node**. Whenever you run a cluster function,
the cluster checks out a connection from one of the connected nodes and executes the
request on that connection. However, if you try to run any cluster function before the
cluster connects to any of the nodes, you'll likely get `Xandra.ConnectionError`s
with reason `{:cluster, :not_connected}`. This is because the cluster needs to establish
at least one connection to one node before it can execute requests. This option addresses
this issue by queueing "checkout requests" until the cluster establishes a connection
to a node. Once the connection is established, the cluster starts to hand over
connections. If you want to **disable this behavior**, set `:max_size` to `0`. *Available
since v0.18.0*. This option supports the following sub-options:
""",
keys: [
buffer_size: [
max_size: [
type: :non_neg_integer,
default: 100,
doc: """
The number of requests to queue in the cluster and flush as soon as a connection
The number of checkouts to queue in the cluster and flush as soon as a connection
is established.
"""
],
timeout: [
type: :timeout,
default: 5_000,
doc: """
How long to hold on to requests for. When this timeout expires, all requests
are dropped and an error is returned to the caller.
How long to hold on to checkout requests for. When this timeout expires, all requests
are dropped and a connection error is returned to each caller.
"""
]
],
Expand Down
80 changes: 34 additions & 46 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ defmodule Xandra.Cluster.Pool do
alias Xandra.Cluster.{ConnectionPool, Host, LoadBalancingPolicy}
alias Xandra.GenStatemHelpers

require Record

Record.defrecordp(:checkout_queue, [:max_size, :queue])

## Public API

@spec start_link(keyword(), keyword()) :: :gen_statem.start_ret()
Expand Down Expand Up @@ -107,7 +111,7 @@ defmodule Xandra.Cluster.Pool do

# A queue of requests that were received by this process *before* connecting
# to *any* node. We "buffer" these for a while until we establish a connection.
reqs_before_connecting: %{
checkout_queue: %{
queue: :queue.new(),
max_size: nil
},
Expand Down Expand Up @@ -142,8 +146,9 @@ defmodule Xandra.Cluster.Pool do
{mod, opts} -> {mod, opts}
end

queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting)
queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout)
checkout_queue_opts = Keyword.fetch!(cluster_opts, :queue_checkouts_before_connecting)
checkout_queue_timeout = Keyword.fetch!(checkout_queue_opts, :timeout)
checkout_queue_max_size = Keyword.fetch!(checkout_queue_opts, :max_size)

data = %__MODULE__{
connection_options: pool_opts,
Expand All @@ -158,16 +163,13 @@ defmodule Xandra.Cluster.Pool do
pool_size: Keyword.fetch!(cluster_opts, :pool_size),
pool_supervisor: pool_sup,
refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval),
reqs_before_connecting: %{
queue: :queue.new(),
max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size)
},
checkout_queue: checkout_queue(queue: :queue.new(), max_size: checkout_queue_max_size),
sync_connect_alias: sync_connect_alias_or_nil
}

actions = [
{:next_event, :internal, :start_control_connection},
{{:timeout, :flush_queue_before_connecting}, queue_before_connecting_timeout, nil}
timeout_action(:flush_checkout_queue, checkout_queue_timeout)
]

{:ok, :never_connected, data, actions}
Expand All @@ -178,67 +180,53 @@ defmodule Xandra.Cluster.Pool do

def handle_event(:internal, :start_control_connection, _state, data) do
case start_control_connection(data) do
{:ok, data} ->
{:keep_state, data}

:error ->
{:keep_state, data, {{:timeout, :reconnect_control_connection}, 1000, nil}}
{:ok, data} -> {:keep_state, data}
:error -> {:keep_state_and_data, timeout_action(:reconnect_control_connection, 1000)}
end
end

def handle_event(
:internal,
:flush_queue_before_connecting,
_state = :has_connected_once,
%__MODULE__{reqs_before_connecting: %{queue: queue}} = data
) do
def handle_event(:internal, :flush_checkout_queue, :has_connected_once, %__MODULE__{} = data) do
checkout_queue(queue: queue) = data.checkout_queue

{reply_actions, data} =
Enum.map_reduce(:queue.to_list(queue), data, fn from, data ->
{data, reply_action} = checkout_connection(data, from)
{reply_action, data}
end)

{:keep_state, data,
reply_actions ++ [timeout_action(:flush_queue_before_connecting, :infinity)]}
cancel_timeout_action = timeout_action(:flush_checkout_queue, :infinity)
{:keep_state, data, [cancel_timeout_action] ++ reply_actions}
end

def handle_event(
{:timeout, :flush_queue_before_connecting},
nil,
_state = :never_connected,
%__MODULE__{} = data
) do
actions =
for from <- :queue.to_list(data.reqs_before_connecting.queue),
do: {:reply, from, {:error, :empty}}

data = put_in(data.reqs_before_connecting, nil)

{:keep_state, data, actions}
def handle_event({:timeout, :flush_checkout_queue}, nil, :never_connected, %__MODULE__{} = data) do
{checkout_queue(queue: queue), data} = get_and_update_in(data.checkout_queue, &{&1, nil})
reply_actions = for from <- :queue.to_list(queue), do: {:reply, from, {:error, :empty}}
{:keep_state, data, reply_actions}
end

# We already flushed once, so we won't keep adding requests to the queue.
def handle_event(
{:call, from},
:checkout,
_state = :never_connected,
%__MODULE__{reqs_before_connecting: nil}
) do
# We have never connected, but we already flushed once, so we won't keep adding requests to
# the queue.
def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{checkout_queue: nil}) do
{:keep_state_and_data, {:reply, from, {:error, :empty}}}
end

def handle_event({:call, from}, :checkout, _state = :never_connected, %__MODULE__{} = data) do
%{queue: queue, max_size: max_size} = data.reqs_before_connecting
def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{} = data) do
checkout_queue(queue: queue, max_size: max_size) = data.checkout_queue

if :queue.len(queue) == max_size do
{:keep_state_and_data, {:reply, from, {:error, :empty}}}
else
data = update_in(data.reqs_before_connecting.queue, &:queue.in(from, &1))
data =
put_in(
data.checkout_queue,
checkout_queue(data.checkout_queue, queue: :queue.in(from, queue))
)

{:keep_state, data}
end
end

def handle_event({:call, from}, :checkout, _state = :has_connected_once, %__MODULE__{} = data) do
def handle_event({:call, from}, :checkout, :has_connected_once, %__MODULE__{} = data) do
{data, reply_action} = checkout_connection(data, from)
{:keep_state, data, reply_action}
end
Expand Down Expand Up @@ -327,7 +315,7 @@ defmodule Xandra.Cluster.Pool do
send(alias, {alias, :connected})
end

actions = [{:next_event, :internal, :flush_queue_before_connecting}]
actions = [{:next_event, :internal, :flush_checkout_queue}]
{:next_state, :has_connected_once, data, actions}
end

Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ defmodule Xandra.Mixfile do
{:excoveralls, "~> 0.17", only: :test},
{:mox, "~> 1.0", only: :test},
{:stream_data, "~> 0.6.0", only: [:dev, :test]},
{:nimble_lz4, "~> 0.1.3", only: [:dev, :test]}
{:nimble_lz4, "~> 0.1.3", only: [:dev, :test]},
{:toxiproxy_ex, "~> 1.1", only: :test}
]
end
end
5 changes: 5 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"},
"excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
"mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"},
"nimble_lz4": {:hex, :nimble_lz4, "0.1.4", "22b9fa4163e8057a10e6a2238285c1ed8137ea2e2659b8166d7354c0f2957312", [:mix], [{:rustler, "~> 0.29.0", [hex: :rustler, repo: "hexpm", optional: false]}, {:rustler_precompiled, "~> 0.6.2", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}], "hexpm", "1ae42465181aca22924972682fa52e10e46ecc6541d9df59af6ff3ada00fc592"},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
Expand All @@ -18,5 +21,7 @@
"rustler_precompiled": {:hex, :rustler_precompiled, "0.6.3", "f838d94bc35e1844973ee7266127b156fdc962e9e8b7ff666c8fb4fed7964d23", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "e18ecca3669a7454b3a2be75ae6c3ef01d550bc9a8cf5fbddcfff843b881d7c6"},
"stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"tesla": {:hex, :tesla, "1.7.0", "a62dda2f80d4f8a925eb7b8c5b78c461e0eb996672719fe1a63b26321a5f8b4e", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "2e64f01ebfdb026209b47bc651a0e65203fcff4ae79c11efb73c4852b00dc313"},
"toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
"toxiproxy_ex": {:hex, :toxiproxy_ex, "1.1.1", "af605b9f54a4508e2c8987764301609f457076cafc472ba83beaec93e2796e99", [:mix], [{:castore, "~> 1.0.3", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7.0", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "826c415c6e8ec1708894a86091afd0674f0a4115ed09ac6604088ef916226fe7"},
}
4 changes: 4 additions & 0 deletions test/docker/health-check-services.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ MAX_SECONDS=120
END_SECONDS=$((SECONDS+MAX_SECONDS))

for name in $(docker ps --format '{{.Names}}'); do
if [[ "$name" == *"toxiproxy"* ]]; then
continue
fi

HEALTHY=false

while [[ "$SECONDS" -lt "$END_SECONDS" ]]; do
Expand Down
8 changes: 8 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ if System.get_env("XANDRA_DEBUG") do
Xandra.Telemetry.attach_debug_handler()
end

ToxiproxyEx.populate!([
%{
name: "xandra_test_cassandra",
listen: "0.0.0.0:19052",
upstream: "cassandra:9042"
}
])

Logger.configure(level: String.to_existing_atom(System.get_env("LOG_LEVEL", "info")))

excluded =
Expand Down
2 changes: 1 addition & 1 deletion test/xandra/cluster/control_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
socket = :sys.get_state(ctrl_conn).transport.socket

:ok = :gen_tcp.shutdown(socket, :read_write)
GenServer.cast(ctrl_conn, {:refresh_topology, []})
send(ctrl_conn, :refresh_topology)

assert_receive {:DOWN, ^ref, _, _, _}
end
Expand Down
Loading

0 comments on commit d571329

Please sign in to comment.