Simplify the records iterator #76

Merged — 8 commits, Apr 16, 2019
Changes from 7 commits
27 changes: 5 additions & 22 deletions lib/inventory_refresh/application_record_iterator.rb
@@ -2,19 +2,11 @@ module InventoryRefresh
class ApplicationRecordIterator
attr_reader :inventory_collection, :manager_uuids_set, :iterator, :query
Member
@Ladas can @manager_uuids_set be deleted now?

Member
Actually are any of these ivars still used?

Contributor Author
oh yeah

Contributor Author
done

# An iterator that can fetch batches of the AR objects based on a set of manager refs, or just mimics AR relation
# when given an iterator. Or given query, acts as iterator by selecting batches.
# An iterator that can fetch batches of AR objects based on slices of the given attributes_index
#
# @param inventory_collection [InventoryRefresh::InventoryCollection] Inventory collection owning the iterator
# @param manager_uuids_set [Array<InventoryRefresh::InventoryCollection::Reference>] Array of references we want to
# fetch from the DB
# @param iterator [Proc] Block based iterator
# @query query [ActiveRecord::Relation] Existing query we want to use for querying the db
def initialize(inventory_collection: nil, manager_uuids_set: nil, iterator: nil, query: nil)
def initialize(inventory_collection: nil)
@inventory_collection = inventory_collection
@manager_uuids_set = manager_uuids_set
@iterator = iterator
@query = query
end

# Iterator that mimics find_in_batches of ActiveRecord::Relation. This iterator serves for making more optimized query
@@ -25,20 +17,11 @@ def initialize(inventory_collection: nil, manager_uuids_set: nil, iterator: nil,
# and relation.where(:id => 500ids)
#
# @param batch_size [Integer] A batch size we want to fetch from DB
# @param attributes_index [Hash{String => Hash}] Indexed hash with data we will be saving
# @yield Code processing the batches
def find_in_batches(batch_size: 1000, attributes_index: {})
if iterator
iterator.call do |batch|
yield(batch)
end
elsif query
attributes_index.each_slice(batch_size) do |batch|
yield(query.where(inventory_collection.targeted_selection_for(batch)))
end
else
attributes_index.each_slice(batch_size) do |batch|
yield(inventory_collection.db_collection_for_comparison_for(batch))
end
attributes_index.each_slice(batch_size) do |batch|
yield(inventory_collection.db_collection_for_comparison_for(batch))
end
end
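
With the iterator and query branches removed, `find_in_batches` collapses to a single `each_slice` loop. A standalone sketch of that shape (the lookup lambda is a hypothetical stand-in for `db_collection_for_comparison_for`):

```ruby
# Minimal sketch of the simplified iterator: every batch is derived by
# slicing attributes_index; no iterator/query special cases remain.
class BatchIterator
  # lookup is a callable standing in for db_collection_for_comparison_for
  def initialize(lookup)
    @lookup = lookup
  end

  def find_in_batches(attributes_index, batch_size: 1000)
    attributes_index.each_slice(batch_size) do |batch|
      yield @lookup.call(batch)
    end
  end
end
```

Since `attributes_index` is a Hash, `each_slice` yields arrays of `[key, value]` pairs, so each DB query sees at most `batch_size` references.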

20 changes: 19 additions & 1 deletion lib/inventory_refresh/inventory_collection.rb
@@ -486,12 +486,30 @@ def targeted_selection_for(references)
build_multi_selection_condition(references.map(&:second))
end

def select_keys
@select_keys ||= [@model_class.primary_key] + manager_ref_to_cols.map(&:to_s) + internal_columns.map(&:to_s)
end

# @return [ActiveRecord::ConnectionAdapters::AbstractAdapter] ActiveRecord connection
def get_connection
ActiveRecord::Base.connection
end

def pure_sql_record_fetching?
!use_ar_object?
end

# Builds an ActiveRecord::Relation that can fetch all the references from the DB
#
# @param references [Hash{String => InventoryRefresh::InventoryCollection::Reference}] passed references
# @return [ActiveRecord::Relation] relation that can fetch all the references from the DB
def db_collection_for_comparison_for(references)
full_collection_for_comparison.where(targeted_selection_for(references))
query = full_collection_for_comparison.where(targeted_selection_for(references))
if pure_sql_record_fetching?
return get_connection.query(query.select(*select_keys).to_sql)
end

query
end
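
The new `db_collection_for_comparison_for` either returns the AR relation or executes its SQL directly, keyed off `use_ar_object?`. A sketch of that branch with hypothetical stand-ins for ActiveRecord's relation and connection:

```ruby
# Stand-ins with just enough surface to show the switch: with use_ar_object
# the relation is returned untouched; without it, the relation's SQL is
# executed directly and raw rows come back, skipping AR object creation.
class FakeRelation
  def select(*cols)
    @sql = "SELECT #{cols.join(', ')} FROM widgets"
    self
  end

  def to_sql
    @sql
  end
end

class FakeConnection
  def initialize(rows)
    @rows = rows
  end

  def query(_sql)
    @rows # pretend the SQL was executed against the DB
  end
end

def fetch_for_comparison(relation, connection, select_keys, use_ar_object:)
  return relation if use_ar_object

  connection.query(relation.select(*select_keys).to_sql)
end
```

Skipping AR instantiation is the point of the pure-SQL path: for large refreshes, building one AR object per row dominates the fetch cost.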

# @return [ActiveRecord::Relation] relation that can fetch all the references from the DB
11 changes: 3 additions & 8 deletions lib/inventory_refresh/save_collection/saver/base.rb
@@ -26,15 +26,10 @@ def initialize(inventory_collection)
@unique_db_primary_keys = Set.new
@unique_db_indexes = Set.new

# Right now ApplicationRecordIterator in association is used for targeted refresh. Given the small amount of
# records flowing through there, we probably don't need to optimize that association to fetch a pure SQL.
# TODO(lsmola) since we save everything through targeted mode, we want to optimize this
@pure_sql_records_fetching = !inventory_collection.use_ar_object? && !@association.kind_of?(InventoryRefresh::ApplicationRecordIterator)

@batch_size_for_persisting = inventory_collection.batch_size_pure_sql
@batch_size = inventory_collection.use_ar_object? ? @batch_size_for_persisting : inventory_collection.batch_size

@batch_size = @pure_sql_records_fetching ? @batch_size_for_persisting : inventory_collection.batch_size
@record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key
@record_key_method = inventory_collection.pure_sql_record_fetching? ? :pure_sql_record_key : :ar_record_key
@select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index }
@pg_types = @model_class.attribute_names.each_with_object({}) do |key, obj|
obj[key.to_sym] = inventory_collection.model_class.columns_hash[key]
@@ -120,7 +115,7 @@ def transform_to_hash!(all_attribute_keys, hash)
private

attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes,
:primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes,
:primary_key, :arel_primary_key, :record_key_method, :select_keys_indexes,
:batch_size, :batch_size_for_persisting, :model_class, :serializable_keys, :deserializable_keys, :pg_types, :table_name,
:q_table_name

@@ -46,42 +46,6 @@ def pure_sql_record_key(record, key)
record[select_keys_indexes[key]]
end

# Returns iterator or relation based on settings
#
# @param association [Symbol] An existing association on manager
# @return [ActiveRecord::Relation, InventoryRefresh::ApplicationRecordIterator] iterator or relation based on settings
def batch_iterator(association)
if pure_sql_records_fetching
# Building fast iterator doing pure SQL query and therefore avoiding redundant creation of AR objects. The
# iterator responds to find_in_batches, so it acts like the AR relation. For targeted refresh, the association
# can already be ApplicationRecordIterator, so we will skip that.
# TODO(lsmola) Since everything is targeted now, we want to probably delete this branch and make iterator to
# do pure SQL queries, if there will be no .check_changed?
pure_sql_iterator = lambda do |&block|
primary_key_offset = nil
loop do
relation = association.select(*select_keys)
.reorder("#{primary_key} ASC")
.limit(batch_size)
# Using rails way of comparing primary key instead of offset
relation = relation.where(arel_primary_key.gt(primary_key_offset)) if primary_key_offset
records = get_connection.query(relation.to_sql)
last_record = records.last
block.call(records)

break if records.size < batch_size
primary_key_offset = record_key(last_record, primary_key)
end
end

InventoryRefresh::ApplicationRecordIterator.new(:iterator => pure_sql_iterator)
else
# Normal Rails ActiveRecord::Relation where we can call find_in_batches or
# InventoryRefresh::ApplicationRecordIterator passed from targeted refresh
association
end
end
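
The removed `pure_sql_iterator` paginated by primary key rather than `OFFSET` — keyset pagination — so each batch resumes where the previous one ended. The same loop, sketched over an in-memory array standing in for the SQL query:

```ruby
# Keyset-pagination sketch: filter on id > last-seen id instead of using
# OFFSET, which degrades linearly as the offset grows. `rows` is an
# in-memory stand-in for the association's table.
def each_batch(rows, batch_size:)
  offset = nil
  loop do
    batch = rows.select { |r| offset.nil? || r[:id] > offset }
                .sort_by { |r| r[:id] }
                .first(batch_size)
    yield batch
    # A short batch means the table is exhausted; otherwise resume after
    # the last primary key we saw.
    break if batch.size < batch_size
    offset = batch.last[:id]
  end
end
```

In the real iterator the filter and limit were pushed into SQL (`relation.where(arel_primary_key.gt(primary_key_offset)).limit(batch_size)`); the control flow is identical.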

# Saves the InventoryCollection
#
# @param association [Symbol] An existing association on manager
@@ -105,7 +69,7 @@ def save!(association)
logger.debug("Processing #{inventory_collection} of size #{inventory_collection.size}...")

unless inventory_collection.create_only?
update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys)
update_or_destroy_records!(association, inventory_objects_index, attributes_index, all_attribute_keys)
end

unless inventory_collection.create_only?
@@ -212,9 +176,9 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
[:resource_counter, :resource_counters_max]
end

next if skeletonize_or_skip_record(record_key(record, version_attr),
next if skeletonize_or_skip_record(record_key(record, version_attr.to_s),
hash[version_attr],
record_key(record, max_version_attr),
record_key(record, max_version_attr.to_s),
inventory_object)
end

@@ -200,7 +200,25 @@ def create_partial!(all_attribute_keys, hashes, on_conflict: nil, column_name: n
)
end

def comparable_timestamp(timestamp)
# Let's cast all timestamps with to_f, rounding the comparison precision to milliseconds; that should be
# enough, since we are the ones setting the record version in the collector. Otherwise we would have a hard
# time doing equality checks, since the value changes going through the DB (the DB cuts it at 5 decimal places)

if timestamp.kind_of?(String)
Time.use_zone('UTC') { Time.zone.parse(timestamp) }.to_f.round(3)
elsif timestamp.kind_of?(Time)
timestamp.in_time_zone('UTC').to_f.round(3)
else
timestamp
end
end
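
The rounding can be exercised outside Rails; this sketch substitutes stdlib `Time.parse` for `Time.use_zone('UTC') { Time.zone.parse(...) }`:

```ruby
require "time"

# Round both representations of an instant to millisecond precision so a
# String version (e.g. deserialized from JSON) compares equal to the Time
# version after a DB round-trip truncates sub-millisecond digits.
def comparable_timestamp(timestamp)
  case timestamp
  when String then Time.parse(timestamp).to_f.round(3)
  when Time   then timestamp.to_f.round(3)
  else             timestamp
  end
end
```

Note that `to_f` yields epoch seconds, which are zone-independent, so the UTC conversion in the original is not needed for the float comparison itself; non-time values (e.g. a `resource_counter`) pass through untouched.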

def skeletonize_or_skip_record(record_version, hash_version, record_versions_max, inventory_object)
record_version = comparable_timestamp(record_version)
record_versions_max = comparable_timestamp(record_versions_max) if record_versions_max
hash_version = comparable_timestamp(hash_version)

# Skip updating this record, because it is old
return true if record_version && hash_version && record_version >= hash_version

40 changes: 28 additions & 12 deletions spec/persister/parallel_saving_spec.rb
@@ -12,18 +12,34 @@
end

[{
:upsert_only => true,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
},].each do |settings|
:upsert_only => true,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_counter",
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => true,
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => true,
}, {
:upsert_only => true,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => false,
}, {
:upsert_only => false,
:parallel_saving_column => "resource_timestamp",
:use_ar_object => false,
}].each do |settings|
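
The expanded matrix follows a pattern: both `:upsert_only` flags crossed with the version column, with `:use_ar_object` toggled only for `resource_timestamp`. A sketch generating the same eight combinations (the shape is inferred from the literal list above, not from the repo):

```ruby
# Generate the settings matrix instead of spelling it out: 2 counter
# variants plus 2 timestamp variants x {unset, true, false} use_ar_object.
base = [true, false].map { |u| { :upsert_only => u } }

settings_matrix =
  base.map { |s| s.merge(:parallel_saving_column => "resource_counter") } +
  [nil, true, false].flat_map do |use_ar|
    base.map do |s|
      h = s.merge(:parallel_saving_column => "resource_timestamp")
      use_ar.nil? ? h : h.merge(:use_ar_object => use_ar)
    end
  end
```

The literal list in the spec is arguably clearer for readers scanning test output; the generated form only pays off if the matrix keeps growing.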
context "with settings #{settings}" do
before(:each) do
if settings[:upsert_only]
43 changes: 21 additions & 22 deletions spec/persister/sweep_inactive_records_spec.rb
@@ -15,12 +15,13 @@
context "with config #{config}" do
context "with :retention_strategy => 'archive'" do
it "automatically fills :last_seen_at timestamp for refreshed entities and archives them in last step" do
time_now = Time.now.utc
time_before = Time.now.utc - 20.seconds
time_after = Time.now.utc + 20.seconds
time_now = Time.now.utc
time_before = Time.now.utc - 20.seconds
time_more_before = Time.now.utc - 40.seconds
_time_after = Time.now.utc + 20.seconds

cg1 = FactoryBot.create(:container_group, container_group_data(1).merge(:ext_management_system => @ems, :resource_timestamp => time_before))
_cg2 = FactoryBot.create(:container_group, container_group_data(2).merge(:ext_management_system => @ems, :resource_timestamp => time_after))
cg2 = FactoryBot.create(:container_group, container_group_data(2).merge(:ext_management_system => @ems, :resource_timestamp => time_more_before))
_cg3 = FactoryBot.create(:container_group, container_group_data(3).merge(:ext_management_system => @ems, :resource_timestamp => time_now))
_cg4 = FactoryBot.create(:container_group, container_group_data(4).merge(:ext_management_system => @ems, :last_seen_at => time_before))
_cg6 = FactoryBot.create(:container_group, container_group_data(6).merge(:ext_management_system => @ems, :last_seen_at => time_before))
@@ -40,10 +41,10 @@
persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
persist(persister, config)
persister = persist(persister, config)

# We update just the first record, and last_seen_at is updated for all records involved
expect(persister.container_groups.updated_records).to(match_array([{:id => cg1.id}]))
expect(persister.container_groups.updated_records).to(match_array([{:id => cg2.id}]))

date_field = ContainerGroup.arel_table[:last_seen_at]
expect(ContainerGroup.where(date_field.gt(time_now)).pluck(:ems_ref)).to(
@@ -61,7 +62,7 @@
persister.refresh_state_part_uuid = part2_uuid

persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
persist(persister, config)
persister = persist(persister, config)

date_field = ContainerGroup.arel_table[:last_seen_at]
expect(ContainerGroup.where(date_field.gt(time_now)).pluck(:ems_ref)).to(
@@ -73,7 +74,6 @@
container_group_data(7)[:ems_ref]])
)

# Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
# Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
persister.sweep_scope = ["container_groups", "container_nodes"]
sweep(persister, time_now, config)
@@ -121,15 +121,15 @@
persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
persist(persister, config)
_persister = persist(persister, config)

# Refresh second part and mark :last_seen_at
persister = create_containers_persister(:retention_strategy => "archive")
persister.refresh_state_uuid = refresh_state_uuid
persister.refresh_state_part_uuid = part2_uuid

persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
persist(persister, config)
persister = persist(persister, config)

# Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on column
persister.sweep_scope = ["container_groups"]
@@ -192,7 +192,7 @@
persister.container_groups.build(container_group_data(1).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(2).merge(:resource_timestamp => time_before))
persister.container_groups.build(container_group_data(5).merge(:resource_timestamp => time_before))
persist(persister, config)
_persister = persist(persister, config)

################################################################################################################
# Refresh second part
@@ -201,7 +201,7 @@
persister.refresh_state_part_uuid = part2_uuid

persister.container_groups.build(container_group_data(6).merge(:resource_timestamp => time_before))
persist(persister, config)
_persister = persist(persister, config)

################################################################################################################
# Sweeping step. Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on
@@ -245,7 +245,7 @@
)
)

persist(persister, config)
_persister = persist(persister, config)

################################################################################################################
# Sweeping step.
@@ -338,7 +338,7 @@
)
)

persist(persister, config)
_persister = persist(persister, config)

################################################################################################################
# Refresh second part
@@ -353,7 +353,7 @@
)
)

persist(persister, config)
_persister = persist(persister, config)

################################################################################################################
# Sweeping step. Send persister with total_parts = XY, that will cause sweeping all tables having :last_seen_on
@@ -485,14 +485,13 @@ def create_persister
create_containers_persister(:retention_strategy => "archive")
end

def persist(persister, _config)
# TODO(lsmola) serializing is messing with the timestamp, so the reconnect unconnected edges doesn't work
# if config[:serialize]
# persister.class.from_json(persister.to_json, @ems).persist!
# else
# persister.persist!
# end
def persist(persister, config)
if config[:serialize]
persister = persister.class.from_json(persister.to_json, @ems)
end
persister.persist!

persister
end
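
The re-enabled serialize path round-trips the persister through JSON before persisting and returns the rebuilt instance, which is why the specs now reassign `persister = persist(persister, config)`. The shape of the helper, with a toy persister class standing in for the real one:

```ruby
require "json"

# Toy persister: just enough state to show the dump/rebuild round-trip the
# spec helper performs. The real class serializes far more than this.
class ToyPersister
  attr_reader :records

  def initialize(records)
    @records = records
    @persisted = false
  end

  def to_json(*)
    JSON.generate("records" => @records)
  end

  def self.from_json(json)
    new(JSON.parse(json).fetch("records"))
  end

  def persist!
    @persisted = true
  end

  def persisted?
    @persisted
  end
end

def persist(persister, config)
  # When serializing, a brand-new persister is built from JSON, so we must
  # return it; assertions against the original object would miss its state.
  persister = persister.class.from_json(persister.to_json) if config[:serialize]
  persister.persist!
  persister
end
```

Returning the (possibly rebuilt) persister is the design point: with `:serialize => true`, `updated_records` and friends live on the deserialized copy, not the object the caller constructed.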

def sweep(persister, time, config)