Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the new schema_migrations_ran table to track remote schema migrations #18393

Merged
merged 3 commits into from
Jan 29, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 34 additions & 45 deletions lib/extensions/ar_migration.rb
Original file line number Diff line number Diff line change
@@ -1,48 +1,36 @@
module ArPglogicalMigration
module PglogicalMigrationHelper
def self.migrations_column_present?
ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" }
end

def self.my_region_number
# Use ApplicationRecord here because we need to query region information
@my_region_number ||= ApplicationRecord.my_region_number
end

def self.my_region_created?
ActiveRecord::Base.connection.exec_query(<<~SQL).first["exists"]
SELECT EXISTS(
SELECT id FROM miq_regions WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
)
SQL
module ArPglogicalMigrationHelper
def self.discover_schema_migrations_ran_class
return unless ActiveRecord::Base.connection.table_exists?("schema_migrations_ran")
Class.new(ActiveRecord::Base) do
require 'active_record-id_regions'
include ActiveRecord::IdRegions
self.table_name = "schema_migrations_ran"
default_scope { in_my_region }
end
end

def self.update_local_migrations_ran(version, direction)
return unless migrations_column_present?
return unless my_region_created?
def self.update_local_migrations_ran(version, direction)
return unless schema_migrations_ran_class = discover_schema_migrations_ran_class

new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up
migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations))
new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up

ActiveRecord::Base.connection.exec_query(<<~SQL)
UPDATE miq_regions
SET migrations_ran = #{migrations_value}
WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
SQL
(new_migrations - schema_migrations_ran_class.pluck(:version)).each do |v|
bdunne marked this conversation as resolved.
Show resolved Hide resolved
schema_migrations_ran_class.find_or_create_by(:version => v)
end

schema_migrations_ran_class.where(:version => version).delete_all if direction == :down
end

class RemoteRegionMigrationWatcher
class HelperARClass < ActiveRecord::Base; end
class SubscriptionHelper < ActiveRecord::Base; end

attr_reader :region, :subscription, :version
attr_reader :subscription, :version, :schema_migrations_ran_class

def initialize(subscription, version)
region_class = Class.new(ActiveRecord::Base) { self.table_name = "miq_regions" }
@region = region_class.find_by(:region => subscription.provider_region)
@subscription = subscription
@version = version
@schema_migrations_ran_class = ArPglogicalMigrationHelper.discover_schema_migrations_ran_class
@subscription = subscription
@version = version
end

def wait_for_remote_region_migration(wait_time = 1)
carbonin marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -55,42 +43,43 @@ def wait_for_remote_region_migration(wait_time = 1)
print(".")
restart_subscription
sleep(wait_time)
region.reload
end

puts("\n")
end

private

def wait_for_migration?
migrations_column_present? ? !region.migrations_ran&.include?(version) : false
def region_number
subscription.provider_region
end

def migrations_column_present?
@migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present?
def wait_for_migration?
return false unless schema_migrations_ran_class
!schema_migrations_ran_class.unscoped.in_region(region_number).where(:version => version).exists?
bdunne marked this conversation as resolved.
Show resolved Hide resolved
end

def wait_message
@wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}"
@wait_message ||= "Waiting for remote region #{region_number} to run migration #{version}"
end

def restart_subscription
c = HelperARClass.establish_connection.connection
c = SubscriptionHelper.establish_connection.connection
c.pglogical.subscription_disable(subscription.id)
c.pglogical.subscription_enable(subscription.id)
ensure
HelperARClass.remove_connection
SubscriptionHelper.remove_connection
end
end
end

module ArPglogicalMigration
def migrate(direction)
PglogicalSubscription.all.each do |s|
RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
ArPglogicalMigrationHelper::RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
end

ret = super
PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ArPglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ret
end
end
Expand Down