From c074b4563a9d4d20ddcc036a4f6bf3c38969116c Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 6 Nov 2020 14:50:52 +0000 Subject: [PATCH 1/2] Share database connection for notifications Rework notifications supervisor to use shared Postgres database connection for notifications. Handle database connection errors when reading events from notifications. --- .tool-versions | 2 +- guides/Cluster.md | 6 +- guides/Subscriptions.md | 2 +- lib/event_store.ex | 3 +- lib/event_store/config.ex | 10 ++ lib/event_store/monitored_server.ex | 8 +- lib/event_store/notifications/broadcaster.ex | 49 ---------- lib/event_store/notifications/listener.ex | 44 +-------- lib/event_store/notifications/notification.ex | 35 +++++++ lib/event_store/notifications/publisher.ex | 97 +++++++++++++++++++ lib/event_store/notifications/reader.ex | 70 ------------- lib/event_store/notifications/supervisor.ex | 70 ++++++------- lib/event_store/storage/reader.ex | 11 ++- lib/event_store/streams/stream.ex | 5 +- .../notifications_reconnect_test.exs | 77 +++++++++++++++ .../notifications_supervisor_test.exs | 25 ++--- test/shared_connection_pool_test.exs | 57 +++++++++++ .../subscribe_to_stream_test.exs | 10 +- 18 files changed, 353 insertions(+), 228 deletions(-) delete mode 100644 lib/event_store/notifications/broadcaster.ex create mode 100644 lib/event_store/notifications/notification.ex create mode 100644 lib/event_store/notifications/publisher.ex delete mode 100644 lib/event_store/notifications/reader.ex create mode 100644 test/notifications/notifications_reconnect_test.exs diff --git a/.tool-versions b/.tool-versions index bee1b6fd..52a9f305 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -elixir 1.10.4-otp-22 +elixir 1.11.1-otp-22 diff --git a/guides/Cluster.md b/guides/Cluster.md index ceb89834..804f789d 100644 --- a/guides/Cluster.md +++ b/guides/Cluster.md @@ -4,11 +4,7 @@ EventStore supports running on multiple nodes as either a [distributed Erlang](h ## Event publication -PostgreSQL's `LISTEN` / `NOTIFY` is used to pub/sub event notifications. - -A single listener process will connect to the database to listen for events when using a distributed cluster. Events will be broadcast from the single listener process to a `GenServer` process running on each connected node that forwards events to its local subscribers. This limits the number of database connections to at most the number of running clusters. - -Running EventStore on multiple nodes that are not connected together to form a cluster will result in one listener process and database connection per node. +PostgreSQL's `LISTEN` / `NOTIFY` is used to pub/sub event notifications. A listener database connection process is started on each node. It connects to the database to listen for events and publishes them to interested subscription processes running on the node. The approach is the same regardless of whether distributed Erlang is used or not. ## Subscriptions diff --git a/guides/Subscriptions.md b/guides/Subscriptions.md index 082bd3c0..00d7e02d 100644 --- a/guides/Subscriptions.md +++ b/guides/Subscriptions.md @@ -9,7 +9,7 @@ There are two types of subscriptions provided by EventStore: PostgreSQL's `LISTEN` and `NOTIFY` commands are used to pub/sub event notifications from the database. An after update trigger on the `streams` table is used to execute `NOTIFY` for each batch of inserted events. The notification payload contains the stream uuid, stream id, and first / last stream versions (e.g. `stream-12345,1,1,5`). -A single listener process will connect to the database to listen for these notifications. It fetches the event data and broadcasts to all interested subscriptions. This approach supports running the EventStore on multiple nodes, regardless of whether they are connected together to form a cluster. A single listener will be used when nodes form a cluster, otherwise one connection per node is used. +A single process will connect to the database to listen for these notifications. It fetches the event data and broadcasts to all interested subscriptions. This approach supports running an EventStore on multiple nodes, regardless of whether they are connected together to form a cluster using distributed Erlang. One connection per node is used for single node and multi-node deployments. ## Transient subscriptions diff --git a/lib/event_store.ex b/lib/event_store.ex index 8d182722..dac7c3ba 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -221,8 +221,7 @@ defmodule EventStore do def config(opts \\ []) do opts = Keyword.merge(unquote(opts), opts) - with {:ok, config} <- - EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do + with {:ok, config} <- EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do config end end diff --git a/lib/event_store/config.ex b/lib/event_store/config.ex index 76c241cd..9e3f9e1b 100644 --- a/lib/event_store/config.ex +++ b/lib/event_store/config.ex @@ -98,6 +98,16 @@ defmodule EventStore.Config do |> Keyword.put(:name, name) end + def postgrex_notifications_opts(config, name) do + config + |> default_postgrex_opts() + |> Keyword.put(:auto_reconnect, true) + |> Keyword.put(:backoff_type, :rand_exp) + |> Keyword.put(:sync_connect, false) + |> Keyword.put(:pool_size, 1) + |> Keyword.put(:name, name) + end + def sync_connect_postgrex_opts(config) do config |> default_postgrex_opts() diff --git a/lib/event_store/monitored_server.ex b/lib/event_store/monitored_server.ex index f36b788b..5fd01639 100644 --- a/lib/event_store/monitored_server.ex +++ b/lib/event_store/monitored_server.ex @@ -125,9 +125,7 @@ defmodule EventStore.MonitoredServer do {:noreply, start_process(state)} end - @doc """ - Handle process exit by attempting to restart, after a configurable delay. - """ + # Handle process exit by attempting to restart, after a configurable delay. def handle_info({:EXIT, pid, reason}, %State{pid: pid} = state) do {:noreply, on_process_exit(pid, reason, state)} end @@ -140,9 +138,7 @@ defmodule EventStore.MonitoredServer do {:noreply, state} end - @doc """ - Handle process down by attempting to restart, after a configurable delay. - """ + # Handle process down by attempting to restart, after a configurable delay. def handle_info({:DOWN, _ref, :process, pid, reason}, %State{pid: pid} = state) do {:noreply, on_process_exit(pid, reason, state)} end diff --git a/lib/event_store/notifications/broadcaster.ex b/lib/event_store/notifications/broadcaster.ex deleted file mode 100644 index fe024af1..00000000 --- a/lib/event_store/notifications/broadcaster.ex +++ /dev/null @@ -1,49 +0,0 @@ -defmodule EventStore.Notifications.Broadcaster do - @moduledoc false - - # Broadcasts events to subscriptions. - - use GenStage - - alias EventStore.PubSub - - defmodule State do - defstruct [:event_store, :subscribe_to] - - def new(opts) do - %State{ - event_store: Keyword.fetch!(opts, :event_store), - subscribe_to: Keyword.fetch!(opts, :subscribe_to) - } - end - end - - def start_link(opts) do - {start_opts, broadcaster_opts} = - Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after]) - - state = State.new(broadcaster_opts) - - GenStage.start_link(__MODULE__, state, start_opts) - end - - def init(%State{} = state) do - %State{subscribe_to: subscribe_to} = state - - {:consumer, state, subscribe_to: [subscribe_to]} - end - - def handle_events(events, _from, %State{} = state) do - for {stream_uuid, batch} <- events do - :ok = broadcast(stream_uuid, batch, state) - end - - {:noreply, [], state} - end - - defp broadcast(stream_uuid, events, %State{} = state) do - %State{event_store: event_store} = state - - PubSub.broadcast(event_store, stream_uuid, {:events, events}) - end -end diff --git a/lib/event_store/notifications/listener.ex b/lib/event_store/notifications/listener.ex index 4a2b7e0c..c4355e94 100644 --- a/lib/event_store/notifications/listener.ex +++ b/lib/event_store/notifications/listener.ex @@ -11,8 +11,7 @@ defmodule EventStore.Notifications.Listener do require Logger - alias EventStore.MonitoredServer - alias EventStore.Notifications.Listener + alias EventStore.Notifications.{Listener, Notification} defstruct [:listen_to, :schema, :ref, demand: 0, queue: :queue.new()] @@ -29,27 +28,7 @@ defmodule EventStore.Notifications.Listener do end def init(%Listener{} = state) do - %Listener{listen_to: listen_to} = state - - :ok = MonitoredServer.monitor(listen_to) - - {:producer, state} - end - - def handle_info({:UP, listen_to, _pid}, %Listener{listen_to: listen_to} = state) do - {:noreply, [], listen_for_events(state)} - end - - def handle_info({:DOWN, listen_to, _pid, _reason}, %Listener{listen_to: listen_to} = state) do - {:noreply, [], %Listener{state | ref: nil}} - end - - # Ignore notifications when database connection down. - def handle_info( - {:notification, _connection_pid, _ref, _channel, _payload}, - %Listener{ref: nil} = state - ) do - {:noreply, [], state} + {:producer, listen_for_events(state)} end # Notification received from PostgreSQL's `NOTIFY` @@ -59,20 +38,7 @@ defmodule EventStore.Notifications.Listener do inspect(channel) <> " with payload: " <> inspect(payload) ) - # `NOTIFY` payload contains the stream uuid, stream id, and first / last - # stream versions (e.g. "stream-12345,1,1,5") - - [last, first, stream_id, stream_uuid] = - payload - |> String.reverse() - |> String.split(",", parts: 4) - |> Enum.map(&String.reverse/1) - - {stream_id, ""} = Integer.parse(stream_id) - {first_stream_version, ""} = Integer.parse(first) - {last_stream_version, ""} = Integer.parse(last) - - state = enqueue({stream_uuid, stream_id, first_stream_version, last_stream_version}, state) + state = payload |> Notification.new() |> enqueue(state) dispatch_events([], state) end @@ -112,9 +78,9 @@ defmodule EventStore.Notifications.Listener do end end - defp enqueue(event, %Listener{} = state) do + defp enqueue(%Notification{} = notification, %Listener{} = state) do %Listener{queue: queue} = state - %Listener{state | queue: :queue.in(event, queue)} + %Listener{state | queue: :queue.in(notification, queue)} end end diff --git a/lib/event_store/notifications/notification.ex b/lib/event_store/notifications/notification.ex new file mode 100644 index 00000000..f25b8730 --- /dev/null +++ b/lib/event_store/notifications/notification.ex @@ -0,0 +1,35 @@ +defmodule EventStore.Notifications.Notification do + @moduledoc false + + alias EventStore.Notifications.Notification + + defstruct [:stream_uuid, :stream_id, :from_stream_version, :to_stream_version] + + @doc """ + Build a new notification struct from the `NOTIFY` payload which contains the + stream uuid, stream id, first and last stream versions. + + ## Example + + Notification.new("stream-12345,1,1,5") + + """ + def new(payload) do + [last, first, stream_id, stream_uuid] = + payload + |> String.reverse() + |> String.split(",", parts: 4) + |> Enum.map(&String.reverse/1) + + {stream_id, ""} = Integer.parse(stream_id) + {from_stream_version, ""} = Integer.parse(first) + {to_stream_version, ""} = Integer.parse(last) + + %Notification{ + stream_uuid: stream_uuid, + stream_id: stream_id, + from_stream_version: from_stream_version, + to_stream_version: to_stream_version + } + end +end diff --git a/lib/event_store/notifications/publisher.ex b/lib/event_store/notifications/publisher.ex new file mode 100644 index 00000000..1523705a --- /dev/null +++ b/lib/event_store/notifications/publisher.ex @@ -0,0 +1,97 @@ +defmodule EventStore.Notifications.Publisher do + @moduledoc false + + # Reads events from storage by each event number range received and publishes + # them. + + use GenStage + + require Logger + + alias EventStore.{PubSub, RecordedEvent, Storage} + alias EventStore.Notifications.Notification + + defmodule State do + defstruct [:conn, :event_store, :schema, :serializer, :subscribe_to] + + def new(opts) do + %State{ + conn: Keyword.fetch!(opts, :conn), + event_store: Keyword.fetch!(opts, :event_store), + schema: Keyword.fetch!(opts, :schema), + serializer: Keyword.fetch!(opts, :serializer), + subscribe_to: Keyword.fetch!(opts, :subscribe_to) + } + end + end + + def start_link(opts) do + {start_opts, reader_opts} = + Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after]) + + state = State.new(reader_opts) + + GenStage.start_link(__MODULE__, state, start_opts) + end + + # Starts a permanent subscription to the listener producer stage which will + # automatically start requesting items. + def init(%State{} = state) do + %State{subscribe_to: subscribe_to} = state + + {:consumer, state, [subscribe_to: [{subscribe_to, max_demand: 1}]]} + end + + # Fetch events from storage and pass onwards to subscibers + def handle_events(events, _from, state) do + %State{event_store: event_store} = state + + events + |> Stream.map(&read_events(&1, state)) + |> Stream.reject(&is_nil/1) + |> Enum.each(fn {stream_uuid, batch} -> broadcast(event_store, stream_uuid, batch) end) + + {:noreply, [], state} + end + + defp read_events(%Notification{} = notification, %State{} = state) do + %Notification{ + stream_uuid: stream_uuid, + stream_id: stream_id, + from_stream_version: from_stream_version, + to_stream_version: to_stream_version + } = notification + + %State{conn: conn, schema: schema, serializer: serializer} = state + + count = to_stream_version - from_stream_version + 1 + + try do + case Storage.read_stream_forward(conn, stream_id, from_stream_version, count, schema: schema) do + {:ok, events} -> + deserialized_events = deserialize_recorded_events(events, serializer) + + {stream_uuid, deserialized_events} + + {:error, error} -> + Logger.error( + "EventStore notifications failed to read events due to: " <> inspect(error) + ) + + nil + end + catch + :exit, ex -> + Logger.error("EventStore notifications failed to read events due to: " <> inspect(ex)) + nil + end + end + + defp deserialize_recorded_events(recorded_events, serializer) do + Enum.map(recorded_events, &RecordedEvent.deserialize(&1, serializer)) + end + + defp broadcast(event_store, stream_uuid, events) do + PubSub.broadcast(event_store, stream_uuid, {:events, events}) + end +end diff --git a/lib/event_store/notifications/reader.ex b/lib/event_store/notifications/reader.ex deleted file mode 100644 index fbc006b5..00000000 --- a/lib/event_store/notifications/reader.ex +++ /dev/null @@ -1,70 +0,0 @@ -defmodule EventStore.Notifications.Reader do - @moduledoc false - - # Reads events from storage by each event number range received. - - use GenStage - - alias EventStore.RecordedEvent - alias EventStore.Storage - - defmodule State do - defstruct [:conn, :schema, :serializer, :subscribe_to] - - def new(opts) do - %State{ - conn: Keyword.fetch!(opts, :conn), - schema: Keyword.fetch!(opts, :schema), - serializer: Keyword.fetch!(opts, :serializer), - subscribe_to: Keyword.fetch!(opts, :subscribe_to) - } - end - end - - def start_link(opts) do - {start_opts, reader_opts} = - Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after]) - - state = State.new(reader_opts) - - GenStage.start_link(__MODULE__, state, start_opts) - end - - # Starts a permanent subscription to the listener producer stage which will - # automatically start requesting items. - def init(%State{} = state) do - %State{subscribe_to: subscribe_to} = state - - {:producer_consumer, state, - [ - dispatcher: GenStage.BroadcastDispatcher, - subscribe_to: [{subscribe_to, max_demand: 1}] - ]} - end - - # Fetch events from storage and pass onwards to subscibers - def handle_events(events, _from, state) do - stream_events = Enum.map(events, &read_events(&1, state)) - - {:noreply, stream_events, state} - end - - defp read_events( - {stream_uuid, stream_id, from_stream_version, to_stream_version}, - %State{} = state - ) do - %State{conn: conn, schema: schema, serializer: serializer} = state - - count = to_stream_version - from_stream_version + 1 - - with {:ok, events} <- - Storage.read_stream_forward(conn, stream_id, from_stream_version, count, schema: schema), - deserialized_events <- deserialize_recorded_events(events, serializer) do - {stream_uuid, deserialized_events} - end - end - - defp deserialize_recorded_events(recorded_events, serializer) do - Enum.map(recorded_events, &RecordedEvent.deserialize(&1, serializer)) - end -end diff --git a/lib/event_store/notifications/supervisor.ex b/lib/event_store/notifications/supervisor.ex index 687884bb..e276e392 100644 --- a/lib/event_store/notifications/supervisor.ex +++ b/lib/event_store/notifications/supervisor.ex @@ -2,73 +2,75 @@ defmodule EventStore.Notifications.Supervisor do @moduledoc false # Supervises the individual `GenStage` stages used to listen to, read, and - # broadcast all events appended to storage. + # publish all events appended to storage. use Supervisor - alias EventStore.Config - alias EventStore.MonitoredServer - alias EventStore.Notifications.{Listener, Reader, Broadcaster} - alias EventStore.Subscriptions + alias EventStore.{Config, MonitoredServer, Subscriptions} + alias EventStore.Notifications.{Listener, Publisher} def child_spec({name, _config} = init_arg) do %{id: Module.concat(name, __MODULE__), start: {__MODULE__, :start_link, [init_arg]}} end - def start_link(arg) do - Supervisor.start_link(__MODULE__, arg) + def start_link(init_arg) do + Supervisor.start_link(__MODULE__, init_arg) end @impl Supervisor def init({event_store, config}) do schema = Keyword.fetch!(config, :schema) serializer = Keyword.fetch!(config, :serializer) + conn = Keyword.fetch!(config, :conn) - postgrex_config = Config.sync_connect_postgrex_opts(config) - hibernate_after = Subscriptions.hibernate_after(event_store, config) + listener_name = Module.concat([event_store, Listener]) + publisher_name = Module.concat([event_store, Publisher]) + postgrex_notifications_conn = postgrex_notifications_conn(event_store, config) + + postgrex_notifications_config = + Config.postgrex_notifications_opts(config, postgrex_notifications_conn) - listener_name = listener_name(event_store) - reader_name = reader_name(event_store) - broadcaster_name = broadcaster_name(event_store) - postgrex_listener_name = Module.concat([listener_name, Postgrex]) - postgrex_reader_name = Module.concat([reader_name, Postgrex]) + hibernate_after = Subscriptions.hibernate_after(event_store, config) Supervisor.init( [ Supervisor.child_spec( {MonitoredServer, - mfa: {Postgrex.Notifications, :start_link, [postgrex_config]}, - name: postgrex_listener_name}, - id: Module.concat([postgrex_listener_name, MonitoredServer]) - ), - Supervisor.child_spec( - {MonitoredServer, - mfa: {Postgrex, :start_link, [postgrex_config]}, name: postgrex_reader_name}, - id: Module.concat([postgrex_reader_name, MonitoredServer]) + mfa: {Postgrex.Notifications, :start_link, [postgrex_notifications_config]}, + name: Module.concat([event_store, Postgrex, Notifications, MonitoredServer]), + backoff_min: 0}, + id: Module.concat([postgrex_notifications_conn, MonitoredServer]) ), {Listener, - listen_to: postgrex_listener_name, + listen_to: postgrex_notifications_conn, schema: schema, name: listener_name, hibernate_after: hibernate_after}, - {Reader, - conn: postgrex_reader_name, + {Publisher, + conn: conn, + event_store: event_store, schema: schema, serializer: serializer, subscribe_to: listener_name, - name: reader_name, - hibernate_after: hibernate_after}, - {Broadcaster, - event_store: event_store, - subscribe_to: reader_name, - name: broadcaster_name, + name: publisher_name, hibernate_after: hibernate_after} ], strategy: :one_for_all ) end - def broadcaster_name(event_store), do: Module.concat([event_store, Broadcaster]) - def listener_name(event_store), do: Module.concat([event_store, Listener]) - def reader_name(event_store), do: Module.concat([event_store, Reader]) + defp postgrex_notifications_conn(name, config) do + case Keyword.get(config, :shared_connection_pool) do + nil -> + Module.concat([name, Postgrex, Notifications]) + + shared_connection_pool when is_atom(shared_connection_pool) -> + Module.concat([shared_connection_pool, Postgrex, Notifications]) + + invalid -> + raise ArgumentError, + "Invalid `:shared_connection_pool` specified, expected an atom but got: " <> + inspect(invalid) + end + end end diff --git a/lib/event_store/storage/reader.ex b/lib/event_store/storage/reader.ex index 6939051a..cc560df2 100644 --- a/lib/event_store/storage/reader.ex +++ b/lib/event_store/storage/reader.ex @@ -110,10 +110,15 @@ defmodule EventStore.Storage.Reader do {:ok, %Postgrex.Result{rows: rows}} -> {:ok, rows} - {:error, %Postgrex.Error{postgres: %{message: reason}}} -> - Logger.warn("Failed to read events from stream due to: " <> inspect(reason)) + {:error, %Postgrex.Error{postgres: %{message: message}}} -> + Logger.warn("Failed to read events from stream due to: " <> inspect(message)) - {:error, reason} + {:error, message} + + {:error, %DBConnection.ConnectionError{message: message}} -> + Logger.warn("Failed to read events from stream due to: " <> inspect(message)) + + {:error, message} end end end diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index e4652d5a..d1a0f39b 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -55,9 +55,8 @@ defmodule EventStore.Streams.Stream do def start_from(conn, stream_uuid, :current, opts), do: stream_version(conn, stream_uuid, opts) - def start_from(_conn, _stream_uuid, start_from, _opts) - when is_integer(start_from), - do: {:ok, start_from} + def start_from(_conn, _stream_uuid, start_from, _opts) when is_integer(start_from), + do: {:ok, start_from} def start_from(_conn, _stream_uuid, _start_from, _opts), do: {:error, :invalid_start_from} diff --git a/test/notifications/notifications_reconnect_test.exs b/test/notifications/notifications_reconnect_test.exs new file mode 100644 index 00000000..af5eb899 --- /dev/null +++ b/test/notifications/notifications_reconnect_test.exs @@ -0,0 +1,77 @@ +defmodule EventStore.Notifications.NotificationsReconnectTest do + use EventStore.StorageCase + + alias EventStore.{EventFactory, PubSub, ProcessHelper, Wait} + + describe "notifications reconnect" do + test "resume after disconnect" do + stream_uuid = "example-stream" + + :ok = PubSub.subscribe(TestEventStore, stream_uuid) + + shutdown_postgrex_notifications_connection(TestEventStore.Postgrex.Notifications) + + assert {:eventually, ref} = + Postgrex.Notifications.listen(TestEventStore.Postgrex.Notifications, "channel") + + assert is_reference(ref) + + # Wait for notifications to reconnect + Wait.until(fn -> + assert {:ok, _ref} = + Postgrex.Notifications.listen(TestEventStore.Postgrex.Notifications, "channel") + end) + + :ok = append_events(stream_uuid, 3) + + assert_receive {:events, events} + assert length(events) == 3 + + refute_receive {:events, _events} + end + + test "publisher handle Postgrex connection down when reading events", %{conn: conn} do + stream_uuid = "example-stream" + + :ok = PubSub.subscribe(TestEventStore, stream_uuid) + + :ok = append_events(stream_uuid, 3) + + assert_receive {:events, events} + assert length(events) == 3 + + shutdown_postgrex_connection(TestEventStore.Postgrex) + + Wait.until(fn -> + Postgrex.query!(conn, "select pg_notify($1, $2);", [ + "public.events", + "example-stream,1,1,3" + ]) + + assert_receive {:events, events} + assert length(events) == 3 + end) + end + end + + defp shutdown_postgrex_connection(name) do + pid = Process.whereis(name) + assert is_pid(pid) + + ProcessHelper.shutdown(pid) + end + + defp shutdown_postgrex_notifications_connection(name) do + pid = Process.whereis(name) + assert is_pid(pid) + + {:gen_tcp, sock} = :sys.get_state(pid).mod_state.protocol.sock + :gen_tcp.shutdown(sock, :read_write) + end + + defp append_events(stream_uuid, count, expected_version \\ 0) do + events = EventFactory.create_events(count, expected_version) + + TestEventStore.append_to_stream(stream_uuid, expected_version, events) + end +end diff --git a/test/notifications/notifications_supervisor_test.exs b/test/notifications/notifications_supervisor_test.exs index 8ee40c2a..8877d3eb 100644 --- a/test/notifications/notifications_supervisor_test.exs +++ b/test/notifications/notifications_supervisor_test.exs @@ -2,28 +2,32 @@ defmodule EventStore.Notifications.NotificationsSupervisorTest do use EventStore.StorageCase alias EventStore.{EventFactory, Notifications, PubSub, Wait} - alias TestEventStore, as: EventStore describe "notifications supervisor" do setup do - config = TestEventStore.config() |> Keyword.put(:subscription_hibernate_after, 0) + config = + TestEventStore.config() + |> Keyword.put(:subscription_hibernate_after, 0) + + conn = start_supervised!({Postgrex, config}) + + start_supervised!({Notifications.Supervisor, {ES, Keyword.put(config, :conn, conn)}}) - start_supervised!({Notifications.Supervisor, {ES, config}}) for child_spec <- PubSub.child_spec(ES), do: start_supervised!(child_spec) :ok end test "hibernate processes after inactivity" do - listener_pid = Notifications.Supervisor.listener_name(ES) |> Process.whereis() - reader_name_pid = Notifications.Supervisor.reader_name(ES) |> Process.whereis() - broadcaster_pid = Notifications.Supervisor.broadcaster_name(ES) |> Process.whereis() + listener_pid = Module.concat([ES, EventStore.Notifications.Listener]) |> Process.whereis() + + publisher_name_pid = + Module.concat([ES, EventStore.Notifications.Publisher]) |> Process.whereis() # Listener processes should be hibernated after inactivity Wait.until(fn -> assert_hibernated(listener_pid) - assert_hibernated(reader_name_pid) - assert_hibernated(broadcaster_pid) + assert_hibernated(publisher_name_pid) end) stream_uuid = "example-stream" @@ -38,8 +42,7 @@ defmodule EventStore.Notifications.NotificationsSupervisorTest do # Listener processes should be hibernated again after inactivity Wait.until(fn -> assert_hibernated(listener_pid) - assert_hibernated(reader_name_pid) - assert_hibernated(broadcaster_pid) + assert_hibernated(publisher_name_pid) end) end end @@ -51,6 +54,6 @@ defmodule EventStore.Notifications.NotificationsSupervisorTest do defp append_events(stream_uuid, count, expected_version \\ 0) do events = EventFactory.create_events(count, expected_version) - EventStore.append_to_stream(stream_uuid, expected_version, events) + TestEventStore.append_to_stream(stream_uuid, expected_version, events) end end diff --git a/test/shared_connection_pool_test.exs b/test/shared_connection_pool_test.exs index eca44d83..3d25be2e 100644 --- a/test/shared_connection_pool_test.exs +++ b/test/shared_connection_pool_test.exs @@ -90,6 +90,40 @@ defmodule EventStore.SharedConnectionPoolTest do assert_receive {:DOWN, ^ref, :process, _object, _reason} end + @tag :manual + test "ensure database connections are shared between instances" do + # Starting another event store instance using shared pool should only + # increase connection count by one (used for advisory locks). + assert_connection_count_diff(1, fn -> + start_supervised!( + {TestEventStore, shared_connection_pool: :shared_pool, name: :eventstore4} + ) + end) + + assert_connection_count_diff(1, fn -> + start_supervised!( + {TestEventStore, shared_connection_pool: :shared_pool, name: :eventstore5} + ) + end) + + assert_connection_count_diff(3, fn -> + start_supervised!( + {TestEventStore, + shared_connection_pool: :another_pool, name: :eventstore6, pool_size: 1} + ) + end) + + # Start another event store instance with its own connection pool of size 10 + assert_connection_count_diff(12, fn -> + start_supervised!({TestEventStore, name: :eventstore7, pool_size: 10}) + end) + + assert_connection_count_diff(-12, fn -> stop_supervised!(:eventstore7) end) + assert_connection_count_diff(-3, fn -> stop_supervised!(:eventstore6) end) + assert_connection_count_diff(-1, fn -> stop_supervised!(:eventstore5) end) + assert_connection_count_diff(-1, fn -> stop_supervised!(:eventstore4) end) + end + test "append and read events" do stream_uuid = UUID.uuid4() @@ -161,4 +195,27 @@ defmodule EventStore.SharedConnectionPoolTest do assert {:error, :stream_not_found} == TestEventStore.stream_forward(stream_uuid, 0, name: event_store_name) end + + defp assert_connection_count_diff(expected_diff, fun) when is_function(fun, 0) do + conn = Process.whereis(Module.concat([:shared_pool, Postgrex])) + + count_before = count_connections(conn) + + fun.() + + Wait.until(fn -> + assert count_connections(conn) == count_before + expected_diff + end) + end + + defp count_connections(conn) do + %Postgrex.Result{rows: [[count]]} = + Postgrex.query!( + conn, + "SELECT count(*) FROM pg_stat_activity WHERE pid <> pg_backend_pid();", + [] + ) + + count + end end diff --git a/test/subscriptions/subscribe_to_stream_test.exs b/test/subscriptions/subscribe_to_stream_test.exs index 319b8420..92186bd1 100644 --- a/test/subscriptions/subscribe_to_stream_test.exs +++ b/test/subscriptions/subscribe_to_stream_test.exs @@ -343,7 +343,7 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do stream1_events = EventFactory.create_events(1) stream2_events = EventFactory.create_events(1) - {:ok, subscription} = subscribe_to_all_streams(subscription_name, self()) + {:ok, subscription} = subscribe_to_all_streams(subscription_name, self(), buffer_size: 1) :ok = EventStore.append_to_stream(stream1_uuid, 0, stream1_events) :ok = EventStore.append_to_stream(stream2_uuid, 0, stream2_events) @@ -392,12 +392,14 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do :ok = EventStore.append_to_stream(stream1_uuid, 0, stream1_initial_events) :ok = EventStore.append_to_stream(stream2_uuid, 0, stream2_initial_events) - {:ok, subscription} = subscribe_to_all_streams(subscription_name, self(), start_from: 2) + {:ok, subscription} = + subscribe_to_all_streams(subscription_name, self(), buffer_size: 1, start_from: 2) :ok = EventStore.append_to_stream(stream1_uuid, 1, stream1_new_events) :ok = EventStore.append_to_stream(stream2_uuid, 1, stream2_new_events) assert_receive {:events, stream1_received_events} + :ok = Subscription.ack(subscription, stream1_received_events) assert_receive {:events, stream2_received_events} @@ -462,7 +464,7 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do stream1_events = EventFactory.create_events(1) stream2_events = EventFactory.create_events(1) - {:ok, subscription} = subscribe_to_all_streams(subscription_name, self()) + {:ok, subscription} = subscribe_to_all_streams(subscription_name, self(), buffer_size: 1) refute_receive {:events, _events} @@ -819,7 +821,7 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do end # subscribe to all streams and wait for the subscription to be subscribed - defp subscribe_to_all_streams(subscription_name, subscriber, opts \\ []) do + defp subscribe_to_all_streams(subscription_name, subscriber, opts) do subscribe_to_stream("$all", subscription_name, subscriber, opts) end From abf774c1c7b386331991843d9f17cd2942d86567 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Fri, 13 Nov 2020 17:17:00 +0000 Subject: [PATCH 2/2] Include #225 in CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e28b4ace..1a6fc78f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Hibernate subscription process after inactivity ([#214](https://github.com/commanded/eventstore/pull/214)). - Runtime event store configuration ([#217](https://github.com/commanded/eventstore/pull/217)). - Shared database connection pools ([#216](https://github.com/commanded/eventstore/pull/216)). +- Shared database connection for notifications ([#225](https://github.com/commanded/eventstore/pull/225)). - Transient subscriptions ([#215](https://github.com/commanded/eventstore/pull/215)) ### Bug fixes