Share database connection for notifications #225

Merged · 2 commits · Nov 13, 2020
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
elixir 1.10.4-otp-22
elixir 1.11.1-otp-22
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
- Hibernate subscription process after inactivity ([#214](https://github.com/commanded/eventstore/pull/214)).
- Runtime event store configuration ([#217](https://github.com/commanded/eventstore/pull/217)).
- Shared database connection pools ([#216](https://github.com/commanded/eventstore/pull/216)).
- Shared database connection for notifications ([#225](https://github.com/commanded/eventstore/pull/225)).
- Transient subscriptions ([#215](https://github.com/commanded/eventstore/pull/215)).

### Bug fixes
6 changes: 1 addition & 5 deletions guides/Cluster.md
@@ -4,11 +4,7 @@ EventStore supports running on multiple nodes as either a [distributed Erlang](h

## Event publication

PostgreSQL's `LISTEN` / `NOTIFY` is used to pub/sub event notifications.

A single listener process will connect to the database to listen for events when using a distributed cluster. Events will be broadcast from the single listener process to a `GenServer` process running on each connected node that forwards events to its local subscribers. This limits the number of database connections to at most the number of running clusters.

Running EventStore on multiple nodes that are not connected together to form a cluster will result in one listener process and database connection per node.
PostgreSQL's `LISTEN` / `NOTIFY` is used to pub/sub event notifications. A listener process with its own database connection is started on each node; it listens for event notifications and publishes them to interested subscription processes running on that node. The approach is the same whether or not the nodes use distributed Erlang.
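The per-node listener described above can be sketched with `Postgrex.Notifications`, the library this PR configures via `postgrex_notifications_opts/2`. The channel name, connection options, and module name below are illustrative assumptions, not EventStore's actual configuration:

```elixir
# Minimal sketch of a per-node listener using Postgrex.Notifications.
# The channel naming scheme and connection options here are assumptions
# for illustration only.
defmodule ListenerSketch do
  # Derive an illustrative notification channel name from the schema.
  def channel(schema), do: "#{schema}.events"

  def start do
    # A single dedicated connection listens for notifications on this node.
    {:ok, pid} =
      Postgrex.Notifications.start_link(
        database: "eventstore",
        auto_reconnect: true,
        sync_connect: false
      )

    {:ok, _ref} = Postgrex.Notifications.listen(pid, channel("public"))

    # Notifications arrive as messages to the subscribing process.
    receive do
      {:notification, _conn, _ref, channel, payload} ->
        IO.puts("#{channel}: #{payload}")
    end
  end
end
```

Because each node runs its own listener connection, no cross-node coordination is needed to receive events.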

## Subscriptions

2 changes: 1 addition & 1 deletion guides/Subscriptions.md
@@ -9,7 +9,7 @@ There are two types of subscriptions provided by EventStore:

PostgreSQL's `LISTEN` and `NOTIFY` commands are used to pub/sub event notifications from the database. An after update trigger on the `streams` table is used to execute `NOTIFY` for each batch of inserted events. The notification payload contains the stream uuid, stream id, and first / last stream versions (e.g. `stream-12345,1,1,5`).

A single listener process will connect to the database to listen for these notifications. It fetches the event data and broadcasts to all interested subscriptions. This approach supports running the EventStore on multiple nodes, regardless of whether they are connected together to form a cluster. A single listener will be used when nodes form a cluster, otherwise one connection per node is used.
On each node, a single listener process connects to the database to listen for these notifications. It fetches the event data and broadcasts it to all interested subscriptions. This approach supports running an EventStore on multiple nodes, whether or not they are connected together into a cluster using distributed Erlang: one listener connection per node is used in both single-node and multi-node deployments.

## Transient subscriptions

3 changes: 1 addition & 2 deletions lib/event_store.ex
@@ -221,8 +221,7 @@ defmodule EventStore do
def config(opts \\ []) do
opts = Keyword.merge(unquote(opts), opts)

with {:ok, config} <-
EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do
with {:ok, config} <- EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do
config
end
end
10 changes: 10 additions & 0 deletions lib/event_store/config.ex
@@ -98,6 +98,16 @@ defmodule EventStore.Config do
|> Keyword.put(:name, name)
end

def postgrex_notifications_opts(config, name) do
config
|> default_postgrex_opts()
|> Keyword.put(:auto_reconnect, true)
|> Keyword.put(:backoff_type, :rand_exp)
|> Keyword.put(:sync_connect, false)
|> Keyword.put(:pool_size, 1)
|> Keyword.put(:name, name)
end

def sync_connect_postgrex_opts(config) do
config
|> default_postgrex_opts()
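The new `postgrex_notifications_opts/2` pins the notifications connection to a pool of one, with automatic reconnection and an asynchronous connect. A standalone sketch of the options it builds, with `base_opts/0` standing in for `default_postgrex_opts/1` (which is not shown in this diff):

```elixir
# Standalone sketch of the options built by `postgrex_notifications_opts/2`.
# `base_opts/0` is a stand-in assumption for `default_postgrex_opts/1`.
defmodule NotificationOptsSketch do
  defp base_opts do
    [hostname: "localhost", database: "eventstore"]
  end

  def postgrex_notifications_opts(name) do
    base_opts()
    |> Keyword.put(:auto_reconnect, true)
    |> Keyword.put(:backoff_type, :rand_exp)
    |> Keyword.put(:sync_connect, false)
    |> Keyword.put(:pool_size, 1)
    |> Keyword.put(:name, name)
  end
end

opts = NotificationOptsSketch.postgrex_notifications_opts(MyNotifications)
# A pool size of one means a single shared connection handles all
# notifications, which is the point of this PR.
1 = Keyword.fetch!(opts, :pool_size)
```

Setting `:sync_connect` to `false` lets the supervisor start without blocking on the database, while `:auto_reconnect` with `:rand_exp` backoff recovers the connection after an outage.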
8 changes: 2 additions & 6 deletions lib/event_store/monitored_server.ex
@@ -125,9 +125,7 @@ defmodule EventStore.MonitoredServer do
{:noreply, start_process(state)}
end

@doc """
Handle process exit by attempting to restart, after a configurable delay.
"""
# Handle process exit by attempting to restart, after a configurable delay.
def handle_info({:EXIT, pid, reason}, %State{pid: pid} = state) do
{:noreply, on_process_exit(pid, reason, state)}
end
@@ -140,9 +138,7 @@
{:noreply, state}
end

@doc """
Handle process down by attempting to restart, after a configurable delay.
"""
# Handle process down by attempting to restart, after a configurable delay.
def handle_info({:DOWN, _ref, :process, pid, reason}, %State{pid: pid} = state) do
{:noreply, on_process_exit(pid, reason, state)}
end
49 changes: 0 additions & 49 deletions lib/event_store/notifications/broadcaster.ex

This file was deleted.

44 changes: 5 additions & 39 deletions lib/event_store/notifications/listener.ex
@@ -11,8 +11,7 @@ defmodule EventStore.Notifications.Listener do

require Logger

alias EventStore.MonitoredServer
alias EventStore.Notifications.Listener
alias EventStore.Notifications.{Listener, Notification}

defstruct [:listen_to, :schema, :ref, demand: 0, queue: :queue.new()]

@@ -29,27 +28,7 @@
end

def init(%Listener{} = state) do
%Listener{listen_to: listen_to} = state

:ok = MonitoredServer.monitor(listen_to)

{:producer, state}
end

def handle_info({:UP, listen_to, _pid}, %Listener{listen_to: listen_to} = state) do
{:noreply, [], listen_for_events(state)}
end

def handle_info({:DOWN, listen_to, _pid, _reason}, %Listener{listen_to: listen_to} = state) do
{:noreply, [], %Listener{state | ref: nil}}
end

# Ignore notifications when database connection down.
def handle_info(
{:notification, _connection_pid, _ref, _channel, _payload},
%Listener{ref: nil} = state
) do
{:noreply, [], state}
{:producer, listen_for_events(state)}
end

# Notification received from PostgreSQL's `NOTIFY`
@@ -59,20 +38,7 @@
inspect(channel) <> " with payload: " <> inspect(payload)
)

# `NOTIFY` payload contains the stream uuid, stream id, and first / last
# stream versions (e.g. "stream-12345,1,1,5")

[last, first, stream_id, stream_uuid] =
payload
|> String.reverse()
|> String.split(",", parts: 4)
|> Enum.map(&String.reverse/1)

{stream_id, ""} = Integer.parse(stream_id)
{first_stream_version, ""} = Integer.parse(first)
{last_stream_version, ""} = Integer.parse(last)

state = enqueue({stream_uuid, stream_id, first_stream_version, last_stream_version}, state)
state = payload |> Notification.new() |> enqueue(state)

dispatch_events([], state)
end
@@ -112,9 +78,9 @@
end
end

defp enqueue(event, %Listener{} = state) do
defp enqueue(%Notification{} = notification, %Listener{} = state) do
%Listener{queue: queue} = state

%Listener{state | queue: :queue.in(event, queue)}
%Listener{state | queue: :queue.in(notification, queue)}
end
end
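The Listener buffers incoming notifications in an Erlang `:queue` and only dispatches while downstream GenStage demand remains. A reduced, self-contained sketch of that buffer-and-demand pattern (module and function names here are illustrative, not EventStore's):

```elixir
# Reduced sketch of the Listener's buffer-and-demand pattern: notifications
# are queued on arrival and dispatched only while consumer demand remains.
defmodule BufferSketch do
  defstruct demand: 0, queue: :queue.new()

  def enqueue(%BufferSketch{queue: queue} = state, item) do
    %BufferSketch{state | queue: :queue.in(item, queue)}
  end

  def add_demand(%BufferSketch{demand: demand} = state, n) do
    %BufferSketch{state | demand: demand + n}
  end

  # Take items from the queue while demand remains, returning them in order.
  def dispatch(%BufferSketch{} = state, acc \\ []) do
    %BufferSketch{demand: demand, queue: queue} = state

    with true <- demand > 0,
         {{:value, item}, queue} <- :queue.out(queue) do
      state = %BufferSketch{state | demand: demand - 1, queue: queue}
      dispatch(state, [item | acc])
    else
      _ -> {Enum.reverse(acc), state}
    end
  end
end

state =
  %BufferSketch{}
  |> BufferSketch.enqueue(:a)
  |> BufferSketch.enqueue(:b)
  |> BufferSketch.add_demand(1)

{events, _state} = BufferSketch.dispatch(state)
# events == [:a]; :b stays queued until more demand arrives
```

Queuing rather than dropping means notifications that arrive faster than consumers request them are retained and delivered once demand catches up.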
35 changes: 35 additions & 0 deletions lib/event_store/notifications/notification.ex
@@ -0,0 +1,35 @@
defmodule EventStore.Notifications.Notification do
@moduledoc false

alias EventStore.Notifications.Notification

defstruct [:stream_uuid, :stream_id, :from_stream_version, :to_stream_version]

@doc """
Build a new notification struct from the `NOTIFY` payload which contains the
stream uuid, stream id, first and last stream versions.

## Example

Notification.new("stream-12345,1,1,5")

"""
def new(payload) do
[last, first, stream_id, stream_uuid] =
payload
|> String.reverse()
|> String.split(",", parts: 4)
|> Enum.map(&String.reverse/1)

{stream_id, ""} = Integer.parse(stream_id)
{from_stream_version, ""} = Integer.parse(first)
{to_stream_version, ""} = Integer.parse(last)

%Notification{
stream_uuid: stream_uuid,
stream_id: stream_id,
from_stream_version: from_stream_version,
to_stream_version: to_stream_version
}
end
end
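The reverse-then-split in `Notification.new/1` is worth spelling out: splitting the reversed payload with `parts: 4` means the remainder (everything before the last three commas) ends up holding the stream uuid, presumably so uuids containing commas still parse correctly. The parser is reproduced below so the example is self-contained, using a local struct in place of `EventStore.Notifications.Notification`:

```elixir
# Self-contained reproduction of the `Notification.new/1` parser from this PR.
defmodule NotificationSketch do
  defstruct [:stream_uuid, :stream_id, :from_stream_version, :to_stream_version]

  def new(payload) do
    # Reverse so the split runs from the end of the payload; the 4th part
    # (the remainder) then holds the stream uuid, commas and all.
    [last, first, stream_id, stream_uuid] =
      payload
      |> String.reverse()
      |> String.split(",", parts: 4)
      |> Enum.map(&String.reverse/1)

    {stream_id, ""} = Integer.parse(stream_id)
    {from_stream_version, ""} = Integer.parse(first)
    {to_stream_version, ""} = Integer.parse(last)

    %NotificationSketch{
      stream_uuid: stream_uuid,
      stream_id: stream_id,
      from_stream_version: from_stream_version,
      to_stream_version: to_stream_version
    }
  end
end

notification = NotificationSketch.new("stream-12345,1,1,5")
# %NotificationSketch{stream_uuid: "stream-12345", stream_id: 1,
#   from_stream_version: 1, to_stream_version: 5}
```

The `{n, ""} = Integer.parse(...)` matches also act as assertions: a malformed payload crashes the listener rather than producing a partially parsed notification.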
97 changes: 97 additions & 0 deletions lib/event_store/notifications/publisher.ex
@@ -0,0 +1,97 @@
defmodule EventStore.Notifications.Publisher do
@moduledoc false

# Reads events from storage by each event number range received and publishes
# them.

use GenStage

require Logger

alias EventStore.{PubSub, RecordedEvent, Storage}
alias EventStore.Notifications.Notification

defmodule State do
defstruct [:conn, :event_store, :schema, :serializer, :subscribe_to]

def new(opts) do
%State{
conn: Keyword.fetch!(opts, :conn),
event_store: Keyword.fetch!(opts, :event_store),
schema: Keyword.fetch!(opts, :schema),
serializer: Keyword.fetch!(opts, :serializer),
subscribe_to: Keyword.fetch!(opts, :subscribe_to)
}
end
end

def start_link(opts) do
{start_opts, reader_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

state = State.new(reader_opts)

GenStage.start_link(__MODULE__, state, start_opts)
end

# Starts a permanent subscription to the listener producer stage which will
# automatically start requesting items.
def init(%State{} = state) do
%State{subscribe_to: subscribe_to} = state

{:consumer, state, [subscribe_to: [{subscribe_to, max_demand: 1}]]}
end

# Fetch events from storage and pass them on to subscribers
def handle_events(events, _from, state) do
%State{event_store: event_store} = state

events
|> Stream.map(&read_events(&1, state))
|> Stream.reject(&is_nil/1)
|> Enum.each(fn {stream_uuid, batch} -> broadcast(event_store, stream_uuid, batch) end)

{:noreply, [], state}
end

defp read_events(%Notification{} = notification, %State{} = state) do
%Notification{
stream_uuid: stream_uuid,
stream_id: stream_id,
from_stream_version: from_stream_version,
to_stream_version: to_stream_version
} = notification

%State{conn: conn, schema: schema, serializer: serializer} = state

count = to_stream_version - from_stream_version + 1

try do
case Storage.read_stream_forward(conn, stream_id, from_stream_version, count, schema: schema) do
{:ok, events} ->
deserialized_events = deserialize_recorded_events(events, serializer)

{stream_uuid, deserialized_events}

{:error, error} ->
Logger.error(
"EventStore notifications failed to read events due to: " <> inspect(error)
)

nil
end
catch
:exit, ex ->
Logger.error("EventStore notifications failed to read events due to: " <> inspect(ex))
nil
end
end

defp deserialize_recorded_events(recorded_events, serializer) do
Enum.map(recorded_events, &RecordedEvent.deserialize(&1, serializer))
end

defp broadcast(event_store, stream_uuid, events) do
PubSub.broadcast(event_store, stream_uuid, {:events, events})
end
end