Skip to content

Commit

Permalink
Subscribe to a stream from a specified position
Browse files Browse the repository at this point in the history
Fixes #17.

Allow subscriptions to a single stream, or all streams, to optionally
specify a given start position.

- `:origin` - subscribe to events from the start of the stream
(identical to using 0). This is the default behaviour.
- `:current` - subscribe to events from the current version.
- stream version or event id - provide an exact stream version (single
stream), or last seen event id (all stream), to subscribe from.
  • Loading branch information
slashdotdash committed Dec 19, 2016
1 parent cf3c605 commit 23f0d7a
Show file tree
Hide file tree
Showing 25 changed files with 327 additions and 100 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v0.7.0

### Enhancements

- Subscribe to a single stream, or all streams, from a specified start position ([#17](https://github.com/slashdotdash/eventstore/issues/17)).

## v0.6.2

### Bug fixes
Expand Down
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ EventStore is [available in Hex](https://hex.pm/packages/eventstore) and can be
password: "postgres",
database: "eventstore_dev",
hostname: "localhost",
pool_size: 10,
extensions: [{Postgrex.Extensions.Calendar, []}]
pool_size: 10
```
4. Create the EventStore database and tables using the `mix` task
Expand Down Expand Up @@ -90,10 +89,16 @@ Read all events from the stream, starting at the stream's first event.

Subscriptions to a stream will guarantee at least once delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped. A subscription can be created to receive events published from a single logical stream or from all streams.

Events are received in batches after being persisted to storage. Each batch contains events from a single stream only with the same correlation id.
Events are received in batches after being persisted to storage. Each batch contains events from a single stream only and with the same correlation id.

Subscriptions must be uniquely named and support a single subscriber. Attempting to connect two subscribers to the same subscription will return an error.

By default subscriptions are created from the single stream, or all stream, origin. So it will receive all events from the single stream, or all streams. You can optionally specify a given start position.

- `:origin` - subscribe to events from the start of the stream (identical to using 0). This is the current behaviour and will remain the default.
- `:current` - subscribe to events from the current version.
- `stream_version` or `event_id` (integer) - specify an exact stream version to subscribe from for a single stream subscription. You provide an event id for an all stream subscription.

#### Ack received events

Receipt of each event by the subscriber must be acknowledged. This allows the subscription to resume on failure without missing an event.
Expand Down
3 changes: 1 addition & 2 deletions config/bench.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ config :eventstore, EventStore.Storage,
password: "postgres",
database: "eventstore_bench",
hostname: "localhost",
pool_size: 10,
extensions: [{Postgrex.Extensions.Calendar, []}]
pool_size: 10
3 changes: 1 addition & 2 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ config :eventstore, EventStore.Storage,
password: "postgres",
database: "eventstore_dev",
hostname: "localhost",
pool_size: 10,
extensions: [{Postgrex.Extensions.Calendar, []}]
pool_size: 10
3 changes: 1 addition & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ config :eventstore, EventStore.Storage,
password: "postgres",
database: "eventstore_test",
hostname: "localhost",
pool_size: 1,
extensions: [{Postgrex.Extensions.Calendar, []}]
pool_size: 1
16 changes: 9 additions & 7 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defmodule EventStore do
{:ok, recorded_events} = EventStore.read_stream_forward(stream_uuid)
"""

@type start_from :: :origin | :current | integer

alias EventStore.Snapshots.{SnapshotData,Snapshotter}
alias EventStore.{EventData,RecordedEvent,Streams,Subscriptions}
alias EventStore.Streams.{AllStream,Stream}
Expand Down Expand Up @@ -88,13 +90,13 @@ defmodule EventStore do
Returns `{:ok, subscription}` when subscription succeeds.
"""
@spec subscribe_to_stream(String.t, String.t, pid) :: {:ok, subscription :: pid}
@spec subscribe_to_stream(String.t, String.t, pid, start_from) :: {:ok, subscription :: pid}
| {:error, :subscription_already_exists}
| {:error, reason :: term}
def subscribe_to_stream(stream_uuid, subscription_name, subscriber) do
def subscribe_to_stream(stream_uuid, subscription_name, subscriber, start_from \\ :origin) do
{:ok, stream} = Streams.open_stream(stream_uuid)

Stream.subscribe_to_stream(stream, subscription_name, subscriber)
Stream.subscribe_to_stream(stream, subscription_name, subscriber, start_from)
end

@doc """
Expand All @@ -106,11 +108,11 @@ defmodule EventStore do
Returns `{:ok, subscription}` when subscription succeeds.
"""
@spec subscribe_to_all_streams(String.t, pid) :: {:ok, subscription :: pid}
@spec subscribe_to_all_streams(String.t, pid, start_from) :: {:ok, subscription :: pid}
| {:error, :subscription_already_exists}
| {:error, reason :: term}
def subscribe_to_all_streams(subscription_name, subscriber) do
AllStream.subscribe_to_stream(subscription_name, subscriber)
def subscribe_to_all_streams(subscription_name, subscriber, start_from \\ :origin) do
AllStream.subscribe_to_stream(subscription_name, subscriber, start_from)
end

@doc """
Expand Down Expand Up @@ -163,7 +165,7 @@ defmodule EventStore do
Record a snapshot of the data and metadata for a given source
- `timeout` is an integer greater than zero which specifies how many milliseconds to wait for a reply, or the atom :infinity to wait indefinitely. The default value is 5000. If no reply is received within the specified time, the function call fails and the caller exits.
Returns `:ok` on success
"""
@spec record_snapshot(SnapshotData.t, timeout :: integer) :: :ok | {:error, reason :: term}
Expand Down
4 changes: 2 additions & 2 deletions lib/event_store/sql/statements.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ RETURNING stream_id;

def create_subscription do
"""
INSERT INTO subscriptions (stream_uuid, subscription_name)
VALUES ($1, $2)
INSERT INTO subscriptions (stream_uuid, subscription_name, last_seen_event_id, last_seen_stream_version)
VALUES ($1, $2, $3, $4)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen_event_id, last_seen_stream_version, created_at;
"""
end
Expand Down
6 changes: 3 additions & 3 deletions lib/event_store/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ defmodule EventStore.Storage do
end

@doc """
Create, or locate an existing, persistent subscription to a stream using a unique name
Create, or locate an existing, persistent subscription to a stream using a unique name and starting position (event id or stream version)
"""
def subscribe_to_stream(stream_uuid, subscription_name) do
execute_using_storage_pool(&Subscription.subscribe_to_stream(&1, stream_uuid, subscription_name))
def subscribe_to_stream(stream_uuid, subscription_name, start_from_event_id \\ nil, start_from_stream_version \\ nil) do
execute_using_storage_pool(&Subscription.subscribe_to_stream(&1, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version))
end

@doc """
Expand Down
3 changes: 2 additions & 1 deletion lib/event_store/storage/reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ defmodule EventStore.Storage.Reader do
correlation_id: correlation_id,
data: data,
metadata: metadata,
created_at: to_naive(created_at)
created_at: to_naive(created_at),
}
end

defp to_naive(%NaiveDateTime{} = naive), do: naive
defp to_naive(%Postgrex.Timestamp{year: year, month: month, day: day, hour: hour, min: minute, sec: second, usec: microsecond}) do
{:ok, naive} = NaiveDateTime.new(year, month, day, hour, minute, second, {microsecond, 6})
naive
Expand Down
8 changes: 4 additions & 4 deletions lib/event_store/storage/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ defmodule EventStore.Storage.Subscription do
Subscription.All.execute(conn)
end

def subscribe_to_stream(conn, stream_uuid, subscription_name) do
def subscribe_to_stream(conn, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) do
case Subscription.Query.execute(conn, stream_uuid, subscription_name) do
{:ok, subscription} -> {:ok, subscription}
{:error, :subscription_not_found} -> Subscription.Subscribe.execute(conn, stream_uuid, subscription_name)
{:error, :subscription_not_found} -> Subscription.Subscribe.execute(conn, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version)
end
end

Expand Down Expand Up @@ -65,11 +65,11 @@ defmodule EventStore.Storage.Subscription do
end

defmodule Subscribe do
def execute(conn, stream_uuid, subscription_name) do
def execute(conn, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) do
_ = Logger.debug(fn -> "attempting to create subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\"" end)

conn
|> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name])
|> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name, start_from_event_id, start_from_stream_version])
|> handle_response(stream_uuid, subscription_name)
end

Expand Down
15 changes: 11 additions & 4 deletions lib/event_store/streams/all_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ defmodule EventStore.Streams.AllStream do
GenServer.call(__MODULE__, {:read_stream_forward, start_event_id, count})
end

def subscribe_to_stream(subscription_name, subscriber) do
GenServer.call(__MODULE__, {:subscribe_to_stream, subscription_name, subscriber})
def subscribe_to_stream(subscription_name, subscriber, start_from) do
GenServer.call(__MODULE__, {:subscribe_to_stream, subscription_name, subscriber, start_from})
end

def init(%AllStream{} = state) do
Expand All @@ -34,12 +34,19 @@ defmodule EventStore.Streams.AllStream do
{:reply, reply, state}
end

def handle_call({:subscribe_to_stream, subscription_name, subscriber}, _from, %AllStream{} = state) do
reply = Subscriptions.subscribe_to_all_streams(self, subscription_name, subscriber)
def handle_call({:subscribe_to_stream, subscription_name, subscriber, start_from}, _from, %AllStream{} = state) do
reply = Subscriptions.subscribe_to_all_streams(self, subscription_name, subscriber, start_from_event_id(start_from))

{:reply, reply, state}
end

defp start_from_event_id(:origin), do: 0
defp start_from_event_id(:current) do
{:ok, event_id} = Storage.latest_event_id
event_id
end
defp start_from_event_id(start_from) when is_integer(start_from), do: start_from

defp read_storage_forward(start_event_id, count, serializer) do
case Storage.read_all_streams_forward(start_event_id, count) do
{:ok, recorded_events} -> {:ok, Enum.map(recorded_events, fn event -> deserialize_recorded_event(event, serializer) end)}
Expand Down
14 changes: 9 additions & 5 deletions lib/event_store/streams/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ defmodule EventStore.Streams.Stream do
GenServer.call(stream, {:read_stream_forward, start_version, count})
end

def subscribe_to_stream(stream, subscription_name, subscriber) do
GenServer.call(stream, {:subscribe_to_stream, subscription_name, subscriber})
def subscribe_to_stream(stream, subscription_name, subscriber, start_from) do
GenServer.call(stream, {:subscribe_to_stream, subscription_name, subscriber, start_from})
end

def stream_version(stream) do
Expand Down Expand Up @@ -66,8 +66,8 @@ defmodule EventStore.Streams.Stream do
{:reply, reply, state}
end

def handle_call({:subscribe_to_stream, subscription_name, subscriber}, _from, %Stream{stream_uuid: stream_uuid} = state) do
reply = Subscriptions.subscribe_to_stream(stream_uuid, self, subscription_name, subscriber)
def handle_call({:subscribe_to_stream, subscription_name, subscriber, start_from}, _from, %Stream{stream_uuid: stream_uuid} = state) do
reply = Subscriptions.subscribe_to_stream(stream_uuid, self, subscription_name, subscriber, start_from_stream_version(state, start_from))

{:reply, reply, state}
end
Expand All @@ -76,6 +76,10 @@ defmodule EventStore.Streams.Stream do
{:reply, {:ok, stream_version}, state}
end

defp start_from_stream_version(%Stream{} = _stream, :origin), do: 0
defp start_from_stream_version(%Stream{stream_version: stream_version}, :current), do: stream_version
defp start_from_stream_version(%Stream{} = _stream, start_from) when is_integer(start_from), do: start_from

defp append_to_storage(expected_version, events, %Stream{stream_uuid: stream_uuid, stream_id: stream_id, stream_version: stream_version} = state) when expected_version == 0 and is_nil(stream_id) and stream_version == 0 do
{:ok, stream_id} = Storage.create_stream(stream_uuid)

Expand Down Expand Up @@ -113,7 +117,7 @@ defmodule EventStore.Streams.Stream do
event_type: event_type,
data: serializer.serialize(data),
metadata: serializer.serialize(metadata),
created_at: utc_now
created_at: utc_now,
}
end

Expand Down
16 changes: 8 additions & 8 deletions lib/event_store/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ defmodule EventStore.Subscriptions do
GenServer.start_link(__MODULE__, %Subscriptions{serializer: serializer}, name: __MODULE__)
end

def subscribe_to_stream(stream_uuid, stream, subscription_name, subscriber) do
GenServer.call(__MODULE__, {:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber})
def subscribe_to_stream(stream_uuid, stream, subscription_name, subscriber, start_from_stream_version \\ nil) do
GenServer.call(__MODULE__, {:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber, [start_from_stream_version: start_from_stream_version]})
end

def subscribe_to_all_streams(all_stream, subscription_name, subscriber) do
GenServer.call(__MODULE__, {:subscribe_to_stream, @all_stream, all_stream, subscription_name, subscriber})
def subscribe_to_all_streams(all_stream, subscription_name, subscriber, start_from_event_id \\ nil) do
GenServer.call(__MODULE__, {:subscribe_to_stream, @all_stream, all_stream, subscription_name, subscriber, [start_from_event_id: start_from_event_id]})
end

def unsubscribe_from_stream(stream_uuid, subscription_name) do
Expand All @@ -45,9 +45,9 @@ defmodule EventStore.Subscriptions do
{:ok, subscriptions}
end

def handle_call({:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber}, _from, %Subscriptions{supervisor: supervisor} = subscriptions) do
def handle_call({:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber, opts}, _from, %Subscriptions{supervisor: supervisor} = subscriptions) do
reply = case get_subscription(stream_uuid, subscription_name, subscriptions) do
nil -> create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber)
nil -> create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber, opts)
_subscription -> {:error, :subscription_already_exists}
end

Expand Down Expand Up @@ -100,10 +100,10 @@ defmodule EventStore.Subscriptions do
end
end

defp create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber) do
defp create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber, opts) do
_ = Logger.debug(fn -> "creating subscription process on stream #{inspect stream_uuid} named: #{inspect subscription_name}" end)

{:ok, subscription} = Subscriptions.Supervisor.subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber)
{:ok, subscription} = Subscriptions.Supervisor.subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber, opts)

Process.monitor(subscription)

Expand Down
6 changes: 3 additions & 3 deletions lib/event_store/subscriptions/stream_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule EventStore.Subscriptions.StreamSubscription do

defstate initial do
defevent subscribe(stream_uuid, stream, subscription_name, source, subscriber, opts), data: %SubscriptionData{} = data do
case subscribe_to_stream(stream_uuid, subscription_name) do
case subscribe_to_stream(stream_uuid, subscription_name, opts[:start_from_event_id], opts[:start_from_stream_version]) do
{:ok, subscription} ->
last_ack = subscription_provider(stream_uuid).last_ack(subscription) || 0

Expand Down Expand Up @@ -189,8 +189,8 @@ defmodule EventStore.Subscriptions.StreamSubscription do
defp subscription_provider(@all_stream), do: AllStreamsSubscription
defp subscription_provider(_stream_uuid), do: SingleStreamSubscription

defp subscribe_to_stream(stream_uuid, subscription_name) do
Storage.subscribe_to_stream(stream_uuid, subscription_name)
defp subscribe_to_stream(stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) do
Storage.subscribe_to_stream(stream_uuid, subscription_name, start_from_event_id, start_from_stream_version)
end

defp unsubscribe_from_stream(stream_uuid, subscription_name) do
Expand Down
20 changes: 14 additions & 6 deletions lib/event_store/subscriptions/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@ defmodule EventStore.Subscriptions.Subscription do

alias EventStore.Subscriptions.{StreamSubscription,Subscription}

defstruct stream_uuid: nil, stream: nil, subscription_name: nil, subscriber: nil, subscription: nil

def start_link(stream_uuid, stream, subscription_name, subscriber) do
defstruct [
stream_uuid: nil,
stream: nil,
subscription_name: nil,
subscriber: nil,
subscription: nil,
subscription_opts: [],
]

def start_link(stream_uuid, stream, subscription_name, subscriber, opts) do
GenServer.start_link(__MODULE__, %Subscription{
stream_uuid: stream_uuid,
stream: stream,
subscription_name: subscription_name,
subscriber: subscriber,
subscription: StreamSubscription.new
subscription: StreamSubscription.new,
subscription_opts: opts,
})
end

Expand All @@ -39,10 +47,10 @@ defmodule EventStore.Subscriptions.Subscription do
{:ok, state}
end

def handle_cast({:subscribe_to_stream}, %Subscription{stream_uuid: stream_uuid, stream: stream, subscription_name: subscription_name, subscriber: subscriber, subscription: subscription} = state) do
def handle_cast({:subscribe_to_stream}, %Subscription{stream_uuid: stream_uuid, stream: stream, subscription_name: subscription_name, subscriber: subscriber, subscription: subscription, subscription_opts: opts} = state) do
subscription =
subscription
|> StreamSubscription.subscribe(stream_uuid, stream, subscription_name, self, subscriber, [])
|> StreamSubscription.subscribe(stream_uuid, stream, subscription_name, self, subscriber, opts)

state = %Subscription{state | subscription: subscription}

Expand Down
4 changes: 2 additions & 2 deletions lib/event_store/subscriptions/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ defmodule EventStore.Subscriptions.Supervisor do
Supervisor.start_link(__MODULE__, nil)
end

def subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber) do
Supervisor.start_child(supervisor, [stream_uuid, stream, subscription_name, subscriber])
def subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber, opts) do
Supervisor.start_child(supervisor, [stream_uuid, stream, subscription_name, subscriber, opts])
end

def unsubscribe_from_stream(supervisor, subscription) do
Expand Down
10 changes: 5 additions & 5 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule EventStore.Mixfile do
def project do
[
app: :eventstore,
version: "0.6.2",
version: "0.7.0",
elixir: "~> 1.3",
elixirc_paths: elixirc_paths(Mix.env),
description: description,
Expand Down Expand Up @@ -37,15 +37,15 @@ defmodule EventStore.Mixfile do
defp deps do
[
{:benchfella, "~> 0.3", only: :bench},
{:credo, "~> 0.4", only: [:dev, :test]},
{:dialyxir, "~> 0.3.5", only: [:dev]},
{:ex_doc, "~> 0.13", only: :dev},
{:credo, "~> 0.5", only: [:dev, :test]},
{:dialyxir, "~> 0.4", only: [:dev]},
{:ex_doc, "~> 0.14", only: :dev},
{:fsm, "~> 0.2"},
{:markdown, github: "devinus/markdown", only: :dev},
{:mix_test_watch, "~> 0.2", only: :dev},
{:poison, "~> 3.0", only: [:bench, :test]},
{:poolboy, "~> 1.5"},
{:postgrex, "~> 0.12"},
{:postgrex, "~> 0.13"},
{:uuid, "~> 1.1", only: [:bench, :test]}
]
end
Expand Down
Loading

0 comments on commit 23f0d7a

Please sign in to comment.