From a6efa23b4609d5f705bd4f239727c783d60db9e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Wo=CC=88ginger?= Date: Sun, 1 Sep 2024 19:01:35 +0200 Subject: [PATCH] add GenServer which handles job processing --- lib/radiator/application.ex | 10 +++- lib/radiator/job.ex | 89 ++++++++++++++++++++++++++++++++++ lib/radiator/job_supervisor.ex | 24 +++++++++ mix.exs | 2 +- 4 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 lib/radiator/job.ex create mode 100644 lib/radiator/job_supervisor.ex diff --git a/lib/radiator/application.ex b/lib/radiator/application.ex index 88e2e5cb..ca8da6a3 100644 --- a/lib/radiator/application.ex +++ b/lib/radiator/application.ex @@ -11,6 +11,12 @@ defmodule Radiator.Application do @impl true def start(_type, _args) do + job_runner_config = [ + strategy: :one_for_one, + max_seconds: 30, + name: Radiator.JobRunner + ] + children = [ RadiatorWeb.Telemetry, Radiator.Repo, @@ -24,7 +30,9 @@ defmodule Radiator.Application do RadiatorWeb.Endpoint, {EventProducer, name: EventProducer}, {CommandProcessor, name: CommandProcessor, subscribe_to: [{EventProducer, max_demand: 1}]}, - {NodeChangeListener, name: NodeChangeListener} + {NodeChangeListener, name: NodeChangeListener}, + {Registry, keys: :unique, name: Radiator.JobRegistry}, + {DynamicSupervisor, job_runner_config} ] # See https://hexdocs.pm/elixir/Supervisor.html diff --git a/lib/radiator/job.ex b/lib/radiator/job.ex new file mode 100644 index 00000000..87566537 --- /dev/null +++ b/lib/radiator/job.ex @@ -0,0 +1,89 @@ +# test with +# GenServer.start(Radiator.Job, work: fn -> Process.sleep(5000);{:ok,[]} end) +defmodule Radiator.Job do + @moduledoc """ + WIP: Job module to handle work in a GenServer + idea taken from https://pragprog.com/titles/sgdpelixir/concurrent-data-processing-in-elixir/ + """ + use GenServer, restart: :transient + require Logger + + alias __MODULE__ + alias Radiator.JobRunner + alias Radiator.JobSupervisor + + defstruct [:work, :arguments, :id, :max_retries, retries: 0, status: "new"] + + def start_job(args) do + if Enum.count(running_jobs()) >= 5 do + {:error, :import_quota_reached} + else + DynamicSupervisor.start_child(JobRunner, {JobSupervisor, args}) + end + end + + def running_jobs do + match_all = {:"$1", :"$2", :"$3"} + # TODO import is a placeholder + guards = [{:==, :"$3", "import"}] + map_result = [%{id: :"$1", pid: :"$2", type: :"$3"}] + Registry.select(Radiator.JobRegistry, [{match_all, guards, map_result}]) + end + + def start_link(args) do + GenServer.start_link(__MODULE__, args) + end + + def init(args) do + work = Keyword.fetch!(args, :work) + id = Keyword.get(args, :id, random_job_id()) + max_retries = Keyword.get(args, :max_retries, 3) + arguments = Keyword.fetch!(args, :arguments) + + state = %Job{id: id, work: work, max_retries: max_retries, arguments: arguments} + + {:ok, state, {:continue, :run}} + end + + def handle_continue(:run, state) do + new_state = state.work.(state.arguments) |> handle_job_result(state) + + if new_state.status == "errored" do + Process.send_after(self(), :retry, 5000) + {:noreply, new_state} + else + Logger.info("Job exiting #{state.id}") + {:stop, :normal, new_state} + end + end + + def handle_info(:retry, state) do + # Delegate work to the `handle_continue/2` callback. + {:noreply, state, {:continue, :run}} + end + + defp handle_job_result({:ok, _data}, state) do + Logger.info("Job completed #{state.id}") + %Job{state | status: "done"} + end + + defp handle_job_result(:error, %{status: "new"} = state) do + Logger.warning("Job errored #{state.id}") + %Job{state | status: "errored"} + end + + defp handle_job_result(:error, %{status: "errored"} = state) do + Logger.warning("Job retry failed #{state.id}") + new_state = %Job{state | retries: state.retries + 1} + + if new_state.retries == state.max_retries do + %Job{new_state | status: "failed"} + else + new_state + end + end + + defp random_job_id do + :crypto.strong_rand_bytes(5) |> Base.url_encode64(padding: false) + end +end diff --git a/lib/radiator/job_supervisor.ex b/lib/radiator/job_supervisor.ex new file mode 100644 index 00000000..31b1e8a8 --- /dev/null +++ b/lib/radiator/job_supervisor.ex @@ -0,0 +1,24 @@ +defmodule Radiator.JobSupervisor do + @moduledoc """ + A Supervisor for each job for greater flexibility. Starts as a child of + JobRunner and as a Supervisor for each job + """ + use Supervisor, restart: :temporary + + def start_link(args) do + Supervisor.start_link(__MODULE__, args) + end + + def init(args) do + children = [ + {Radiator.Job, args} + ] + + options = [ + strategy: :one_for_one, + max_seconds: 30 + ] + + Supervisor.init(children, options) + end +end diff --git a/mix.exs b/mix.exs index eeff958f..79376c92 100644 --- a/mix.exs +++ b/mix.exs @@ -19,7 +19,7 @@ defmodule Radiator.MixProject do def application do [ mod: {Radiator.Application, []}, - extra_applications: [:logger, :runtime_tools] + extra_applications: [:logger, :runtime_tools, :crypto] ] end