Skip to content

Commit

Permalink
Expire with ExpiryJob (#87)
Browse files Browse the repository at this point in the history
Add the option to expire records with a `SolidCache::ExpiryJob`. Move
the expiry candidate logic in `SolidCache::Entry` so the job can call
it directly, without needing to know about the cache store that
triggered the expiration.
  • Loading branch information
djmb committed Sep 28, 2023
1 parent cdf317a commit f2db657
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 108 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Solid cache supports these options in addition to the universal `ActiveSupport::

- `error_handler` - a Proc to call to handle any `ActiveRecord::ActiveRecordError`s that are raised (default: log errors as warnings)
- `expiry_batch_size` - the batch size to use when deleting old records (default: `100`)
- `expiry_method` - how to run expiry, either `:thread` (a background thread) or `:job` (a `SolidCache::ExpiryJob`) (default: `:thread`)
- `max_age` - the maximum age of entries in the cache (default: `2.weeks.to_i`)
- `max_entries` - the maximum number of entries allowed in the cache (default: `nil`, meaning no limit)
- `cluster` - a Hash of options for the cache database cluster, e.g `{ shards: [:database1, :database2, :database3] }`
Expand All @@ -99,6 +100,8 @@ Expiring when we reach 80% of the batch size allows us to expire records from th

Only triggering expiry when we write means that if the cache is idle, the background thread is also idle.

If you want the cache expiry to be run in a background job instead of a thread, you can set `expiry_method` to `:job`. This will enqueue a `SolidCache::ExpiryJob`.

### Using a dedicated cache database

Add database configuration to database.yml, e.g.:
Expand Down
9 changes: 9 additions & 0 deletions app/jobs/solid_cache/expiry_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module SolidCache
  # Active Job that expires cache entries for a single shard.
  #
  # Enqueued by the cluster's expiry tracking when `expiry_method` is `:job`,
  # so that deletion work runs in the job backend rather than an in-process
  # thread.
  class ExpiryJob < ActiveJob::Base
    # count       - number of entries to attempt to expire in this run.
    # shard:      - the cache shard to operate on (nil selects the default shard).
    # max_age:    - entries older than this many seconds are expiry candidates.
    # max_entries: - soft cap on total entries; when exceeded, any entry may be expired.
    def perform(count, shard: nil, max_age:, max_entries:)
      Record.with_shard(shard) { Entry.expire(count, max_age: max_age, max_entries: max_entries) }
    end
  end
end
19 changes: 16 additions & 3 deletions app/models/solid_cache/entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ def id_range
end
end

def first_n_id_and_created_at(n)
uncached do
order(:id).limit(n).pluck(:id, :created_at)
# Delete up to `count` expired entries, selected by `expiry_candidate_ids`.
# A no-op when no entry is old enough (or the cache is not over capacity).
def expire(count, max_age:, max_entries:)
  candidates = expiry_candidate_ids(count, max_age: max_age, max_entries: max_entries)
  delete(candidates) if candidates.any?
end

Expand Down Expand Up @@ -137,6 +137,19 @@ def delete_no_query_cache(attribute, values)
def to_binary(key)
ActiveModel::Type::Binary.new.serialize(key)
end

# Pick up to `count` entry ids to delete.
#
# Oversamples 3x, then randomly samples down to `count`: the select takes no
# locks, so concurrent expirers may race on the same rows — random sampling
# spreads them out, and some candidates being already gone is tolerated.
#
# When the cache is over `max_entries`, every scanned entry is eligible;
# otherwise only entries older than `max_age` seconds qualify.
def expiry_candidate_ids(count, max_age:, max_entries:)
  over_capacity = max_entries && max_entries < id_range
  cutoff = max_age.seconds.ago

  uncached do
    scanned = order(:id).limit(count * 3).pluck(:id, :created_at)
    eligible = scanned.filter_map do |id, created_at|
      id if over_capacity || created_at < cutoff
    end
    eligible.sample(count)
  end
end
end
end
end
Expand Down
33 changes: 9 additions & 24 deletions lib/solid_cache/cluster/expiry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,45 +7,30 @@ module Expiry
# This ensures there is downward pressure on the cache size while there is valid data to delete
EXPIRY_MULTIPLIER = 1.25

# If deleting X records, we'll select X * EXPIRY_SELECT_MULTIPLIER and randomly delete X of those
# The selection doesn't lock so it allows more deletion concurrency, but some of the selected records
# might be deleted already. The expiry multiplier should compensate for that.
EXPIRY_SELECT_MULTIPLIER = 3

attr_reader :expiry_batch_size, :expiry_select_size, :expire_every, :max_age, :max_entries
attr_reader :expiry_batch_size, :expiry_method, :expire_every, :max_age, :max_entries

def initialize(options = {})
super(options)
@expiry_batch_size = options.fetch(:expiry_batch_size, 100)
@expiry_select_size = expiry_batch_size * EXPIRY_SELECT_MULTIPLIER
@expiry_method = options.fetch(:expiry_method, :thread)
@expire_every = [ (expiry_batch_size / EXPIRY_MULTIPLIER).floor, 1 ].max
@max_age = options.fetch(:max_age, 2.weeks.to_i)
@max_entries = options.fetch(:max_entries, nil)

raise ArgumentError, "Expiry method must be one of `:thread` or `:job`" unless [ :thread, :job ].include?(expiry_method)
end

def track_writes(count)
expire_later if expiry_counter.count(count)
end

private
def cache_full?
max_entries && max_entries < Entry.id_range
end

def expire_later
async { expire }
end

def expire
Entry.expire(expiry_candidate_ids)
end

def expiry_candidate_ids
Entry \
.first_n_id_and_created_at(expiry_select_size)
.tap { |candidates| candidates.select! { |id, created_at| created_at < max_age.seconds.ago } unless cache_full? }
.sample(expiry_batch_size)
.map { |id, created_at| id }
if expiry_method == :job
ExpiryJob.perform_later(expiry_batch_size, shard: Entry.current_shard, max_age: max_age, max_entries: max_entries)
else
async { Entry.expire(expiry_batch_size, max_age: max_age, max_entries: max_entries) }
end
end

def expiry_counter
Expand Down
170 changes: 89 additions & 81 deletions test/unit/expiry_test.rb
Original file line number Diff line number Diff line change
@@ -1,116 +1,124 @@
require "test_helper"
require "active_support/testing/method_call_assertions"

class SolidCache::TrimmingTest < ActiveSupport::TestCase
class SolidCache::ExpiryTest < ActiveSupport::TestCase
include ActiveSupport::Testing::TimeHelpers
include ActiveJob::TestHelper

setup do
@namespace = "test-#{SecureRandom.hex}"
SolidCache::Cluster.any_instance.stubs(:rand).returns(0)
end

def test_expires_old_records
@cache = lookup_store(expiry_batch_size: 3, max_age: 2.weeks)
default_shard_keys = shard_keys(@cache, :default)
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)
assert_equal 1, @cache.read(default_shard_keys[0])
assert_equal 2, @cache.read(default_shard_keys[1])

send_entries_back_in_time(3.weeks)

@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)

sleep 0.1
assert_nil @cache.read(default_shard_keys[0])
assert_nil @cache.read(default_shard_keys[1])
assert_equal 3, @cache.read(default_shard_keys[2])
assert_equal 4, @cache.read(default_shard_keys[3])
end

def test_expires_records_when_the_cache_is_full
@cache = lookup_store(expiry_batch_size: 3, max_age: 2.weeks, max_entries: 2)
default_shard_keys = shard_keys(@cache, :default)
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)

sleep 0.1
[ :thread, :job ].each do |expiry_method|
test "expires old records (#{expiry_method})" do
@cache = lookup_store(expiry_batch_size: 3, max_age: 2.weeks, expiry_method: expiry_method)
default_shard_keys = shard_keys(@cache, :default)
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)
assert_equal 1, @cache.read(default_shard_keys[0])
assert_equal 2, @cache.read(default_shard_keys[1])

@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)
send_entries_back_in_time(3.weeks)

sleep 0.1
@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)

# Two records have been deleted
assert_equal 1, SolidCache.each_shard.sum { SolidCache::Entry.count }
end
sleep 0.1
perform_enqueued_jobs

def test_expires_records_no_shards
@cache = ActiveSupport::Cache.lookup_store(:solid_cache_store, expiry_batch_size: 3, namespace: @namespace, max_entries: 2)
default_shard_keys = shard_keys(@cache, :default)
assert_nil @cache.read(default_shard_keys[0])
assert_nil @cache.read(default_shard_keys[1])
assert_equal 3, @cache.read(default_shard_keys[2])
assert_equal 4, @cache.read(default_shard_keys[3])
end

@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)
test "expires records when the cache is full (#{expiry_method})" do
@cache = lookup_store(expiry_batch_size: 3, max_age: 2.weeks, max_entries: 2, expiry_method: expiry_method)
default_shard_keys = shard_keys(@cache, :default)
@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)

sleep 0.1
sleep 0.1

@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)
@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)

sleep 0.1
sleep 0.1
perform_enqueued_jobs

# Two records have been deleted
assert_equal 1, SolidCache.each_shard.sum { SolidCache::Entry.count }
end
# Two records have been deleted
assert_equal 1, SolidCache.each_shard.sum { SolidCache::Entry.count }
end

unless ENV["NO_CONNECTS_TO"]
def test_expires_old_records_multiple_shards
@cache = lookup_store(expiry_batch_size: 2, cluster: { shards: [ :default, :primary_shard_one ] })
test "expires records no shards (#{expiry_method})" do
@cache = ActiveSupport::Cache.lookup_store(:solid_cache_store, expiry_batch_size: 3, namespace: @namespace, max_entries: 2, expiry_method: expiry_method)
default_shard_keys = shard_keys(@cache, :default)
shard_one_keys = shard_keys(@cache, :primary_shard_one)

@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)
@cache.write(shard_one_keys[0], 3)
@cache.write(shard_one_keys[1], 4)

assert_equal 1, @cache.read(default_shard_keys[0])
assert_equal 2, @cache.read(default_shard_keys[1])
assert_equal 3, @cache.read(shard_one_keys[0])
assert_equal 4, @cache.read(shard_one_keys[1])

sleep 0.1 # ensure they are marked as read
send_entries_back_in_time(3.weeks)
sleep 0.1

@cache.write(default_shard_keys[2], 5)
@cache.write(default_shard_keys[3], 6)
@cache.write(shard_one_keys[2], 7)
@cache.write(shard_one_keys[3], 8)
@cache.write(default_shard_keys[2], 3)
@cache.write(default_shard_keys[3], 4)

sleep 0.1
perform_enqueued_jobs

assert_nil @cache.read(default_shard_keys[0])
assert_nil @cache.read(default_shard_keys[1])
assert_nil @cache.read(shard_one_keys[0])
assert_nil @cache.read(shard_one_keys[1])
assert_equal 5, @cache.read(default_shard_keys[2])
assert_equal 6, @cache.read(default_shard_keys[3])
assert_equal 7, @cache.read(shard_one_keys[2])
assert_equal 8, @cache.read(shard_one_keys[3])

[ :default, :primary_shard_one ].each do |shard|
SolidCache::Record.connected_to(shard: shard) do
assert_equal 2, SolidCache::Entry.count
# Three records have been deleted
assert_equal 1, SolidCache.each_shard.sum { SolidCache::Entry.count }
end

unless ENV["NO_CONNECTS_TO"]
test "expires old records multiple shards (#{expiry_method})" do
@cache = lookup_store(expiry_batch_size: 2, cluster: { shards: [ :default, :primary_shard_one ] }, expiry_method: expiry_method)
default_shard_keys = shard_keys(@cache, :default)
shard_one_keys = shard_keys(@cache, :primary_shard_one)

@cache.write(default_shard_keys[0], 1)
@cache.write(default_shard_keys[1], 2)
@cache.write(shard_one_keys[0], 3)
@cache.write(shard_one_keys[1], 4)

assert_equal 1, @cache.read(default_shard_keys[0])
assert_equal 2, @cache.read(default_shard_keys[1])
assert_equal 3, @cache.read(shard_one_keys[0])
assert_equal 4, @cache.read(shard_one_keys[1])

sleep 0.1 # ensure they are marked as read
send_entries_back_in_time(3.weeks)

@cache.write(default_shard_keys[2], 5)
@cache.write(default_shard_keys[3], 6)
@cache.write(shard_one_keys[2], 7)
@cache.write(shard_one_keys[3], 8)

sleep 0.1
perform_enqueued_jobs

assert_nil @cache.read(default_shard_keys[0])
assert_nil @cache.read(default_shard_keys[1])
assert_nil @cache.read(shard_one_keys[0])
assert_nil @cache.read(shard_one_keys[1])
assert_equal 5, @cache.read(default_shard_keys[2])
assert_equal 6, @cache.read(default_shard_keys[3])
assert_equal 7, @cache.read(shard_one_keys[2])
assert_equal 8, @cache.read(shard_one_keys[3])

[ :default, :primary_shard_one ].each do |shard|
SolidCache::Record.connected_to(shard: shard) do
assert_equal 2, SolidCache::Entry.count
end
end
end
end
end

private
def shard_keys(cache, shard)
namespaced_keys = 100.times.map { |i| @cache.send(:normalize_key, "key#{i}", {}) }
shard_keys = cache.primary_cluster.send(:connections).assign(namespaced_keys)[shard]
shard_keys.map { |key| key.delete_prefix("#{@namespace}:") }
end
private
def shard_keys(cache, shard)
namespaced_keys = 100.times.map { |i| @cache.send(:normalize_key, "key#{i}", {}) }
shard_keys = cache.primary_cluster.send(:connections).assign(namespaced_keys)[shard]
shard_keys.map { |key| key.delete_prefix("#{@namespace}:") }
end
end

0 comments on commit f2db657

Please sign in to comment.