Skip to content

Commit

Permalink
Revert "Merge pull request ManageIQ#76 from Ladas/simplify_the_record…
Browse files Browse the repository at this point in the history
…s_iterator"

This reverts commit 84fa1f7, reversing
changes made to 334a945.
  • Loading branch information
agrare committed Feb 3, 2022
1 parent 461ec5f commit 39e2532
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 109 deletions.
29 changes: 23 additions & 6 deletions lib/inventory_refresh/application_record_iterator.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
module InventoryRefresh
class ApplicationRecordIterator
attr_reader :inventory_collection
attr_reader :inventory_collection, :manager_uuids_set, :iterator, :query

# An iterator that can fetch batches of the AR objects based on a set of attribute_indexes
# An iterator that can fetch batches of the AR objects based on a set of manager refs, or just mimics AR relation
# when given an iterator. Or given query, acts as iterator by selecting batches.
#
# @param inventory_collection [InventoryRefresh::InventoryCollection] Inventory collection owning the iterator
def initialize(inventory_collection: nil)
# @param manager_uuids_set [Array<InventoryRefresh::InventoryCollection::Reference>] Array of references we want to
# fetch from the DB
# @param iterator [Proc] Block based iterator
# @query query [ActiveRecord::Relation] Existing query we want to use for querying the db
def initialize(inventory_collection: nil, manager_uuids_set: nil, iterator: nil, query: nil)
@inventory_collection = inventory_collection
@manager_uuids_set = manager_uuids_set
@iterator = iterator
@query = query
end

# Iterator that mimics find_in_batches of ActiveRecord::Relation. This iterator serves for making more optimized query
Expand All @@ -17,11 +25,20 @@ def initialize(inventory_collection: nil)
# and relation.where(:id => 500ids)
#
# @param batch_size [Integer] A batch size we want to fetch from DB
# @param attributes_index [Hash{String => Hash}] Indexed hash with data we will be saving
# @yield Code processing the batches
def find_in_batches(batch_size: 1000, attributes_index: {})
attributes_index.each_slice(batch_size) do |batch|
yield(inventory_collection.db_collection_for_comparison_for(batch))
if iterator
iterator.call do |batch|
yield(batch)
end
elsif query
attributes_index.each_slice(batch_size) do |batch|
yield(query.where(inventory_collection.targeted_selection_for(batch)))
end
else
attributes_index.each_slice(batch_size) do |batch|
yield(inventory_collection.db_collection_for_comparison_for(batch))
end
end
end

Expand Down
20 changes: 1 addition & 19 deletions lib/inventory_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -486,30 +486,12 @@ def targeted_selection_for(references)
build_multi_selection_condition(references.map(&:second))
end

def select_keys
@select_keys ||= [@model_class.primary_key] + manager_ref_to_cols.map(&:to_s) + internal_columns.map(&:to_s)
end

# @return [ActiveRecord::ConnectionAdapters::AbstractAdapter] ActiveRecord connection
def get_connection
ActiveRecord::Base.connection
end

def pure_sql_record_fetching?
!use_ar_object?
end

# Builds an ActiveRecord::Relation that can fetch all the references from the DB
#
# @param references [Hash{String => InventoryRefresh::InventoryCollection::Reference}] passed references
# @return [ActiveRecord::Relation] relation that can fetch all the references from the DB
def db_collection_for_comparison_for(references)
query = full_collection_for_comparison.where(targeted_selection_for(references))
if pure_sql_record_fetching?
return get_connection.query(query.select(*select_keys).to_sql)
end

query
full_collection_for_comparison.where(targeted_selection_for(references))
end

# @return [ActiveRecord::Relation] relation that can fetch all the references from the DB
Expand Down
11 changes: 8 additions & 3 deletions lib/inventory_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ def initialize(inventory_collection)
@unique_db_primary_keys = Set.new
@unique_db_indexes = Set.new

# Right now ApplicationRecordIterator in association is used for targeted refresh. Given the small amount of
# records flowing through there, we probably don't need to optimize that association to fetch a pure SQL.
# TODO(lsmola) since we save everything through targeted mode, we want to optimize this
@pure_sql_records_fetching = !inventory_collection.use_ar_object? && !@association.kind_of?(InventoryRefresh::ApplicationRecordIterator)

@batch_size_for_persisting = inventory_collection.batch_size_pure_sql
@batch_size = inventory_collection.use_ar_object? ? @batch_size_for_persisting : inventory_collection.batch_size

@record_key_method = inventory_collection.pure_sql_record_fetching? ? :pure_sql_record_key : :ar_record_key
@batch_size = @pure_sql_records_fetching ? @batch_size_for_persisting : inventory_collection.batch_size
@record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key
@select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index }
@pg_types = @model_class.attribute_names.each_with_object({}) do |key, obj|
obj[key.to_sym] = inventory_collection.model_class.columns_hash[key]
Expand Down Expand Up @@ -115,7 +120,7 @@ def transform_to_hash!(all_attribute_keys, hash)
private

attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes,
:primary_key, :arel_primary_key, :record_key_method, :select_keys_indexes,
:primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes,
:batch_size, :batch_size_for_persisting, :model_class, :serializable_keys, :deserializable_keys, :pg_types, :table_name,
:q_table_name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,42 @@ def pure_sql_record_key(record, key)
record[select_keys_indexes[key]]
end

# Returns iterator or relation based on settings
#
# @param association [Symbol] An existing association on manager
# @return [ActiveRecord::Relation, InventoryRefresh::ApplicationRecordIterator] iterator or relation based on settings
def batch_iterator(association)
if pure_sql_records_fetching
# Building fast iterator doing pure SQL query and therefore avoiding redundant creation of AR objects. The
# iterator responds to find_in_batches, so it acts like the AR relation. For targeted refresh, the association
# can already be ApplicationRecordIterator, so we will skip that.
# TODO(lsmola) Since everything is targeted now, we want to probably delete this branch and make iterator to
# do pure SQL queries, if there will be no .check_changed?
pure_sql_iterator = lambda do |&block|
primary_key_offset = nil
loop do
relation = association.select(*select_keys)
.reorder("#{primary_key} ASC")
.limit(batch_size)
# Using rails way of comparing primary key instead of offset
relation = relation.where(arel_primary_key.gt(primary_key_offset)) if primary_key_offset
records = get_connection.query(relation.to_sql)
last_record = records.last
block.call(records)

break if records.size < batch_size
primary_key_offset = record_key(last_record, primary_key)
end
end

InventoryRefresh::ApplicationRecordIterator.new(:iterator => pure_sql_iterator)
else
# Normal Rails ActiveRecord::Relation where we can call find_in_batches or
# InventoryRefresh::ApplicationRecordIterator passed from targeted refresh
association
end
end

# Saves the InventoryCollection
#
# @param association [Symbol] An existing association on manager
Expand All @@ -69,7 +105,7 @@ def save!(association)
logger.debug("Processing #{inventory_collection} of size #{inventory_collection.size}...")

unless inventory_collection.create_only?
update_or_destroy_records!(association, inventory_objects_index, attributes_index, all_attribute_keys)
update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys)
end

unless inventory_collection.create_only?
Expand Down Expand Up @@ -176,9 +212,9 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
[:resource_counter, :resource_counters_max]
end

next if skeletonize_or_skip_record(record_key(record, version_attr.to_s),
next if skeletonize_or_skip_record(record_key(record, version_attr),
hash[version_attr],
record_key(record, max_version_attr.to_s),
record_key(record, max_version_attr),
inventory_object)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,25 +200,7 @@ def create_partial!(all_attribute_keys, hashes, on_conflict: nil, column_name: n
)
end

def comparable_timestamp(timestamp)
# Lets cast all timestamps to to_f, rounding the time comparing precision to miliseconds, that should be
# enough, since we are the ones setting the record version in collector. Otherwise we will have hard time with
# doing equality, since the value changes going through DB (DB. cuts it at 5 decimal places)

if timestamp.kind_of?(String)
Time.use_zone('UTC') { Time.zone.parse(timestamp) }.to_f.round(3)
elsif timestamp.kind_of?(Time)
timestamp.in_time_zone('UTC').to_f.round(3)
else
timestamp
end
end

def skeletonize_or_skip_record(record_version, hash_version, record_versions_max, inventory_object)
record_version = comparable_timestamp(record_version)
record_versions_max = comparable_timestamp(record_versions_max) if record_versions_max
hash_version = comparable_timestamp(hash_version)

# Skip updating this record, because it is old
return true if record_version && hash_version && record_version >= hash_version

Expand Down
40 changes: 12 additions & 28 deletions spec/persister/parallel_saving_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,18 @@
end

[{
:upsert_only => true,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => true,
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => true,
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => false,
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => false,
}].each do |settings|
:upsert_only => true,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
},].each do |settings|
context "with settings #{settings}" do
before(:each) do
if settings[:upsert_only]
Expand Down
43 changes: 22 additions & 21 deletions spec/persister/sweep_inactive_records_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
context "with config #{config}" do
context "with :retention_strategy => 'archive'" do
it "automatically fills :last_seen_at timestamp for refreshed entities and archives them in last step" do
time_now = Time.now.utc
time_before = Time.now.utc - 20.seconds
time_more_before = Time.now.utc - 40.seconds
_time_after = Time.now.utc + 20.seconds
time_now = Time.now.utc
time_before = Time.now.utc - 20.seconds
time_after = Time.now.utc + 20.seconds

cg1 = FactoryBot.create(:container_group, container_group_data(1).merge(:ext_management_system => @ems, :resource_timestamp => time_before))
cg2 = FactoryBot.create(:container_group, container_group_data(2).merge(:ext_management_system => @ems, :resource_timestamp => time_more_before))
_cg2 = FactoryBot.create(:container_group, container_group_data(2).merge(:ext_management_system => @ems, :resource_timestamp => time_after))
_cg3 = FactoryBot.create(:container_group, container_group_data(3).merge(:ext_management_system => @ems, :resource_timestamp => time_now))
_cg4 = FactoryBot.create(:container_group, container_group_data(4).merge(:ext_management_system => @ems, :last_seen_at => time_before))
_cg6 = FactoryBot.create(:container_group, container_group_data(6).merge(:ext_management_system => @ems, :last_seen_at => time_before))
Expand All @@ -41,10 +40,10 @@
persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
persister = persist(persister, config)
persist(persister, config)

# We update just the first record, and last_seen_at is updated for all records involved
expect(persister.container_groups.updated_records).to(match_array([{:id => cg2.id}]))
expect(persister.container_groups.updated_records).to(match_array([{:id => cg1.id}]))

date_field = ContainerGroup.arel_table[:last_seen_at]
expect(ContainerGroup.where(date_field.gt(time_now)).pluck(:ems_ref)).to(
Expand All @@ -62,7 +61,7 @@
persister.refresh_state_part_uuid = part2_uuid

persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
persister = persist(persister, config)
persist(persister, config)

date_field = ContainerGroup.arel_table[:last_seen_at]
expect(ContainerGroup.where(date_field.gt(time_now)).pluck(:ems_ref)).to(
Expand All @@ -74,6 +73,7 @@
container_group_data(7)[:ems_ref]])
)

# Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
# Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
persister.sweep_scope = ["container_groups", "container_nodes"]
sweep(persister, time_now, config)
Expand Down Expand Up @@ -121,15 +121,15 @@
persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
_persister = persist(persister, config)
persist(persister, config)

# Refresh second part and mark :last_seen_at
persister = create_containers_persister(:retention_strategy => "archive")
persister.refresh_state_uuid = refresh_state_uuid
persister.refresh_state_part_uuid = part2_uuid

persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
persister = persist(persister, config)
persist(persister, config)

# Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
persister.sweep_scope = ["container_groups"]
Expand Down Expand Up @@ -192,7 +192,7 @@
persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
_persister = persist(persister, config)
persist(persister, config)

################################################################################################################
# Refresh second part
Expand All @@ -201,7 +201,7 @@
persister.refresh_state_part_uuid = part2_uuid

persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
_persister = persist(persister, config)
persist(persister, config)

################################################################################################################
# Sweeping step. Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on
Expand Down Expand Up @@ -245,7 +245,7 @@
)
)

_persister = persist(persister, config)
persist(persister, config)

################################################################################################################
# Sweeping step.
Expand Down Expand Up @@ -338,7 +338,7 @@
)
)

_persister = persist(persister, config)
persist(persister, config)

################################################################################################################
# Refresh second part
Expand All @@ -353,7 +353,7 @@
)
)

_persister = persist(persister, config)
persist(persister, config)

################################################################################################################
# Sweeping step. Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on
Expand Down Expand Up @@ -485,13 +485,14 @@ def create_persister
create_containers_persister(:retention_strategy => "archive")
end

def persist(persister, config)
if config[:serialize]
persister = persister.class.from_json(persister.to_json, @ems)
end
def persist(persister, _config)
# TODO(lsmola) serializing is messing with the timestamp, so the reconnect unconnected edges doesn't work
# if config[:serialize]
# persister.class.from_json(persister.to_json, @ems).persist!
# else
# persister.persist!
# end
persister.persist!

persister
end

def sweep(persister, time, config)
Expand Down
Loading

0 comments on commit 39e2532

Please sign in to comment.