Skip to content

Commit

Permalink
Job retries with exponential backoff
Browse files Browse the repository at this point in the history
* Configure job retries in Config.mix
* Move failed jobs only after they have failed
  • Loading branch information
akira committed Nov 14, 2015
1 parent e80f3a7 commit 1451693
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 40 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ config :exq,
concurrency: :infinite,
queues: ["default"],
poll_timeout: 50,
scheduler_enable: false,
scheduler_poll_timeout: 200
scheduler_enable: true,
max_retries: 25
```

### Concurrency:
Expand Down
5 changes: 2 additions & 3 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ config :exq,
port: 6379,
namespace: "exq",
queues: ["default"],
scheduler_enable: false,
scheduler_enable: true,
scheduler_poll_timeout: 200,
redis_timeout: 5000,
genserver_timeout: 5000,
test_with_local_redis: true

max_retries: 25

import_config "#{Mix.env}.exs"
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ config :exq,
scheduler_poll_timeout: 200,
redis_timeout: 5000,
genserver_timeout: 5000,
test_with_local_redis: true
test_with_local_redis: true,
max_retries: 0
1 change: 1 addition & 0 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Exq.Manager.Server do
use GenServer
alias Exq.Stats.Server, as: Stats
alias Exq.Enqueuer
alias Exq.Redis.JobQueue
alias Exq.Support.Config

@default_name :exq
Expand Down
73 changes: 61 additions & 12 deletions lib/exq/redis/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ defmodule Exq.Redis.JobQueue do

alias Exq.Redis.Connection
alias Exq.Support.Json
alias Exq.Support.Job
alias Exq.Support.Config
alias Exq.Support.Randomize

#TODO: set to 25
@default_max_retries 5
@default_queue "default"

def find_job(redis, namespace, jid, :scheduled) do
Expand Down Expand Up @@ -67,17 +71,20 @@ defmodule Exq.Redis.JobQueue do
end
end

def enqueue_in(redis, namespace, queue, offset, worker, args) do
def enqueue_in(redis, namespace, queue, offset, worker, args) when is_integer(offset) do
time = Time.add(Time.now, Time.from(offset * 1_000_000, :usecs))
enqueue_at(redis, namespace, queue, time, worker, args)
end

def enqueue_at(redis, namespace, queue, time, worker, args) do
enqueued_at = DateFormat.format!(Date.from(time, :timestamp) |> Date.local, "{ISO}")
enqueued_at = DateFormat.format!(Date.from(time, :timestamp) |> Date.universal, "{ISO}")
{jid, job_json} = to_job_json(queue, worker, args, enqueued_at)
enqueue_job_at(redis, namespace, job_json, jid, time, scheduled_queue_key(namespace))
end

def enqueue_job_at(redis, namespace, job_json, jid, time, scheduled_queue) do
score = time_to_score(time)
try do
case Connection.zadd(redis, scheduled_queue_key(namespace), score, job_json) do
case Connection.zadd(redis, scheduled_queue, score, job_json) do
{:ok, _} -> {:ok, jid}
other -> other
end
Expand All @@ -104,21 +111,24 @@ defmodule Exq.Redis.JobQueue do
scheduler_dequeue(redis, namespace, queues, time_to_score(Time.now))
end
def scheduler_dequeue(redis, namespace, queues, max_score) when is_list(queues) do
Connection.zrangebyscore!(redis, scheduled_queue_key(namespace), 0, max_score)
|> scheduler_dequeue_requeue(redis, namespace, queues, 0)
Enum.reduce(schedule_queues(namespace), 0, fn(schedule_queue, acc) ->
deq_count = Connection.zrangebyscore!(redis, schedule_queue, 0, max_score)
|> scheduler_dequeue_requeue(redis, namespace, queues, schedule_queue, 0)
deq_count + acc
end)
end

def scheduler_dequeue_requeue([], _redis, _namespace, _queues, count), do: count
def scheduler_dequeue_requeue([job_json|t], redis, namespace, queues, count) do
if Connection.zrem!(redis, scheduled_queue_key(namespace), job_json) == "1" do
def scheduler_dequeue_requeue([], _redis, _namespace, _queues, schedule_queue, count), do: count
def scheduler_dequeue_requeue([job_json|t], redis, namespace, queues, schedule_queue, count) do
if Connection.zrem!(redis, schedule_queue, job_json) == "1" do
if Enum.count(queues) == 1 do
enqueue(redis, namespace, hd(queues), job_json)
else
enqueue(redis, namespace, job_json)
end
count = count + 1
end
scheduler_dequeue_requeue(t, redis, namespace, queues, count)
scheduler_dequeue_requeue(t, redis, namespace, queues, schedule_queue, count)
end


Expand All @@ -132,14 +142,53 @@ defmodule Exq.Redis.JobQueue do
full_key(namespace, "queue:#{queue}")
end

def schedule_queues(namespace) do
[ scheduled_queue_key(namespace), retry_queue_key(namespace) ]
end

def scheduled_queue_key(namespace) do
full_key(namespace, "schedule")
end

def retry_queue_key(namespace) do
full_key(namespace, "retry")
end

def time_to_score(time) do
Float.to_string(time |> Time.to_secs, [decimals: 6])
end

def retry_or_fail_job(redis, namespace, %{retry: true} = job, error) do
retry_count = (job.retry_count || 0) + 1
max_retries = Config.get(:max_retries, @default_max_retries)

if (retry_count <= max_retries) do
job = %{job |
retry_count: retry_count,
error_message: error
}

# Similar to Sidekiq strategy
offset = :math.pow(job.retry_count, 4) + 15 + (Randomize.random(30) * (job.retry_count + 1))
time = Time.add(Time.now, Time.from(offset * 1_000_000, :usecs))
Logger.info("Queueing job #{job.jid} to retry in #{offset} seconds")
enqueue_job_at(redis, namespace, Job.to_json(job), job.jid, time, retry_queue_key(namespace))
else
Logger.info("Max retries on job #{job.jid} exceeded")
fail_job(redis, namespace, job, error)
end
end
def retry_or_fail_job(redis, namespace, job, error) do
fail_job(redis, namespace, job, error)
end

def fail_job(redis, namespace, job, error) do
failed_at = DateFormat.format!(Date.universal, "{ISO}")
job = %{job | failed_at: failed_at, error_class: "ExqGenericError", error_message: error}
job_json = Job.to_json(job)
Connection.rpush!(redis, full_key(namespace, "failed"), job_json)
end

defp dequeue_random(_redis, _namespace, []) do
{:ok, {:none, nil}}
end
Expand All @@ -153,7 +202,7 @@ defmodule Exq.Redis.JobQueue do
end

def to_job_json(queue, worker, args) do
to_job_json(queue, worker, args, DateFormat.format!(Date.local, "{ISO}"))
to_job_json(queue, worker, args, DateFormat.format!(Date.universal, "{ISO}"))
end
def to_job_json(queue, worker, args, enqueued_at) when is_atom(worker) do
to_job_json(queue, to_string(worker), args, enqueued_at)
Expand All @@ -163,7 +212,7 @@ defmodule Exq.Redis.JobQueue do
end
def to_job_json(queue, worker, args, enqueued_at) do
jid = UUID.uuid4
job = Enum.into([{:queue, queue}, {:class, worker}, {:args, args}, {:jid, jid}, {:enqueued_at, enqueued_at}], HashDict.new)
job = Enum.into([{:queue, queue}, {:retry, true}, {:class, worker}, {:args, args}, {:jid, jid}, {:enqueued_at, enqueued_at}], HashDict.new)
{jid, Json.encode!(job)}
end
end
36 changes: 36 additions & 0 deletions lib/exq/redis/job_stat.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Exq.Redis.JobStat do
require Logger
use Timex

alias Exq.Redis.Connection
alias Exq.Redis.JobQueue
alias Exq.Support.Json
alias Exq.Support.Job

def record_processed(redis, namespace, _job) do
time = DateFormat.format!(Date.universal, "%Y-%m-%d %T %z", :strftime)
date = DateFormat.format!(Date.universal, "%Y-%m-%d", :strftime)

[{:ok, count}, {:ok, _,}, {:ok, _}, {:ok, _}] = :eredis.qp(redis,[
["INCR", JobQueue.full_key(namespace, "stat:processed")],
["INCR", JobQueue.full_key(namespace, "stat:processed_rt:#{time}")],
["EXPIRE", JobQueue.full_key(namespace, "stat:processed_rt:#{time}"), 120],
["INCR", JobQueue.full_key(namespace, "stat:processed:#{date}")]
])
{:ok, count}
end

def record_failure(redis, namespace, error, _job) do
time = DateFormat.format!(Date.universal, "%Y-%m-%d %T %z", :strftime)
date = DateFormat.format!(Date.universal, "%Y-%m-%d", :strftime)

[{:ok, count}, {:ok, _,}, {:ok, _}, {:ok, _}] = :eredis.qp(redis, [
["INCR", JobQueue.full_key(namespace, "stat:failed")],
["INCR", JobQueue.full_key(namespace, "stat:failed_rt:#{time}")],
["EXPIRE", JobQueue.full_key(namespace, "stat:failed_rt:#{time}"), 120],
["INCR", JobQueue.full_key(namespace, "stat:failed:#{date}")]
])
{:ok, count}
end

end
23 changes: 9 additions & 14 deletions lib/exq/stats/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Exq.Stats.Server do
use Timex
alias Exq.Redis.Connection
alias Exq.Redis.JobQueue
alias Exq.Redis.JobStat
alias Exq.Support.Json
alias Exq.Support.Binary
alias Exq.Stats.Process
Expand All @@ -15,7 +16,8 @@ defmodule Exq.Stats.Server do
end

def add_process(pid, namespace, worker, host, job) do
GenServer.cast(pid, {:add_process, namespace, %Process{pid: worker, host: host, job: job, started_at: DateFormat.format!(Date.local, "{ISO}")}})
GenServer.cast(pid, {:add_process, namespace,
%Process{pid: worker, host: host, job: job, started_at: DateFormat.format!(Date.universal, "{ISO}")}})
end

##===========================================================
Expand All @@ -40,12 +42,15 @@ defmodule Exq.Stats.Server do
end

def handle_cast({:record_processed, namespace, job}, state) do
record_processed(state.redis, namespace, job)
JobStat.record_processed(state.redis, namespace, job)
{:noreply, state}
end

def handle_cast({:record_failure, namespace, error, job}, state) do
record_failure(state.redis, namespace, error, job)
if job do
JobQueue.retry_or_fail_job(state.redis, namespace, job, error)
end
JobStat.record_failure(state.redis, namespace, error, job)
{:noreply, state}
end

Expand Down Expand Up @@ -143,26 +148,16 @@ defmodule Exq.Stats.Server do
{:ok, count}
end

def record_failure(redis, namespace, error, json) do
def record_failure(redis, namespace, error, job) do
count = Connection.incr!(redis, JobQueue.full_key(namespace, "stat:failed"))

time = DateFormat.format!(Date.universal, "%Y-%m-%d %T %z", :strftime)
Connection.incr!(redis, JobQueue.full_key(namespace, "stat:failed_rt:#{time}"))
Connection.expire!(redis, JobQueue.full_key(namespace, "stat:failed_rt:#{time}"), 120)


date = DateFormat.format!(Date.universal, "%Y-%m-%d", :strftime)
Connection.incr!(redis, JobQueue.full_key(namespace, "stat:failed:#{date}"))

failed_at = DateFormat.format!(Date.local, "{ISO}")

job = Exq.Support.Job.from_json(json)
job = Enum.into([{:failed_at, failed_at}, {:error_class, "ExqGenericError"}, {:error_message, error}, {:queue, job.queue}, {:class, job.class}, {:args, job.args}, {:jid, job.jid}, {:enqueued_at, job.enqueued_at}], HashDict.new)

job_json = Json.encode!(job)

Connection.rpush!(redis, JobQueue.full_key(namespace, "failed"), job_json)

{:ok, count}
end

Expand Down
13 changes: 13 additions & 0 deletions lib/exq/support/randomize.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# https://groups.google.com/forum/#!topic/elixir-lang-talk/drTvms2hcwE
defmodule Exq.Support.Randomize do
@on_load :reseed_generator

def reseed_generator do
:random.seed(:os.timestamp())
:ok
end

def random(number) do
:random.uniform(number)
end
end
24 changes: 15 additions & 9 deletions lib/exq/worker/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ defmodule Exq.Worker.Server do
use GenServer

defmodule State do
defstruct job: nil, manager: nil, queue: nil, work_table: nil
defstruct job_json: nil, job: nil, manager: nil, queue: nil, work_table: nil, worker_module: nil, worker_function: nil
end

def start(job, manager, queue, work_table) do
GenServer.start(__MODULE__, {job, manager, queue, work_table}, [])
def start(job_json, manager, queue, work_table) do
GenServer.start(__MODULE__, {job_json, manager, queue, work_table}, [])
end

def work(pid) do
Expand All @@ -18,12 +18,12 @@ defmodule Exq.Worker.Server do
## gen server callbacks
##===========================================================

def init({job, manager, queue, work_table}) do
{:ok, %State{job: job, manager: manager, queue: queue, work_table: work_table}}
def init({job_json, manager, queue, work_table}) do
{:ok, %State{job_json: job_json, manager: manager, queue: queue, work_table: work_table}}
end

def handle_cast(:work, state) do
job = Exq.Support.Job.from_json(state.job)
job = Exq.Support.Job.from_json(state.job_json)

target = String.replace(job.class, "::", ".")
[mod | func_or_empty] = Regex.split(~r/\//, target)
Expand All @@ -32,8 +32,13 @@ defmodule Exq.Worker.Server do
[f] -> :erlang.binary_to_atom(f, :utf8)
end
args = job.args
dispatch_work(mod, func, args)
{:stop, :normal, state}
GenServer.cast(self, :dispatch)
{:noreply, %{state | worker_module: String.to_atom("Elixir.#{mod}"), worker_function: func, job: job} }
end

def handle_cast(:dispatch, state) do
dispatch_work(state.worker_module, state.worker_function, state.job.args)
{:stop, :normal, state }
end

def code_change(_old_version, state, _extra) do
Expand Down Expand Up @@ -63,6 +68,7 @@ defmodule Exq.Worker.Server do
error_msg = Inspect.Algebra.format(Inspect.Algebra.to_doc(error, %Inspect.Opts{}), %Inspect.Opts{}.width)
GenServer.cast(state.manager, {:failure, to_string(error_msg), state.job})
Exq.Manager.Server.update_worker_count(state.work_table, state.queue, -1)
Logger.error("Worker terminated, #{error_msg}")
_ ->
Logger.error("Worker terminated, but manager was not alive.")
end
Expand All @@ -77,6 +83,6 @@ defmodule Exq.Worker.Server do
dispatch_work(worker_module, :perform, args)
end
def dispatch_work(worker_module, method, args) do
:erlang.apply(String.to_atom("Elixir.#{worker_module}"), method, args)
apply(worker_module, method, args)
end
end

0 comments on commit 1451693

Please sign in to comment.