Telemetry scheduler failed task handling (sodafoundation#10)
* telemetry scheduler failed task changes
Amit Roushan authored Mar 21, 2021
1 parent 88f0818 commit 5f16a6a
Showing 12 changed files with 330 additions and 27 deletions.
18 changes: 17 additions & 1 deletion delfin/common/constants.py
@@ -306,4 +306,20 @@ class TelemetryCollection(object):
     PERFORMANCE_TASK_METHOD = "delfin.task_manager.scheduler.schedulers." \
                               "telemetry.performance_collection_handler." \
                               "PerformanceCollectionHandler"
-    PERIODIC_JOB_INTERVAL = 300
+    """Performance monitoring job interval"""
+    PERIODIC_JOB_INTERVAL = 180
+    """Failed Performance monitoring job interval"""
+    FAILED_JOB_SCHEDULE_INTERVAL = 240
+
+
+class TelemetryTaskStatus(object):
+    """Telemetry task enum"""
+    TASK_EXEC_STATUS_SUCCESS = True
+    TASK_EXEC_STATUS_FAILURE = False
+
+
+class TelemetryJobStatus(object):
+    """Telemetry jobs enum"""
+    FAILED_JOB_STATUS_SUCCESS = "Success"
+    FAILED_JOB_STATUS_STARTED = "Started"
+    FAILED_JOB_STATUS_INIT = "Initialized"
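The two enum-style classes give the collection path and the retry scheduler a shared vocabulary: collection tasks report a TelemetryTaskStatus boolean over RPC (see the task manager and rpcapi changes below), while the failed_tasks table records a job's lifecycle as a TelemetryJobStatus string. A minimal sketch of that return-value contract, with a caller-supplied driver callable (pull_metrics is illustrative, not part of the commit):

```python
from delfin.common.constants import TelemetryTaskStatus


def run_collection(pull_metrics, storage_id, start_time, end_time):
    # pull_metrics is any callable doing the actual driver work; expected
    # failures are reported through the shared enum instead of being raised.
    try:
        pull_metrics(storage_id, start_time, end_time)
    except Exception:
        return TelemetryTaskStatus.TASK_EXEC_STATUS_FAILURE
    return TelemetryTaskStatus.TASK_EXEC_STATUS_SUCCESS
```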
3 changes: 2 additions & 1 deletion delfin/db/sqlalchemy/models.py
@@ -278,5 +278,6 @@ class FailedTask(BASE, DelfinBase):
     start_time = Column(Integer)
     end_time = Column(Integer)
     retry_count = Column(Integer)
-    result = Column(String(255))
     method = Column(String(255))
+    job_id = Column(String(36))
+    result = Column(String(255))
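With method and job_id added, a failed_tasks row carries everything the retry machinery needs: the handler class to import (method), the scheduler job that owns the retries (job_id), and the collection window plus retry bookkeeping. The code that creates these rows lives in one of the changed files not shown on this page; the sketch below only illustrates the shape of such a record, and db.failed_task_create, FAILED_PERFORMANCE_TASK_METHOD and the field values are assumptions:

```python
from delfin import db
from delfin.common.constants import TelemetryCollection, TelemetryJobStatus


def register_failed_collection(ctx, task_id, start_time, end_time):
    # Assumed helper: persist a retry record for a failed collection window.
    values = {
        'task_id': task_id,
        'interval': TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL,
        'start_time': start_time,
        'end_time': end_time,
        'retry_count': 0,
        # Assumed constant naming the retry handler added by this commit.
        'method': TelemetryCollection.FAILED_PERFORMANCE_TASK_METHOD,
        'result': TelemetryJobStatus.FAILED_JOB_STATUS_INIT,
    }
    return db.failed_task_create(ctx, values)  # assumed db helper
```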
4 changes: 4 additions & 0 deletions delfin/exception.py
@@ -328,3 +328,7 @@ class StorageCapabilityNotSupported(Invalid):
 class EmptyResourceMetrics(DelfinException):
     msg_fmt = _("Empty resource metric in capabilities")
     code = 501
+
+
+class TelemetryTaskExecError(DelfinException):
+    msg_fmt = _("Failure in telemetry task execution")
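TelemetryTaskExecError lets callers treat "the collection returned a failure status" and "the collection raised" the same way, which is exactly how the retry handler below uses it. A small illustrative guard:

```python
from delfin import exception


def ensure_collected(status):
    # Mirror the handler added in this commit: a falsy status from the
    # collection task is promoted to a hard failure.
    if not status:
        raise exception.TelemetryTaskExecError()
```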
3 changes: 2 additions & 1 deletion delfin/task_manager/manager.py
@@ -49,7 +49,8 @@ def collect_telemetry(self, context, storage_id, telemetry_task,
                   " id:{1}".format(args, storage_id))
         cls = importutils.import_class(telemetry_task)
         device_obj = cls()
-        device_obj.collect(context, storage_id, args, start_time, end_time)
+        return device_obj.collect(context, storage_id, args, start_time,
+                                  end_time)
 
     def remove_storage_resource(self, context, storage_id, resource_task):
         cls = importutils.import_class(resource_task)
2 changes: 1 addition & 1 deletion delfin/task_manager/rpcapi.py
@@ -50,7 +50,7 @@ def sync_storage_resource(self, context, storage_id, resource_task):
     def collect_telemetry(self, context, storage_id, telemetry_task, args,
                           start_time, end_time):
         call_context = self.client.prepare(version='1.0')
-        return call_context.cast(context,
+        return call_context.call(context,
                                  'collect_telemetry',
                                  storage_id=storage_id,
                                  telemetry_task=telemetry_task,
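The rpcapi switch from cast to call is the key plumbing change: in oslo.messaging, cast() is fire-and-forget and always returns None, while call() blocks until the task manager's collect_telemetry returns, so the status that manager.py now propagates actually reaches the scheduler side. A sketch of how a caller can consume the returned status (call_context stands for the prepared client from TaskAPI; parameter names are placeholders):

```python
def collect_with_status(call_context, ctx, storage_id, task_method,
                        args, start_time, end_time):
    # With cast() the result would be discarded; call() waits for the task
    # manager and hands back the TelemetryTaskStatus boolean it returns.
    return call_context.call(ctx, 'collect_telemetry',
                             storage_id=storage_id,
                             telemetry_task=task_method,
                             args=args,
                             start_time=start_time,
                             end_time=end_time)
```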
6 changes: 2 additions & 4 deletions delfin/task_manager/scheduler/schedule_manager.py
@@ -15,17 +15,15 @@
 from datetime import datetime
 
 import six
-from oslo_config import cfg
 from oslo_log import log
 from oslo_utils import uuidutils
 
 from delfin import context
-from delfin.common import constants
+from delfin.common.constants import TelemetryCollection
 from delfin.task_manager.scheduler import scheduler
 from delfin.task_manager.scheduler.schedulers.telemetry import telemetry_job
 
 LOG = log.getLogger(__name__)
-CONF = cfg.CONF
 
 
 class SchedulerManager(object):
@@ -42,7 +40,7 @@ def start(self):
             periodic_scheduler_job_id = uuidutils.generate_uuid()
             self.schedule_instance.add_job(
                 telemetry_job.TelemetryJob(ctxt), 'interval', args=[ctxt],
-                seconds=constants.TelemetryCollection.PERIODIC_JOB_INTERVAL,
+                seconds=TelemetryCollection.PERIODIC_JOB_INTERVAL,
                 next_run_time=datetime.now(),
                 id=periodic_scheduler_job_id)
         except Exception as e:
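SchedulerManager.start() still registers the periodic TelemetryJob, now at TelemetryCollection.PERIODIC_JOB_INTERVAL (180s). The FailedTelemetryJob added below would be registered the same way at FAILED_JOB_SCHEDULE_INTERVAL (240s); that wiring sits in a collapsed part of this file, so the sketch below is an assumption about how it would look, including the failed_telemetry_job module path and the use of an admin context:

```python
from datetime import datetime

from oslo_utils import uuidutils

from delfin import context
from delfin.common.constants import TelemetryCollection
from delfin.task_manager.scheduler import scheduler
from delfin.task_manager.scheduler.schedulers.telemetry import (
    failed_telemetry_job)


def schedule_failed_task_sweep():
    # Assumed wiring: sweep failed tasks every FAILED_JOB_SCHEDULE_INTERVAL
    # seconds, starting immediately, mirroring the TelemetryJob call above.
    ctxt = context.get_admin_context()
    schedule = scheduler.Scheduler.get_instance()
    schedule.add_job(
        failed_telemetry_job.FailedTelemetryJob(ctxt), 'interval',
        seconds=TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL,
        next_run_time=datetime.now(),
        id=uuidutils.generate_uuid())
```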
109 changes: 109 additions & 0 deletions delfin/task_manager/scheduler/schedulers/telemetry/failed_performance_collection_handler.py
@@ -0,0 +1,109 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import six
from oslo_config import cfg
from oslo_log import log

from delfin import db
from delfin import exception
from delfin.common.constants import TelemetryJobStatus
from delfin.db.sqlalchemy.models import FailedTask
from delfin.db.sqlalchemy.models import Task
from delfin.i18n import _
from delfin.task_manager import rpcapi as task_rpcapi
from delfin.task_manager.scheduler import scheduler
from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask

LOG = log.getLogger(__name__)
CONF = cfg.CONF


class FailedPerformanceCollectionHandler(object):
    def __init__(self, ctx, filed_task_id, storage_id, args, job_id,
                 start_time, end_time):
        self.ctx = ctx
        self.filed_task_id = filed_task_id
        self.retry_count = 0
        self.storage_id = storage_id
        self.job_id = job_id
        self.args = args
        self.start_time = start_time
        self.end_time = end_time
        self.task_rpcapi = task_rpcapi.TaskAPI()
        self.scheduler_instance = scheduler.Scheduler.get_instance()
        self.result = TelemetryJobStatus.FAILED_JOB_STATUS_INIT

    @classmethod
    def get_instance(cls, ctx, failed_task_id):
        # fetch failed task info
        failed_task = db.failed_task_get(ctx, failed_task_id)
        # fetch original task info
        task = db.task_get(ctx, failed_task[FailedTask.task_id.name])
        return FailedPerformanceCollectionHandler(
            ctx,
            failed_task[FailedTask.id.name],
            task[Task.storage_id.name],
            task[Task.args.name],
            failed_task[FailedTask.job_id.name],
            failed_task[FailedTask.start_time.name],
            failed_task[FailedTask.end_time.name],
        )

    def __call__(self):
        # pull performance collection info
        self.retry_count = self.retry_count + 1
        try:
            status = self.task_rpcapi.collect_telemetry(
                self.ctx, self.storage_id,
                PerformanceCollectionTask.__module__ + '.' +
                PerformanceCollectionTask.__name__,
                self.args, self.start_time, self.end_time)

            if not status:
                raise exception.TelemetryTaskExecError()
        except Exception as e:
            LOG.error(e)
            msg = _("Failed to collect performance metrics for storage "
                    "id:{0}, reason:{1}".format(self.storage_id,
                                                six.text_type(e)))
            LOG.error(msg)
        else:
            LOG.info("Successfully completed Performance metrics collection "
                     "for storage id :{0} ".format(self.storage_id))
            self.result = TelemetryJobStatus.FAILED_JOB_STATUS_SUCCESS
            self._teardown_task()
            return

        if self.retry_count >= CONF.TELEMETRY.max_failed_task_retry_count:
            msg = _(
                "Failed to collect performance metrics of task instance "
                "id:{0} for start time:{1} and end time:{2} with "
                "maximum retry. Giving up on "
                "retry".format(self.filed_task_id, self.start_time,
                               self.end_time))
            LOG.error(msg)
            self._teardown_task()
            return

        self.result = TelemetryJobStatus.FAILED_JOB_STATUS_STARTED
        db.failed_task_update(self.ctx, self.filed_task_id,
                              {FailedTask.retry_count.name: self.retry_count,
                               FailedTask.result.name: self.result})

    def _teardown_task(self):
        db.failed_task_update(self.ctx, self.filed_task_id,
                              {FailedTask.retry_count.name: self.retry_count,
                               FailedTask.result.name: self.result})
        self.scheduler_instance.pause_job(self.job_id)
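Each FailedPerformanceCollectionHandler instance is itself the scheduled callable: the sweeper in the next file rebuilds it from the failed_tasks row via get_instance() and adds it as an interval job, after which every tick retries the same start/end window, bumping retry_count until the collection succeeds or max_failed_task_retry_count is exhausted, at which point _teardown_task pauses the job. A condensed sketch of that interaction (the context, row id, interval and module path are placeholders/assumptions; the real wiring is FailedTelemetryJob below):

```python
from datetime import datetime

from delfin import context
from delfin.task_manager.scheduler import scheduler
from delfin.task_manager.scheduler.schedulers.telemetry import (
    failed_performance_collection_handler as handler_module)

ctx = context.get_admin_context()   # placeholder context
failed_task_id = 42                 # placeholder failed_tasks row id

# Rebuild the callable from the persisted row and schedule it; every
# interval tick re-runs __call__ for the same collection window.
handler = handler_module.FailedPerformanceCollectionHandler.get_instance(
    ctx, failed_task_id)
scheduler.Scheduler.get_instance().add_job(
    handler, 'interval', seconds=240,
    next_run_time=datetime.now(), id=handler.job_id)
```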
120 changes: 120 additions & 0 deletions delfin/task_manager/scheduler/schedulers/telemetry/failed_telemetry_job.py
@@ -0,0 +1,120 @@
# Copyright 2021 The SODA Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

import six
from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
from oslo_utils import uuidutils

from delfin import db
from delfin import utils
from delfin.common.constants import TelemetryJobStatus
from delfin.db.sqlalchemy.models import FailedTask
from delfin.task_manager.scheduler import scheduler

LOG = log.getLogger(__name__)
CONF = cfg.CONF

telemetry_opts = [
    cfg.IntOpt('max_failed_task_retry_count', default=5,
               help='default value (in integer) for maximum number of retries '
                    'for failed task execution'),
]

CONF.register_opts(telemetry_opts, "TELEMETRY")
telemetry = CONF.TELEMETRY


@six.add_metaclass(utils.Singleton)
class FailedTelemetryJob(object):
    def __init__(self, ctx):
        # get the singleton scheduler instance
        self.schedule = scheduler.Scheduler.get_instance()
        self.ctx = ctx

    def __call__(self):
        """
        :return:
        """
        try:
            # fetch all failed tasks pending retry
            failed_tasks = db.failed_task_get_all(self.ctx)

            if not len(failed_tasks):
                LOG.info("No failed task found for performance collection")
                return

            LOG.info("Schedule performance collection triggered: total "
                     "failed tasks:%s" % len(failed_tasks))

            for failed_task in failed_tasks:
                failed_task_id = failed_task[FailedTask.id.name]
                LOG.info("Processing failed task : %s" % failed_task_id)

                retry_count = failed_task[FailedTask.retry_count.name]
                result = failed_task[FailedTask.result.name]
                job_id = failed_task[FailedTask.job_id.name]
                if retry_count >= telemetry.max_failed_task_retry_count \
                        or result == \
                        TelemetryJobStatus.FAILED_JOB_STATUS_SUCCESS:
                    LOG.info("Exiting Failure task processing for task [%d] "
                             "with result [%s] and retry count [%d] "
                             % (failed_task_id, result, retry_count))
                    # task ID is same as job id
                    self._teardown_task(self.ctx, failed_task_id, job_id)
                    continue

                # check if job already scheduled
                if job_id and self.schedule.get_job(job_id):
                    # skip if job already exists
                    continue

                # fetch storage_id and args from task table
                telemetry_task = db.task_get(
                    self.ctx, failed_task[FailedTask.task_id.name])
                if not telemetry_task:
                    # delete failed task if original task is not available
                    db.failed_task_delete(self.ctx, failed_task_id)

                if not job_id:
                    job_id = uuidutils.generate_uuid()
                    db.failed_task_update(self.ctx, failed_task_id,
                                          {FailedTask.job_id.name: job_id})

                # method indicates the specific collection task to be
                # triggered
                collection_class = importutils.import_class(
                    failed_task[FailedTask.method.name])
                instance = \
                    collection_class.get_instance(self.ctx, failed_task_id)
                # Create failed task collection
                self.schedule.add_job(
                    instance, 'interval',
                    seconds=failed_task[FailedTask.interval.name],
                    next_run_time=datetime.now(), id=job_id)

        except Exception as e:
            LOG.error("Failed to schedule retry tasks for performance "
                      "collection, reason: %s", six.text_type(e))
        else:
            LOG.info("Schedule collection completed")

    def _teardown_task(self, ctx, failed_task_id, job_id):
        db.failed_task_delete(ctx, failed_task_id)
        self.schedule.remove_job(job_id)
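The retry ceiling used by both the sweeper and the handler comes from the new max_failed_task_retry_count option (default 5) registered under the TELEMETRY group, so it can be tuned through delfin's oslo.config configuration rather than code. Reading it back is a one-liner (a trivial sketch; the option is registered when this module is imported):

```python
from oslo_config import cfg

CONF = cfg.CONF


def retries_exhausted(retry_count):
    # True once a failed task has used up its configured retry budget.
    return retry_count >= CONF.TELEMETRY.max_failed_task_retry_count
```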