From 23f0d7a9649fd071abb12429409f643cd3cb5c5a Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Mon, 19 Dec 2016 15:35:18 +0000 Subject: [PATCH] Subscribe to a stream from a specified position Fixes #17. Allow subscriptions to a single stream, or all streams, to optionally specify a given start position. - `:origin` - subscribe to events from the start of the stream (identical to using 0). This is the default behaviour. - `:current` - subscribe to events from the current version. - stream version or event id - provide an exact stream version (single stream), or last seen event id (all stream), to subscribe from. --- CHANGELOG.md | 6 + README.md | 11 +- config/bench.exs | 3 +- config/dev.exs | 3 +- config/test.exs | 3 +- lib/event_store.ex | 16 +-- lib/event_store/sql/statements.ex | 4 +- lib/event_store/storage.ex | 6 +- lib/event_store/storage/reader.ex | 3 +- lib/event_store/storage/subscription.ex | 8 +- lib/event_store/streams/all_stream.ex | 15 ++- lib/event_store/streams/stream.ex | 14 ++- lib/event_store/subscriptions.ex | 16 +-- .../subscriptions/stream_subscription.ex | 6 +- lib/event_store/subscriptions/subscription.ex | 20 +++- lib/event_store/subscriptions/supervisor.ex | 4 +- mix.exs | 10 +- mix.lock | 14 +-- test/event_store_test.exs | 17 +++ test/streams/all_stream_test.exs | 77 +++++++++++++ test/streams/stream_test.exs | 36 +++++- .../all_streams_subscription_test.exs | 13 ++- .../single_stream_subscription_test.exs | 14 +++ .../subscribe_to_stream_test.exs | 104 +++++++++++++----- test/support/event_factory.ex | 4 +- 25 files changed, 327 insertions(+), 100 deletions(-) create mode 100644 test/streams/all_stream_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cdfc154..10a2fb0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v0.7.0 + +### Enhancements + +- Subscribe to a single stream, or all streams, from a specified start position ([#17](https://github.com/slashdotdash/eventstore/issues/17)). + ## v0.6.2 ### Bug fixes diff --git a/README.md b/README.md index 3a465d38..e1b2ba7f 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,7 @@ EventStore is [available in Hex](https://hex.pm/packages/eventstore) and can be password: "postgres", database: "eventstore_dev", hostname: "localhost", - pool_size: 10, - extensions: [{Postgrex.Extensions.Calendar, []}] + pool_size: 10 ``` 4. Create the EventStore database and tables using the `mix` task @@ -90,10 +89,16 @@ Read all events from the stream, starting at the stream's first event. Subscriptions to a stream will guarantee at least once delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped. A subscription can be created to receive events published from a single logical stream or from all streams. -Events are received in batches after being persisted to storage. Each batch contains events from a single stream only with the same correlation id. +Events are received in batches after being persisted to storage. Each batch contains events from a single stream only and with the same correlation id. Subscriptions must be uniquely named and support a single subscriber. Attempting to connect two subscribers to the same subscription will return an error. +By default subscriptions are created from the single stream, or all stream, origin. So it will receive all events from the single stream, or all streams. You can optionally specify a given start position. + +- `:origin` - subscribe to events from the start of the stream (identical to using 0). This is the current behaviour and will remain the default. +- `:current` - subscribe to events from the current version. +- `stream_version` or `event_id` (integer) - specify an exact stream version to subscribe from for a single stream subscription. You provide an event id for an all stream subscription. + #### Ack received events Receipt of each event by the subscriber must be acknowledged. This allows the subscription to resume on failure without missing an event. diff --git a/config/bench.exs b/config/bench.exs index d20c8159..7c5f4a09 100644 --- a/config/bench.exs +++ b/config/bench.exs @@ -8,5 +8,4 @@ config :eventstore, EventStore.Storage, password: "postgres", database: "eventstore_bench", hostname: "localhost", - pool_size: 10, - extensions: [{Postgrex.Extensions.Calendar, []}] + pool_size: 10 diff --git a/config/dev.exs b/config/dev.exs index 49a2660d..d711c5c9 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -8,5 +8,4 @@ config :eventstore, EventStore.Storage, password: "postgres", database: "eventstore_dev", hostname: "localhost", - pool_size: 10, - extensions: [{Postgrex.Extensions.Calendar, []}] + pool_size: 10 diff --git a/config/test.exs b/config/test.exs index f98825dc..dc1b7de5 100644 --- a/config/test.exs +++ b/config/test.exs @@ -11,5 +11,4 @@ config :eventstore, EventStore.Storage, password: "postgres", database: "eventstore_test", hostname: "localhost", - pool_size: 1, - extensions: [{Postgrex.Extensions.Calendar, []}] + pool_size: 1 diff --git a/lib/event_store.ex b/lib/event_store.ex index 82aa3eb4..388380ad 100644 --- a/lib/event_store.ex +++ b/lib/event_store.ex @@ -14,6 +14,8 @@ defmodule EventStore do {:ok, recorded_events} = EventStore.read_stream_forward(stream_uuid) """ + @type start_from :: :origin | :current | integer + alias EventStore.Snapshots.{SnapshotData,Snapshotter} alias EventStore.{EventData,RecordedEvent,Streams,Subscriptions} alias EventStore.Streams.{AllStream,Stream} @@ -88,13 +90,13 @@ defmodule EventStore do Returns `{:ok, subscription}` when subscription succeeds. """ - @spec subscribe_to_stream(String.t, String.t, pid) :: {:ok, subscription :: pid} + @spec subscribe_to_stream(String.t, String.t, pid, start_from) :: {:ok, subscription :: pid} | {:error, :subscription_already_exists} | {:error, reason :: term} - def subscribe_to_stream(stream_uuid, subscription_name, subscriber) do + def subscribe_to_stream(stream_uuid, subscription_name, subscriber, start_from \\ :origin) do {:ok, stream} = Streams.open_stream(stream_uuid) - Stream.subscribe_to_stream(stream, subscription_name, subscriber) + Stream.subscribe_to_stream(stream, subscription_name, subscriber, start_from) end @doc """ @@ -106,11 +108,11 @@ defmodule EventStore do Returns `{:ok, subscription}` when subscription succeeds. """ - @spec subscribe_to_all_streams(String.t, pid) :: {:ok, subscription :: pid} + @spec subscribe_to_all_streams(String.t, pid, start_from) :: {:ok, subscription :: pid} | {:error, :subscription_already_exists} | {:error, reason :: term} - def subscribe_to_all_streams(subscription_name, subscriber) do - AllStream.subscribe_to_stream(subscription_name, subscriber) + def subscribe_to_all_streams(subscription_name, subscriber, start_from \\ :origin) do + AllStream.subscribe_to_stream(subscription_name, subscriber, start_from) end @doc """ @@ -163,7 +165,7 @@ defmodule EventStore do Record a snapshot of the data and metadata for a given source - `timeout` is an integer greater than zero which specifies how many milliseconds to wait for a reply, or the atom :infinity to wait indefinitely. The default value is 5000. If no reply is received within the specified time, the function call fails and the caller exits. - + Returns `:ok` on success """ @spec record_snapshot(SnapshotData.t, timeout :: integer) :: :ok | {:error, reason :: term} diff --git a/lib/event_store/sql/statements.ex b/lib/event_store/sql/statements.ex index 6ded1d9b..93efdfcc 100644 --- a/lib/event_store/sql/statements.ex +++ b/lib/event_store/sql/statements.ex @@ -149,8 +149,8 @@ RETURNING stream_id; def create_subscription do """ -INSERT INTO subscriptions (stream_uuid, subscription_name) -VALUES ($1, $2) +INSERT INTO subscriptions (stream_uuid, subscription_name, last_seen_event_id, last_seen_stream_version) +VALUES ($1, $2, $3, $4) RETURNING subscription_id, stream_uuid, subscription_name, last_seen_event_id, last_seen_stream_version, created_at; """ end diff --git a/lib/event_store/storage.ex b/lib/event_store/storage.ex index d87aac3c..811df366 100644 --- a/lib/event_store/storage.ex +++ b/lib/event_store/storage.ex @@ -72,10 +72,10 @@ defmodule EventStore.Storage do end @doc """ - Create, or locate an existing, persistent subscription to a stream using a unique name + Create, or locate an existing, persistent subscription to a stream using a unique name and starting position (event id or stream version) """ - def subscribe_to_stream(stream_uuid, subscription_name) do - execute_using_storage_pool(&Subscription.subscribe_to_stream(&1, stream_uuid, subscription_name)) + def subscribe_to_stream(stream_uuid, subscription_name, start_from_event_id \\ nil, start_from_stream_version \\ nil) do + execute_using_storage_pool(&Subscription.subscribe_to_stream(&1, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version)) end @doc """ diff --git a/lib/event_store/storage/reader.ex b/lib/event_store/storage/reader.ex index 174da510..49298c33 100644 --- a/lib/event_store/storage/reader.ex +++ b/lib/event_store/storage/reader.ex @@ -64,10 +64,11 @@ defmodule EventStore.Storage.Reader do correlation_id: correlation_id, data: data, metadata: metadata, - created_at: to_naive(created_at) + created_at: to_naive(created_at), } end + defp to_naive(%NaiveDateTime{} = naive), do: naive defp to_naive(%Postgrex.Timestamp{year: year, month: month, day: day, hour: hour, min: minute, sec: second, usec: microsecond}) do {:ok, naive} = NaiveDateTime.new(year, month, day, hour, minute, second, {microsecond, 6}) naive diff --git a/lib/event_store/storage/subscription.ex b/lib/event_store/storage/subscription.ex index 140e8f8d..bbd32e02 100644 --- a/lib/event_store/storage/subscription.ex +++ b/lib/event_store/storage/subscription.ex @@ -17,10 +17,10 @@ defmodule EventStore.Storage.Subscription do Subscription.All.execute(conn) end - def subscribe_to_stream(conn, stream_uuid, subscription_name) do + def subscribe_to_stream(conn, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) do case Subscription.Query.execute(conn, stream_uuid, subscription_name) do {:ok, subscription} -> {:ok, subscription} - {:error, :subscription_not_found} -> Subscription.Subscribe.execute(conn, stream_uuid, subscription_name) + {:error, :subscription_not_found} -> Subscription.Subscribe.execute(conn, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) end end @@ -65,11 +65,11 @@ defmodule EventStore.Storage.Subscription do end defmodule Subscribe do - def execute(conn, stream_uuid, subscription_name) do + def execute(conn, stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) do _ = Logger.debug(fn -> "attempting to create subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\"" end) conn - |> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name]) + |> Postgrex.query(Statements.create_subscription, [stream_uuid, subscription_name, start_from_event_id, start_from_stream_version]) |> handle_response(stream_uuid, subscription_name) end diff --git a/lib/event_store/streams/all_stream.ex b/lib/event_store/streams/all_stream.ex index 2fa95d6b..17893ca1 100644 --- a/lib/event_store/streams/all_stream.ex +++ b/lib/event_store/streams/all_stream.ex @@ -20,8 +20,8 @@ defmodule EventStore.Streams.AllStream do GenServer.call(__MODULE__, {:read_stream_forward, start_event_id, count}) end - def subscribe_to_stream(subscription_name, subscriber) do - GenServer.call(__MODULE__, {:subscribe_to_stream, subscription_name, subscriber}) + def subscribe_to_stream(subscription_name, subscriber, start_from) do + GenServer.call(__MODULE__, {:subscribe_to_stream, subscription_name, subscriber, start_from}) end def init(%AllStream{} = state) do @@ -34,12 +34,19 @@ defmodule EventStore.Streams.AllStream do {:reply, reply, state} end - def handle_call({:subscribe_to_stream, subscription_name, subscriber}, _from, %AllStream{} = state) do - reply = Subscriptions.subscribe_to_all_streams(self, subscription_name, subscriber) + def handle_call({:subscribe_to_stream, subscription_name, subscriber, start_from}, _from, %AllStream{} = state) do + reply = Subscriptions.subscribe_to_all_streams(self, subscription_name, subscriber, start_from_event_id(start_from)) {:reply, reply, state} end + defp start_from_event_id(:origin), do: 0 + defp start_from_event_id(:current) do + {:ok, event_id} = Storage.latest_event_id + event_id + end + defp start_from_event_id(start_from) when is_integer(start_from), do: start_from + defp read_storage_forward(start_event_id, count, serializer) do case Storage.read_all_streams_forward(start_event_id, count) do {:ok, recorded_events} -> {:ok, Enum.map(recorded_events, fn event -> deserialize_recorded_event(event, serializer) end)} diff --git a/lib/event_store/streams/stream.ex b/lib/event_store/streams/stream.ex index 7485dd62..27ff6d88 100644 --- a/lib/event_store/streams/stream.ex +++ b/lib/event_store/streams/stream.ex @@ -30,8 +30,8 @@ defmodule EventStore.Streams.Stream do GenServer.call(stream, {:read_stream_forward, start_version, count}) end - def subscribe_to_stream(stream, subscription_name, subscriber) do - GenServer.call(stream, {:subscribe_to_stream, subscription_name, subscriber}) + def subscribe_to_stream(stream, subscription_name, subscriber, start_from) do + GenServer.call(stream, {:subscribe_to_stream, subscription_name, subscriber, start_from}) end def stream_version(stream) do @@ -66,8 +66,8 @@ defmodule EventStore.Streams.Stream do {:reply, reply, state} end - def handle_call({:subscribe_to_stream, subscription_name, subscriber}, _from, %Stream{stream_uuid: stream_uuid} = state) do - reply = Subscriptions.subscribe_to_stream(stream_uuid, self, subscription_name, subscriber) + def handle_call({:subscribe_to_stream, subscription_name, subscriber, start_from}, _from, %Stream{stream_uuid: stream_uuid} = state) do + reply = Subscriptions.subscribe_to_stream(stream_uuid, self, subscription_name, subscriber, start_from_stream_version(state, start_from)) {:reply, reply, state} end @@ -76,6 +76,10 @@ defmodule EventStore.Streams.Stream do {:reply, {:ok, stream_version}, state} end + defp start_from_stream_version(%Stream{} = _stream, :origin), do: 0 + defp start_from_stream_version(%Stream{stream_version: stream_version}, :current), do: stream_version + defp start_from_stream_version(%Stream{} = _stream, start_from) when is_integer(start_from), do: start_from + defp append_to_storage(expected_version, events, %Stream{stream_uuid: stream_uuid, stream_id: stream_id, stream_version: stream_version} = state) when expected_version == 0 and is_nil(stream_id) and stream_version == 0 do {:ok, stream_id} = Storage.create_stream(stream_uuid) @@ -113,7 +117,7 @@ defmodule EventStore.Streams.Stream do event_type: event_type, data: serializer.serialize(data), metadata: serializer.serialize(metadata), - created_at: utc_now + created_at: utc_now, } end diff --git a/lib/event_store/subscriptions.ex b/lib/event_store/subscriptions.ex index f3669457..d0641f75 100644 --- a/lib/event_store/subscriptions.ex +++ b/lib/event_store/subscriptions.ex @@ -17,12 +17,12 @@ defmodule EventStore.Subscriptions do GenServer.start_link(__MODULE__, %Subscriptions{serializer: serializer}, name: __MODULE__) end - def subscribe_to_stream(stream_uuid, stream, subscription_name, subscriber) do - GenServer.call(__MODULE__, {:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber}) + def subscribe_to_stream(stream_uuid, stream, subscription_name, subscriber, start_from_stream_version \\ nil) do + GenServer.call(__MODULE__, {:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber, [start_from_stream_version: start_from_stream_version]}) end - def subscribe_to_all_streams(all_stream, subscription_name, subscriber) do - GenServer.call(__MODULE__, {:subscribe_to_stream, @all_stream, all_stream, subscription_name, subscriber}) + def subscribe_to_all_streams(all_stream, subscription_name, subscriber, start_from_event_id \\ nil) do + GenServer.call(__MODULE__, {:subscribe_to_stream, @all_stream, all_stream, subscription_name, subscriber, [start_from_event_id: start_from_event_id]}) end def unsubscribe_from_stream(stream_uuid, subscription_name) do @@ -45,9 +45,9 @@ defmodule EventStore.Subscriptions do {:ok, subscriptions} end - def handle_call({:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber}, _from, %Subscriptions{supervisor: supervisor} = subscriptions) do + def handle_call({:subscribe_to_stream, stream_uuid, stream, subscription_name, subscriber, opts}, _from, %Subscriptions{supervisor: supervisor} = subscriptions) do reply = case get_subscription(stream_uuid, subscription_name, subscriptions) do - nil -> create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber) + nil -> create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber, opts) _subscription -> {:error, :subscription_already_exists} end @@ -100,10 +100,10 @@ defmodule EventStore.Subscriptions do end end - defp create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber) do + defp create_subscription(supervisor, stream_uuid, stream, subscription_name, subscriber, opts) do _ = Logger.debug(fn -> "creating subscription process on stream #{inspect stream_uuid} named: #{inspect subscription_name}" end) - {:ok, subscription} = Subscriptions.Supervisor.subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber) + {:ok, subscription} = Subscriptions.Supervisor.subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber, opts) Process.monitor(subscription) diff --git a/lib/event_store/subscriptions/stream_subscription.ex b/lib/event_store/subscriptions/stream_subscription.ex index bc1efed4..aebc3cf2 100644 --- a/lib/event_store/subscriptions/stream_subscription.ex +++ b/lib/event_store/subscriptions/stream_subscription.ex @@ -23,7 +23,7 @@ defmodule EventStore.Subscriptions.StreamSubscription do defstate initial do defevent subscribe(stream_uuid, stream, subscription_name, source, subscriber, opts), data: %SubscriptionData{} = data do - case subscribe_to_stream(stream_uuid, subscription_name) do + case subscribe_to_stream(stream_uuid, subscription_name, opts[:start_from_event_id], opts[:start_from_stream_version]) do {:ok, subscription} -> last_ack = subscription_provider(stream_uuid).last_ack(subscription) || 0 @@ -189,8 +189,8 @@ defmodule EventStore.Subscriptions.StreamSubscription do defp subscription_provider(@all_stream), do: AllStreamsSubscription defp subscription_provider(_stream_uuid), do: SingleStreamSubscription - defp subscribe_to_stream(stream_uuid, subscription_name) do - Storage.subscribe_to_stream(stream_uuid, subscription_name) + defp subscribe_to_stream(stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) do + Storage.subscribe_to_stream(stream_uuid, subscription_name, start_from_event_id, start_from_stream_version) end defp unsubscribe_from_stream(stream_uuid, subscription_name) do diff --git a/lib/event_store/subscriptions/subscription.ex b/lib/event_store/subscriptions/subscription.ex index 19acaca3..b49de988 100644 --- a/lib/event_store/subscriptions/subscription.ex +++ b/lib/event_store/subscriptions/subscription.ex @@ -11,15 +11,23 @@ defmodule EventStore.Subscriptions.Subscription do alias EventStore.Subscriptions.{StreamSubscription,Subscription} - defstruct stream_uuid: nil, stream: nil, subscription_name: nil, subscriber: nil, subscription: nil - - def start_link(stream_uuid, stream, subscription_name, subscriber) do + defstruct [ + stream_uuid: nil, + stream: nil, + subscription_name: nil, + subscriber: nil, + subscription: nil, + subscription_opts: [], + ] + + def start_link(stream_uuid, stream, subscription_name, subscriber, opts) do GenServer.start_link(__MODULE__, %Subscription{ stream_uuid: stream_uuid, stream: stream, subscription_name: subscription_name, subscriber: subscriber, - subscription: StreamSubscription.new + subscription: StreamSubscription.new, + subscription_opts: opts, }) end @@ -39,10 +47,10 @@ defmodule EventStore.Subscriptions.Subscription do {:ok, state} end - def handle_cast({:subscribe_to_stream}, %Subscription{stream_uuid: stream_uuid, stream: stream, subscription_name: subscription_name, subscriber: subscriber, subscription: subscription} = state) do + def handle_cast({:subscribe_to_stream}, %Subscription{stream_uuid: stream_uuid, stream: stream, subscription_name: subscription_name, subscriber: subscriber, subscription: subscription, subscription_opts: opts} = state) do subscription = subscription - |> StreamSubscription.subscribe(stream_uuid, stream, subscription_name, self, subscriber, []) + |> StreamSubscription.subscribe(stream_uuid, stream, subscription_name, self, subscriber, opts) state = %Subscription{state | subscription: subscription} diff --git a/lib/event_store/subscriptions/supervisor.ex b/lib/event_store/subscriptions/supervisor.ex index f46412b6..af46cf6e 100644 --- a/lib/event_store/subscriptions/supervisor.ex +++ b/lib/event_store/subscriptions/supervisor.ex @@ -9,8 +9,8 @@ defmodule EventStore.Subscriptions.Supervisor do Supervisor.start_link(__MODULE__, nil) end - def subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber) do - Supervisor.start_child(supervisor, [stream_uuid, stream, subscription_name, subscriber]) + def subscribe_to_stream(supervisor, stream_uuid, stream, subscription_name, subscriber, opts) do + Supervisor.start_child(supervisor, [stream_uuid, stream, subscription_name, subscriber, opts]) end def unsubscribe_from_stream(supervisor, subscription) do diff --git a/mix.exs b/mix.exs index 562892b5..bdb26722 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule EventStore.Mixfile do def project do [ app: :eventstore, - version: "0.6.2", + version: "0.7.0", elixir: "~> 1.3", elixirc_paths: elixirc_paths(Mix.env), description: description, @@ -37,15 +37,15 @@ defmodule EventStore.Mixfile do defp deps do [ {:benchfella, "~> 0.3", only: :bench}, - {:credo, "~> 0.4", only: [:dev, :test]}, - {:dialyxir, "~> 0.3.5", only: [:dev]}, - {:ex_doc, "~> 0.13", only: :dev}, + {:credo, "~> 0.5", only: [:dev, :test]}, + {:dialyxir, "~> 0.4", only: [:dev]}, + {:ex_doc, "~> 0.14", only: :dev}, {:fsm, "~> 0.2"}, {:markdown, github: "devinus/markdown", only: :dev}, {:mix_test_watch, "~> 0.2", only: :dev}, {:poison, "~> 3.0", only: [:bench, :test]}, {:poolboy, "~> 1.5"}, - {:postgrex, "~> 0.12"}, + {:postgrex, "~> 0.13"}, {:uuid, "~> 1.1", only: [:bench, :test]} ] end diff --git a/mix.lock b/mix.lock index 041d02ce..54e7a481 100644 --- a/mix.lock +++ b/mix.lock @@ -1,12 +1,12 @@ %{"benchfella": {:hex, :benchfella, "0.3.3", "bbde48b5fe1ef556baa7ad933008e214e050e81ddb0916350715f5759fb35c0c", [], []}, "bunt": {:hex, :bunt, "0.1.6", "5d95a6882f73f3b9969fdfd1953798046664e6f77ec4e486e6fafc7caad97c6f", [:mix], []}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [], []}, - "credo": {:hex, :credo, "0.5.1", "2395862b94628cadf0f5c68975c1440393f425b955f1e70ce1aea267e00187a1", [:mix], [{:bunt, "~> 0.1.6", [hex: :bunt, optional: false]}]}, - "db_connection": {:hex, :db_connection, "1.0.0", "63c03e520d54886a66104d34e32397ba960db6e74b596ce221592c07d6a40d8d", [], [{:connection, "~> 1.0.2", [hex: :connection, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, optional: true]}]}, - "decimal": {:hex, :decimal, "1.2.0", "462960fd71af282e570f7b477f6be56bf8968e68277d4d0b641a635269bf4b0d", [], []}, - "dialyxir": {:hex, :dialyxir, "0.3.5", "eaba092549e044c76f83165978979f60110dc58dd5b92fd952bf2312f64e9b14", [], []}, - "earmark": {:hex, :earmark, "1.0.2", "a0b0904d74ecc14da8bd2e6e0248e1a409a2bc91aade75fcf428125603de3853", [], []}, - "ex_doc": {:hex, :ex_doc, "0.14.3", "e61cec6cf9731d7d23d254266ab06ac1decbb7651c3d1568402ec535d387b6f7", [], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, + "credo": {:hex, :credo, "0.5.3", "0c405b36e7651245a8ed63c09e2d52c2e2b89b6d02b1570c4d611e0fcbecf4a2", [:mix], [{:bunt, "~> 0.1.6", [hex: :bunt, optional: false]}]}, + "db_connection": {:hex, :db_connection, "1.1.0", "b2b88db6d7d12f99997b584d09fad98e560b817a20dab6a526830e339f54cdb3", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, optional: true]}]}, + "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], []}, + "dialyxir": {:hex, :dialyxir, "0.4.1", "236056d6acd25f740f336756c0f3b5dd6e2f0156074bc15f3b779aeee15390c8", [:mix], []}, + "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []}, + "ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, "fs": {:hex, :fs, "0.9.2", "ed17036c26c3f70ac49781ed9220a50c36775c6ca2cf8182d123b6566e49ec59", [:rebar], []}, "fsm": {:hex, :fsm, "0.2.0", "53bcc0fd4a470c92cbbc3bae2d7f7dd8462898eedd62c37a6be556b94fba0e05", [:mix], []}, "hoedown": {:git, "https://github.com/hoedown/hoedown.git", "980b9c549b4348d50b683ecee6abee470b98acda", []}, @@ -14,5 +14,5 @@ "mix_test_watch": {:hex, :mix_test_watch, "0.2.6", "9fcc2b1b89d1594c4a8300959c19d50da2f0ff13642c8f681692a6e507f92cab", [:mix], [{:fs, "~> 0.9.1", [hex: :fs, optional: false]}]}, "poison": {:hex, :poison, "3.0.0", "625ebd64d33ae2e65201c2c14d6c85c27cc8b68f2d0dd37828fde9c6920dd131", [], []}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}, - "postgrex": {:hex, :postgrex, "0.12.1", "2f8b46cb3a44dcd42f42938abedbfffe7e103ba4ce810ccbeee8dcf27ca0fb06", [], [{:connection, "~> 1.0", [hex: :connection, optional: false]}, {:db_connection, "~> 1.0-rc.4", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}, + "postgrex": {:hex, :postgrex, "0.13.0", "e101ab47d0725955c5c8830ae8812412992e02e4bd9db09e17abb0a5d82d09c7", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}, "uuid": {:hex, :uuid, "1.1.5", "96cb36d86ee82f912efea4d50464a5df606bf3f1163d6bdbb302d98474969369", [], []}} diff --git a/test/event_store_test.exs b/test/event_store_test.exs index 9647a2f4..345164d0 100644 --- a/test/event_store_test.exs +++ b/test/event_store_test.exs @@ -57,6 +57,23 @@ defmodule EventStoreTest do assert hd(received_events).data == hd(events).data end + test "subscribe to all streams from current position" do + stream_uuid = UUID.uuid4 + initial_events = EventFactory.create_events(1) + new_events = EventFactory.create_events(1, 2) + + :ok = EventStore.append_to_stream(stream_uuid, 0, initial_events) + + {:ok, subscription} = EventStore.subscribe_to_all_streams(@subscription_name, self, :current) + + :ok = EventStore.append_to_stream(stream_uuid, 1, new_events) + + assert_receive {:events, received_events, ^subscription} + + assert length(received_events) == 1 + assert hd(received_events).data == hd(new_events).data + end + defmodule ExampleData do defstruct [:data] end diff --git a/test/streams/all_stream_test.exs b/test/streams/all_stream_test.exs new file mode 100644 index 00000000..1e0934ac --- /dev/null +++ b/test/streams/all_stream_test.exs @@ -0,0 +1,77 @@ +defmodule EventStore.Streams.AllStreamTest do + use EventStore.StorageCase + doctest EventStore.Streams.Supervisor + doctest EventStore.Streams.Stream + + alias EventStore.EventFactory + alias EventStore.Streams + alias EventStore.Streams.{AllStream,Stream} + + @subscription_name "test_subscription" + + describe "read stream forward" do + setup [:append_events_to_streams] + + test "should fetch events from all streams" do + {:ok, read_events} = AllStream.read_stream_forward(0, 1_000) + + assert length(read_events) == 6 + end + end + + describe "subscribe to all streams" do + setup [:append_events_to_streams] + + test "from origin should receive all events" do + {:ok, _subscription} = AllStream.subscribe_to_stream(@subscription_name, self, :origin) + + assert_receive {:events, received_events1, _} + assert_receive {:events, received_events2, _} + assert length(received_events1 ++ received_events2) == 6 + end + + test "from current should receive only new events", context do + {:ok, _subscription} = AllStream.subscribe_to_stream(@subscription_name, self, :current) + + refute_receive {:events, _received_events, _} + + events = EventFactory.create_events(1, 4) + :ok = Stream.append_to_stream(context[:stream1], 3, events) + + assert_receive {:events, received_events, _} + assert length(received_events) == 1 + end + + test "from given event id should receive only later events" do + {:ok, _subscription} = AllStream.subscribe_to_stream(@subscription_name, self, 2) + + assert_receive {:events, received_events1, _} + assert_receive {:events, received_events2, _} + assert length(received_events1 ++ received_events2) == 4 + end + end + + defp append_events_to_streams(_context) do + {stream1_uuid, stream1, stream1_events} = append_events_to_stream + {stream2_uuid, stream2, stream2_events} = append_events_to_stream + + [ + stream1_uuid: stream1_uuid, + stream1: stream1, + stream1_events: stream1_events, + stream2_uuid: stream2_uuid, + stream2: stream2, + stream2_events: stream2_events, + ] + end + + defp append_events_to_stream do + stream_uuid = UUID.uuid4 + events = EventFactory.create_events(3) + + {:ok, stream} = Streams.open_stream(stream_uuid) + :ok = Stream.append_to_stream(stream, 0, events) + + {stream_uuid, stream, events} + end +end diff --git a/test/streams/stream_test.exs b/test/streams/stream_test.exs index 94ae8688..6af6ca10 100644 --- a/test/streams/stream_test.exs +++ b/test/streams/stream_test.exs @@ -8,7 +8,7 @@ defmodule EventStore.Streams.StreamTest do alias EventStore.Streams alias EventStore.Streams.Stream - @all_stream "$all" + @subscription_name "test_subscription" test "open a stream" do stream_uuid = UUID.uuid4 @@ -52,7 +52,7 @@ defmodule EventStore.Streams.StreamTest do test "should set created at datetime", context do now = DateTime.utc_now |> DateTime.to_naive - {:ok, [event|_]} = Stream.read_stream_forward(context[:stream], 0, 1_000) + {:ok, [event]} = Stream.read_stream_forward(context[:stream], 0, 1) created_at = event.created_at assert created_at != nil @@ -85,7 +85,37 @@ defmodule EventStore.Streams.StreamTest do end end - test "stream should correctly restore stream_version after reopening" do + describe "subscribe to stream" do + setup [:append_events_to_stream] + + test "from origin should receive all events", context do + {:ok, _subscription} = Stream.subscribe_to_stream(context[:stream], @subscription_name, self, :origin) + + assert_receive {:events, received_events, _} + assert length(received_events) == 3 + end + + test "from current should receive only new events", context do + {:ok, _subscription} = Stream.subscribe_to_stream(context[:stream], @subscription_name, self, :current) + + refute_receive {:events, _received_events, _} + + events = EventFactory.create_events(1, 4) + :ok = Stream.append_to_stream(context[:stream], 3, events) + + assert_receive {:events, received_events, _} + assert length(received_events) == 1 + end + + test "from given stream version should receive only later events", context do + {:ok, _subscription} = Stream.subscribe_to_stream(context[:stream], @subscription_name, self, 2) + + assert_receive {:events, received_events, _} + assert length(received_events) == 1 + end + end + + test "stream should correctly restore `stream_version` after reopening" do stream_uuid = UUID.uuid4 events = EventFactory.create_events(3) diff --git a/test/subscriptions/all_streams_subscription_test.exs b/test/subscriptions/all_streams_subscription_test.exs index d0751ffa..015b0ce3 100644 --- a/test/subscriptions/all_streams_subscription_test.exs +++ b/test/subscriptions/all_streams_subscription_test.exs @@ -15,13 +15,24 @@ defmodule EventStore.Subscriptions.AllStreamsSubscriptionTest do {:ok, %{conn: conn}} end - test "create subscription to stream" do + test "create subscription to all streams" do subscription = create_subscription assert subscription.state == :catching_up assert subscription.data.subscription_name == @subscription_name assert subscription.data.subscriber == self assert subscription.data.last_seen == 0 + assert subscription.data.last_ack == 0 + end + + test "create subscription to all streams from starting event id" do + subscription = create_subscription(start_from_event_id: 2) + + assert subscription.state == :catching_up + assert subscription.data.subscription_name == @subscription_name + assert subscription.data.subscriber == self + assert subscription.data.last_seen == 2 + assert subscription.data.last_ack == 2 end test "catch-up subscription, no persisted events" do diff --git a/test/subscriptions/single_stream_subscription_test.exs b/test/subscriptions/single_stream_subscription_test.exs index 3c495f9a..5a1154f0 100644 --- a/test/subscriptions/single_stream_subscription_test.exs +++ b/test/subscriptions/single_stream_subscription_test.exs @@ -25,6 +25,20 @@ defmodule EventStore.Subscriptions.SingleStreamSubscriptionTest do assert subscription.data.subscription_name == @subscription_name assert subscription.data.subscriber == self assert subscription.data.last_seen == 0 + assert subscription.data.last_ack == 0 + end + + test "create subscription to a single stream from starting stream version" do + stream_uuid = UUID.uuid4 + {:ok, stream} = Streams.open_stream(stream_uuid) + + subscription = create_subscription(stream_uuid, stream, start_from_stream_version: 2) + + assert subscription.state == :catching_up + assert subscription.data.subscription_name == @subscription_name + assert subscription.data.subscriber == self + assert subscription.data.last_seen == 2 + assert subscription.data.last_ack == 2 end test "catch-up subscription, no persisted events" do diff --git a/test/subscriptions/subscribe_to_stream_test.exs b/test/subscriptions/subscribe_to_stream_test.exs index a00fd5eb..dab95ab7 100644 --- a/test/subscriptions/subscribe_to_stream_test.exs +++ b/test/subscriptions/subscribe_to_stream_test.exs @@ -16,46 +16,64 @@ defmodule EventStore.Subscriptions.SubscribeToStream do {:ok, %{subscription_name: subscription_name, all_stream: all_stream}} end - test "subscribe to single stream", %{subscription_name: subscription_name} do - stream_uuid = UUID.uuid4 - events = EventFactory.create_events(1) + describe "single stream subscription" do + test "subscribe to single stream from origin should receive all its events", %{subscription_name: subscription_name} do + stream_uuid = UUID.uuid4 + events = EventFactory.create_events(1) - {:ok, stream} = Streams.open_stream(stream_uuid) + {:ok, stream} = Streams.open_stream(stream_uuid) - {:ok, subscription} = Subscriptions.subscribe_to_stream(stream_uuid, stream, subscription_name, self) + {:ok, subscription} = Subscriptions.subscribe_to_stream(stream_uuid, stream, subscription_name, self) - :ok = Stream.append_to_stream(stream, 0, events) + :ok = Stream.append_to_stream(stream, 0, events) - assert_receive {:events, received_events, ^subscription}, @receive_timeout - assert pluck(received_events, :data) == pluck(events, :data) - end + assert_receive {:events, received_events, ^subscription}, @receive_timeout + assert pluck(received_events, :data) == pluck(events, :data) + end - test "subscribe to stream more than once using same subscription name should error", %{subscription_name: subscription_name} do - stream_uuid = UUID.uuid4 - {:ok, stream} = Streams.open_stream(stream_uuid) + test "subscribe to single stream from given stream version should only receive later events", %{subscription_name: subscription_name} do + stream_uuid = UUID.uuid4 + initial_events = EventFactory.create_events(1) + new_events = EventFactory.create_events(1, 2) - {:ok, _} = Subscriptions.subscribe_to_stream(stream_uuid, stream, subscription_name, self) - {:error, :subscription_already_exists} = Subscriptions.subscribe_to_stream(stream_uuid, stream, subscription_name, self) - end + {:ok, stream} = Streams.open_stream(stream_uuid) + :ok = Stream.append_to_stream(stream, 0, initial_events) + + {:ok, subscription} = Subscriptions.subscribe_to_stream(stream_uuid, stream, subscription_name, self, 1) + + :ok = Stream.append_to_stream(stream, 1, new_events) + + assert_receive {:events, received_events, ^subscription}, @receive_timeout + assert pluck(received_events, :data) == pluck(new_events, :data) + end - test "subscribe to single stream should ignore events from another stream", %{subscription_name: subscription_name} do - interested_stream_uuid = UUID.uuid4 - other_stream_uuid = UUID.uuid4 + test "subscribe to stream more than once using same subscription name should error", %{subscription_name: subscription_name} do + stream_uuid = UUID.uuid4 + {:ok, stream} = Streams.open_stream(stream_uuid) + + {:ok, _} = Subscriptions.subscribe_to_stream(stream_uuid, stream, subscription_name, self) + {:error, :subscription_already_exists} = Subscriptions.subscribe_to_stream(stream_uuid, stream, subscription_name, self) + end - interested_events = EventFactory.create_events(1) - other_events = EventFactory.create_events(1) + test "subscribe to single stream should ignore events from another stream", %{subscription_name: subscription_name} do + interested_stream_uuid = UUID.uuid4 + other_stream_uuid = UUID.uuid4 - {:ok, interested_stream} = Streams.open_stream(interested_stream_uuid) - {:ok, other_stream} = Streams.open_stream(other_stream_uuid) + interested_events = EventFactory.create_events(1) + other_events = EventFactory.create_events(1) - {:ok, subscription} = Subscriptions.subscribe_to_stream(interested_stream_uuid, interested_stream, subscription_name, self) + {:ok, interested_stream} = Streams.open_stream(interested_stream_uuid) + {:ok, other_stream} = Streams.open_stream(other_stream_uuid) - :ok = Stream.append_to_stream(interested_stream, 0, interested_events) - :ok = Stream.append_to_stream(other_stream, 0, other_events) + {:ok, subscription} = Subscriptions.subscribe_to_stream(interested_stream_uuid, interested_stream, subscription_name, self) - # received events should not include events from the other stream - assert_receive {:events, received_events, ^subscription}, @receive_timeout - assert pluck(received_events, :data) == pluck(interested_events, :data) + :ok = Stream.append_to_stream(interested_stream, 0, interested_events) + :ok = Stream.append_to_stream(other_stream, 0, other_events) + + # received events should not include events from the other stream + assert_receive {:events, received_events, ^subscription}, @receive_timeout + assert pluck(received_events, :data) == pluck(interested_events, :data) + end end describe "all stream subscription" do @@ -84,6 +102,36 @@ defmodule EventStore.Subscriptions.SubscribeToStream do assert stream1_received_events != stream2_received_events end + test "subscribe to all streams from given stream id should only receive later events from all streams", %{subscription_name: subscription_name, all_stream: all_stream} do + stream1_uuid = UUID.uuid4 + stream2_uuid = UUID.uuid4 + + stream1_initial_events = EventFactory.create_events(1) + stream2_initial_events = EventFactory.create_events(1) + stream1_new_events = EventFactory.create_events(1, 2) + stream2_new_events = EventFactory.create_events(1, 2) + + {:ok, stream1} = Streams.open_stream(stream1_uuid) + {:ok, stream2} = Streams.open_stream(stream2_uuid) + + :ok = Stream.append_to_stream(stream1, 0, stream1_initial_events) + :ok = Stream.append_to_stream(stream2, 0, stream2_initial_events) + + {:ok, subscription} = Subscriptions.subscribe_to_all_streams(all_stream, subscription_name, self, 2) + + :ok = Stream.append_to_stream(stream1, 1, stream1_new_events) + :ok = Stream.append_to_stream(stream2, 1, stream2_new_events) + + assert_receive {:events, stream1_received_events, ^subscription}, @receive_timeout + send(subscription, {:ack, List.last(stream1_received_events).event_id}) + + assert_receive {:events, stream2_received_events, ^subscription}, @receive_timeout + + assert pluck(stream1_received_events, :data) == pluck(stream1_new_events, :data) + assert pluck(stream2_received_events, :data) == pluck(stream2_new_events, :data) + assert stream1_received_events != stream2_received_events + end + test "should monitor all stream subscription, terminate subscription and subscriber on error", %{subscription_name: subscription_name, all_stream: all_stream} do stream_uuid = UUID.uuid4 events = EventFactory.create_events(1) diff --git a/test/support/event_factory.ex b/test/support/event_factory.ex index aafd46d5..055a5660 100644 --- a/test/support/event_factory.ex +++ b/test/support/event_factory.ex @@ -5,7 +5,7 @@ defmodule EventStore.EventFactory do defstruct event: nil end - def create_events(number_of_events) when number_of_events > 0 do + def create_events(number_of_events, initial_event_number \\ 1) when number_of_events > 0 do correlation_id = UUID.uuid4 1..number_of_events @@ -13,7 +13,7 @@ defmodule EventStore.EventFactory do %EventData{ correlation_id: correlation_id, event_type: "Elixir.EventStore.EventFactory.Event", - data: %EventStore.EventFactory.Event{event: number}, + data: %EventStore.EventFactory.Event{event: (initial_event_number + number - 1)}, metadata: %{"user" => "user@example.com"} } end)