Skip to content

Commit

Permalink
Use the new schema_migrations_ran table
Browse files Browse the repository at this point in the history
  • Loading branch information
bdunne committed Jan 28, 2019
1 parent 7b0a212 commit 25536cf
Showing 1 changed file with 33 additions and 45 deletions.
78 changes: 33 additions & 45 deletions lib/extensions/ar_migration.rb
Original file line number Diff line number Diff line change
@@ -1,48 +1,35 @@
module ArPglogicalMigration
module PglogicalMigrationHelper
def self.migrations_column_present?
ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" }
end

def self.my_region_number
# Use ApplicationRecord here because we need to query region information
@my_region_number ||= ApplicationRecord.my_region_number
end

def self.my_region_created?
ActiveRecord::Base.connection.exec_query(<<~SQL).first["exists"]
SELECT EXISTS(
SELECT id FROM miq_regions WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
)
SQL
module ArPglogicalMigrationHelper
def self.discover_schema_migrations_ran_class
return unless ActiveRecord::Base.connection.table_exists?("schema_migrations_ran")
Class.new(ActiveRecord::Base) do
require 'active_record-id_regions'
include ActiveRecord::IdRegions
self.table_name = "schema_migrations_ran"
end
end

def self.update_local_migrations_ran(version, direction)
return unless migrations_column_present?
return unless my_region_created?
def self.update_local_migrations_ran(version, direction)
return unless schema_migrations_ran_class = discover_schema_migrations_ran_class

new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up
migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations))
new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up

ActiveRecord::Base.connection.exec_query(<<~SQL)
UPDATE miq_regions
SET migrations_ran = #{migrations_value}
WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
SQL
(new_migrations - schema_migrations_ran_class.pluck(:version)).each do |v|
schema_migrations_ran_class.find_or_create_by(:version => v)
end

schema_migrations_ran_class.in_my_region.where(:version => version).delete_all if direction == :down
end

class RemoteRegionMigrationWatcher
class HelperARClass < ActiveRecord::Base; end
class SubscriptionHelper < ActiveRecord::Base; end

attr_reader :region, :subscription, :version
attr_reader :subscription, :version, :schema_migrations_ran_class

def initialize(subscription, version)
region_class = Class.new(ActiveRecord::Base) { self.table_name = "miq_regions" }
@region = region_class.find_by(:region => subscription.provider_region)
@subscription = subscription
@version = version
@schema_migrations_ran_class = ArPglogicalMigrationHelper.discover_schema_migrations_ran_class
@subscription = subscription
@version = version
end

def wait_for_remote_region_migration(wait_time = 1)
Expand All @@ -55,42 +42,43 @@ def wait_for_remote_region_migration(wait_time = 1)
print(".")
restart_subscription
sleep(wait_time)
region.reload
end

puts("\n")
end

private

def wait_for_migration?
migrations_column_present? ? !region.migrations_ran&.include?(version) : false
def region_number
subscription.provider_region
end

def migrations_column_present?
@migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present?
def wait_for_migration?
return false unless schema_migrations_ran_class
!schema_migrations_ran_class.in_region(region_number).where(:version => version).exists?
end

def wait_message
@wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}"
@wait_message ||= "Waiting for remote region #{region_number} to run migration #{version}"
end

def restart_subscription
c = HelperARClass.establish_connection.connection
c = SubscriptionHelper.establish_connection.connection
c.pglogical.subscription_disable(subscription.id)
c.pglogical.subscription_enable(subscription.id)
ensure
HelperARClass.remove_connection
SubscriptionHelper.remove_connection
end
end
end

module ArPglogicalMigration
def migrate(direction)
PglogicalSubscription.all.each do |s|
RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
ArPglogicalMigrationHelper::RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
end

ret = super
PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ArPglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ret
end
end
Expand Down

0 comments on commit 25536cf

Please sign in to comment.