From 095804334e99cf90c14741113361b02b67c9630c Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Thu, 25 May 2017 10:45:42 +0200 Subject: [PATCH 1/6] Introduce first version of the :concurrent_safe saver strategies to IC Introduce first version of the :concurrent_safe saver strategies to IC --- .../manager_refresh/inventory_collection.rb | 55 +++++++++++++++++-- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/app/models/manager_refresh/inventory_collection.rb b/app/models/manager_refresh/inventory_collection.rb index 4a3689a0159..0fabf084f9c 100644 --- a/app/models/manager_refresh/inventory_collection.rb +++ b/app/models/manager_refresh/inventory_collection.rb @@ -6,7 +6,7 @@ class InventoryCollection :internal_attributes, :delete_method, :data, :data_index, :dependency_attributes, :manager_ref, :association, :complete, :update_only, :transitive_dependency_attributes, :custom_manager_uuid, :custom_db_finder, :check_changed, :arel, :builder_params, :loaded_references, :db_data_index, - :inventory_object_attributes, :name, :parent_inventory_collections, :manager_uuids, + :inventory_object_attributes, :name, :saver_strategy, :parent_inventory_collections, :manager_uuids, :skeletal_manager_uuids, :targeted_arel, :targeted, :manager_ref_allowed_nil delegate :each, :size, :to => :to_a @@ -286,6 +286,15 @@ class InventoryCollection # one. # @param name [Symbol] A unique name of the InventoryCollection under a Persister. If not provided, the :association # attribute is used. Providing either :name or :association is mandatory. + # @param saver_strategy [Symbol] A strategy that will be used for InventoryCollection persisting into the DB. + # Allowed saver strategies are: + # - :default => Using Rails saving methods, this way is not safe to run in multiple workers concurrently, + # since it will lead to inconsistent data. + # - :concurrent_safe => This method is designed for concurrent saving. 
It uses atomic upsert to avoid + # data duplication and it uses timestamp based atomic checks to avoid new data being overwritten by the + # old data. + # - :concurrent_safe_batch => Same as :concurrent_safe, but the upsert/update queries are executed as + # batched SQL queries, instead of sending 1 query per record. # @param parent_inventory_collections [Array] Array of symbols having a name of the # ManagerRefresh::InventoryCollection objects, that serve as parents to this InventoryCollection. Then this # InventoryCollection completeness will be encapsulated by the parent_inventory_collections :manager_uuids @@ -323,7 +332,7 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil custom_save_block: nil, delete_method: nil, data_index: nil, data: nil, dependency_attributes: nil, attributes_blacklist: nil, attributes_whitelist: nil, complete: nil, update_only: nil, check_changed: nil, custom_manager_uuid: nil, custom_db_finder: nil, arel: nil, builder_params: {}, - inventory_object_attributes: nil, unique_index_columns: nil, name: nil, + inventory_object_attributes: nil, unique_index_columns: nil, name: nil, saver_strategy: nil, parent_inventory_collections: nil, manager_uuids: [], targeted_arel: nil, targeted: nil, manager_ref_allowed_nil: nil) @model_class = model_class @@ -347,13 +356,14 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil @builder_params = builder_params @unique_index_columns = unique_index_columns @name = name || association + @saver_strategy = process_saver_strategy(saver_strategy) @manager_ref_allowed_nil = manager_ref_allowed_nil || [] # Targeted mode related attributes @manager_uuids = Set.new.merge(manager_uuids) @parent_inventory_collections = parent_inventory_collections - @skeletal_manager_uuids = Set.new + @skeletal_manager_uuids = Set.new.merge(manager_uuids) @targeted_arel = targeted_arel @targeted = !!targeted @@ -427,6 +437,18 @@ def to_raw_data end end + def 
process_saver_strategy(saver_strategy) + return :default unless saver_strategy + + case saver_strategy + when :default, :concurrent_safe, :concurrent_safe_batch + saver_strategy + else + raise "Unknown InventoryCollection saver strategy: :#{saver_strategy}, allowed strategies are "\ + ":default, :concurrent_safe and :concurrent_safe_batch" + end + end + def process_strategy(strategy_name) return unless strategy_name @@ -496,8 +518,8 @@ def unique_index_columns end if unique_indexes.blank? - raise "#{self} and its table #{model_class.table_name} must have a unique index defined, in order to use"\ - " strategy :stream_data." + raise "#{self} and its table #{model_class.table_name} must have a unique index defined, in order to use "\ + "saver_strategy :concurrent_safe or :concurrent_safe_batch." end @unique_index_columns = unique_indexes.first.columns end @@ -774,6 +796,11 @@ def inspect def scan!(indexed_inventory_collections) data.each do |inventory_object| scan_inventory_object!(inventory_object) + + if targeted? && inventory_object.inventory_collection.parent_inventory_collections.blank? + # We want to track what manager_uuids we should query from a db, for the targeted refresh + manager_uuids << inventory_object.manager_uuid + end end if targeted? && parent_inventory_collections.present? @@ -945,6 +972,24 @@ def scan_inventory_object_attribute!(key, value) # Storing attributes and their dependencies (dependency_attributes[key] ||= Set.new) << value.inventory_collection if value.dependency? + # For concurrent safe strategies, we want to pre-build the relations using the lazy_link data, so we can fill up + # the foreign key in first pass. + if [:concurrent_safe, :concurrent_safe_batch].include?(saver_strategy) + if value.inventory_collection.manager_ref.size == 1 && inventory_object_lazy?(value) && + !value.ems_ref.blank? && value.key.nil? && value.dependency? 
+ # Instead of loading the reference from the DB, we'll add the dummy InventoryObject (having only ems_ref and + # info from the builder_params) to the correct InventoryCollection. Which will either be found in the DB or + # created as a small dummy object. The refresh of the object will then fill the rest of the data, while not + # touching the reference. + + # TODO(lsmola) solve the :key, since that requires data from the actual reference. At best our DB should be + # designed the way, we don't duplicate the data, but rather get them with a join. (3NF!) + + value.inventory_collection.find_or_build(value.ems_ref) + value.inventory_collection.skeletal_manager_uuids << value.ems_ref + end + end + # Storing a reference in the target inventory_collection, then each IC knows about all the references and can # e.g. load all the referenced uuids from a DB value.inventory_collection.references << value.to_s From 1fb50eb93102d7b62d39e65c8397eda6511e7dff Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Thu, 25 May 2017 10:49:55 +0200 Subject: [PATCH 2/6] Allow :manager_uuids to be serialized/deserialized Allow :manager_uuids to be serialized/deserialized --- app/models/manager_refresh/inventory/persister.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/app/models/manager_refresh/inventory/persister.rb b/app/models/manager_refresh/inventory/persister.rb index 0f0b79d2d40..fa39dbd8db7 100644 --- a/app/models/manager_refresh/inventory/persister.rb +++ b/app/models/manager_refresh/inventory/persister.rb @@ -158,9 +158,9 @@ def to_raw_data next if collection.data.blank? 
{ - :name => key, - :unique_uuids => [], # TODO(lsmola) allow to set a scope, so we can say it's a complete set of data - :data => collection.to_raw_data + :name => key, + :manager_uuids => collection.manager_uuids, + :data => collection.to_raw_data } end.compact @@ -192,6 +192,7 @@ def from_raw_data(persister_data) inventory_collection = persister.collections[collection['name'].try(:to_sym)] raise "Unrecognized InventoryCollection name: #{inventory_collection}" if inventory_collection.blank? + inventory_collection.manager_uuids.merge(collection['manager_uuids'] || []) inventory_collection.from_raw_data(collection['data'], persister.collections) end persister From 5117d028c6a268d439737bb95509d292209e4714 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Thu, 25 May 2017 10:51:30 +0200 Subject: [PATCH 3/6] Pick the right saver class based on configuration Pick the right saver class based on configuration --- app/models/manager_refresh/save_collection/base.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/models/manager_refresh/save_collection/base.rb b/app/models/manager_refresh/save_collection/base.rb index 34fa384c673..cd898968f75 100644 --- a/app/models/manager_refresh/save_collection/base.rb +++ b/app/models/manager_refresh/save_collection/base.rb @@ -18,7 +18,8 @@ def save_inventory_object_inventory(ems, inventory_collection) private def save_inventory(inventory_collection) - ManagerRefresh::SaveCollection::Saver::Default.new(inventory_collection).save_inventory_collection! + saver_class = "ManagerRefresh::SaveCollection::Saver::#{inventory_collection.saver_strategy.to_s.camelize}" + saver_class.constantize.new(inventory_collection).save_inventory_collection! 
end end end From 9668d009d16b60be593baacbb028a5136c9fd3fc Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Thu, 25 May 2017 10:52:23 +0200 Subject: [PATCH 4/6] Quickly skip the targeted IC that would not do any job Quickly skip the targeted IC that would not do any job --- app/models/manager_refresh/save_collection/saver/base.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb index 7d04116c014..a53dc71dea8 100644 --- a/app/models/manager_refresh/save_collection/saver/base.rb +++ b/app/models/manager_refresh/save_collection/saver/base.rb @@ -16,6 +16,11 @@ def initialize(inventory_collection) def save_inventory_collection! # If we have not data to save and delete is not allowed, we can just skip return if inventory_collection.data.blank? && !inventory_collection.delete_allowed? + # If we have a targeted InventoryCollection that wouldn't do anything + return if inventory_collection.targeted? && inventory_collection.manager_uuids.blank? && + inventory_collection.skeletal_manager_uuids.blank? && + inventory_collection.parent_inventory_collections.blank? && + inventory_collection.custom_save_block.nil? # TODO(lsmola) do I need to reload every time? Also it should be enough to clear the associations. 
inventory_collection.parent.reload if inventory_collection.parent From 30c167b79194372ce1ac9728efe54522183307f7 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Thu, 25 May 2017 10:54:02 +0200 Subject: [PATCH 5/6] First version of a :concurrent_safe saver_strategy First version of a :concurrent_safe saver_strategy --- .../save_collection/saver/concurrent_safe.rb | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 app/models/manager_refresh/save_collection/saver/concurrent_safe.rb diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe.rb new file mode 100644 index 00000000000..50dccbda031 --- /dev/null +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe.rb @@ -0,0 +1,120 @@ +module ManagerRefresh::SaveCollection + module Saver + class ConcurrentSafe < ManagerRefresh::SaveCollection::Saver::Base + private + + def save!(inventory_collection, association) + attributes_index = {} + inventory_objects_index = {} + inventory_collection.each do |inventory_object| + attributes = inventory_object.attributes(inventory_collection) + index = inventory_object.manager_uuid + + attributes_index[index] = attributes + inventory_objects_index[index] = inventory_object + end + + inventory_collection_size = inventory_collection.size + deleted_counter = 0 + created_counter = 0 + _log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************") + # Records that are in the DB, we will be updating or deleting them. + association.find_in_batches do |batch| + ActiveRecord::Base.transaction do + batch.each do |record| + next unless assert_distinct_relation(record) + + index = inventory_collection.object_index_with_keys(unique_index_keys, record) + + inventory_object = inventory_objects_index.delete(index) + hash = attributes_index.delete(index) + + if inventory_object.nil? 
+ # Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should + # delete it from the DB. + # TODO(lsmola) do a transaction for a batches of deletion + deleted_counter += 1 if delete_record!(inventory_collection, record) + else + # Record was found in the DB and sent for saving, we will be updating the DB. + update_record!(inventory_collection, record, hash, inventory_object) + end + end + end + end + + # Records that were not found in the DB but sent for saving, we will be creating these in the DB. + if inventory_collection.create_allowed? + inventory_objects_index.each_slice(1000) do |batch| + ActiveRecord::Base.transaction do + batch.each do |index, inventory_object| + hash = attributes_index.delete(index) + create_record!(inventory_collection, hash, inventory_object) + created_counter += 1 + end + end + end + end + _log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\ + "updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************") + end + + def delete_record!(inventory_collection, record) + return false unless inventory_collection.delete_allowed? + record.public_send(inventory_collection.delete_method) + true + end + + def update_record!(inventory_collection, record, hash, inventory_object) + record.assign_attributes(hash.except(:id, :type)) + + # TODO(lsmola) ignore all N:M relations, since we use pure SQL, all N:M needs to be modeled as a separate IC, or + # can we process those automatically? Using a convention? But still, it needs to be a separate IC, to have + # efficient saving. + hash.reject! { |_key, value| value.kind_of?(Array) } + + if !inventory_object.inventory_collection.check_changed? || record.changed? 
+ update_query = inventory_object.inventory_collection.model_class.where(:id => record.id) + if hash[:remote_data_timestamp] + timestamp_field = inventory_collection.model_class.arel_table[:remote_data_timestamp] + update_query = update_query.where(timestamp_field.lt(hash[:remote_data_timestamp])) + end + + update_query.update_all(hash) + end + + inventory_object.id = record.id + end + + def create_record!(inventory_collection, hash, inventory_object) + return unless assert_referential_integrity(hash, inventory_object) + + hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].blank? + table_name = inventory_object.inventory_collection.model_class.table_name + insert_query = %{ + INSERT INTO #{table_name} (#{hash.keys.join(", ")}) + VALUES + ( + #{hash.values.map { |x| ActiveRecord::Base.connection.quote(x) }.join(", ")} + ) + ON CONFLICT (#{inventory_object.inventory_collection.unique_index_columns.join(", ")}) + DO + UPDATE + SET #{hash.keys.map { |x| "#{x} = EXCLUDED.#{x}" }.join(", ")} + } + # TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change + # the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of + # changing ems_id could lead to putting it back by a refresh. + + # This conditional will avoid rewriting new data by old data. But we want it only when remote_data_timestamp is a + # part of the data, since for the fake records, we just want to update ems_ref. + if hash[:remote_data_timestamp].present? 
+ insert_query += %{ + WHERE EXCLUDED.remote_data_timestamp IS NULL OR (EXCLUDED.remote_data_timestamp > #{table_name}.remote_data_timestamp) + } + end + result_id = ActiveRecord::Base.connection.insert_sql(insert_query) + inventory_object.id = result_id + end + end + end +end From 860a8c616a0b78fa7a7d5126eef28f20d47ee53d Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Thu, 25 May 2017 10:56:11 +0200 Subject: [PATCH 6/6] Saving strategy :concurrent_safe_batch placeholder Saving strategy :concurrent_safe_batch placeholder --- .../save_collection/saver/concurrent_safe_batch.rb | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb new file mode 100644 index 00000000000..a2cb54e015a --- /dev/null +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -0,0 +1,11 @@ +module ManagerRefresh::SaveCollection + module Saver + class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base + private + + def save!(_inventory_collection, _association) + raise "saver_strategy :concurrent_safe_batch is not implemented" + end + end + end +end