-
Notifications
You must be signed in to change notification settings - Fork 31
/
mutate.rb
90 lines (75 loc) · 2.01 KB
/
mutate.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
require "faktory/client"
##
#
# Faktory's MUTATE API allows you to scan the sorted sets
# within Redis (retries, scheduled, dead) and take action
# (delete, enqueue, kill) on entries.
#
# require 'faktory/mutate'
# cl = Faktory::Client.new
# cl.discard(Faktory::RETRIES) do |filter|
# filter.with_type("QuickBooksSyncJob")
# filter.matching("*uid:12345*"))
# end
module Faktory
# Valid targets
RETRIES = "retries"
SCHEDULED = "scheduled"
DEAD = "dead"
module Mutator
class Filter
attr_accessor :hash
def initialize
@hash = {}
end
# This must be the exact type of the job, no pattern matching
def with_type(jobtype)
@hash[:jobtype] = jobtype
end
# This is a regexp that will be passed as is to Redis's SCAN.
# Notably you should surround it with * to ensure it matches
# substrings within the job payload.
# See https://redis.io/commands/scan for details.
def matching(regexp)
@hash[:regexp] = regexp
end
# One or more JIDs to target:
# filter.jids << 'abcdefgh1234'
# filter.jids = ['abcdefgh1234', '1234567890']
def jids
@hash[:jids] ||= []
end
def jids=(ary)
@hash[:jids] = Array(ary)
end
end
def discard(target, &block)
filter = Filter.new
block&.call(filter)
mutate("discard", target, filter)
end
def kill(target, &block)
filter = Filter.new
block&.call(filter)
mutate("kill", target, filter)
end
def requeue(target, &block)
filter = Filter.new
block&.call(filter)
mutate("requeue", target, filter)
end
def clear(target)
mutate("discard", target, nil)
end
private
def mutate(cmd, target, filter)
payload = {cmd: cmd, target: target}
payload[:filter] = filter.hash if filter && !filter.hash.empty?
transaction do
command("MUTATE", JSON.dump(payload))
ok
end
end
end
end
Faktory::Client.send(:include, Faktory::Mutator)