Skip to content

Commit

Permalink
Ensure async calls are wrapped in instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
djmb committed Sep 26, 2023
1 parent 19b25cb commit 9277563
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 68 deletions.
2 changes: 1 addition & 1 deletion lib/solid_cache/cluster.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

module SolidCache
class Cluster
include Connections, Execution, Expiry, Instrumented, Stats
include Connections, Execution, Expiry, Stats

def initialize(options = {})
super(options)
Expand Down
19 changes: 17 additions & 2 deletions lib/solid_cache/cluster/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Execution
def initialize(options = {})
super(options)
@background = Concurrent::SingleThreadExecutor.new(max_queue: 100, fallback_policy: :discard)
@active_record_instrumentation = options.fetch(:active_record_instrumentation, true)
end

private
Expand All @@ -12,14 +13,16 @@ def async(&block)
current_shard = Entry.current_shard
@background << ->() do
wrap_in_rails_executor do
connections.with(current_shard, &block)
connections.with(current_shard) do
instrument(&block)
end
end
end
end

def async_if_required(required, &block)
if required
async { instrument(&block) }
async(&block)
else
instrument(&block)
end
Expand All @@ -32,6 +35,18 @@ def wrap_in_rails_executor(&block)
block.call
end
end

def active_record_instrumentation?
@active_record_instrumentation
end

def instrument(&block)
if active_record_instrumentation?
block.call
else
Record.disable_instrumentation(&block)
end
end
end
end
end
23 changes: 0 additions & 23 deletions lib/solid_cache/cluster/instrumented.rb

This file was deleted.

42 changes: 0 additions & 42 deletions test/unit/async_executor_test.rb

This file was deleted.

133 changes: 133 additions & 0 deletions test/unit/execution_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
require "test_helper"
require "active_support/testing/method_call_assertions"

class SolidCache::ExecutionTest < ActiveSupport::TestCase
include ActiveSupport::Testing::TimeHelpers

setup do
@cache = nil
@namespace = "test-#{SecureRandom.hex}"

@cache = lookup_store(expiry_batch_size: 2, shards: nil)
end

def test_async_errors_are_reported
error_subscriber = ErrorSubscriber.new
Rails.error.subscribe(error_subscriber)

@cache.primary_cluster.send(:async) do
raise "Boom!"
end

sleep 0.1

assert_equal 1, error_subscriber.errors.count
assert_equal "Boom!", error_subscriber.errors.first[0].message
assert_equal({ context: {}, handled: false, level: :error, source: nil }, error_subscriber.errors.first[1])
ensure
Rails.error.unsubscribe(error_subscriber) if Rails.error.respond_to?(:unsubscribe)
end

def test_active_record_instrumention
instrumented_cache = lookup_store
uninstrumented_cache = lookup_store(active_record_instrumentation: false)

calls = 0
callback = ->(*args) { calls += 1 }
ActiveSupport::Notifications.subscribed(callback, "sql.active_record") do
assert_changes -> { calls } do
instrumented_cache.read("foo")
end
assert_changes -> { calls } do
instrumented_cache.write("foo", "bar")
end
assert_no_changes -> { calls } do
uninstrumented_cache.read("foo")
end
assert_no_changes -> { calls } do
uninstrumented_cache.write("foo", "bar")
end
end
end

def test_active_record_instrumention_expiry
instrumented_cache = lookup_store(expiry_batch_size: 2)
uninstrumented_cache = lookup_store(active_record_instrumentation: false, expiry_batch_size: 2)
calls = 0
callback = ->(*args) {
calls += 1
}

ActiveSupport::Notifications.subscribed(callback, "sql.active_record") do
# Warm up the connections as they may log before instrumenation can be disabled
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
sleep 0.1

assert_no_changes -> { calls } do
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
sleep 0.1
end
end

ActiveSupport::Notifications.subscribed(callback, "sql.active_record") do
# Warm up the connections as they may log before instrumenation can be disabled
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
sleep 0.1

assert_changes -> { calls } do
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
sleep 0.1
end
end
end

unless ENV["NO_CONNECTS_TO"]
def test_no_connections_uninstrumented
ActiveRecord::ConnectionAdapters::ConnectionPool.any_instance.stubs(:connection).raises(ActiveRecord::StatementInvalid)

cache = lookup_store(expires_in: 60, cluster: { shards: [ :primary_shard_one, :primary_shard_two ] }, active_record_instrumentation: false)

assert_equal false, cache.write("1", "fsjhgkjfg")
assert_nil cache.read("1")
assert_nil cache.increment("1")
assert_nil cache.decrement("1")
assert_equal false, cache.delete("1")
assert_equal({}, cache.read_multi("1", "2", "3"))
assert_equal false, cache.write_multi("1" => "a", "2" => "b", "3" => "c")
end

def test_no_connections_instrumented
ActiveRecord::ConnectionAdapters::ConnectionPool.any_instance.stubs(:connection).raises(ActiveRecord::StatementInvalid)

cache = lookup_store(expires_in: 60, cluster: { shards: [ :primary_shard_one, :primary_shard_two ] })

assert_equal false, cache.write("1", "fsjhgkjfg")
assert_nil cache.read("1")
assert_nil cache.increment("1")
assert_nil cache.decrement("1")
assert_equal false, cache.delete("1")
assert_equal({}, cache.read_multi("1", "2", "3"))
assert_equal false, cache.write_multi("1" => "a", "2" => "b", "3" => "c")
end
end

class ErrorSubscriber
attr_reader :errors

def initialize
@errors = []
end

def report(error, handled:, severity:, context:, source: nil)
errors << [ error, { context: context, handled: handled, level: severity, source: source } ]
end
end
end
39 changes: 39 additions & 0 deletions test/unit/instrumentation_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,45 @@ def test_active_record_instrumention
end
end

def test_active_record_instrumention_expiry
instrumented_cache = lookup_store(expiry_batch_size: 2)
uninstrumented_cache = lookup_store(active_record_instrumentation: false, expiry_batch_size: 2)
calls = 0
callback = ->(*args) {
calls += 1
}

ActiveSupport::Notifications.subscribed(callback, "sql.active_record") do
# Warm up the connections as they may log before instrumenation can be disabled
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
sleep 0.1

assert_no_changes -> { calls } do
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
uninstrumented_cache.write("foo", "bar")
sleep 0.1
end
end

ActiveSupport::Notifications.subscribed(callback, "sql.active_record") do
# Warm up the connections as they may log before instrumenation can be disabled
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
sleep 0.1

assert_changes -> { calls } do
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
instrumented_cache.write("foo", "bar")
sleep 0.1
end
end
end

unless ENV["NO_CONNECTS_TO"]
def test_no_connections_uninstrumented
ActiveRecord::ConnectionAdapters::ConnectionPool.any_instance.stubs(:connection).raises(ActiveRecord::StatementInvalid)
Expand Down

0 comments on commit 9277563

Please sign in to comment.