Skip to content

Commit

Permalink
feat: 优化分库分表迁移过程中,task_instance_id 动态查询条件构造逻辑 TencentBlueKing#3324
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Dec 11, 2024
1 parent 736e959 commit 0f9c5f7
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,26 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.tencent.bk.job.common.WatchableThreadPoolExecutor;
import com.tencent.bk.job.execute.util.ContextExecutorService;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Configuration(value = "jobExecuteExecutorConfig")
public class ExecutorConfiguration {

@Bean("logExportExecutor")
public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) {
public ExecutorService logExportExecutor(MeterRegistry meterRegistry) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("log-export-thread-%d").build();
return new WatchableThreadPoolExecutor(
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"logExportExecutor",
10,
Expand All @@ -53,25 +54,25 @@ public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) {
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactory
);
));
}

@Bean("getHostsByTopoExecutor")
public ThreadPoolExecutor getHostsByTopoExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService getHostsByTopoExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"getHostsByTopoExecutor",
50,
100,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
));
}

@Bean("getHostTopoPathExecutor")
public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService getHostTopoPathExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"getHostTopoPathExecutor",
5,
Expand All @@ -87,14 +88,14 @@ public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) {
Thread.currentThread().getName());
r.run();
}
);
));
}

@Bean("localFileDownloadExecutor")
public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute,
MeterRegistry meterRegistry) {
public ExecutorService localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute,
MeterRegistry meterRegistry) {
int concurrency = localFileConfigForExecute.getDownloadConcurrency();
return new WatchableThreadPoolExecutor(
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"localFileDownloadExecutor",
concurrency,
Expand All @@ -110,12 +111,12 @@ public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute lo
Thread.currentThread().getName());
r.run();
}
);
));
}

@Bean("localFileWatchExecutor")
public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService localFileWatchExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"localFileWatchExecutor",
0,
Expand All @@ -131,19 +132,19 @@ public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) {
Thread.currentThread().getName());
r.run();
}
);
));
}

@Bean("shutdownExecutor")
public ThreadPoolExecutor shutdownExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService shutdownExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"shutdownExecutor",
10,
20,
120,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public static Condition build(Long taskInstanceId,
log.info("TaskInstanceIdDynamicCondition : EmptyJobExecuteContext!");
// JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition
// (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id)
toggleEvaluateContext = ToggleEvaluateContext.EMPTY;
// toggleEvaluateContext = ToggleEvaluateContext.EMPTY;
throw new IllegalStateException("EmptyJobExecuteContext");
} else {
ResourceScope resourceScope = jobExecuteContext.getResourceScope();
if (resourceScope != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -52,8 +53,8 @@ public class ArtifactoryLocalFilePrepareTask implements JobTaskContext {
private final String artifactoryRepo;
private final String jobStorageRootPath;
private final List<Future<Boolean>> futureList = new ArrayList<>();
private final ThreadPoolExecutor localFileDownloadExecutor;
private final ThreadPoolExecutor localFileWatchExecutor;
private final ExecutorService localFileDownloadExecutor;
private final ExecutorService localFileWatchExecutor;
public static Future<?> localFileWatchFuture = null;

public ArtifactoryLocalFilePrepareTask(
Expand All @@ -65,8 +66,8 @@ public ArtifactoryLocalFilePrepareTask(
String artifactoryProject,
String artifactoryRepo,
String jobStorageRootPath,
ThreadPoolExecutor localFileDownloadExecutor,
ThreadPoolExecutor localFileWatchExecutor
ExecutorService localFileDownloadExecutor,
ExecutorService localFileWatchExecutor
) {
this.stepInstance = stepInstance;
this.isForRetry = isForRetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
Expand All @@ -60,8 +61,8 @@ public class LocalFilePrepareService {
private final StepInstanceService stepInstanceService;
private final ArtifactoryClient artifactoryClient;
private final Map<String, ArtifactoryLocalFilePrepareTask> taskMap = new ConcurrentHashMap<>();
private final ThreadPoolExecutor localFileDownloadExecutor;
private final ThreadPoolExecutor localFileWatchExecutor;
private final ExecutorService localFileDownloadExecutor;
private final ExecutorService localFileWatchExecutor;

@Autowired
public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig,
Expand All @@ -70,8 +71,8 @@ public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig,
AgentService agentService,
StepInstanceService stepInstanceService,
@Qualifier("jobArtifactoryClient") ArtifactoryClient artifactoryClient,
@Qualifier("localFileDownloadExecutor") ThreadPoolExecutor localFileDownloadExecutor,
@Qualifier("localFileWatchExecutor") ThreadPoolExecutor localFileWatchExecutor) {
@Qualifier("localFileDownloadExecutor") ExecutorService localFileDownloadExecutor,
@Qualifier("localFileWatchExecutor") ExecutorService localFileWatchExecutor) {
this.fileDistributeConfig = fileDistributeConfig;
this.artifactoryConfig = artifactoryConfig;
this.localFileConfigForExecute = localFileConfigForExecute;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.tencent.bk.job.execute.util;

import com.tencent.bk.job.execute.common.context.JobExecuteContext;
import com.tencent.bk.job.execute.common.context.JobExecuteContextThreadLocalRepo;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* 支持上下文传播的 ExecutorService
*/
public class ContextExecutorService implements ExecutorService {

private final ExecutorService delegate;

private ContextExecutorService(ExecutorService delegate) {
this.delegate = delegate;
}

public static ContextExecutorService wrap(ExecutorService delegate) {
return new ContextExecutorService(delegate);
}


public <T> Future<T> submit(Callable<T> task) {
return this.delegate.submit(ContextCallable.wrap(task));
}

public <T> Future<T> submit(Runnable task, T result) {
return this.delegate.submit(ContextRunnable.wrap(task), result);
}

public Future<?> submit(Runnable task) {
return this.delegate.submit(ContextRunnable.wrap(task));
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return this.delegate.invokeAll(ContextCallable.wrap(tasks));
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException {
return this.delegate.invokeAll(ContextCallable.wrap(tasks), timeout, unit);
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return this.delegate.invokeAny(ContextCallable.wrap(tasks));
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.delegate.invokeAny(ContextCallable.wrap(tasks), timeout, unit);
}

public void execute(Runnable command) {
this.delegate.execute(ContextRunnable.wrap(command));
}

public final void shutdown() {
this.delegate.shutdown();
}

public final List<Runnable> shutdownNow() {
return this.delegate.shutdownNow();
}

public final boolean isShutdown() {
return this.delegate.isShutdown();
}

public final boolean isTerminated() {
return this.delegate.isTerminated();
}

public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return this.delegate.awaitTermination(timeout, unit);
}

private static class ContextCallable<T> implements Callable<T> {

private final JobExecuteContext context;
private final Callable<T> delegate;


private ContextCallable(Callable<T> delegate) {
this.context = JobExecuteContextThreadLocalRepo.get();
this.delegate = delegate;
}

public static <T> ContextCallable<T> wrap(Callable<T> delegate) {
return new ContextCallable<>(delegate);
}

public static <T> List<ContextCallable<T>> wrap(Collection<? extends Callable<T>> delegates) {
List<ContextCallable<T>> contextCallables = new ArrayList<>(delegates.size());
delegates.forEach(delegate -> contextCallables.add(new ContextCallable<>(delegate)));
return contextCallables;
}

@Override
public T call() throws Exception {
try {
JobExecuteContextThreadLocalRepo.set(context);
return delegate.call();
} finally {
JobExecuteContextThreadLocalRepo.unset();
}
}
}

private static class ContextRunnable implements Runnable {

private final JobExecuteContext context;
private final Runnable delegate;


private ContextRunnable(Runnable delegate) {
this.context = JobExecuteContextThreadLocalRepo.get();
this.delegate = delegate;
}

public static ContextRunnable wrap(Runnable delegate) {
return new ContextRunnable(delegate);
}

public static List<ContextRunnable> wrap(Collection<Runnable> delegates) {
List<ContextRunnable> contextRunnableList = new ArrayList<>(delegates.size());
delegates.forEach(delegate -> contextRunnableList.add(new ContextRunnable(delegate)));
return contextRunnableList;
}

@Override
public void run() {
try {
JobExecuteContextThreadLocalRepo.set(context);
delegate.run();
} finally {
JobExecuteContextThreadLocalRepo.unset();
}
}
}
}

0 comments on commit 0f9c5f7

Please sign in to comment.