Skip to content

Commit

Permalink
Notify distributor when a new task added
Browse files Browse the repository at this point in the history
  • Loading branch information
ThisIsClark committed Aug 30, 2021
1 parent 903138a commit 8c34e9b
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 20 deletions.
21 changes: 4 additions & 17 deletions delfin/api/v1/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from delfin.common import constants
from delfin.drivers import api as driverapi
from delfin.i18n import _
from delfin.task_manager import perf_job_manager
from delfin.task_manager import rpcapi as task_rpcapi
from delfin.task_manager.tasks import resources
from delfin.task_manager.tasks import telemetry as task_telemetry
Expand All @@ -43,6 +44,7 @@ def __init__(self):
super().__init__()
self.task_rpcapi = task_rpcapi.TaskAPI()
self.driver_api = driverapi.API()
self.perf_job_manager = perf_job_manager.PerfJobManager()
self.search_options = ['name', 'vendor', 'model', 'status',
'serial_number']

Expand Down Expand Up @@ -109,8 +111,8 @@ def create(self, req, body):
capabilities = self.driver_api.get_capabilities(
context=ctxt, storage_id=storage['id'])
validation.validate_capabilities(capabilities)
_create_performance_monitoring_task(ctxt, storage['id'],
capabilities)
self.perf_job_manager.create_perf_job(ctxt, storage['id'],
capabilities)
except exception.EmptyResourceMetrics:
msg = _("Resource metric provided by capabilities is empty for "
"storage: %s") % storage['id']
Expand Down Expand Up @@ -265,18 +267,3 @@ def _set_synced_if_ok(context, storage_id, resource_count):
storage['sync_status'] = resource_count * constants.ResourceSync.START
storage['updated_at'] = current_time
db.storage_update(context, storage['id'], storage)


def _create_performance_monitoring_task(context, storage_id, capabilities):
# Check resource_metric attribute availability and
# check if resource_metric is empty
if 'resource_metrics' not in capabilities \
or not bool(capabilities.get('resource_metrics')):
raise exception.EmptyResourceMetrics()

task = dict()
task.update(storage_id=storage_id)
task.update(args=capabilities.get('resource_metrics'))
task.update(interval=CONF.telemetry.performance_collection_interval)
task.update(method=constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
db.task_create(context=context, values=task)
10 changes: 10 additions & 0 deletions delfin/leader_election/distributor/task_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ def __call__(self):
else:
LOG.debug("Periodic job distribution completed.")

def distribute_new_job(self, task_id):
executor = CONF.host
try:
db.task_update(self.ctx, task_id, {'executor': executor})
LOG.info('Distribute a new job, id: %s' % task_id)
self.task_rpcapi.assign_job(self.ctx, task_id, executor)
except Exception as e:
LOG.error('Failed to distribute the new job, reason: %s',
six.text_type(e))

@classmethod
def job_interval(cls):
return TelemetryCollection.PERIODIC_JOB_INTERVAL
32 changes: 32 additions & 0 deletions delfin/task_manager/job_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log

from delfin import manager
from delfin.leader_election.distributor import task_distributor

LOG = log.getLogger(__name__)


class JobGenerator(manager.Manager):
"""Generate job to job distributor"""

RPC_API_VERSION = '1.0'

def __init__(self, service_name=None, *args, **kwargs):
super(JobGenerator, self).__init__(*args, **kwargs)

def add_new_job(self, context, task_id):
distributor = task_distributor.TaskDistributor(context)
distributor.distribute_new_job(task_id)
8 changes: 5 additions & 3 deletions delfin/task_manager/metrics_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
periodical task manager for metric collection tasks**
"""
from oslo_log import log

from delfin import manager
from delfin.leader_election.distributor import task_distributor
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
import JobHandler
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler\
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import FailedJobHandler
from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
import JobHandler
from delfin.task_manager.tasks import telemetry

LOG = log.getLogger(__name__)
Expand Down
69 changes: 69 additions & 0 deletions delfin/task_manager/perf_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Client side of the metrics task manager RPC API.
"""

import oslo_messaging as messaging
from oslo_config import cfg
from oslo_log import log

from delfin import rpc
from delfin import db
from delfin import exception
from delfin.common import constants

LOG = log.getLogger(__name__)
CONF = cfg.CONF


class PerfJobManager(object):
"""Client side of the metrics task rpc API.
API version history:
1.0 - Initial version.
"""

RPC_API_VERSION = '1.0'

def __init__(self):
self.target = messaging.Target(topic='JobGenerator',
version=self.RPC_API_VERSION)
self.client = rpc.get_client(self.target,
version_cap=self.RPC_API_VERSION)

def create_perf_job(self, context, storage_id, capabilities):
# Add it to db
# Check resource_metric attribute availability and
# check if resource_metric is empty
if 'resource_metrics' not in capabilities \
or not bool(capabilities.get('resource_metrics')):
raise exception.EmptyResourceMetrics()

task = dict()
task.update(storage_id=storage_id)
task.update(args=capabilities.get('resource_metrics'))
task.update(interval=CONF.telemetry.performance_collection_interval)
task.update(method=
constants.TelemetryCollection.PERFORMANCE_TASK_METHOD)
db.task_create(context=context, values=task)
# Add it to RabbitMQ
filters = {'storage_id': storage_id}
task_id = db.task_get_all(context, filters=filters)[0].get('id')
call_context = self.client.prepare(version='1.0')
return call_context.cast(context,
'add_new_job',
task_id=task_id)
12 changes: 12 additions & 0 deletions delfin/task_manager/scheduler/schedule_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@

import six
from apscheduler.schedulers.background import BackgroundScheduler
from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
from oslo_utils import uuidutils

from delfin import context
from delfin import service
from delfin import utils
from delfin.leader_election.distributor.failed_task_distributor\
import FailedTaskDistributor
from delfin.leader_election.distributor.task_distributor \
import TaskDistributor

LOG = log.getLogger(__name__)
CONF = cfg.CONF

SCHEDULER_BOOT_JOBS = [
TaskDistributor.__module__ + '.' + TaskDistributor.__name__,
Expand Down Expand Up @@ -76,6 +79,15 @@ def schedule_boot_jobs(self):
LOG.error("Failed to initialize periodic tasks, reason: %s.",
six.text_type(e))
raise e
job_generator = service. \
TaskService.create(binary='delfin-task',
topic='JobGenerator',
manager='delfin.'
'task_manager.'
'job_generator.'
'JobGenerator',
coordination=True)
service.serve(job_generator)

def stop(self):
"""Cleanup periodic jobs"""
Expand Down

0 comments on commit 8c34e9b

Please sign in to comment.