Skip to content

Commit

Permalink
Re-work the migration checking logic
Browse files Browse the repository at this point in the history
- Before migrating the global, wait for all remotes to run the migration (check the remote schema_migrations table)
- Run the migration on the global
- After migrating the global, wait for the remote region record to include the migration. (verifying that replication is working)
  • Loading branch information
bdunne committed Jan 25, 2019
1 parent 05d6619 commit 920e018
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 67 deletions.
14 changes: 7 additions & 7 deletions app/models/pglogical_subscription.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ def backlog
nil
end

# Yields a connection to this subscription's remote database to the caller's block
# and returns whatever MiqRegionRemote.with_remote_connection returns.
# find_password runs first (presumably so decrypted_password is available — confirm);
# the port falls back to 5432 when none is set.
def with_remote_connection
  find_password
  MiqRegionRemote.with_remote_connection(
    host,
    port || 5432,
    user,
    decrypted_password,
    dbname,
    "postgresql"
  ) { |conn| yield conn }
end

# translate the output from the pglogical stored proc to our object columns
def self.subscription_to_columns(sub)
cols = sub.symbolize_keys
Expand Down Expand Up @@ -256,11 +263,4 @@ def remote_replication_lsn
# Returns the xlog_location reported by the connection yielded from with_remote_connection.
def remote_node_lsn
  with_remote_connection { |conn| conn.xlog_location }
end

# Yields a connection to this subscription's remote database to the caller's block
# and returns whatever MiqRegionRemote.with_remote_connection returns.
# find_password runs first (presumably so decrypted_password is available — confirm);
# the port falls back to 5432 when none is set.
def with_remote_connection
  find_password
  MiqRegionRemote.with_remote_connection(
    host,
    port || 5432,
    user,
    decrypted_password,
    dbname,
    "postgresql"
  ) { |conn| yield conn }
end
end
52 changes: 30 additions & 22 deletions lib/extensions/ar_migration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,39 +46,48 @@ def initialize(subscription, version)
end

def wait_for_remote_region_migration(wait_time = 1)
return unless wait_for_migration?
return if version_exists_on_remote_region_schema_migrations?

# Sit in a loop waiting for the remote schema_migrations table to include the version
wait_message = "Waiting for remote region #{region.region} to run migration #{version}"
Vmdb.rails_logger.info(wait_message)
print(wait_message)

while wait_for_migration?
until version_exists_on_remote_region_schema_migrations? do
print(".")
restart_subscription
sleep(wait_time)
region.reload
end

puts("\n")
region.reload
end

private
def wait_for_remote_migration_to_replicate(wait_time = 1)
# If !migrations_column_present?, we're not sure if we're replicating and can't tell, don't wait
return unless migrations_column_present?
return if region_record_includes_version?

def wait_for_migration?
migrations_column_present? ? !remote_region_migrated? : false
end
# Sit in a loop waiting for the region record's migrations_ran to include the version
wait_message = "Waiting for region #{region.region} record to include migration #{version}"
Vmdb.rails_logger.info(wait_message)
print(wait_message)

until region_record_includes_version? do
restart_subscription
print(".")
sleep(wait_time)
end

# NOTE: Check the local table first, but a migration can cause MiqRegion replication to stop (i.e. adding a new column to miq_regions).
# In that case, the global MiqRegion record for the remote region will not have the migration timestamp yet, so we need to check the
# schema_migrations table in the remote region directly.
def remote_region_migrated?
version_exists_in_replicated_region? || version_exists_on_remote_region?
puts("\n")
end

def version_exists_in_replicated_region?
region.migrations_ran&.include?(version)
private

# Reloads the region record and reports whether its migrations_ran list contains
# this watcher's version. Yields nil (falsy) when migrations_ran is nil.
def region_record_includes_version?
  ran = region.reload.migrations_ran
  ran&.include?(version)
end

def version_exists_on_remote_region?
def version_exists_on_remote_region_schema_migrations?
subscription.send(:with_remote_connection) do |connection|
connection.select_values("SELECT 1 FROM schema_migrations WHERE version = '#{version}' LIMIT 1")
end.any?
Expand All @@ -88,10 +97,6 @@ def migrations_column_present?
@migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present?
end

# Memoized progress message naming the remote region and the migration version being awaited.
def wait_message
  return @wait_message if @wait_message

  @wait_message = "Waiting for remote region #{region.region} to run migration #{version}"
end

def restart_subscription
c = HelperARClass.establish_connection.connection
c.pglogical.subscription_disable(subscription.id)
Expand All @@ -102,12 +107,15 @@ def restart_subscription
end

def migrate(direction)
PglogicalSubscription.all.each do |s|
RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
subscriptions = PglogicalSubscription.all.collect do |s|
RemoteRegionMigrationWatcher.new(s, version.to_s).tap(&:wait_for_remote_region_migration)
end

ret = super
PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)

subscriptions.each { |s| s.wait_for_remote_migration_to_replicate }

ret
end
end
Expand Down
76 changes: 38 additions & 38 deletions spec/lib/extensions/ar_migration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,61 +70,61 @@
end
end

describe "#wait_for_remote_region_migrations" do
describe "#wait_for_remote_region_migrations sleeps until the migration is in the remote regions schema_migrations table" do
it "does not enter the loop if the version already exists" do
expect(Vmdb.rails_logger).not_to receive(:info)
expect(subject).not_to receive(:print)
expect(subscription).to receive(:with_remote_connection).and_return([version])

subject.wait_for_remote_region_migration(0)
end

it "enters the loop if the version does not exist" do
expect(subject).to receive(:print).with(".").twice
expect(subscription).to receive(:with_remote_connection).and_return([], [], [], [version])

subject.wait_for_remote_region_migration(0)
end
end

describe "#wait_for_remote_migration_to_replicate" do
context "without the migrations ran column present" do
include_context "without the migrations ran column"

it "does nothing" do
expect(Vmdb.rails_logger).not_to receive(:info)
subject.wait_for_remote_region_migration
subject.wait_for_remote_migration_to_replicate
end
end

it "sleeps until the migration is replicated up" do
allow(subject).to receive(:restart_subscription)
allow(subject.region).to receive(:reload)
allow(subscription).to receive(:with_remote_connection).and_return([])
context "with the migrations ran column present" do
def reload_called
@count ||= 0

subject.region.update_attributes!(:migrations_ran => nil)
if @count == 5
subject.region.update_attributes!(:migrations_ran => [version])
end

t = Thread.new do
Thread.current.abort_on_exception = true
subject.wait_for_remote_region_migration(0)
@count += 1
end

# Try to pass execution to the created thread
# NOTE: This could definitely be a source of weird spec timing issues because
# we're relying on the thread scheduler to pass to the next thread
# when we sleep, but if this isn't here we likely won't execute the conditional
# block in .wait_for_remote_region_migrations
sleep 1
it "when migrations_ran is nil, sits in the loop until the version appears" do
subject.region.update_attributes!(:migrations_ran => nil)

expect(t.alive?).to be true
subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version)
allow(subject.region).to receive(:reload).and_wrap_original { |m, _args| reload_called; m.call }
expect(subject).to receive(:restart_subscription).exactly(4).times

# Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens
t = t.join(5)
expect(t.status).to be false
end

it "when migrations_ran is nil, it checks the schema_migrations table on the remote region" do
expect(subject).not_to receive(:restart_subscription)
expect(subject.region).not_to receive(:reload)
expect(subscription).to receive(:with_remote_connection).and_return([version])

subject.region.update_attributes!(:migrations_ran => nil)

subject.wait_for_remote_region_migration(0)
end
subject.wait_for_remote_migration_to_replicate(0)
end

it "when migrations_ran does not include that expected migration, it checks the schema_migrations on the remote region" do
expect(subject).not_to receive(:restart_subscription)
expect(subject.region).not_to receive(:reload)
expect(subscription).to receive(:with_remote_connection).and_return([version])
it "when migrations_ran is has other values, sits in the loop until the version appears" do
subject.region.update_attributes!(:migrations_ran => ["1", "2", "3"])

subject.region.update_attributes!(:migrations_ran => ["1234", "5678"])
allow(subject.region).to receive(:reload).and_wrap_original { |m, _args| reload_called; m.call }
expect(subject).to receive(:restart_subscription).exactly(4).times

subject.wait_for_remote_region_migration(0)
subject.wait_for_remote_migration_to_replicate(0)
end
end
end
end
Expand Down

0 comments on commit 920e018

Please sign in to comment.