diff --git a/app/models/miq_region.rb b/app/models/miq_region.rb index 29d97802e7a..aac2d6d8d4b 100644 --- a/app/models/miq_region.rb +++ b/app/models/miq_region.rb @@ -113,8 +113,12 @@ def self.seed _("Region [%{region_id}] does not match the database's region [%{db_id}]") % {:region_id => my_region_id, :db_id => db_region_id} end + create_params = { + :description => "Region #{my_region_id}", + :migrations_ran => ActiveRecord::SchemaMigration.normalized_versions + } - create_with(:description => "Region #{my_region_id}").find_or_create_by!(:region => my_region_id) do + create_with(create_params).find_or_create_by!(:region => my_region_id) do _log.info("Creating Region [#{my_region_id}]") end end diff --git a/lib/extensions/ar_migration.rb b/lib/extensions/ar_migration.rb new file mode 100644 index 00000000000..e952bad150b --- /dev/null +++ b/lib/extensions/ar_migration.rb @@ -0,0 +1,98 @@ +module ArPglogicalMigration + module PglogicalMigrationHelper + def self.migrations_column_present? + ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" } + end + + def self.my_region_number + # Use ApplicationRecord here because we need to query region information + @my_region_number ||= ApplicationRecord.my_region_number + end + + def self.my_region_created? + ActiveRecord::Base.connection.exec_query(<<~SQL).first["exists"] + SELECT EXISTS( + SELECT id FROM miq_regions WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)} + ) + SQL + end + + def self.update_local_migrations_ran(version, direction) + return unless migrations_column_present? + return unless my_region_created? + + new_migrations = ActiveRecord::SchemaMigration.normalized_versions + new_migrations << version if direction == :up + migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations)) + + ActiveRecord::Base.connection.exec_query(<<~SQL) + UPDATE miq_regions + SET migrations_ran = #{migrations_value} + WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)} + SQL + end + end + + class RemoteRegionMigrationWatcher + class HelperARClass < ActiveRecord::Base; end + + attr_reader :region, :subscription, :version + + def initialize(subscription, version) + region_class = Class.new(ActiveRecord::Base) { self.table_name = "miq_regions" } + @region = region_class.find_by(:region => subscription.provider_region) + @subscription = subscription + @version = version + end + + def wait_for_remote_region_migration(wait_time = 1) + return unless wait_for_migration? + + Vmdb.rails_logger.info(wait_message) + print(wait_message) + + while wait_for_migration? + print(".") + restart_subscription + sleep(wait_time) + region.reload + end + + puts("\n") + end + + private + + def wait_for_migration? + migrations_column_present? ? !region.migrations_ran&.include?(version) : false + end + + def migrations_column_present? + @migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present? + end + + def wait_message + @wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}" + end + + def restart_subscription + c = HelperARClass.establish_connection.connection + c.pglogical.subscription_disable(subscription.id) + c.pglogical.subscription_enable(subscription.id) + ensure + HelperARClass.remove_connection + end + end + + def migrate(direction) + PglogicalSubscription.all.each do |s| + RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration + end + + ret = super + PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction) + ret + end +end + +ActiveRecord::Migration.prepend(ArPglogicalMigration) diff --git a/spec/lib/extensions/ar_migration_spec.rb b/spec/lib/extensions/ar_migration_spec.rb new file mode 100644 index 00000000000..fc823f5af3f --- /dev/null +++ b/spec/lib/extensions/ar_migration_spec.rb @@ -0,0 +1,110 @@ +shared_context "without the migrations ran column" do + before do + column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) } + allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list) + end +end + +shared_context "with a dummy version" do + let(:version) { "1234567890" } + + # sanity check - if this is somehow a version we have, these tests will make no sense + before { expect(my_region.migrations_ran).not_to include(version) } +end + +context "with a region seeded" do + let!(:my_region) do + MiqRegion.seed + MiqRegion.my_region + end + + describe ArPglogicalMigration::PglogicalMigrationHelper do + context "without the migrations ran column" do + include_context "without the migrations ran column" + + describe ".migrations_column_present?" do + it "is falsey" do + expect(described_class.migrations_column_present?).to be_falsey + end + end + + describe ".update_local_migrations_ran" do + it "does nothing" do + expect(ActiveRecord::SchemaMigration).not_to receive(:normalized_versions) + described_class.update_local_migrations_ran("12345", :up) + end + end + end + + describe ".migrations_column_present?" do + it "is truthy" do + # we never want to remove this column so we can just test directly + expect(described_class.migrations_column_present?).to be_truthy + end + end + + describe ".update_local_migrations_ran" do + include_context "with a dummy version" + + it "adds the given version when the direction is :up" do + described_class.update_local_migrations_ran(version, :up) + expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version) + end + + it "doesn't blow up when there is no region" do + MiqRegion.destroy_all + MiqRegion.my_region_clear_cache + described_class.update_local_migrations_ran(version, :up) + end + end + end + + describe ArPglogicalMigration::RemoteRegionMigrationWatcher do + include_context "with a dummy version" + + let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) } + + subject do + described_class.new(subscription, version).tap do |s| + allow(s).to receive_messages(:puts => nil, :print => nil) + end + end + + describe "#wait_for_remote_region_migrations" do + context "without the migrations ran column present" do + include_context "without the migrations ran column" + + it "does nothing" do + expect(Vmdb.rails_logger).not_to receive(:info) + subject.wait_for_remote_region_migration + end + end + + it "sleeps until the migration is added" do + allow(subject).to receive(:restart_subscription) + allow(subject.region).to receive(:reload) + + subject.region.update_attributes!(:migrations_ran => nil) + + t = Thread.new do + Thread.current.abort_on_exception = true + subject.wait_for_remote_region_migration(0) + end + + # Try to pass execution to the created thread + # NOTE: This is could definitely be a source of weird spec timing issues because + # we're relying on the thread scheduler to pass to the next thread + # when we sleep, but if this isn't here we likely won't execute the conditional + # block in .wait_for_remote_region_migrations + sleep 1 + + expect(t.alive?).to be true + subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) + + # Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens + t = t.join(5) + expect(t.status).to be false + end + end + end +end diff --git a/spec/models/miq_region_spec.rb b/spec/models/miq_region_spec.rb index f1b8d9266cc..324620be671 100644 --- a/spec/models/miq_region_spec.rb +++ b/spec/models/miq_region_spec.rb @@ -67,6 +67,10 @@ allow(MiqRegion).to receive_messages(:my_region_number => @region_number + 1) expect { MiqRegion.seed }.to raise_error(Exception) end + + it "sets the migrations_ran column" do + expect(MiqRegion.first.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions) + end end describe ".replication_type" do