From 2083f6aa02df81055c3c35dbb6477138f10ebb20 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 28 Aug 2018 15:25:01 -0400 Subject: [PATCH] Patch ActiveRecord::Migration#migrate to ensure global region schema consistency This patch does two things. 1. It maintains the migration list in MiqRegion#migrations_ran as each migration is completed 2. It halts the global region migration process if the changes from the same migration in all the remote regions have not been replicated This combination will allow us to ensure that the global region will never attempt to replicate data which would be inconsistent with the current schema. This patch uses SQL directly to do the update because the migration which adds the migrations_ran column was not able to utilize the attribute for the newly added column. We were not able to refresh the attributes cache of the MiqRegion model from within the #migrate method for some reason, but were able to detect if the column was present and update it while bypassing the AR caches. This seemed to be the more sane route given the issues we've had with model cache clearing in the past. Fixes https://bugzilla.redhat.com/show_bug.cgi?id=1601015 --- lib/extensions/ar_migration.rb | 45 ++++++++++++ spec/lib/extensions/ar_migration_spec.rb | 87 ++++++++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 lib/extensions/ar_migration.rb create mode 100644 spec/lib/extensions/ar_migration_spec.rb diff --git a/lib/extensions/ar_migration.rb b/lib/extensions/ar_migration.rb new file mode 100644 index 000000000000..b925faa4e7eb --- /dev/null +++ b/lib/extensions/ar_migration.rb @@ -0,0 +1,45 @@ +module MigrationSyncHelper + def self.migrations_column_present? + ActiveRecord::Base.connection.columns("miq_regions").detect { |c| c.name == "migrations_ran" } + end + + def self.wait_for_remote_region_migrations(subscription, version, wait_time = 1) + return unless MigrationSyncHelper.migrations_column_present? + region = MiqRegion.find(subscription.provider_region) + until region.migrations_ran.include?(version) + subscription.disable + subscription.enable + sleep(wait_time) + region.reload + end + end + + def self.update_local_migrations_ran(version, direction) + return unless MigrationSyncHelper.migrations_column_present? + return unless (region = MiqRegion.my_region) + + new_migrations = ActiveRecord::SchemaMigration.normalized_versions + new_migrations << version if direction == :up + migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations)) + + ActiveRecord::Base.connection.exec_query(<<~SQL) + UPDATE #{MiqRegion.table_name} + SET migrations_ran = #{migrations_value} + WHERE id = #{region.id} + SQL + end +end + +module ArPglogicalMigration + def migrate(direction) + PglogicalSubscription.all.each do |s| + MigrationSyncHelper.wait_for_remote_region_migration(s, version.to_s) + end + + ret = super + MigrationSyncHelper.update_local_migrations_ran(version.to_s, direction) + ret + end +end + +ActiveRecord::Migration.prepend(ArPglogicalMigration) diff --git a/spec/lib/extensions/ar_migration_spec.rb b/spec/lib/extensions/ar_migration_spec.rb new file mode 100644 index 000000000000..48414c793647 --- /dev/null +++ b/spec/lib/extensions/ar_migration_spec.rb @@ -0,0 +1,87 @@ +describe MigrationSyncHelper do + let!(:my_region) do + MiqRegion.seed + MiqRegion.my_region + end + + context "without the migrations ran column" do + before do + column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) } + allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list) + end + + describe ".migrations_column_present?" do + it "is falsey" do + expect(described_class.migrations_column_present?).to be_falsey + end + end + + describe ".wait_for_remote_region_migrations" do + it "does nothing" do + expect(MiqRegion).not_to receive(:find) + described_class.wait_for_remote_region_migrations(double("subscription"), "12345") + end + end + + describe ".update_local_migrations_ran" do + it "does nothing" do + expect(MiqRegion).not_to receive(:my_region) + described_class.update_local_migrations_ran("12345", :up) + end + end + end + + describe ".migrations_column_present?" do + it "is truthy" do + # we never want to remove this column so we can just test directly + expect(described_class.migrations_column_present?).to be_truthy + end + end + + context "with a dummy version" do + let(:version) { "1234567890" } + + # sanity check - if this is somehow a version we have, these tests will make no sense + before { expect(my_region.migrations_ran).not_to include(version) } + + describe ".wait_for_remote_region_migrations" do + let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.id) } + + it "sleeps until the migration is added" do + # need to stub this because the thread uses a separate connection object which won't be in the same transaction + allow(MiqRegion).to receive(:find).with(my_region.id).and_return(my_region) + t = Thread.new do + Thread.current.abort_on_exception = true + described_class.wait_for_remote_region_migrations(subscription, version, 0) + end + + # Try to pass execution to the created thread + # NOTE: This is could definitely be a source of weird spec timing issues because + # we're relying on the thread scheduler to pass to the next thread + # when we sleep, but if this isn't here we likely won't execute the conditional + # block in .wait_for_remote_region_migrations + sleep 1 + + expect(t.alive?).to be true + my_region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version) + + # Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens + t = t.join(5) + expect(t.status).to be false + end + end + + describe ".update_local_migrations_ran" do + it "adds the given version when the direction is :up" do + described_class.update_local_migrations_ran(version, :up) + expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version) + end + + it "doesn't blow up when there is no region" do + MiqRegion.destroy_all + MiqRegion.my_region_clear_cache + described_class.update_local_migrations_ran(version, :up) + end + end + end +end