Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the new schema_migrations_ran table to track remote schema migrations #18393

Merged
merged 3 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 36 additions & 45 deletions lib/extensions/ar_migration.rb
Original file line number Diff line number Diff line change
@@ -1,48 +1,36 @@
module ArPglogicalMigration
module PglogicalMigrationHelper
def self.migrations_column_present?
ActiveRecord::Base.connection.columns("miq_regions").any? { |c| c.name == "migrations_ran" }
end

def self.my_region_number
# Use ApplicationRecord here because we need to query region information
@my_region_number ||= ApplicationRecord.my_region_number
end

def self.my_region_created?
ActiveRecord::Base.connection.exec_query(<<~SQL).first["exists"]
SELECT EXISTS(
SELECT id FROM miq_regions WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
)
SQL
module ArPglogicalMigrationHelper
def self.discover_schema_migrations_ran_class
return unless ActiveRecord::Base.connection.table_exists?("schema_migrations_ran")
Class.new(ActiveRecord::Base) do
require 'active_record-id_regions'
include ActiveRecord::IdRegions
self.table_name = "schema_migrations_ran"
default_scope { in_my_region }
end
end

def self.update_local_migrations_ran(version, direction)
return unless migrations_column_present?
return unless my_region_created?
def self.update_local_migrations_ran(version, direction)
return unless schema_migrations_ran_class = discover_schema_migrations_ran_class

new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up
migrations_value = ActiveRecord::Base.connection.quote(PG::TextEncoder::Array.new.encode(new_migrations))
new_migrations = ActiveRecord::SchemaMigration.normalized_versions
new_migrations << version if direction == :up

ActiveRecord::Base.connection.exec_query(<<~SQL)
UPDATE miq_regions
SET migrations_ran = #{migrations_value}
WHERE region = #{ActiveRecord::Base.connection.quote(my_region_number)}
SQL
(new_migrations - schema_migrations_ran_class.pluck(:version)).each do |v|
bdunne marked this conversation as resolved.
Show resolved Hide resolved
schema_migrations_ran_class.find_or_create_by(:version => v)
end

schema_migrations_ran_class.where(:version => version).delete_all if direction == :down
end

class RemoteRegionMigrationWatcher
class HelperARClass < ActiveRecord::Base; end
class SubscriptionHelper < ActiveRecord::Base; end

attr_reader :region, :subscription, :version
attr_reader :subscription, :version, :schema_migrations_ran_class

def initialize(subscription, version)
region_class = Class.new(ActiveRecord::Base) { self.table_name = "miq_regions" }
@region = region_class.find_by(:region => subscription.provider_region)
@subscription = subscription
@version = version
@schema_migrations_ran_class = ArPglogicalMigrationHelper.discover_schema_migrations_ran_class
@subscription = subscription
@version = version
end

def wait_for_remote_region_migration(wait_time = 1)
carbonin marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -55,42 +43,45 @@ def wait_for_remote_region_migration(wait_time = 1)
print(".")
restart_subscription
sleep(wait_time)
region.reload
end

puts("\n")
end

private

def wait_for_migration?
migrations_column_present? ? !region.migrations_ran&.include?(version) : false
def region_number
subscription.provider_region
end

def migrations_column_present?
@migrations_column_present ||= PglogicalMigrationHelper.migrations_column_present?
def wait_for_migration?
return false unless schema_migrations_ran_class
# We need to unscope here since in_region doesn't override the default scope of in_my_region
# see https://github.com/ManageIQ/activerecord-id_regions/issues/11
!schema_migrations_ran_class.unscoped.in_region(region_number).where(:version => version).exists?
bdunne marked this conversation as resolved.
Show resolved Hide resolved
end

def wait_message
@wait_message ||= "Waiting for remote region #{region.region} to run migration #{version}"
@wait_message ||= "Waiting for remote region #{region_number} to run migration #{version}"
end

def restart_subscription
c = HelperARClass.establish_connection.connection
c = SubscriptionHelper.establish_connection.connection
c.pglogical.subscription_disable(subscription.id)
c.pglogical.subscription_enable(subscription.id)
ensure
HelperARClass.remove_connection
SubscriptionHelper.remove_connection
end
end
end

module ArPglogicalMigration
def migrate(direction)
PglogicalSubscription.all.each do |s|
RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
ArPglogicalMigrationHelper::RemoteRegionMigrationWatcher.new(s, version.to_s).wait_for_remote_region_migration
end

ret = super
PglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ArPglogicalMigrationHelper.update_local_migrations_ran(version.to_s, direction)
ret
end
end
Expand Down
139 changes: 61 additions & 78 deletions spec/lib/extensions/ar_migration_spec.rb
Original file line number Diff line number Diff line change
@@ -1,109 +1,92 @@
shared_context "without the migrations ran column" do
before do
column_list = %w(id region created_at updated_at description guid).map { |n| double(:name => n) }
allow(ActiveRecord::Base.connection).to receive(:columns).with("miq_regions").and_return(column_list)
describe ArPglogicalMigrationHelper do
shared_context "without the schema_migrations_ran table" do
before do
allow(ActiveRecord::Base.connection).to receive(:table_exists?).with("schema_migrations_ran").and_return(false)
end
end
end

shared_context "with a dummy version" do
let(:version) { "1234567890" }

# sanity check - if this is somehow a version we have, these tests will make no sense
before { expect(my_region.migrations_ran).not_to include(version) }
end
shared_context "with a dummy version" do
let(:version) { "1234567890" }

context "with a region seeded" do
let!(:my_region) do
MiqRegion.seed
MiqRegion.my_region
# sanity check - if this is somehow a version we have, these tests will make no sense
before { expect(ActiveRecord::SchemaMigration.normalized_versions).not_to include(version) }
end

describe ArPglogicalMigration::PglogicalMigrationHelper do
context "without the migrations ran column" do
include_context "without the migrations ran column"
context "with a region seeded" do
let!(:my_region) do
MiqRegion.seed
MiqRegion.my_region
end

describe ".migrations_column_present?" do
it "is falsey" do
expect(described_class.migrations_column_present?).to be_falsey
end
end
describe ".update_local_migrations_ran" do
context "without the schema_migrations_ran table" do
include_context "without the schema_migrations_ran table"

describe ".update_local_migrations_ran" do
it "does nothing" do
expect(ActiveRecord::SchemaMigration).not_to receive(:normalized_versions)
described_class.update_local_migrations_ran("12345", :up)
end
end
end

describe ".migrations_column_present?" do
it "is truthy" do
# we never want to remove this column so we can just test directly
expect(described_class.migrations_column_present?).to be_truthy
end
end

describe ".update_local_migrations_ran" do
include_context "with a dummy version"

it "adds the given version when the direction is :up" do
described_class.update_local_migrations_ran(version, :up)
expect(my_region.reload.migrations_ran).to match_array(ActiveRecord::SchemaMigration.normalized_versions << version)
end

it "doesn't blow up when there is no region" do
MiqRegion.destroy_all
MiqRegion.my_region_clear_cache
described_class.update_local_migrations_ran(version, :up)
end
end
end

describe ArPglogicalMigration::RemoteRegionMigrationWatcher do
include_context "with a dummy version"
context "with the schema_migrations_ran table" do
include_context "with a dummy version"

let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => my_region.region) }
it "adds the given version when the direction is :up" do
described_class.update_local_migrations_ran(version, :up)
expect(described_class.discover_schema_migrations_ran_class.where(:version => version).exists?).to eq(true)
end

subject do
described_class.new(subscription, version).tap do |s|
allow(s).to receive_messages(:puts => nil, :print => nil)
it "doesn't blow up when there is no region" do
MiqRegion.destroy_all
MiqRegion.my_region_clear_cache
described_class.update_local_migrations_ran(version, :up)
end
end
end

describe "#wait_for_remote_region_migrations" do
context "without the migrations ran column present" do
include_context "without the migrations ran column"
describe ArPglogicalMigrationHelper::RemoteRegionMigrationWatcher do
include_context "with a dummy version"
let(:helper_class) { Class.new(ActiveRecord::Base) { include ActiveRecord::IdRegions } }
let(:other_region_number) { helper_class.my_region_number + rand(1..50) }
let(:subscription) { double("Subscription", :enable => nil, :disable => nil, :provider_region => other_region_number) }

it "does nothing" do
expect(Vmdb.rails_logger).not_to receive(:info)
subject.wait_for_remote_region_migration
subject do
described_class.new(subscription, version).tap do |s|
allow(s).to receive_messages(:puts => nil, :print => nil)
end
end

it "sleeps until the migration is added" do
allow(subject).to receive(:restart_subscription)
allow(subject.region).to receive(:reload)
describe "#wait_for_remote_region_migration" do
def wait_for_migration_called
@count ||= 0
if @count == 5
ArPglogicalMigrationHelper.discover_schema_migrations_ran_class.create!(:id => helper_class.id_in_region(1, other_region_number), :version => version)
end
@count += 1
end

subject.region.update_attributes!(:migrations_ran => nil)
context "without the schema_migrations_ran table present" do
include_context "without the schema_migrations_ran table"

t = Thread.new do
Thread.current.abort_on_exception = true
subject.wait_for_remote_region_migration(0)
it "does nothing" do
expect(Vmdb.rails_logger).not_to receive(:info)
subject.wait_for_remote_region_migration
end
end

# Try to pass execution to the created thread
# NOTE: This is could definitely be a source of weird spec timing issues because
# we're relying on the thread scheduler to pass to the next thread
# when we sleep, but if this isn't here we likely won't execute the conditional
# block in .wait_for_remote_region_migrations
sleep 1
it "waits for the migration to be added" do
allow(subject).to receive(:restart_subscription)
expect(ArPglogicalMigrationHelper.discover_schema_migrations_ran_class.unscoped.where(:version => version).exists?).to eq(false)

expect(t.alive?).to be true
subject.region.update_attributes!(:migrations_ran => ActiveRecord::SchemaMigration.normalized_versions << version)
allow(subject).to receive(:wait_for_migration?).and_wrap_original do |m, _args|
wait_for_migration_called
m.call
end

# Wait a max of 5 seconds so we don't disrupt the whole test suite if something terrible happens
t = t.join(5)
expect(t.status).to be false
subject.wait_for_remote_region_migration(0)

expect(ArPglogicalMigrationHelper.discover_schema_migrations_ran_class.unscoped.where(:version => version).exists?).to eq(true)
end
end
end
end
Expand Down