Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow RedisMutex’s locking duration and polling interval to be customizable #74

Merged
merged 4 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,23 @@ This requires that you have imagemagick installed on your computer:
bundle exec gush viz <NameOfTheWorkflow>
```

### Customizing locking options

In order to prevent getting the RedisMutex::LockError error when having a large number of jobs, you can customize these 2 fields `locking_duration` and `polling_interval` as below

```ruby
# config/initializers/gush.rb
Gush.configure do |config|
config.redis_url = "redis://localhost:6379"
config.concurrency = 5
config.locking_duration = 2 # how long you want to wait for the lock to be released, in seconds
config.polling_interval = 0.3 # how long the polling interval should be, in seconds
end
```

### Cleaning up afterwards

Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is ran. This data might be useful for analysis but at a certain point it can be purged via Redis TTL. By default gush and Redis will keep keys forever. To configure expiration you need to 2 things. Create initializer (specify config.ttl in seconds, be different per environment).
Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is ran. This data might be useful for analysis but at a certain point it can be purged via Redis TTL. By default gush and Redis will keep keys forever. To configure expiration you need to 2 things. Create initializer (specify config.ttl in seconds, be different per environment).

```ruby
# config/initializers/gush.rb
Expand All @@ -361,7 +375,7 @@ Gush.configure do |config|
end
```

And you need to call `flow.expire!` (optionally passing custom TTL value overriding `config.ttl`). This gives you control whether to expire data for specific workflow. Best NOT to set TTL to be too short (like minutes) but about a week in length. And you can run `Client.expire_workflow` and `Client.expire_job` passing appropriate IDs and TTL (pass -1 to NOT expire) values.
And you need to call `flow.expire!` (optionally passing custom TTL value overriding `config.ttl`). This gives you control whether to expire data for specific workflow. Best NOT to set TTL to be too short (like minutes) but about a week in length. And you can run `Client.expire_workflow` and `Client.expire_job` passing appropriate IDs and TTL (pass -1 to NOT expire) values.

### Avoid overlapping workflows

Expand Down
12 changes: 7 additions & 5 deletions lib/gush/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class CLI < Thor
def initialize(*)
super
Gush.configure do |config|
config.gushfile = options.fetch("gushfile", config.gushfile)
config.concurrency = options.fetch("concurrency", config.concurrency)
config.redis_url = options.fetch("redis", config.redis_url)
config.namespace = options.fetch("namespace", config.namespace)
config.ttl = options.fetch("ttl", config.ttl)
config.gushfile = options.fetch("gushfile", config.gushfile)
config.concurrency = options.fetch("concurrency", config.concurrency)
config.redis_url = options.fetch("redis", config.redis_url)
config.namespace = options.fetch("namespace", config.namespace)
config.ttl = options.fetch("ttl", config.ttl)
config.locking_duration = options.fetch("locking_duration", config.locking_duration)
config.polling_interval = options.fetch("polling_interval", config.polling_interval)
end
load_gushfile
end
Expand Down
24 changes: 14 additions & 10 deletions lib/gush/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
module Gush
class Configuration
attr_accessor :concurrency, :namespace, :redis_url, :ttl
attr_accessor :concurrency, :namespace, :redis_url, :ttl, :locking_duration, :polling_interval

def self.from_json(json)
new(Gush::JSON.decode(json, symbolize_keys: true))
end

def initialize(hash = {})
self.concurrency = hash.fetch(:concurrency, 5)
self.namespace = hash.fetch(:namespace, 'gush')
self.redis_url = hash.fetch(:redis_url, 'redis://localhost:6379')
self.gushfile = hash.fetch(:gushfile, 'Gushfile')
self.ttl = hash.fetch(:ttl, -1)
self.concurrency = hash.fetch(:concurrency, 5)
self.namespace = hash.fetch(:namespace, 'gush')
self.redis_url = hash.fetch(:redis_url, 'redis://localhost:6379')
self.gushfile = hash.fetch(:gushfile, 'Gushfile')
self.ttl = hash.fetch(:ttl, -1)
self.locking_duration = hash.fetch(:locking_duration, 2) # how long you want to wait for the lock to be released, in seconds
self.polling_interval = hash.fetch(:polling_internal, 0.3) # how long the polling interval should be, in seconds
end

def gushfile=(path)
Expand All @@ -24,10 +26,12 @@ def gushfile

def to_hash
{
concurrency: concurrency,
namespace: namespace,
redis_url: redis_url,
ttl: ttl
concurrency: concurrency,
namespace: namespace,
redis_url: redis_url,
ttl: ttl,
locking_duration: locking_duration,
polling_interval: polling_interval
}
end

Expand Down
12 changes: 10 additions & 2 deletions lib/gush/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ def perform(workflow_id, job_id)

private

attr_reader :client, :workflow_id, :job
attr_reader :client, :workflow_id, :job, :configuration

def client
@client ||= Gush::Client.new(Gush.configuration)
end

def configuration
@configuration ||= client.configuration
end

def setup_job(workflow_id, job_id)
@workflow_id = workflow_id
@job ||= client.find_job(workflow_id, job_id)
Expand Down Expand Up @@ -73,7 +77,11 @@ def elapsed(start)

def enqueue_outgoing_jobs
job.outgoing.each do |job_name|
RedisMutex.with_lock("gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}", sleep: 0.3, block: 2) do
RedisMutex.with_lock(
"gush_enqueue_outgoing_jobs_#{workflow_id}-#{job_name}",
sleep: configuration.polling_interval,
block: configuration.locking_duration
) do
out = client.find_job(workflow_id, job_name)

if out.ready_to_start?
Expand Down
6 changes: 6 additions & 0 deletions spec/gush/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
expect(subject.concurrency).to eq(5)
expect(subject.namespace).to eq('gush')
expect(subject.gushfile).to eq(GUSHFILE.realpath)
expect(subject.locking_duration).to eq(2)
expect(subject.polling_interval).to eq(0.3)
end

describe "#configure" do
it "allows setting options through a block" do
Gush.configure do |config|
config.redis_url = "redis://localhost"
config.concurrency = 25
config.locking_duration = 5
config.polling_interval = 0.5
end

expect(Gush.configuration.redis_url).to eq("redis://localhost")
expect(Gush.configuration.concurrency).to eq(25)
expect(Gush.configuration.locking_duration).to eq(5)
expect(Gush.configuration.polling_interval).to eq(0.5)
end
end
end
8 changes: 8 additions & 0 deletions spec/gush/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
subject { described_class.new }

let!(:workflow) { TestWorkflow.create }
let(:locking_duration) { 5 }
let(:polling_interval) { 0.5 }
let!(:job) { client.find_job(workflow.id, "Prepare") }
let(:config) { Gush.configuration.to_json }
let!(:client) { Gush::Client.new }
Expand Down Expand Up @@ -71,5 +73,11 @@ def configure

subject.perform(workflow.id, 'OkayJob')
end

it 'calls RedisMutex.with_lock with customizable locking_duration and polling_interval' do
expect(RedisMutex).to receive(:with_lock)
.with(anything, block: 5, sleep: 0.5).twice
subject.perform(workflow.id, 'Prepare')
end
end
end
6 changes: 4 additions & 2 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ def job_with_id(job_name)
clear_performed_jobs

Gush.configure do |config|
config.redis_url = REDIS_URL
config.gushfile = GUSHFILE
config.redis_url = REDIS_URL
config.gushfile = GUSHFILE
config.locking_duration = defined?(locking_duration) ? locking_duration : 2
config.polling_interval = defined?(polling_interval) ? polling_interval : 0.3
end
end

Expand Down