Skip to content

Commit

Permalink
Better metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sleipnir committed Nov 30, 2022
1 parent 324e106 commit ba9a19b
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 31 deletions.
3 changes: 2 additions & 1 deletion lib/actors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ defmodule Actors do
%InvocationRequest{
actor: %Actor{} = actor,
system: %ActorSystem{} = system,
command_name: command_name,
async: async?,
metadata: metadata,
caller: caller,
Expand Down Expand Up @@ -230,7 +231,7 @@ defmodule Actors do
end
end)

Measurements.emit_invoke_duration(system.name, actor.id.name, time)
Measurements.dispatch_invoke_duration(system.name, actor.id.name, command_name, time)
result
end

Expand Down
3 changes: 2 additions & 1 deletion lib/actors/actor/entity/invocation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ defmodule Actors.Actor.Entity.Invocation do
payload,
%ActorInvocation{actor: %ActorId{name: caller_actor_name, system: actor_system}},
%EntityState{
system: _actor_system,
system: actor_system,
actor: %Actor{id: %ActorId{name: actor_name} = _id} = actor
} = state
) do
Expand Down Expand Up @@ -213,6 +213,7 @@ defmodule Actors.Actor.Entity.Invocation do
actor: %Actor{state: actor_state, commands: commands, timer_commands: timers}
} = state
) do

ctx = Keyword.get(opts, :span_ctx, OpenTelemetry.Ctx.new())

Tracer.with_span ctx, "#{actor_name} invocation handler", kind: :server do
Expand Down
16 changes: 15 additions & 1 deletion lib/actors/actor/entity/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ defmodule Actors.Actor.Entity.Lifecycle do

alias Phoenix.PubSub

alias Sidecar.Measurements

@default_deactivate_timeout 10_000
@default_snapshot_timeout 2_000

Expand Down Expand Up @@ -107,8 +109,10 @@ defmodule Actors.Actor.Entity.Lifecycle do

def snapshot(
%EntityState{
system: system,
actor:
%Actor{
id: %ActorId{name: name} = _id,
state: actor_state,
settings: %ActorSettings{
stateful: true,
Expand All @@ -120,12 +124,15 @@ defmodule Actors.Actor.Entity.Lifecycle do
} = state
)
when is_nil(actor_state) or actor_state == %{} do
{:message_queue_len, size} = Process.info(self(), :message_queue_len)
Measurements.dispatch_actor_inflights(system, name, size)
schedule_snapshot(snapshot_strategy)
{:noreply, state, :hibernate}
end

def snapshot(
%EntityState{
system: system,
state_hash: old_hash,
actor:
%Actor{
Expand All @@ -140,6 +147,8 @@ defmodule Actors.Actor.Entity.Lifecycle do
} = _actor
} = state
) do
{:message_queue_len, size} = Process.info(self(), :message_queue_len)
Measurements.dispatch_actor_inflights(system, name, size)
# Persist State only when necessary
res =
if StateManager.is_new?(old_hash, actor_state.state) do
Expand Down Expand Up @@ -171,6 +180,7 @@ defmodule Actors.Actor.Entity.Lifecycle do

def deactivate(
%EntityState{
system: system,
actor:
%Actor{
id: %ActorId{name: name} = _id,
Expand All @@ -182,7 +192,11 @@ defmodule Actors.Actor.Entity.Lifecycle do
} = _actor
} = state
) do
case Process.info(self(), :message_queue_len) do
queue_length = Process.info(self(), :message_queue_len)
{:message_queue_len, size} = queue_length
Measurements.dispatch_actor_inflights(system, name, size)

case queue_length do
{:message_queue_len, 0} ->
Logger.debug("Deactivating actor #{name} for timeout")
{:stop, :shutdown, state}
Expand Down
49 changes: 41 additions & 8 deletions lib/sidecar/measurements.ex
Original file line number Diff line number Diff line change
@@ -1,19 +1,52 @@
defmodule Sidecar.Measurements do
@moduledoc false

def dispatch_system_info() do
def stats() do
otp_release = :erlang.system_info(:otp_release)
multi_scheduling = :erlang.system_info(:multi_scheduling)
threads = :erlang.system_info(:threads)
smp_support = :erlang.system_info(:smp_support)

dirty_cpu_schedulers = :erlang.system_info(:dirty_cpu_schedulers)
dirty_cpu_schedulers_online = :erlang.system_info(:dirty_cpu_schedulers_online)
dist_buf_busy_limit = :erlang.system_info(:dist_buf_busy_limit)
process_count = :erlang.system_info(:process_count)
schedulers = :erlang.system_info(:schedulers)
schedulers_online = :erlang.system_info(:schedulers_online)
thread_pool_size = :erlang.system_info(:thread_pool_size)

:telemetry.execute(
[:vm, :system_info, :process_count],
%{last_value: :erlang.system_info(:process_count)},
%{}
[:vm, :system_info],
%{
dirty_cpu_schedulers: dirty_cpu_schedulers,
dirty_cpu_schedulers_online: dirty_cpu_schedulers_online,
dist_buf_busy_limit: dist_buf_busy_limit,
process_count: process_count,
schedulers: schedulers,
schedulers_online: schedulers_online,
thread_pool_size: thread_pool_size
},
%{
otp_release: otp_release,
smp_support: smp_support,
multi_scheduling: multi_scheduling,
threads: threads
}
)
end

def emit_invoke_duration(system, actor_name, duration) do
:telemetry.execute([:spawn, :invoke, :stop], %{system: system, actor: actor_name, duration: duration})
def dispatch_invoke_duration(system, actor_name, action, duration) do
:telemetry.execute(
[:spawn, :invoke, :stop],
%{duration: duration},
%{system: system, name: actor_name, action: action}
)
end

def emit_actor_inflights(system, actor_name, value) do
:telemetry.execute([:spawn, :actor, :inflights], %{value: value}, %{system: system, actor: actor_name})
def dispatch_actor_inflights(system, actor_name, value) do
:telemetry.execute([:spawn, :actor, :inflight_messages], %{messages: value}, %{
system: system,
name: actor_name
})
end
end
28 changes: 8 additions & 20 deletions lib/sidecar/metrics_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@ defmodule Sidecar.MetricsSupervisor do
defp metrics do
[
# VM Metrics
last_value("vm.system_info.system_version"),
last_value("vm.system_info.otp_release"),
last_value("vm.system_info.process_count"),
# last_value("vm.system_info.schedulers"),
# last_value("vm.system_info.schedulers_online"),
# last_value("vm.system_info.dirty_cpu_schedulers"),
# last_value("vm.system_info.dirty_cpu_schedulers_online"),
# last_value("vm.system_info.multi_scheduling"),
# last_value("vm.system_info.threads"),
# last_value("vm.system_info.thread_pool_size"),
# last_value("vm.system_info.smp_support"),
# last_value("vm.system_info.dist_buf_busy_limit", unit: :byte),

last_value("vm.system_info.schedulers"),
last_value("vm.system_info.schedulers_online"),
last_value("vm.system_info.dirty_cpu_schedulers"),
last_value("vm.system_info.dirty_cpu_schedulers_online"),
last_value("vm.system_info.thread_pool_size"),
last_value("vm.system_info.dist_buf_busy_limit", unit: :byte),
last_value("vm.memory.total", unit: :byte),
last_value("vm.total_run_queue_lengths.total"),
last_value("vm.total_run_queue_lengths.cpu"),
Expand All @@ -41,23 +35,17 @@ defmodule Sidecar.MetricsSupervisor do
# Actor Metrics
last_value("spawn.actor.memory", unit: :byte),
last_value("spawn.actor.message_queue_len"),
last_value("spawn.actor.inflight_messages.messages"),
counter("spawn.invoke.stop.duration"),
summary("spawn.invoke.stop.duration", unit: {:native, :millisecond})

# Database Time Metrics
# summary("my_app.repo.query.total_time", unit: {:native, :millisecond}),
# summary("my_app.repo.query.decode_time", unit: {:native, :millisecond}),
# summary("my_app.repo.query.query_time", unit: {:native, :millisecond}),
# summary("my_app.repo.query.idle_time", unit: {:native, :millisecond}),
# summary("my_app.repo.query.queue_time", unit: {:native, :millisecond}),
]
end

defp periodic_measurements do
[
{:process_info,
event: [:spawn, :actor], name: Actors.Actor.Entity, keys: [:message_queue_len, :memory]},
{Sidecar.Measurements, :dispatch_system_info, []}
{Sidecar.Measurements, :stats, []}
]
end
end

0 comments on commit ba9a19b

Please sign in to comment.