Skip to content

Commit

Permalink
add GenServer which handles job processing
Browse files Browse the repository at this point in the history
  • Loading branch information
electronicbites committed Sep 3, 2024
1 parent cb12454 commit 912585c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
3 changes: 2 additions & 1 deletion lib/radiator/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ defmodule Radiator.Application do
RadiatorWeb.Endpoint,
{EventProducer, name: EventProducer},
{CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]},
{NodeChangeListener, name: NodeChangeListener}
{NodeChangeListener, name: NodeChangeListener},
{DynamicSupervisor, strategy: :one_for_one, name: Radiator.JobRunner}
]

# See https://hexdocs.pm/elixir/Supervisor.html
Expand Down
68 changes: 68 additions & 0 deletions lib/radiator/job.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# test with
# GenServer.start(Radiator.Job, work: fn -> Process.sleep(5000);{:ok,[]} end)
defmodule Radiator.Job do
@moduledoc """
WIP: Job module to handle work in a GenServer
rename: JobProcessor
idea taken from https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir/
"""
use GenServer, restart: :transient
require Logger

defstruct [:work, :id, :max_retries, retries: 0, status: "new"]

def start_link(args) do
GenServer.start_link(__MODULE__, args)
end

def init(args) do
work = Keyword.fetch!(args, :work)
id = Keyword.get(args, :id, random_job_id())
max_retries = Keyword.get(args, :max_retries, 3)

state = %Radiator.Job{id: id, work: work, max_retries: max_retries}
{:ok, state, {:continue, :run}}
end

def handle_continue(:run, state) do
new_state = state.work.() |> handle_job_result(state)

if new_state.status == "errored" do
Process.send_after(self(), :retry, 5000)
{:noreply, new_state}
else
Logger.info("Job exiting #{state.id}")
{:stop, :normal, new_state}
end
end

def handle_info(:retry, state) do
# Delegate work to the `handle_continue/2` callback.
{:noreply, state, {:continue, :run}}
end

defp handle_job_result({:ok, _data}, state) do
Logger.info("Job completed #{state.id}")
%Radiator.Job{state | status: "done"}
end

defp handle_job_result(:error, %{status: "new"} = state) do
Logger.warning("Job errored #{state.id}")
%Radiator.Job{state | status: "errored"}
end

defp handle_job_result(:error, %{status: "errored"} = state) do
Logger.warning("Job retry failed #{state.id}")
new_state = %Radiator.Job{state | retries: state.retries + 1}

if new_state.retries == state.max_retries do
%Radiator.Job{new_state | status: "failed"}
else
new_state
end
end

defp random_job_id do
:crypto.strong_rand_bytes(5) |> Base.url_encode64(padding: false)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Radiator.MixProject do
def application do
[
mod: {Radiator.Application, []},
extra_applications: [:logger, :runtime_tools]
extra_applications: [:logger, :runtime_tools, :crypto]
]
end

Expand Down

0 comments on commit 912585c

Please sign in to comment.