Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete event stream #203

Merged
merged 2 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Next release

### Enhancements

- Delete event stream ([#203](https://github.com/commanded/eventstore/pull/203)).

## v1.1.0

### Enhancements
Expand Down
132 changes: 123 additions & 9 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,11 @@ defmodule EventStore do
@default_count 1_000
@default_timeout config[:timeout] || 15_000

def config do
def config(opts \\ []) do
opts = Keyword.merge(unquote(opts), opts)

with {:ok, config} <-
EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, unquote(opts)) do
EventStore.Supervisor.runtime_config(__MODULE__, @otp_app, opts) do
config
end
end
Expand Down Expand Up @@ -302,6 +304,17 @@ defmodule EventStore do
def stream_all_forward(start_version, opts),
do: stream_forward(@all_stream, start_version, opts)

def delete_stream(stream_uuid, expected_version, type \\ :soft, opts \\ [])

def delete_stream(@all_stream, _expected_version, _type, _opts),
do: {:error, :cannot_delete_all_stream}

def delete_stream(stream_uuid, expected_version, type, opts) when type in [:soft, :hard] do
{conn, opts} = opts(opts)

Stream.delete(conn, stream_uuid, expected_version, type, opts)
end

def subscribe(stream_uuid, opts \\ []) do
name = name(opts)

Expand Down Expand Up @@ -494,7 +507,7 @@ defmodule EventStore do
from the provided expected version.
- `{:error, :stream_exists}` when the stream exists, but expected version
was `:no_stream`.
- `{:error, :stream_does_not_exist}` when the stream does not exist, but
- `{:error, :stream_not_found}` when the stream does not exist, but
expected version was `:stream_exists`.
"""
@callback append_to_stream(
Expand All @@ -506,8 +519,9 @@ defmodule EventStore do
:ok
| {:error, :cannot_append_to_all_stream}
| {:error, :stream_exists}
| {:error, :stream_does_not_exist}
| {:error, :stream_not_found}
| {:error, :wrong_expected_version}
| {:error, :stream_deleted}
| {:error, reason :: term}

@doc """
Expand Down Expand Up @@ -547,7 +561,7 @@ defmodule EventStore do
from the provided expected version.
- `{:error, :stream_exists}` when the stream exists, but expected version
was `:no_stream`.
- `{:error, :stream_does_not_exist}` when the stream does not exist, but
- `{:error, :stream_not_found}` when the stream does not exist, but
expected version was `:stream_exists`.
"""
@callback link_to_stream(
Expand All @@ -559,8 +573,9 @@ defmodule EventStore do
:ok
| {:error, :cannot_append_to_all_stream}
| {:error, :stream_exists}
| {:error, :stream_does_not_exist}
| {:error, :stream_not_found}
| {:error, :wrong_expected_version}
| {:error, :stream_deleted}
| {:error, reason :: term}

@doc """
Expand All @@ -585,7 +600,10 @@ defmodule EventStore do
start_version :: non_neg_integer,
count :: non_neg_integer,
opts :: options
) :: {:ok, list(EventStore.RecordedEvent.t())} | {:error, reason :: term}
) ::
{:ok, list(EventStore.RecordedEvent.t())}
| {:error, :stream_deleted}
| {:error, reason :: term}

@doc """
Reads the requested number of events from all streams, in the order in which
Expand Down Expand Up @@ -626,7 +644,7 @@ defmodule EventStore do
stream_uuid :: String.t(),
start_version :: non_neg_integer,
opts :: [options | {:read_batch_size, non_neg_integer}]
) :: Enumerable.t() | {:error, reason :: term}
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term}

@doc """
Streams events from all streams, in the order in which they were originally
Expand All @@ -645,7 +663,103 @@ defmodule EventStore do
@callback stream_all_forward(
start_version :: non_neg_integer,
opts :: [options | {:read_batch_size, non_neg_integer}]
) :: Enumerable.t()
) :: Enumerable.t() | {:error, :stream_deleted} | {:error, reason :: term}

@doc """
Delete an existing stream.

- `stream_uuid` identity of the stream to be deleted.

- `expected_version` is used for optimistic concurrency checking.
You can provide a non-negative integer to specify the expected stream
version. This is used to ensure you can only delete a stream if it is
at exactly that version.

You can also provide one of the following values to alter the concurrency
checking behaviour:

- `:any_version` - No concurrency check, allow any stream version.
- `:stream_exists` - Ensure the stream exists, at any version.

- `type` - used to indicate how the stream is deleted:

- `:soft` - the stream is marked as deleted, but no events are removed.
- `:hard` - the stream and its events are permanently deleted from the
database.

Soft deletion is the default it the type is not provided.

Returns `:ok` on success or an error tagged tuple on failure.

### Soft delete

Will mark the stream as deleted, but will not delete its events. Events from
soft deleted streams will still appear in the globally ordered all events
(`$all`) stream and in any linked streams.

A soft deleted stream cannot be read nor appended to. Subscriptions to the
deleted stream will not receive any events but subscriptions containing linked
events from the deleted stream, such as the global all events stream, will
still receive events from the deleted stream.

### Hard delete

Will permanently delete the stream and its events. **This is irreversible and
will remove data**. Events will be removed from the globally ordered all
events stream and any linked streams.

After being hard deleted, a stream can later be appended to and read as if had
never existed.

### Examples

#### Soft delete a stream

Delete a stream at any version:

:ok = MyApp.EventStore.delete_stream("stream1", :any_version, :soft)

Delete a stream at an expected version:

:ok = MyApp.EventStore.delete_stream("stream2", 3, :soft)

Delete stream will use soft delete by default so you can omit the type:

:ok = MyApp.EventStore.delete_stream("stream1", :any_version)

#### Hard delete a stream

Since hard deletes are destructive and irreversible they are disabled by
default. To use hard deletes you must first enable them for the event store:

defmodule MyApp.EventStore do
use EventStore, otp_app: :my_app, enable_hard_deletes: true
end

Or via config:

# config/config.exs
config :my_app, MyApp.EventStore, enable_hard_deletes: true

Hard delete a stream at any version:

:ok = MyApp.EventStore.delete_stream("stream1", :any_version, :hard)

Hard delete a stream that should exist:

:ok = MyApp.EventStore.delete_stream("stream2", :stream_exists, :hard)

"""
@callback delete_stream(
stream_uuid :: String.t(),
expected_version :: :any_version | :stream_exists | non_neg_integer(),
type :: :soft | :hard,
opts :: Keyword.t()
) ::
:ok
| {:error, :stream_not_found}
| {:error, :stream_deleted}
| {:error, term}

@doc """
Create a transient subscription to a given stream.
Expand Down
28 changes: 23 additions & 5 deletions lib/event_store/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ defmodule EventStore.Config do
def default_postgrex_opts(config) do
config
|> Keyword.take(@postgrex_connection_opts)
|> Keyword.put(:after_connect, set_schema_search_path(config))
|> Keyword.put(:after_connect, after_connect(config))
end

def postgrex_opts(config, name) do
Expand All @@ -97,7 +97,7 @@ defmodule EventStore.Config do
)
|> Keyword.put(:backoff_type, :exp)
|> Keyword.put(:name, Module.concat([name, Postgrex]))
|> Keyword.put(:after_connect, set_schema_search_path(config))
|> Keyword.put(:after_connect, after_connect(config))
end

def sync_connect_postgrex_opts(config) do
Expand All @@ -107,11 +107,29 @@ defmodule EventStore.Config do
|> Keyword.put(:sync_connect, true)
end

defp after_connect(config) do
schema = Keyword.fetch!(config, :schema)
enable_hard_deletes = Keyword.get(config, :enable_hard_deletes, false)

transaction = fn conn ->
set_schema_search_path(conn, schema)
set_enable_hard_deletes(conn, enable_hard_deletes)
end

{Postgrex, :transaction, [transaction]}
end

# Set the Postgres connection's `search_path` to include only the configured
# schema. This will be `public` by default.
defp set_schema_search_path(config) do
schema = Keyword.fetch!(config, :schema)
defp set_schema_search_path(conn, schema) do
Postgrex.query!(conn, "SET SESSION search_path TO #{schema};", [])
end

{Postgrex, :query!, ["SET search_path TO #{schema};", []]}
# Optionally enable hard deletes to allow destructive delete operations for
# events, streams, and stream events tables.
defp set_enable_hard_deletes(conn, true) do
Postgrex.query!(conn, "SET SESSION eventstore.enable_hard_deletes TO 'on';", [])
end

defp set_enable_hard_deletes(_conn, false), do: nil
end
18 changes: 9 additions & 9 deletions lib/event_store/recorded_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ defmodule EventStore.RecordedEvent do

## Recorded event fields

- `event_number` - position of the event within the stream. This will be
identical to the `stream_version` when fetching events from a single
stream. For the `$all` stream it will be the globally unique and ordered
event number.
- `event_number` - position of the event within the stream.
This will be identical to the `stream_version` when fetching events from a
single stream. For the `$all` stream it will be the globally ordered event
number.
- `event_id` - a globally unique UUID to identify the event.
- `stream_uuid` - the original stream identity for the event.
- `stream_version` - the original version of the stream for the event.
- `correlation_id` - an optional UUID identifier used to correlate related
messages.
- `causation_id` - an optional UUID identifier used to identify which
message you are responding to.
- `data` - the serialized event as binary data.
- `metadata` - the serialized event metadata as binary data.
- `created_at` - the date/time, in UTC, indicating when the event was
- `data` - the deserialized event data.
- `metadata` - a deserialized map of event metadata.
- `created_at` - a `DateTime` (in UTC) indicating when the event was
created.

"""
Expand All @@ -37,8 +37,8 @@ defmodule EventStore.RecordedEvent do
correlation_id: uuid() | nil,
causation_id: uuid() | nil,
event_type: String.t(),
data: term,
metadata: binary() | nil,
data: any(),
metadata: map() | nil,
created_at: DateTime.t()
}

Expand Down
Loading