Skip to content

Commit

Permalink
feature: 周期删除订阅数据周期任务性能优化 (closed #1961)
Browse files Browse the repository at this point in the history
  • Loading branch information
CohleRustW authored and ZhuoZhuoCrayon committed Dec 21, 2023
1 parent a284d6c commit 35d1ad1
Showing 1 changed file with 46 additions and 31 deletions.
77 changes: 46 additions & 31 deletions apps/backend/periodic_tasks/clean_subscription_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,67 +58,82 @@ def clean_subscription_data():
)

with connection.cursor() as cursor:

sub_detail_delete_sql: str = build_delete_query_sql(
sub_detail_delete_sqls: List[str] = build_delete_query_sqls(
table_name=SUBSCRIPTION_INSTANCE_DETAIL_TABLE,
days=alive_days,
limit=limit,
log_save_levels=sub_ins_detail_save_log_status,
)
logger.info(
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}"
f"start to execute sql -> [{sub_detail_delete_sql}] "
)
cursor.execute(sub_detail_delete_sql)
logger.info(
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}, "
f"deleted subscription instance status detail records -> [{cursor.rowcount}] "
)

job_instance_map_delete_sql: str = build_delete_query_sql(
table_name=JOB_SUB_INSTANCE_MAP_TABLE, appoint_clean_statuses=job_map_clean_status, limit=limit
)
if job_instance_map_delete_sql:
for sub_detail_delete_sql in sub_detail_delete_sqls:
logger.info(
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}, "
f"start to execute sql -> [{job_instance_map_delete_sql}]"
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}"
f"start to execute sql -> [{sub_detail_delete_sql}] "
)
cursor.execute(job_instance_map_delete_sql)
cursor.execute(sub_detail_delete_sql)
logger.info(
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}, "
f"deleted job instance map records -> [{cursor.rowcount}] "
f"deleted subscription instance status detail records -> [{cursor.rowcount}] "
)

job_instance_map_delete_sqls: List[str] = build_delete_query_sqls(
table_name=JOB_SUB_INSTANCE_MAP_TABLE,
appoint_clean_statuses=job_map_clean_status,
limit=limit,
)
if job_instance_map_delete_sqls:
for job_instance_map_delete_sql in job_instance_map_delete_sqls:
logger.info(
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}, "
f"start to execute sql -> [{job_instance_map_delete_sql}]"
)
cursor.execute(job_instance_map_delete_sql)
logger.info(
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}, "
f"deleted job instance map records -> [{cursor.rowcount}] "
)
else:
logger.info(
f"periodic_task -> clean_subscription_data, time -> {strftime_local(timezone.now())}, "
f"skip delete node_man_jobsubscriptioninstancemap records because appoint_clean_status is empty"
)


def build_delete_query_sql(
def build_delete_query_sqls(
table_name: str,
limit: int,
days: Optional[int] = None,
appoint_clean_statuses: Optional[List[int]] = None,
log_save_levels: Optional[List[str]] = None,
) -> str:
) -> List[str]:
head_sql: str = f"DELETE FROM {table_name} "

# 判断是否需要增加时间条件, JOB_SUB_INSTANCE_MAP_TABLE 没有时间条件 并且 limit 条件不根据 create_time 排序
where_conditions: List[str] = []
if table_name == JOB_SUB_INSTANCE_MAP_TABLE:
# appoint_clean_status 为空时,不需要进行后续清理动作
if not appoint_clean_statuses:
return
limit_condition: str = f"limit {limit}"
where_condition: str = f"WHERE status IN ({', '.join([f'{status}' for status in appoint_clean_statuses])})"
where_conditions: str = [f"WHERE status IN ({', '.join([f'{status}' for status in appoint_clean_statuses])})"]
else:
time_condition: str = f"create_time < DATE_SUB(NOW(), INTERVAL {days} DAY)"
limit_condition: str = f"ORDER BY create_time DESC limit {limit}"
where_condition: str = (
f"WHERE {time_condition}"
if not log_save_levels
else "WHERE status NOT IN ({}) AND {}".format(
", ".join([f"'{level}'" for level in log_save_levels]), time_condition
)
)
return f"{head_sql} {where_condition} {limit_condition}"
if not log_save_levels:
where_conditions: List[str] = [f"WHERE {time_condition}"]
else:
# 日志表来说,只有 PENDING RUNNING SUCCESS FAILED 几种状态
delete_log_level: List[str] = [
job_type
for job_type in [
constants.JobStatusType.PENDING,
constants.JobStatusType.RUNNING,
constants.JobStatusType.SUCCESS,
constants.JobStatusType.FAILED,
]
if job_type not in log_save_levels
]
where_conditions: List[str] = [
f"WHERE status = '{level}' AND {time_condition}" for level in delete_log_level
]

return [f"{head_sql} {where_condition} {limit_condition}" for where_condition in where_conditions]

0 comments on commit 35d1ad1

Please sign in to comment.