Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use configurable event store timeout for subscription queries #259

Merged
merged 1 commit into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ on: [push, pull_request]

jobs:
build:

name: Build and test
runs-on: ubuntu-latest
strategy:
matrix:
otp: ['23.3', '24.2']
elixir: ['1.13.1']
otp: ['24.3', '25.1']
elixir: ['1.14.1']

services:
postgres:
Expand All @@ -26,7 +25,7 @@ jobs:
--health-retries 5

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Set up Elixir
id: beam
Expand All @@ -36,7 +35,7 @@ jobs:
otp-version: ${{ matrix.otp }}

- name: Restore dependencies cache
uses: actions/cache@v2
uses: actions/cache@v3
with:
path: deps
key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-mix-${{ hashFiles('**/mix.lock') }}
Expand All @@ -48,9 +47,6 @@ jobs:
- name: Check formatting
run: mix format --check-formatted

- name: Start epmd
run: epmd -daemon

- name: Setup EventStore test databases
run: |
MIX_ENV=test mix event_store.setup
Expand All @@ -60,7 +56,7 @@ jobs:
run: mix test.all

- name: Retrieve Dialyzer PLT cache
uses: actions/cache@v1
uses: actions/cache@v3
id: plt-cache
with:
path: priv/plts
Expand Down
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.13.4-otp-24
erlang 24.3.4.2
elixir 1.14.1-otp-24
erlang 24.3.4.6
25 changes: 18 additions & 7 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -446,23 +446,34 @@ defmodule EventStore do
schema = Keyword.fetch!(config, :schema)
serializer = Keyword.fetch!(config, :serializer)

with {start_from, opts} <- Keyword.pop(opts, :start_from, :origin),
{:ok, start_from} <- Stream.start_from(conn, stream_uuid, start_from, schema: schema) do
query_timeout = timeout(opts, config)

{start_from, opts} = Keyword.pop(opts, :start_from, :origin)

with {:ok, start_from} <-
Stream.start_from(conn, stream_uuid, start_from,
schema: schema,
timeout: query_timeout
) do
opts =
[
hibernate_after: Keyword.fetch!(config, :subscription_hibernate_after),
retry_interval: Keyword.fetch!(config, :subscription_retry_interval)
]
|> Keyword.merge(opts)
opts
|> Keyword.delete(:timeout)
|> Keyword.merge(
conn: conn,
event_store: name,
query_timeout: query_timeout,
schema: schema,
serializer: serializer,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
start_from: start_from
)
|> Keyword.put_new_lazy(:hibernate_after, fn ->
Keyword.fetch!(config, :subscription_hibernate_after)
end)
|> Keyword.put_new_lazy(:retry_interval, fn ->
Keyword.fetch!(config, :subscription_retry_interval)
end)

Subscriptions.subscribe_to_stream(subscriber, opts)
end
Expand Down
37 changes: 23 additions & 14 deletions lib/event_store/advisory_locks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ defmodule EventStore.AdvisoryLocks do

defmodule State do
@moduledoc false
defstruct [:conn, :ref, :schema, locks: %{}]
defstruct [:conn, :query_timeout, :ref, :schema, locks: %{}]

def new(opts) do
conn = Keyword.fetch!(opts, :conn)
query_timeout = Keyword.fetch!(opts, :query_timeout)
schema = Keyword.fetch!(opts, :schema)

%State{conn: conn, query_timeout: query_timeout, schema: schema}
end
end

defmodule Lock do
Expand All @@ -38,10 +46,7 @@ defmodule EventStore.AdvisoryLocks do
{start_opts, advisory_locks_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

conn = Keyword.fetch!(advisory_locks_opts, :conn)
schema = Keyword.fetch!(advisory_locks_opts, :schema)

state = %State{conn: conn, schema: schema}
state = State.new(advisory_locks_opts)

GenServer.start_link(__MODULE__, state, start_opts)
end
Expand All @@ -58,6 +63,9 @@ defmodule EventStore.AdvisoryLocks do
Attempt to obtain an advisory lock.

- `key` - an application specific integer to acquire a lock on.
- `timeout` - an integer greater than zero which specifies how many
milliseconds to wait for a reply, or the atom `:infinity` to wait
indefinitely.

Returns `{:ok, lock}` when lock successfully acquired, or
`{:error, :lock_already_taken}` if the lock cannot be acquired immediately.
Expand All @@ -70,14 +78,14 @@ defmodule EventStore.AdvisoryLocks do
lost lock.

"""
@spec try_advisory_lock(server :: GenServer.server(), key :: non_neg_integer()) ::
@spec try_advisory_lock(server :: GenServer.server(), key :: non_neg_integer(), timeout()) ::
{:ok, reference()} | {:error, :lock_already_taken} | {:error, term}
def try_advisory_lock(server, key) when is_integer(key) do
GenServer.call(server, {:try_advisory_lock, key, self()})
def try_advisory_lock(server, key, timeout \\ 5_000) when is_integer(key) do
GenServer.call(server, {:try_advisory_lock, key, self(), timeout}, timeout)
end

def handle_call({:try_advisory_lock, key, owner}, _from, %State{} = state) do
case try_acquire_exclusive_lock(key, owner, state) do
def handle_call({:try_advisory_lock, key, owner, timeout}, _from, %State{} = state) do
case try_acquire_exclusive_lock(key, owner, timeout, state) do
{:ok, %Lock{} = lock} ->
state = monitor_acquired_lock(lock, state)

Expand All @@ -88,10 +96,11 @@ defmodule EventStore.AdvisoryLocks do
end
end

defp try_acquire_exclusive_lock(key, owner, %State{} = state) do
defp try_acquire_exclusive_lock(key, owner, timeout, %State{} = state) do
%State{conn: conn, schema: schema} = state

with :ok <- Storage.Lock.try_acquire_exclusive_lock(conn, key, schema: schema) do
with :ok <-
Storage.Lock.try_acquire_exclusive_lock(conn, key, schema: schema, timeout: timeout) do
{:ok, Lock.new(key, owner)}
end
end
Expand Down Expand Up @@ -142,8 +151,8 @@ defmodule EventStore.AdvisoryLocks do
end

defp release_lock(key, %State{} = state) do
%State{conn: conn, schema: schema} = state
%State{conn: conn, query_timeout: query_timeout, schema: schema} = state

Storage.Lock.unlock(conn, key, schema: schema)
Storage.Lock.unlock(conn, key, schema: schema, timeout: query_timeout)
end
end
58 changes: 42 additions & 16 deletions lib/event_store/config/parser.ex
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
defmodule EventStore.Config.Parser do
@moduledoc false

@config_defaults [
column_data_type: "bytea",
enable_hard_deletes: false,
schema: "public",
timeout: 15_000
]

def parse(config) do
config
|> Enum.reduce([], fn
{:url, value}, config ->
parsed_value = get_config_value(value) |> parse_url()

Keyword.merge(config, parsed_value)
|> Enum.reduce(@config_defaults, fn
{:port, value}, config ->
Keyword.put(config, :port, get_config_integer(value))

{:session_mode_url, value}, config ->
parsed_value = get_config_value(value) |> parse_url()

Keyword.put(config, :session_mode_pool, parsed_value)

{key, value}, config when key in [:port, :timeout] ->
Keyword.put(config, key, get_config_integer(value))
{:timeout, value}, config ->
Keyword.put(config, :timeout, get_config_timeout(value))

{:url, value}, config ->
parsed_value = get_config_value(value) |> parse_url()

Keyword.merge(config, parsed_value)

{key, value}, config ->
Keyword.put(config, key, get_config_value(value))
end)
|> Keyword.put(:pool, EventStore.Config.get_pool())
|> Keyword.put_new(:schema, "public")
|> Keyword.put_new(:column_data_type, "bytea")
|> Keyword.put_new(:enable_hard_deletes, false)
end

# Converts a database url into a Keyword list
Expand Down Expand Up @@ -95,26 +102,26 @@ defmodule EventStore.Config.Parser do
end
end

def get_config_value(value, default \\ nil)
defp get_config_value(value, default \\ nil)

def get_config_value({:system, env_var}, default) do
defp get_config_value({:system, env_var}, default) do
case System.get_env(env_var) do
nil -> default
val -> val
end
end

def get_config_value({:system, env_var, default}, _default) do
defp get_config_value({:system, env_var, default}, _default) do
case System.get_env(env_var) do
nil -> default
val -> val
end
end

def get_config_value(nil, default), do: default
def get_config_value(value, _default), do: value
defp get_config_value(nil, default), do: default
defp get_config_value(value, _default), do: value

def get_config_integer(value, default \\ nil) do
defp get_config_integer(value, default \\ nil) do
case get_config_value(value) do
nil ->
default
Expand All @@ -129,4 +136,23 @@ defmodule EventStore.Config.Parser do
end
end
end

defp get_config_timeout(value, default \\ nil) do
case get_config_value(value) do
nil ->
default

:infinity ->
:infinity

n when is_integer(n) ->
n

n ->
case Integer.parse(n) do
{i, _} -> i
:error -> default
end
end
end
end
11 changes: 6 additions & 5 deletions lib/event_store/notifications/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ defmodule EventStore.Notifications.Listener do

alias EventStore.Notifications.{Listener, Notification}

defstruct [:listen_to, :schema, :ref, demand: 0, queue: :queue.new()]
defstruct [:listen_to, :query_timeout, :schema, :ref, demand: 0, queue: :queue.new()]

def start_link(opts) do
{start_opts, listener_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

listen_to = Keyword.fetch!(listener_opts, :listen_to)
query_timeout = Keyword.fetch!(listener_opts, :query_timeout)
schema = Keyword.fetch!(listener_opts, :schema)

state = %Listener{listen_to: listen_to, schema: schema}
state = %Listener{listen_to: listen_to, query_timeout: query_timeout, schema: schema}

GenStage.start_link(__MODULE__, state, start_opts)
end
Expand All @@ -38,7 +39,7 @@ defmodule EventStore.Notifications.Listener do
inspect(channel) <> " with payload: " <> inspect(payload)
)

state = payload |> Notification.new() |> enqueue(state)
state = Notification.new(payload) |> enqueue(state)

dispatch_events([], state)
end
Expand All @@ -52,12 +53,12 @@ defmodule EventStore.Notifications.Listener do
end

defp listen_for_events(%Listener{} = state) do
%Listener{listen_to: listen_to, schema: schema} = state
%Listener{listen_to: listen_to, query_timeout: query_timeout, schema: schema} = state

channel = schema <> ".events"

ref =
case Postgrex.Notifications.listen(listen_to, channel) do
case Postgrex.Notifications.listen(listen_to, channel, timeout: query_timeout) do
{:ok, ref} -> ref
{:eventually, ref} -> ref
end
Expand Down
11 changes: 8 additions & 3 deletions lib/event_store/notifications/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ defmodule EventStore.Notifications.Publisher do
alias EventStore.Notifications.Notification

defmodule State do
defstruct [:conn, :event_store, :schema, :serializer, :subscribe_to]
defstruct [:conn, :event_store, :query_timeout, :schema, :serializer, :subscribe_to]

def new(opts) do
%State{
conn: Keyword.fetch!(opts, :conn),
event_store: Keyword.fetch!(opts, :event_store),
query_timeout: Keyword.fetch!(opts, :query_timeout),
schema: Keyword.fetch!(opts, :schema),
serializer: Keyword.fetch!(opts, :serializer),
subscribe_to: Keyword.fetch!(opts, :subscribe_to)
Expand Down Expand Up @@ -62,12 +63,16 @@ defmodule EventStore.Notifications.Publisher do
to_stream_version: to_stream_version
} = notification

%State{conn: conn, schema: schema, serializer: serializer} = state
%State{conn: conn, query_timeout: query_timeout, schema: schema, serializer: serializer} =
state

count = to_stream_version - from_stream_version + 1

try do
case Storage.read_stream_forward(conn, stream_id, from_stream_version, count, schema: schema) do
case Storage.read_stream_forward(conn, stream_id, from_stream_version, count,
schema: schema,
timeout: query_timeout
) do
{:ok, events} ->
deserialized_events = deserialize_recorded_events(events, serializer)

Expand Down
Loading