Skip to content

Commit

Permalink
feature: 订阅更新接口支持 steps 更新 (closed #1002)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Aug 22, 2022
1 parent e9e4da9 commit 79e7dce
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 13 deletions.
8 changes: 8 additions & 0 deletions apps/backend/subscription/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,11 @@ class NoRunningInstanceRecordError(AppBaseException):
ERROR_CODE = 16
MESSAGE = _("不存在运行中的任务,请确认")
MESSAGE_TPL = _("订阅[id:{subscription_id}]不存在运行中的任务,请确认")


class SubscriptionUpdateError(AppBaseException):
"""订阅任务不存在"""

ERROR_CODE = 17
MESSAGE = _("订阅更新错误")
MESSAGE_TPL = _("订阅[id:{subscription_id}]更新错误: {msg}")
1 change: 1 addition & 0 deletions apps/backend/subscription/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class ScopeSerializer(serializers.Serializer):

class StepSerializer(serializers.Serializer):
id = serializers.CharField()
type = serializers.CharField(required=False)
params = serializers.DictField()
config = serializers.DictField(required=False)

Expand Down
66 changes: 55 additions & 11 deletions apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
from collections import defaultdict
from copy import deepcopy
from functools import cmp_to_key, reduce
from typing import Dict
from typing import Dict, List, Set

import ujson as json
from django.core.cache import caches
from django.db import transaction
from django.db.models import Q
from django.utils.translation import get_language
from django.utils.translation import gettext_lazy as _
from rest_framework.decorators import action
from rest_framework.exceptions import ParseError
from rest_framework.response import Response
Expand Down Expand Up @@ -349,17 +350,60 @@ def update_subscription(self, request):
subscription.bk_biz_scope = params.get("bk_biz_scope")
subscription.save()

steps_group_by_id = {step["id"]: step for step in params["steps"]}

for step in subscription.steps:
step.params = steps_group_by_id[step.step_id]["params"]
if "config" in steps_group_by_id[step.step_id]:
step.config = steps_group_by_id[step.step_id]["config"]
step.save()

result = {
"subscription_id": subscription.id,
step_ids: Set[str] = set()
step_id__obj_map: Dict[str, models.SubscriptionStep] = {
step_obj.step_id: step_obj for step_obj in subscription.steps
}
step_objs_to_be_created: List[models.SubscriptionStep] = []
step_objs_to_be_updated: List[models.SubscriptionStep] = []

for index, step_info in enumerate(params["steps"]):
if step_info["id"] in step_id__obj_map:
# 存在则更新
step_obj: models.SubscriptionStep = step_id__obj_map[step_info["id"]]
step_obj.params = step_info["params"]
if "config" in step_info:
step_obj.config = step_info["config"]
step_obj.index = index
step_objs_to_be_updated.append(step_obj)
else:
# 新增场景
try:
step_obj_to_be_created: models.SubscriptionStep = models.SubscriptionStep(
subscription_id=subscription.id,
index=index,
step_id=step_info["id"],
type=step_info["type"],
config=step_info["config"],
params=step_info["params"],
)
except KeyError as e:
logger.warning(
f"update subscription[{subscription.id}] to add step[{step_info['id']}] error: "
f"err_msg -> {e}"
)
raise errors.SubscriptionUpdateError(
{
"subscription_id": subscription.id,
"msg": _("新增订阅步骤[{step_id}] 需要提供 type & config,错误信息 -> {err_msg}").format(
step_id=step_info["id"], err_msg=e
),
}
)
step_objs_to_be_created.append(step_obj_to_be_created)
step_ids.add(step_info["id"])

# 删除更新后不存在的 step
models.SubscriptionStep.objects.filter(
subscription_id=subscription.id, step_id__in=set(step_id__obj_map.keys()) - step_ids
).delete()
models.SubscriptionStep.objects.bulk_update(step_objs_to_be_updated, fields=["config", "params", "index"])
models.SubscriptionStep.objects.bulk_create(step_objs_to_be_created)
# 更新 steps 需要移除缓存
if hasattr(subscription, "_steps"):
delattr(subscription, "_steps")

result = {"subscription_id": subscription.id}

if run_immediately:
if subscription.is_running():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 3.2.4 on 2022-08-19 08:52

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("node_man", "0064_auto_20220721_1557"),
]

operations = [
migrations.AlterUniqueTogether(
name="subscriptionstep",
unique_together={("subscription_id", "step_id")},
),
]
5 changes: 3 additions & 2 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,7 @@ class Meta:
verbose_name = _("订阅步骤")
verbose_name_plural = _("订阅步骤")
ordering = ["index"]
unique_together = (("subscription_id", "index"), ("subscription_id", "step_id"))
unique_together = (("subscription_id", "step_id"),)

def __str__(self):
return (
Expand Down Expand Up @@ -1848,10 +1848,11 @@ class CategoryType(object):
@property
def steps(self):
if not getattr(self, "_steps", None):
self._steps = SubscriptionStep.objects.filter(subscription_id=self.id)
self._steps = list(SubscriptionStep.objects.filter(subscription_id=self.id))
for step in self._steps:
# 设置 subscription 属性,减少查询次数
step.subscription = self
self._steps = sorted(self._steps, key=lambda x: x.index)
return self._steps

@steps.setter
Expand Down
3 changes: 3 additions & 0 deletions dev_log/2.2.22/crayon_202208191706.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
feature:
- "订阅更新接口支持 steps 更新 (closed #1002)"

0 comments on commit 79e7dce

Please sign in to comment.