From 347e04219a85b331cda3535229cde13c2b36abc1 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 5 Jan 2018 13:41:25 +0000 Subject: [PATCH 1/6] Use PostgreSQL advisory locks to enforce only one subscription instance PostgreSQL's `pg_try_advisory_lock` function is used to acquire a session lock on the subscription id to guarantee that only once instance can run. Each subscription now has its own connection. When the subscription process goes down, the connection will terminate and its lock will be released. --- config/dev.exs | 2 + config/test.exs | 3 +- lib/event_store/sql/statements.ex | 6 + lib/event_store/storage.ex | 17 ++- lib/event_store/storage/subscription.ex | 98 +++++++++------ .../subscriptions/stream_subscription.ex | 115 ++++++++++-------- lib/event_store/subscriptions/subscription.ex | 62 +++++++--- .../subscriptions/subscription_state.ex | 2 + lib/event_store/subscriptions/supervisor.ex | 8 +- lib/event_store/supervisor.ex | 36 ++++-- .../storage/subscription_persistence_test.exs | 35 +++++- .../all_streams_subscription_test.exs | 94 +++++++------- .../single_stream_subscription_test.exs | 95 ++++++++------- .../subscribe_to_stream_test.exs | 3 +- test/support/storage_case.ex | 13 +- 15 files changed, 372 insertions(+), 217 deletions(-) diff --git a/config/dev.exs b/config/dev.exs index dfbf74ce..089a8377 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -3,6 +3,8 @@ use Mix.Config # Do not include metadata nor timestamps in development logs config :logger, :console, format: "[$level] $message\n" +config :mix_test_watch, clear: true + config :eventstore, EventStore.Storage, serializer: EventStore.TermSerializer, username: "postgres", diff --git a/config/test.exs b/config/test.exs index ead4f337..c8adf5f4 100644 --- a/config/test.exs +++ b/config/test.exs @@ -17,4 +17,5 @@ config :eventstore, EventStore.Storage, pool_overflow: 0 config :eventstore, - registry: :local + registry: :local, + subscription_retry_interval: 1_000 diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex index 9766cb9d..4f4c26ac 100644 --- a/lib/event_store/sql/statements.ex +++ b/lib/event_store/sql/statements.ex @@ -290,6 +290,12 @@ WHERE stream_uuid = $1 AND subscription_name = $2; """ end + def try_advisory_lock do +""" +SELECT pg_try_advisory_lock($1); +""" + end + def ack_last_seen_event do """ UPDATE subscriptions diff --git a/lib/event_store/storage.ex b/lib/event_store/storage.ex index d5d34647..f98a7ebe 100644 --- a/lib/event_store/storage.ex +++ b/lib/event_store/storage.ex @@ -79,28 +79,37 @@ defmodule EventStore.Storage do unique name and starting position (event number or stream version). """ def subscribe_to_stream(stream_uuid, subscription_name, start_from_event_number \\ nil, start_from_stream_version \\ nil) do - Subscription.subscribe_to_stream(@event_store, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) + Subscription.subscribe_to_stream(@event_store, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, pool: DBConnection.Poolboy) + end + + @doc """ + Attempt to acquire an exclusive lock for the given subscription id. Uses + PostgreSQL's advisory locks[1] to provide session level locking. + [1] https://www.postgresql.org/docs/current/static/explicit-locking.html#ADVISORY-LOCKS + """ + def try_acquire_exclusive_lock(subscription_id) do + Subscription.try_acquire_exclusive_lock(@event_store, subscription_id, pool: DBConnection.Poolboy) end @doc """ Acknowledge receipt of an event by its number, for a single subscription. """ def ack_last_seen_event(stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) do - Subscription.ack_last_seen_event(@event_store, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) + Subscription.ack_last_seen_event(@event_store, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, pool: DBConnection.Poolboy) end @doc """ Unsubscribe from an existing named subscription to a stream. """ def unsubscribe_from_stream(stream_uuid, subscription_name) do - Subscription.unsubscribe_from_stream(@event_store, stream_uuid, subscription_name) + Subscription.unsubscribe_from_stream(@event_store, stream_uuid, subscription_name, pool: DBConnection.Poolboy) end @doc """ Get all known subscriptions, to any stream. """ def subscriptions do - Subscription.subscriptions(@event_store) + Subscription.subscriptions(@event_store, pool: DBConnection.Poolboy) end @doc """ diff --git a/lib/event_store/storage/subscription.ex b/lib/event_store/storage/subscription.ex index 9ab7453d..795214eb 100644 --- a/lib/event_store/storage/subscription.ex +++ b/lib/event_store/storage/subscription.ex @@ -29,28 +29,33 @@ defmodule EventStore.Storage.Subscription do @doc """ List all known subscriptions """ - def subscriptions(conn), - do: Subscription.All.execute(conn) + def subscriptions(conn, opts \\ []), + do: Subscription.All.execute(conn, opts) - def subscribe_to_stream(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) do - case Subscription.Query.execute(conn, stream_uuid, subscription_name) do - {:ok, subscription} -> {:ok, subscription} - {:error, :subscription_not_found} -> Subscription.Subscribe.execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) + def subscribe_to_stream(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, opts \\ []) do + with {:ok, subscription} <- Subscription.Query.execute(conn, stream_uuid, subscription_name, opts) do + {:ok, subscription} + else + {:error, :subscription_not_found} -> + Subscription.Subscribe.execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, opts) end end - def ack_last_seen_event(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) do - Subscription.Ack.execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) + def try_acquire_exclusive_lock(conn, subscription_id, opts \\ []), + do: Subscription.TryAdvisoryLock.execute(conn, subscription_id, opts) + + def ack_last_seen_event(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, opts \\ []) do + Subscription.Ack.execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, opts) end - def unsubscribe_from_stream(conn, stream_uuid, subscription_name), - do: Subscription.Unsubscribe.execute(conn, stream_uuid, subscription_name) + def unsubscribe_from_stream(conn, stream_uuid, subscription_name, opts \\ []), + do: Subscription.Unsubscribe.execute(conn, stream_uuid, subscription_name, opts) defmodule All do - def execute(conn) do + def execute(conn, opts) do conn - |> Postgrex.query(Statements.query_all_subscriptions, [], pool: DBConnection.Poolboy) - |> handle_response + |> Postgrex.query(Statements.query_all_subscriptions, [], opts) + |> handle_response() end defp handle_response({:ok, %Postgrex.Result{num_rows: 0}}), @@ -61,10 +66,10 @@ defmodule EventStore.Storage.Subscription do end defmodule Query do - def execute(conn, stream_uuid, subscription_name) do + def execute(conn, stream_uuid, subscription_name, opts) do conn - |> Postgrex.query(Statements.query_get_subscription, [stream_uuid, subscription_name], pool: DBConnection.Poolboy) - |> handle_response + |> Postgrex.query(Statements.query_get_subscription, [stream_uuid, subscription_name], opts) + |> handle_response() end defp handle_response({:ok, %Postgrex.Result{num_rows: 0}}), @@ -75,11 +80,11 @@ defmodule EventStore.Storage.Subscription do end defmodule Subscribe do - def execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) do + def execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, opts) do _ = Logger.debug(fn -> "Attempting to create subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\"" end) conn - |> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name, start_from_event_number, start_from_stream_version], pool: DBConnection.Poolboy) + |> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name, start_from_event_number, start_from_stream_version], opts) |> handle_response(stream_uuid, subscription_name) end @@ -99,10 +104,30 @@ defmodule EventStore.Storage.Subscription do end end + defmodule TryAdvisoryLock do + def execute(conn, subscription_id, opts) do + conn + |> Postgrex.query(Statements.try_advisory_lock(), [subscription_id], opts) + |> handle_response() + end + + defp handle_response({:ok, %Postgrex.Result{rows: [[true]]}}) do + :ok + end + + defp handle_response({:ok, %Postgrex.Result{rows: [[false]]}}) do + {:error, :lock_already_taken} + end + + defp handle_response({:error, error}) do + {:error, error} + end + end + defmodule Ack do - def execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) do + def execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, opts) do conn - |> Postgrex.query(Statements.ack_last_seen_event, [stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version], pool: DBConnection.Poolboy) + |> Postgrex.query(Statements.ack_last_seen_event, [stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version], opts) |> handle_response(stream_uuid, subscription_name) end @@ -117,11 +142,11 @@ defmodule EventStore.Storage.Subscription do end defmodule Unsubscribe do - def execute(conn, stream_uuid, subscription_name) do + def execute(conn, stream_uuid, subscription_name, opts) do _ = Logger.debug(fn -> "Attempting to unsubscribe from stream \"#{stream_uuid}\" named \"#{subscription_name}\"" end) conn - |> Postgrex.query(Statements.delete_subscription, [stream_uuid, subscription_name], pool: DBConnection.Poolboy) + |> Postgrex.query(Statements.delete_subscription, [stream_uuid, subscription_name], opts) |> handle_response(stream_uuid, subscription_name) end @@ -137,25 +162,20 @@ defmodule EventStore.Storage.Subscription do end defmodule Adapter do - def to_subscriptions(rows) do - rows - |> Enum.map(&to_subscription_from_row/1) - end + def to_subscriptions(rows), do: Enum.map(rows, &to_subscription_from_row/1) - def to_subscription(rows) do - rows - |> List.first() - |> to_subscription_from_row() - end + def to_subscription([row | _]), do: to_subscription_from_row(row) + + defp to_subscription_from_row(row) do + [ + subscription_id, + stream_uuid, + subscription_name, + last_seen_event_number, + last_seen_stream_version, + created_at, + ] = row - defp to_subscription_from_row([ - subscription_id, - stream_uuid, - subscription_name, - last_seen_event_number, - last_seen_stream_version, - created_at, - ]) do %Subscription{ subscription_id: subscription_id, stream_uuid: stream_uuid, diff --git a/lib/event_store/subscriptions/stream_subscription.ex b/lib/event_store/subscriptions/stream_subscription.ex index 6ef5aef5..09d1031b 100644 --- a/lib/event_store/subscriptions/stream_subscription.ex +++ b/lib/event_store/subscriptions/stream_subscription.ex @@ -17,27 +17,49 @@ defmodule EventStore.Subscriptions.StreamSubscription do @max_buffer_size 1_000 defstate initial do - defevent subscribe(stream_uuid, subscription_name, subscriber, opts), data: %SubscriptionState{} = data do - case subscribe_to_stream(stream_uuid, subscription_name, opts[:start_from_event_number], opts[:start_from_stream_version]) do - {:ok, subscription} -> - last_ack = subscription_provider(stream_uuid).last_ack(subscription) || 0 + defevent subscribe(conn, stream_uuid, subscription_name, subscriber, opts), data: %SubscriptionState{} = data do + data = %SubscriptionState{data | + conn: conn, + stream_uuid: stream_uuid, + subscription_name: subscription_name, + subscriber: subscriber, + mapper: opts[:mapper], + max_size: opts[:max_size] || @max_buffer_size, + } - data = %SubscriptionState{data | - stream_uuid: stream_uuid, - subscription_name: subscription_name, - subscriber: subscriber, - mapper: opts[:mapper], - last_seen: last_ack, - last_ack: last_ack, - max_size: opts[:max_size] || @max_buffer_size, - } + with {:ok, subscription} <- subscribe_to_stream(data, opts), + :ok <- try_acquire_exclusive_lock(conn, subscription) do - next_state(:request_catch_up, data) + last_ack = subscription_provider(stream_uuid).last_ack(subscription) || 0 - {:error, _reason} -> - next_state(:failed, data) + data = %SubscriptionState{data | + subscription_id: subscription.subscription_id, + last_seen: last_ack, + last_ack: last_ack, + } + + next_state(:request_catch_up, data) + else + _ -> + # Failed to subscribe to stream, retry after delay + next_state(:initial, data) end end + + # ignore ack's before subscribed + defevent ack(_ack), data: %SubscriptionState{} = data do + next_state(:initial, data) + end + + # ignore event notifications before subscribed + defevent notify_events(events), data: %SubscriptionState{} = data do + next_state(:initial, track_last_received(events, data)) + end + + defevent unsubscribe, data: %SubscriptionState{} = data do + unsubscribe_from_stream(data) + next_state(:unsubscribed, data) + end end defstate request_catch_up do @@ -59,8 +81,8 @@ defmodule EventStore.Subscriptions.StreamSubscription do next_state(:request_catch_up, track_last_received(events, data)) end - defevent unsubscribe, data: %SubscriptionState{stream_uuid: stream_uuid, subscription_name: subscription_name} = data do - unsubscribe_from_stream(stream_uuid, subscription_name) + defevent unsubscribe, data: %SubscriptionState{} = data do + unsubscribe_from_stream(data) next_state(:unsubscribed, data) end end @@ -100,8 +122,8 @@ defmodule EventStore.Subscriptions.StreamSubscription do next_state(:catching_up, track_last_received(events, data)) end - defevent unsubscribe, data: %SubscriptionState{stream_uuid: stream_uuid, subscription_name: subscription_name} = data do - unsubscribe_from_stream(stream_uuid, subscription_name) + defevent unsubscribe, data: %SubscriptionState{} = data do + unsubscribe_from_stream(data) next_state(:unsubscribed, data) end end @@ -166,8 +188,8 @@ defmodule EventStore.Subscriptions.StreamSubscription do next_state(:request_catch_up, data) end - defevent unsubscribe, data: %SubscriptionState{stream_uuid: stream_uuid, subscription_name: subscription_name} = data do - unsubscribe_from_stream(stream_uuid, subscription_name) + defevent unsubscribe, data: %SubscriptionState{} = data do + unsubscribe_from_stream(data) next_state(:unsubscribed, data) end end @@ -199,8 +221,8 @@ defmodule EventStore.Subscriptions.StreamSubscription do next_state(:max_capacity, data) end - defevent unsubscribe, data: %SubscriptionState{stream_uuid: stream_uuid, subscription_name: subscription_name} = data do - unsubscribe_from_stream(stream_uuid, subscription_name) + defevent unsubscribe, data: %SubscriptionState{} = data do + unsubscribe_from_stream(data) next_state(:unsubscribed, data) end end @@ -227,37 +249,34 @@ defmodule EventStore.Subscriptions.StreamSubscription do end end - defstate failed do - defevent notify_events(events), data: %SubscriptionState{} = data do - next_state(:failed, track_last_received(events, data)) - end - - defevent ack(_ack), data: %SubscriptionState{} = data do - next_state(:failed, data) - end + defp subscription_provider(@all_stream), do: AllStreamsSubscription + defp subscription_provider(_stream_uuid), do: SingleStreamSubscription - defevent catch_up, data: %SubscriptionState{} = data do - next_state(:failed, data) - end + defp subscribe_to_stream(%SubscriptionState{} = data, opts) do + %SubscriptionState{ + conn: conn, + stream_uuid: stream_uuid, + subscription_name: subscription_name + } = data - defevent caught_up(_last_seen), data: %SubscriptionState{} = data do - next_state(:failed, data) - end + start_from_event_number = Keyword.get(opts, :start_from_event_number) + start_from_stream_version = Keyword.get(opts, :start_from_stream_version) - defevent unsubscribe, data: %SubscriptionState{} = data do - next_state(:failed, data) - end + Storage.Subscription.subscribe_to_stream(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) end - defp subscription_provider(@all_stream), do: AllStreamsSubscription - defp subscription_provider(_stream_uuid), do: SingleStreamSubscription - - defp subscribe_to_stream(stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) do - Storage.subscribe_to_stream(stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) + defp try_acquire_exclusive_lock(conn, %Storage.Subscription{subscription_id: subscription_id}) do + Storage.Subscription.try_acquire_exclusive_lock(conn, subscription_id) end - defp unsubscribe_from_stream(stream_uuid, subscription_name) do - Storage.unsubscribe_from_stream(stream_uuid, subscription_name) + defp unsubscribe_from_stream(%SubscriptionState{} = data) do + %SubscriptionState{ + conn: conn, + stream_uuid: stream_uuid, + subscription_name: subscription_name + } = data + + Storage.Subscription.unsubscribe_from_stream(conn, stream_uuid, subscription_name) end defp track_last_received(events, %SubscriptionState{} = data) do diff --git a/lib/event_store/subscriptions/subscription.ex b/lib/event_store/subscriptions/subscription.ex index 59ec5678..71a407eb 100644 --- a/lib/event_store/subscriptions/subscription.ex +++ b/lib/event_store/subscriptions/subscription.ex @@ -2,8 +2,9 @@ defmodule EventStore.Subscriptions.Subscription do @moduledoc """ Subscription to a single, or all, event streams. - A subscription is persistent so that resuming the subscription will continue from the last acknowledged event. - This guarantees at least once delivery of every event appended to storage. + A subscription is persistent so that resuming the subscription will continue + from the last acknowledged event. This guarantees at least once delivery of + every event appended to storage. """ use GenServer @@ -13,20 +14,23 @@ defmodule EventStore.Subscriptions.Subscription do alias EventStore.Subscriptions.{StreamSubscription,Subscription} defstruct [ + conn: nil, stream_uuid: nil, subscription_name: nil, subscriber: nil, subscription: nil, subscription_opts: [], + postgrex_config: nil ] - def start_link(stream_uuid, subscription_name, subscriber, subscription_opts, opts \\ []) do + def start_link(postgrex_config, stream_uuid, subscription_name, subscriber, subscription_opts, opts \\ []) do GenServer.start_link(__MODULE__, %Subscription{ stream_uuid: stream_uuid, subscription_name: subscription_name, subscriber: subscriber, subscription: StreamSubscription.new(), subscription_opts: subscription_opts, + postgrex_config: postgrex_config }, opts) end @@ -71,33 +75,45 @@ defmodule EventStore.Subscriptions.Subscription do end @doc false - def init(%Subscription{subscriber: subscriber} = state) do + def init(%Subscription{subscriber: subscriber, postgrex_config: postgrex_config} = state) do Process.link(subscriber) + + # A subscription has its own connection to the database to enforce locking + {:ok, conn} = Postgrex.start_link(postgrex_config) - GenServer.cast(self(), :subscribe_to_stream) + send(self(), :subscribe_to_stream) - {:ok, state} + {:ok, %Subscription{state | conn: conn}} end - def handle_cast({:notify_events, events}, %Subscription{subscription: subscription} = state) do - subscription = StreamSubscription.notify_events(subscription, events) + def handle_info(:subscribe_to_stream, %Subscription{} = state) do + %Subscription{ + conn: conn, + stream_uuid: stream_uuid, + subscription_name: subscription_name, + subscriber: subscriber, + subscription: subscription, + subscription_opts: opts + } = state + + subscription = StreamSubscription.subscribe(subscription, conn, stream_uuid, subscription_name, subscriber, opts) state = %Subscription{state | subscription: subscription} :ok = handle_subscription_state(state) + subscribe_to_events(stream_uuid) + {:noreply, state} end - def handle_cast(:subscribe_to_stream, %Subscription{stream_uuid: stream_uuid, subscription_name: subscription_name, subscriber: subscriber, subscription: subscription, subscription_opts: opts} = state) do - subscription = StreamSubscription.subscribe(subscription, stream_uuid, subscription_name, subscriber, opts) + def handle_cast({:notify_events, events}, %Subscription{subscription: subscription} = state) do + subscription = StreamSubscription.notify_events(subscription, events) state = %Subscription{state | subscription: subscription} :ok = handle_subscription_state(state) - subscribe_to_stream(stream_uuid) - {:noreply, state} end @@ -152,10 +168,16 @@ defmodule EventStore.Subscriptions.Subscription do {:reply, reply, state} end - defp subscribe_to_stream(stream_uuid) do + defp subscribe_to_events(stream_uuid) do {:ok, _} = Registry.register(EventStore.Subscriptions.PubSub, stream_uuid, {Subscription, :notify_events}) end + defp handle_subscription_state(%Subscription{subscription: %{state: :initial}, subscription_name: subscription_name}) do + retry_interval = subscription_retry_interval() + Logger.debug(fn -> "Failed to subscribe to #{subscription_name}, will retry in #{retry_interval}ms" end) + Process.send_after(self(), :subscribe_to_stream, retry_interval) + end + defp handle_subscription_state(%Subscription{subscription: %{state: :request_catch_up}}) do GenServer.cast(self(), :catch_up) end @@ -165,7 +187,15 @@ defmodule EventStore.Subscriptions.Subscription do :ok end - # no-op - defp handle_subscription_state(_state), - do: :ok + # no-op for all other subscription states + defp handle_subscription_state(_state), do: :ok + + # get the delay between subscription attempts, in milliseconds, from app + # config. Default value is one minute. + defp subscription_retry_interval do + case Application.get_env(:eventstore, :subscription_retry_interval) do + interval when is_integer(interval) -> interval + _ -> 60_000 + end + end end diff --git a/lib/event_store/subscriptions/subscription_state.ex b/lib/event_store/subscriptions/subscription_state.ex index 634590b1..3d0b0c36 100644 --- a/lib/event_store/subscriptions/subscription_state.ex +++ b/lib/event_store/subscriptions/subscription_state.ex @@ -2,9 +2,11 @@ defmodule EventStore.Subscriptions.SubscriptionState do @moduledoc false defstruct [ catch_up_pid: nil, + conn: nil, stream_uuid: nil, subscription_name: nil, subscriber: nil, + subscription_id: nil, mapper: nil, last_seen: 0, last_ack: 0, diff --git a/lib/event_store/subscriptions/supervisor.ex b/lib/event_store/subscriptions/supervisor.ex index 720fca94..2e4591ac 100644 --- a/lib/event_store/subscriptions/supervisor.ex +++ b/lib/event_store/subscriptions/supervisor.ex @@ -7,8 +7,8 @@ defmodule EventStore.Subscriptions.Supervisor do alias EventStore.Subscriptions.Subscription - def start_link(_) do - Supervisor.start_link(__MODULE__, nil, name: __MODULE__) + def start_link(postgrex_config) do + Supervisor.start_link(__MODULE__, postgrex_config, name: __MODULE__) end def subscribe_to_stream(stream_uuid, subscription_name, subscriber, subscription_opts) do @@ -28,9 +28,9 @@ defmodule EventStore.Subscriptions.Supervisor do end end - def init(_) do + def init(postgrex_config) do children = [ - worker(Subscription, [], restart: :temporary), + worker(Subscription, [postgrex_config], restart: :temporary), ] supervise(children, strategy: :simple_one_for_one) diff --git a/lib/event_store/supervisor.ex b/lib/event_store/supervisor.ex index 49bd96ed..2253d190 100644 --- a/lib/event_store/supervisor.ex +++ b/lib/event_store/supervisor.ex @@ -11,11 +11,14 @@ defmodule EventStore.Supervisor do end def init([config, serializer]) do + postgrex_config = postgrex_opts(config) + subscription_postgrex_config = subscription_postgrex_opts(config) + children = [ - {Postgrex, postgrex_opts(config)}, + {Postgrex, postgrex_config}, Supervisor.child_spec({Registry, keys: :unique, name: EventStore.Subscriptions.Subscription}, id: EventStore.Subscriptions.Subscription), Supervisor.child_spec({Registry, keys: :duplicate, name: EventStore.Subscriptions.PubSub, partitions: System.schedulers_online}, id: EventStore.Subscriptions.PubSub), - {EventStore.Subscriptions.Supervisor, []}, + {EventStore.Subscriptions.Supervisor, subscription_postgrex_config}, {EventStore.Streams.Supervisor, serializer}, {EventStore.Publisher, serializer}, ] ++ Registration.child_spec() @@ -23,25 +26,32 @@ defmodule EventStore.Supervisor do Supervisor.init(children, strategy: :one_for_one) end + @default_postgrex_opts [ + :username, + :password, + :database, + :hostname, + :port, + :types, + :ssl, + :ssl_opts + ] + defp postgrex_opts(config) do [ pool_size: 10, - pool_overflow: 0, + pool_overflow: 0 ] |> Keyword.merge(config) - |> Keyword.take([ - :username, - :password, - :database, - :hostname, - :port, + |> Keyword.take(@default_postgrex_opts ++ [ :pool, :pool_size, - :pool_overflow, - :types, - :ssl, - :ssl_opts, + :pool_overflow ]) |> Keyword.merge(name: :event_store) end + + defp subscription_postgrex_opts(config) do + Keyword.take(config, @default_postgrex_opts) + end end diff --git a/test/storage/subscription_persistence_test.exs b/test/storage/subscription_persistence_test.exs index 4a9b3dcd..869332bf 100644 --- a/test/storage/subscription_persistence_test.exs +++ b/test/storage/subscription_persistence_test.exs @@ -1,7 +1,7 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do use EventStore.StorageCase - alias EventStore.Storage + alias EventStore.{ProcessHelper,Storage} @all_stream "$all" @subscription_name "test_subscription" @@ -24,7 +24,7 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do test "list subscriptions" do {:ok, subscription} = Storage.subscribe_to_stream(@all_stream, @subscription_name) - {:ok, subscriptions} = Storage.subscriptions + {:ok, subscriptions} = Storage.subscriptions() assert length(subscriptions) > 0 assert Enum.member?(subscriptions, subscription) @@ -41,6 +41,37 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do assert length(subscriptions) == initial_length end + test "acquire exclusive subscription lock" do + assert :ok = Storage.try_acquire_exclusive_lock(1) + end + + test "acquire and release lock by connection" do + config = + EventStore.configuration() + |> EventStore.Config.parse() + |> Keyword.drop([:pool, :pool_size, :pool_overflow]) + + {:ok, conn1} = Postgrex.start_link(config) + {:ok, conn2} = Postgrex.start_link(config) + + # conn1 acquire lock + assert :ok = Storage.Subscription.try_acquire_exclusive_lock(conn1, 1) + + # conn2 cannot acquire lock + assert {:error, :lock_already_taken} = Storage.Subscription.try_acquire_exclusive_lock(conn2, 1) + + # conn1 can acquire same lock multiple times + assert :ok = Storage.Subscription.try_acquire_exclusive_lock(conn1, 1) + + # shutdown conn1 process should release its locks + ProcessHelper.shutdown(conn1) + + # conn2 can now acquire lock + assert :ok = Storage.Subscription.try_acquire_exclusive_lock(conn2, 1) + + ProcessHelper.shutdown(conn2) + end + test "remove subscription when not found should not fail" do :ok = Storage.unsubscribe_from_stream(@all_stream, @subscription_name) end diff --git a/test/subscriptions/all_streams_subscription_test.exs b/test/subscriptions/all_streams_subscription_test.exs index 4a0c3780..56cd11c6 100644 --- a/test/subscriptions/all_streams_subscription_test.exs +++ b/test/subscriptions/all_streams_subscription_test.exs @@ -1,16 +1,31 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do use EventStore.StorageCase - alias EventStore.{EventFactory,RecordedEvent} + alias EventStore.{EventFactory,ProcessHelper,RecordedEvent} alias EventStore.Storage.{Appender,Stream} alias EventStore.Subscriptions.StreamSubscription @all_stream "$all" @subscription_name "test_subscription" + setup do + config = + EventStore.configuration() + |> EventStore.Config.parse() + |> Keyword.drop([:pool, :pool_size, :pool_overflow, :serializer]) + + {:ok, conn} = Postgrex.start_link(config) + + on_exit fn -> + ProcessHelper.shutdown(conn) + end + + [subscription_conn: conn] + end + describe "subscribe to all streams" do - test "create subscription to all streams" do - subscription = create_subscription() + test "create subscription to all streams", context do + subscription = create_subscription(context) assert subscription.state == :request_catch_up assert subscription.data.subscription_name == @subscription_name @@ -20,8 +35,8 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert subscription.data.last_received == nil end - test "create subscription to all streams from starting event id" do - subscription = create_subscription(start_from_event_number: 2) + test "create subscription to all streams from starting event id", context do + subscription = create_subscription(context, start_from_event_number: 2) assert subscription.state == :request_catch_up assert subscription.data.subscription_name == @subscription_name @@ -32,9 +47,9 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do end end - test "catch-up subscription, no persisted events" do + test "catch-up subscription, no persisted events", context do subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -47,14 +62,11 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert subscription.data.last_received == nil end - test "catch-up subscription, unseen persisted events", %{conn: conn} do - stream_uuid = UUID.uuid4 - {:ok, stream_id} = Stream.create_stream(conn, stream_uuid) - recorded_events = EventFactory.create_recorded_events(3, stream_uuid) - {:ok, [1, 2, 3]} = Appender.append(conn, stream_id, recorded_events) + test "catch-up subscription, unseen persisted events", context do + [recorded_events: recorded_events] = append_events_to_stream(context) subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -76,12 +88,12 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert pluck(received_events, :data) == pluck(expected_events, :data) end - test "notify events" do + test "notify events", context do stream_uuid = UUID.uuid4() events = EventFactory.create_recorded_events(1, stream_uuid) subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(events) @@ -96,9 +108,9 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert pluck(received_events, :data) == pluck(events, :data) end - test "should catch up when events received while catching up" do + test "should catch up when events received while catching up", context do subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -121,7 +133,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do describe "ack notified events" do setup [:append_events_to_stream, :create_caught_up_subscription] - test "should skip events during catch up when acknowledged", %{subscription: subscription, recorded_events: events} do + test "should skip events during catch up when acknowledged", %{subscription: subscription, recorded_events: events} = context do subscription = ack(subscription, events) assert subscription.state == :subscribed @@ -130,7 +142,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert subscription.data.last_received == nil subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert_receive_caught_up(3) @@ -146,9 +158,9 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert subscription.data.last_received == nil end - test "should replay events when catching up and events had not been acknowledged" do + test "should replay events when catching up and events had not been acknowledged", context do subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() # should receive already seen events @@ -166,19 +178,9 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert subscription.data.last_received == nil end - def append_events_to_stream(%{conn: conn}) do - stream_uuid = UUID.uuid4 - {:ok, stream_id} = Stream.create_stream(conn, stream_uuid) - - recorded_events = EventFactory.create_recorded_events(3, stream_uuid) - {:ok, [1, 2, 3]} = Appender.append(conn, stream_id, recorded_events) - - [recorded_events: recorded_events] - end - - def create_caught_up_subscription(_context) do + def create_caught_up_subscription(context) do subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -197,14 +199,14 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do end end - test "should not notify events until ack received" do + test "should not notify events until ack received", context do stream_uuid = UUID.uuid4 events = EventFactory.create_recorded_events(6, stream_uuid) initial_events = Enum.take(events, 3) remaining_events = Enum.drop(events, 3) subscription = - create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) @@ -242,14 +244,14 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do end describe "pending event buffer limit" do - test "should restrict pending events until ack" do + test "should restrict pending events until ack", context do stream_uuid = UUID.uuid4 events = EventFactory.create_recorded_events(6, stream_uuid) initial_events = Enum.take(events, 3) remaining_events = Enum.drop(events, 3) subscription = - create_subscription(max_size: 3) + create_subscription(context, max_size: 3) |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) @@ -279,14 +281,14 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do assert pluck(received_events, :data) == pluck(remaining_events, :data) end - test "should receive pending events on ack after reaching max capacity" do + test "should receive pending events on ack after reaching max capacity", context do stream_uuid = UUID.uuid4 events = EventFactory.create_recorded_events(6, stream_uuid) initial_events = Enum.take(events, 3) remaining_events = Enum.drop(events, 3) subscription = - create_subscription(max_size: 3) + create_subscription(context, max_size: 3) |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) @@ -322,9 +324,19 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do end end - defp create_subscription(opts \\ []) do + def append_events_to_stream(%{conn: conn}) do + stream_uuid = UUID.uuid4 + {:ok, stream_id} = Stream.create_stream(conn, stream_uuid) + + recorded_events = EventFactory.create_recorded_events(3, stream_uuid) + {:ok, [1, 2, 3]} = Appender.append(conn, stream_id, recorded_events) + + [recorded_events: recorded_events] + end + + defp create_subscription(%{subscription_conn: conn}, opts \\ []) do StreamSubscription.new() - |> StreamSubscription.subscribe(@all_stream, @subscription_name, self(), opts) + |> StreamSubscription.subscribe(conn, @all_stream, @subscription_name, self(), opts) end defp ack_refute_receive(subscription, ack, expected_last_ack) do diff --git a/test/subscriptions/single_stream_subscription_test.exs b/test/subscriptions/single_stream_subscription_test.exs index 4cb5ba6a..21360b78 100644 --- a/test/subscriptions/single_stream_subscription_test.exs +++ b/test/subscriptions/single_stream_subscription_test.exs @@ -1,20 +1,37 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do use EventStore.StorageCase - alias EventStore.{EventFactory,RecordedEvent,Streams} + alias EventStore.{EventFactory,ProcessHelper,RecordedEvent,Streams} alias EventStore.Storage.{Appender,Stream} alias EventStore.Subscriptions.StreamSubscription @subscription_name "test_subscription" + setup do + config = + EventStore.configuration() + |> EventStore.Config.parse() + |> Keyword.drop([:pool, :pool_size, :pool_overflow, :serializer]) + + {:ok, conn} = Postgrex.start_link(config) + + on_exit fn -> + ProcessHelper.shutdown(conn) + end + + [ + subscription_conn: conn, + stream_uuid: UUID.uuid4() + ] + end + describe "subscribe to stream" do setup [:append_events_to_another_stream] - test "create subscription to a single stream" do - stream_uuid = UUID.uuid4() + test "create subscription to a single stream", %{stream_uuid: stream_uuid} = context do {:ok, _stream} = Streams.Supervisor.open_stream(stream_uuid) - subscription = create_subscription(stream_uuid) + subscription = create_subscription(context) assert subscription.state == :request_catch_up assert subscription.data.subscription_name == @subscription_name @@ -23,11 +40,10 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert subscription.data.last_ack == 0 end - test "create subscription to a single stream from starting stream version" do - stream_uuid = UUID.uuid4() + test "create subscription to a single stream from starting stream version", %{stream_uuid: stream_uuid} = context do {:ok, _stream} = Streams.Supervisor.open_stream(stream_uuid) - subscription = create_subscription(stream_uuid, start_from_stream_version: 2) + subscription = create_subscription(context, start_from_stream_version: 2) assert subscription.state == :request_catch_up assert subscription.data.subscription_name == @subscription_name @@ -36,27 +52,24 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert subscription.data.last_ack == 2 end - test "create subscription to a single stream with event mapping function" do - stream_uuid = UUID.uuid4() + test "create subscription to a single stream with event mapping function", %{stream_uuid: stream_uuid} = context do {:ok, _stream} = Streams.Supervisor.open_stream(stream_uuid) mapper = fn event -> event.event_number end - subscription = create_subscription(stream_uuid, mapper: mapper) + subscription = create_subscription(context, mapper: mapper) assert subscription.data.mapper == mapper end end - describe "catch-up subscription" do - setup [:append_events_to_another_stream, :create_stream] + describe "catch-up subscription on empty stream" do + setup [:append_events_to_another_stream] - test "catch-up subscription, no persisted events" do - stream_uuid = UUID.uuid4() + test "should be caught up", %{stream_uuid: stream_uuid} = context do {:ok, _stream} = Streams.Supervisor.open_stream(stream_uuid) subscription = - stream_uuid - |> create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -64,11 +77,14 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert_receive_caught_up(0) end + end + + describe "catch-up subscription" do + setup [:append_events_to_another_stream, :create_stream] - test "catch-up subscription, unseen persisted events", %{stream_uuid: stream_uuid, recorded_events: recorded_events} do + test "unseen persisted events", %{recorded_events: recorded_events} = context do subscription = - stream_uuid - |> create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -88,10 +104,9 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert subscription.data.last_ack == 3 end - test "confirm subscription caught up to persisted events", %{stream_uuid: stream_uuid} do + test "confirm subscription caught up to persisted events", context do subscription = - stream_uuid - |> create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -112,14 +127,13 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do end end - test "notify events" do - stream_uuid = UUID.uuid4 + test "notify events", %{stream_uuid: stream_uuid} = context do {:ok, _stream} = Streams.Supervisor.open_stream(stream_uuid) events = EventFactory.create_recorded_events(1, stream_uuid) subscription = - create_subscription(stream_uuid) + create_subscription(context) |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(events) @@ -134,9 +148,9 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do end describe "ack events" do - setup [:append_events_to_another_stream, :create_stream, :create_subscription] + setup [:append_events_to_another_stream, :create_stream, :subscribe_to_stream] - test "should skip events during catch up when acknowledged", %{stream_uuid: stream_uuid, subscription: subscription, recorded_events: events} do + test "should skip events during catch up when acknowledged", %{subscription: subscription, recorded_events: events} = context do subscription = ack(subscription, events) assert subscription.state == :subscribed @@ -144,8 +158,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert subscription.data.last_ack == 3 subscription = - stream_uuid - |> create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -161,10 +174,9 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert subscription.data.last_ack == 3 end - test "should replay events when not acknowledged", %{stream_uuid: stream_uuid} do + test "should replay events when not acknowledged", context do subscription = - stream_uuid - |> create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -193,8 +205,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do :ok = EventStore.append_to_stream(stream_uuid, 0, events) end - defp create_stream(%{conn: conn}) do - stream_uuid = UUID.uuid4 + defp create_stream(%{conn: conn, stream_uuid: stream_uuid}) do {:ok, stream_id} = Stream.create_stream(conn, stream_uuid) recorded_events = EventFactory.create_recorded_events(3, stream_uuid, 4) @@ -203,16 +214,14 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do {:ok, stream} = Streams.Supervisor.open_stream(stream_uuid) [ - stream_uuid: stream_uuid, stream: stream, recorded_events: recorded_events, ] end - defp create_subscription(%{stream_uuid: stream_uuid}) do + defp subscribe_to_stream(context) do subscription = - stream_uuid - |> create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -229,8 +238,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do [subscription: subscription] end - test "should not notify events until ack received" do - stream_uuid = UUID.uuid4 + test "should not notify events until ack received", %{stream_uuid: stream_uuid} = context do events = EventFactory.create_recorded_events(6, stream_uuid) initial_events = Enum.take(events, 3) remaining_events = Enum.drop(events, 3) @@ -238,8 +246,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do {:ok, _stream} = Streams.Supervisor.open_stream(stream_uuid) subscription = - stream_uuid - |> create_subscription() + create_subscription(context) |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) @@ -275,9 +282,9 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do refute_receive {:events, _received_events} end - defp create_subscription(stream_uuid, opts \\ []) do + defp create_subscription(%{subscription_conn: conn, stream_uuid: stream_uuid}, opts \\ []) do StreamSubscription.new() - |> StreamSubscription.subscribe(stream_uuid, @subscription_name, self(), opts) + |> StreamSubscription.subscribe(conn, stream_uuid, @subscription_name, self(), opts) end def ack(subscription, events) when is_list(events) do diff --git a/test/subscriptions/subscribe_to_stream_test.exs b/test/subscriptions/subscribe_to_stream_test.exs index 76b2d5a5..b0917453 100644 --- a/test/subscriptions/subscribe_to_stream_test.exs +++ b/test/subscriptions/subscribe_to_stream_test.exs @@ -20,12 +20,11 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do events = EventFactory.create_events(3) {:ok, _stream} = Streams.Supervisor.open_stream(stream_uuid) - {:ok, _subscription} = subscribe_to_stream(stream_uuid, subscription_name, self()) :ok = Stream.append_to_stream(stream_uuid, 0, events) - assert_receive {:events, received_events} + assert_receive {:events, received_events} assert pluck(received_events, :event_number) == [4, 5, 6] assert pluck(received_events, :stream_uuid) == [stream_uuid, stream_uuid, stream_uuid] assert pluck(received_events, :stream_version) == [1, 2, 3] diff --git a/test/support/storage_case.ex b/test/support/storage_case.ex index edf77428..70e8a615 100644 --- a/test/support/storage_case.ex +++ b/test/support/storage_case.ex @@ -1,18 +1,25 @@ defmodule EventStore.StorageCase do use ExUnit.CaseTemplate - alias EventStore.Registration + alias EventStore.{ProcessHelper,Registration} setup do registry = Registration.registry_provider() before_reset(registry) - EventStore.StorageInitializer.reset_storage!() + {:ok, conn} = + EventStore.configuration() + |> EventStore.Config.parse() + |> Postgrex.start_link() + + EventStore.Storage.Initializer.reset!(conn) after_reset(registry) - {:ok, conn} = EventStore.configuration() |> EventStore.Config.parse() |> Postgrex.start_link() + on_exit fn -> + ProcessHelper.shutdown(conn) + end {:ok, %{conn: conn}} end From a66b59584b35be1b8d65aa582a51c4ebd533c4fd Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 5 Jan 2018 13:51:31 +0000 Subject: [PATCH 2/6] [Test] Ensure duplicate subscriptions cannot both connect --- .../all_streams_subscription_test.exs | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/test/subscriptions/all_streams_subscription_test.exs b/test/subscriptions/all_streams_subscription_test.exs index 56cd11c6..6bafd211 100644 --- a/test/subscriptions/all_streams_subscription_test.exs +++ b/test/subscriptions/all_streams_subscription_test.exs @@ -20,7 +20,10 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do ProcessHelper.shutdown(conn) end - [subscription_conn: conn] + [ + postgrex_config: config, + subscription_conn: conn + ] end describe "subscribe to all streams" do @@ -324,6 +327,50 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do end end + describe "duplicate subscriptions" do + setup [:create_second_connection, :create_two_duplicate_subscriptions] + + test "should only allow one subscriber", %{subscription1: subscription1, subscription2: subscription2} do + assert subscription1.state == :request_catch_up + assert subscription2.state == :initial + end + + test "should allow second subscriber to takeover when first connection terminates", %{subscription_conn: conn, subscription_conn2: conn2, subscription1: subscription1, subscription2: subscription2} do + # attempt to resubscribe should fail + subscription2 = StreamSubscription.subscribe(subscription2, conn2, @all_stream, @subscription_name, self(), []) + assert subscription2.state == :initial + + # stop subscription1 connection + ProcessHelper.shutdown(conn) + + # attempt to resubscribe should now succeed + subscription2 = StreamSubscription.subscribe(subscription2, conn2, @all_stream, @subscription_name, self(), []) + assert subscription2.state == :request_catch_up + end + + defp create_second_connection(%{postgrex_config: config}) do + {:ok, conn} = Postgrex.start_link(config) + + on_exit fn -> + ProcessHelper.shutdown(conn) + end + + [subscription_conn2: conn] + end + + # Create two identically named subscriptions to the same stream, but using + # different database connections + defp create_two_duplicate_subscriptions(%{subscription_conn: conn1, subscription_conn2: conn2}) do + subscription1 = create_subscription(%{subscription_conn: conn1}) + subscription2 = create_subscription(%{subscription_conn: conn2}) + + [ + subscription1: subscription1, + subscription2: subscription2, + ] + end + end + def append_events_to_stream(%{conn: conn}) do stream_uuid = UUID.uuid4 {:ok, stream_id} = Stream.create_stream(conn, stream_uuid) From 3589258144eeb46c505c6502a043382ea72f564b Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 5 Jan 2018 16:24:52 +0000 Subject: [PATCH 3/6] Ensure a duplicate subscription will takeover on terminate --- lib/event_store/storage/subscription.ex | 2 +- lib/event_store/subscriptions.ex | 2 +- .../subscriptions/stream_subscription.ex | 32 +++++-- lib/event_store/subscriptions/subscription.ex | 79 ++++++++--------- .../all_streams_subscription_test.exs | 20 +++-- .../single_stream_subscription_test.exs | 12 ++- .../subscribe_to_stream_test.exs | 87 ++++++++++++------- test/support/collecting_subscriber.ex | 48 ++++++++++ test/support/wait.ex | 2 +- 9 files changed, 196 insertions(+), 88 deletions(-) create mode 100644 test/support/collecting_subscriber.ex diff --git a/lib/event_store/storage/subscription.ex b/lib/event_store/storage/subscription.ex index 795214eb..51062883 100644 --- a/lib/event_store/storage/subscription.ex +++ b/lib/event_store/storage/subscription.ex @@ -94,7 +94,7 @@ defmodule EventStore.Storage.Subscription do end defp handle_response({:error, %Postgrex.Error{postgres: %{code: :unique_violation}}}, stream_uuid, subscription_name) do - _ = Logger.warn(fn -> "Failed to create subscription on stream #{stream_uuid} named #{subscription_name}, already exists" end) + _ = Logger.debug(fn -> "Failed to create subscription on stream #{stream_uuid} named #{subscription_name}, already exists" end) {:error, :subscription_already_exists} end diff --git a/lib/event_store/subscriptions.ex b/lib/event_store/subscriptions.ex index 7be1a299..13d7ca64 100644 --- a/lib/event_store/subscriptions.ex +++ b/lib/event_store/subscriptions.ex @@ -52,7 +52,7 @@ defmodule EventStore.Subscriptions do defp notify_subscribers(stream_uuid, events) do Registry.dispatch(EventStore.Subscriptions.PubSub, stream_uuid, fn subscribers -> - for {subscription, {module, function}} <- subscribers, do: apply(module, function, [subscription, events]) + for {pid, _} <- subscribers, do: send(pid, {:notify_events, events}) end) end end diff --git a/lib/event_store/subscriptions/stream_subscription.ex b/lib/event_store/subscriptions/stream_subscription.ex index 09d1031b..c67fa79f 100644 --- a/lib/event_store/subscriptions/stream_subscription.ex +++ b/lib/event_store/subscriptions/stream_subscription.ex @@ -16,6 +16,11 @@ defmodule EventStore.Subscriptions.StreamSubscription do @all_stream "$all" @max_buffer_size 1_000 + # The main flow between states in this finite state machine is: + # + # initial -> subscribe_to_events -> request_catch_up -> catching_up -> subscribed + # + defstate initial do defevent subscribe(conn, stream_uuid, subscription_name, subscriber, opts), data: %SubscriptionState{} = data do data = %SubscriptionState{data | @@ -27,7 +32,7 @@ defmodule EventStore.Subscriptions.StreamSubscription do max_size: opts[:max_size] || @max_buffer_size, } - with {:ok, subscription} <- subscribe_to_stream(data, opts), + with {:ok, subscription} <- create_subscription(data, opts), :ok <- try_acquire_exclusive_lock(conn, subscription) do last_ack = subscription_provider(stream_uuid).last_ack(subscription) || 0 @@ -38,7 +43,7 @@ defmodule EventStore.Subscriptions.StreamSubscription do last_ack: last_ack, } - next_state(:request_catch_up, data) + next_state(:subscribe_to_events, data) else _ -> # Failed to subscribe to stream, retry after delay @@ -51,9 +56,24 @@ defmodule EventStore.Subscriptions.StreamSubscription do next_state(:initial, data) end - # ignore event notifications before subscribed - defevent notify_events(events), data: %SubscriptionState{} = data do - next_state(:initial, track_last_received(events, data)) + defevent unsubscribe, data: %SubscriptionState{} = data do + unsubscribe_from_stream(data) + next_state(:unsubscribed, data) + end + end + + defstate subscribe_to_events do + defevent subscribed, data: %SubscriptionState{} = data do + next_state(:request_catch_up, data) + end + + defevent ack(ack), data: %SubscriptionState{} = data do + data = + data + |> ack_events(ack) + |> notify_pending_events() + + next_state(:subscribe_to_events, data) end defevent unsubscribe, data: %SubscriptionState{} = data do @@ -252,7 +272,7 @@ defmodule EventStore.Subscriptions.StreamSubscription do defp subscription_provider(@all_stream), do: AllStreamsSubscription defp subscription_provider(_stream_uuid), do: SingleStreamSubscription - defp subscribe_to_stream(%SubscriptionState{} = data, opts) do + defp create_subscription(%SubscriptionState{} = data, opts) do %SubscriptionState{ conn: conn, stream_uuid: stream_uuid, diff --git a/lib/event_store/subscriptions/subscription.ex b/lib/event_store/subscriptions/subscription.ex index 71a407eb..30808698 100644 --- a/lib/event_store/subscriptions/subscription.ex +++ b/lib/event_store/subscriptions/subscription.ex @@ -35,7 +35,7 @@ defmodule EventStore.Subscriptions.Subscription do end def notify_events(subscription, events) when is_list(events) do - GenServer.cast(subscription, {:notify_events, events}) + send(subscription, {:notify_events, events}) end @doc """ @@ -77,8 +77,8 @@ defmodule EventStore.Subscriptions.Subscription do @doc false def init(%Subscription{subscriber: subscriber, postgrex_config: postgrex_config} = state) do Process.link(subscriber) - - # A subscription has its own connection to the database to enforce locking + + # Each subscription has its own connection to the database to enforce locking {:ok, conn} = Postgrex.start_link(postgrex_config) send(self(), :subscribe_to_stream) @@ -98,53 +98,39 @@ defmodule EventStore.Subscriptions.Subscription do subscription = StreamSubscription.subscribe(subscription, conn, stream_uuid, subscription_name, subscriber, opts) - state = %Subscription{state | subscription: subscription} - - :ok = handle_subscription_state(state) - - subscribe_to_events(stream_uuid) - - {:noreply, state} + {:noreply, apply_subscription_to_state(subscription, state)} end - def handle_cast({:notify_events, events}, %Subscription{subscription: subscription} = state) do + def handle_info({:notify_events, events}, %Subscription{subscription: subscription} = state) do subscription = StreamSubscription.notify_events(subscription, events) - state = %Subscription{state | subscription: subscription} + {:noreply, apply_subscription_to_state(subscription, state)} + end - :ok = handle_subscription_state(state) + def handle_cast(:subscribe_to_events, %Subscription{subscription: subscription} = state) do + subscribe_to_events(state) + + subscription = StreamSubscription.subscribed(subscription) - {:noreply, state} + {:noreply, apply_subscription_to_state(subscription, state)} end def handle_cast(:catch_up, %Subscription{subscription: subscription} = state) do subscription = StreamSubscription.catch_up(subscription) - state = %Subscription{state | subscription: subscription} - - :ok = handle_subscription_state(state) - - {:noreply, state} + {:noreply, apply_subscription_to_state(subscription, state)} end def handle_cast({:caught_up, last_seen}, %Subscription{subscription: subscription} = state) do subscription = StreamSubscription.caught_up(subscription, last_seen) - state = %Subscription{state | subscription: subscription} - - :ok = handle_subscription_state(state) - - {:noreply, state} + {:noreply, apply_subscription_to_state(subscription, state)} end def handle_cast({:ack, ack}, %Subscription{subscription: subscription} = state) do subscription = StreamSubscription.ack(subscription, ack) - state = %Subscription{state | subscription: subscription} - - :ok = handle_subscription_state(state) - - {:noreply, state} + {:noreply, apply_subscription_to_state(subscription, state)} end def handle_call(:unsubscribe, _from, %Subscription{subscriber: subscriber, subscription: subscription} = state) do @@ -152,34 +138,41 @@ defmodule EventStore.Subscriptions.Subscription do subscription = StreamSubscription.unsubscribe(subscription) - state = %Subscription{state | subscription: subscription} - - :ok = handle_subscription_state(state) - - {:reply, :ok, state} + {:reply, :ok, apply_subscription_to_state(subscription, state)} end def handle_call(:subscribed?, _from, %Subscription{subscription: %{state: subscription_state}} = state) do - reply = case subscription_state do - :subscribed -> true - _ -> false - end + reply = + case subscription_state do + :subscribed -> true + _ -> false + end {:reply, reply, state} end - defp subscribe_to_events(stream_uuid) do - {:ok, _} = Registry.register(EventStore.Subscriptions.PubSub, stream_uuid, {Subscription, :notify_events}) + defp apply_subscription_to_state(%StreamSubscription{} = subscription, %Subscription{} = state) do + state = %Subscription{state | subscription: subscription} + + :ok = handle_subscription_state(state) + + state end defp handle_subscription_state(%Subscription{subscription: %{state: :initial}, subscription_name: subscription_name}) do retry_interval = subscription_retry_interval() + Logger.debug(fn -> "Failed to subscribe to #{subscription_name}, will retry in #{retry_interval}ms" end) Process.send_after(self(), :subscribe_to_stream, retry_interval) + :ok + end + + defp handle_subscription_state(%Subscription{subscription: %{state: :subscribe_to_events}}) do + :ok = GenServer.cast(self(), :subscribe_to_events) end defp handle_subscription_state(%Subscription{subscription: %{state: :request_catch_up}}) do - GenServer.cast(self(), :catch_up) + :ok = GenServer.cast(self(), :catch_up) end defp handle_subscription_state(%Subscription{subscription: %{state: :max_capacity}, subscription_name: subscription_name}) do @@ -190,6 +183,10 @@ defmodule EventStore.Subscriptions.Subscription do # no-op for all other subscription states defp handle_subscription_state(_state), do: :ok + defp subscribe_to_events(%Subscription{stream_uuid: stream_uuid}) do + {:ok, _} = Registry.register(EventStore.Subscriptions.PubSub, stream_uuid, []) + end + # get the delay between subscription attempts, in milliseconds, from app # config. Default value is one minute. defp subscription_retry_interval do diff --git a/test/subscriptions/all_streams_subscription_test.exs b/test/subscriptions/all_streams_subscription_test.exs index 6bafd211..09736e83 100644 --- a/test/subscriptions/all_streams_subscription_test.exs +++ b/test/subscriptions/all_streams_subscription_test.exs @@ -30,7 +30,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do test "create subscription to all streams", context do subscription = create_subscription(context) - assert subscription.state == :request_catch_up + assert subscription.state == :subscribe_to_events assert subscription.data.subscription_name == @subscription_name assert subscription.data.subscriber == self() assert subscription.data.last_seen == 0 @@ -41,7 +41,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do test "create subscription to all streams from starting event id", context do subscription = create_subscription(context, start_from_event_number: 2) - assert subscription.state == :request_catch_up + assert subscription.state == :subscribe_to_events assert subscription.data.subscription_name == @subscription_name assert subscription.data.subscriber == self() assert subscription.data.last_seen == 2 @@ -53,6 +53,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do test "catch-up subscription, no persisted events", context do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -70,6 +71,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -97,6 +99,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(events) @@ -114,6 +117,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do test "should catch up when events received while catching up", context do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -146,6 +150,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert_receive_caught_up(3) @@ -164,6 +169,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do test "should replay events when catching up and events had not been acknowledged", context do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() # should receive already seen events @@ -184,6 +190,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do def create_caught_up_subscription(context) do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -210,6 +217,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) @@ -255,6 +263,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do subscription = create_subscription(context, max_size: 3) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) @@ -292,6 +301,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do subscription = create_subscription(context, max_size: 3) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) @@ -331,11 +341,11 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do setup [:create_second_connection, :create_two_duplicate_subscriptions] test "should only allow one subscriber", %{subscription1: subscription1, subscription2: subscription2} do - assert subscription1.state == :request_catch_up + assert subscription1.state == :subscribe_to_events assert subscription2.state == :initial end - test "should allow second subscriber to takeover when first connection terminates", %{subscription_conn: conn, subscription_conn2: conn2, subscription1: subscription1, subscription2: subscription2} do + test "should allow second subscriber to takeover when first connection terminates", %{subscription_conn: conn, subscription_conn2: conn2, subscription2: subscription2} do # attempt to resubscribe should fail subscription2 = StreamSubscription.subscribe(subscription2, conn2, @all_stream, @subscription_name, self(), []) assert subscription2.state == :initial @@ -345,7 +355,7 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do # attempt to resubscribe should now succeed subscription2 = StreamSubscription.subscribe(subscription2, conn2, @all_stream, @subscription_name, self(), []) - assert subscription2.state == :request_catch_up + assert subscription2.state == :subscribe_to_events end defp create_second_connection(%{postgrex_config: config}) do diff --git a/test/subscriptions/single_stream_subscription_test.exs b/test/subscriptions/single_stream_subscription_test.exs index 21360b78..2be9eb8f 100644 --- a/test/subscriptions/single_stream_subscription_test.exs +++ b/test/subscriptions/single_stream_subscription_test.exs @@ -33,7 +33,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do subscription = create_subscription(context) - assert subscription.state == :request_catch_up + assert subscription.state == :subscribe_to_events assert subscription.data.subscription_name == @subscription_name assert subscription.data.subscriber == self() assert subscription.data.last_seen == 0 @@ -45,7 +45,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do subscription = create_subscription(context, start_from_stream_version: 2) - assert subscription.state == :request_catch_up + assert subscription.state == :subscribe_to_events assert subscription.data.subscription_name == @subscription_name assert subscription.data.subscriber == self() assert subscription.data.last_seen == 2 @@ -70,6 +70,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -85,6 +86,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do test "unseen persisted events", %{recorded_events: recorded_events} = context do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -107,6 +109,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do test "confirm subscription caught up to persisted events", context do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -134,6 +137,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(events) @@ -159,6 +163,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -177,6 +182,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do test "should replay events when not acknowledged", context do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -222,6 +228,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do defp subscribe_to_stream(context) do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() assert subscription.state == :catching_up @@ -247,6 +254,7 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do subscription = create_subscription(context) + |> StreamSubscription.subscribed() |> StreamSubscription.catch_up() |> StreamSubscription.caught_up(0) |> StreamSubscription.notify_events(initial_events) diff --git a/test/subscriptions/subscribe_to_stream_test.exs b/test/subscriptions/subscribe_to_stream_test.exs index b0917453..b3df0685 100644 --- a/test/subscriptions/subscribe_to_stream_test.exs +++ b/test/subscriptions/subscribe_to_stream_test.exs @@ -3,8 +3,9 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do alias EventStore.{EventFactory,ProcessHelper,Wait} alias EventStore.{Streams,Subscriptions,Subscriber} - alias EventStore.Subscriptions.Subscription alias EventStore.Streams.Stream + alias EventStore.Subscriptions.Subscription + alias EventStore.Support.CollectingSubscriber setup do subscription_name = UUID.uuid4() @@ -483,49 +484,73 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do end end - defmodule CollectingSubscriber do - use GenServer + describe "duplicate subscriptions" do + setup [:create_two_duplicate_subscriptions] - def start_link(subscription_name) do - GenServer.start_link(__MODULE__, subscription_name) - end + test "should only allow single active subscription", %{subscription1: subscription1, subscription2: subscription2} do + stream1_uuid = append_events_to_stream(3) - def received_events(subscriber) do - GenServer.call(subscriber, {:received_events}) - end + # subscriber1 should receive events + assert_receive {:events, received_events} + assert Subscription.subscribed?(subscription1) - def subscribed?(subscriber) do - GenServer.call(subscriber, {:subscribed?}) - end + Subscription.ack(subscription1, received_events) - def unsubscribe(subscriber) do - GenServer.call(subscriber, {:unsubscribe}) - end + assert length(received_events) == 3 + Enum.each(received_events, fn event -> + assert event.stream_uuid == stream1_uuid + end) - def init(subscription_name) do - {:ok, subscription} = Subscriptions.subscribe_to_all_streams(subscription_name, self()) + # subscriber2 should not receive any events + refute Subscription.subscribed?(subscription2) + refute_receive {:events, _received_events} - {:ok, %{events: [], subscription: subscription, subscription_name: subscription_name}} - end + # shutdown subscriber1 process + ProcessHelper.shutdown(subscription1) - def handle_call({:received_events}, _from, %{events: events} = state) do - {:reply, events, state} - end + wait_until_subscribed(subscription2) - def handle_call({:subscribed?}, _from, %{subscription: subscription} = state) do - reply = Subscription.subscribed?(subscription) - {:reply, reply, state} + stream2_uuid = append_events_to_stream(3) + + # subscriber2 should now start receiving events + assert_receive {:events, received_events}, 5_000 + Subscription.ack(subscription2, received_events) + + assert length(received_events) == 3 + Enum.each(received_events, fn event -> + assert event.stream_uuid == stream2_uuid + end) + + refute_receive {:events, _received_events} end - def handle_call({:unsubscribe}, _from, %{subscription_name: subscription_name} = state) do - Subscriptions.unsubscribe_from_all_streams(subscription_name) - {:reply, :ok, state} + defp create_two_duplicate_subscriptions(%{subscription_name: subscription_name}) do + postgrex_config = + EventStore.configuration() + |> EventStore.Config.parse() + |> Keyword.drop([:pool, :pool_size, :pool_overflow, :serializer]) + + {:ok, subscription1} = EventStore.Subscriptions.Subscription.start_link(postgrex_config, "$all", subscription_name, self(), start_from_event_number: 0) + + wait_until_subscribed(subscription1) + + {:ok, subscription2} = EventStore.Subscriptions.Subscription.start_link(postgrex_config, "$all", subscription_name, self(), start_from_event_number: 0) + + [ + subscription1: subscription1, + subscription2: subscription2 + ] end - def handle_info({:events, received_events}, %{events: events, subscription: subscription} = state) do - Subscription.ack(subscription, received_events) + defp append_events_to_stream(count) do + stream_uuid = UUID.uuid4() + stream_events = EventFactory.create_events(count) + + with {:ok, _stream} <- Streams.Supervisor.open_stream(stream_uuid) do + :ok = Stream.append_to_stream(stream_uuid, 0, stream_events) + end - {:noreply, %{state | events: events ++ received_events}} + stream_uuid end end diff --git a/test/support/collecting_subscriber.ex b/test/support/collecting_subscriber.ex new file mode 100644 index 00000000..eb18d5d7 --- /dev/null +++ b/test/support/collecting_subscriber.ex @@ -0,0 +1,48 @@ +defmodule EventStore.Support.CollectingSubscriber do + use GenServer + + alias EventStore.Subscriptions + alias EventStore.Subscriptions.Subscription + + def start_link(subscription_name) do + GenServer.start_link(__MODULE__, subscription_name) + end + + def received_events(subscriber) do + GenServer.call(subscriber, {:received_events}) + end + + def subscribed?(subscriber) do + GenServer.call(subscriber, {:subscribed?}) + end + + def unsubscribe(subscriber) do + GenServer.call(subscriber, {:unsubscribe}) + end + + def init(subscription_name) do + {:ok, subscription} = Subscriptions.subscribe_to_all_streams(subscription_name, self()) + + {:ok, %{events: [], subscription: subscription, subscription_name: subscription_name}} + end + + def handle_call({:received_events}, _from, %{events: events} = state) do + {:reply, events, state} + end + + def handle_call({:subscribed?}, _from, %{subscription: subscription} = state) do + reply = Subscription.subscribed?(subscription) + {:reply, reply, state} + end + + def handle_call({:unsubscribe}, _from, %{subscription_name: subscription_name} = state) do + Subscriptions.unsubscribe_from_all_streams(subscription_name) + {:reply, :ok, state} + end + + def handle_info({:events, received_events}, %{events: events, subscription: subscription} = state) do + Subscription.ack(subscription, received_events) + + {:noreply, %{state | events: events ++ received_events}} + end +end diff --git a/test/support/wait.ex b/test/support/wait.ex index 82ad3742..1e4c6f3e 100644 --- a/test/support/wait.ex +++ b/test/support/wait.ex @@ -1,5 +1,5 @@ defmodule EventStore.Wait do - def until(fun), do: until(500, fun) + def until(fun), do: until(1_000, fun) def until(0, fun), do: fun.() From 9a0c4ad4bcdb289b8af8c1e760bdec74d7518cf1 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 5 Jan 2018 19:21:40 +0000 Subject: [PATCH 4/6] Configure subscription rety interval for all test environments --- config/distributed.exs | 3 ++- config/jsonb.exs | 3 ++- config/local.exs | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/config/distributed.exs b/config/distributed.exs index ff649d70..f099ffdc 100644 --- a/config/distributed.exs +++ b/config/distributed.exs @@ -19,7 +19,8 @@ config :eventstore, EventStore.Storage, config :eventstore, registry: :distributed, - restart_stream_timeout: 1_000 + restart_stream_timeout: 1_000, + subscription_retry_interval: 1_000 config :swarm, nodes: [:"node1@127.0.0.1", :"node2@127.0.0.1", :"node3@127.0.0.1"], diff --git a/config/jsonb.exs b/config/jsonb.exs index 12621415..441751fe 100644 --- a/config/jsonb.exs +++ b/config/jsonb.exs @@ -18,5 +18,6 @@ config :eventstore, EventStore.Storage, pool_overflow: 0 config :eventstore, + column_data_type: "jsonb", registry: :local, - column_data_type: "jsonb" + subscription_retry_interval: 1_000 diff --git a/config/local.exs b/config/local.exs index ead4f337..c8adf5f4 100644 --- a/config/local.exs +++ b/config/local.exs @@ -17,4 +17,5 @@ config :eventstore, EventStore.Storage, pool_overflow: 0 config :eventstore, - registry: :local + registry: :local, + subscription_retry_interval: 1_000 From 4b8584d4e1ff621d72e0ac712e79504a73ddc400 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 5 Jan 2018 20:03:28 +0000 Subject: [PATCH 5/6] Include #98 in CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ed8c9fb..14917cc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Replace `:info` level logging with `:debug` ([#90](https://github.com/commanded/eventstore/issues/90)). - Dealing better with Poison dependancy ([#91](https://github.com/commanded/eventstore/issues/91)). - Publish events directly to subscriptions ([#93](https://github.com/commanded/eventstore/pull/93)). +- Use PostgreSQL advisory locks to enforce only one subscription instance ([#98](https://github.com/commanded/eventstore/pull/98)). ## v0.13.2 From e46a9904f568306c34933884ee3bb5946975d172 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 5 Jan 2018 20:12:18 +0000 Subject: [PATCH 6/6] Ensure subscription retry interval is no shorter than 1s --- lib/event_store/subscriptions/subscription.ex | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/event_store/subscriptions/subscription.ex b/lib/event_store/subscriptions/subscription.ex index 30808698..4771fee7 100644 --- a/lib/event_store/subscriptions/subscription.ex +++ b/lib/event_store/subscriptions/subscription.ex @@ -187,11 +187,12 @@ defmodule EventStore.Subscriptions.Subscription do {:ok, _} = Registry.register(EventStore.Subscriptions.PubSub, stream_uuid, []) end - # get the delay between subscription attempts, in milliseconds, from app - # config. Default value is one minute. + # Get the delay between subscription attempts, in milliseconds, from app + # config. The default value is one minute and minimum allowed value is one + # second. defp subscription_retry_interval do case Application.get_env(:eventstore, :subscription_retry_interval) do - interval when is_integer(interval) -> interval + interval when is_integer(interval) and interval > 0 -> min(interval, 1_000) _ -> 60_000 end end