diff --git a/.travis.yml b/.travis.yml index e0b0726..053957b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,12 @@ language: elixir elixir: - - 1.5.3 - - 1.6.4 + - 1.6.6 + - 1.7.3 otp_release: - 20.3 + - 21.1 addons: postgresql: "9.6" diff --git a/lib/ecto_job/job_queue.ex b/lib/ecto_job/job_queue.ex index d42bdfb..27a945e 100644 --- a/lib/ecto_job/job_queue.ex +++ b/lib/ecto_job/job_queue.ex @@ -277,7 +277,8 @@ defmodule EctoJob.JobQueue do Returns `{:ok, job}` when sucessful, `{:error, :expired}` otherwise. """ - @spec update_job_in_progress(repo, job, DateTime.t(), integer) :: {:ok, job} | {:error, :expired} + @spec update_job_in_progress(repo, job, DateTime.t(), integer) :: + {:ok, job} | {:error, :expired} def update_job_in_progress(repo, job = %schema{}, now, timeout_ms) do {count, results} = repo.update_all( diff --git a/lib/ecto_job/producer.ex b/lib/ecto_job/producer.ex index 77031b2..34d9df9 100644 --- a/lib/ecto_job/producer.ex +++ b/lib/ecto_job/producer.ex @@ -1,12 +1,12 @@ defmodule EctoJob.Producer do @moduledoc """ - GenStage producer responsible for reserving available jobs from a job queue, and + `GenStage` producer responsible for reserving available jobs from a job queue, and passing them on to the consumer module. - The GenStage will buffer demand when there are insufficient jobs available in the + The `GenStage` will buffer demand when there are insufficient jobs available in the database. - Installs a timer to check for expired jobs, and uses a Postgrex.Notifications listener + Installs a timer to check for expired jobs, and uses a `Postgrex.Notifications` listener to dispatch jobs immediately when new jobs are inserted into the database and there is pending demand. """ @@ -142,24 +142,15 @@ defmodule EctoJob.Producer do @doc """ Messages from the timer and the notifications listener will be handled in `handle_info`. - If there is no pending demand for jobs, then all messages are ignored. `:poll` messages will attempt to activate jobs, and dispatch them according to current demand. `:notification` messages will dispatch any active jobs according to current demand. """ @spec handle_info(term, State.t()) :: {:noreply, [JobQueue.job()], State.t()} - def handle_info(_, state = %State{demand: 0}) do - {:noreply, [], state} - end - def handle_info(:poll, state = %State{repo: repo, schema: schema, clock: clock}) do now = clock.() _ = JobQueue.fail_expired_jobs_at_max_attempts(repo, schema, now) - - if activate_jobs(repo, schema, now) > 0 do - dispatch_jobs(state, now) - else - {:noreply, [], state} - end + activate_jobs(repo, schema, now) + dispatch_jobs(state, now) end def handle_info({:notification, _pid, _ref, _channel, _payload}, state = %State{clock: clock}) do @@ -182,7 +173,12 @@ defmodule EctoJob.Producer do end # Reserve jobs according to demand, and construct the GenState reply tuple + # Short-circuit when zero demand @spec dispatch_jobs(State.t(), DateTime.t()) :: {:noreply, [JobQueue.job()], State.t()} + defp dispatch_jobs(state = %State{demand: 0}, _now) do + {:noreply, [], state} + end + defp dispatch_jobs(state = %State{}, now) do %{repo: repo, schema: schema, demand: demand, reservation_timeout: timeout} = state {count, jobs} = JobQueue.reserve_available_jobs(repo, schema, demand, now, timeout) diff --git a/mix.exs b/mix.exs index 283994d..8efd904 100644 --- a/mix.exs +++ b/mix.exs @@ -9,7 +9,7 @@ defmodule EctoJob.Mixfile do app: :ecto_job, description: "A transactional job queue built with Ecto, PostgreSQL and GenStage.", version: @version, - elixir: "~> 1.5", + elixir: "~> 1.6", elixirc_paths: elixirc_paths(Mix.env()), elixirc_options: [warnings_as_errors: true], start_permanent: Mix.env() == :prod, diff --git a/test/producer_test.exs b/test/producer_test.exs index 026b656..949747b 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -27,6 +27,13 @@ defmodule EctoJob.ProducerTest do assert {:noreply, [], ^state} = Producer.handle_info(:poll, state) end + test "When pending demand and available jobs", %{state: state} do + Repo.insert!(JobQueue.new(%{})) + + assert {:noreply, [%JobQueue{}], %{demand: 9}} = + Producer.handle_info(:poll, %{state | demand: 10}) + end + test "When scheduled jobs can be activated", %{state: state} do at = DateTime.from_naive!(~N[2017-08-17T12:23:34.0Z], "Etc/UTC") Repo.insert!(JobQueue.new(%{}, schedule: at))