Ensure batch delete removes expiring locks #724

Merged
1 change: 1 addition & 0 deletions lib/sidekiq_unique_jobs/batch_delete.rb
@@ -91,6 +91,7 @@ def batch_delete(conn)
          chunk.each do |digest|
            del_digest(pipeline, digest)
            pipeline.zrem(SidekiqUniqueJobs::DIGESTS, digest)
            pipeline.zrem(SidekiqUniqueJobs::EXPIRING_DIGESTS, digest)
            @count += 1
          end
        end
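For context, here is a minimal sketch (not part of the PR) of the state this extra `zrem` cleans up. It uses plain redis-rb, and the literal key names standing in for `SidekiqUniqueJobs::DIGESTS` and `SidekiqUniqueJobs::EXPIRING_DIGESTS` are assumptions:

```ruby
require "redis"

redis  = Redis.new
digest = "uniquejobs:abcdef123456" # hypothetical digest

# An :until_expired lock records its digest in the expiring sorted set,
# scored by the time it should expire (see lock.rb below).
redis.zadd("uniquejobs:expiring_digests", Time.now.to_f + 60, digest)

# Before this change, batch delete only removed the digest from the regular
# sorted set, so the expiring entry lingered after the lock was deleted:
redis.zrem("uniquejobs:digests", digest)
redis.zcard("uniquejobs:expiring_digests") # => 1 (stale entry)

# The added pipeline.zrem mirrors the cleanup for the expiring set:
redis.zrem("uniquejobs:expiring_digests", digest)
redis.zcard("uniquejobs:expiring_digests") # => 0
```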
19 changes: 18 additions & 1 deletion lib/sidekiq_unique_jobs/lock.rb
@@ -66,7 +66,7 @@ def lock(job_id, lock_info = {})
           pipeline.set(key.digest, job_id)
           pipeline.hset(key.locked, job_id, now_f)
           info.set(lock_info, pipeline)
-          pipeline.zadd(key.digests, now_f, key.digest)
+          add_digest_to_set(pipeline, lock_info)
           pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "queue.lua", "Queued"))
           pipeline.zadd(key.changelog, now_f, changelog_json(job_id, "lock.lua", "Locked"))
         end
@@ -321,5 +321,22 @@ def changelog_json(job_id, script, message)
        time: now_f,
      )
    end

    #
    # Add the digest to the correct sorted set
    #
    # @param [Object] pipeline a redis pipeline object for issuing commands
    # @param [Hash] lock_info the lock info relevant to the digest
    #
    # @return [nil]
    #
    def add_digest_to_set(pipeline, lock_info)
      digest_string = key.digest
      if lock_info["lock"] == :until_expired
        pipeline.zadd(key.expiring_digests, now_f + lock_info["ttl"], digest_string)
      else
        pipeline.zadd(key.digests, now_f, digest_string)
      end
    end
  end
end
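A short sketch of what the two branches above write to Redis, again using redis-rb and assumed key names; the range query at the end is an assumption about how the reaper consumes the expiring set:

```ruby
require "redis"

redis  = Redis.new
now_f  = Time.now.to_f
digest = "uniquejobs:abcdef123456" # hypothetical digest

# :until_expired locks are scored with their expiry time...
lock_info = { "lock" => :until_expired, "ttl" => 600 }
redis.zadd("uniquejobs:expiring_digests", now_f + lock_info["ttl"], digest)

# ...so everything already past its expiry can be fetched in one range query
# (assumed to be roughly what the reaper does with this set):
redis.zrangebyscore("uniquejobs:expiring_digests", "-inf", now_f)

# Any other lock type keeps the previous behaviour: scored by creation time
# in the regular digests set.
redis.zadd("uniquejobs:digests", now_f, digest)
```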
23 changes: 23 additions & 0 deletions spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb
@@ -135,5 +135,28 @@
        expect(service).not_to have_received(:orphans)
      end
    end

    context "when a lock is until_expired" do
      let(:lock_info) do
        {
          "job_id" => job_id,
          "limit" => 1,
          "lock" => :until_expired,
          "time" => now_f,
          "timeout" => nil,
          "ttl" => 1,
          "lock_args" => [],
          "worker" => "MyUniqueJob",
        }
      end

      it "clears the lock" do
        expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 1
        sleep 2
Contributor Author:
This spec passes for me locally, but I don't think a sleep this long is really appropriate even if it did work consistently. It's also not really the right place for a spec that is about the interaction between the reaper and batch delete. It's here to demonstrate the issue, but I'd love your thoughts on a better location or tactic for testing this.

Owner:
A sleep of two seconds would increase the test suite time by more than 50% (on my machine). Seems like something else is off here; I am without a computer until Monday, but I'll check then.

        service.call

        expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 0
      end
    end
  end
end
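One possible tactic for avoiding the two-second sleep discussed above, sketched under the assumption that the reaper treats the EXPIRING_DIGESTS scores as expiry timestamps (which is how add_digest_to_set writes them): backdate the score instead of waiting for it to pass.

```ruby
# Sketch only: reuses the spec's existing redis helper and service; instead of
# sleeping past the ttl, rewrite the entry's score so it is already expired.
it "clears the lock without sleeping" do
  expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 1

  redis do |conn|
    conn.zrange(SidekiqUniqueJobs::EXPIRING_DIGESTS, 0, -1).each do |digest|
      # zadd on an existing member updates its score in place
      conn.zadd(SidekiqUniqueJobs::EXPIRING_DIGESTS, Time.now.to_f - 1, digest)
    end
  end

  service.call

  expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 0
end
```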