Skip to content

Commit

Permalink
Merge pull request #2 from egze/refactor
Browse files Browse the repository at this point in the history
Refactor
  • Loading branch information
egze authored Nov 13, 2024
2 parents caedba4 + 7eb7a46 commit b927dcc
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 127 deletions.
3 changes: 3 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import Config

config :logger, level: :warning
25 changes: 25 additions & 0 deletions lib/obanalyze.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,35 @@ defmodule Obanalyze do
|> Enum.fetch!(1)
|> String.replace("doc/images", "images")

alias Obanalyze.ObanJobs
alias Obanalyze.NavItem

@oban_sorted_job_states [
"executing",
"available",
"scheduled",
"retryable",
"cancelled",
"discarded",
"completed"
]

@doc """
Returns the module for the Obanalyze Phoenix.LiveDashboard page.
"""
def dashboard do
Obanalyze.Dashboard
end

@doc """
Returns the nav items to render the menu.
"""
def get_nav_items do
job_states_with_count = ObanJobs.job_states_with_count()

for job_state <- @oban_sorted_job_states,
count = Map.get(job_states_with_count, job_state, 0),
timestamp_field = ObanJobs.timestamp_field_for_job_state(job_state),
do: NavItem.new(job_state, count, timestamp_field)
end
end
152 changes: 41 additions & 111 deletions lib/obanalyze/dashboard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,27 @@ defmodule Obanalyze.Dashboard do
use Phoenix.LiveDashboard.PageBuilder, refresher?: true

import Phoenix.LiveDashboard.Helpers, only: [format_value: 2]
import Ecto.Query

@per_page_limits [20, 50, 100]

@oban_sorted_job_states [
"executing",
"available",
"scheduled",
"retryable",
"cancelled",
"discarded",
"completed"
]
alias Obanalyze.ObanJobs

@default_job_state "executing"
@per_page_limits [20, 50, 100]

@impl true
def render(assigns) do
~H"""
<style>
#job-modal tr > :first-child {
width: 20%;
}
</style>
<h1 class="mb-3">Obanalyze</h1>
<p>Filter jobs by state:</p>
<.live_nav_bar id="oban_states" page={@page} nav_param="job_state" style={:bar} extra_params={["nav"]}>
<:item :for={{job_state, count} <- @job_state_counts} name={job_state} label={job_state_label(job_state, count)} method="navigate">
<.live_table id="oban_jobs" limit={per_page_limits()} dom_id={"oban-jobs-#{job_state}"} page={@page} row_attrs={&row_attrs/1} row_fetcher={&fetch_jobs(&1, &2, job_state)} default_sort_by={@timestamp_field} title="" search={false}>
<:item :for={nav_item <- @nav_items} name={nav_item.name} label={nav_item.label} method="navigate">
<.live_table id="oban_jobs" limit={per_page_limits()} dom_id={"oban-jobs-#{nav_item.name}"} page={@page} row_attrs={&row_attrs/1} row_fetcher={&row_fetcher(&1, &2, nav_item.name)} default_sort_by={@default_sort_by} title="" search={false}>
<:col :let={job} field={:worker} sortable={:desc}>
<p class="font-weight-bold"><%= job.worker %></p>
<pre class="font-weight-lighter text-muted"><%= truncate(inspect(job.args)) %></pre>
Expand All @@ -38,8 +33,8 @@ defmodule Obanalyze.Dashboard do
<%= job.attempt %>/<%= job.max_attempts %>
</:col>
<:col field={:queue} header="Queue" sortable={:desc} />
<:col :let={job} field={@timestamp_field} sortable={:desc}>
<%= format_value(timestamp(job, @timestamp_field)) %>
<:col :let={job} field={nav_item.timestamp_field} sortable={:desc}>
<%= format_value(timestamp(job, nav_item.timestamp_field)) %>
</:col>
</.live_table>
</:item>
Expand Down Expand Up @@ -69,12 +64,7 @@ defmodule Obanalyze.Dashboard do
end

@impl true
def mount(params, _, socket) do
socket =
socket
|> assign(job_state: Map.get(params, "job_state", @default_job_state))
|> assign_sort_by()

def mount(_params, _, socket) do
{:ok, socket}
end

Expand All @@ -84,29 +74,12 @@ defmodule Obanalyze.Dashboard do
end

@impl true
def handle_params(%{"params" => %{"job" => job_id}}, _url, socket) do
def handle_params(params, _uri, socket) do
socket =
socket
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()

case fetch_job(job_id) do
{:ok, job} ->
{:noreply, assign(socket, job: job)}

:error ->
to = live_dashboard_path(socket, socket.assigns.page, params: %{})
{:noreply, push_patch(socket, to: to)}
end
end

def handle_params(_params, _uri, socket) do
socket =
socket
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()
|> assign_nav_items()
|> assign_default_sort_by(params["job_state"])
|> assign_job(get_in(params, ["params", "job"]))

{:noreply, socket}
end
Expand All @@ -121,62 +94,42 @@ defmodule Obanalyze.Dashboard do
def handle_refresh(socket) do
socket =
socket
|> assign_job_state_counts()
|> assign_nav_items()
|> update(:job, fn
%Oban.Job{id: job_id} -> ObanJobs.get_oban_job(job_id)
_ -> nil
end)

{:noreply, socket}
end

defp assign_job_state_counts(socket) do
job_state_counts_in_db =
Oban.Repo.all(
Oban.config(),
Oban.Job
|> group_by([j], [j.state])
|> order_by([j], [j.state])
|> select([j], {j.state, count(j.id)})
)
|> Enum.into(%{})

job_state_counts =
for job_state <- @oban_sorted_job_states,
do: {job_state, Map.get(job_state_counts_in_db, job_state, 0)}

assign(socket, job_state_counts: job_state_counts)
end
defp assign_job(socket, job_id) do
if job_id do
case ObanJobs.get_oban_job(job_id) do
%Oban.Job{} = job ->
assign(socket, job: job)

defp job_state_label(job_state, count) do
"#{job_state} - (#{count})"
_ ->
to = live_dashboard_path(socket, socket.assigns.page, params: %{})
push_patch(socket, to: to)
end
else
assign(socket, job: nil)
end
end

defp fetch_jobs(params, _node, job_state) do
total_jobs = Oban.Repo.aggregate(Oban.config(), jobs_count_query(job_state), :count)

jobs =
Oban.Repo.all(Oban.config(), jobs_query(params, job_state)) |> Enum.map(&Map.from_struct/1)

{jobs, total_jobs}
defp assign_nav_items(socket) do
assign(socket, nav_items: Obanalyze.get_nav_items())
end

defp fetch_job(id) do
case Oban.Repo.get(Oban.config(), Oban.Job, id) do
%Oban.Job{} = job ->
{:ok, job}
defp assign_default_sort_by(socket, job_state) do
timestamp_field = ObanJobs.timestamp_field_for_job_state(job_state)

_ ->
:error
end
end

defp jobs_query(%{sort_by: sort_by, sort_dir: sort_dir, limit: limit}, job_state) do
Oban.Job
|> limit(^limit)
|> where([job], job.state == ^job_state)
|> order_by({^sort_dir, ^sort_by})
assign(socket, :default_sort_by, timestamp_field)
end

defp jobs_count_query(job_state) do
Oban.Job
|> where([job], job.state == ^job_state)
defp row_fetcher(params, _node, job_state) do
ObanJobs.list_jobs_with_total(params, job_state)
end

defp row_attrs(job) do
Expand All @@ -201,29 +154,6 @@ defmodule Obanalyze.Dashboard do
Map.get(job, timestamp_field)
end

defp assign_timestamp_field(%{assigns: %{job_state: job_state}} = socket) do
assign(socket, timestamp_field: timestamp_field_for_job_state(job_state))
end

defp assign_sort_by(socket) do
%{job_state: job_state} = socket.assigns

assign(socket, sort_by: timestamp_field_for_job_state(job_state))
end

defp timestamp_field_for_job_state(job_state) do
case job_state do
"available" -> :scheduled_at
"cancelled" -> :cancelled_at
"completed" -> :completed_at
"discarded" -> :discarded_at
"executing" -> :attempted_at
"retryable" -> :scheduled_at
"scheduled" -> :scheduled_at
_ -> :inserted_at
end
end

defp truncate(string, max_length \\ 50) do
if String.length(string) > max_length do
String.slice(string, 0, max_length) <> "…"
Expand Down
11 changes: 11 additions & 0 deletions lib/obanalyze/nav_item.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Obanalyze.NavItem do
defstruct [:name, :label, :timestamp_field]

def new(state, count, timestamp_field) do
%__MODULE__{
name: state,
label: "#{Phoenix.Naming.humanize(state)} (#{count})",
timestamp_field: timestamp_field
}
end
end
52 changes: 52 additions & 0 deletions lib/obanalyze/oban_jobs.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Obanalyze.ObanJobs do
import Ecto.Query, only: [group_by: 3, order_by: 2, order_by: 3, select: 3, limit: 2, where: 3]

def get_oban_job(id) do
Oban.Repo.get(Oban.config(), Oban.Job, id)
end

def list_jobs_with_total(params, job_state) do
total_jobs = Oban.Repo.aggregate(Oban.config(), jobs_count_query(job_state), :count)

jobs =
Oban.Repo.all(Oban.config(), jobs_query(params, job_state)) |> Enum.map(&Map.from_struct/1)

{jobs, total_jobs}
end

defp jobs_query(%{sort_by: sort_by, sort_dir: sort_dir, limit: limit}, job_state) do
Oban.Job
|> limit(^limit)
|> where([job], job.state == ^job_state)
|> order_by({^sort_dir, ^sort_by})
end

defp jobs_count_query(job_state) do
Oban.Job
|> where([job], job.state == ^job_state)
end

def job_states_with_count do
Oban.Repo.all(
Oban.config(),
Oban.Job
|> group_by([j], [j.state])
|> order_by([j], [j.state])
|> select([j], {j.state, count(j.id)})
)
|> Enum.into(%{})
end

def timestamp_field_for_job_state(job_state, default \\ :attempted_at) do
case job_state do
"available" -> :scheduled_at
"cancelled" -> :cancelled_at
"completed" -> :completed_at
"discarded" -> :discarded_at
"executing" -> :attempted_at
"retryable" -> :scheduled_at
"scheduled" -> :scheduled_at
_ -> default
end
end
end
Loading

0 comments on commit b927dcc

Please sign in to comment.