Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 数据同步增加默认重试次数,当所有重试都失败时处理异常 #197 #202

Merged
merged 5 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/api/bkuser_core/categories/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ class SyncTaskStatus(AutoLowerEnum):
SUCCESSFUL = auto()
FAILED = auto()
RUNNING = auto()
RETRYING = auto()

_choices_labels = ((SUCCESSFUL, _("成功")), (FAILED, _("失败")), (RUNNING, _("同步中")))
_choices_labels = ((SUCCESSFUL, _("成功")), (FAILED, _("失败")), (RUNNING, _("同步中")), (RETRYING, _("失败重试中")))
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Generated by Django 3.2.5 on 2021-12-09 08:04

from django.db import migrations, models


class Migration(migrations.Migration):
    # Schema migration for the "categories" app: introduces a retry counter on
    # SyncTask and declares the "retrying" value in the status choices of both
    # SyncProgress and SyncTask.

    # Must be applied after the migration that created the SyncTask table.
    dependencies = [
        ("categories", "0011_synctask"),
    ]

    operations = [
        # New integer column counting retries; rows get 0 by default.
        migrations.AddField(
            model_name="synctask",
            name="retry_count",
            field=models.IntegerField(default=0, verbose_name="重试次数"),
        ),
        # SyncProgress.status: choices list now includes "retrying".
        migrations.AlterField(
            model_name="syncprogress",
            name="status",
            field=models.CharField(
                choices=[("successful", "成功"), ("failed", "失败"), ("running", "同步中"), ("retrying", "失败重试中")],
                default="running",
                max_length=16,
                verbose_name="状态",
            ),
        ),
        # SyncTask.status: same choices list as SyncProgress.status above.
        migrations.AlterField(
            model_name="synctask",
            name="status",
            field=models.CharField(
                choices=[("successful", "成功"), ("failed", "失败"), ("running", "同步中"), ("retrying", "失败重试中")],
                default="running",
                max_length=16,
                verbose_name="状态",
            ),
        ),
    ]
9 changes: 1 addition & 8 deletions src/api/bkuser_core/categories/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,14 @@ class SyncTask(TimestampedModel):
verbose_name="触发类型", max_length=16, choices=SyncTaskType.get_choices(), default=SyncTaskType.MANUAL.value
)
operator = models.CharField(max_length=255, verbose_name="操作人", default="nobody")
retry_count = models.IntegerField(verbose_name="重试次数", default=0)
IMBlues marked this conversation as resolved.
Show resolved Hide resolved

objects = SyncTaskManager()

@property
def required_time(self) -> datetime.timedelta:
    """Return the time elapsed between this record's creation and its latest update."""
    return self.update_time - self.create_time

def __enter__(self):
    """Enter the ``with`` block and hand back this task record itself."""
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    """Record a failure on this task when the ``with`` body raised.

    Nothing is written when the body finished without an exception. The
    exception is never suppressed (the method always returns ``None``).
    """
    if exc_val is None:
        return

    # Persist only the status and the timestamp column.
    self.status = SyncTaskStatus.FAILED.value
    self.save(update_fields=["status", "update_time"])

@property
def progresses(self):
# 由于建表顺序的原因, SyncProgress 的 task_id 未设置成外键....
Expand Down
8 changes: 8 additions & 0 deletions src/api/bkuser_core/categories/plugins/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from enum import auto

from bkuser_core.categories.constants import SyncStep
from bkuser_core.common.enum import AutoNameEnum
from django.utils.translation import gettext_lazy as _

PLUGIN_NAME_SETTING_KEY = "plugin_name"
Expand All @@ -22,3 +25,8 @@
(SyncStep.USERS_RELATIONSHIP, True): _("同步用户【{username}】上级成功"),
(SyncStep.USERS_RELATIONSHIP, False): _("同步用户【{username}】上级失败, 失败原因: {error}"),
}


class HookType(AutoNameEnum):
    """Kinds of plugin hook, named after their position relative to a sync."""

    # NOTE(review): member values come from AutoNameEnum, which is defined
    # elsewhere -- presumably the member name itself; confirm before relying on it.
    POST_SYNC = auto()
    PRE_SYNC = auto()
4 changes: 3 additions & 1 deletion src/api/bkuser_core/categories/plugins/custom/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from bkuser_core.categories.plugins.plugin import DataSourcePlugin
from bkuser_core.categories.plugins.plugin import DataSourcePlugin, HookType

from .hooks import AlertIfFailedHook
from .login import LoginHandler
from .sycner import CustomSyncer

Expand All @@ -19,4 +20,5 @@
login_handler_cls=LoginHandler,
allow_client_write=True,
category_type="custom",
hooks={HookType.POST_SYNC: AlertIfFailedHook},
).register()
24 changes: 24 additions & 0 deletions src/api/bkuser_core/categories/plugins/custom/hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging

from celery.states import FAILURE

logger = logging.getLogger(__name__)


class AlertIfFailedHook:
    """Hook that logs an error when a sync task ends in Celery's FAILURE state."""

    def trigger(self, status: str, params: dict):
        """React to a finished sync task.

        :param status: task state string; only ``FAILURE`` produces output.
        :param params: must contain ``"category"`` and ``"retries"``, both of
            which are interpolated into the log message.
        """
        if status != FAILURE:
            return

        logger.error(
            "failed to sync data for category<%s> after %s times tries", params["category"], params["retries"]
        )
        # you can send some alerts here
17 changes: 16 additions & 1 deletion src/api/bkuser_core/categories/plugins/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
specific language governing permissions and limitations under the License.
"""
from dataclasses import dataclass, field
from typing import Optional, Type
from typing import Dict, Optional, Type
from uuid import UUID

from bkuser_core.categories.constants import SyncTaskStatus
from bkuser_core.categories.loader import register_plugin
from bkuser_core.categories.models import SyncProgress, SyncTask
from bkuser_core.categories.plugins.base import LoginHandler, Syncer
from bkuser_core.categories.plugins.constants import HookType
from rest_framework import serializers
from typing_extensions import Protocol


class SyncRecordSLZ(serializers.Serializer):
Expand All @@ -25,6 +27,13 @@ class SyncRecordSLZ(serializers.Serializer):
dt = serializers.DateTimeField()


class PluginHook(Protocol):
    """Plugin hook, used as a callback after various events."""

    def trigger(self, status: str, params: dict):
        """Handle an event; implementations receive a status string and a dict of parameters."""
        raise NotImplementedError


@dataclass
class DataSourcePlugin:
"""数据源插件,定义不同的数据源"""
Expand All @@ -44,10 +53,16 @@ class DataSourcePlugin:
# 其他额外配置
extra_config: dict = field(default_factory=dict)

hooks: Dict[HookType, Type[PluginHook]] = field(default_factory=dict)

def register(self):
"""注册插件"""
register_plugin(self)

def get_hook(self, type_: HookType) -> Optional[PluginHook]:
    """Instantiate the hook registered for ``type_``.

    Returns ``None`` when ``self.hooks`` has no (truthy) entry for that type;
    otherwise returns a fresh instance created with no arguments.
    """
    registered = self.hooks.get(type_)
    if not registered:
        return None
    return registered()

def sync(self, instance_id: int, task_id: UUID, *args, **kwargs):
"""同步数据"""
syncer = self.syncer_cls(category_id=instance_id)
Expand Down
1 change: 1 addition & 0 deletions src/api/bkuser_core/categories/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class SyncTaskSerializer(Serializer):
operator = CharField(help_text="操作人")
create_time = DateTimeField(help_text="开始时间")
required_time = DurationTotalSecondField(help_text="耗时")
retry_count = IntegerField(help_text="重试次数")


class SyncTaskProcessSerializer(Serializer):
Expand Down
92 changes: 67 additions & 25 deletions src/api/bkuser_core/categories/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,95 @@
"""
import logging
import uuid
from typing import Optional
from contextlib import contextmanager
from typing import Any, Optional, Union

from bkuser_core.categories.constants import SyncTaskType
from bkuser_core.categories.constants import SyncTaskStatus, SyncTaskType
from bkuser_core.categories.exceptions import ExistsSyncingTaskError
from bkuser_core.categories.loader import get_plugin_by_category
from bkuser_core.categories.models import ProfileCategory, SyncTask
from bkuser_core.categories.plugins.constants import HookType
from bkuser_core.categories.utils import catch_time
from bkuser_core.celery import app
from bkuser_core.common.cache import clear_cache
from bkuser_core.common.error_codes import error_codes
from celery import Task
from django.conf import settings

logger = logging.getLogger(__name__)


@app.task
class RetryWithHookTask(Task):
    """A task that retries automatically and runs the plugin's POST_SYNC hook on return."""

    autoretry_for = (Exception,)
    # ``retry_kwargs`` only feeds the automatic ``retry()`` call; ``max_retries``
    # is a separate task attribute that other code reads directly. Set both from
    # the same setting so they cannot disagree.
    max_retries = settings.TASK_MAX_RETRIES
    retry_kwargs = {"max_retries": settings.TASK_MAX_RETRIES}
    retry_backoff = settings.RETRY_BACKOFF
    retry_jitter = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """Log the task result and trigger the category plugin's POST_SYNC hook.

        :param status: task state string, forwarded unchanged to the hook.
        :param retval: task return value or raised exception (unused here).
        :param task_id: id of the task, used for logging only.
        :param args: positional arguments the task was called with.
        :param kwargs: keyword arguments the task was called with; never mutated.
        :param einfo: exception info (unused here).
        """
        # The category id may arrive by keyword or as the first positional argument.
        if "instance_id" in kwargs:
            instance_id = kwargs["instance_id"]
        elif args:
            instance_id = args[0]
        else:
            logger.warning("Sync data task<%s> returned %s without an instance_id, skip hooks", task_id, status)
            return

        category = ProfileCategory.objects.get(pk=instance_id)
        logger.info("Sync data task<%s> of category<%s> got result: %s", task_id, category, status)

        plugin = get_plugin_by_category(category)
        post_sync_hook = plugin.get_hook(HookType.POST_SYNC)
        if post_sync_hook:
            # Build a fresh dict so the task's own kwargs are left untouched.
            params = {**kwargs, "retries": self.request.retries, "category": category}
            post_sync_hook.trigger(status, params)


@contextmanager
def sync_data_task(category: ProfileCategory, task_id: Union[uuid.UUID, Any], still_retrying: bool):
    """Track the outcome of one sync attempt on its SyncTask record.

    The record is looked up by ``task_id`` before the body runs. If the body
    raises an ``Exception``, the record is marked as retrying (and its retry
    counter incremented) when ``still_retrying`` is true, or as failed
    otherwise; the exception is then re-raised. If the body succeeds, the
    category is marked as synced, the record is marked successful and the
    cache is cleared.
    """
    record = SyncTask.objects.get(id=task_id)
    try:
        yield
    except Exception:
        if not still_retrying:
            record.status = SyncTaskStatus.FAILED.value
        else:
            record.retry_count += 1
            record.status = SyncTaskStatus.RETRYING.value

        record.save(update_fields=["retry_count", "status", "update_time"])
        raise

    # Reached only when the body completed without raising.
    category.mark_synced()
    record.status = SyncTaskStatus.SUCCESSFUL.value
    record.save(update_fields=["status", "update_time"])

    # Clear the current cache once the sync has succeeded.
    clear_cache()


@app.task(base=RetryWithHookTask)
def adapter_sync(instance_id: int, operator: str, task_id: Optional[uuid.UUID] = None, *args, **kwargs):
    """Synchronise the data of one category through its data-source plugin.

    :param instance_id: primary key of the ProfileCategory to sync.
    :param operator: recorded as the operator when a SyncTask has to be registered.
    :param task_id: id of an existing SyncTask; when omitted a new AUTO task is registered.
    :raises: ``error_codes.LOAD_DATA_FAILED`` when a task cannot be registered,
        ``error_codes.CATEGORY_TYPE_NOT_SUPPORTED`` / ``error_codes.LOAD_DATA_ADAPTER_FAILED``
        when the plugin cannot be loaded, and whatever the plugin's ``sync`` raises.
    """
    logger.info("going to sync Category<%s>", instance_id)
    category = ProfileCategory.objects.get(pk=instance_id)

    if task_id is None:
        # Only scheduled (periodic) runs omit task_id.
        try:
            task_id = SyncTask.objects.register_task(category=category, operator=operator, type_=SyncTaskType.AUTO).id
        except ExistsSyncingTaskError as e:
            raise error_codes.LOAD_DATA_FAILED.f(str(e)) from e

    # The effective retry ceiling is the one handed to the automatic retry via
    # ``retry_kwargs``; fall back to the task attribute when it is not set there.
    configured = getattr(adapter_sync, "retry_kwargs", None) or {}
    max_retries = configured.get("max_retries", adapter_sync.max_retries)
    # ``None`` means "retry without limit".
    still_retrying = max_retries is None or adapter_sync.request.retries < max_retries

    # Plugin loading happens inside the tracking context so that a load failure
    # is written to the SyncTask record instead of leaving it in "running".
    with sync_data_task(category, task_id, still_retrying):
        try:
            plugin = get_plugin_by_category(category)
        except ValueError:
            logger.exception("category type<%s> is not support", category.type)
            raise error_codes.CATEGORY_TYPE_NOT_SUPPORTED
        except Exception:
            logger.exception(
                "load adapter<%s-%s-%s> failed",
                category.type,
                category.display_name,
                category.id,
            )
            raise error_codes.LOAD_DATA_ADAPTER_FAILED

        with catch_time() as context:
            plugin.sync(instance_id=instance_id, task_id=task_id, *args, **kwargs)
        logger.info(f"同步总耗时: {context.time_delta}s, 消耗总CPU时间: {context.clock_delta}s.")
6 changes: 6 additions & 0 deletions src/api/bkuser_core/config/common/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,9 @@

# 是否使用进度条(本地开发方便)
USE_PROGRESS_BAR = False

# ==============================================================================
# 数据同步
# ==============================================================================
TASK_MAX_RETRIES = env.int("TASK_MAX_RETRIES", default=3)
RETRY_BACKOFF = env.int("RETRY_BACKOFF", default=30)
47 changes: 47 additions & 0 deletions src/api/bkuser_core/tests/categories/plugins/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import pytest
from bkuser_core.categories.constants import SyncTaskStatus, SyncTaskType
from bkuser_core.categories.models import SyncTask
from bkuser_core.categories.tasks import sync_data_task

pytestmark = pytest.mark.django_db


class TestSyncDataTask:
    """Tests for the ``sync_data_task`` context manager's failure bookkeeping."""

    @pytest.fixture
    def sync_task(self, test_ldap_category):
        """Register a fresh AUTO sync task for the LDAP test category."""
        return SyncTask.objects.register_task(category=test_ldap_category, operator="admin", type_=SyncTaskType.AUTO)

    @pytest.mark.parametrize(
        "retrying_status",
        [
            [True],
            [False],
            [True, False],
            [True, True, False],
            [True, True, True],
        ],
    )
    def test_sync_data_task(self, test_ldap_category, sync_task, retrying_status):
        """Each failing attempt must leave the right status and retry count on the task."""

        for t in retrying_status:
            with pytest.raises(ValueError):
                with sync_data_task(test_ldap_category, sync_task.id, t):
                    raise ValueError("Anything wrong")

        sync_task = SyncTask.objects.get(pk=sync_task.id)

        # Compute the expected value first: a conditional expression binds more
        # loosely than ``==``, so writing both in a single assert would reduce to
        # asserting a non-empty string whenever the last attempt was not retrying.
        if retrying_status[-1]:
            expected_status = SyncTaskStatus.RETRYING.value
        else:
            expected_status = SyncTaskStatus.FAILED.value
        assert sync_task.status == expected_status

        assert sync_task.retry_count == len([x for x in retrying_status if x])
1 change: 1 addition & 0 deletions src/pages/src/language/lang/en.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ export default {
数据更新记录: 'Data update record',
日记详细: 'Log in detail',
同步中: 'In the synchronous',
失败重试中: 'Retrying after failed',
用户数据更新: 'User Data Update',
组织数据更新: 'Organizing data updates',
用户间关系数据更新: 'Update relationship data between users',
Expand Down
1 change: 1 addition & 0 deletions src/pages/src/language/lang/zh.js
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ export default {
数据更新记录: '数据更新记录',
日志详细: '日志详细',
同步中: '同步中',
失败重试中: '失败重试中',
用户数据更新: '用户数据更新',
组织数据更新: '组织数据更新',
用户间关系数据更新: '用户间关系数据更新',
Expand Down
Loading