Skip to content

Commit

Permalink
Track in-flight events sent to subscribers
Browse files Browse the repository at this point in the history
To ensure that acknowledgements are committed for a subscription based upon the events sent to the subscriber.
  • Loading branch information
slashdotdash committed May 27, 2021
1 parent bc5d6dd commit e32bcbd
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 68 deletions.
141 changes: 74 additions & 67 deletions lib/event_store/subscriptions/subscription_fsm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

defstate initial do
defevent subscribe, data: %SubscriptionState{transient: true} = data do
data = %SubscriptionState{
data
| queue_size: 0,
partitions: %{},
processed_event_numbers: MapSet.new(),
checkpoints_pending: 0
}
data = SubscriptionState.reset_event_tracking(data)

with :ok <- subscribe_to_events(data) do
last_seen = data.start_from
Expand All @@ -68,13 +62,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

defevent subscribe,
data: %SubscriptionState{} = data do
data = %SubscriptionState{
data
| queue_size: 0,
partitions: %{},
processed_event_numbers: MapSet.new(),
checkpoints_pending: 0
}
data = SubscriptionState.reset_event_tracking(data)

with {:ok, subscription} <- create_subscription(data),
{:ok, lock_ref} <- try_acquire_exclusive_lock(data, subscription),
Expand Down Expand Up @@ -290,14 +278,8 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do

defevent disconnect(lock_ref), data: %SubscriptionState{lock_ref: lock_ref} = data do
data =
%SubscriptionState{
data
| lock_ref: nil,
queue_size: 0,
partitions: %{},
processed_event_numbers: MapSet.new(),
checkpoints_pending: 0
}
%SubscriptionState{data | lock_ref: nil}
|> SubscriptionState.reset_event_tracking()
|> purge_in_flight_events()

next_state(:disconnected, data)
Expand Down Expand Up @@ -378,15 +360,14 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
{:ok, %Subscriber{} = subscriber} ->
%Subscriber{in_flight: in_flight} = subscriber

# Prepend in-flight events for the removed subscriber to the pending
# event queue so they can be sent to another available subscriber.
# Prepend in-flight events sent to the removed subscriber back into the
# pending queue so they can be sent to another available subscriber.

data =
in_flight
|> Enum.sort_by(fn %RecordedEvent{event_number: event_number} -> -event_number end)
|> Enum.reduce(data, fn event, data ->
enqueue_event(data, event, &:queue.in_r/2)
end)
|> Enum.reduce(data, fn event, data -> enqueue_event(data, event, &:queue.in_r/2) end)
|> SubscriptionState.track_in_flight(in_flight)

%SubscriptionState{data | subscribers: Map.delete(subscribers, subscriber_pid)}

Expand Down Expand Up @@ -421,10 +402,10 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
%SubscriptionState{data | last_received: max(last_received, event_number)}
end

defp track_last_sent(%SubscriptionState{} = data, event_number) do
%SubscriptionState{last_sent: last_sent} = data

%SubscriptionState{data | last_sent: max(last_sent, event_number)}
defp track_sent(%SubscriptionState{} = data, event_number) do
data
|> SubscriptionState.track_last_sent(event_number)
|> SubscriptionState.track_in_flight(event_number)
end

defp first_event_number([%RecordedEvent{event_number: event_number} | _]), do: event_number
Expand Down Expand Up @@ -489,22 +470,20 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
defp enqueue_events(%SubscriptionState{} = data, []), do: data

defp enqueue_events(%SubscriptionState{} = data, [event | events]) do
%SubscriptionState{processed_event_numbers: processed_event_numbers} = data
%SubscriptionState{acknowledged_event_numbers: acknowledged_event_numbers} = data
%RecordedEvent{event_number: event_number} = event

data =
case selected?(event, data) do
true ->
# Unfiltered event, enqueue to send to a subscriber
enqueue_event(data, event)

false ->
# Filtered event, don't send to subscriber, but track it as processed.
%SubscriptionState{
data
| processed_event_numbers: MapSet.put(processed_event_numbers, event_number)
}
|> track_last_sent(event_number)
if selected?(event, data) do
# Unfiltered event, enqueue to send to a subscriber
enqueue_event(data, event)
else
# Filtered event, don't send to subscriber, but track it as ack'd.
%SubscriptionState{
data
| acknowledged_event_numbers: MapSet.put(acknowledged_event_numbers, event_number)
}
|> track_sent(event_number)
end

data
Expand Down Expand Up @@ -540,13 +519,13 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
|> Enum.reduce(data, fn {partition_key, _pending_events}, data ->
notify_partition_subscriber(data, partition_key)
end)
|> checkpoint_last_seen()
|> checkpoint_acknowledged()
end

defp peek_event_number(pending_events) do
case :queue.peek(pending_events) do
{:value, %RecordedEvent{event_number: event_number}} -> event_number
_ -> nil
:empty -> nil
end
end

Expand Down Expand Up @@ -577,7 +556,7 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
subscribers: Map.put(subscribers, subscriber_pid, subscriber),
queue_size: max(queue_size - 1, 0)
}
|> track_last_sent(event_number)
|> track_sent(event_number)
|> notify_partition_subscriber(partition_key, [{subscriber_pid, event} | events_to_send])
else
_ ->
Expand Down Expand Up @@ -644,21 +623,23 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
defp map(events, _mapper), do: events

defp ack_events(%SubscriptionState{} = data, ack, subscriber_pid) do
%SubscriptionState{subscribers: subscribers, processed_event_numbers: processed_event_numbers} =
data
%SubscriptionState{
subscribers: subscribers,
acknowledged_event_numbers: acknowledged_event_numbers
} = data

with {:ok, subscriber} <- subscriber_by_pid(subscribers, subscriber_pid),
{:ok, subscriber, acknowledged_events} <- Subscriber.acknowledge(subscriber, ack) do
processed_event_numbers =
acknowledged_event_numbers =
acknowledged_events
|> Enum.map(& &1.event_number)
|> Enum.reduce(processed_event_numbers, &MapSet.put(&2, &1))
|> Enum.reduce(acknowledged_event_numbers, &MapSet.put(&2, &1))

data =
%SubscriptionState{
data
| subscribers: Map.put(subscribers, subscriber_pid, subscriber),
processed_event_numbers: processed_event_numbers
acknowledged_event_numbers: acknowledged_event_numbers
}
|> notify_subscribers()

Expand All @@ -673,32 +654,55 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
end
end

defp checkpoint_last_seen(%SubscriptionState{} = data, persist \\ false) do
defp checkpoint_acknowledged(data, persist \\ false)

defp checkpoint_acknowledged(%SubscriptionState{in_flight_event_numbers: []} = data, persist) do
if persist, do: maybe_persist_checkpoint(data), else: data
end

defp checkpoint_acknowledged(%SubscriptionState{} = data, persist) do
%SubscriptionState{
processed_event_numbers: processed_event_numbers,
checkpoint_after: checkpoint_after,
checkpoint_threshold: checkpoint_threshold,
checkpoint_timer_ref: checkpoint_timer_ref,
acknowledged_event_numbers: acknowledged_event_numbers,
checkpoints_pending: checkpoints_pending,
last_ack: last_ack
in_flight_event_numbers: [ack | in_flight_event_numbers]
} = data

ack = last_ack + 1
# IO.inspect(in_flight_event_numbers, label: "#{ack}")
# IO.inspect(acknowledged_event_numbers, label: "#{ack}")

cond do
MapSet.member?(processed_event_numbers, ack) ->
MapSet.member?(acknowledged_event_numbers, ack) ->
%SubscriptionState{
data
| processed_event_numbers: MapSet.delete(processed_event_numbers, ack),
| acknowledged_event_numbers: MapSet.delete(acknowledged_event_numbers, ack),
in_flight_event_numbers: in_flight_event_numbers,
checkpoints_pending: checkpoints_pending + 1,
last_ack: ack
}
|> checkpoint_last_seen(true)
|> checkpoint_acknowledged(true)

true ->
if persist, do: maybe_persist_checkpoint(data), else: data
end
end

defp maybe_persist_checkpoint(%SubscriptionState{transient: true} = data) do
%SubscriptionState{data | checkpoints_pending: 0}
end

persist and checkpoints_pending >= checkpoint_threshold ->
defp maybe_persist_checkpoint(%SubscriptionState{transient: false} = data) do
%SubscriptionState{
checkpoint_after: checkpoint_after,
checkpoints_pending: checkpoints_pending,
checkpoint_threshold: checkpoint_threshold,
checkpoint_timer_ref: checkpoint_timer_ref
} = data

cond do
checkpoints_pending >= checkpoint_threshold ->
persist_checkpoint(data)

persist and checkpoint_after > 0 ->
checkpoint_after > 0 ->
if checkpoint_timer_ref, do: Process.cancel_timer(checkpoint_timer_ref)

checkpoint_timer_ref = Process.send_after(self(), :checkpoint, checkpoint_after)
Expand All @@ -710,18 +714,21 @@ defmodule EventStore.Subscriptions.SubscriptionFsm do
end
end

defp persist_checkpoint(%SubscriptionState{} = data) do
defp persist_checkpoint(%SubscriptionState{transient: true} = data) do
%SubscriptionState{data | checkpoints_pending: 0}
end

defp persist_checkpoint(%SubscriptionState{transient: false} = data) do
%SubscriptionState{
conn: conn,
schema: schema,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
transient: transient,
last_ack: last_ack,
checkpoints_pending: checkpoints_pending
} = data

if checkpoints_pending > 0 and !transient do
if checkpoints_pending > 0 do
Storage.Subscription.ack_last_seen_event(conn, stream_uuid, subscription_name, last_ack,
schema: schema
)
Expand Down
48 changes: 47 additions & 1 deletion lib/event_store/subscriptions/subscription_state.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
defmodule EventStore.Subscriptions.SubscriptionState do
@moduledoc false

alias EventStore.RecordedEvent
alias __MODULE__

defstruct [
:conn,
:event_store,
Expand All @@ -26,7 +29,50 @@ defmodule EventStore.Subscriptions.SubscriptionState do
checkpoints_pending: 0,
subscribers: %{},
partitions: %{},
processed_event_numbers: MapSet.new(),
acknowledged_event_numbers: MapSet.new(),
in_flight_event_numbers: [],
transient: false
]

def reset_event_tracking(%SubscriptionState{} = state) do
%SubscriptionState{
state
| queue_size: 0,
partitions: %{},
acknowledged_event_numbers: MapSet.new(),
in_flight_event_numbers: [],
checkpoints_pending: 0
}
end

def track_in_flight(%SubscriptionState{} = state, event_number) when is_number(event_number) do
%SubscriptionState{in_flight_event_numbers: in_flight_event_numbers} = state

in_flight_event_numbers =
Enum.sort([event_number | in_flight_event_numbers])
|> Enum.uniq()

%SubscriptionState{state | in_flight_event_numbers: in_flight_event_numbers}
end

def track_in_flight(%SubscriptionState{} = state, []), do: state

def track_in_flight(%SubscriptionState{} = state, events) when is_list(events) do
%SubscriptionState{in_flight_event_numbers: in_flight_event_numbers} = state

in_flight_event_numbers =
events
|> Enum.map(fn %RecordedEvent{event_number: event_number} -> event_number end)
|> Enum.concat(in_flight_event_numbers)
|> Enum.sort()
|> Enum.uniq()

%SubscriptionState{state | in_flight_event_numbers: in_flight_event_numbers}
end

def track_last_sent(%SubscriptionState{} = data, event_number) when is_number(event_number) do
%SubscriptionState{last_sent: last_sent} = data

%SubscriptionState{data | last_sent: max(last_sent, event_number)}
end
end
Loading

0 comments on commit e32bcbd

Please sign in to comment.