Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize speed and stabilize the batch graph refresh memory usage #15897

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -904,14 +904,6 @@ def fixed_foreign_keys
@fixed_foreign_keys_cache
end

def serializable_keys?(all_attribute_keys)
return @serializable_keys_cache unless @serializable_keys_cache.nil?

@serializable_keys_cache = all_attribute_keys.any? do |key|
model_class.type_for_attribute(key.to_s).respond_to?(:coder)
end
end

def base_class_name
return "" unless model_class

Expand Down
42 changes: 42 additions & 0 deletions app/models/manager_refresh/inventory_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,48 @@ def attributes(inventory_collection_scope = nil)
attributes_for_saving
end

def attributes_with_keys(inventory_collection_scope = nil, all_attribute_keys = [])
# We should explicitly pass a scope, since the inventory_object can be mapped to more InventoryCollections with
# different blacklist and whitelist. The generic code always passes a scope.
inventory_collection_scope ||= inventory_collection

attributes_for_saving = {}
# First transform the values
data.each do |key, value|
if !allowed?(inventory_collection_scope, key)
next
elsif loadable?(value) || inventory_collection_scope.association_to_foreign_key_mapping[key]
# Lets fill also the original data, so other InventoryObject referring to this attribute gets the right
# result
data[key] = value.load if value.respond_to?(:load)
if (foreign_key = inventory_collection_scope.association_to_foreign_key_mapping[key])
# We have an association to fill, lets fill also the :key, cause some other InventoryObject can refer to it
record_id = data[key].try(:id)
foreign_key_to_sym = foreign_key.to_sym
attributes_for_saving[foreign_key_to_sym] = record_id
all_attribute_keys << foreign_key_to_sym
if (foreign_type = inventory_collection_scope.association_to_foreign_type_mapping[key])
# If we have a polymorphic association, we need to also fill a base class name, but we want to nullify it
# if record_id is missing
base_class = data[key].try(:base_class_name) || data[key].class.try(:base_class).try(:name)
foreign_type_to_sym = foreign_type.to_sym
attributes_for_saving[foreign_type_to_sym] = record_id ? base_class : nil
all_attribute_keys << foreign_type_to_sym
end
else
# We have a normal attribute to fill
attributes_for_saving[key] = data[key]
all_attribute_keys << key
end
else
attributes_for_saving[key] = value
all_attribute_keys << key
end
end

attributes_for_saving
end

def assign_attributes(attributes)
attributes.each { |k, v| public_send("#{k}=", v) }
self
Expand Down
30 changes: 29 additions & 1 deletion app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,30 @@ def initialize(inventory_collection)
@batch_size = @pure_sql_records_fetching ? @batch_size_for_persisting : inventory_collection.batch_size
@record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key
@select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index }
@pg_types = @model_class.attribute_names.each_with_object({}) do |key, obj|
obj[key.to_sym] = inventory_collection.model_class.columns_hash[key]
.try(:sql_type_metadata)
.try(:instance_values)
.try(:[], "sql_type")
end

@serializable_keys = @model_class.attribute_names.each_with_object({}) do |key, obj|
attribute_type = @model_class.type_for_attribute(key.to_s)
pg_type = @pg_types[key.to_sym]

if inventory_collection.use_ar_object?
# When using AR object, lets make sure we type.serialize(value) every value, so we have a slow but always
# working way driven by a configuration
obj[key.to_sym] = attribute_type
elsif attribute_type.respond_to?(:coder) ||
attribute_type.type == :int4range ||
pg_type == "text[]" ||
pg_type == "character varying[]"
# Identify columns that needs to be encoded by type.serialize(value), it's a costy operations so lets do
# do it only for columns we need it for.
obj[key.to_sym] = attribute_type
end
end
end

def save_inventory_collection!
Expand All @@ -47,7 +71,7 @@ def save_inventory_collection!

attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes,
:primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes,
:batch_size, :batch_size_for_persisting, :model_class
:batch_size, :batch_size_for_persisting, :model_class, :serializable_keys, :pg_types

def save!(association)
attributes_index = {}
Expand Down Expand Up @@ -258,6 +282,10 @@ def supports_updated_at?
@supports_updated_at_cache
end

def serializable_keys?
@serializable_keys_bool_cache ||= !serializable_keys.blank?
end

def supports_remote_data_timestamp?(all_attribute_keys)
all_attribute_keys.include?(:remote_data_timestamp) # include? on Set is O(1)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ def save!(association)
all_attribute_keys = Set.new + inventory_collection.batch_extra_attributes

inventory_collection.each do |inventory_object|
attributes = inventory_object.attributes(inventory_collection)
index = inventory_object.manager_uuid
attributes = inventory_object.attributes_with_keys(inventory_collection, all_attribute_keys)
# TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
index = unique_index_keys.map { |key| attributes[key].to_s }.join(inventory_collection.stringify_joiner)

# Interesting fact: not building attributes_index and using only inventory_objects_index doesn't do much
# of a difference, since the most objects inside are shared.
attributes_index[index] = attributes
inventory_objects_index[index] = inventory_object
all_attribute_keys.merge(attributes_index[index].keys)
end

all_attribute_keys << :created_at if supports_created_at?
Expand All @@ -65,8 +67,6 @@ def save!(association)

_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection.size} *************")

# TODO(lsmola) needed only because UPDATE FROM VALUES needs a specific PG typecasting, remove when fixed in PG
collect_pg_types!(all_attribute_keys)
update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys)

unless inventory_collection.custom_reconnect_block.nil?
Expand Down Expand Up @@ -102,9 +102,9 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
next unless assert_distinct_relation(primary_key_value)

# TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
index = unique_index_keys_to_s.map { |attribute| record_key(record, attribute).to_s }.join(inventory_collection.stringify_joiner)
index = unique_index_keys_to_s.map { |key| record_key(record, key).to_s }.join(inventory_collection.stringify_joiner)
inventory_object = inventory_objects_index.delete(index)
hash = attributes_index[index]
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
Expand All @@ -119,13 +119,11 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,

hash_for_update = if inventory_collection.use_ar_object?
record.assign_attributes(hash.except(:id, :type))
values_for_database(inventory_collection.model_class,
all_attribute_keys,
record.attributes.symbolize_keys)
elsif inventory_collection.serializable_keys?(all_attribute_keys)
values_for_database(inventory_collection.model_class,
all_attribute_keys,
hash)
values_for_database!(all_attribute_keys,
record.attributes.symbolize_keys)
elsif serializable_keys?
values_for_database!(all_attribute_keys,
hash)
else
hash
end
Expand Down Expand Up @@ -201,13 +199,11 @@ def create_records!(all_attribute_keys, batch, attributes_index)
batch.each do |index, inventory_object|
hash = if inventory_collection.use_ar_object?
record = inventory_collection.model_class.new(attributes_index.delete(index))
values_for_database(inventory_collection.model_class,
all_attribute_keys,
record.attributes.symbolize_keys)
elsif inventory_collection.serializable_keys?(all_attribute_keys)
values_for_database(inventory_collection.model_class,
all_attribute_keys,
attributes_index.delete(index))
values_for_database!(all_attribute_keys,
record.attributes.symbolize_keys)
elsif serializable_keys?
values_for_database!(all_attribute_keys,
attributes_index.delete(index))
else
attributes_index.delete(index)
end
Expand All @@ -233,12 +229,13 @@ def create_records!(all_attribute_keys, batch, attributes_index)
end
end

def values_for_database(model_class, all_attribute_keys, attributes)
all_attribute_keys.each_with_object({}) do |attribute_name, db_values|
type = model_class.type_for_attribute(attribute_name.to_s)
raw_val = attributes[attribute_name]
db_values[attribute_name] = type.type == :boolean ? type.cast(raw_val) : type.serialize(raw_val)
def values_for_database!(all_attribute_keys, attributes)
all_attribute_keys.each do |key|
if (type = serializable_keys[key])
attributes[key] = type.serialize(attributes[key])
end
end
attributes
end

def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, result)
Expand Down
16 changes: 1 addition & 15 deletions app/models/manager_refresh/save_collection/saver/sql_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def quote(connection, value, name = nil, type_cast_for_pg = nil)
def quote_and_pg_type_cast(connection, value, name)
pg_type_cast(
connection.quote(value),
pg_type(name)
pg_types[name]
)
end

Expand All @@ -138,20 +138,6 @@ def pg_type_cast(value, sql_type)
"#{value}::#{sql_type}"
end
end

def pg_type(name)
@pg_types_cache[name]
end

def collect_pg_types!(all_attribute_keys)
@pg_types_cache = {}
all_attribute_keys.each do |key|
@pg_types_cache[key] = inventory_collection.model_class.columns_hash[key.to_s]
.try(:sql_type_metadata)
.try(:instance_values)
.try(:[], "sql_type")
end
end
end
end
end