Skip to content

Commit

Permalink
aggregate improvements around retries and telemetry (#2)
Browse files Browse the repository at this point in the history
Co-authored-by: Cees de Groot <cg@cdegroot.com>
  • Loading branch information
thomasdziedzic-calmwave and cdegroot authored Aug 25, 2024
1 parent 2f67dd6 commit 3c084bd
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 14 deletions.
70 changes: 61 additions & 9 deletions lib/commanded/aggregates/aggregate_state_builder.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
defmodule Commanded.Aggregates.AggregateStateBuilder do
use TelemetryRegistry
alias Commanded.Aggregates.Aggregate
alias Commanded.EventStore
alias Commanded.EventStore.RecordedEvent
alias Commanded.EventStore.SnapshotData
alias Commanded.Snapshotting
alias Commanded.Telemetry

telemetry_event(%{
event: [:commanded, :aggregate, :populate, :start],
description: "Emitted when an aggregate begins loading from the event store",
measurements: "%{system_time: integer()}",
metadata: """
%{application: Commanded.Application.t(),
aggregate_uuid: String.t(),
aggregate_state: struct(),
aggregate_version: non_neg_integer()}
"""
})

telemetry_event(%{
event: [:commanded, :aggregate, :populate, :stop],
description: "Emitted when an aggregate completes loading from the event store",
measurements: "%{duration: non_neg_integer(), count: non_neg_integer()}",
metadata: """
%{application: Commanded.Application.t(),
aggregate_uuid: String.t(),
aggregate_state: struct(),
aggregate_version: non_neg_integer()}
"""
})

@read_event_batch_size 1_000

Expand Down Expand Up @@ -62,15 +88,41 @@ defmodule Commanded.Aggregates.AggregateStateBuilder do

# Rebuild aggregate state from a `Stream` of its events.
defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do
Enum.reduce(event_stream, state, fn event, state ->
%RecordedEvent{data: data, stream_version: stream_version} = event
%Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state
telemetry_prefix = [:commanded, :aggregate, :populate]
start_time = Telemetry.start(telemetry_prefix, telemetry_metadata(state))

%Aggregate{
state
| aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(aggregate_state, data)
}
end)
{state, count} =
Enum.reduce(event_stream, {state, 0}, fn event, {state, count} ->
%RecordedEvent{data: data, stream_version: stream_version} = event
%Aggregate{aggregate_module: aggregate_module, aggregate_state: aggregate_state} = state

state = %Aggregate{
state
| aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(aggregate_state, data)
}

{state, count + 1}
end)

Telemetry.stop(telemetry_prefix, start_time, telemetry_metadata(state), %{count: count})

state
end

defp telemetry_metadata(%Aggregate{} = state) do
%Aggregate{
application: application,
aggregate_uuid: aggregate_uuid,
aggregate_state: aggregate_state,
aggregate_version: aggregate_version
} = state

%{
application: application,
aggregate_uuid: aggregate_uuid,
aggregate_state: aggregate_state,
aggregate_version: aggregate_version
}
end
end
11 changes: 9 additions & 2 deletions lib/commanded/commands/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ defmodule Commanded.Commands.Dispatcher do
# Maybe retry command when aggregate process not found on a remote node
maybe_retry(pipeline, payload, context)

{:error, :aggregate_execution_timeout} ->
# The main reason for a timeout is that aggregate loading is slow, so retrying
# is expected to help.
maybe_retry(pipeline, payload, context)

{:error, error} ->
pipeline
|> Pipeline.respond({:error, error})
Expand Down Expand Up @@ -239,8 +244,10 @@ defmodule Commanded.Commands.Dispatcher do
{:ok, context} ->
execute(pipeline, payload, context)

reply ->
reply
{:error, :too_many_attempts} = error ->
pipeline
|> Pipeline.respond(error)
|> after_failure(payload)
end
end
end
40 changes: 39 additions & 1 deletion test/aggregates/aggregate_telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,42 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do

refute_received {[:commanded, :aggregate, :execute, :stop], _measurements, _metadata}
end

test "emit `[:commanded, :aggregate, :populate]` events",
%{aggregate_uuid: aggregate_uuid, pid: pid} do
context = %ExecutionContext{
command: %Ok{message: "ok"},
function: :execute,
handler: ExampleAggregate
}

# Send some commands, then kill the process to force a reload.
count = 3

for _i <- 1..count do
{:ok, _version, _events} = GenServer.call(pid, {:execute_command, context})
end

Process.exit(pid, :normal)

# Do the reload, we should now have telemetry
start_aggregate(aggregate_uuid)

assert_receive {[:commanded, :aggregate, :populate, :start], _measurements, _metadata}
assert_receive {[:commanded, :aggregate, :populate, :stop], measurements, metadata}

assert match?(%{count: ^count}, measurements)

assert match?(
%{
aggregate_state: %ExampleAggregate{},
aggregate_uuid: ^aggregate_uuid,
aggregate_version: ^count,
application: DefaultApp
},
metadata
)
end
end

def start_aggregate(aggregate_uuid) do
Expand All @@ -207,7 +243,9 @@ defmodule Commanded.Aggregates.AggregateTelemetryTest do
[
[:commanded, :aggregate, :execute, :start],
[:commanded, :aggregate, :execute, :stop],
[:commanded, :aggregate, :execute, :exception]
[:commanded, :aggregate, :execute, :exception],
[:commanded, :aggregate, :populate, :start],
[:commanded, :aggregate, :populate, :stop]
],
fn event_name, measurements, metadata, reply_to ->
send(reply_to, {event_name, measurements, metadata})
Expand Down
2 changes: 1 addition & 1 deletion test/commands/command_timeout_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Commanded.Commands.CommandTimeoutTest do
# Handler is set to take longer than the configured timeout
case TimeoutRouter.dispatch(command, application: DefaultApp) do
{:error, :aggregate_execution_failed} -> :ok
{:error, :aggregate_execution_timeout} -> :ok
{:error, :too_many_attempts} -> :ok
reply -> flunk("received an unexpected response: #{inspect(reply)}")
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/middleware/middleware_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ defmodule Commanded.Middleware.MiddlewareTest do
# Force command handling to timeout so the aggregate process is terminated
:ok =
case Router.dispatch(command, application: DefaultApp, timeout: 50) do
{:error, :aggregate_execution_timeout} -> :ok
{:error, :too_many_attempts} -> :ok
{:error, :aggregate_execution_failed} -> :ok
end

Expand Down

0 comments on commit 3c084bd

Please sign in to comment.