Skip to content

Commit

Permalink
Schedule listening notifications when connection is down
Browse files Browse the repository at this point in the history
  • Loading branch information
José Valim committed Oct 26, 2019
1 parent 6bdc12d commit a59be70
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 65 deletions.
136 changes: 74 additions & 62 deletions lib/postgrex/notifications.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ defmodule Postgrex.Notifications do
{:notification, notification_pid, listen_ref, channel, message}
## Important note about consistency
## Async connect and auto-reconnects
While the notification system can automatically reconnect following a
disconnection, notifications that occur during the disconnection period
are not queued and cannot be recovered. Using the `:auto_reconnect` option
(see `start_link/1`) carries the risk of silently missing notifications
fired during the disconnected period.
By default, the notification system establishes a connection to the
database on initialization, but you can configure the connection to happen
asynchronously. You can also configure the connection to automatically
reconnect.
Note however that when the notification system is waiting for a connection,
any notifications that occur during the disconnection period are not queued
and cannot be recovered. Similarly, any listen command will be queued until
the connection is up.
## A note on casing
Expand Down Expand Up @@ -71,10 +75,11 @@ defmodule Postgrex.Notifications do
idle_interval: 5000,
protocol: nil,
parameters: nil,
listeners: Map.new(),
listener_channels: Map.new(),
listeners: %{},
listener_channels: %{},
auto_reconnect: false,
reconnect_backoff: 500
reconnect_backoff: 500,
connected: false

## PUBLIC API ##

Expand Down Expand Up @@ -113,14 +118,20 @@ defmodule Postgrex.Notifications do

@doc """
Listens to an asynchronous notification channel using the `LISTEN` command.
A message `{:notification, connection_pid, ref, channel, payload}` will be
sent to the calling process when a notification is received.
It returns `{:ok, reference}`. It may also return `{:eventually, reference}`
if the notification process is not currently connected to the database and
it was started with `:sync_connect` set to false or `:auto_reconnect` set
to true. The `reference` can be used to issue an `unlisten/3` command.
## Options
* `:timeout` - Call timeout (default: `#{@timeout}`)
"""
@spec listen(server, String.t(), Keyword.t()) :: {:ok, reference}
@spec listen(server, String.t(), Keyword.t()) :: {:ok, reference} | {:eventually, reference}
def listen(pid, channel, opts \\ []) do
message = {:listen, channel}
timeout = opts[:timeout] || @timeout
Expand All @@ -144,15 +155,11 @@ defmodule Postgrex.Notifications do
* `:timeout` - Call timeout (default: `#{@timeout}`)
"""
@spec unlisten(server, reference, Keyword.t()) :: :ok
@spec unlisten(server, reference, Keyword.t()) :: :ok | :error
def unlisten(pid, ref, opts \\ []) do
message = {:unlisten, ref}
timeout = opts[:timeout] || @timeout

case Connection.call(pid, message, timeout) do
:ok -> :ok
{:error, %ArgumentError{} = err} -> raise err
end
Connection.call(pid, message, timeout)
end

@doc """
Expand All @@ -161,7 +168,10 @@ defmodule Postgrex.Notifications do
"""
@spec unlisten!(server, reference, Keyword.t()) :: :ok
def unlisten!(pid, ref, opts \\ []) do
unlisten(pid, ref, opts)
case unlisten(pid, ref, opts) do
:ok -> :ok
:error -> raise ArgumentError, "unknown reference #{inspect(ref)}"
end
end

## CALLBACKS ##
Expand Down Expand Up @@ -203,7 +213,8 @@ defmodule Postgrex.Notifications do
def connect(_, s) do
case Protocol.connect([types: nil] ++ s.opts) do
{:ok, protocol} ->
reestablish_listeners(%{s | protocol: protocol})
s = %{s | listener_channels: %{}, connected: true, protocol: protocol}
Enum.reduce_while(s.listeners, {:ok, s, s.idle_interval}, &reestablish_listener/2)

{:error, reason} ->
if s.auto_reconnect do
Expand All @@ -221,37 +232,23 @@ defmodule Postgrex.Notifications do
end

def handle_call({:unlisten, ref}, from, s) do
case Map.fetch(s.listeners, ref) do
:error ->
{:reply, {:error, %ArgumentError{}}, s, s.idle_interval}

{:ok, {channel, _pid}} ->
case s.listeners do
%{^ref => {channel, _pid}} ->
Process.demonitor(ref, [:flush])
s = remove_monitored_listener(s, ref, channel)
do_unlisten(channel, ref, from, s)

if map_size(s.listener_channels[channel]) == 0 do
s = update_in(s.listener_channels, &Map.delete(&1, channel))
listener_query("UNLISTEN \"#{channel}\"", :ok, from, s)
else
{:reply, :ok, s, s.idle_interval}
end
%{} ->
{:reply, :error, s, s.idle_interval}
end
end

def handle_info({:DOWN, ref, :process, _, _}, s) do
case Map.fetch(s.listeners, ref) do
:error ->
{:noreply, s, s.idle_interval}

{:ok, {channel, _pid}} ->
s = remove_monitored_listener(s, ref, channel)
case s.listeners do
%{^ref => {channel, _pid}} ->
do_unlisten(channel, ref, nil, s)

if map_size(s.listener_channels[channel]) == 0 do
s = update_in(s.listener_channels, &Map.delete(&1, channel))
listener_query("UNLISTEN \"#{channel}\"", :ok, nil, s)
else
{:noreply, s, s.idle_interval}
end
%{} ->
{:noreply, s, s.idle_interval}
end
end

Expand All @@ -278,10 +275,6 @@ defmodule Postgrex.Notifications do
end
end

defp reestablish_listeners(s) do
Enum.reduce_while(s.listeners, {:ok, s, s.idle_interval}, &reestablish_listener/2)
end

defp reestablish_listener({ref, {channel, pid}}, {:ok, s, timeout}) do
case do_listen(channel, pid, ref, nil, s) do
{:noreply, s, _} -> {:cont, {:ok, s, timeout}}
Expand All @@ -290,32 +283,56 @@ defmodule Postgrex.Notifications do
end

defp do_listen(channel, pid, ref, from, s) do
s = update_in(s.listener_channels[channel], &((&1 || Map.new()) |> Map.put(ref, pid)))
s = update_in(s.listener_channels[channel], &((&1 || %{}) |> Map.put(ref, pid)))

# If this is the first listener for the given channel,
# we need to actually issue the LISTEN query.
if map_size(s.listener_channels[channel]) == 1 do
listener_query("LISTEN \"#{channel}\"", {:ok, ref}, from, s)
listener_query("LISTEN \"#{channel}\"", {:ok, ref}, {:eventually, ref}, from, s)
else
if from do
{:reply, {:ok, ref}, s, s.idle_interval}
else
{:noreply, s, s.idle_interval}
end
if from, do: Connection.reply(from, {:ok, ref})
{:noreply, s, s.idle_interval}
end
end

# Drops the listener identified by `ref` from the state: first from
# `s.listeners`, then from the per-channel map under
# `s.listener_channels[channel]`.
#
# `from` is the caller to reply to, or a falsy value when there is nobody
# to reply to. Either the result of `listener_query/5` is returned, or
# `{:noreply, s, s.idle_interval}` after replying `:ok` directly.
defp do_unlisten(channel, ref, from, s) do
s = update_in(s.listeners, &Map.delete(&1, ref))
s = update_in(s.listener_channels[channel], &Map.delete(&1, ref))

# If this was the last listener for the given channel,
# we need to issue the UNLISTEN query.
if map_size(s.listener_channels[channel]) == 0 do
# Remove the now-empty channel entry before issuing the query.
s = update_in(s.listener_channels, &Map.delete(&1, channel))
# `:ok` is passed as both the ok result and the async result, so the
# caller gets `:ok` whichever of the two `listener_query/5` replies with.
listener_query("UNLISTEN \"#{channel}\"", :ok, :ok, from, s)
else
# Other listeners remain on this channel: no query is issued, only the
# reply (when there is a caller).
if from, do: Connection.reply(from, :ok)
{:noreply, s, s.idle_interval}
end
end

defp listener_query(statement, result, from, s) do
# Clause matched when the state has `connected: false`: the statement is not
# executed. A caller, if present, is answered with `async_result`, and the
# state is returned unchanged with the idle interval as timeout.
defp listener_query(_statement, _ok_result, async_result, from, %{connected: false} = s) do
if from, do: Connection.reply(from, async_result)
{:noreply, s, s.idle_interval}
end

defp listener_query(statement, ok_result, async_result, from, s) do
%{protocol: protocol, listener_channels: channels, listeners: listeners} = s
opts = [notify: &notify_listeners(channels, listeners, &1, &2)]

case Protocol.handle_listener(statement, opts, protocol) do
{:ok, %Postgrex.Result{}, protocol} ->
if from, do: Connection.reply(from, result)
if from, do: Connection.reply(from, ok_result)
checkin(protocol, s)

{error, reason, protocol} ->
reconnect_or_stop(error, reason, protocol, s)
case reconnect_or_stop(error, reason, protocol, s) do
{:stop, _, _} = stop ->
stop

{:connect, _, _} = connect ->
if from, do: Connection.reply(from, async_result)
connect
end
end
end

Expand All @@ -336,18 +353,13 @@ defmodule Postgrex.Notifications do
end
end

defp remove_monitored_listener(s, ref, channel) do
s = update_in(s.listeners, &Map.delete(&1, ref))
update_in(s.listener_channels[channel], &Map.delete(&1, ref))
end

# Clause for `auto_reconnect: false`: when `error` is `:error` or
# `:disconnect`, returns a `:stop` tuple carrying `reason` and the state
# updated with the given `protocol`.
defp reconnect_or_stop(error, reason, protocol, %{auto_reconnect: false} = s)
when error in [:error, :disconnect] do
{:stop, reason, %{s | protocol: protocol}}
end

defp reconnect_or_stop(error, _reason, _protocol, %{auto_reconnect: true} = s)
when error in [:error, :disconnect] do
{:connect, :reconnect, %{s | listener_channels: Map.new()}}
{:connect, :reconnect, %{s | connected: false}}
end
end
12 changes: 9 additions & 3 deletions test/notification_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ defmodule NotificationTest do

test "does not fail on sync connection with auto reconnect" do
Process.flag(:trap_exit, true)
assert {:ok, _} = PN.start_link(database: "nobody_knows_it", auto_reconnect: true)
assert {:ok, pid} = PN.start_link(database: "nobody_knows_it", auto_reconnect: true)
assert {:eventually, _} = PN.listen(pid, "channel")
end

test "does not fail on async connection with auto reconnect" do
Expand All @@ -28,6 +29,7 @@ defmodule NotificationTest do
assert {:ok, pid} =
PN.start_link(database: "nobody_knows_it", auto_reconnect: true, sync_connect: false)

assert {:eventually, _} = PN.listen(pid, "channel")
refute_receive {:EXIT, _, ^pid}, 100
end

Expand Down Expand Up @@ -124,7 +126,6 @@ defmodule NotificationTest do
receiver_pid = context.pid_ps

assert {:ok, ref} = PN.listen(context.pid_ps, "channel")
assert {:ok, ref2} = PN.listen(context.pid_ps, "channel2")

async =
Task.async(fn ->
Expand All @@ -134,8 +135,13 @@ defmodule NotificationTest do

{:gen_tcp, sock} = :sys.get_state(context.pid_ps).mod_state.protocol.sock
:gen_tcp.shutdown(sock, :read_write)

# Also attempt to subscribe while it is down
assert {ok_or_eventually, ref2} = PN.listen(context.pid_ps, "channel2")
assert ok_or_eventually in [:ok, :eventually]

# Give the notifier a chance to re-establish the connection and listeners
:timer.sleep(500)
Process.sleep(500)

assert {:ok, %Postgrex.Result{command: :notify}} =
P.query(context.pid, "NOTIFY channel", [])
Expand Down

0 comments on commit a59be70

Please sign in to comment.