Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent safe batch saver #15247

Merged
merged 8 commits into from
Jun 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module ManagerRefresh::SaveCollection
module Saver
class Base
include Vmdb::Logging
include ManagerRefresh::SaveCollection::Saver::SqlHelper

attr_reader :inventory_collection

Expand Down Expand Up @@ -31,6 +32,10 @@ def save_inventory_collection!

attr_reader :unique_index_keys, :unique_db_primary_keys

# Number of items gathered before a batched operation is issued.
#
# @return [Integer] the batch size
def batch_size
  1_000
end

def delete_complement(inventory_collection)
return unless inventory_collection.delete_allowed?

Expand All @@ -55,6 +60,12 @@ def delete_complement(inventory_collection)
"#{all_manager_uuids_size}, deleted=#{deleted_counter} *************")
end

# Invokes the collection's configured delete method on +record+, provided deletion is allowed.
#
# @param inventory_collection [Object] answers +delete_allowed?+ and +delete_method+
# @param record [Object] record that receives the delete method
# @return [Boolean] true when the delete method was sent, false when deletion is not allowed
def delete_record!(inventory_collection, record)
  if inventory_collection.delete_allowed?
    record.public_send(inventory_collection.delete_method)
    true
  else
    false
  end
end

def assert_distinct_relation(record)
if unique_db_primary_keys.include?(record.id) # Include on Set is O(1)
# Change the InventoryCollection's :association or :arel parameter to return distinct results. The :through
Expand All @@ -75,12 +86,11 @@ def assert_distinct_relation(record)

# Checks that every fixed foreign key of the object's inventory collection has a non-blank value in +hash+.
#
# @param hash [Hash] attributes about to be saved, keyed by attribute name
# @param inventory_object [Object] object whose inventory_collection supplies +fixed_foreign_keys+ and +parent+
# @return [Boolean] false (after logging the offending key) on the first blank foreign key, true otherwise
def assert_referential_integrity(hash, inventory_object)
  inventory_object.inventory_collection.fixed_foreign_keys.each do |x|
    # A present foreign key needs no action; only a blank one rejects the object.
    next unless hash[x].blank?

    _log.info("Ignoring #{inventory_object} because of missing foreign key #{x} for "\
              "#{inventory_object.inventory_collection.parent.class.name}:"\
              "#{inventory_object.inventory_collection.parent.id}")
    return false
  end
  true
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,59 +19,42 @@ def save!(inventory_collection, association)
created_counter = 0
_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
# Records that are in the DB, we will be updating or deleting them.
association.find_in_batches do |batch|
ActiveRecord::Base.transaction do
batch.each do |record|
next unless assert_distinct_relation(record)

index = inventory_collection.object_index_with_keys(unique_index_keys, record)

inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
# TODO(lsmola) do a transaction for a batches of deletion
deleted_counter += 1 if delete_record!(inventory_collection, record)
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object)
end
ActiveRecord::Base.transaction do
association.find_each do |record|
next unless assert_distinct_relation(record)

index = inventory_collection.object_index_with_keys(unique_index_keys, record)

inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
deleted_counter += 1 if delete_record!(inventory_collection, record)
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object)
end
end
end

# Records that were not found in the DB but sent for saving, we will be creating these in the DB.
if inventory_collection.create_allowed?
inventory_objects_index.each_slice(1000) do |batch|
ActiveRecord::Base.transaction do
batch.each do |index, inventory_object|
hash = attributes_index.delete(index)
create_record!(inventory_collection, hash, inventory_object)
created_counter += 1
end
ActiveRecord::Base.transaction do
inventory_objects_index.each do |index, inventory_object|
create_record!(inventory_collection, attributes_index.delete(index), inventory_object)
created_counter += 1
end
end
end
_log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
"updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
end

# Invokes the collection's configured delete method on +record+, provided deletion is allowed.
#
# @param inventory_collection [Object] answers +delete_allowed?+ and +delete_method+
# @param record [Object] record that receives the delete method
# @return [Boolean] true when the delete method was sent, false when deletion is not allowed
def delete_record!(inventory_collection, record)
  if inventory_collection.delete_allowed?
    record.public_send(inventory_collection.delete_method)
    true
  else
    false
  end
end

def update_record!(inventory_collection, record, hash, inventory_object)
record.assign_attributes(hash.except(:id, :type))

# TODO(lsmola) ignore all N:M relations, since we use pure SQL, all N:M needs to be modeled as a separate IC, or
# can we process those automatically? Using a convention? But still, it needs to be a separate IC, to have
# efficient saving.
hash.reject! { |_key, value| value.kind_of?(Array) }

if !inventory_object.inventory_collection.check_changed? || record.changed?
update_query = inventory_object.inventory_collection.model_class.where(:id => record.id)
if hash[:remote_data_timestamp]
Expand All @@ -88,31 +71,12 @@ def update_record!(inventory_collection, record, hash, inventory_object)
# Inserts a single record for +inventory_object+ and stores the value returned by the insert as its id.
#
# The SQL is produced by build_insert_query; the attribute keys handed to it are the keys of the incoming
# +hash+, while the row itself is the full attribute set of a freshly instantiated model.
#
# @param inventory_collection [Object] collection supplying +model_class+
# @param hash [Hash] attributes of the record to be created
# @param inventory_object [Object] object that receives the resulting id
# @return [Object, nil] the assigned id, or nil when the referential integrity check rejects the object
def create_record!(inventory_collection, hash, inventory_object)
  return unless assert_referential_integrity(hash, inventory_object)

  # Remember the originally supplied keys before the hash is replaced by the model's attributes.
  all_attribute_keys = hash.keys
  hash = inventory_collection.model_class.new(hash).attributes.symbolize_keys

  # Issue exactly one insert; the query is built in one place only.
  result_id = ActiveRecord::Base.connection.insert_sql(
    build_insert_query(inventory_collection, all_attribute_keys, [hash])
  )
  inventory_object.id = result_id
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,137 @@ module Saver
class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base
private

def save!(_inventory_collection, _association)
raise "saver_strategy :concurent_safe_batch is not implemented"
# Saves +inventory_collection+ against the records yielded by +association+, working in batches.
#
# Records matched by an inventory object are collected and handed to update_records!, records without a
# matching inventory object are handed to destroy_records (when deletion is allowed), and inventory objects
# left without a record are handed to create_records! (when creation is allowed).
#
# @param inventory_collection [Object] collection of inventory objects to save
# @param association [Object] source of existing records, iterated with +find_in_batches+
def save!(inventory_collection, association)
  attributes_index        = {}
  inventory_objects_index = {}
  all_attribute_keys      = Set.new

  # Index the incoming data by manager_uuid and collect every attribute key that occurs.
  inventory_collection.each do |inventory_object|
    attributes = inventory_object.attributes(inventory_collection)
    index      = inventory_object.manager_uuid

    attributes_index[index]        = attributes
    inventory_objects_index[index] = inventory_object
    all_attribute_keys.merge(attributes_index[index].keys)
  end

  inventory_collection_size = inventory_collection.size
  deleted_counter           = 0
  created_counter           = 0
  updated_counter           = 0
  _log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
  hashes_for_update   = []
  records_for_destroy = []

  # Records that are in the DB, we will be updating or deleting them.
  association.find_in_batches do |batch|
    batch.each do |record|
      next unless assert_distinct_relation(record)

      index = inventory_collection.object_index_with_keys(unique_index_keys, record)

      # Removing the entries leaves only the not-yet-persisted objects in the indexes for the create step.
      inventory_object = inventory_objects_index.delete(index)
      hash             = attributes_index.delete(index)

      if inventory_object.nil?
        # Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
        # delete it from the DB.
        if inventory_collection.delete_allowed?
          records_for_destroy << record
          deleted_counter += 1
        end
      else
        # Record was found in the DB and sent for saving, we will be updating the DB.
        next unless assert_referential_integrity(hash, inventory_object)
        inventory_object.id = record.id

        record.assign_attributes(hash.except(:id, :type))
        if !inventory_collection.check_changed? || record.changed?
          hashes_for_update << record.attributes.symbolize_keys
        end
      end
    end

    # Update in batches
    if hashes_for_update.size >= batch_size
      update_records!(inventory_collection, all_attribute_keys, hashes_for_update)
      updated_counter += hashes_for_update.count

      hashes_for_update = []
    end

    # Destroy in batches: flush the accumulated records once a full batch has been gathered.
    if records_for_destroy.size >= batch_size
      destroy_records(records_for_destroy)
      records_for_destroy = []
    end
  end

  # Update the last batch
  update_records!(inventory_collection, all_attribute_keys, hashes_for_update)
  updated_counter += hashes_for_update.count
  hashes_for_update = [] # Cleanup so GC can release it sooner

  # Destroy the last batch
  destroy_records(records_for_destroy)
  records_for_destroy = [] # Cleanup so GC can release it sooner

  all_attribute_keys << :type if inventory_collection.supports_sti?
  # Records that were not found in the DB but sent for saving, we will be creating these in the DB.
  if inventory_collection.create_allowed?
    inventory_objects_index.each_slice(batch_size) do |batch|
      create_records!(inventory_collection, all_attribute_keys, batch, attributes_index)
      created_counter += batch.size
    end
  end
  _log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
            "updated=#{updated_counter}, deleted=#{deleted_counter} *************")
end

# Passes each of the given records to delete_record!, with the whole run wrapped in one transaction.
#
# @param records [Array] records to be removed
def destroy_records(records)
  # TODO(lsmola) we need at least batch disconnect. Batch destroy won't be probably possible because of the
  # :dependent => :destroy.
  ActiveRecord::Base.transaction do
    records.each { |doomed| delete_record!(inventory_collection, doomed) }
  end
end

# Executes one update statement, built by build_update_query, covering all given hashes.
#
# @param inventory_collection [Object] collection forwarded to build_update_query
# @param all_attribute_keys [Enumerable] attribute keys forwarded to build_update_query
# @param hashes [Array<Hash>] attribute hashes to update; nothing is executed when blank
# @return [Object, nil] result of the connection's +execute+, or nil when +hashes+ is blank
def update_records!(inventory_collection, all_attribute_keys, hashes)
  unless hashes.blank?
    connection = ActiveRecord::Base.connection
    connection.execute(build_update_query(inventory_collection, all_attribute_keys, hashes))
  end
end

# Inserts a batch of new records with one statement, then maps the resulting ids back to the objects.
#
# @param inventory_collection [Object] collection supplying +model_class+ and +unique_index_columns+
# @param all_attribute_keys [Enumerable] attribute keys forwarded to build_insert_query
# @param batch [Enumerable] pairs of (index, inventory_object) to be created
# @param attributes_index [Hash] attributes by index; the entry of every processed index is removed
# @return [Object, nil] nil when no object passes the referential integrity check
def create_records!(inventory_collection, all_attribute_keys, batch, attributes_index)
  objects_by_unique_values = {}
  rows                     = []

  batch.each do |index, inventory_object|
    row = inventory_collection.model_class.new(attributes_index.delete(index)).attributes.symbolize_keys
    if assert_referential_integrity(row, inventory_object)
      rows << row
      # Index on Unique Columns values, so we can easily fill in the :id later
      unique_values = inventory_collection.unique_index_columns.map { |column| row[column] }
      objects_by_unique_values[unique_values] = inventory_object
    end
  end

  return if rows.blank?

  connection = ActiveRecord::Base.connection
  connection.execute(build_insert_query(inventory_collection, all_attribute_keys, rows))
  # TODO(lsmola) we need to do the mapping only if this IC has dependents/dependees
  map_ids_to_inventory_objects(inventory_collection, objects_by_unique_values, rows)
end

# Looks up the records selected by build_multi_selection_query and copies each record's id onto the
# inventory object indexed under the same unique-index column values.
#
# @param inventory_collection [Object] collection supplying +model_class+ and +unique_index_columns+
# @param indexed_inventory_objects [Hash] inventory objects keyed by arrays of unique-index column values
# @param hashes [Array<Hash>] attribute hashes forwarded to build_multi_selection_query
def map_ids_to_inventory_objects(inventory_collection, indexed_inventory_objects, hashes)
  model     = inventory_collection.model_class
  selection = build_multi_selection_query(inventory_collection, hashes)

  model.where(selection).find_each do |inserted_record|
    unique_values   = inventory_collection.unique_index_columns.map { |column| inserted_record.public_send(column) }
    matching_object = indexed_inventory_objects[unique_values]
    # Records without a corresponding inventory object are left alone.
    next unless matching_object

    matching_object.id = inserted_record.id
  end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def save!(inventory_collection, association)
inventory_objects_index[index] = inventory_object
end

unique_db_indexes = Set.new
unique_db_indexes = Set.new

inventory_collection_size = inventory_collection.size
deleted_counter = 0
Expand Down Expand Up @@ -66,12 +66,6 @@ def save!(inventory_collection, association)
"updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
end

# Invokes the collection's configured delete method on +record+, provided deletion is allowed.
#
# @param inventory_collection [Object] answers +delete_allowed?+ and +delete_method+
# @param record [Object] record that receives the delete method
# @return [Boolean] true when the delete method was sent, false when deletion is not allowed
def delete_record!(inventory_collection, record)
  if inventory_collection.delete_allowed?
    record.public_send(inventory_collection.delete_method)
    true
  else
    false
  end
end

def update_record!(inventory_collection, record, hash, inventory_object)
record.assign_attributes(hash.except(:id, :type))
record.save if !inventory_collection.check_changed? || record.changed?
Expand Down
Loading