|
| 1 | +require "concurrent/atomics" |
| 2 | +require "json" |
| 3 | + |
| 4 | +module LaunchDarkly |
| 5 | + module Impl |
| 6 | + module Integrations |
| 7 | + module Redis |
| 8 | + # |
| 9 | + # Internal implementation of the Redis feature store, intended to be used with CachingStoreWrapper. |
| 10 | + # |
| 11 | + class RedisFeatureStoreCore |
# The redis and connection_pool gems are optional dependencies; record
# whether both loaded so initialize can fail with a helpful message
# instead of surfacing a raw LoadError at require time.
REDIS_ENABLED =
  begin
    require "redis"
    require "connection_pool"
    true
  rescue ScriptError, StandardError
    false
  end
| 19 | + |
# Creates the store core.
#
# Options:
#   redis_opts      - Hash of options passed to Redis.new (copied; the
#                     caller's hash is never mutated)
#   redis_url       - overrides redis_opts[:url]
#   max_connections - pool size when we build the pool ourselves (default 16)
#   pool            - a pre-built connection pool to use instead
#   prefix          - key prefix (defaults to the SDK default)
#   logger          - defaults to Config.default_logger
#   test_hook       - used for unit tests, deliberately undocumented
#
# Raises RuntimeError if the redis/connection_pool gems are unavailable.
def initialize(opts)
  if !REDIS_ENABLED
    raise RuntimeError.new("can't use Redis feature store because one of these gems is missing: redis, connection_pool")
  end

  # Work on a copy so injecting :url below never mutates the hash the
  # caller handed us.
  @redis_opts = (opts[:redis_opts] || Hash.new).dup
  if opts[:redis_url]
    @redis_opts[:url] = opts[:redis_url]
  end
  if !@redis_opts.include?(:url)
    @redis_opts[:url] = LaunchDarkly::Integrations::Redis::default_redis_url
  end
  max_connections = opts[:max_connections] || 16
  @pool = opts[:pool] || ConnectionPool.new(size: max_connections) do
    ::Redis.new(@redis_opts)
  end
  @prefix = opts[:prefix] || LaunchDarkly::Integrations::Redis::default_prefix
  @logger = opts[:logger] || Config.default_logger
  @test_hook = opts[:test_hook] # used for unit tests, deliberately undocumented

  @stopped = Concurrent::AtomicBoolean.new(false)

  with_connection do |redis|
    # Single-line message: the original's "\"-continued string literal
    # embedded the source indentation into the logged text.
    # NOTE(review): redis.connection was removed in redis-rb 5.x - confirm
    # supported gem versions before relying on it here.
    @logger.info("RedisFeatureStore: using Redis instance at #{redis.connection[:host]}:#{redis.connection[:port]} and prefix: #{@prefix}")
  end
end
| 47 | + |
# Atomically replaces all stored data: for each kind, deletes the existing
# hash and rewrites every item inside a single MULTI/EXEC transaction.
def init_internal(all_data)
  count = 0
  with_connection do |redis|
    all_data.each do |kind, items|
      hash_key = items_key(kind)
      redis.multi do |multi|
        multi.del(hash_key)
        count += items.count
        items.each do |key, item|
          # The write must go through the `multi` proxy - commands issued on
          # the outer client inside a MULTI block are not queued in the
          # transaction (and raise an error in redis-rb >= 4.6).
          multi.hset(hash_key, key, item.to_json)
        end
      end
    end
  end
  @logger.info { "RedisFeatureStore: initialized with #{count} items" }
end
| 63 | + |
# Reads a single item of the given kind from its Redis hash; returns the
# parsed item hash, or nil if the key is absent.
def get_internal(kind, key)
  with_connection { |redis| get_redis(redis, kind, key) }
end
| 69 | + |
# Reads every item of the given kind, returning a hash keyed by item key
# (as a symbol) with the JSON-decoded item as the value.
def get_all_internal(kind)
  with_connection do |redis|
    raw = redis.hgetall(items_key(kind))
    raw.each_with_object({}) do |(item_key, json_item), result|
      result[item_key.to_sym] = JSON.parse(json_item, symbolize_names: true)
    end
  end
end
| 81 | + |
# Inserts or updates an item using optimistic locking: WATCH the hash key,
# compare versions, then write inside MULTI/EXEC. If another client touches
# the watched key before EXEC, redis-rb returns nil from `multi` and we retry.
# Returns the item actually in the store afterwards (the new item on success,
# or the existing item if it was the same version or newer).
def upsert_internal(kind, new_item)
  base_key = items_key(kind)
  key = new_item[:key]
  try_again = true
  final_item = new_item
  while try_again
    try_again = false
    with_connection do |redis|
      redis.watch(base_key) do
        old_item = get_redis(redis, kind, key)
        # Test hook: lets unit tests interleave a concurrent write here.
        before_update_transaction(base_key, key)
        if old_item.nil? || old_item[:version] < new_item[:version]
          result = redis.multi do |multi|
            multi.hset(base_key, key, new_item.to_json)
          end
          # A nil result means the WATCHed key changed and EXEC was aborted.
          if result.nil?
            @logger.debug { "RedisFeatureStore: concurrent modification detected, retrying" }
            try_again = true
          end
        else
          # Existing item is the same version or newer: keep it, skip the write.
          final_item = old_item
          action = new_item[:deleted] ? "delete" : "update"
          @logger.warn { "RedisFeatureStore: attempted to #{action} #{key} version: #{old_item[:version]} \
            in '#{kind[:namespace]}' with a version that is the same or older: #{new_item[:version]}" }
        end
        # Release the WATCH on the no-write path (harmless after an EXEC).
        redis.unwatch
      end
    end
  end
  final_item
end
| 113 | + |
# True if the store has ever been populated (the features hash exists).
# Uses `exists?` (redis-rb >= 4.2), which returns a boolean; bare `exists`
# returns an Integer count in modern redis-rb, and Integer 0 is truthy in
# Ruby, so the old form reported "initialized" even for an empty store.
def initialized_internal?
  with_connection { |redis| redis.exists?(items_key(FEATURES)) }
end
| 117 | + |
# Shuts the connection pool down, closing every pooled client. Guarded by
# the atomic @stopped flag so repeated calls are no-ops.
def stop
  return unless @stopped.make_true
  @pool.shutdown { |redis| redis.close }
end
| 123 | + |
| 124 | + private |
| 125 | + |
# Invokes the optional test hook between the WATCHed read and the MULTI
# write, letting tests simulate a concurrent modification.
def before_update_transaction(base_key, key)
  hook = @test_hook
  hook.before_update_transaction(base_key, key) unless hook.nil?
end
| 129 | + |
# Redis hash key under which all items of this kind are stored.
def items_key(kind)
  "#{@prefix}:#{kind[:namespace]}"
end
| 133 | + |
# Cache key for a single item of the given kind (used by the caching wrapper).
def cache_key(kind, key)
  "#{kind[:namespace]}:#{key}"
end
| 137 | + |
# Checks a client out of the pool for the duration of the block and
# returns the block's result.
def with_connection(&block)
  @pool.with(&block)
end
| 141 | + |
# Fetches one field from the kind's hash and JSON-decodes it with symbol
# keys; returns nil when the field is absent.
def get_redis(redis, kind, key)
  raw = redis.hget(items_key(kind), key)
  return nil if raw.nil?
  JSON.parse(raw, symbolize_names: true)
end
| 146 | + end |
| 147 | + end |
| 148 | + end |
| 149 | + end |
| 150 | +end |
0 commit comments