Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First version of targeted concurrent safe Persistor strategy #15227

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions app/models/manager_refresh/inventory/persister.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ def to_raw_data
next if collection.data.blank?

{
:name => key,
:unique_uuids => [], # TODO(lsmola) allow to set a scope, so we can say it's a complete set of data
:data => collection.to_raw_data
:name => key,
:manager_uuids => collection.manager_uuids,
:data => collection.to_raw_data
}
end.compact

Expand Down Expand Up @@ -192,6 +192,7 @@ def from_raw_data(persister_data)
inventory_collection = persister.collections[collection['name'].try(:to_sym)]
raise "Unrecognized InventoryCollection name: #{inventory_collection}" if inventory_collection.blank?

inventory_collection.manager_uuids.merge(collection['manager_uuids'] || [])
inventory_collection.from_raw_data(collection['data'], persister.collections)
end
persister
Expand Down
55 changes: 50 additions & 5 deletions app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class InventoryCollection
:internal_attributes, :delete_method, :data, :data_index, :dependency_attributes, :manager_ref,
:association, :complete, :update_only, :transitive_dependency_attributes, :custom_manager_uuid,
:custom_db_finder, :check_changed, :arel, :builder_params, :loaded_references, :db_data_index,
:inventory_object_attributes, :name, :parent_inventory_collections, :manager_uuids,
:inventory_object_attributes, :name, :saver_strategy, :parent_inventory_collections, :manager_uuids,
:skeletal_manager_uuids, :targeted_arel, :targeted, :manager_ref_allowed_nil

delegate :each, :size, :to => :to_a
Expand Down Expand Up @@ -286,6 +286,15 @@ class InventoryCollection
# one.
# @param name [Symbol] A unique name of the InventoryCollection under a Persister. If not provided, the :association
# attribute is used. Providing either :name or :association is mandatory.
# @param saver_strategy [Symbol] A strategy that will be used for InventoryCollection persisting into the DB.
# Allowed saver strategies are:
# - :default => Using Rails saving methods, this way is not safe to run in multiple workers concurrently,
# since it will lead to non consistent data.
# - :concurrent_safe => This method is designed for concurrent saving. It uses atomic upsert to avoid
# data duplication and it uses timestamp based atomic checks to avoid new data being overwritten by the
# the old data.
# - :concurrent_safe_batch => Same as :concurrent_safe, but the upsert/update queries are executed as
# batched SQL queries, instead of sending 1 query per record.
# @param parent_inventory_collections [Array] Array of symbols having a name of the
# ManagerRefresh::InventoryCollection objects, that serve as parents to this InventoryCollection. Then this
# InventoryCollection completeness will be encapsulated by the parent_inventory_collections :manager_uuids
Expand Down Expand Up @@ -323,7 +332,7 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil
custom_save_block: nil, delete_method: nil, data_index: nil, data: nil, dependency_attributes: nil,
attributes_blacklist: nil, attributes_whitelist: nil, complete: nil, update_only: nil,
check_changed: nil, custom_manager_uuid: nil, custom_db_finder: nil, arel: nil, builder_params: {},
inventory_object_attributes: nil, unique_index_columns: nil, name: nil,
inventory_object_attributes: nil, unique_index_columns: nil, name: nil, saver_strategy: nil,
parent_inventory_collections: nil, manager_uuids: [], targeted_arel: nil, targeted: nil,
manager_ref_allowed_nil: nil)
@model_class = model_class
Expand All @@ -347,13 +356,14 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil
@builder_params = builder_params
@unique_index_columns = unique_index_columns
@name = name || association
@saver_strategy = process_saver_strategy(saver_strategy)

@manager_ref_allowed_nil = manager_ref_allowed_nil || []

# Targeted mode related attributes
@manager_uuids = Set.new.merge(manager_uuids)
@parent_inventory_collections = parent_inventory_collections
@skeletal_manager_uuids = Set.new
@skeletal_manager_uuids = Set.new.merge(manager_uuids)
@targeted_arel = targeted_arel
@targeted = !!targeted

Expand Down Expand Up @@ -427,6 +437,18 @@ def to_raw_data
end
end

def process_saver_strategy(saver_strategy)
return :default unless saver_strategy

case saver_strategy
when :default, :concurrent_safe, :concurrent_safe_batch
saver_strategy
else
raise "Unknown InventoryCollection saver strategy: :#{saver_strategy}, allowed strategies are "\
":default, :concurrent_safe and :concurrent_safe_batch"
end
end

def process_strategy(strategy_name)
return unless strategy_name

Expand Down Expand Up @@ -496,8 +518,8 @@ def unique_index_columns
end

if unique_indexes.blank?
raise "#{self} and its table #{model_class.table_name} must have a unique index defined, in order to use"\
" strategy :stream_data."
raise "#{self} and its table #{model_class.table_name} must have a unique index defined, in order to use "\
"saver_strategy :concurrent_safe or :concurrent_safe_batch."
end
@unique_index_columns = unique_indexes.first.columns
end
Expand Down Expand Up @@ -774,6 +796,11 @@ def inspect
def scan!(indexed_inventory_collections)
data.each do |inventory_object|
scan_inventory_object!(inventory_object)

if targeted? && inventory_object.inventory_collection.parent_inventory_collections.blank?
# We want to track what manager_uuids we should query from a db, for the targeted refresh
manager_uuids << inventory_object.manager_uuid
end
end

if targeted? && parent_inventory_collections.present?
Expand Down Expand Up @@ -945,6 +972,24 @@ def scan_inventory_object_attribute!(key, value)
# Storing attributes and their dependencies
(dependency_attributes[key] ||= Set.new) << value.inventory_collection if value.dependency?

# For concurent safe strategies, we want to pre-build the relations using the lazy_link data, so we can fill up
# the foreign key in first pass.
if [:concurrent_safe, :concurrent_safe_batch].include?(saver_strategy)
if value.inventory_collection.manager_ref.size == 1 && inventory_object_lazy?(value) &&
!value.ems_ref.blank? && value.key.nil? && value.dependency?
# Instead of loading the reference from the DB, we'll add the dummy InventoryObject (having only ems_ref and
# info from the builder_params) to the correct InventoryCollection. Which will either be found in the DB or
# created as a small dummy object. The refresh of the object will then fill the rest of the data, while not
# touching the reference.

# TODO(lsmola) solve the :key, since that requires data from the actual reference. At best our DB should be
# designed the way, we don't duplicate the data, but rather get them with a join. (3NF!)

value.inventory_collection.find_or_build(value.ems_ref)
value.inventory_collection.skeletal_manager_uuids
end
end

# Storing a reference in the target inventory_collection, then each IC knows about all the references and can
# e.g. load all the referenced uuids from a DB
value.inventory_collection.references << value.to_s
Expand Down
3 changes: 2 additions & 1 deletion app/models/manager_refresh/save_collection/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def save_inventory_object_inventory(ems, inventory_collection)
private

def save_inventory(inventory_collection)
ManagerRefresh::SaveCollection::Saver::Default.new(inventory_collection).save_inventory_collection!
saver_class = "ManagerRefresh::SaveCollection::Saver::#{inventory_collection.saver_strategy.to_s.camelize}"
saver_class.constantize.new(inventory_collection).save_inventory_collection!
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ def initialize(inventory_collection)
def save_inventory_collection!
# If we have not data to save and delete is not allowed, we can just skip
return if inventory_collection.data.blank? && !inventory_collection.delete_allowed?
# If we have a targeted InventoryCollection that wouldn't do anything
return if inventory_collection.targeted? && inventory_collection.manager_uuids.blank? &&
inventory_collection.skeletal_manager_uuids.blank? &&
inventory_collection.parent_inventory_collections.blank? &&
inventory_collection.custom_save_block.nil?

# TODO(lsmola) do I need to reload every time? Also it should be enough to clear the associations.
inventory_collection.parent.reload if inventory_collection.parent
Expand Down
120 changes: 120 additions & 0 deletions app/models/manager_refresh/save_collection/saver/concurrent_safe.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
module ManagerRefresh::SaveCollection
module Saver
class ConcurrentSafe < ManagerRefresh::SaveCollection::Saver::Base
private

def save!(inventory_collection, association)
attributes_index = {}
inventory_objects_index = {}
inventory_collection.each do |inventory_object|
attributes = inventory_object.attributes(inventory_collection)
index = inventory_object.manager_uuid

attributes_index[index] = attributes
inventory_objects_index[index] = inventory_object
end

inventory_collection_size = inventory_collection.size
deleted_counter = 0
created_counter = 0
_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
# Records that are in the DB, we will be updating or deleting them.
association.find_in_batches do |batch|
ActiveRecord::Base.transaction do
batch.each do |record|
next unless assert_distinct_relation(record)

index = inventory_collection.object_index_with_keys(unique_index_keys, record)

inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
# TODO(lsmola) do a transaction for a batches of deletion
deleted_counter += 1 if delete_record!(inventory_collection, record)
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object)
end
end
end
end

# Records that were not found in the DB but sent for saving, we will be creating these in the DB.
if inventory_collection.create_allowed?
inventory_objects_index.each_slice(1000) do |batch|
ActiveRecord::Base.transaction do
batch.each do |index, inventory_object|
hash = attributes_index.delete(index)
create_record!(inventory_collection, hash, inventory_object)
created_counter += 1
end
end
end
end
_log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
"updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
end

def delete_record!(inventory_collection, record)
return false unless inventory_collection.delete_allowed?
record.public_send(inventory_collection.delete_method)
true
end

def update_record!(inventory_collection, record, hash, inventory_object)
record.assign_attributes(hash.except(:id, :type))

# TODO(lsmola) ignore all N:M relations, since we use pure SQL, all N:M needs to be modeled as a separate IC, or
# can we process those automatically? Using a convention? But still, it needs to be a separate IC, to have
# efficient saving.
hash.reject! { |_key, value| value.kind_of?(Array) }

if !inventory_object.inventory_collection.check_changed? || record.changed?
update_query = inventory_object.inventory_collection.model_class.where(:id => record.id)
if hash[:remote_data_timestamp]
timestamp_field = inventory_collection.model_class.arel_table[:remote_data_timestamp]
update_query = update_query.where(timestamp_field.lt(hash[:remote_data_timestamp]))
end

update_query.update_all(hash)
end

inventory_object.id = record.id
end

def create_record!(inventory_collection, hash, inventory_object)
return unless assert_referential_integrity(hash, inventory_object)

hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].blank?
table_name = inventory_object.inventory_collection.model_class.table_name
insert_query = %{
INSERT INTO #{table_name} (#{hash.keys.join(", ")})
VALUES
(
#{hash.values.map { |x| ActiveRecord::Base.connection.quote(x) }.join(", ")}
)
ON CONFLICT (#{inventory_object.inventory_collection.unique_index_columns.join(", ")})
DO
UPDATE
SET #{hash.keys.map { |x| "#{x} = EXCLUDED.#{x}" }.join(", ")}
}
# TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change
# the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of
# changing ems_id could lead to putting it back by a refresh.

# This conditional will avoid rewriting new data by old data. But we want it only when remote_data_timestamp is a
# part of the data, since for the fake records, we just want to update ems_ref.
if hash[:remote_data_timestamp].present?
insert_query += %{
WHERE EXCLUDED.remote_data_timestamp IS NULL OR (EXCLUDED.remote_data_timestamp > #{table_name}.remote_data_timestamp)
}
end
result_id = ActiveRecord::Base.connection.insert_sql(insert_query)
inventory_object.id = result_id
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module ManagerRefresh::SaveCollection
module Saver
class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base
private

def save!(_inventory_collection, _association)
raise "saver_strategy :concurent_safe_batch is not implemented"
end
end
end
end