Skip to content

Commit

Permalink
feat: 优化分库分表迁移过程中,task_instance_id 动态查询条件构造逻辑 TencentBlueKing#3324
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Dec 11, 2024
1 parent 6ca9690 commit 7e0bb79
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ public void assertRequiredInitParam(String paramName) {
public boolean checkRequiredContextParam(ToggleEvaluateContext context, String paramName) {
boolean checkResult = true;
if (context.getParam(paramName) == null) {
String msg = MessageFormatter.format(
"Context param {} is required for evaluate", paramName).getMessage();
log.warn(msg);
log.info("Context param {} is required for evaluate", paramName);
checkResult = false;
}
return checkResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,15 @@ private Long parseAppIdFromPath(String requestURI) {
Long appId = null;
Matcher appIdMatcher = APP_PATTERN.matcher(requestURI);
if (appIdMatcher.find()) {
appId = Long.parseLong(appIdMatcher.group());
appId = Long.parseLong(appIdMatcher.group(1));
}
return appId;
}

private Long parseAppIdFromQueryParam(HttpServletRequest request) {
String value = request.getParameter("appId");
log.debug("Parsed from GET: {}={}", "appId", value);
String queryParam = "appId";
String value = request.getParameter(queryParam);
log.debug("Parsed from GET: {}={}", queryParam, value);
return value != null ? Long.parseLong(value) : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.tencent.bk.job.execute.dao.impl;

import com.tencent.bk.job.common.model.dto.ResourceScope;
import com.tencent.bk.job.common.util.JobContextUtil;
import com.tencent.bk.job.common.util.toggle.ToggleEvaluateContext;
import com.tencent.bk.job.common.util.toggle.ToggleStrategyContextParams;
import com.tencent.bk.job.common.util.toggle.feature.FeatureIdConstants;
Expand All @@ -51,33 +50,35 @@ public class TaskInstanceIdDynamicCondition {

public static Condition build(Long taskInstanceId,
Function<Long, Condition> taskInstanceIdConditionBuilder) {
ToggleEvaluateContext toggleEvaluateContext;
JobExecuteContext jobExecuteContext = JobExecuteContextThreadLocalRepo.get();
if (jobExecuteContext == null) {
log.info("TaskInstanceIdDynamicCondition : Empty JobExecuteContext!");
// JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition
// (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id)
return DSL.trueCondition();
}
ResourceScope resourceScope = jobExecuteContext.getResourceScope();
if (resourceScope == null) {
log.info("TaskInstanceIdDynamicCondition : Empty resource scope!");
// 无法根据业务决定是否使用 task_instance_id 作为查询条件。为了不影响请求正常处理,直接返回 TRUE Condition
// (不会影响 DAO 查询,task_instance_id 仅作为分片功能,实际业务数据关系并不强依赖 task_instance_id)
return DSL.trueCondition();
toggleEvaluateContext = ToggleEvaluateContext.EMPTY;
} else {
ResourceScope resourceScope = jobExecuteContext.getResourceScope();
if (resourceScope != null) {
toggleEvaluateContext = ToggleEvaluateContext.builder()
.addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE, resourceScope);
} else {
log.info("TaskInstanceIdDynamicCondition : Empty resource scope!");
toggleEvaluateContext = ToggleEvaluateContext.EMPTY;
}
}
if (FeatureToggle.checkFeature(
FeatureIdConstants.DAO_ADD_TASK_INSTANCE_ID,
ToggleEvaluateContext.builder()
.addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE,
JobContextUtil.getAppResourceScope()))) {

if (FeatureToggle.checkFeature(FeatureIdConstants.DAO_ADD_TASK_INSTANCE_ID, toggleEvaluateContext)) {
if (taskInstanceId == null || taskInstanceId <= 0L) {
log.info("TaskInstanceIdDynamicCondition : Invalid taskInstanceId {}", taskInstanceId);
// 为了不影响兼容性,忽略错误
return DSL.trueCondition();
} else {
log.debug("TaskInstanceIdDynamicCondition: Use task_instance_id condition");
return taskInstanceIdConditionBuilder.apply(taskInstanceId);
}
} else {
log.debug("TaskInstanceIdDynamicCondition: Ignore task_instance_id condition");
return DSL.trueCondition();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,15 @@ public final void onEvent(Message<? extends JobMessage> message) {
}

private void beforeHandleMessage(Message<? extends JobMessage> message) {
log.info("beforeHandleMessage");
MessageHeaders headers = message.getHeaders();
String jobExecuteContextJson = (String) headers.get(JobExecuteContext.KEY);
if (StringUtils.isNotEmpty(jobExecuteContextJson)) {
log.info("setJobExecuteContextThreadLocalRepo");
JobExecuteContextThreadLocalRepo.set(JsonUtils.fromJson(jobExecuteContextJson,
JobExecuteContext.class));
}
}

private void afterHandle(Message<? extends JobMessage> message) {
log.info("afterHandleMessage");
JobExecuteContextThreadLocalRepo.unset();
}

Expand Down

0 comments on commit 7e0bb79

Please sign in to comment.