Skip to content

Commit

Permalink
pure sql bulk insert
Browse files Browse the repository at this point in the history
  • Loading branch information
Ari Zellner committed Jul 18, 2017
1 parent 5ab7b5f commit 4d6ac3c
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class ContainerPortConfig < ActiveRecord::Base
belongs_to :container_definition, :class_name => 'UnifyContainerDefinitionAndContainer::ContainerDefinition'
end

class MiqQueue < ActiveRecord::Base
end

def up
# attributes
add_column :containers, :image, :string
Expand Down Expand Up @@ -62,14 +65,16 @@ def up
ContainerEnvVar.update_all("container_definition_id = (#{join_sql})")
end

say_with_time("switch container_definition_id with container_id for security_contexts") do
say_with_time("switch resource_id with container_id, resource_type to 'Container' for security_contexts") do
security_contexts = Arel::Table.new(:security_contexts)
join_sql = containers.project(containers[:id])
.where(containers[:container_definition_id].eq(security_contexts[:resource_id])
.and(security_contexts[:resource_type].eq(Arel::Nodes::Quoted.new('ContainerDefinition')))).to_sql
SecurityContext.where(:resource_type => 'ContainerDefinition').update_all("resource_type = 'Container', resource_id = (#{join_sql})")
end

MiqQueue.where(:method_name => "purge_timer", :class_name => 'ContainerDefinition').destroy_all

# relationships
rename_column :container_port_configs, :container_definition_id, :container_id
rename_column :container_env_vars, :container_definition_id, :container_id
Expand All @@ -80,12 +85,12 @@ def up

def down
create_table :container_definitions do |t|
t.belongs_to :ems, :type => :bigint
t.belongs_to :ems, :type => :bigint # reconstructed columns
t.string :ems_ref
t.bigint :old_ems_id
t.timestamp :deleted_on
t.string :name
t.string :image
t.string :image # copied over columns
t.string :image_pull_policy
t.string :memory
t.float :cpu_cores
Expand All @@ -96,36 +101,34 @@ def down
t.string :capabilities_add
t.string :capabilities_drop
t.text :command
t.bigint :container_id # temp column
end

add_column :containers, :container_definition_id, :bigint

say_with_time("splitting columns from container into container_definition") do
ContainerDefinition.transaction do
Container.all.each do |container|
container_def = ContainerDefinition.create(
:ems_id => container.ems_id,
:ems_ref => container.ems_ref,
:old_ems_id => container.old_ems_id,
:deleted_on => container.deleted_on,
:name => container.name,
:image => container.image,
:image_pull_policy => container.image_pull_policy,
:memory => container.memory,
:cpu_cores => container.cpu_cores,
:container_group_id => container.container_group_id,
:privileged => container.privileged,
:run_as_user => container.run_as_user,
:run_as_non_root => container.run_as_non_root,
:capabilities_add => container.capabilities_add,
:capabilities_drop => container.capabilities_drop,
:command => container.command
)
container.update!(:container_definition_id => container_def.id)
end
end
say_with_time("populate container_definitions. use container_id to keep relation to containers") do
insert_statement = "INSERT INTO container_definitions (container_id, ems_id, ems_ref, old_ems_id, deleted_on, name, image,
image_pull_policy, memory, cpu_cores, container_group_id,
privileged, run_as_user, run_as_non_root, capabilities_add,
capabilities_drop, command)
SELECT id, ems_id, ems_ref, old_ems_id, deleted_on, name, image, image_pull_policy, memory, cpu_cores,
container_group_id, privileged, run_as_user, run_as_non_root, capabilities_add,
capabilities_drop, command
FROM containers"
ActiveRecord::Base.connection.execute(insert_statement)
end

say_with_time("use container_id to join tables and update container_definition_id in containers") do
update_statement = "UPDATE containers
SET container_definition_id = (SELECT container_definitions.id
FROM container_definitions
WHERE containers.id = container_definitions.container_id)"
ActiveRecord::Base.connection.execute(update_statement)
end

# finally, remove the temp column
remove_column :container_definitions, :container_id

containers = Arel::Table.new(:containers)
say_with_time("switch container_definition_id with container_id for container_port_configs") do
port_configs = Arel::Table.new(:container_port_configs)
Expand All @@ -134,14 +137,14 @@ def down
ContainerPortConfig.update_all("container_id = (#{join_sql})")
end

say_with_time("switch container_definition_id with container_id for container_port_configs") do
say_with_time("switch container_definition_id with container_id for for container_env_vars") do
env_vars = Arel::Table.new(:container_env_vars)
join_sql = containers.project(containers[:container_definition_id])
.where(containers[:id].eq(env_vars[:container_id])).to_sql
ContainerEnvVar.update_all("container_id = (#{join_sql})")
end

say_with_time("switch container_definition_id with container_id for security_contexts") do
say_with_time("swtich resource_id with container_id, resource_type to 'Container' for security_contexts") do
security_contexts = Arel::Table.new(:security_contexts)
join_sql = containers.project(containers[:container_definition_id])
.where(containers[:id].eq(security_contexts[:resource_id])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,77 +27,147 @@
let(:container_hash) do
{
:name => "mycontainer",
:ems_ref => "123",
:ems_ref => "2da0c9e4-e6f9-11e6-a348-001a4a162683_registry_openshift/origin-docker-registry:v1.3.3",
:ems_id => 1,
:old_ems_id => 1,
:old_ems_id => nil,
:deleted_on => Time.now.beginning_of_hour.utc
}
end

let(:queue_stub) { migration_stub(:MiqQueue) }

migration_context :up do
it "merges container_defintion and container" do
container_def = container_definition_stub.create!(definition_hash)
container = container_stub.create!(container_hash.merge(:container_definition => container_def))
container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":5000",
:container_definition_id => container_def.id)
container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 5000,
:protocol => "TCP",
:container_definition_id => container_def.id)
security_context_stub.create!(:se_linux_level => "s0:c1,c0",
:resource_id => container_def.id,
:resource_type => 'ContainerDefinition')
env_var = container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":5000",
:container_definition_id => container_def.id)
port_config = container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 5000,
:protocol => "TCP",
:container_definition_id => container_def.id)
security_context = security_context_stub.create!(:se_linux_level => "s0:c1,c0",
:resource_id => container_def.id,
:resource_type => 'ContainerDefinition')

container_def2 = container_definition_stub.create!(definition_hash.merge(:image => "my_image2"))
container2 = container_stub.create!(container_hash.merge(:container_definition => container_def2, :name => "mycontainer2"))
container_def2.container_env_vars << container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":5000")
container_def2.container_port_configs << container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 5000,
:protocol => "TCP")
security_context_stub.create!(:se_linux_level => "s0:c1,c0",
:resource_id => container_def2.id,
:resource_type => 'ContainerDefinition')
container2 = container_stub.create!(container_hash.merge(:container_definition => container_def2,
:name => "mycontainer2"))
env_var2 = container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":6000",
:container_definition_id => container_def2.id)
port_config2 = container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 6000,
:protocol => "TCP",
:container_definition_id => container_def2.id)
security_context2 = security_context_stub.create!(:se_linux_level => "s1:c1,c0",
:resource_id => container_def2.id,
:resource_type => 'ContainerDefinition')

migrate

expect(container_stub.first).to have_attributes(definition_hash.merge(container_hash))
expect(container.reload).to have_attributes(definition_hash.merge(container_hash))

expect(container_port_config_stub.first).to have_attributes(:container_id => container.id)
expect(container_env_var_stub.first).to have_attributes(:container_id => container.id)
expect(security_context_stub.first).to have_attributes(:resource_type => "Container",
:resource_id => container.id)
expect(port_config.reload).to have_attributes(:container_id => container.id)
expect(env_var.reload).to have_attributes(:container_id => container.id)
expect(security_context.reload).to have_attributes(:resource_type => "Container",
:resource_id => container.id)

expect(container_port_config_stub.second).to have_attributes(:container_id => container2.id)
expect(container_env_var_stub.second).to have_attributes(:container_id => container2.id)
expect(security_context_stub.second).to have_attributes(:resource_type => "Container",
:resource_id => container2.id)
expect(port_config2.reload).to have_attributes(:container_id => container2.id)
expect(env_var2.reload).to have_attributes(:container_id => container2.id)
expect(security_context2.reload).to have_attributes(:resource_type => "Container",
:resource_id => container2.id)
end

it "does not fail when container_definition has no container" do
container_def = container_definition_stub.create!(definition_hash)
container = container_stub.create!(container_hash.merge(:container_definition => container_def))
env_var = container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":5000",
:container_definition_id => container_def.id)
port_config = container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 5000,
:protocol => "TCP",
:container_definition_id => container_def.id)
security_context = security_context_stub.create!(:se_linux_level => "s0:c1,c0",
:resource_id => container_def.id,
:resource_type => 'ContainerDefinition')

container_def2 = container_definition_stub.create!(definition_hash.merge(:image => "my_image2"))

env_var2 = container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":6000",
:container_definition_id => container_def2.id)
port_config2 = container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 6000,
:protocol => "TCP",
:container_definition_id => container_def2.id)
security_context2 = security_context_stub.create!(:se_linux_level => "s1:c1,c0",
:resource_id => container_def2.id,
:resource_type => 'ContainerDefinition')

migrate

expect(container.reload).to have_attributes(definition_hash.merge(container_hash))
expect(port_config.reload).to have_attributes(:container_id => container.id)
expect(env_var.reload).to have_attributes(:container_id => container.id)
expect(security_context.reload).to have_attributes(:resource_type => "Container",
:resource_id => container.id)
end

it "deletes purge jobs from the queue" do
queue_stub.create(:class_name => "ContainerDefinition", :method_name => "purge_timer")

migrate

expect(queue_stub.where(:method_name => "purge_timer", :class_name => 'ContainerDefinition').count).to eq(0)
end
end

migration_context :down do
it "splits container_definition columns from container" do
container = container_stub.create!(definition_hash.merge(container_hash))
container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":5000", :field_path => nil,
:container_id => container.id)
container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 5000,
:protocol => "TCP",
:container_id => container.id)
security_context_stub.create!(:resource_type => "Container",
:resource_id => container.id,
:se_linux_level => "s0:c1,c0")
env_var = container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":5000", :field_path => nil,
:container_id => container.id)
port_config = container_port_config_stub.create!(:ems_ref => "2da0c9e4",
:port => 5000,
:protocol => "TCP",
:container_id => container.id)
security_context = security_context_stub.create!(:resource_type => "Container",
:resource_id => container.id,
:se_linux_level => "s0:c0,c0")

container2 = container_stub.create!(definition_hash.merge(container_hash).merge(:name => "mycontainer2"))
env_var2 = container_env_var_stub.create!(:name => "REGISTRY_HTTP_ADDR",
:value => ":6000",
:container_id => container2.id)
port_config2 = container_port_config_stub.create!(:ems_ref => "c9e42da0",
:port => 6000,
:protocol => "TCP",
:container_id => container2.id)
security_context2 = security_context_stub.create!(:se_linux_level => "s1:c1,c0",
:resource_id => container2.id,
:resource_type => 'Container')
migrate

container_def = container_definition_stub.first
expect(container_def).to have_attributes(definition_hash)
expect(container_env_var_stub.first).to have_attributes(:container_definition_id => container_def.id)
expect(container_port_config_stub.first).to have_attributes(:container_definition_id => container_def.id)
expect(security_context_stub.first).to have_attributes(:resource_type => "ContainerDefinition",
:resource_id => container_def.id)
container.reload
container_def = container_definition_stub.find(container.container_definition_id)
expect(container_def).to have_attributes(definition_hash.merge(:ems_ref => container.ems_ref,
:ems_id => container.ems_id,
:old_ems_id => container.old_ems_id,
:deleted_on => container.deleted_on))
expect(env_var.reload).to have_attributes(:value => ":5000")
expect(port_config.reload).to have_attributes(:port => 5000)
expect(security_context.reload).to have_attributes(:se_linux_level => "s0:c0,c0")

container2.reload
container_def2 = container_definition_stub.find(container2.container_definition_id)
expect(container_def2.name).to eq("mycontainer2")
expect(env_var2.reload).to have_attributes(:value => ":6000")
expect(port_config2.reload).to have_attributes(:port => 6000)
expect(security_context2.reload).to have_attributes(:se_linux_level => "s1:c1,c0")
end
end
end

0 comments on commit 4d6ac3c

Please sign in to comment.