Skip to content

Commit

Permalink
Handle database connection errors when reading events from notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Nov 12, 2020
1 parent fbdd780 commit 10c2bba
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 88 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
elixir 1.10.4-otp-22
elixir 1.11.1-otp-22
3 changes: 1 addition & 2 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ defmodule EventStore do
def config(opts \\ []) do
opts = Keyword.merge(unquote(opts), opts)

with {:ok, config} <-
EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do
with {:ok, config} <- EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do
config
end
end
Expand Down
8 changes: 2 additions & 6 deletions lib/event_store/monitored_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ defmodule EventStore.MonitoredServer do
{:noreply, start_process(state)}
end

@doc """
Handle process exit by attempting to restart, after a configurable delay.
"""
# Handle process exit by attempting to restart, after a configurable delay.
def handle_info({:EXIT, pid, reason}, %State{pid: pid} = state) do
{:noreply, on_process_exit(pid, reason, state)}
end
Expand All @@ -140,9 +138,7 @@ defmodule EventStore.MonitoredServer do
{:noreply, state}
end

@doc """
Handle process down by attempting to restart, after a configurable delay.
"""
# Handle process down by attempting to restart, after a configurable delay.
def handle_info({:DOWN, _ref, :process, pid, reason}, %State{pid: pid} = state) do
{:noreply, on_process_exit(pid, reason, state)}
end
Expand Down
27 changes: 4 additions & 23 deletions lib/event_store/notifications/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule EventStore.Notifications.Listener do

require Logger

alias EventStore.Notifications.Listener
alias EventStore.Notifications.{Listener, Notification}

defstruct [:listen_to, :schema, :ref, demand: 0, queue: :queue.new()]

Expand All @@ -38,10 +38,7 @@ defmodule EventStore.Notifications.Listener do
inspect(channel) <> " with payload: " <> inspect(payload)
)

state =
payload
|> parse_notify_payload()
|> enqueue(state)
state = payload |> Notification.new() |> enqueue(state)

dispatch_events([], state)
end
Expand Down Expand Up @@ -81,25 +78,9 @@ defmodule EventStore.Notifications.Listener do
end
end

# `NOTIFY` payload contains the stream uuid, stream id, and first / last
# stream versions (e.g. "stream-12345,1,1,5")
defp parse_notify_payload(payload) do
[last, first, stream_id, stream_uuid] =
payload
|> String.reverse()
|> String.split(",", parts: 4)
|> Enum.map(&String.reverse/1)

{stream_id, ""} = Integer.parse(stream_id)
{first_stream_version, ""} = Integer.parse(first)
{last_stream_version, ""} = Integer.parse(last)

{stream_uuid, stream_id, first_stream_version, last_stream_version}
end

defp enqueue(event, %Listener{} = state) do
defp enqueue(%Notification{} = notification, %Listener{} = state) do
%Listener{queue: queue} = state

%Listener{state | queue: :queue.in(event, queue)}
%Listener{state | queue: :queue.in(notification, queue)}
end
end
35 changes: 35 additions & 0 deletions lib/event_store/notifications/notification.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule EventStore.Notifications.Notification do
@moduledoc false

alias EventStore.Notifications.Notification

defstruct [:stream_uuid, :stream_id, :from_stream_version, :to_stream_version]

@doc """
Build a new notification struct from the `NOTIFY` payload which contains the
stream uuid, stream id, first and last stream versions.
## Example
Notification.new("stream-12345,1,1,5")
"""
def new(payload) do
[last, first, stream_id, stream_uuid] =
payload
|> String.reverse()
|> String.split(",", parts: 4)
|> Enum.map(&String.reverse/1)

{stream_id, ""} = Integer.parse(stream_id)
{from_stream_version, ""} = Integer.parse(first)
{to_stream_version, ""} = Integer.parse(last)

%Notification{
stream_uuid: stream_uuid,
stream_id: stream_id,
from_stream_version: from_stream_version,
to_stream_version: to_stream_version
}
end
end
47 changes: 29 additions & 18 deletions lib/event_store/notifications/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ defmodule EventStore.Notifications.Publisher do

require Logger

alias EventStore.PubSub
alias EventStore.RecordedEvent
alias EventStore.Storage
alias EventStore.{PubSub, RecordedEvent, Storage}
alias EventStore.Notifications.Notification

defmodule State do
defstruct [:conn, :event_store, :schema, :serializer, :subscribe_to]
Expand Down Expand Up @@ -48,31 +47,43 @@ defmodule EventStore.Notifications.Publisher do
%State{event_store: event_store} = state

events
|> Enum.map(&read_events(&1, state))
|> Stream.map(&read_events(&1, state))
|> Stream.reject(&is_nil/1)
|> Enum.each(fn {stream_uuid, batch} -> broadcast(event_store, stream_uuid, batch) end)

{:noreply, [], state}
end

defp read_events(
{stream_uuid, stream_id, from_stream_version, to_stream_version},
%State{} = state
) do
defp read_events(%Notification{} = notification, %State{} = state) do
%Notification{
stream_uuid: stream_uuid,
stream_id: stream_id,
from_stream_version: from_stream_version,
to_stream_version: to_stream_version
} = notification

%State{conn: conn, schema: schema, serializer: serializer} = state

count = to_stream_version - from_stream_version + 1

try do
{:ok, events} =
Storage.read_stream_forward(conn, stream_id, from_stream_version, count, schema: schema)

deserialized_events = deserialize_recorded_events(events, serializer)

{stream_uuid, deserialized_events}
rescue
e ->
Logger.error("Notifications failed to read events due to: " <> inspect(e))
[]
case Storage.read_stream_forward(conn, stream_id, from_stream_version, count, schema: schema) do
{:ok, events} ->
deserialized_events = deserialize_recorded_events(events, serializer)

{stream_uuid, deserialized_events}

{:error, error} ->
Logger.error(
"EventStore notifications failed to read events due to: " <> inspect(error)
)

nil
end
catch
:exit, ex ->
Logger.error("EventStore notifications failed to read events due to: " <> inspect(ex))
nil
end
end

Expand Down
44 changes: 27 additions & 17 deletions lib/event_store/notifications/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ defmodule EventStore.Notifications.Supervisor do
@moduledoc false

# Supervises the individual `GenStage` stages used to listen to, read, and
# broadcast all events appended to storage.
# publish all events appended to storage.

use Supervisor

alias EventStore.Config
alias EventStore.{Config, MonitoredServer, Subscriptions}
alias EventStore.Notifications.{Listener, Publisher}
alias EventStore.Subscriptions

def child_spec({name, _config} = init_arg) do
%{id: Module.concat(name, __MODULE__), start: {__MODULE__, :start_link, [init_arg]}}
end

def start_link(arg) do
Supervisor.start_link(__MODULE__, arg)
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg)
end

@impl Supervisor
Expand All @@ -26,18 +25,24 @@ defmodule EventStore.Notifications.Supervisor do

listener_name = Module.concat([event_store, Listener])
publisher_name = Module.concat([event_store, Publisher])
postgrex_notifications_name = Module.concat([event_store, Postgrex, Notifications])
postgrex_notifications_conn = postgrex_notifications_conn(event_store, config)

postgrex_notifications_config =
Config.postgrex_notifications_opts(config, postgrex_notifications_name)
Config.postgrex_notifications_opts(config, postgrex_notifications_conn)

hibernate_after = Subscriptions.hibernate_after(event_store, config)

Supervisor.init(
[
postgrex_notifications_child_spec(postgrex_notifications_config),
Supervisor.child_spec(
{MonitoredServer,
mfa: {Postgrex.Notifications, :start_link, [postgrex_notifications_config]},
name: Module.concat([event_store, Postgrex, Notifications, MonitoredServer]),
backoff_min: 0},
id: Module.concat([postgrex_notifications_conn, MonitoredServer])
),
{Listener,
listen_to: postgrex_notifications_name,
listen_to: postgrex_notifications_conn,
schema: schema,
name: listener_name,
hibernate_after: hibernate_after},
Expand All @@ -54,13 +59,18 @@ defmodule EventStore.Notifications.Supervisor do
)
end

defp postgrex_notifications_child_spec(postgrex_notifications_config) do
%{
id: Postgrex.Notifications,
start: {Postgrex.Notifications, :start_link, [postgrex_notifications_config]},
type: :worker,
restart: :permanent,
shutdown: 500
}
defp postgrex_notifications_conn(name, config) do
case Keyword.get(config, :shared_connection_pool) do
nil ->
Module.concat([name, Postgrex, Notifications])

shared_connection_pool when is_atom(shared_connection_pool) ->
Module.concat([shared_connection_pool, Postgrex, Notifications])

invalid ->
raise ArgumentError,
"Invalid `:shared_connection_pool` specified, expected an atom but got: " <>
inspect(invalid)
end
end
end
11 changes: 8 additions & 3 deletions lib/event_store/storage/reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,15 @@ defmodule EventStore.Storage.Reader do
{:ok, %Postgrex.Result{rows: rows}} ->
{:ok, rows}

{:error, %Postgrex.Error{postgres: %{message: reason}}} ->
Logger.warn("Failed to read events from stream due to: " <> inspect(reason))
{:error, %Postgrex.Error{postgres: %{message: message}}} ->
Logger.warn("Failed to read events from stream due to: " <> inspect(message))

{:error, reason}
{:error, message}

{:error, %DBConnection.ConnectionError{message: message}} ->
Logger.warn("Failed to read events from stream due to: " <> inspect(message))

{:error, message}
end
end
end
Expand Down
64 changes: 46 additions & 18 deletions test/notifications/notifications_reconnect_test.exs
Original file line number Diff line number Diff line change
@@ -1,42 +1,70 @@
defmodule EventStore.Notifications.NotificationsReconnectTest do
use EventStore.StorageCase

alias EventStore.{EventFactory, Notifications, PubSub}
alias EventStore.{EventFactory, PubSub, ProcessHelper, Wait}

describe "notifications reconnect" do
setup do
config = TestEventStore.config()
test "resume after disconnect" do
stream_uuid = "example-stream"

:ok = PubSub.subscribe(TestEventStore, stream_uuid)

shutdown_postgrex_notifications_connection(TestEventStore.Postgrex.Notifications)

conn = start_supervised!({Postgrex, config})
start_supervised!({Notifications.Supervisor, {ES, Keyword.put(config, :conn, conn)}})
assert {:eventually, ref} =
Postgrex.Notifications.listen(TestEventStore.Postgrex.Notifications, "channel")

assert is_reference(ref)

# Wait for notifications to reconnect
Wait.until(fn ->
assert {:ok, _ref} =
Postgrex.Notifications.listen(TestEventStore.Postgrex.Notifications, "channel")
end)

:ok = append_events(stream_uuid, 3)

for child_spec <- PubSub.child_spec(ES), do: start_supervised!(child_spec)
assert_receive {:events, events}
assert length(events) == 3

:ok
refute_receive {:events, _events}
end

test "resume after disconnect" do
test "publisher handle Postgrex connection down when reading events", %{conn: conn} do
stream_uuid = "example-stream"

:ok = PubSub.subscribe(ES, stream_uuid)

shutdown_postgrex_notifications_process()
:ok = PubSub.subscribe(TestEventStore, stream_uuid)

:ok = append_events(stream_uuid, 3)

assert_receive {:events, _events}
refute_receive {:events, _events}
assert_receive {:events, events}
assert length(events) == 3

shutdown_postgrex_connection(TestEventStore.Postgrex)

Wait.until(fn ->
Postgrex.query!(conn, "select pg_notify($1, $2);", [
"public.events",
"example-stream,1,1,3"
])

assert_receive {:events, events}
assert length(events) == 3
end)
end
end

defp shutdown_postgrex_notifications_process do
notifications_pid = Process.whereis(TestEventStore.Postgrex.Notifications)
assert is_pid(notifications_pid)
defp shutdown_postgrex_connection(name) do
pid = Process.whereis(name)
assert is_pid(pid)

shutdown(notifications_pid)
ProcessHelper.shutdown(pid)
end

defp shutdown(pid) do
defp shutdown_postgrex_notifications_connection(name) do
pid = Process.whereis(name)
assert is_pid(pid)

{:gen_tcp, sock} = :sys.get_state(pid).mod_state.protocol.sock
:gen_tcp.shutdown(sock, :read_write)
end
Expand Down
Loading

0 comments on commit 10c2bba

Please sign in to comment.