[REARCH] Container workers #15884

Merged · 11 commits · Feb 16, 2018
@@ -1,10 +1,16 @@
class ManageIQ::Providers::BaseManager::MetricsCollectorWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

include PerEmsTypeWorkerMixin

self.required_roles = ["ems_metrics_collector"]

def self.supports_container?
true
end
Member:

Can we get rid of this now?

Member:

@Fryguy's comment here is a bit old, but this is along the same lines:

Is it possible that we can put this method into the MiqWorker::ReplicaPerWorker module? Or into a shared module that the new modules you have created here inherit from?

Member Author:

Yeah, we could probably move this into the MiqWorker modules in some way rather than having it implemented explicitly in each worker.

Alternatively, I could just flip the default implementation in miq_worker.rb and override it where we still don't want to start a worker as a container, which might provide more useful information at this point. What do you think?

Member:

I think either is fine, and it is probably simpler to only have it in one spot. Though if you do invert it in miq_worker.rb, a disclaimer comment above the method might be worthwhile, just so those passing through know it is disabled on certain workers. With the other approach, since you would be adding it to things like MiqWorker::ReplicaPerWorker and including it in the worker class, I think that would be more easily assumed.
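
For illustration, a minimal sketch of the mixin-based option discussed in this thread (this is only the proposed alternative, not what the PR implements; the `class_methods` block is standard ActiveSupport::Concern):

```ruby
# Sketch only: declare container support once in the mixin instead of in
# every worker class. MiqWorker::ReplicaPerWorker exists in this PR; the
# class_methods block below is the assumed addition.
class MiqWorker
  module ReplicaPerWorker
    extend ActiveSupport::Concern

    class_methods do
      # Workers managed as scaled replicas can run in containers.
      def supports_container?
        true
      end
    end
  end
end
```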


def self.normalized_type
@normalized_type ||= "ems_metrics_collector_worker"
end
6 changes: 6 additions & 0 deletions app/models/miq_ems_metrics_processor_worker.rb
@@ -1,4 +1,6 @@
class MiqEmsMetricsProcessorWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = ["ems_metrics_processor"]
@@ -7,4 +9,8 @@ class MiqEmsMetricsProcessorWorker < MiqQueueWorkerBase
def friendly_name
@friendly_name ||= "C&U Metrics Processor"
end

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_event_handler.rb
@@ -1,6 +1,12 @@
class MiqEventHandler < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = ["event"]
self.default_queue_name = "ems"

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_generic_worker.rb
@@ -1,7 +1,13 @@
class MiqGenericWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.default_queue_name = "generic"
self.check_for_minimal_role = false
self.workers = -> { MiqServer.minimal_env? ? 1 : worker_settings[:count] }

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_priority_worker.rb
@@ -1,9 +1,15 @@
class MiqPriorityWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.default_queue_name = "generic"

def self.queue_priority
MiqQueue::HIGH_PRIORITY
end

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_reporting_worker.rb
@@ -1,6 +1,12 @@
class MiqReportingWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = ["reporting"]
self.default_queue_name = "reporting"

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_schedule_worker.rb
@@ -1,4 +1,6 @@
class MiqScheduleWorker < MiqWorker
include MiqWorker::ReplicaPerWorker

require_nested :Jobs
require_nested :Runner

@@ -7,4 +9,8 @@ class MiqScheduleWorker < MiqWorker
return MiqServer.minimal_env_options.include?('schedule') ? 1 : 0 if MiqServer.minimal_env?
return 1
}

def self.supports_container?
true
end
end
1 change: 1 addition & 0 deletions app/models/miq_server/worker_management/monitor.rb
@@ -92,6 +92,7 @@ def check_pending_stop(class_name = nil)
end

def check_not_responding(class_name = nil)
return [] if MiqEnvironment::Command.is_container?
processed_workers = []
miq_workers.each do |w|
next unless class_name.nil? || (w.type == class_name)
4 changes: 3 additions & 1 deletion app/models/miq_server/worker_management/monitor/stop.rb
@@ -37,7 +37,9 @@ def stop_worker(worker, monitor_status = :waiting_for_stop, monitor_reason = nil
worker_set_monitor_status(w.pid, monitor_status)
worker_set_monitor_reason(w.pid, monitor_reason)

if w.respond_to?(:terminate)
if w.containerized_worker?
w.stop_container
elsif w.respond_to?(:terminate)
w.terminate
else
w.update_attributes(:status => MiqWorker::STATUS_STOPPING)
@@ -9,11 +9,13 @@ module MiqServer::WorkerManagement::Monitor::SystemLimits
}

def kill_workers_due_to_resources_exhausted?
return false if MiqEnvironment::Command.is_container?
options = worker_monitor_settings[:kill_algorithm].merge(:type => :kill)
invoke_algorithm(options)
end

def enough_resource_to_start_worker?(worker_class)
return true if MiqEnvironment::Command.is_container?
# HACK, sync_config is done in the server, while this method is called from miq_worker
# This method should move to the worker and the server should pass the settings.
sync_config if worker_monitor_settings.nil? || child_worker_settings.nil?
@@ -2,6 +2,7 @@ module MiqServer::WorkerManagement::Monitor::Validation
extend ActiveSupport::Concern

def validate_worker(w)
return true if MiqEnvironment::Command.is_container?
time_threshold = get_time_threshold(w)
restart_interval = get_restart_interval(w)
memory_threshold = get_memory_threshold(w)
13 changes: 13 additions & 0 deletions app/models/miq_ui_worker.rb
@@ -20,4 +20,17 @@ def friendly_name
end

include MiqWebServerWorkerMixin
include MiqWorker::ServiceWorker

def self.supports_container?
true
end

def container_port
3001
Member:

Should this be 3000?

Member Author:

Nope, 3000 is exposed from the container, and since the UI needs to serve assets as well as run the worker, httpd listens on 3000 and then points to 3001 for the worker.

end

def container_image_name
"manageiq/manageiq-ui-worker"
Member:

Should you be able to change the org name?

Member:

Can we drop the "manageiq-" from the container name?

Member Author:

I want to put all of this in settings like we did for the awx stuff, but I think I'm going to change that in a follow-up.

As for the name, I don't really have a preference. Thoughts @Fryguy ?

Member:

@carbonin and I discussed moving this into Settings, but I wanted to get this in first, and do that as an incremental improvement.

end
end
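
As a rough sketch of the Settings-based follow-up mentioned in the thread above (the Settings keys here are hypothetical and not part of this PR):

```ruby
# Hypothetical follow-up: read the image coordinates from Settings so the
# org/name/tag can be changed without a code edit. The container_worker
# Settings keys are assumptions for illustration only.
def container_image_name
  Settings.container_worker.try(:image_name) || "manageiq/manageiq-ui-worker"
end

def container_image_tag
  Settings.container_worker.try(:image_tag) || "latest"
end
```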
5 changes: 5 additions & 0 deletions app/models/miq_ui_worker/runner.rb
@@ -1,3 +1,8 @@
class MiqUiWorker::Runner < MiqWorker::Runner
include MiqWebServerRunnerMixin

def prepare
super
MiqApache::Control.start if MiqEnvironment::Command.is_container?
Member:

I thought this was going away.

Member Author:

Why?

We need it to serve static assets.

Member:

Forgot about that 👍

end
end
6 changes: 6 additions & 0 deletions app/models/miq_vim_broker_worker.rb
@@ -1,4 +1,6 @@
class MiqVimBrokerWorker < MiqWorker
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = lambda {
@@ -19,6 +21,10 @@ class MiqVimBrokerWorker < MiqWorker
return 1
}

def self.supports_container?
true
end

def self.has_required_role?
return false if emses_to_monitor.empty?
super
5 changes: 5 additions & 0 deletions app/models/miq_web_service_worker.rb
@@ -10,4 +10,9 @@ def friendly_name
end

include MiqWebServerWorkerMixin
include MiqWorker::ServiceWorker

def self.supports_container?
true
end
end
5 changes: 5 additions & 0 deletions app/models/miq_websocket_worker.rb
@@ -11,4 +11,9 @@ def friendly_name
end

include MiqWebServerWorkerMixin
include MiqWorker::ServiceWorker

def self.supports_container?
true
end
end
35 changes: 30 additions & 5 deletions app/models/miq_worker.rb
@@ -1,6 +1,7 @@
require 'io/wait'

class MiqWorker < ApplicationRecord
include_concern 'ContainerCommon'
include UuidMixin

before_destroy :log_destroy_of_worker_messages
@@ -251,18 +252,22 @@ def self.log_status(level = :info)
find_current.each { |w| w.log_status(level) }
end

def self.create_worker_record(*params)
def self.init_worker_object(*params)
params = params.first
params = {} unless params.kind_of?(Hash)
params[:queue_name] = default_queue_name unless params.key?(:queue_name) || default_queue_name.nil?
params[:status] = STATUS_CREATING
params[:last_heartbeat] = Time.now.utc

server_scope.create(params)
server_scope.new(params)
end

def self.create_worker_record(*params)
init_worker_object(*params).tap(&:save)
end

def self.start_worker(*params)
w = create_worker_record(*params)
w = containerized_worker? ? init_worker_object(*params) : create_worker_record(*params)
Member:

Okay, I think I see what you're doing here, and why, but I'm curious what this means for the future:

  • To clarify, we are not creating worker records here because it is too hard to manage our DB records with the containers that are currently running on OpenShift?
  • Does this mean scripts/tools like evm:status would have to be completely re-tooled to work when running in a container env?
  • Is the plan just to eventually drop these tables?
  • Does this mess up any other portions of the MiqServer#monitor_loop, now that there aren't DB records? (Looks like you covered stop_worker, but I'm curious if there are other pieces I am missing.)

Member Author:

To be clear, we're still going to have worker records, it's just that they need to be created by the worker container rather than by the server.

This is really because of the worker guid. We can't generate the guid here because we can't tell a replica what guid it should be using. The solution is to have the worker itself create the guid, and by extension the entire record.

Member:

> it's just that they need to be created by the worker container rather than by the server.

Doh... I even wrote the code that does that and forgot... okay, guess that pretty much answers most of the questions then huh?

> We can't generate the guid here because we can't tell a replica what guid it should be using.

Forgive my lack of OpenShift knowledge, but I assume we can't even pass a replica an ENV var if we were to make a change to support that in the run_single_worker.rb script as well? I assume not, but I just thought I would check and make sure.

Member Author:

> I assume we can't even pass a replica an ENV var if we were to make a change to support that in the run_single_worker.rb script as well?

Correct, the environment for the container is defined at the deployment config level, so each replica will have the same set of environment variables.

w.start
w
end
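
A rough sketch of the flow described in the thread above, where the containerized worker creates its own record (and therefore its own guid); the WORKER_CLASS_NAME env var and create_worker_record come from this diff, while the entrypoint wiring is an assumption for illustration:

```ruby
# Illustrative container entrypoint: the worker creates its own DB record
# so the guid is generated inside the container, not by the server.
require_relative "config/environment" # assumes the script lives at the app root

worker_class = ENV.fetch("WORKER_CLASS_NAME").constantize # set by configure_worker_deployment
worker       = worker_class.create_worker_record          # guid generated here
worker_class::Runner.start_worker(:guid => worker.guid)
```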
@@ -360,14 +365,32 @@ def queue_name
end
end

def self.supports_container?
false
end

def self.containerized_worker?
MiqEnvironment::Command.is_container? && supports_container?
end

def containerized_worker?
self.class.containerized_worker?
end

def start_runner
if ENV['MIQ_SPAWN_WORKERS'] || !Process.respond_to?(:fork)
start_runner_via_spawn
elsif containerized_worker?
start_runner_via_container
else
start_runner_via_fork
end
end

def start_runner_via_container
create_container_objects
end

def start_runner_via_fork
self.class.before_fork
pid = fork(:cow_friendly => true) do
@@ -412,10 +435,10 @@ def start_runner_via_spawn

def start
self.pid = start_runner
save
save unless containerized_worker?

msg = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
MiqEvent.raise_evm_event_queue(miq_server, "evm_worker_start", :event_details => msg, :type => self.class.name)
MiqEvent.raise_evm_event_queue(miq_server || MiqServer.my_server, "evm_worker_start", :event_details => msg, :type => self.class.name)

_log.info(msg)
self
@@ -502,6 +525,8 @@ def log_destroy_of_worker_messages
end

def status_update
return if MiqEnvironment::Command.is_container?

begin
pinfo = MiqProcess.processInfo(pid)
rescue Errno::ESRCH
37 changes: 37 additions & 0 deletions app/models/miq_worker/container_common.rb
@@ -0,0 +1,37 @@
require 'kubeclient'

class MiqWorker
module ContainerCommon
extend ActiveSupport::Concern

def configure_worker_deployment(definition, replicas = 0)
definition[:spec][:replicas] = replicas
definition[:spec][:template][:spec][:terminationGracePeriodSeconds] = self.class.worker_settings[:stopping_timeout].seconds

container = definition[:spec][:template][:spec][:containers].first
container[:image] = "#{container_image_name}:#{container_image_tag}"
container[:env] << {:name => "WORKER_CLASS_NAME", :value => self.class.name}
end

def scale_deployment
ContainerOrchestrator.new.scale(worker_deployment_name, self.class.workers_configured_count)
delete_container_objects if self.class.workers_configured_count.zero?
end

def container_image_name
"manageiq/manageiq-base-worker"
Member:

Same here

end

def container_image_tag
"latest"
end

def worker_deployment_name
@worker_deployment_name ||= begin
Member:

lol

deployment_name = abbreviated_class_name.dup.chomp("Worker").sub("Manager", "").sub(/^Miq/, "")
deployment_name << "-#{Array(ems_id).map { |id| ApplicationRecord.split_id(id).last }.join("-")}" if respond_to?(:ems_id)
deployment_name.underscore.dasherize.tr("/", "-")
end
end
end
end
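
For readers tracing worker_deployment_name, a hypothetical worked example (it assumes abbreviated_class_name strips the ManageIQ::Providers:: prefix, which is not shown in this diff):

```ruby
# Hypothetical trace for ManageIQ::Providers::Amazon::CloudManager::MetricsCollectorWorker
# with ems_id 7, assuming abbreviated_class_name => "Amazon::CloudManager::MetricsCollectorWorker"
#
#   "Amazon::CloudManager::MetricsCollectorWorker"
#     .chomp("Worker")     # => "Amazon::CloudManager::MetricsCollector"
#     .sub("Manager", "")  # => "Amazon::Cloud::MetricsCollector"
#     .sub(/^Miq/, "")     # => (no change)
#   << "-7"                # ems_id suffix
#   # then underscore.dasherize.tr("/", "-")
#   # => "amazon-cloud-metrics-collector-7"
```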
20 changes: 20 additions & 0 deletions app/models/miq_worker/deployment_per_worker.rb
@@ -0,0 +1,20 @@
class MiqWorker
module DeploymentPerWorker
extend ActiveSupport::Concern

def create_container_objects
ContainerOrchestrator.new.create_deployment_config(worker_deployment_name) do |definition|
configure_worker_deployment(definition, 1)
definition[:spec][:template][:spec][:containers].first[:env] << {:name => "EMS_IDS", :value => Array.wrap(self.class.ems_id_from_queue_name(queue_name)).join(",")}
end
end

def delete_container_objects
ContainerOrchestrator.new.delete_deployment_config(worker_deployment_name)
end

def stop_container
delete_container_objects
end
end
end
20 changes: 20 additions & 0 deletions app/models/miq_worker/replica_per_worker.rb
@@ -0,0 +1,20 @@
class MiqWorker
module ReplicaPerWorker
extend ActiveSupport::Concern

def create_container_objects
ContainerOrchestrator.new.create_deployment_config(worker_deployment_name) do |definition|
configure_worker_deployment(definition)
end
scale_deployment
end

def delete_container_objects
ContainerOrchestrator.new.delete_deployment_config(worker_deployment_name)
end

def stop_container
scale_deployment
end
end
end