From e32bcbd587b009e3398b7d5523ab701ba9bb2b55 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Thu, 27 May 2021 14:41:05 +0100 Subject: [PATCH] Track in-flight events sent to subscribers To ensure that acknowledgements are committed for a subscription based upon the events sent to the subscriber. --- .../subscriptions/subscription_fsm.ex | 141 +++++++++--------- .../subscriptions/subscription_state.ex | 48 +++++- .../subscription_catch_up_test.exs | 95 ++++++++++++ 3 files changed, 216 insertions(+), 68 deletions(-) diff --git a/lib/event_store/subscriptions/subscription_fsm.ex b/lib/event_store/subscriptions/subscription_fsm.ex index b86c94aa..666f0159 100644 --- a/lib/event_store/subscriptions/subscription_fsm.ex +++ b/lib/event_store/subscriptions/subscription_fsm.ex @@ -38,13 +38,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do defstate initial do defevent subscribe, data: %SubscriptionState{transient: true} = data do - data = %SubscriptionState{ - data - | queue_size: 0, - partitions: %{}, - processed_event_numbers: MapSet.new(), - checkpoints_pending: 0 - } + data = SubscriptionState.reset_event_tracking(data) with :ok <- subscribe_to_events(data) do last_seen = data.start_from @@ -68,13 +62,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do defevent subscribe, data: %SubscriptionState{} = data do - data = %SubscriptionState{ - data - | queue_size: 0, - partitions: %{}, - processed_event_numbers: MapSet.new(), - checkpoints_pending: 0 - } + data = SubscriptionState.reset_event_tracking(data) with {:ok, subscription} <- create_subscription(data), {:ok, lock_ref} <- try_acquire_exclusive_lock(data, subscription), @@ -290,14 +278,8 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do defevent disconnect(lock_ref), data: %SubscriptionState{lock_ref: lock_ref} = data do data = - %SubscriptionState{ - data - | lock_ref: nil, - queue_size: 0, - partitions: %{}, - processed_event_numbers: MapSet.new(), - checkpoints_pending: 0 - } + %SubscriptionState{data | lock_ref: nil} + |> SubscriptionState.reset_event_tracking() |> purge_in_flight_events() next_state(:disconnected, data) @@ -378,15 +360,14 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do {:ok, %Subscriber{} = subscriber} -> %Subscriber{in_flight: in_flight} = subscriber - # Prepend in-flight events for the removed subscriber to the pending - # event queue so they can be sent to another available subscriber. + # Prepend in-flight events sent to the removed subscriber back into the + # pending queue so they can be sent to another available subscriber. data = in_flight |> Enum.sort_by(fn %RecordedEvent{event_number: event_number} -> -event_number end) - |> Enum.reduce(data, fn event, data -> - enqueue_event(data, event, &:queue.in_r/2) - end) + |> Enum.reduce(data, fn event, data -> enqueue_event(data, event, &:queue.in_r/2) end) + |> SubscriptionState.track_in_flight(in_flight) %SubscriptionState{data | subscribers: Map.delete(subscribers, subscriber_pid)} @@ -421,10 +402,10 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do %SubscriptionState{data | last_received: max(last_received, event_number)} end - defp track_last_sent(%SubscriptionState{} = data, event_number) do - %SubscriptionState{last_sent: last_sent} = data - - %SubscriptionState{data | last_sent: max(last_sent, event_number)} + defp track_sent(%SubscriptionState{} = data, event_number) do + data + |> SubscriptionState.track_last_sent(event_number) + |> SubscriptionState.track_in_flight(event_number) end defp first_event_number([%RecordedEvent{event_number: event_number} | _]), do: event_number @@ -489,22 +470,20 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do defp enqueue_events(%SubscriptionState{} = data, []), do: data defp enqueue_events(%SubscriptionState{} = data, [event | events]) do - %SubscriptionState{processed_event_numbers: processed_event_numbers} = data + %SubscriptionState{acknowledged_event_numbers: acknowledged_event_numbers} = data %RecordedEvent{event_number: event_number} = event data = - case selected?(event, data) do - true -> - # Unfiltered event, enqueue to send to a subscriber - enqueue_event(data, event) - - false -> - # Filtered event, don't send to subscriber, but track it as processed. - %SubscriptionState{ - data - | processed_event_numbers: MapSet.put(processed_event_numbers, event_number) - } - |> track_last_sent(event_number) + if selected?(event, data) do + # Unfiltered event, enqueue to send to a subscriber + enqueue_event(data, event) + else + # Filtered event, don't send to subscriber, but track it as ack'd. + %SubscriptionState{ + data + | acknowledged_event_numbers: MapSet.put(acknowledged_event_numbers, event_number) + } + |> track_sent(event_number) end data @@ -540,13 +519,13 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do |> Enum.reduce(data, fn {partition_key, _pending_events}, data -> notify_partition_subscriber(data, partition_key) end) - |> checkpoint_last_seen() + |> checkpoint_acknowledged() end defp peek_event_number(pending_events) do case :queue.peek(pending_events) do {:value, %RecordedEvent{event_number: event_number}} -> event_number - _ -> nil + :empty -> nil end end @@ -577,7 +556,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do subscribers: Map.put(subscribers, subscriber_pid, subscriber), queue_size: max(queue_size - 1, 0) } - |> track_last_sent(event_number) + |> track_sent(event_number) |> notify_partition_subscriber(partition_key, [{subscriber_pid, event} | events_to_send]) else _ -> @@ -644,21 +623,23 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do defp map(events, _mapper), do: events defp ack_events(%SubscriptionState{} = data, ack, subscriber_pid) do - %SubscriptionState{subscribers: subscribers, processed_event_numbers: processed_event_numbers} = - data + %SubscriptionState{ + subscribers: subscribers, + acknowledged_event_numbers: acknowledged_event_numbers + } = data with {:ok, subscriber} <- subscriber_by_pid(subscribers, subscriber_pid), {:ok, subscriber, acknowledged_events} <- Subscriber.acknowledge(subscriber, ack) do - processed_event_numbers = + acknowledged_event_numbers = acknowledged_events |> Enum.map(& &1.event_number) - |> Enum.reduce(processed_event_numbers, &MapSet.put(&2, &1)) + |> Enum.reduce(acknowledged_event_numbers, &MapSet.put(&2, &1)) data = %SubscriptionState{ data | subscribers: Map.put(subscribers, subscriber_pid, subscriber), - processed_event_numbers: processed_event_numbers + acknowledged_event_numbers: acknowledged_event_numbers } |> notify_subscribers() @@ -673,32 +654,55 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do end end - defp checkpoint_last_seen(%SubscriptionState{} = data, persist \\ false) do + defp checkpoint_acknowledged(data, persist \\ false) + + defp checkpoint_acknowledged(%SubscriptionState{in_flight_event_numbers: []} = data, persist) do + if persist, do: maybe_persist_checkpoint(data), else: data + end + + defp checkpoint_acknowledged(%SubscriptionState{} = data, persist) do %SubscriptionState{ - processed_event_numbers: processed_event_numbers, - checkpoint_after: checkpoint_after, - checkpoint_threshold: checkpoint_threshold, - checkpoint_timer_ref: checkpoint_timer_ref, + acknowledged_event_numbers: acknowledged_event_numbers, checkpoints_pending: checkpoints_pending, - last_ack: last_ack + in_flight_event_numbers: [ack | in_flight_event_numbers] } = data - ack = last_ack + 1 + # IO.inspect(in_flight_event_numbers, label: "#{ack}") + # IO.inspect(acknowledged_event_numbers, label: "#{ack}") cond do - MapSet.member?(processed_event_numbers, ack) -> + MapSet.member?(acknowledged_event_numbers, ack) -> %SubscriptionState{ data - | processed_event_numbers: MapSet.delete(processed_event_numbers, ack), + | acknowledged_event_numbers: MapSet.delete(acknowledged_event_numbers, ack), + in_flight_event_numbers: in_flight_event_numbers, checkpoints_pending: checkpoints_pending + 1, last_ack: ack } - |> checkpoint_last_seen(true) + |> checkpoint_acknowledged(true) + + true -> + if persist, do: maybe_persist_checkpoint(data), else: data + end + end + + defp maybe_persist_checkpoint(%SubscriptionState{transient: true} = data) do + %SubscriptionState{data | checkpoints_pending: 0} + end - persist and checkpoints_pending >= checkpoint_threshold -> + defp maybe_persist_checkpoint(%SubscriptionState{transient: false} = data) do + %SubscriptionState{ + checkpoint_after: checkpoint_after, + checkpoints_pending: checkpoints_pending, + checkpoint_threshold: checkpoint_threshold, + checkpoint_timer_ref: checkpoint_timer_ref + } = data + + cond do + checkpoints_pending >= checkpoint_threshold -> persist_checkpoint(data) - persist and checkpoint_after > 0 -> + checkpoint_after > 0 -> if checkpoint_timer_ref, do: Process.cancel_timer(checkpoint_timer_ref) checkpoint_timer_ref = Process.send_after(self(), :checkpoint, checkpoint_after) @@ -710,18 +714,21 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do end end - defp persist_checkpoint(%SubscriptionState{} = data) do + defp persist_checkpoint(%SubscriptionState{transient: true} = data) do + %SubscriptionState{data | checkpoints_pending: 0} + end + + defp persist_checkpoint(%SubscriptionState{transient: false} = data) do %SubscriptionState{ conn: conn, schema: schema, stream_uuid: stream_uuid, subscription_name: subscription_name, - transient: transient, last_ack: last_ack, checkpoints_pending: checkpoints_pending } = data - if checkpoints_pending > 0 and !transient do + if checkpoints_pending > 0 do Storage.Subscription.ack_last_seen_event(conn, stream_uuid, subscription_name, last_ack, schema: schema ) diff --git a/lib/event_store/subscriptions/subscription_state.ex b/lib/event_store/subscriptions/subscription_state.ex index 87640fdc..dfc29fa6 100644 --- a/lib/event_store/subscriptions/subscription_state.ex +++ b/lib/event_store/subscriptions/subscription_state.ex @@ -1,6 +1,9 @@ defmodule EventStore.Subscriptions.SubscriptionState do @moduledoc false + alias EventStore.RecordedEvent + alias __MODULE__ + defstruct [ :conn, :event_store, @@ -26,7 +29,50 @@ defmodule EventStore.Subscriptions.SubscriptionState do checkpoints_pending: 0, subscribers: %{}, partitions: %{}, - processed_event_numbers: MapSet.new(), + acknowledged_event_numbers: MapSet.new(), + in_flight_event_numbers: [], transient: false ] + + def reset_event_tracking(%SubscriptionState{} = state) do + %SubscriptionState{ + state + | queue_size: 0, + partitions: %{}, + acknowledged_event_numbers: MapSet.new(), + in_flight_event_numbers: [], + checkpoints_pending: 0 + } + end + + def track_in_flight(%SubscriptionState{} = state, event_number) when is_number(event_number) do + %SubscriptionState{in_flight_event_numbers: in_flight_event_numbers} = state + + in_flight_event_numbers = + Enum.sort([event_number | in_flight_event_numbers]) + |> Enum.uniq() + + %SubscriptionState{state | in_flight_event_numbers: in_flight_event_numbers} + end + + def track_in_flight(%SubscriptionState{} = state, []), do: state + + def track_in_flight(%SubscriptionState{} = state, events) when is_list(events) do + %SubscriptionState{in_flight_event_numbers: in_flight_event_numbers} = state + + in_flight_event_numbers = + events + |> Enum.map(fn %RecordedEvent{event_number: event_number} -> event_number end) + |> Enum.concat(in_flight_event_numbers) + |> Enum.sort() + |> Enum.uniq() + + %SubscriptionState{state | in_flight_event_numbers: in_flight_event_numbers} + end + + def track_last_sent(%SubscriptionState{} = data, event_number) when is_number(event_number) do + %SubscriptionState{last_sent: last_sent} = data + + %SubscriptionState{data | last_sent: max(last_sent, event_number)} + end end diff --git a/test/subscriptions/subscription_catch_up_test.exs b/test/subscriptions/subscription_catch_up_test.exs index 203b62b2..107d8eb0 100644 --- a/test/subscriptions/subscription_catch_up_test.exs +++ b/test/subscriptions/subscription_catch_up_test.exs @@ -7,6 +7,45 @@ defmodule EventStore.Subscriptions.SubscriptionCatchUpTest do describe "catch-up subscription" do test "should receive all existing events" do + restart_event_store_with_config(enable_hard_deletes: false) + + subscription_name = UUID.uuid4() + + stream1_uuid = UUID.uuid4() + stream2_uuid = UUID.uuid4() + stream3_uuid = UUID.uuid4() + stream4_uuid = UUID.uuid4() + + append_to_stream(stream1_uuid, 10) + append_to_stream(stream2_uuid, 10) + append_to_stream(stream3_uuid, 10) + append_to_stream(stream4_uuid, 10) + + {:ok, subscription} = subscribe_to_all_streams(subscription_name, self(), buffer_size: 10) + + append_to_stream(stream1_uuid, 10, 10) + + receive_and_ack(subscription, stream1_uuid, 1) + receive_and_ack(subscription, stream2_uuid, 11) + receive_and_ack(subscription, stream3_uuid, 21) + receive_and_ack(subscription, stream4_uuid, 31) + + append_to_stream(stream1_uuid, 10, 20) + + receive_and_ack(subscription, stream1_uuid, 41) + receive_and_ack(subscription, stream1_uuid, 51) + + refute_receive {:events, _events} + + append_to_stream(stream1_uuid, 10, 30) + receive_and_ack(subscription, stream1_uuid, 61) + + refute_receive {:events, _events} + end + + test "should receive events from soft deleted streams" do + restart_event_store_with_config(enable_hard_deletes: false) + subscription_name = UUID.uuid4() stream1_uuid = UUID.uuid4() @@ -19,6 +58,8 @@ defmodule EventStore.Subscriptions.SubscriptionCatchUpTest do append_to_stream(stream3_uuid, 10) append_to_stream(stream4_uuid, 10) + :ok = EventStore.delete_stream(stream2_uuid, 10, :soft) + {:ok, subscription} = subscribe_to_all_streams(subscription_name, self(), buffer_size: 10) append_to_stream(stream1_uuid, 10, 10) @@ -40,6 +81,51 @@ defmodule EventStore.Subscriptions.SubscriptionCatchUpTest do refute_receive {:events, _events} end + + test "should skip events from hard deleted streams" do + restart_event_store_with_config(enable_hard_deletes: true) + + subscription_name = UUID.uuid4() + + stream1_uuid = UUID.uuid4() + stream2_uuid = UUID.uuid4() + stream3_uuid = UUID.uuid4() + stream4_uuid = UUID.uuid4() + + append_to_stream(stream1_uuid, 10) + append_to_stream(stream2_uuid, 10) + append_to_stream(stream3_uuid, 10) + append_to_stream(stream4_uuid, 10) + + :ok = EventStore.delete_stream(stream2_uuid, 10, :hard) + + {:ok, subscription} = subscribe_to_all_streams(subscription_name, self(), buffer_size: 10) + + append_to_stream(stream1_uuid, 10, 10) + + receive_and_ack(subscription, stream1_uuid, 1) + receive_and_ack(subscription, stream3_uuid, 21) + receive_and_ack(subscription, stream4_uuid, 31) + + append_to_stream(stream1_uuid, 10, 20) + + receive_and_ack(subscription, stream1_uuid, 41) + receive_and_ack(subscription, stream1_uuid, 51) + + refute_receive {:events, _events} + + append_to_stream(stream1_uuid, 10, 30) + + receive_and_ack(subscription, stream1_uuid, 61) + + refute_receive {:events, _events} + end + end + + defp assert_last_ack(subscription, expected_ack) do + last_seen = Subscription.last_seen(subscription) + + assert last_seen == expected_ack end defp append_to_stream(stream_uuid, event_count, expected_version \\ 0) do @@ -71,5 +157,14 @@ defmodule EventStore.Subscriptions.SubscriptionCatchUpTest do end) :ok = Subscription.ack(subscription, received_events) + + assert_last_ack(subscription, expected_intial_event_number + 9) + end + + defp restart_event_store_with_config(config) do + stop_supervised!(TestEventStore) + start_supervised!({TestEventStore, config}) + + :ok end end