diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java index c63e6c57da..cb90845304 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java @@ -26,15 +26,16 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.tencent.bk.job.common.WatchableThreadPoolExecutor; +import com.tencent.bk.job.execute.util.ContextExecutorService; import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @@ -42,9 +43,9 @@ public class ExecutorConfiguration { @Bean("logExportExecutor") - public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) { + public ExecutorService logExportExecutor(MeterRegistry meterRegistry) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("log-export-thread-%d").build(); - return new WatchableThreadPoolExecutor( + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "logExportExecutor", 10, @@ -53,12 +54,12 @@ public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) { TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory - ); + )); } @Bean("getHostsByTopoExecutor") - public ThreadPoolExecutor getHostsByTopoExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService getHostsByTopoExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "getHostsByTopoExecutor", 50, @@ -66,12 +67,12 @@ public ThreadPoolExecutor getHostsByTopoExecutor(MeterRegistry meterRegistry) { 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>() - ); + )); } @Bean("getHostTopoPathExecutor") - public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService getHostTopoPathExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "getHostTopoPathExecutor", 5, @@ -87,14 +88,14 @@ public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) { Thread.currentThread().getName()); r.run(); } - ); + )); } @Bean("localFileDownloadExecutor") - public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute, - MeterRegistry meterRegistry) { + public ExecutorService localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute, + MeterRegistry meterRegistry) { int concurrency = localFileConfigForExecute.getDownloadConcurrency(); - return new WatchableThreadPoolExecutor( + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "localFileDownloadExecutor", concurrency, @@ -110,12 +111,12 @@ public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute lo Thread.currentThread().getName()); r.run(); } - ); + )); } @Bean("localFileWatchExecutor") - public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService localFileWatchExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "localFileWatchExecutor", 0, @@ -131,12 +132,12 @@ public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) { Thread.currentThread().getName()); r.run(); } - ); + )); } @Bean("shutdownExecutor") - public ThreadPoolExecutor shutdownExecutor(MeterRegistry meterRegistry) { - return new WatchableThreadPoolExecutor( + public ExecutorService shutdownExecutor(MeterRegistry meterRegistry) { + return ContextExecutorService.wrap(new WatchableThreadPoolExecutor( meterRegistry, "shutdownExecutor", 10, @@ -144,6 +145,6 @@ public ThreadPoolExecutor shutdownExecutor(MeterRegistry meterRegistry) { 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>() - ); + )); } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java index f6e72b29a8..e2679ffdf1 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/dao/impl/TaskInstanceIdDynamicCondition.java @@ -56,7 +56,8 @@ public static Condition build(Long taskInstanceId, log.info("TaskInstanceIdDynamicCondition : EmptyJobExecuteContext!"); // JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition // (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id) - toggleEvaluateContext = ToggleEvaluateContext.EMPTY; +// toggleEvaluateContext = ToggleEvaluateContext.EMPTY; + throw new IllegalStateException("EmptyJobExecuteContext"); } else { ResourceScope resourceScope = jobExecuteContext.getResourceScope(); if (resourceScope != null) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java index e23ac02d97..74064cc652 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @@ -52,8 +53,8 @@ public class ArtifactoryLocalFilePrepareTask implements JobTaskContext { private final String artifactoryRepo; private final String jobStorageRootPath; private final List> futureList = new ArrayList<>(); - private final ThreadPoolExecutor localFileDownloadExecutor; - private final ThreadPoolExecutor localFileWatchExecutor; + private final ExecutorService localFileDownloadExecutor; + private final ExecutorService localFileWatchExecutor; public static Future localFileWatchFuture = null; public ArtifactoryLocalFilePrepareTask( @@ -65,8 +66,8 @@ public ArtifactoryLocalFilePrepareTask( String artifactoryProject, String artifactoryRepo, String jobStorageRootPath, - ThreadPoolExecutor localFileDownloadExecutor, - ThreadPoolExecutor localFileWatchExecutor + ExecutorService localFileDownloadExecutor, + ExecutorService localFileWatchExecutor ) { this.stepInstance = stepInstance; this.isForRetry = isForRetry; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java index c738e3c17b..261be91f7e 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @Slf4j @@ -60,8 +61,8 @@ public class LocalFilePrepareService { private final StepInstanceService stepInstanceService; private final ArtifactoryClient artifactoryClient; private final Map taskMap = new ConcurrentHashMap<>(); - private final ThreadPoolExecutor localFileDownloadExecutor; - private final ThreadPoolExecutor localFileWatchExecutor; + private final ExecutorService localFileDownloadExecutor; + private final ExecutorService localFileWatchExecutor; @Autowired public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig, @@ -70,8 +71,8 @@ public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig, AgentService agentService, StepInstanceService stepInstanceService, @Qualifier("jobArtifactoryClient") ArtifactoryClient artifactoryClient, - @Qualifier("localFileDownloadExecutor") ThreadPoolExecutor localFileDownloadExecutor, - @Qualifier("localFileWatchExecutor") ThreadPoolExecutor localFileWatchExecutor) { + @Qualifier("localFileDownloadExecutor") ExecutorService localFileDownloadExecutor, + @Qualifier("localFileWatchExecutor") ExecutorService localFileWatchExecutor) { this.fileDistributeConfig = fileDistributeConfig; this.artifactoryConfig = artifactoryConfig; this.localFileConfigForExecute = localFileConfigForExecute; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/util/ContextExecutorService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/util/ContextExecutorService.java new file mode 100644 index 0000000000..87fffd0d22 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/util/ContextExecutorService.java @@ -0,0 +1,150 @@ +package com.tencent.bk.job.execute.util; + +import com.tencent.bk.job.execute.common.context.JobExecuteContext; +import com.tencent.bk.job.execute.common.context.JobExecuteContextThreadLocalRepo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * 支持上下文传播的 ExecutorService + */ +public class ContextExecutorService implements ExecutorService { + + private final ExecutorService delegate; + + private ContextExecutorService(ExecutorService delegate) { + this.delegate = delegate; + } + + public static ContextExecutorService wrap(ExecutorService delegate) { + return new ContextExecutorService(delegate); + } + + + public Future submit(Callable task) { + return this.delegate.submit(ContextCallable.wrap(task)); + } + + public Future submit(Runnable task, T result) { + return this.delegate.submit(ContextRunnable.wrap(task), result); + } + + public Future submit(Runnable task) { + return this.delegate.submit(ContextRunnable.wrap(task)); + } + + public List> invokeAll(Collection> tasks) throws InterruptedException { + return this.delegate.invokeAll(ContextCallable.wrap(tasks)); + } + + public List> invokeAll(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + return this.delegate.invokeAll(ContextCallable.wrap(tasks), timeout, unit); + } + + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return this.delegate.invokeAny(ContextCallable.wrap(tasks)); + } + + public T invokeAny(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.delegate.invokeAny(ContextCallable.wrap(tasks), timeout, unit); + } + + public void execute(Runnable command) { + this.delegate.execute(ContextRunnable.wrap(command)); + } + + public final void shutdown() { + this.delegate.shutdown(); + } + + public final List shutdownNow() { + return this.delegate.shutdownNow(); + } + + public final boolean isShutdown() { + return this.delegate.isShutdown(); + } + + public final boolean isTerminated() { + return this.delegate.isTerminated(); + } + + public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return this.delegate.awaitTermination(timeout, unit); + } + + private static class ContextCallable implements Callable { + + private final JobExecuteContext context; + private final Callable delegate; + + + private ContextCallable(Callable delegate) { + this.context = JobExecuteContextThreadLocalRepo.get(); + this.delegate = delegate; + } + + public static ContextCallable wrap(Callable delegate) { + return new ContextCallable<>(delegate); + } + + public static List> wrap(Collection> delegates) { + List> contextCallables = new ArrayList<>(delegates.size()); + delegates.forEach(delegate -> contextCallables.add(new ContextCallable<>(delegate))); + return contextCallables; + } + + @Override + public T call() throws Exception { + try { + JobExecuteContextThreadLocalRepo.set(context); + return delegate.call(); + } finally { + JobExecuteContextThreadLocalRepo.unset(); + } + } + } + + private static class ContextRunnable implements Runnable { + + private final JobExecuteContext context; + private final Runnable delegate; + + + private ContextRunnable(Runnable delegate) { + this.context = JobExecuteContextThreadLocalRepo.get(); + this.delegate = delegate; + } + + public static ContextRunnable wrap(Runnable delegate) { + return new ContextRunnable(delegate); + } + + public static List wrap(Collection delegates) { + List contextRunnableList = new ArrayList<>(delegates.size()); + delegates.forEach(delegate -> contextRunnableList.add(new ContextRunnable(delegate))); + return contextRunnableList; + } + + @Override + public void run() { + try { + JobExecuteContextThreadLocalRepo.set(context); + delegate.run(); + } finally { + JobExecuteContextThreadLocalRepo.unset(); + } + } + } +} +