Skip to content

Commit

Permalink
Allow snapshots to be deleted
Browse files Browse the repository at this point in the history
Fixes #26.

Extend the Event Store API with an `EventStore.delete_snapshot`
function. This deletes an existing snapshot.
  • Loading branch information
slashdotdash committed Dec 19, 2016
1 parent 2e4fbb4 commit b7e8b16
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 51 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v0.7.1

### Enhancements

- Allow snapshots to be deleted ([#26](https://github.com/slashdotdash/eventstore/issues/26)).

## v0.7.0

### Enhancements
Expand Down
10 changes: 10 additions & 0 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,14 @@ defmodule EventStore do
def record_snapshot(%SnapshotData{} = snapshot, timeout) do
Snapshotter.record_snapshot(snapshot, timeout)
end

@doc """
Delete a previously recorded snapshop for a given source
Returns `:ok` on success, or when the snapshot does not exist
"""
@spec delete_snapshot(String.t) :: :ok | {:error, reason :: term}
def delete_snapshot(source_uuid) do
Snapshotter.delete_snapshot(source_uuid)
end
end
23 changes: 20 additions & 3 deletions lib/event_store/snapshots/snapshotter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule EventStore.Snapshots.Snapshotter do
end

@doc """
Record a snapshot of the data and metadata for a given source
Record a snapshot containing data and metadata for a given source
Returns `:ok` on success
"""
Expand All @@ -38,16 +38,27 @@ defmodule EventStore.Snapshots.Snapshotter do
end

@doc """
Record a snapshot of the data and metadata for a given source
Record a snapshot containing data and metadata for a given source
- `timeout` is an integer greater than zero which specifies how many milliseconds to wait for a reply, or the atom :infinity to wait indefinitely. The default value is 5000. If no reply is received within the specified time, the function call fails and the caller exits.
- `timeout` is an integer greater than zero which specifies how many milliseconds to wait for a reply, or the atom :infinity to wait indefinitely.
If no reply is received within the specified time, the function call fails and the caller exits.
The default value is 5000.
Returns `:ok` on success
"""
def record_snapshot(%SnapshotData{} = snapshot, timeout) do
GenServer.call(__MODULE__, {:record_snapshot, snapshot}, timeout)
end

@doc """
Delete a previously recorded snapshot for a given source
Returns `:ok` on success
"""
def delete_snapshot(source_uuid) do
GenServer.call(__MODULE__, {:delete_snapshot, source_uuid})
end

def handle_call({:read_snapshot, source_uuid}, _from, %Snapshotter{serializer: serializer} = state) do
reply = case Storage.read_snapshot(source_uuid) do
{:ok, snapshot} -> {:ok, deserialize_snapshot(snapshot, serializer)}
Expand All @@ -66,6 +77,12 @@ defmodule EventStore.Snapshots.Snapshotter do
{:reply, reply, state}
end

def handle_call({:delete_snapshot, source_uuid}, _from, %Snapshotter{} = state) do
reply = Storage.delete_snapshot(source_uuid)

{:reply, reply, state}
end

defp serialize_snapshot(%SnapshotData{data: data, metadata: metadata} = snapshot, serializer) do
%SnapshotData{snapshot |
data: serializer.serialize(data),
Expand Down
17 changes: 8 additions & 9 deletions lib/event_store/sql/statements.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule EventStore.Sql.Statements do
create_subscriptions_table,
create_subscription_index,
create_snapshots_table,
create_snapshots_index
]
end

Expand Down Expand Up @@ -93,8 +92,7 @@ CREATE UNIQUE INDEX ix_subscriptions_stream_uuid_subscription_name ON subscripti
"""
CREATE TABLE snapshots
(
snapshot_id bigserial PRIMARY KEY NOT NULL,
source_uuid text NOT NULL,
source_uuid text PRIMARY KEY NOT NULL,
source_version bigint NOT NULL,
source_type text NOT NULL,
data bytea NOT NULL,
Expand All @@ -104,12 +102,6 @@ CREATE TABLE snapshots
"""
end

defp create_snapshots_index do
"""
CREATE UNIQUE INDEX ix_snapshots_source_uuid ON snapshots (source_uuid);
"""
end

def create_stream do
"""
INSERT INTO streams (stream_uuid)
Expand Down Expand Up @@ -179,6 +171,13 @@ DO UPDATE SET source_version = $2, source_type = $3, data = $4, metadata = $5;
"""
end

def delete_snapshot do
"""
DELETE FROM snapshots
WHERE source_uuid = $1;
"""
end

def query_all_subscriptions do
"""
SELECT subscription_id, stream_uuid, subscription_name, last_seen_event_id, last_seen_stream_version, created_at
Expand Down
7 changes: 7 additions & 0 deletions lib/event_store/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ defmodule EventStore.Storage do
execute_using_storage_pool(&Snapshot.record_snapshot(&1, snapshot))
end

@doc """
Delete an existing snapshot for a given source
"""
def delete_snapshot(source_uuid) do
execute_using_storage_pool(&Snapshot.delete_snapshot(&1, source_uuid))
end

# Execute the given `transaction` function using a database worker from the pool
defp execute_using_storage_pool(transaction) do
:poolboy.transaction(@storage_pool_name, transaction)
Expand Down
25 changes: 23 additions & 2 deletions lib/event_store/storage/snapshot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ defmodule EventStore.Storage.Snapshot do
alias EventStore.Storage.Snapshot

def read_snapshot(conn, source_uuid) do
Snapshot.QueryGetSnapshot.execute(conn, source_uuid)
Snapshot.QuerySnapshot.execute(conn, source_uuid)
end

def record_snapshot(conn, %SnapshotData{} = snapshot) do
Snapshot.RecordSnapshot.execute(conn, snapshot)
end

defmodule QueryGetSnapshot do
def delete_snapshot(conn, source_uuid) do
Snapshot.DeleteSnapshot.execute(conn, source_uuid)
end

defmodule QuerySnapshot do
def execute(conn, source_uuid) do
conn
|> Postgrex.query(Statements.query_get_snapshot, [source_uuid])
Expand Down Expand Up @@ -60,4 +64,21 @@ defmodule EventStore.Storage.Snapshot do
{:error, error}
end
end

defmodule DeleteSnapshot do
def execute(conn, source_uuid) do
conn
|> Postgrex.query(Statements.delete_snapshot, [source_uuid])
|> handle_response(source_uuid)
end

defp handle_response({:ok, _result}, _source_uuid) do
:ok
end

defp handle_response({:error, error}, source_uuid) do
_ = Logger.warn(fn -> "failed to delete snapshot for source \"#{source_uuid}\" due to: #{inspect error}" end)
{:error, error}
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule EventStore.Mixfile do
def project do
[
app: :eventstore,
version: "0.7.0",
version: "0.7.1",
elixir: "~> 1.3",
elixirc_paths: elixirc_paths(Mix.env),
description: description,
Expand Down
3 changes: 1 addition & 2 deletions scripts/snapshots.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
select
snapshot_id,
source_uuid,
source_version,
source_type,
convert_from(data, current_setting('server_encoding')) as data,
convert_from(metadata, current_setting('server_encoding')) as metadata,
created_at
from snapshots
order by created_at;
order by created_at;
35 changes: 22 additions & 13 deletions test/event_store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -79,29 +79,38 @@ defmodule EventStoreTest do
end

test "record snapshot" do
snapshot = %SnapshotData{
source_uuid: UUID.uuid4,
source_version: 1,
source_type: Atom.to_string(ExampleData),
data: %ExampleData{data: "some data"}
}
:ok = EventStore.record_snapshot(snapshot)
assert record_snapshot != nil
end

test "read a snapshot" do
snapshot = record_snapshot

{:ok, read_snapshot} = EventStore.read_snapshot(snapshot.source_uuid)

assert snapshot.source_uuid == read_snapshot.source_uuid
assert snapshot.source_version == read_snapshot.source_version
assert snapshot.source_type == read_snapshot.source_type
assert snapshot.data == read_snapshot.data
end

test "delete a snapshot" do
snapshot = record_snapshot

:ok = EventStore.delete_snapshot(snapshot.source_uuid)

{:error, :snapshot_not_found} = EventStore.read_snapshot(snapshot.source_uuid)
end

defp record_snapshot do
snapshot = %SnapshotData{
source_uuid: UUID.uuid4,
source_version: 1,
source_type: Atom.to_string(ExampleData),
data: %ExampleData{data: "some data"}
}
:ok = EventStore.record_snapshot(snapshot)

{:ok, read_snapshot} = EventStore.read_snapshot(snapshot.source_uuid)
:ok = EventStore.record_snapshot(snapshot)

assert snapshot.source_uuid == read_snapshot.source_uuid
assert snapshot.source_version == read_snapshot.source_version
assert snapshot.source_type == read_snapshot.source_type
assert snapshot.data == read_snapshot.data
snapshot
end
end
79 changes: 58 additions & 21 deletions test/storage/snapshot_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,50 @@ defmodule EventStore.Storage.SnapshotTest do
{:ok, %{conn: conn}}
end

test "read snapshot when not exists", %{conn: conn} do
source_uuid = UUID.uuid4
{:error, :snapshot_not_found} = Snapshot.read_snapshot(conn, source_uuid)
end
describe "read snapshot" do
test "should error when none exists", %{conn: conn} do
source_uuid = UUID.uuid4
{:error, :snapshot_not_found} = Snapshot.read_snapshot(conn, source_uuid)
end

test "read snapshot when present", %{conn: conn} do
source_uuid = UUID.uuid4
source_version = 1
recorded_event = hd(EventFactory.create_recorded_events(1, 1))
:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: source_version, source_type: recorded_event.event_type, data: recorded_event.data, metadata: recorded_event.metadata})
test "should read successfully when present", %{conn: conn} do
source_uuid = UUID.uuid4
source_version = 1
recorded_event = hd(EventFactory.create_recorded_events(1, 1))
:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: source_version, source_type: recorded_event.event_type, data: recorded_event.data, metadata: recorded_event.metadata})

{:ok, snapshot} = Snapshot.read_snapshot(conn, source_uuid)
{:ok, snapshot} = Snapshot.read_snapshot(conn, source_uuid)

assert snapshot.source_uuid == source_uuid
assert snapshot.source_version == source_version
assert snapshot.source_type == recorded_event.event_type
assert snapshot.data == recorded_event.data
assert snapshot.metadata == recorded_event.metadata
assert snapshot.source_uuid == source_uuid
assert snapshot.source_version == source_version
assert snapshot.source_type == recorded_event.event_type
assert snapshot.data == recorded_event.data
assert snapshot.metadata == recorded_event.metadata
end
end

test "record snapshot when none exists", %{conn: conn} do
source_uuid = UUID.uuid4
source_version = 1
recorded_event = hd(EventFactory.create_recorded_events(1, 1))
describe "record snapshot" do
test "should record snapshot when none exists", %{conn: conn} do
source_uuid = UUID.uuid4
source_version = 1
recorded_event = hd(EventFactory.create_recorded_events(1, 1))

:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: source_version, source_type: recorded_event.event_type, data: recorded_event.data, metadata: recorded_event.metadata})
end

@tag :wip
test "should modify snapshot when already exists", %{conn: conn} do
source_uuid = UUID.uuid4
[recorded_event1, recorded_event2] = EventFactory.create_recorded_events(2, 1)

:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: source_version, source_type: recorded_event.event_type, data: recorded_event.data, metadata: recorded_event.metadata})
:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: 1, source_type: recorded_event1.event_type, data: recorded_event1.data, metadata: recorded_event1.metadata})
:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: 2, source_type: recorded_event2.event_type, data: recorded_event2.data, metadata: recorded_event2.metadata})

{:ok, snapshot} = Snapshot.read_snapshot(conn, source_uuid)
assert snapshot.data == recorded_event2.data
assert snapshot.metadata == recorded_event2.metadata
assert snapshot.source_version == 2
end
end

test "record snapshot when present should update existing", %{conn: conn} do
Expand All @@ -49,11 +67,30 @@ defmodule EventStore.Storage.SnapshotTest do
:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: 2, source_type: updated_recorded_event.event_type, data: updated_recorded_event.data, metadata: updated_recorded_event.metadata})

{:ok, snapshot} = Snapshot.read_snapshot(conn, source_uuid)

assert snapshot.source_uuid == source_uuid
assert snapshot.source_version == 2
assert snapshot.source_type == updated_recorded_event.event_type
assert snapshot.data == updated_recorded_event.data
assert snapshot.metadata == updated_recorded_event.metadata
end

describe "delete snapshot" do
test "should delete existing snapshot", %{conn: conn} do
source_uuid = UUID.uuid4
source_version = 1
recorded_event = hd(EventFactory.create_recorded_events(1, 1))
:ok = Snapshot.record_snapshot(conn, %SnapshotData{source_uuid: source_uuid, source_version: source_version, source_type: recorded_event.event_type, data: recorded_event.data, metadata: recorded_event.metadata})

:ok = Snapshot.delete_snapshot(conn, source_uuid)

{:error, :snapshot_not_found} = Snapshot.read_snapshot(conn, source_uuid)
end

test "should ignore missing snapshot requested to delete", %{conn: conn} do
source_uuid = UUID.uuid4

:ok = Snapshot.delete_snapshot(conn, source_uuid)
end
end
end

0 comments on commit b7e8b16

Please sign in to comment.