|
| 1 | +require "json" |
| 2 | + |
| 3 | +module LaunchDarkly |
| 4 | + module Impl |
| 5 | + module Integrations |
| 6 | + module Consul |
| 7 | + # |
| 8 | + # Internal implementation of the Consul feature store, intended to be used with CachingStoreWrapper. |
| 9 | + # |
| 10 | + class ConsulFeatureStoreCore |
| 11 | + begin |
| 12 | + require "diplomat" |
| 13 | + CONSUL_ENABLED = true |
| 14 | + rescue ScriptError, StandardError |
| 15 | + CONSUL_ENABLED = false |
| 16 | + end |
| 17 | + |
| 18 | + def initialize(opts) |
| 19 | + if !CONSUL_ENABLED |
| 20 | + raise RuntimeError.new("can't use Consul feature store without the 'diplomat' gem") |
| 21 | + end |
| 22 | + |
| 23 | + @prefix = (opts[:prefix] || LaunchDarkly::Integrations::Consul.default_prefix) + '/' |
| 24 | + @logger = opts[:logger] || Config.default_logger |
| 25 | + Diplomat.configuration = opts[:consul_config] if !opts[:consul_config].nil? |
| 26 | + @logger.info("ConsulFeatureStore: using Consul host at #{Diplomat.configuration.url}") |
| 27 | + end |
| 28 | + |
| 29 | + def init_internal(all_data) |
| 30 | + # Start by reading the existing keys; we will later delete any of these that weren't in all_data. |
| 31 | + unused_old_keys = Set.new |
| 32 | + keys = Diplomat::Kv.get(@prefix, { keys: true, recurse: true }, :return) |
| 33 | + unused_old_keys.merge(keys) if keys != "" |
| 34 | + |
| 35 | + ops = [] |
| 36 | + num_items = 0 |
| 37 | + |
| 38 | + # Insert or update every provided item |
| 39 | + all_data.each do |kind, items| |
| 40 | + items.values.each do |item| |
| 41 | + value = item.to_json |
| 42 | + key = item_key(kind, item[:key]) |
| 43 | + ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => key, 'Value' => value } }) |
| 44 | + unused_old_keys.delete(key) |
| 45 | + num_items = num_items + 1 |
| 46 | + end |
| 47 | + end |
| 48 | + |
| 49 | + # Now delete any previously existing items whose keys were not in the current data |
| 50 | + unused_old_keys.each do |key| |
| 51 | + ops.push({ 'KV' => { 'Verb' => 'delete', 'Key' => key } }) |
| 52 | + end |
| 53 | + |
| 54 | + # Now set the special key that we check in initialized_internal? |
| 55 | + ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => inited_key, 'Value' => '' } }) |
| 56 | + |
| 57 | + ConsulUtil.batch_operations(ops) |
| 58 | + |
| 59 | + @logger.info { "Initialized database with #{num_items} items" } |
| 60 | + end |
| 61 | + |
| 62 | + def get_internal(kind, key) |
| 63 | + value = Diplomat::Kv.get(item_key(kind, key), {}, :return) # :return means "don't throw an error if not found" |
| 64 | + (value.nil? || value == "") ? nil : JSON.parse(value, symbolize_names: true) |
| 65 | + end |
| 66 | + |
| 67 | + def get_all_internal(kind) |
| 68 | + items_out = {} |
| 69 | + results = Diplomat::Kv.get(kind_key(kind), { recurse: true }, :return) |
| 70 | + (results == "" ? [] : results).each do |result| |
| 71 | + value = result[:value] |
| 72 | + if !value.nil? |
| 73 | + item = JSON.parse(value, symbolize_names: true) |
| 74 | + items_out[item[:key].to_sym] = item |
| 75 | + end |
| 76 | + end |
| 77 | + items_out |
| 78 | + end |
| 79 | + |
| 80 | + def upsert_internal(kind, new_item) |
| 81 | + key = item_key(kind, new_item[:key]) |
| 82 | + json = new_item.to_json |
| 83 | + |
| 84 | + # We will potentially keep retrying indefinitely until someone's write succeeds |
| 85 | + while true |
| 86 | + old_value = Diplomat::Kv.get(key, { decode_values: true }, :return) |
| 87 | + if old_value.nil? || old_value == "" |
| 88 | + mod_index = 0 |
| 89 | + else |
| 90 | + old_item = JSON.parse(old_value[0]["Value"], symbolize_names: true) |
| 91 | + # Check whether the item is stale. If so, don't do the update (and return the existing item to |
| 92 | + # FeatureStoreWrapper so it can be cached) |
| 93 | + if old_item[:version] >= new_item[:version] |
| 94 | + return old_item |
| 95 | + end |
| 96 | + mod_index = old_value[0]["ModifyIndex"] |
| 97 | + end |
| 98 | + |
| 99 | + # Otherwise, try to write. We will do a compare-and-set operation, so the write will only succeed if |
| 100 | + # the key's ModifyIndex is still equal to the previous value. If the previous ModifyIndex was zero, |
| 101 | + # it means the key did not previously exist and the write will only succeed if it still doesn't exist. |
| 102 | + success = Diplomat::Kv.put(key, json, cas: mod_index) |
| 103 | + return new_item if success |
| 104 | + |
| 105 | + # If we failed, retry the whole shebang |
| 106 | + @logger.debug { "Concurrent modification detected, retrying" } |
| 107 | + end |
| 108 | + end |
| 109 | + |
| 110 | + def initialized_internal? |
| 111 | + # Unfortunately we need to use exceptions here, instead of the :return parameter, because with |
| 112 | + # :return there's no way to distinguish between a missing value and an empty string. |
| 113 | + begin |
| 114 | + Diplomat::Kv.get(inited_key, {}) |
| 115 | + true |
| 116 | + rescue Diplomat::KeyNotFound |
| 117 | + false |
| 118 | + end |
| 119 | + end |
| 120 | + |
| 121 | + def stop |
| 122 | + # There's no Consul client instance to dispose of |
| 123 | + end |
| 124 | + |
| 125 | + private |
| 126 | + |
| 127 | + def item_key(kind, key) |
| 128 | + kind_key(kind) + key.to_s |
| 129 | + end |
| 130 | + |
| 131 | + def kind_key(kind) |
| 132 | + @prefix + kind[:namespace] + '/' |
| 133 | + end |
| 134 | + |
| 135 | + def inited_key |
| 136 | + @prefix + '$inited' |
| 137 | + end |
| 138 | + end |
| 139 | + |
| 140 | + class ConsulUtil |
| 141 | + # |
| 142 | + # Submits as many transactions as necessary to submit all of the given operations. |
| 143 | + # The ops array is consumed. |
| 144 | + # |
| 145 | + def self.batch_operations(ops) |
| 146 | + batch_size = 64 # Consul can only do this many at a time |
| 147 | + while true |
| 148 | + chunk = ops.shift(batch_size) |
| 149 | + break if chunk.empty? |
| 150 | + Diplomat::Kv.txn(chunk) |
| 151 | + end |
| 152 | + end |
| 153 | + end |
| 154 | + end |
| 155 | + end |
| 156 | + end |
| 157 | +end |
0 commit comments