Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 本地目录导入excel支持选择更新 #728 #813

Merged
merged 10 commits into from
Dec 6, 2022
Merged
4 changes: 4 additions & 0 deletions src/api/bkuser_core/api/web/category/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class CategoryFileImportInputSLZ(serializers.Serializer):
file = serializers.FileField(required=False)


class CategoryFileImportQuerySLZ(serializers.Serializer):
is_overwrite = serializers.BooleanField(required=False, default=False)
Canway-shiisa marked this conversation as resolved.
Show resolved Hide resolved


class CategorySyncResponseOutputSLZ(serializers.Serializer):
task_id = serializers.CharField(help_text="task_id for the sync job.")

Expand Down
9 changes: 8 additions & 1 deletion src/api/bkuser_core/api/web/category/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
CategoryExportInputSLZ,
CategoryExportProfileOutputSLZ,
CategoryFileImportInputSLZ,
CategoryFileImportQuerySLZ,
CategoryMetaOutputSLZ,
CategoryNamespaceSettingUpdateInputSLZ,
CategoryProfileListInputSLZ,
Expand Down Expand Up @@ -454,6 +455,9 @@ def post(self, request, *args, **kwargs):

def _local_category_do_import(self, request, instance):
"""向本地目录导入数据文件"""
query_slz = CategoryFileImportQuerySLZ(data=request.query_params)
query_slz.is_valid(raise_exception=True)

slz = CategoryFileImportInputSLZ(data=request.data)
slz.is_valid(raise_exception=True)

Expand All @@ -465,7 +469,10 @@ def _local_category_do_import(self, request, instance):
raise error_codes.CREATE_SYNC_TASK_FAILED.f(str(e))

instance_id = instance.id
params = {"raw_data_file": slz.validated_data["file"]}
params = {
"raw_data_file": slz.validated_data["file"],
"is_overwrite": query_slz.validated_data["is_overwrite"],
}
try:
# TODO: FileField 可能不能反序列化, 所以可能不能传到 celery 执行
adapter_sync(instance_id, operator=request.operator, task_id=task_id, **params)
Expand Down
51 changes: 39 additions & 12 deletions src/api/bkuser_core/categories/plugins/local/syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ def __post_init__(self):
self._default_password_valid_days = int(ConfigProvider(self.category_id).get("password_valid_days"))
self.fetcher: ExcelFetcher = self.get_fetcher()

def sync(self, raw_data_file):
def sync(self, raw_data_file, is_overwrite):
user_rows, departments = self.fetcher.fetch(raw_data_file)
with transaction.atomic():
self._sync_departments(departments)

with transaction.atomic():
self._sync_users(self.fetcher.parser_set, user_rows)
self._sync_users(self.fetcher.parser_set, user_rows, is_overwrite)
self._sync_leaders(self.fetcher.parser_set, user_rows)

self._notify_init_passwords()
Expand Down Expand Up @@ -175,15 +175,18 @@ def _judge_data_all_none(raw_data: list) -> bool:
"""某些状况下会读取 Excel 整个空行"""
return all(x is None for x in raw_data)

def _sync_users(self, parser_set: "ParserSet", users: list):
def _sync_users(self, parser_set: "ParserSet", users: list, is_overwrite: bool = False):
Canway-shiisa marked this conversation as resolved.
Show resolved Hide resolved
"""在内存中操作&判断数据,bulk 插入"""
# pylint: disable=W,R,C
# TODO:复杂度异常处理
logger.info("=========== trying to load profiles into memory ===========")
Canway-shiisa marked this conversation as resolved.
Show resolved Hide resolved

# to record failed records
failed_records = []
success_count = 0

total = len(users)
should_deleted_department_profile_relation_ids = []
for index, user_raw_info in enumerate(users):
if self._judge_data_all_none(user_raw_info):
logger.debug("empty line, skipping")
Expand Down Expand Up @@ -235,8 +238,13 @@ def _sync_users(self, parser_set: "ParserSet", users: list):
progress(index, total, f"loading {username}")
try:
updating_profile = Profile.objects.get(username=username, category_id=self.category_id)

# 如果已经存在,则更新该 profile
# 已存在的用户:如果未勾选 <进行覆盖更新>(即is_overwrite为false)=》则忽略,反之则更新该 profile
if not is_overwrite:
wklken marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(
"username %s exist, and is_overwrite is false, so will not do update for this user, skip",
username,
)
continue
for name, value in profile_params.items():
if name == "extras":
extras = updating_profile.extras or {}
Expand All @@ -250,6 +258,7 @@ def _sync_users(self, parser_set: "ParserSet", users: list):

setattr(updating_profile, name, value)
profile_id = updating_profile.id

self.db_sync_manager.magic_add(updating_profile, SyncOperation.UPDATE.value)
logger.debug("(%s/%s) username<%s> already exist, trying to update it", username, index + 1, total)

Expand Down Expand Up @@ -278,15 +287,33 @@ def _sync_users(self, parser_set: "ParserSet", users: list):
# 2 获取关联的部门DB实例,创建关联对象
progress(index, total, "adding profile & department relation")
department_groups = parser_set.get_cell_data("department_name", user_raw_info)

cell_parser = DepartmentCellParser(self.category_id)
# 已存在的用户-部门关系
old_department_profile_relations = DepartmentThroughModel.objects.filter(profile_id=profile_id)
# Note: 有新关系可能存在重复数据,所以这里使用不变的old_department_set用于后续判断是否存在的依据,
# 而不使用后面会变更的old_department_relations数据
old_department_set = set([r.department_id for r in old_department_profile_relations])
old_department_relations = {r.department_id: r.id for r in old_department_profile_relations}

for department in cell_parser.parse_to_db_obj(department_groups):
relation_params = {"department_id": department.pk, "profile_id": profile_id}
try:
DepartmentThroughModel.objects.get(**relation_params)
except DepartmentThroughModel.DoesNotExist:
department_attachment = DepartmentThroughModel(**relation_params)
self.db_sync_manager.magic_add(department_attachment)
# 用户-部门关系已存在
if department.pk in old_department_set:
# Note: 可能本次更新里存在重复数据,dict无法重复移除
if department.pk in old_department_relations:
del old_department_relations[department.pk]
continue

# 不存在则添加
department_attachment = DepartmentThroughModel(department_id=department.pk, profile_id=profile_id)
self.db_sync_manager.magic_add(department_attachment)

# 已存在的数据从old_department_relations移除后,最后剩下的数据,表示多余的,即本次更新里不存在的用户部门关系
# 如果是覆盖,则记录需要删除多余数据
if is_overwrite and len(old_department_relations) > 0:
should_deleted_department_profile_relation_ids.extend(old_department_relations.values())

if len(should_deleted_department_profile_relation_ids) > 0:
DepartmentThroughModel.objects.filter(id__in=should_deleted_department_profile_relation_ids).delete()

# 需要在处理 leader 之前全部插入 DB
self.db_sync_manager[Profile].sync_to_db()
Expand Down