Skip to content

Commit

Permalink
Merge pull request #15227 from Ladas/first_version_of_targeted_concur…
Browse files Browse the repository at this point in the history
…rent_safe_strategy

First version of targeted concurrent safe Persistor strategy
  • Loading branch information
agrare committed May 29, 2017
2 parents 2d23056 + 860a8c6 commit d2a5576
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 9 deletions.
7 changes: 4 additions & 3 deletions app/models/manager_refresh/inventory/persister.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ def to_raw_data
next if collection.data.blank?

{
:name => key,
:unique_uuids => [], # TODO(lsmola) allow to set a scope, so we can say it's a complete set of data
:data => collection.to_raw_data
:name => key,
:manager_uuids => collection.manager_uuids,
:data => collection.to_raw_data
}
end.compact

Expand Down Expand Up @@ -192,6 +192,7 @@ def from_raw_data(persister_data)
inventory_collection = persister.collections[collection['name'].try(:to_sym)]
raise "Unrecognized InventoryCollection name: #{inventory_collection}" if inventory_collection.blank?

inventory_collection.manager_uuids.merge(collection['manager_uuids'] || [])
inventory_collection.from_raw_data(collection['data'], persister.collections)
end
persister
Expand Down
55 changes: 50 additions & 5 deletions app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class InventoryCollection
:internal_attributes, :delete_method, :data, :data_index, :dependency_attributes, :manager_ref,
:association, :complete, :update_only, :transitive_dependency_attributes, :custom_manager_uuid,
:custom_db_finder, :check_changed, :arel, :builder_params, :loaded_references, :db_data_index,
:inventory_object_attributes, :name, :parent_inventory_collections, :manager_uuids,
:inventory_object_attributes, :name, :saver_strategy, :parent_inventory_collections, :manager_uuids,
:skeletal_manager_uuids, :targeted_arel, :targeted, :manager_ref_allowed_nil

delegate :each, :size, :to => :to_a
Expand Down Expand Up @@ -286,6 +286,15 @@ class InventoryCollection
# one.
# @param name [Symbol] A unique name of the InventoryCollection under a Persister. If not provided, the :association
# attribute is used. Providing either :name or :association is mandatory.
# @param saver_strategy [Symbol] A strategy that will be used for InventoryCollection persisting into the DB.
# Allowed saver strategies are:
# - :default => Using Rails saving methods, this way is not safe to run in multiple workers concurrently,
# since it will lead to inconsistent data.
# - :concurrent_safe => This method is designed for concurrent saving. It uses atomic upsert to avoid
# data duplication and it uses timestamp based atomic checks to avoid new data being overwritten by the
# old data.
# - :concurrent_safe_batch => Same as :concurrent_safe, but the upsert/update queries are executed as
# batched SQL queries, instead of sending 1 query per record.
# @param parent_inventory_collections [Array] Array of symbols having a name of the
# ManagerRefresh::InventoryCollection objects, that serve as parents to this InventoryCollection. Then this
# InventoryCollection completeness will be encapsulated by the parent_inventory_collections :manager_uuids
Expand Down Expand Up @@ -323,7 +332,7 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil
custom_save_block: nil, delete_method: nil, data_index: nil, data: nil, dependency_attributes: nil,
attributes_blacklist: nil, attributes_whitelist: nil, complete: nil, update_only: nil,
check_changed: nil, custom_manager_uuid: nil, custom_db_finder: nil, arel: nil, builder_params: {},
inventory_object_attributes: nil, unique_index_columns: nil, name: nil,
inventory_object_attributes: nil, unique_index_columns: nil, name: nil, saver_strategy: nil,
parent_inventory_collections: nil, manager_uuids: [], targeted_arel: nil, targeted: nil,
manager_ref_allowed_nil: nil)
@model_class = model_class
Expand All @@ -347,13 +356,14 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil
@builder_params = builder_params
@unique_index_columns = unique_index_columns
@name = name || association
@saver_strategy = process_saver_strategy(saver_strategy)

@manager_ref_allowed_nil = manager_ref_allowed_nil || []

# Targeted mode related attributes
@manager_uuids = Set.new.merge(manager_uuids)
@parent_inventory_collections = parent_inventory_collections
@skeletal_manager_uuids = Set.new
@skeletal_manager_uuids = Set.new.merge(manager_uuids)
@targeted_arel = targeted_arel
@targeted = !!targeted

Expand Down Expand Up @@ -427,6 +437,18 @@ def to_raw_data
end
end

# Normalizes the saver strategy passed to the constructor.
#
# @param saver_strategy [Symbol, nil] requested strategy; a nil/false value selects :default
# @return [Symbol] one of :default, :concurrent_safe or :concurrent_safe_batch
# @raise [RuntimeError] when the given strategy is not one of the allowed symbols
def process_saver_strategy(saver_strategy)
  return :default unless saver_strategy

  allowed_strategies = [:default, :concurrent_safe, :concurrent_safe_batch]
  return saver_strategy if allowed_strategies.include?(saver_strategy)

  raise "Unknown InventoryCollection saver strategy: :#{saver_strategy}, allowed strategies are "\
        ":default, :concurrent_safe and :concurrent_safe_batch"
end

def process_strategy(strategy_name)
return unless strategy_name

Expand Down Expand Up @@ -496,8 +518,8 @@ def unique_index_columns
end

if unique_indexes.blank?
raise "#{self} and its table #{model_class.table_name} must have a unique index defined, in order to use"\
" strategy :stream_data."
raise "#{self} and its table #{model_class.table_name} must have a unique index defined, in order to use "\
"saver_strategy :concurrent_safe or :concurrent_safe_batch."
end
@unique_index_columns = unique_indexes.first.columns
end
Expand Down Expand Up @@ -774,6 +796,11 @@ def inspect
def scan!(indexed_inventory_collections)
data.each do |inventory_object|
scan_inventory_object!(inventory_object)

if targeted? && inventory_object.inventory_collection.parent_inventory_collections.blank?
# We want to track what manager_uuids we should query from a db, for the targeted refresh
manager_uuids << inventory_object.manager_uuid
end
end

if targeted? && parent_inventory_collections.present?
Expand Down Expand Up @@ -945,6 +972,24 @@ def scan_inventory_object_attribute!(key, value)
# Storing attributes and their dependencies
(dependency_attributes[key] ||= Set.new) << value.inventory_collection if value.dependency?

# For concurrent safe strategies, we want to pre-build the relations using the lazy_link data, so we can fill up
# the foreign key in first pass.
if [:concurrent_safe, :concurrent_safe_batch].include?(saver_strategy)
if value.inventory_collection.manager_ref.size == 1 && inventory_object_lazy?(value) &&
!value.ems_ref.blank? && value.key.nil? && value.dependency?
# Instead of loading the reference from the DB, we'll add the dummy InventoryObject (having only ems_ref and
# info from the builder_params) to the correct InventoryCollection. Which will either be found in the DB or
# created as a small dummy object. The refresh of the object will then fill the rest of the data, while not
# touching the reference.

# TODO(lsmola) solve the :key, since that requires data from the actual reference. At best our DB should be
# designed the way, we don't duplicate the data, but rather get them with a join. (3NF!)

value.inventory_collection.find_or_build(value.ems_ref)
value.inventory_collection.skeletal_manager_uuids
end
end

# Storing a reference in the target inventory_collection, then each IC knows about all the references and can
# e.g. load all the referenced uuids from a DB
value.inventory_collection.references << value.to_s
Expand Down
3 changes: 2 additions & 1 deletion app/models/manager_refresh/save_collection/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def save_inventory_object_inventory(ems, inventory_collection)
private

# Saves the InventoryCollection with the saver class matching its saver_strategy, e.g. the
# :concurrent_safe strategy resolves to ManagerRefresh::SaveCollection::Saver::ConcurrentSafe.
#
# @param inventory_collection [ManagerRefresh::InventoryCollection] collection to be persisted
# @return whatever the saver's #save_inventory_collection! returns
def save_inventory(inventory_collection)
  strategy_class_name = inventory_collection.saver_strategy.to_s.camelize
  saver               = "ManagerRefresh::SaveCollection::Saver::#{strategy_class_name}".constantize

  saver.new(inventory_collection).save_inventory_collection!
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ def initialize(inventory_collection)
def save_inventory_collection!
# If we have no data to save and delete is not allowed, we can just skip
return if inventory_collection.data.blank? && !inventory_collection.delete_allowed?
# If we have a targeted InventoryCollection that wouldn't do anything
return if inventory_collection.targeted? && inventory_collection.manager_uuids.blank? &&
inventory_collection.skeletal_manager_uuids.blank? &&
inventory_collection.parent_inventory_collections.blank? &&
inventory_collection.custom_save_block.nil?

# TODO(lsmola) do I need to reload every time? Also it should be enough to clear the associations.
inventory_collection.parent.reload if inventory_collection.parent
Expand Down
120 changes: 120 additions & 0 deletions app/models/manager_refresh/save_collection/saver/concurrent_safe.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
module ManagerRefresh::SaveCollection
  module Saver
    # Saver used for the :concurrent_safe saver_strategy. Existing records are updated with a
    # timestamp-guarded UPDATE and missing records are created with an
    # INSERT ... ON CONFLICT DO UPDATE (upsert), one query per record.
    class ConcurrentSafe < ManagerRefresh::SaveCollection::Saver::Base
      private

      # Synchronizes the records reachable through the association with the InventoryObjects of
      # the InventoryCollection.
      #
      # Records found in the DB are updated when a matching InventoryObject exists and passed to
      # delete_record! otherwise. InventoryObjects left unmatched are created afterwards, if the
      # collection allows creating.
      #
      # @param inventory_collection [ManagerRefresh::InventoryCollection] collection being saved
      # @param association [#find_in_batches] relation returning the records currently in the DB
      def save!(inventory_collection, association)
        attributes_index        = {}
        inventory_objects_index = {}
        inventory_collection.each do |inventory_object|
          attributes = inventory_object.attributes(inventory_collection)
          index      = inventory_object.manager_uuid

          attributes_index[index]        = attributes
          inventory_objects_index[index] = inventory_object
        end

        inventory_collection_size = inventory_collection.size
        deleted_counter           = 0
        created_counter           = 0
        _log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
        # Records that are in the DB, we will be updating or deleting them.
        association.find_in_batches do |batch|
          ActiveRecord::Base.transaction do
            batch.each do |record|
              # NOTE(review): assert_distinct_relation and unique_index_keys are not defined in this class,
              # presumably they come from the Base saver.
              next unless assert_distinct_relation(record)

              index = inventory_collection.object_index_with_keys(unique_index_keys, record)

              # Whatever stays in the indexes after this loop was not found in the DB and is created below.
              inventory_object = inventory_objects_index.delete(index)
              hash             = attributes_index.delete(index)

              if inventory_object.nil?
                # Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
                # delete it from the DB.
                # TODO(lsmola) do a transaction for a batches of deletion
                deleted_counter += 1 if delete_record!(inventory_collection, record)
              else
                # Record was found in the DB and sent for saving, we will be updating the DB.
                update_record!(inventory_collection, record, hash, inventory_object)
              end
            end
          end
        end

        # Records that were not found in the DB but sent for saving, we will be creating these in the DB.
        if inventory_collection.create_allowed?
          inventory_objects_index.each_slice(1000) do |batch|
            ActiveRecord::Base.transaction do
              batch.each do |index, inventory_object|
                hash = attributes_index.delete(index)
                create_record!(inventory_collection, hash, inventory_object)
                created_counter += 1
              end
            end
          end
        end
        # The "updated" number is derived (size - created), it is not a count of executed UPDATE queries.
        _log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
                  "updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
      end

      # Deletes the record using the delete method configured on the InventoryCollection.
      #
      # @param inventory_collection [ManagerRefresh::InventoryCollection] collection being saved
      # @param record [Object] record present in the DB but missing in the collection
      # @return [Boolean] false when deleting is not allowed for the collection, true otherwise
      def delete_record!(inventory_collection, record)
        return false unless inventory_collection.delete_allowed?
        record.public_send(inventory_collection.delete_method)
        true
      end

      # Updates an existing record with a single UPDATE query and stores the record id in the
      # InventoryObject.
      #
      # @param inventory_collection [ManagerRefresh::InventoryCollection] collection being saved
      # @param record [Object] record loaded from the DB
      # @param hash [Hash] attributes of the InventoryObject; Array values are removed from it in place
      # @param inventory_object [ManagerRefresh::InventoryObject] object matching the record
      def update_record!(inventory_collection, record, hash, inventory_object)
        # Assigned only so record.changed? can tell whether there is anything to write; the record
        # itself is never saved, the write goes through update_all below.
        record.assign_attributes(hash.except(:id, :type))

        # TODO(lsmola) ignore all N:M relations, since we use pure SQL, all N:M needs to be modeled as a separate IC, or
        # can we process those automatically? Using a convention? But still, it needs to be a separate IC, to have
        # efficient saving.
        hash.reject! { |_key, value| value.kind_of?(Array) }

        if !inventory_object.inventory_collection.check_changed? || record.changed?
          update_query = inventory_object.inventory_collection.model_class.where(:id => record.id)
          if hash[:remote_data_timestamp]
            timestamp_field = inventory_collection.model_class.arel_table[:remote_data_timestamp]
            # Avoid overwriting newer data by older data. A stored NULL timestamp has to be matched explicitly,
            # because "NULL < value" is never true in SQL and such a row could otherwise never be updated.
            update_query = update_query.where(
              timestamp_field.eq(nil).or(timestamp_field.lt(hash[:remote_data_timestamp]))
            )
          end

          update_query.update_all(hash)
        end

        inventory_object.id = record.id
      end

      # Creates the record with an upsert (INSERT ... ON CONFLICT DO UPDATE) on the unique index
      # columns of the InventoryCollection and stores the returned id in the InventoryObject.
      #
      # @param inventory_collection [ManagerRefresh::InventoryCollection] collection being saved
      # @param hash [Hash] column => value pairs to be written
      # @param inventory_object [ManagerRefresh::InventoryObject] object being created
      # @return [nil] without touching the DB when assert_referential_integrity is not satisfied
      def create_record!(inventory_collection, hash, inventory_object)
        # NOTE(review): assert_referential_integrity is presumably provided by the Base saver.
        return unless assert_referential_integrity(hash, inventory_object)

        hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].blank?

        connection        = ActiveRecord::Base.connection
        target_collection = inventory_object.inventory_collection
        # Identifiers are quoted, so a column named like an SQL keyword can't break the query.
        table_name        = connection.quote_table_name(target_collection.model_class.table_name)
        column_names      = hash.keys.map { |x| connection.quote_column_name(x) }
        conflict_columns  = target_collection.unique_index_columns.map { |x| connection.quote_column_name(x) }

        insert_query = %{
          INSERT INTO #{table_name} (#{column_names.join(", ")})
            VALUES
              (
                #{hash.values.map { |x| connection.quote(x) }.join(", ")}
              )
          ON CONFLICT (#{conflict_columns.join(", ")})
            DO
              UPDATE
                SET #{column_names.map { |x| "#{x} = EXCLUDED.#{x}" }.join(", ")}
        }
        # TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change
        # the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of
        # changing ems_id could lead to putting it back by a refresh.

        # This conditional will avoid rewriting new data by old data. But we want it only when remote_data_timestamp is a
        # part of the data, since for the fake records, we just want to update ems_ref.
        if hash[:remote_data_timestamp].present?
          # The stored timestamp is checked for NULL explicitly, a comparison with NULL is never true and a row stored
          # without a timestamp (e.g. a fake record) could otherwise never be filled with the real data.
          insert_query += %{
            WHERE EXCLUDED.remote_data_timestamp IS NULL OR
                  #{table_name}.remote_data_timestamp IS NULL OR
                  (EXCLUDED.remote_data_timestamp > #{table_name}.remote_data_timestamp)
          }
        end
        # NOTE(review): when the conditional DO UPDATE above is skipped, the query presumably returns no row and the
        # id ends up nil -- confirm against the adapter's insert_sql behavior.
        result_id = connection.insert_sql(insert_query)
        inventory_object.id = result_id
      end
    end
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module ManagerRefresh::SaveCollection
  module Saver
    # Placeholder saver for the :concurrent_safe_batch saver_strategy; saving with it always fails,
    # because the batched implementation does not exist yet.
    class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base
      private

      # @param _inventory_collection [ManagerRefresh::InventoryCollection] unused
      # @param _association [Object] unused
      # @raise [RuntimeError] always, the strategy is not implemented
      def save!(_inventory_collection, _association)
        raise "saver_strategy :concurrent_safe_batch is not implemented"
      end
    end
  end
end

0 comments on commit d2a5576

Please sign in to comment.