|
| 1 | +require "concurrent/atomics" |
| 2 | +require "json" |
| 3 | + |
| 4 | +module LaunchDarkly |
| 5 | + module Impl |
| 6 | + module Integrations |
| 7 | + module DynamoDB |
| 8 | + # |
| 9 | + # Internal implementation of the DynamoDB feature store, intended to be used with CachingStoreWrapper. |
| 10 | + # |
        class DynamoDBFeatureStoreCore
          # Load the AWS SDK at class-definition time: try the modular v3 gem
          # first, then fall back to the monolithic v2 gem. AWS_SDK_ENABLED
          # records whether either load succeeded so the constructor can fail
          # fast with a clear error instead of a NameError later.
          begin
            require "aws-sdk-dynamodb"
            AWS_SDK_ENABLED = true
          rescue ScriptError, StandardError
            begin
              require "aws-sdk"
              AWS_SDK_ENABLED = true
            rescue ScriptError, StandardError
              AWS_SDK_ENABLED = false
            end
          end

          # Attribute names of the expected table schema. Every stored item is
          # addressed by the pair (namespace, key); the feature data itself is
          # serialized as a JSON string in a single attribute, alongside its
          # version number (used for optimistic-concurrency checks in upsert).
          PARTITION_KEY = "namespace"
          SORT_KEY = "key"

          VERSION_ATTRIBUTE = "version"
          ITEM_JSON_ATTRIBUTE = "item"

          # @param table_name [String] name of an existing DynamoDB table
          # @param opts [Hash] options:
          #   :prefix - optional String prepended (with ":") to every namespace
          #   :logger - optional Logger; defaults to Config.default_logger
          #   :existing_client - optional pre-built Aws::DynamoDB::Client to
          #     reuse; takes precedence over :dynamodb_opts
          #   :dynamodb_opts - options hash passed to Aws::DynamoDB::Client.new
          #     when no existing client is supplied
          # @raise [RuntimeError] if neither AWS SDK gem could be loaded
          def initialize(table_name, opts)
            if !AWS_SDK_ENABLED
              raise RuntimeError.new("can't use DynamoDB feature store without the aws-sdk or aws-sdk-dynamodb gem")
            end

            @table_name = table_name
            @prefix = opts[:prefix]
            @logger = opts[:logger] || Config.default_logger

            # NOTE(review): @stopped is initialized here but never read within
            # this class as shown; presumably reserved for future use — confirm.
            @stopped = Concurrent::AtomicBoolean.new(false)

            if !opts[:existing_client].nil?
              @client = opts[:existing_client]
            else
              @client = Aws::DynamoDB::Client.new(opts[:dynamodb_opts])
            end

            @logger.info("DynamoDBFeatureStore: using DynamoDB table \"#{table_name}\"")
          end

          # Replaces the store contents with all_data: writes every provided
          # item, deletes any previously stored item not present in all_data,
          # and finally writes the "$inited" marker checked by
          # initialized_internal?. All writes go through batched
          # BatchWriteItem calls; this is not a single atomic transaction.
          #
          # @param all_data [Hash] map of kind => (key => item hash)
          def init_internal(all_data)
            # Start by reading the existing keys; we will later delete any of these that weren't in all_data.
            unused_old_keys = read_existing_keys(all_data.keys)

            requests = []
            num_items = 0

            # Insert or update every provided item
            all_data.each do |kind, items|
              items.values.each do |item|
                requests.push({ put_request: { item: marshal_item(kind, item) } })
                # Anything we are about to write is, by definition, not stale.
                unused_old_keys.delete([ namespace_for_kind(kind), item[:key] ])
                num_items = num_items + 1
              end
            end

            # Now delete any previously existing items whose keys were not in the current data
            unused_old_keys.each do |tuple|
              del_item = make_keys_hash(tuple[0], tuple[1])
              requests.push({ delete_request: { key: del_item } })
            end

            # Now set the special key that we check in initialized_internal?
            inited_item = make_keys_hash(inited_key, inited_key)
            requests.push({ put_request: { item: inited_item } })

            DynamoDBUtil.batch_write_requests(@client, @table_name, requests)

            @logger.info { "Initialized table #{@table_name} with #{num_items} items" }
          end

          # Fetches a single item by kind and key.
          #
          # @return [Hash, nil] the deserialized item, or nil if not found
          def get_internal(kind, key)
            resp = get_item_by_keys(namespace_for_kind(kind), key)
            unmarshal_item(resp.item)
          end

          # Fetches every item of the given kind, following DynamoDB query
          # pagination (last_evaluated_key / exclusive_start_key) until the
          # result set is exhausted.
          #
          # @return [Hash] map of symbolized item key => deserialized item
          def get_all_internal(kind)
            items_out = {}
            req = make_query_for_kind(kind)
            while true
              resp = @client.query(req)
              resp.items.each do |item|
                item_out = unmarshal_item(item)
                items_out[item_out[:key].to_sym] = item_out
              end
              # An absent or empty last_evaluated_key means no more pages.
              break if resp.last_evaluated_key.nil? || resp.last_evaluated_key.length == 0
              req.exclusive_start_key = resp.last_evaluated_key
            end
            items_out
          end

          # Writes new_item only if it is newer than what is stored: the
          # conditional expression lets the put succeed when the item does not
          # exist yet, or when new_item's version is greater than the stored
          # version. Otherwise the stored (newer) item is read back and
          # returned so CachingStoreWrapper can cache the winning value.
          #
          # @return [Hash] the item that is now authoritative for this key
          def upsert_internal(kind, new_item)
            encoded_item = marshal_item(kind, new_item)
            begin
              @client.put_item({
                table_name: @table_name,
                item: encoded_item,
                condition_expression: "attribute_not_exists(#namespace) or attribute_not_exists(#key) or :version > #version",
                expression_attribute_names: {
                  "#namespace" => PARTITION_KEY,
                  "#key" => SORT_KEY,
                  "#version" => VERSION_ATTRIBUTE
                },
                expression_attribute_values: {
                  ":version" => new_item[:version]
                }
              })
              new_item
            rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException
              # The item was not updated because there's a newer item in the database.
              # We must now read the item that's in the database and return it, so CachingStoreWrapper can cache it.
              get_internal(kind, new_item[:key])
            end
          end

          # True if init_internal has ever completed against this table (i.e.
          # the "$inited" marker item exists).
          def initialized_internal?
            resp = get_item_by_keys(inited_key, inited_key)
            !resp.item.nil? && resp.item.length > 0
          end

          # Releases resources. No-op: the AWS client exposes no close method.
          def stop
            # AWS client doesn't seem to have a close method
          end

          private

          # Applies the optional configured prefix ("prefix:base") to a
          # namespace string; returns base_str unchanged when no prefix is set.
          def prefixed_namespace(base_str)
            (@prefix.nil? || @prefix == "") ? base_str : "#{@prefix}:#{base_str}"
          end

          # Partition-key value for a data kind (kind[:namespace], prefixed).
          def namespace_for_kind(kind)
            prefixed_namespace(kind[:namespace])
          end

          # Namespace/key value of the marker item written by init_internal.
          def inited_key
            prefixed_namespace("$inited")
          end

          # Builds the primary-key hash ({partition, sort}) for one item.
          def make_keys_hash(namespace, key)
            {
              PARTITION_KEY => namespace,
              SORT_KEY => key
            }
          end

          # Builds a consistent-read Query request matching every item whose
          # partition key equals the kind's namespace.
          def make_query_for_kind(kind)
            {
              table_name: @table_name,
              consistent_read: true,
              key_conditions: {
                PARTITION_KEY => {
                  comparison_operator: "EQ",
                  attribute_value_list: [ namespace_for_kind(kind) ]
                }
              }
            }
          end

          # Raw GetItem call for one (namespace, key) pair; returns the SDK
          # response (whose .item may be nil when the item does not exist).
          def get_item_by_keys(namespace, key)
            @client.get_item({
              table_name: @table_name,
              key: make_keys_hash(namespace, key)
            })
          end

          # Collects the primary keys of every item currently stored for the
          # given kinds, paginating through each query. Only the key
          # attributes are fetched (projection_expression), not the payload.
          #
          # @return [Set] of [namespace, key] pairs
          def read_existing_keys(kinds)
            keys = Set.new
            kinds.each do |kind|
              req = make_query_for_kind(kind).merge({
                projection_expression: "#namespace, #key",
                expression_attribute_names: {
                  "#namespace" => PARTITION_KEY,
                  "#key" => SORT_KEY
                }
              })
              while true
                resp = @client.query(req)
                resp.items.each do |item|
                  namespace = item[PARTITION_KEY]
                  key = item[SORT_KEY]
                  keys.add([ namespace, key ])
                end
                break if resp.last_evaluated_key.nil? || resp.last_evaluated_key.length == 0
                req.exclusive_start_key = resp.last_evaluated_key
              end
            end
            keys
          end

          # Converts an item hash into its DynamoDB attribute map: the primary
          # key plus the version number and the full item serialized as JSON.
          def marshal_item(kind, item)
            make_keys_hash(namespace_for_kind(kind), item[:key]).merge({
              VERSION_ATTRIBUTE => item[:version],
              ITEM_JSON_ATTRIBUTE => item.to_json
            })
          end

          # Inverse of marshal_item: parses the stored JSON attribute back into
          # a symbol-keyed hash. Returns nil for a missing/empty attribute map;
          # raises if the map exists but lacks the JSON payload attribute.
          def unmarshal_item(item)
            return nil if item.nil? || item.length == 0
            json_attr = item[ITEM_JSON_ATTRIBUTE]
            raise RuntimeError.new("DynamoDB map did not contain expected item string") if json_attr.nil?
            JSON.parse(json_attr, symbolize_names: true)
          end
        end
| 213 | + |
| 214 | + class DynamoDBUtil |
| 215 | + # |
| 216 | + # Calls client.batch_write_item as many times as necessary to submit all of the given requests. |
| 217 | + # The requests array is consumed. |
| 218 | + # |
| 219 | + def self.batch_write_requests(client, table, requests) |
| 220 | + batch_size = 25 |
| 221 | + while true |
| 222 | + chunk = requests.shift(batch_size) |
| 223 | + break if chunk.empty? |
| 224 | + client.batch_write_item({ request_items: { table => chunk } }) |
| 225 | + end |
| 226 | + end |
| 227 | + end |
| 228 | + end |
| 229 | + end |
| 230 | + end |
| 231 | +end |
0 commit comments