From 39e25321b395905f45af38672a3e4ba7995a5c1f Mon Sep 17 00:00:00 2001
From: Adam Grare
Date: Thu, 3 Feb 2022 13:45:52 -0500
Subject: [PATCH] Revert "Merge pull request #76 from
 Ladas/simplify_the_records_iterator"

This reverts commit 84fa1f71a3a1e5d3d922cbe26132ba7ad514c1ba, reversing
changes made to 334a945c421f9313fcf8d6961147a70e766850e3.
---
 .../application_record_iterator.rb            | 29 ++++++++++---
 lib/inventory_refresh/inventory_collection.rb | 20 +--------
 .../save_collection/saver/base.rb             | 11 +++--
 .../saver/concurrent_safe_batch.rb            | 42 ++++++++++++++++--
 .../saver/partial_upsert_helper.rb            | 18 --------
 spec/persister/parallel_saving_spec.rb        | 40 ++++++-----------
 spec/persister/sweep_inactive_records_spec.rb | 43 ++++++++++---------
 spec/persister/test_collector.rb              | 16 +++----
 8 files changed, 110 insertions(+), 109 deletions(-)

diff --git a/lib/inventory_refresh/application_record_iterator.rb b/lib/inventory_refresh/application_record_iterator.rb
index d6392aa7..e69d16ea 100644
--- a/lib/inventory_refresh/application_record_iterator.rb
+++ b/lib/inventory_refresh/application_record_iterator.rb
@@ -1,12 +1,20 @@
 module InventoryRefresh
   class ApplicationRecordIterator
-    attr_reader :inventory_collection
+    attr_reader :inventory_collection, :manager_uuids_set, :iterator, :query

-    # An iterator that can fetch batches of the AR objects based on a set of attribute_indexes
+    # An iterator that can fetch batches of the AR objects based on a set of manager refs, or just mimics AR relation
+    # when given an iterator. Or given query, acts as iterator by selecting batches.
     #
     # @param inventory_collection [InventoryRefresh::InventoryCollection] Inventory collection owning the iterator
-    def initialize(inventory_collection: nil)
+    # @param manager_uuids_set [Array] Array of references we want to
+    #   fetch from the DB
+    # @param iterator [Proc] Block based iterator
+    # @param query [ActiveRecord::Relation] Existing query we want to use for querying the db
+    def initialize(inventory_collection: nil, manager_uuids_set: nil, iterator: nil, query: nil)
       @inventory_collection = inventory_collection
+      @manager_uuids_set = manager_uuids_set
+      @iterator = iterator
+      @query = query
     end

     # Iterator that mimics find_in_batches of ActiveRecord::Relation. This iterator serves for making more optimized query
@@ -17,11 +25,20 @@ def initialize(inventory_collection: nil)
     # and relation.where(:id => 500ids)
     #
     # @param batch_size [Integer] A batch size we want to fetch from DB
-    # @param attributes_index [Hash{String => Hash}] Indexed hash with data we will be saving
     # @yield Code processing the batches
     def find_in_batches(batch_size: 1000, attributes_index: {})
-      attributes_index.each_slice(batch_size) do |batch|
-        yield(inventory_collection.db_collection_for_comparison_for(batch))
+      if iterator
+        iterator.call do |batch|
+          yield(batch)
+        end
+      elsif query
+        attributes_index.each_slice(batch_size) do |batch|
+          yield(query.where(inventory_collection.targeted_selection_for(batch)))
+        end
+      else
+        attributes_index.each_slice(batch_size) do |batch|
+          yield(inventory_collection.db_collection_for_comparison_for(batch))
+        end
       end
     end
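
Note on the restored iterator: find_in_batches now dispatches three ways. A caller-supplied Proc takes precedence, then an explicit relation passed as query, and only then does the owning collection build the batched relation itself. A minimal plain-Ruby stand-in of that dispatch (TinyIterator and the lambda arguments are illustrative only, not part of the gem):

    # Stand-in for the three-mode dispatch above: a Proc iterator wins, then an
    # explicit query, otherwise the owning collection builds the batches.
    class TinyIterator
      def initialize(iterator: nil, query: nil, collection: nil)
        @iterator   = iterator
        @query      = query
        @collection = collection
      end

      def find_in_batches(batch_size: 2, attributes_index: {}, &block)
        if @iterator
          @iterator.call { |batch| block.call(batch) }
        elsif @query
          attributes_index.each_slice(batch_size) { |batch| block.call(@query.call(batch)) }
        else
          attributes_index.each_slice(batch_size) { |batch| block.call(@collection.call(batch)) }
        end
      end
    end

    # Proc mode, the shape used by pure_sql_iterator in concurrent_safe_batch.rb:
    iter = TinyIterator.new(:iterator => ->(&blk) { [[1, 2], [3]].each { |b| blk.call(b) } })
    iter.find_in_batches { |batch| p batch } # prints [1, 2] then [3]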
diff --git a/lib/inventory_refresh/inventory_collection.rb b/lib/inventory_refresh/inventory_collection.rb
index 69d69952..327cceb5 100644
--- a/lib/inventory_refresh/inventory_collection.rb
+++ b/lib/inventory_refresh/inventory_collection.rb
@@ -486,30 +486,12 @@ def targeted_selection_for(references)
       build_multi_selection_condition(references.map(&:second))
     end

-    def select_keys
-      @select_keys ||= [@model_class.primary_key] + manager_ref_to_cols.map(&:to_s) + internal_columns.map(&:to_s)
-    end
-
-    # @return [ActiveRecord::ConnectionAdapters::AbstractAdapter] ActiveRecord connection
-    def get_connection
-      ActiveRecord::Base.connection
-    end
-
-    def pure_sql_record_fetching?
-      !use_ar_object?
-    end
-
     # Builds an ActiveRecord::Relation that can fetch all the references from the DB
     #
     # @param references [Hash{String => InventoryRefresh::InventoryCollection::Reference}] passed references
     # @return [ActiveRecord::Relation] relation that can fetch all the references from the DB
     def db_collection_for_comparison_for(references)
-      query = full_collection_for_comparison.where(targeted_selection_for(references))
-      if pure_sql_record_fetching?
-        return get_connection.query(query.select(*select_keys).to_sql)
-      end
-
-      query
+      full_collection_for_comparison.where(targeted_selection_for(references))
     end

     # @return [ActiveRecord::Relation] relation that can fetch all the references from the DB
diff --git a/lib/inventory_refresh/save_collection/saver/base.rb b/lib/inventory_refresh/save_collection/saver/base.rb
index 024f7ca9..4afeac71 100644
--- a/lib/inventory_refresh/save_collection/saver/base.rb
+++ b/lib/inventory_refresh/save_collection/saver/base.rb
@@ -26,10 +26,15 @@ def initialize(inventory_collection)
       @unique_db_primary_keys = Set.new
       @unique_db_indexes = Set.new

+      # Right now ApplicationRecordIterator in association is used for targeted refresh. Given the small amount of
+      # records flowing through there, we probably don't need to optimize that association to fetch a pure SQL.
+      # TODO(lsmola) since we save everything through targeted mode, we want to optimize this
+      @pure_sql_records_fetching = !inventory_collection.use_ar_object? && !@association.kind_of?(InventoryRefresh::ApplicationRecordIterator)
+
       @batch_size_for_persisting = inventory_collection.batch_size_pure_sql
-      @batch_size = inventory_collection.use_ar_object? ? @batch_size_for_persisting : inventory_collection.batch_size
-      @record_key_method = inventory_collection.pure_sql_record_fetching? ? :pure_sql_record_key : :ar_record_key
+      @batch_size = @pure_sql_records_fetching ? @batch_size_for_persisting : inventory_collection.batch_size
+      @record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key
       @select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index }
       @pg_types = @model_class.attribute_names.each_with_object({}) do |key, obj|
         obj[key.to_sym] = inventory_collection.model_class.columns_hash[key]
@@ -115,7 +120,7 @@ def transform_to_hash!(all_attribute_keys, hash)
     private

     attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes,
-                :primary_key, :arel_primary_key, :record_key_method, :select_keys_indexes,
+                :primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes,
                 :batch_size, :batch_size_for_persisting, :model_class, :serializable_keys, :deserializable_keys, :pg_types,
                 :table_name, :q_table_name
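
Context for the @record_key_method switch restored above: pure SQL fetching returns rows as plain arrays, so columns are read by position through select_keys_indexes, while AR fetching returns model objects with attribute readers. A self-contained illustration (the column names are assumed for the example):

    # Why two record key methods: pure SQL rows are arrays, AR records are objects.
    select_keys         = %w[id ems_ref resource_timestamp] # assumed column set
    select_keys_indexes = select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key] = index }

    sql_row = [42, "pod-1", "2022-02-03 13:45:52"] # shape returned by connection.query
    sql_row[select_keys_indexes["ems_ref"]]        # => "pod-1" (positional access)

    ArRow = Struct.new(:id, :ems_ref, :resource_timestamp) # stand-in for an AR object
    ArRow.new(42, "pod-1", "2022-02-03 13:45:52").ems_ref  # => "pod-1" (reader access)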
diff --git a/lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb b/lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb
index 45cd5268..52222f9e 100644
--- a/lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb
+++ b/lib/inventory_refresh/save_collection/saver/concurrent_safe_batch.rb
@@ -46,6 +46,42 @@ def pure_sql_record_key(record, key)
       record[select_keys_indexes[key]]
     end

+    # Returns iterator or relation based on settings
+    #
+    # @param association [Symbol] An existing association on manager
+    # @return [ActiveRecord::Relation, InventoryRefresh::ApplicationRecordIterator] iterator or relation based on settings
+    def batch_iterator(association)
+      if pure_sql_records_fetching
+        # Building fast iterator doing pure SQL query and therefore avoiding redundant creation of AR objects. The
+        # iterator responds to find_in_batches, so it acts like the AR relation. For targeted refresh, the association
+        # can already be ApplicationRecordIterator, so we will skip that.
+        # TODO(lsmola) Since everything is targeted now, we want to probably delete this branch and make iterator to
+        # do pure SQL queries, if there will be no .check_changed?
+        pure_sql_iterator = lambda do |&block|
+          primary_key_offset = nil
+          loop do
+            relation = association.select(*select_keys)
+                                  .reorder("#{primary_key} ASC")
+                                  .limit(batch_size)
+            # Using rails way of comparing primary key instead of offset
+            relation = relation.where(arel_primary_key.gt(primary_key_offset)) if primary_key_offset
+            records = get_connection.query(relation.to_sql)
+            last_record = records.last
+            block.call(records)
+
+            break if records.size < batch_size
+            primary_key_offset = record_key(last_record, primary_key)
+          end
+        end
+
+        InventoryRefresh::ApplicationRecordIterator.new(:iterator => pure_sql_iterator)
+      else
+        # Normal Rails ActiveRecord::Relation where we can call find_in_batches or
+        # InventoryRefresh::ApplicationRecordIterator passed from targeted refresh
+        association
+      end
+    end
+
     # Saves the InventoryCollection
     #
     # @param association [Symbol] An existing association on manager
@@ -69,7 +105,7 @@ def save!(association)
       logger.debug("Processing #{inventory_collection} of size #{inventory_collection.size}...")

       unless inventory_collection.create_only?
-        update_or_destroy_records!(association, inventory_objects_index, attributes_index, all_attribute_keys)
+        update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys)
       end

       unless inventory_collection.create_only?
@@ -176,9 +212,9 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
           [:resource_counter, :resource_counters_max]
         end

-          next if skeletonize_or_skip_record(record_key(record, version_attr.to_s),
+          next if skeletonize_or_skip_record(record_key(record, version_attr),
                                              hash[version_attr],
-                                             record_key(record, max_version_attr.to_s),
+                                             record_key(record, max_version_attr),
                                              inventory_object)
         end
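
The core of the restored pure_sql_iterator above is keyset pagination: rather than a growing OFFSET, each page asks for rows whose primary key is greater than the last one seen, which the database can satisfy with an index seek. The same loop in a self-contained sketch, with arrays standing in for the rows connection.query would return:

    # Rows as [id, payload] pairs, mimicking the array rows of a pure SQL fetch.
    rows = (1..10).map { |id| [id, "ref-#{id}"] }

    batch_size = 4
    last_id    = nil
    loop do
      # WHERE id > :last_id ORDER BY id ASC LIMIT :batch_size
      batch = rows.select { |r| last_id.nil? || r[0] > last_id }.sort_by { |r| r[0] }.first(batch_size)
      p batch.map(&:first)    # => [1, 2, 3, 4], then [5, 6, 7, 8], then [9, 10]
      break if batch.size < batch_size
      last_id = batch.last[0] # keyset cursor instead of a growing OFFSET
    end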
diff --git a/lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb b/lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb
index 31e9353e..6d94195e 100644
--- a/lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb
+++ b/lib/inventory_refresh/save_collection/saver/partial_upsert_helper.rb
@@ -200,25 +200,7 @@ def create_partial!(all_attribute_keys, hashes, on_conflict: nil, column_name: n
       )
     end

-    def comparable_timestamp(timestamp)
-      # Lets cast all timestamps to to_f, rounding the time comparing precision to miliseconds, that should be
-      # enough, since we are the ones setting the record version in collector. Otherwise we will have hard time with
-      # doing equality, since the value changes going through DB (DB. cuts it at 5 decimal places)
-
-      if timestamp.kind_of?(String)
-        Time.use_zone('UTC') { Time.zone.parse(timestamp) }.to_f.round(3)
-      elsif timestamp.kind_of?(Time)
-        timestamp.in_time_zone('UTC').to_f.round(3)
-      else
-        timestamp
-      end
-    end
-
     def skeletonize_or_skip_record(record_version, hash_version, record_versions_max, inventory_object)
-      record_version = comparable_timestamp(record_version)
-      record_versions_max = comparable_timestamp(record_versions_max) if record_versions_max
-      hash_version = comparable_timestamp(hash_version)
-
       # Skip updating this record, because it is old
       return true if record_version && hash_version && record_version >= hash_version

diff --git a/spec/persister/parallel_saving_spec.rb b/spec/persister/parallel_saving_spec.rb
index 11ca9c63..1bf3eda8 100644
--- a/spec/persister/parallel_saving_spec.rb
+++ b/spec/persister/parallel_saving_spec.rb
@@ -12,34 +12,18 @@
   end

   [{
-     :upsert_only            => true,
-     :parallel_saving_column => "resource_counter",
-   }, {
-     :upsert_only            => false,
-     :parallel_saving_column => "resource_counter",
-   }, {
-     :upsert_only            => true,
-     :parallel_saving_column => "resource_timestamp",
-   }, {
-     :upsert_only            => false,
-     :parallel_saving_column => "resource_timestamp",
-   }, {
-     :upsert_only            => true,
-     :parallel_saving_column => "resource_timestamp",
-     :use_ar_object          => true,
-   }, {
-     :upsert_only            => false,
-     :parallel_saving_column => "resource_timestamp",
-     :use_ar_object          => true,
-   }, {
-     :upsert_only            => true,
-     :parallel_saving_column => "resource_timestamp",
-     :use_ar_object          => false,
-   }, {
-     :upsert_only            => false,
-     :parallel_saving_column => "resource_timestamp",
-     :use_ar_object          => false,
-   }].each do |settings|
+    :upsert_only => true,
+    :parallel_saving_column => "resource_counter",
+  }, {
+    :upsert_only => false,
+    :parallel_saving_column => "resource_counter",
+  }, {
+    :upsert_only => true,
+    :parallel_saving_column => "resource_timestamp",
+  }, {
+    :upsert_only => false,
+    :parallel_saving_column => "resource_timestamp",
+  },].each do |settings|
     context "with settings #{settings}" do
       before(:each) do
         if settings[:upsert_only]
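
Background on the comparable_timestamp helper deleted above: record versions make a round trip through the database, which truncates fractional-second precision, so naive equality between the collector's value and the stored one can fail; rounding both sides to milliseconds (to_f.round(3)) made the comparisons stable. A standalone illustration of the failure mode (timestamps are made up):

    require "time"

    # A value that went through the DB comes back with truncated precision.
    collector_version = Time.parse("2022-02-03T13:45:52.123456789Z")
    db_version        = Time.parse("2022-02-03T13:45:52.123456Z")

    collector_version == db_version                             # => false (exact compare)
    collector_version.to_f.round(3) == db_version.to_f.round(3) # => true  (millisecond compare)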
diff --git a/spec/persister/sweep_inactive_records_spec.rb b/spec/persister/sweep_inactive_records_spec.rb
index 3721280c..a62b2c35 100644
--- a/spec/persister/sweep_inactive_records_spec.rb
+++ b/spec/persister/sweep_inactive_records_spec.rb
@@ -15,13 +15,12 @@
   context "with config #{config}" do
     context "with :retention_strategy => 'archive'" do
       it "automatically fills :last_seen_at timestamp for refreshed entities and archives them in last step" do
-        time_now         = Time.now.utc
-        time_before      = Time.now.utc - 20.seconds
-        time_more_before = Time.now.utc - 40.seconds
-        _time_after      = Time.now.utc + 20.seconds
+        time_now    = Time.now.utc
+        time_before = Time.now.utc - 20.seconds
+        time_after  = Time.now.utc + 20.seconds

         cg1 = FactoryBot.create(:container_group, container_group_data(1).merge(:ext_management_system => @ems, :resource_timestamp => time_before))
-        cg2 = FactoryBot.create(:container_group, container_group_data(2).merge(:ext_management_system => @ems, :resource_timestamp => time_more_before))
+        _cg2 = FactoryBot.create(:container_group, container_group_data(2).merge(:ext_management_system => @ems, :resource_timestamp => time_after))
         _cg3 = FactoryBot.create(:container_group, container_group_data(3).merge(:ext_management_system => @ems, :resource_timestamp => time_now))
         _cg4 = FactoryBot.create(:container_group, container_group_data(4).merge(:ext_management_system => @ems, :last_seen_at => time_before))
         _cg6 = FactoryBot.create(:container_group, container_group_data(6).merge(:ext_management_system => @ems, :last_seen_at => time_before))
@@ -41,10 +40,10 @@
         persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
         persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
         persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
-        persister = persist(persister, config)
+        persist(persister, config)

         # We update just the first record, and last_seen_at is updated for all records involved
-        expect(persister.container_groups.updated_records).to(match_array([{:id => cg2.id}]))
+        expect(persister.container_groups.updated_records).to(match_array([{:id => cg1.id}]))

         date_field = ContainerGroup.arel_table[:last_seen_at]
         expect(ContainerGroup.where(date_field.gt(time_now)).pluck(:ems_ref)).to(
@@ -62,7 +61,7 @@
         persister.refresh_state_part_uuid = part2_uuid

         persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
-        persister = persist(persister, config)
+        persist(persister, config)

         date_field = ContainerGroup.arel_table[:last_seen_at]
         expect(ContainerGroup.where(date_field.gt(time_now)).pluck(:ems_ref)).to(
@@ -74,6 +73,7 @@
                          container_group_data(7)[:ems_ref]])
         )

+        # Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
         # Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
         persister.sweep_scope = ["container_groups", "container_nodes"]
         sweep(persister, time_now, config)
@@ -121,7 +121,7 @@
         persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
         persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
         persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
-        _persister = persist(persister, config)
+        persist(persister, config)

         # Refresh second part and mark :last_seen_at
         persister = create_containers_persister(:retention_strategy => "archive")
@@ -129,7 +129,7 @@
         persister.refresh_state_part_uuid = part2_uuid

         persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
-        persister = persist(persister, config)
+        persist(persister, config)

         # Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
         persister.sweep_scope = ["container_groups"]
@@ -192,7 +192,7 @@
         persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
         persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
         persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
-        _persister = persist(persister, config)
+        persist(persister, config)

         ################################################################################################################
         # Refresh second part
@@ -201,7 +201,7 @@
         persister.refresh_state_part_uuid = part2_uuid

         persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
-        _persister = persist(persister, config)
+        persist(persister, config)

         ################################################################################################################
         # Sweeping step. Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on
@@ -245,7 +245,7 @@
           )
         )

-        _persister = persist(persister, config)
+        persist(persister, config)

         ################################################################################################################
         # Sweeping step.
@@ -338,7 +338,7 @@
           )
         )

-        _persister = persist(persister, config)
+        persist(persister, config)

         ################################################################################################################
         # Refresh second part
@@ -353,7 +353,7 @@
           )
         )

-        _persister = persist(persister, config)
+        persist(persister, config)

         ################################################################################################################
         # Sweeping step. Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on
@@ -485,13 +485,14 @@ def create_persister
     create_containers_persister(:retention_strategy => "archive")
   end

-  def persist(persister, config)
-    if config[:serialize]
-      persister = persister.class.from_json(persister.to_json, @ems)
-    end
+  def persist(persister, _config)
+    # TODO(lsmola) serializing is messing with the timestamp, so the reconnect unconnected edges doesn't work
+    # if config[:serialize]
+    #   persister.class.from_json(persister.to_json, @ems).persist!
+    # else
+    #   persister.persist!
+    # end
     persister.persist!
-
-    persister
   end

   def sweep(persister, time, config)
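
A note on the assertion style used throughout this spec: date_field.gt builds an Arel "greater than" predicate that .where accepts directly. A short sketch, assuming a Rails environment with the ContainerGroup model loaded:

    date_field = ContainerGroup.arel_table[:last_seen_at]
    ContainerGroup.where(date_field.gt(Time.now.utc - 20)).to_sql
    # => SELECT "container_groups".* FROM "container_groups"
    #    WHERE "container_groups"."last_seen_at" > '...'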
diff --git a/spec/persister/test_collector.rb b/spec/persister/test_collector.rb
index bd2bf95a..2012cb14 100644
--- a/spec/persister/test_collector.rb
+++ b/spec/persister/test_collector.rb
@@ -5,7 +5,7 @@ class << self
     def generate_batches_of_partial_container_group_data(ems_name:, version:, settings:, batch_size: 4, index_start: 0,
                                                          persister: nil, resource_version: nil)
       ems = ExtManagementSystem.find_by(:name => ems_name)
-      persister ||= new_persister(ems, settings)
+      persister ||= new_persister(ems)

       (index_start * batch_size..((index_start + 1) * batch_size - 1)).each do |index|
         parse_partial_container_group(index, persister, settings, incremented_counter(settings, version, index),
@@ -18,7 +18,7 @@ def generate_batches_of_partial_container_group_data(ems_name:, version:, settin
     def generate_batches_of_different_partial_container_group_data(ems_name:, version:, settings:, batch_size: 4,
                                                                    index_start: 0, persister: nil, resource_version: nil)
       ems = ExtManagementSystem.find_by(:name => ems_name)
-      persister ||= new_persister(ems, settings)
+      persister ||= new_persister(ems)

       (index_start * batch_size..((index_start + 1) * batch_size - 1)).each do |index|
         parse_another_partial_container_group(index, persister, settings, incremented_counter(settings, version, index),
@@ -31,7 +31,7 @@ def generate_batches_of_different_partial_container_group_data(ems_name:, versio
     def generate_batches_of_full_container_group_data(ems_name:, version:, settings:, batch_size: 4, index_start: 0,
                                                       persister: nil, resource_version: nil)
       ems = ExtManagementSystem.find_by(:name => ems_name)
-      persister ||= new_persister(ems, settings)
+      persister ||= new_persister(ems)

       (index_start * batch_size..((index_start + 1) * batch_size - 1)).each do |index|
         parse_container_group(index, persister, settings, incremented_counter(settings, version, index), resource_version)
@@ -83,19 +83,13 @@ def parse_container_group(index, persister, settings, counter, resource_version)

     def refresh(persister)
       manager = persister.manager
-      use_ar_object = persister.inventory_collections.first.use_ar_object
-
       persister = persister.class.from_json(persister.to_json, manager)
-      # :use_ar_object is not exposed to be serializable, it's taken from Persister class, so it's not changeable
-      # in the runtime.
-      persister.inventory_collections.each { |x| x.instance_variable_set(:@use_ar_object, use_ar_object) }
-
       persister.persist!
       persister
     end

-    def new_persister(ems, settings)
-      TestPersister::Containers.new(ems, ems, :use_ar_object => settings[:use_ar_object])
+    def new_persister(ems)
+      TestPersister::Containers.new(ems, ems)
     end

     def version_col(settings)
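
On the "serializing is messing with the timestamp" TODO left in the sweep spec above: Time values do not survive a JSON round trip as Time objects, they come back as strings, which is one way a rebuilt persister's version comparisons can go wrong. A self-contained illustration:

    require "json"
    require "time"

    # Serialize a Time the way a to_json/from_json round trip would.
    t    = Time.now.utc
    back = JSON.parse({ "resource_timestamp" => t }.to_json)["resource_timestamp"]

    back.class       # => String, not Time
    back == t        # => false
    Time.parse(back) # recovers a Time, but only to the serialized precision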