Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: only send post_sync_data_source signal when user & dept synced #1909

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/bk-user/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ i18n-mo: ## 将 django.po 文件编译成 django.mo 文件
i18n-all: i18n-po i18n-mo ## 执行 i18n-po & i18n-mo

package-plugin: ## 打包自定义数据源插件
kubectl create configmap bk-user-plugin-${name} --from-file=bkuser/plugins/${name} --dry-run=client -o yaml > bk-user-plugin-${name}.yaml
@configmap_name=bk-user-plugin-$(shell echo ${name} | sed 's/_/-/g'); \
kubectl create configmap $${configmap_name} --from-file=bkuser/plugins/${name} --dry-run=client -o yaml > $${configmap_name}.yaml

package-idp-plugin: ## 打包自定义 IDP 插件
kubectl create configmap bk-user-idp-plugin-${name} --from-file=bkuser/idp_plugins/${name} --dry-run=client -o yaml > bk-user-idp-plugin-${name}.yaml
@configmap_name=bk-user-idp-plugin-$(shell echo ${name} | sed 's/_/-/g'); \
kubectl create configmap $${configmap_name} --from-file=bkuser/idp_plugins/${name} --dry-run=client -o yaml > $${configmap_name}.yaml
3 changes: 2 additions & 1 deletion src/bk-user/bkuser/apps/sync/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class DataSourceSyncObjectType(str, StructuredEnum):

USER = EnumField("user", label=_("用户"))
DEPARTMENT = EnumField("department", label=_("部门"))
USER_RELATION = EnumField("user_relation", label=_("用户关系"))
DEPARTMENT_RELATION = EnumField("department_relation", label=_("部门关系"))
USER_LEADER_RELATION = EnumField("user_leader_relation", label=_("用户 Leader 关系"))
USER_DEPARTMENT_RELATION = EnumField("user_department_relation", label=_("用户部门关系"))


class TenantSyncObjectType(str, StructuredEnum):
Expand Down
1 change: 1 addition & 0 deletions src/bk-user/bkuser/apps/sync/contexts/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, task: DataSourceSyncTask):
self.task = task
self.logger = TaskLogger()
self.recorder = ChangeLogRecorder()
self.synced_obj_types: set[DataSourceSyncObjectType] = set()

timeout = task.extras.get("timeout", settings.DATA_SOURCE_SYNC_DEFAULT_TIMEOUT)
self.lock = DataSourceSyncTaskLock(task.data_source_id, timeout=timeout)
Expand Down
52 changes: 48 additions & 4 deletions src/bk-user/bkuser/apps/sync/runners/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Any, Dict

from bkuser.apps.data_source.models import DataSource, DataSourceUser
from bkuser.apps.sync.constants import DataSourceSyncObjectType
from bkuser.apps.sync.contexts import DataSourceSyncTaskContext
from bkuser.apps.sync.models import DataSourceSyncTask
from bkuser.apps.sync.signals import post_sync_data_source
Expand Down Expand Up @@ -50,8 +51,7 @@ def run(self):
self._sync_departments(ctx)
self._sync_users(ctx)
self._validate_unique_fields(ctx)

self._send_signal()
self._send_signal(ctx)

def _need_skip_sync(self) -> bool:
"""租户不是启用状态,需要跳过同步"""
Expand Down Expand Up @@ -84,8 +84,12 @@ def _sync_departments(self, ctx: DataSourceSyncTaskContext):
"overwrite": bool(self.task.extras.get("overwrite", False)),
"incremental": bool(self.task.extras.get("incremental", False)),
}
# 部门主体
DataSourceDepartmentSyncer(**kwargs).sync() # type: ignore
ctx.synced_obj_types.add(DataSourceSyncObjectType.DEPARTMENT)
# 部门间关系
DataSourceDepartmentRelationSyncer(**kwargs).sync() # type: ignore
ctx.synced_obj_types.add(DataSourceSyncObjectType.DEPARTMENT_RELATION)

ctx.logger.info("succeed to sync departments and their relations from data source plugin")

Expand All @@ -110,16 +114,56 @@ def _sync_users(self, ctx: DataSourceSyncTaskContext):
#
# ref: https://github.com/TencentBlueKing/bk-user/pull/1904/files
exists_user_ids = set(DataSourceUser.objects.filter(data_source=self.data_source).values_list("id", flat=True))
# 用户主体
DataSourceUserSyncer(**kwargs).sync() # type: ignore
ctx.synced_obj_types.add(DataSourceSyncObjectType.USER)
# 用户 Leader 关系
DataSourceUserLeaderRelationSyncer(exists_user_ids_before_sync=exists_user_ids, **kwargs).sync() # type: ignore
ctx.synced_obj_types.add(DataSourceSyncObjectType.USER_LEADER_RELATION)
# 用户部门关系
DataSourceUserDeptRelationSyncer(exists_user_ids_before_sync=exists_user_ids, **kwargs).sync() # type: ignore
ctx.synced_obj_types.add(DataSourceSyncObjectType.USER_DEPARTMENT_RELATION)

ctx.logger.info("succeed to sync users and their leader & dept relations from data source plugin")

def _validate_unique_fields(self, ctx: DataSourceSyncTaskContext):
"""对有唯一性要求的自定义字段的校验"""
DataSourceUserExtrasUniqueValidator(self.data_source, ctx.logger).validate()

def _send_signal(self):
"""发送数据源同步完成信号,触发后续流程"""
def _send_signal(self, ctx: DataSourceSyncTaskContext):
"""若符合准出条件,则发送数据源同步完成信号,触发后续流程

资源同步顺序:部门 -> 部门间关系 -> 用户 -> 用户 Leader 关系 -> 用户部门关系

同步失败可能有下面几种场景:

1:部门同步失败,被回滚 -> 数据都不变,不需要同步到租户

2:部门同步成功,但是部门关系同步失败
-> 部门数据变,用户数据不变,此时不会同步到租户
具体影响:部分用户无法获取部门信息(部门被删除,导致有边无节点)

3:部门 & 部门关系同步成功,用户同步失败,用户数据被回滚 -> 效果同场景 2

4:部门 & 部门关系 & 用户同步成功,用户 Leader 关系同步失败
-> 会同步到租户,但用户 Leader,部门关联边是老数据
具体影响:部分用户无法获取 Leader / 部门信息(Leader / 部门被删除,导致有边无节点)

5:部门 & 部门关系 & 用户 & 用户 Leader 关系同步成功,用户部门关系同步失败
-> 会同步到租户,但用户部门关联边是老数据
具体影响:部分用户无法获取部门信息(部门被删除,导致有边无节点)

注意:其中场景 2 出现概率极低(原因是 mptt 树是直接重建的,除非 tree_id 分配到 int 上限导致失败,需运维介入)
narasux marked this conversation as resolved.
Show resolved Hide resolved
"""
ctx.logger.info(f"current synced object types is {[t.value for t in ctx.synced_obj_types]}")

if (
DataSourceSyncObjectType.DEPARTMENT not in ctx.synced_obj_types
or DataSourceSyncObjectType.USER not in ctx.synced_obj_types
):
ctx.logger.error("departments or users haven't been synced, skip sync tenant...")
return

# 若用户 & 部门主体完成同步,即可触发租户同步流程
post_sync_data_source.send(sender=self.__class__, data_source=self.data_source)
ctx.logger.info("signal post_sync_data_source sent...")