Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate connection handling #79

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/solid_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module SolidCache
mattr_accessor :executor, :connects_to

def self.all_shard_keys
all_shards_config&.keys || [ Record.default_shard ]
all_shards_config&.keys || []
end

def self.all_shards_config
Expand All @@ -20,7 +20,7 @@ def self.all_shards_config
def self.each_shard
return to_enum(:each_shard) unless block_given?

if (shards = connects_to && connects_to[:shards]&.keys)
if (shards = all_shards_config&.keys)
shards.each do |shard|
Record.connected_to(shard: shard) { yield }
end
Expand Down
4 changes: 2 additions & 2 deletions lib/solid_cache/cluster.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@

module SolidCache
class Cluster
include Instrumented, Sharded, Execution, Trimming, Stats
include Instrumented, Connections, Execution, Trimming, Stats

def initialize(options = {})
super(options)
end

def setup!
setup_shards!
super
end
end
end
57 changes: 57 additions & 0 deletions lib/solid_cache/cluster/connections.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module SolidCache
class Cluster
module Connections
def initialize(options = {})
super(options)
@shard_options = options.fetch(:shards, nil)

if [Hash, Array, NilClass].none? { |klass| @shard_options.is_a? klass }
raise ArgumentError, "`shards` is a `#{@shard_options.class.name}`, it should be one of Array, Hash or nil"
end
end

def with_each_connection(async: false, &block)
return enum_for(:with_each_connection) unless block_given?

connections.with_each do
async_if_required(async, &block)
end
end

def with_connection_for(key, async: false, &block)
connections.with_connection_for(key) do
async_if_required(async, &block)
end
end

def with_connection(name, async: false, &block)
connections.with(name) do
async_if_required(async, &block)
end
end

def group_by_connection(keys)
connections.assign(keys)
end

def connection_names
connections.names
end

private
def setup!
return if defined?(@connections)
@connections = sharded? ? Managed.new(@shard_options) : Unmanaged.new
end

def sharded?
@shard_options.present? || SolidCache.all_shards_config.present?
end

def connections
setup! unless defined?(@connections)
@connections
end
end
end
end
65 changes: 65 additions & 0 deletions lib/solid_cache/cluster/connections/managed.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module SolidCache
class Cluster
module Connections
class Managed
attr_reader :names, :nodes, :consistent_hash

def initialize(options)
case options
when Array, NilClass
@names = options || SolidCache.all_shard_keys
@nodes = @names.to_h { |name| [ name, name ] }
when Hash
@names = options.keys
@nodes = options.invert
end

if (unknown_shards = names - SolidCache.all_shard_keys).any?
raise ArgumentError, "Unknown #{"shard".pluralize(unknown_shards)}: #{unknown_shards.join(", ")}"
end

@consistent_hash = MaglevHash.new(@nodes.keys) if sharded?
end

def with_each(&block)
return enum_for(:with_each) unless block_given?

names.each { |name| with(name, &block) }
end

def with(name, &block)
Record.with_shard(name, &block)
end

def with_connection_for(key, &block)
with(shard_for(key), &block)
end

def assign(keys)
if sharded?
keys.group_by { |key| shard_for(key.is_a?(Hash) ? key[:key] : key) }
else
{ names.first => keys }
end
end

def count
names.count
end

private
def shard_for(key)
if sharded?
nodes[consistent_hash.node(key)]
else
names.first
end
end

def sharded?
names.count > 1
end
end
end
end
end
33 changes: 33 additions & 0 deletions lib/solid_cache/cluster/connections/unmanaged.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module SolidCache
class Cluster
module Connections
class Unmanaged
def with_each(&block)
return enum_for(:with_each) unless block_given?

yield
end

def with(name)
yield
end

def with_connection_for(key, &block)
yield
end

def assign(keys)
{ :default => keys }
end

def count
1
end

def names
[ :default ]
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_cache/cluster/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def async(&block)
current_shard = Entry.current_shard
@background << ->() do
wrap_in_rails_executor do
shards.with(current_shard, &block)
connections.with(current_shard, &block)
end
end
end
Expand Down
53 changes: 0 additions & 53 deletions lib/solid_cache/cluster/sharded.rb

This file was deleted.

10 changes: 5 additions & 5 deletions lib/solid_cache/cluster/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ def initialize(options = {})

def stats
stats = {
shards: shards.count,
shards_stats: shards_stats
connections: connections.count,
connection_stats: connections_stats
}
end

private
def shards_stats
with_each_shard.to_h { |shard| [Entry.current_shard, shard_stats] }
def connections_stats
with_each_connection.to_h { |connection| [Entry.current_shard, connection_stats] }
end

def shard_stats
def connection_stats
oldest_created_at = Entry.order(:id).pick(:created_at)

{
Expand Down
11 changes: 6 additions & 5 deletions lib/solid_cache/cluster/trimming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ module Trimming
# might be deleted already. The delete multiplier should compensate for that.
TRIM_SELECT_MULTIPLIER = 3

attr_reader :trim_batch_size, :max_age, :max_entries
attr_reader :trim_batch_size, :trim_every, :max_age, :max_entries

def initialize(options = {})
super(options)
@trim_batch_size = options.fetch(:trim_batch_size, 100)
@trim_every = [(trim_batch_size * 0.8).floor, 1].max
@max_age = options.fetch(:max_age, 2.weeks.to_i)
@max_entries = options.fetch(:max_entries, nil)
end

def trim(write_count)
counter = trim_counters[Entry.current_shard]
counter.increment(write_count * TRIM_DELETE_MULTIPLIER)
counter.increment(write_count)
value = counter.value
if value > trim_batch_size && counter.compare_and_set(value, value - trim_batch_size)
if value > trim_every && counter.compare_and_set(value, value - trim_every)
async { trim_batch }
end
end
Expand All @@ -38,7 +39,7 @@ def trim_batch
end

def trim_counters
@trim_counters ||= shards.names.to_h { |shard_name| [ shard_name, trim_counter ] }
@trim_counters ||= connection_names.to_h { |connection_name| [ connection_name, trim_counter ] }
end

def cache_full?
Expand All @@ -48,7 +49,7 @@ def cache_full?
def trim_counter
# Pre-fill the first counter to prevent herding and to account
# for discarded counters from the last shutdown
Concurrent::AtomicFixnum.new(rand(trim_batch_size).to_i)
Concurrent::AtomicFixnum.new(rand(trim_every).to_i)
end

def trim_select_size
Expand Down
61 changes: 0 additions & 61 deletions lib/solid_cache/shards.rb

This file was deleted.

Loading