Skip to content

Commit

Permalink
Merge pull request #98 from commanded/feature/advisory-locks
Browse files Browse the repository at this point in the history
Use PostgreSQL advisory locks to enforce only one subscription instance
  • Loading branch information
slashdotdash authored Jan 5, 2018
2 parents dc51b3c + e46a990 commit 76bd6f1
Show file tree
Hide file tree
Showing 22 changed files with 604 additions and 289 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- Replace `:info` level logging with `:debug` ([#90](https://github.com/commanded/eventstore/issues/90)).
- Dealing better with Poison dependancy ([#91](https://github.com/commanded/eventstore/issues/91)).
- Publish events directly to subscriptions ([#93](https://github.com/commanded/eventstore/pull/93)).
- Use PostgreSQL advisory locks to enforce only one subscription instance ([#98](https://github.com/commanded/eventstore/pull/98)).

## v0.13.2

Expand Down
2 changes: 2 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use Mix.Config
# Do not include metadata nor timestamps in development logs
config :logger, :console, format: "[$level] $message\n"

config :mix_test_watch, clear: true

config :eventstore, EventStore.Storage,
serializer: EventStore.TermSerializer,
username: "postgres",
Expand Down
3 changes: 2 additions & 1 deletion config/distributed.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ config :eventstore, EventStore.Storage,

config :eventstore,
registry: :distributed,
restart_stream_timeout: 1_000
restart_stream_timeout: 1_000,
subscription_retry_interval: 1_000

config :swarm,
nodes: [:"node1@127.0.0.1", :"node2@127.0.0.1", :"node3@127.0.0.1"],
Expand Down
3 changes: 2 additions & 1 deletion config/jsonb.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ config :eventstore, EventStore.Storage,
pool_overflow: 0

config :eventstore,
column_data_type: "jsonb",
registry: :local,
column_data_type: "jsonb"
subscription_retry_interval: 1_000
3 changes: 2 additions & 1 deletion config/local.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ config :eventstore, EventStore.Storage,
pool_overflow: 0

config :eventstore,
registry: :local
registry: :local,
subscription_retry_interval: 1_000
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ config :eventstore, EventStore.Storage,
pool_overflow: 0

config :eventstore,
registry: :local
registry: :local,
subscription_retry_interval: 1_000
6 changes: 6 additions & 0 deletions lib/event_store/sql/statements.ex
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ WHERE stream_uuid = $1 AND subscription_name = $2;
"""
end

def try_advisory_lock do
"""
SELECT pg_try_advisory_lock($1);
"""
end

def ack_last_seen_event do
"""
UPDATE subscriptions
Expand Down
17 changes: 13 additions & 4 deletions lib/event_store/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,37 @@ defmodule EventStore.Storage do
unique name and starting position (event number or stream version).
"""
def subscribe_to_stream(stream_uuid, subscription_name, start_from_event_number \\ nil, start_from_stream_version \\ nil) do
Subscription.subscribe_to_stream(@event_store, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version)
Subscription.subscribe_to_stream(@event_store, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, pool: DBConnection.Poolboy)
end

@doc """
Attempt to acquire an exclusive lock for the given subscription id. Uses
PostgreSQL's advisory locks[1] to provide session level locking.
[1] https://www.postgresql.org/docs/current/static/explicit-locking.html#ADVISORY-LOCKS
"""
def try_acquire_exclusive_lock(subscription_id) do
Subscription.try_acquire_exclusive_lock(@event_store, subscription_id, pool: DBConnection.Poolboy)
end

@doc """
Acknowledge receipt of an event by its number, for a single subscription.
"""
def ack_last_seen_event(stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) do
Subscription.ack_last_seen_event(@event_store, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version)
Subscription.ack_last_seen_event(@event_store, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, pool: DBConnection.Poolboy)
end

@doc """
Unsubscribe from an existing named subscription to a stream.
"""
def unsubscribe_from_stream(stream_uuid, subscription_name) do
Subscription.unsubscribe_from_stream(@event_store, stream_uuid, subscription_name)
Subscription.unsubscribe_from_stream(@event_store, stream_uuid, subscription_name, pool: DBConnection.Poolboy)
end

@doc """
Get all known subscriptions, to any stream.
"""
def subscriptions do
Subscription.subscriptions(@event_store)
Subscription.subscriptions(@event_store, pool: DBConnection.Poolboy)
end

@doc """
Expand Down
100 changes: 60 additions & 40 deletions lib/event_store/storage/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,33 @@ defmodule EventStore.Storage.Subscription do
@doc """
List all known subscriptions
"""
def subscriptions(conn),
do: Subscription.All.execute(conn)
def subscriptions(conn, opts \\ []),
do: Subscription.All.execute(conn, opts)

def subscribe_to_stream(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) do
case Subscription.Query.execute(conn, stream_uuid, subscription_name) do
{:ok, subscription} -> {:ok, subscription}
{:error, :subscription_not_found} -> Subscription.Subscribe.execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version)
def subscribe_to_stream(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, opts \\ []) do
with {:ok, subscription} <- Subscription.Query.execute(conn, stream_uuid, subscription_name, opts) do
{:ok, subscription}
else
{:error, :subscription_not_found} ->
Subscription.Subscribe.execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, opts)
end
end

def ack_last_seen_event(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) do
Subscription.Ack.execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version)
def try_acquire_exclusive_lock(conn, subscription_id, opts \\ []),
do: Subscription.TryAdvisoryLock.execute(conn, subscription_id, opts)

def ack_last_seen_event(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, opts \\ []) do
Subscription.Ack.execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, opts)
end

def unsubscribe_from_stream(conn, stream_uuid, subscription_name),
do: Subscription.Unsubscribe.execute(conn, stream_uuid, subscription_name)
def unsubscribe_from_stream(conn, stream_uuid, subscription_name, opts \\ []),
do: Subscription.Unsubscribe.execute(conn, stream_uuid, subscription_name, opts)

defmodule All do
def execute(conn) do
def execute(conn, opts) do
conn
|> Postgrex.query(Statements.query_all_subscriptions, [], pool: DBConnection.Poolboy)
|> handle_response
|> Postgrex.query(Statements.query_all_subscriptions, [], opts)
|> handle_response()
end

defp handle_response({:ok, %Postgrex.Result{num_rows: 0}}),
Expand All @@ -61,10 +66,10 @@ defmodule EventStore.Storage.Subscription do
end

defmodule Query do
def execute(conn, stream_uuid, subscription_name) do
def execute(conn, stream_uuid, subscription_name, opts) do
conn
|> Postgrex.query(Statements.query_get_subscription, [stream_uuid, subscription_name], pool: DBConnection.Poolboy)
|> handle_response
|> Postgrex.query(Statements.query_get_subscription, [stream_uuid, subscription_name], opts)
|> handle_response()
end

defp handle_response({:ok, %Postgrex.Result{num_rows: 0}}),
Expand All @@ -75,11 +80,11 @@ defmodule EventStore.Storage.Subscription do
end

defmodule Subscribe do
def execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version) do
def execute(conn, stream_uuid, subscription_name, start_from_event_number, start_from_stream_version, opts) do
_ = Logger.debug(fn -> "Attempting to create subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\"" end)

conn
|> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name, start_from_event_number, start_from_stream_version], pool: DBConnection.Poolboy)
|> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name, start_from_event_number, start_from_stream_version], opts)
|> handle_response(stream_uuid, subscription_name)
end

Expand All @@ -89,7 +94,7 @@ defmodule EventStore.Storage.Subscription do
end

defp handle_response({:error, %Postgrex.Error{postgres: %{code: :unique_violation}}}, stream_uuid, subscription_name) do
_ = Logger.warn(fn -> "Failed to create subscription on stream #{stream_uuid} named #{subscription_name}, already exists" end)
_ = Logger.debug(fn -> "Failed to create subscription on stream #{stream_uuid} named #{subscription_name}, already exists" end)
{:error, :subscription_already_exists}
end

Expand All @@ -99,10 +104,30 @@ defmodule EventStore.Storage.Subscription do
end
end

defmodule TryAdvisoryLock do
def execute(conn, subscription_id, opts) do
conn
|> Postgrex.query(Statements.try_advisory_lock(), [subscription_id], opts)
|> handle_response()
end

defp handle_response({:ok, %Postgrex.Result{rows: [[true]]}}) do
:ok
end

defp handle_response({:ok, %Postgrex.Result{rows: [[false]]}}) do
{:error, :lock_already_taken}
end

defp handle_response({:error, error}) do
{:error, error}
end
end

defmodule Ack do
def execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version) do
def execute(conn, stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version, opts) do
conn
|> Postgrex.query(Statements.ack_last_seen_event, [stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version], pool: DBConnection.Poolboy)
|> Postgrex.query(Statements.ack_last_seen_event, [stream_uuid, subscription_name, last_seen_event_number, last_seen_stream_version], opts)
|> handle_response(stream_uuid, subscription_name)
end

Expand All @@ -117,11 +142,11 @@ defmodule EventStore.Storage.Subscription do
end

defmodule Unsubscribe do
def execute(conn, stream_uuid, subscription_name) do
def execute(conn, stream_uuid, subscription_name, opts) do
_ = Logger.debug(fn -> "Attempting to unsubscribe from stream \"#{stream_uuid}\" named \"#{subscription_name}\"" end)

conn
|> Postgrex.query(Statements.delete_subscription, [stream_uuid, subscription_name], pool: DBConnection.Poolboy)
|> Postgrex.query(Statements.delete_subscription, [stream_uuid, subscription_name], opts)
|> handle_response(stream_uuid, subscription_name)
end

Expand All @@ -137,25 +162,20 @@ defmodule EventStore.Storage.Subscription do
end

defmodule Adapter do
def to_subscriptions(rows) do
rows
|> Enum.map(&to_subscription_from_row/1)
end
def to_subscriptions(rows), do: Enum.map(rows, &to_subscription_from_row/1)

def to_subscription(rows) do
rows
|> List.first()
|> to_subscription_from_row()
end
def to_subscription([row | _]), do: to_subscription_from_row(row)

defp to_subscription_from_row(row) do
[
subscription_id,
stream_uuid,
subscription_name,
last_seen_event_number,
last_seen_stream_version,
created_at,
] = row

defp to_subscription_from_row([
subscription_id,
stream_uuid,
subscription_name,
last_seen_event_number,
last_seen_stream_version,
created_at,
]) do
%Subscription{
subscription_id: subscription_id,
stream_uuid: stream_uuid,
Expand Down
2 changes: 1 addition & 1 deletion lib/event_store/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule EventStore.Subscriptions do

defp notify_subscribers(stream_uuid, events) do
Registry.dispatch(EventStore.Subscriptions.PubSub, stream_uuid, fn subscribers ->
for {subscription, {module, function}} <- subscribers, do: apply(module, function, [subscription, events])
for {pid, _} <- subscribers, do: send(pid, {:notify_events, events})
end)
end
end
Loading

0 comments on commit 76bd6f1

Please sign in to comment.