feat: upgrade celery to 5.1.2 --story=121220239 #4410

Merged
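Celery 5 removed the legacy `celery.task` module, so every `@task` decorator in this PR becomes `celery.shared_task`, which registers the task with whichever Celery app is current. A minimal sketch of the pattern, assuming only a stock Celery 5 install (the queue name and task below are illustrative, not taken from the repository):

```python
from celery import shared_task

# Celery 4 and earlier (module removed in Celery 5):
#   from celery.task import task
#   @task(ignore_result=True, queue="celery_service")

# Celery 5 replacement, the pattern applied throughout this diff.
@shared_task(ignore_result=True, queue="demo_queue")  # "demo_queue" is a placeholder
def ping(value):
    return value  # trivial body, for illustration only
```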
10 changes: 5 additions & 5 deletions bkmonitor/alarm_backends/service/access/tasks.py
@@ -8,7 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
-from celery.task import task
+from celery import shared_task

from alarm_backends.core.cache import key
from alarm_backends.core.lock.service_lock import service_lock
@@ -20,7 +20,7 @@
from core.prometheus import metrics


-@task(ignore_result=True, queue="celery_service")
+@shared_task(ignore_result=True, queue="celery_service")
def run_access_data(strategy_group_key, interval=60):
    with service_lock(key.SERVICE_LOCK_ACCESS, strategy_group_key=strategy_group_key):
        task_tb = TokenBucket(strategy_group_key, interval)
@@ -35,20 +35,20 @@ def run_access_data(strategy_group_key, interval=60):
        task_tb.release(max([int(processor.pull_duration), 1]))


-@task(queue="celery_service_batch", ignore_result=True)
+@shared_task(queue="celery_service_batch", ignore_result=True)
def run_access_batch_data(strategy_group_key: str, sub_task_id: str):
    processor = AccessBatchDataProcess(strategy_group_key=strategy_group_key, sub_task_id=sub_task_id)
    return processor.process()


-@task(ignore_result=True, queue="celery_service")
+@shared_task(ignore_result=True, queue="celery_service")
def run_access_event(access_type):
    access_type_cls = ACCESS_TYPE_TO_CLASS.get(access_type)
    access_type_cls().process()
    metrics.report_all()


-@task(ignore_result=True, queue="celery_service_access_event")
+@shared_task(ignore_result=True, queue="celery_service_access_event")
def run_access_event_handler(data_id):
"""
事件处理器
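The decorator swap leaves call sites untouched: a `shared_task` function still exposes the usual `delay`/`apply_async` API. A hedged sketch against the module above (the argument values are invented):

```python
from alarm_backends.service.access.tasks import run_access_data

# Fire-and-forget, exactly as with the old celery.task decorator.
run_access_data.delay("demo_strategy_group_key", interval=60)

# Equivalent call with explicit options; countdown is standard Celery.
run_access_data.apply_async(
    args=("demo_strategy_group_key",),
    kwargs={"interval": 60},
    countdown=10,
)
```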
6 changes: 3 additions & 3 deletions bkmonitor/alarm_backends/service/alert/builder/tasks.py
@@ -12,15 +12,15 @@
import time
from typing import List

-from celery.task import task
+from celery import shared_task
from kafka.consumer.fetcher import ConsumerRecord

from alarm_backends.core.alert import Event
from alarm_backends.service.alert.builder.processor import AlertBuilder
from core.prometheus import metrics


-@task(ignore_result=True, queue="celery_alert_builder")
+@shared_task(ignore_result=True, queue="celery_alert_builder")
def run_alert_builder(topic_data_id, bootstrap_server, events: List[ConsumerRecord]):
    builder = AlertBuilder()
    exc = None
@@ -52,7 +52,7 @@ def run_alert_builder(topic_data_id, bootstrap_server, events: List[ConsumerRecord]):
    metrics.report_all()


-@task(ignore_result=True, queue="celery_alert_builder")
+@shared_task(ignore_result=True, queue="celery_alert_builder")
def dedupe_events_to_alerts(events: List[Event]):
    builder = AlertBuilder()
    try:
4 changes: 2 additions & 2 deletions bkmonitor/alarm_backends/service/alert/manager/tasks.py
@@ -12,7 +12,7 @@
import time
from typing import Dict, List

-from celery.task import task
+from celery import shared_task
from elasticsearch.helpers import BulkIndexError
from elasticsearch_dsl import Q

@@ -190,7 +190,7 @@ def send_check_task(alerts: List[Dict], run_immediately=True):
    )


-@task(ignore_result=True, queue="celery_alert_manager")
+@shared_task(ignore_result=True, queue="celery_alert_manager")
def handle_alerts(alert_keys: List[AlertKey]):
"""
处理告警(异步任务)
4 changes: 2 additions & 2 deletions bkmonitor/alarm_backends/service/composite/tasks.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import logging

-from celery.task import task
+from celery import shared_task

from alarm_backends.core.alert import Alert
from alarm_backends.core.alert.alert import AlertKey
@@ -12,7 +12,7 @@
logger = logging.getLogger("composite")


-@task(ignore_result=True, queue="celery_composite")
+@shared_task(ignore_result=True, queue="celery_composite")
def check_action_and_composite(
    alert_key: AlertKey, alert_status: str, composite_strategy_ids: list = None, retry_times: int = 0
):
4 changes: 2 additions & 2 deletions bkmonitor/alarm_backends/service/converge/shield/tasks.py
@@ -12,7 +12,7 @@

import logging

-from celery.task import task
+from celery import shared_task

from alarm_backends.core.cluster import get_cluster_bk_biz_ids
from alarm_backends.service.converge.shield.shield_obj import AlertShieldObj
@@ -44,7 +44,7 @@ def check_and_send_shield_notice():


# sharded task
-@task(ignore_result=True, queue="celery_cron")
+@shared_task(ignore_result=True, queue="celery_cron")
def do_check_and_send_shield_notice(ids):
    shield_configs = list(
        Shield.objects.filter(
4 changes: 2 additions & 2 deletions bkmonitor/alarm_backends/service/converge/tasks.py
@@ -12,7 +12,7 @@
import logging
import time

-from celery.task import task
+from celery import shared_task
from django.db import OperationalError

from alarm_backends.constants import CONST_HALF_MINUTE
@@ -21,7 +21,7 @@
logger = logging.getLogger("fta_action.converge")


-@task(ignore_result=True, queue="celery_converge")
+@shared_task(ignore_result=True, queue="celery_converge")
def run_converge(converge_config, instance_id, instance_type, converge_context=None, alerts=None, retry_times=0):
"""
执行收敛动作
6 changes: 3 additions & 3 deletions bkmonitor/alarm_backends/service/detect/tasks.py
@@ -11,7 +11,7 @@

import logging

-from celery.task import task
+from celery import shared_task

from alarm_backends.core.cache import key
from alarm_backends.service.detect.process import DetectProcess
@@ -21,7 +21,7 @@
logger = logging.getLogger("detect")


-@task(ignore_result=True, queue="celery_service")
+@shared_task(ignore_result=True, queue="celery_service")
def run_detect(strategy_id):
    client = key.DATA_SIGNAL_KEY.client
    data_signal_key = key.DATA_SIGNAL_KEY.get_key()
@@ -48,6 +48,6 @@ def run_detect(strategy_id):
    metrics.report_all()


-@task(ingnore_result=True, queue="celery_service_aiops")
+@shared_task(ignore_result=True, queue="celery_service_aiops")
def run_detect_with_sdk(strategy_id):
    return run_detect(strategy_id)
@@ -15,7 +15,7 @@
from collections import defaultdict
from datetime import datetime, timedelta, timezone

-from celery.task import task
+from celery import shared_task
from django.db.models import Q
from django.utils.translation import gettext as _

@@ -47,7 +47,7 @@
logger = logging.getLogger("fta_action.run")


-@task(ignore_result=True, queue="celery_running_action")
+@shared_task(ignore_result=True, queue="celery_running_action")
def run_action(action_type, action_info):
"""
自愈动作的执行入口函数
@@ -166,7 +166,7 @@ def run_action(action_type, action_info):
    metrics.report_all()


-@task(ignore_result=True, queue="celery_webhook_action")
+@shared_task(ignore_result=True, queue="celery_webhook_action")
def run_webhook_action(action_type, action_info):
"""
支持webhook回调和队列回调的任务
@@ -186,7 +186,7 @@ def sync_action_instances():
    sync_action_instances_every_10_secs.apply_async(countdown=interval * 10, expires=120)


-@task(ignore_result=True, queue="celery_action_cron")
+@shared_task(ignore_result=True, queue="celery_action_cron")
def sync_action_instances_every_10_secs(last_sync_time=None):
"""
每隔十秒同步任务
@@ -238,7 +238,7 @@ def sync_action_instances_every_10_secs(last_sync_time=None):
return


-@task(ignore_result=True, queue="celery_action_cron")
+@shared_task(ignore_result=True, queue="celery_action_cron")
def sync_actions_sharding_task(action_ids):
"""
分片任务同步信息,避免一次任务量太大
@@ -373,7 +373,7 @@ def check_timeout_actions():
return


-@task(ignore_result=True, queue="celery_action_cron")
+@shared_task(ignore_result=True, queue="celery_action_cron")
def execute_demo_actions():
"""
从DB获取调试任务推送到执行任务队列
@@ -460,7 +460,7 @@ def generate_duty_plan_task():
    return managers


-@task(ignore_result=True, queue="celery_action_cron")
+@shared_task(ignore_result=True, queue="celery_action_cron")
def manage_group_duty_snap(duty_manager: GroupDutyRuleManager):
"""
单个任务组的排班计划管理
@@ -472,7 +472,7 @@ def manage_group_duty_snap(duty_manager: GroupDutyRuleManager):
    logger.info("finished to manage group(%s)'s duty plan", duty_manager.user_group.id)


-@task(ignore_result=True, queue="celery_action_cron")
+@shared_task(ignore_result=True, queue="celery_action_cron")
def manage_group_duty_notice(duty_manager: GroupDutyRuleManager):
"""
单个任务组的排班计划管理
@@ -4,7 +4,7 @@
import time
from typing import List

-from celery.task import task
+from celery import shared_task
from django.conf import settings
from django.utils.translation import gettext as _
from elasticsearch import ConflictError
@@ -46,7 +46,7 @@
logger = logging.getLogger("fta_action.run")


-@task(ignore_result=True, queue="celery_action")
+@shared_task(ignore_result=True, queue="celery_action")
def create_actions(
    strategy_id,
    signal,
@@ -114,7 +114,7 @@ def create_actions(
    return actions


-@task(ignore_result=True, queue="celery_interval_action")
+@shared_task(ignore_result=True, queue="celery_interval_action")
def create_interval_actions(
    strategy_id,
    signal,
@@ -172,7 +172,7 @@ def check_create_poll_action():
    check_create_poll_action_10_secs.apply_async(countdown=interval, expires=120)


-@task(ignore_result=True, queue="celery_action_cron")
+@shared_task(ignore_result=True, queue="celery_action_cron")
def check_create_poll_action_10_secs():
"""
每10s进行一次数据查询
@@ -2,7 +2,7 @@
import time
from collections import defaultdict

-from celery.task import task
+from celery import shared_task
from django.conf import settings
from django.db.models import Q
from django.utils.functional import cached_property
@@ -24,7 +24,7 @@
logger = logging.getLogger("fta_action.run")


-@task(ignore_result=True, queue="celery_action")
+@shared_task(ignore_result=True, queue="celery_action")
def run_noise_reduce_task(processor):
    processor.process()

4 changes: 2 additions & 2 deletions bkmonitor/alarm_backends/service/nodata/tasks.py
@@ -13,7 +13,7 @@
import logging
import time

-from celery.task import task
+from celery import shared_task

from alarm_backends.service.nodata.processor import CheckProcessor
from core.errors.alarm_backends import LockError
@@ -22,7 +22,7 @@
logger = logging.getLogger("nodata")


-@task(ignore_result=True, queue="celery_service")
+@shared_task(ignore_result=True, queue="celery_service")
def no_data_check(strategy_id, now_timestamp):
"""
:summary: 检测当前策略是否需要无数据告警
6 changes: 3 additions & 3 deletions bkmonitor/alarm_backends/service/preparation/tasks.py
@@ -12,7 +12,7 @@
import logging
import time

-from celery.task import task
+from celery import shared_task

from alarm_backends.core.cache import key
from alarm_backends.core.cache.strategy import StrategyCacheManager
@@ -26,7 +26,7 @@
logger = logging.getLogger("preparation")


-@task(ignore_result=True, queue="celery_cron")
+@shared_task(ignore_result=True, queue="celery_cron")
def refresh_aiops_sdk_depend_data(strategy_id, update_time: int = None):
"""刷新用于AIOPS SDK异常检测的历史依赖数据.

@@ -41,7 +41,7 @@ def refresh_aiops_sdk_depend_data(strategy_id, update_time: int = None):
logger.exception("Process strategy({strategy_id}) exception, " "{msg}".format(strategy_id=strategy_id, msg=e))


-@task(ignore_result=True, queue="celery_cron")
+@shared_task(ignore_result=True, queue="celery_cron")
def maintain_all_aiops_sdk_depend_data():
"""通过轮训的方式管理AIOPS SDK的历史依赖.

35 changes: 34 additions & 1 deletion bkmonitor/alarm_backends/service/scheduler/app.py
@@ -18,7 +18,8 @@
import six.moves.urllib.parse
import six.moves.urllib.request
from billiard import cpu_count
-from celery import Celery
+from celery import Celery, Task
+from celery.schedules import maybe_schedule
from celery.signals import beat_init, setup_logging
from django.conf import settings
from django.db import close_old_connections
@@ -258,3 +259,35 @@ def config_loggers(*args, **kwags):
@beat_init.connect
def clean_db_connections(sender, **kwargs):
    close_old_connections()


+class PeriodicTask(Task):
+    """A task that adds itself to the :setting:`beat_schedule` setting. Celery 5 compatibility shim."""
+
+    abstract = True
+    ignore_result = True
+    relative = False
+    options = None
+    compat = True
+
+    def __init__(self):
+        if not hasattr(self, 'run_every'):
+            raise NotImplementedError('Periodic tasks must have a run_every attribute')
+        self.run_every = maybe_schedule(self.run_every, self.relative)
+        super(PeriodicTask, self).__init__()
+
+    @classmethod
+    def on_bound(cls, _app):
+        _app.conf.beat_schedule[cls.name] = {
+            'task': cls.name,
+            'schedule': cls.run_every,
+            'args': (),
+            'kwargs': {},
+            'options': cls.options or {},
+            'relative': cls.relative,
+        }
+
+
+def periodic_task(*args, **options):
+    """Deprecated decorator, please use :setting:`beat_schedule`."""
+    return app.task(**dict({"base": PeriodicTask}, **options))
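This shim lets cron modules keep their decorator-based registration: extra decorator keywords such as `run_every` become attributes on the generated task class, and `on_bound` copies the schedule into `app.conf.beat_schedule`, which is how Celery 5 expects periodic tasks to be declared. A usage sketch assuming only the shim above (the schedule, queue, and task body are invented for illustration):

```python
from celery.schedules import crontab

from alarm_backends.service.scheduler.app import periodic_task


# Registers itself under its task name in app.conf.beat_schedule
# via PeriodicTask.on_bound; the cron expression is a placeholder.
@periodic_task(run_every=crontab(minute="*/5"), queue="celery_cron")
def refresh_demo_cache():
    ...
```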
@@ -13,10 +13,10 @@
import logging

from celery.schedules import crontab
-from celery.task import periodic_task

from alarm_backends.core.api_cache.library import API_CRONTAB
from alarm_backends.core.cluster import get_cluster
+from alarm_backends.service.scheduler.app import periodic_task
from alarm_backends.service.scheduler.tasks.cron import task_duration

logger = logging.getLogger("cron")