From ffa691d282729aed6da7f3f3a014e2ca401d5622 Mon Sep 17 00:00:00 2001 From: Michael Buhot Date: Thu, 4 Oct 2018 19:36:16 +1000 Subject: [PATCH 1/3] Dispatch available jobs when polling This is primarily to support EctoJob in Azure where pg_notify is known to not work. https://github.com/elixir-ecto/postgrex/issues/375 Additionally, this change ensures that even if there is no demand, poll messages still activate scheduled / expired jobs. --- lib/ecto_job/job_queue.ex | 3 ++- lib/ecto_job/producer.ex | 24 ++++++++++-------------- test/producer_test.exs | 7 +++++++ 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/lib/ecto_job/job_queue.ex b/lib/ecto_job/job_queue.ex index d42bdfb..27a945e 100644 --- a/lib/ecto_job/job_queue.ex +++ b/lib/ecto_job/job_queue.ex @@ -277,7 +277,8 @@ defmodule EctoJob.JobQueue do Returns `{:ok, job}` when sucessful, `{:error, :expired}` otherwise. """ - @spec update_job_in_progress(repo, job, DateTime.t(), integer) :: {:ok, job} | {:error, :expired} + @spec update_job_in_progress(repo, job, DateTime.t(), integer) :: + {:ok, job} | {:error, :expired} def update_job_in_progress(repo, job = %schema{}, now, timeout_ms) do {count, results} = repo.update_all( diff --git a/lib/ecto_job/producer.ex b/lib/ecto_job/producer.ex index 77031b2..34d9df9 100644 --- a/lib/ecto_job/producer.ex +++ b/lib/ecto_job/producer.ex @@ -1,12 +1,12 @@ defmodule EctoJob.Producer do @moduledoc """ - GenStage producer responsible for reserving available jobs from a job queue, and + `GenStage` producer responsible for reserving available jobs from a job queue, and passing them on to the consumer module. - The GenStage will buffer demand when there are insufficient jobs available in the + The `GenStage` will buffer demand when there are insufficient jobs available in the database. - Installs a timer to check for expired jobs, and uses a Postgrex.Notifications listener + Installs a timer to check for expired jobs, and uses a `Postgrex.Notifications` listener to dispatch jobs immediately when new jobs are inserted into the database and there is pending demand. """ @@ -142,24 +142,15 @@ defmodule EctoJob.Producer do @doc """ Messages from the timer and the notifications listener will be handled in `handle_info`. - If there is no pending demand for jobs, then all messages are ignored. `:poll` messages will attempt to activate jobs, and dispatch them according to current demand. `:notification` messages will dispatch any active jobs according to current demand. """ @spec handle_info(term, State.t()) :: {:noreply, [JobQueue.job()], State.t()} - def handle_info(_, state = %State{demand: 0}) do - {:noreply, [], state} - end - def handle_info(:poll, state = %State{repo: repo, schema: schema, clock: clock}) do now = clock.() _ = JobQueue.fail_expired_jobs_at_max_attempts(repo, schema, now) - - if activate_jobs(repo, schema, now) > 0 do - dispatch_jobs(state, now) - else - {:noreply, [], state} - end + activate_jobs(repo, schema, now) + dispatch_jobs(state, now) end def handle_info({:notification, _pid, _ref, _channel, _payload}, state = %State{clock: clock}) do @@ -182,7 +173,12 @@ defmodule EctoJob.Producer do end # Reserve jobs according to demand, and construct the GenState reply tuple + # Short-circuit when zero demand @spec dispatch_jobs(State.t(), DateTime.t()) :: {:noreply, [JobQueue.job()], State.t()} + defp dispatch_jobs(state = %State{demand: 0}, _now) do + {:noreply, [], state} + end + defp dispatch_jobs(state = %State{}, now) do %{repo: repo, schema: schema, demand: demand, reservation_timeout: timeout} = state {count, jobs} = JobQueue.reserve_available_jobs(repo, schema, demand, now, timeout) diff --git a/test/producer_test.exs b/test/producer_test.exs index 026b656..949747b 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -27,6 +27,13 @@ defmodule EctoJob.ProducerTest do assert {:noreply, [], ^state} = Producer.handle_info(:poll, state) end + test "When pending demand and available jobs", %{state: state} do + Repo.insert!(JobQueue.new(%{})) + + assert {:noreply, [%JobQueue{}], %{demand: 9}} = + Producer.handle_info(:poll, %{state | demand: 10}) + end + test "When scheduled jobs can be activated", %{state: state} do at = DateTime.from_naive!(~N[2017-08-17T12:23:34.0Z], "Etc/UTC") Repo.insert!(JobQueue.new(%{}, schedule: at)) From 3f5dc23b7f055ebb7362aac3eec023b1ef9f4cbe Mon Sep 17 00:00:00 2001 From: Michael Buhot Date: Thu, 4 Oct 2018 19:51:51 +1000 Subject: [PATCH 2/3] Add elixir 1.7 and otp 21 to travis --- .travis.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index e0b0726..ccb18a0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,10 +2,12 @@ language: elixir elixir: - 1.5.3 - - 1.6.4 + - 1.6.6 + - 1.7.3 otp_release: - 20.3 + - 21.1 addons: postgresql: "9.6" From ec929d48fa00d9eab2acc7015ee74cd063b7a9fd Mon Sep 17 00:00:00 2001 From: Michael Buhot Date: Thu, 4 Oct 2018 20:14:57 +1000 Subject: [PATCH 3/3] Bump minimum elixir version to 1.6 --- .travis.yml | 1 - mix.exs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index ccb18a0..053957b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: elixir elixir: - - 1.5.3 - 1.6.6 - 1.7.3 diff --git a/mix.exs b/mix.exs index 283994d..8efd904 100644 --- a/mix.exs +++ b/mix.exs @@ -9,7 +9,7 @@ defmodule EctoJob.Mixfile do app: :ecto_job, description: "A transactional job queue built with Ecto, PostgreSQL and GenStage.", version: @version, - elixir: "~> 1.5", + elixir: "~> 1.6", elixirc_paths: elixirc_paths(Mix.env()), elixirc_options: [warnings_as_errors: true], start_permanent: Mix.env() == :prod,