diff --git a/package.sh b/package.sh index 879ae47406..8c83efda31 100644 --- a/package.sh +++ b/package.sh @@ -152,7 +152,6 @@ fi cp -r support-files/bk-cmdb/ release/job/support-files/ cp -r support-files/bkiam/ release/job/support-files/ cp -r support-files/dependJarInfo/ release/job/support-files/ -cp support-files/javaagent/* release/job/backend/ # Package dependJarLists if [[ -d "support-files/dependJarLists/" ]]; then cp -r support-files/dependJarLists/ release/job/support-files/ diff --git a/src/backend/commons/cmdb-sdk-ext/src/main/java/com/tencent/bk/job/common/cc/config/CMDBFlowControllerConfig.java b/src/backend/commons/cmdb-sdk-ext/src/main/java/com/tencent/bk/job/common/cc/config/CMDBFlowControllerConfig.java index 2e7569210a..ecfd5d5604 100644 --- a/src/backend/commons/cmdb-sdk-ext/src/main/java/com/tencent/bk/job/common/cc/config/CMDBFlowControllerConfig.java +++ b/src/backend/commons/cmdb-sdk-ext/src/main/java/com/tencent/bk/job/common/cc/config/CMDBFlowControllerConfig.java @@ -24,7 +24,6 @@ package com.tencent.bk.job.common.cc.config; -import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; import com.tencent.bk.job.common.redis.util.RedisSlideWindowFlowController; import com.tencent.bk.job.common.util.StringUtil; import lombok.extern.slf4j.Slf4j; @@ -85,7 +84,6 @@ public void initCMDBGlobalFlowController() { map, cmdbConfig.getFlowControlDefaultLimit(), cmdbConfig.getFlowControlPrecision()); cmdbGlobalFlowController.init(redisTemplate, map, cmdbConfig.getFlowControlDefaultLimit(), cmdbConfig.getFlowControlPrecision()); - BizCmdbClient.setGlobalFlowController(cmdbGlobalFlowController); } catch (Exception e) { log.error("Fail to init globalFlowController", e); } diff --git a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/config/BizCmdbClientAutoConfig.java b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/config/BizCmdbClientAutoConfig.java new file mode 100644 index 0000000000..33d2de8d98 --- /dev/null +++ b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/config/BizCmdbClientAutoConfig.java @@ -0,0 +1,119 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.cc.config; + +import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; +import com.tencent.bk.job.common.esb.config.EsbConfig; +import com.tencent.bk.job.common.esb.constants.EsbLang; +import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient; +import com.tencent.bk.job.common.util.FlowController; +import io.micrometer.core.instrument.MeterRegistry; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Configuration +public class BizCmdbClientAutoConfig { + + @Bean("cmdbThreadPoolExecutor") + public ThreadPoolExecutor cmdbThreadPoolExecutor(CmdbConfig cmdbConfig) { + int cmdbQueryThreadsNum = cmdbConfig.getCmdbQueryThreadsNum(); + return new ThreadPoolExecutor( + cmdbQueryThreadsNum, + cmdbQueryThreadsNum, + 180L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(cmdbQueryThreadsNum * 4), (r, executor) -> { + //使用请求的线程直接拉取数据 + log.error("cmdb request runnable rejected, use current thread({}), plz add more threads", + Thread.currentThread().getName()); + r.run(); + }); + } + + @Bean("cmdbLongTermThreadPoolExecutor") + public ThreadPoolExecutor cmdbLongTermThreadPoolExecutor(CmdbConfig cmdbConfig) { + int longTermCmdbQueryThreadsNum = cmdbConfig.getFindHostRelationLongTermConcurrency(); + return new ThreadPoolExecutor( + longTermCmdbQueryThreadsNum, + longTermCmdbQueryThreadsNum, + 180L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(longTermCmdbQueryThreadsNum * 4), (r, executor) -> { + //使用请求的线程直接拉取数据 + log.warn("cmdb long term request runnable rejected, use current thread({}), plz add more threads", + Thread.currentThread().getName()); + r.run(); + }); + } + + @Bean + @Primary + public BizCmdbClient bizCmdbClient(EsbConfig esbConfig, + CmdbConfig cmdbConfig, + ThreadPoolExecutor cmdbThreadPoolExecutor, + ThreadPoolExecutor cmdbLongTermThreadPoolExecutor, + @Autowired(required = false) QueryAgentStatusClient queryAgentStatusClient, + MeterRegistry meterRegistry, + @Autowired(required = false) FlowController flowController) { + return new BizCmdbClient( + esbConfig, + cmdbConfig, + EsbLang.EN, + cmdbThreadPoolExecutor, + cmdbLongTermThreadPoolExecutor, + queryAgentStatusClient, + flowController, + meterRegistry + ); + } + + @Bean("cnBizCmdbClient") + public BizCmdbClient cnBizCmdbClient(EsbConfig esbConfig, + CmdbConfig cmdbConfig, + ThreadPoolExecutor cmdbThreadPoolExecutor, + ThreadPoolExecutor cmdbLongTermThreadPoolExecutor, + @Autowired(required = false) QueryAgentStatusClient queryAgentStatusClient, + MeterRegistry meterRegistry, + @Autowired(required = false) FlowController flowController) { + return new BizCmdbClient( + esbConfig, + cmdbConfig, + EsbLang.CN, + cmdbThreadPoolExecutor, + cmdbLongTermThreadPoolExecutor, + queryAgentStatusClient, + flowController, + meterRegistry + ); + } +} diff --git a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java index a4a5d24bbb..3afc2bc12d 100644 --- a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java +++ b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java @@ -159,13 +159,13 @@ public class BizCmdbClient extends AbstractEsbSdkClient implements IBizCmdbClien private static final ConcurrentHashMap> bizInternalTopoMap = new ConcurrentHashMap<>(); private static final ConcurrentHashMap bizInternalTopoLockMap = new ConcurrentHashMap<>(); - public static ThreadPoolExecutor threadPoolExecutor = null; - public static ThreadPoolExecutor longTermThreadPoolExecutor = null; - public static CmdbConfig cmdbConfig = null; + private final ThreadPoolExecutor threadPoolExecutor; + private final ThreadPoolExecutor longTermThreadPoolExecutor; + private final CmdbConfig cmdbConfig; /** * 对整个应用中所有的CMDB调用进行限流 */ - private static FlowController globalFlowController = null; + private final FlowController globalFlowController; static { interfaceNameMap.put(SEARCH_BIZ_INST_TOPO, "search_biz_inst_topo"); @@ -187,66 +187,37 @@ public class BizCmdbClient extends AbstractEsbSdkClient implements IBizCmdbClien protected String defaultSupplierAccount; protected String defaultUin = "admin"; - private QueryAgentStatusClient queryAgentStatusClient; + private final QueryAgentStatusClient queryAgentStatusClient; private final MeterRegistry meterRegistry; private final LoadingCache bizInstCompleteTopologyCache = CacheBuilder.newBuilder() .maximumSize(1000).expireAfterWrite(30, TimeUnit.SECONDS). build(new CacheLoader() { @Override - public InstanceTopologyDTO load(Long bizId) { + public InstanceTopologyDTO load(@SuppressWarnings("NullableProblems") Long bizId) { return getBizInstCompleteTopology(bizId); } } ); - public BizCmdbClient(EsbConfig esbConfig, CmdbConfig cmdbConfig, QueryAgentStatusClient queryAgentStatusClient, + public BizCmdbClient(EsbConfig esbConfig, + CmdbConfig cmdbConfig, + String lang, + ThreadPoolExecutor threadPoolExecutor, + ThreadPoolExecutor longTermThreadPoolExecutor, + QueryAgentStatusClient queryAgentStatusClient, + FlowController flowController, MeterRegistry meterRegistry) { - this(esbConfig, cmdbConfig, null, queryAgentStatusClient, meterRegistry); - } - - public BizCmdbClient(EsbConfig esbConfig, CmdbConfig cmdbConfig, String lang, - QueryAgentStatusClient queryAgentStatusClient, MeterRegistry meterRegistry) { super(esbConfig.getEsbUrl(), esbConfig.getAppCode(), esbConfig.getAppSecret(), lang, esbConfig.isUseEsbTestEnv()); + this.cmdbConfig = cmdbConfig; this.defaultSupplierAccount = cmdbConfig.getDefaultSupplierAccount(); + this.threadPoolExecutor = threadPoolExecutor; + this.longTermThreadPoolExecutor = longTermThreadPoolExecutor; this.queryAgentStatusClient = queryAgentStatusClient; + this.globalFlowController = flowController; this.meterRegistry = meterRegistry; } - public static void setGlobalFlowController(FlowController flowController) { - globalFlowController = flowController; - } - - private static void initThreadPoolExecutor(int cmdbQueryThreadsNum, int longTermCmdbQueryThreadsNum) { - threadPoolExecutor = new ThreadPoolExecutor(cmdbQueryThreadsNum, cmdbQueryThreadsNum, 180L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(cmdbQueryThreadsNum * 4), (r, executor) -> { - //使用请求的线程直接拉取数据 - log.error("cmdb request runnable rejected, use current thread({}), plz add more threads", - Thread.currentThread().getName()); - r.run(); - }); - longTermThreadPoolExecutor = new ThreadPoolExecutor(longTermCmdbQueryThreadsNum, longTermCmdbQueryThreadsNum, - 180L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(longTermCmdbQueryThreadsNum * 4), (r, executor) -> { - //使用请求的线程直接拉取数据 - log.warn("cmdb long term request runnable rejected, use current thread({}), plz add more threads", - Thread.currentThread().getName()); - r.run(); - }); - } - - public static void init() { - initThreadPoolExecutor(cmdbConfig.getCmdbQueryThreadsNum(), - cmdbConfig.getFindHostRelationLongTermConcurrency()); - } - - public static void setCcConfig(CmdbConfig cmdbConfig) { - BizCmdbClient.cmdbConfig = cmdbConfig; - } - - public void setQueryAgentStatusClient(QueryAgentStatusClient queryAgentStatusClient) { - this.queryAgentStatusClient = queryAgentStatusClient; - } - @Override public InstanceTopologyDTO getBizInstCompleteTopology(long bizId) { InstanceTopologyDTO completeTopologyDTO; @@ -555,15 +526,15 @@ private List findModuleHostRelationConcurrently(long bizId, FindModuleHostRelationTask task = new FindModuleHostRelationTask(resultQueue, genFindModuleHostRelationReq(bizId, moduleIdList, start, limit), JobContextUtil.getRequestId()); + Future future; if (totalCount > 10000) { //主机数太多,防止将CMDB拉挂了 - Future future = longTermThreadPoolExecutor.submit(task); - futures.add(future); + future = longTermThreadPoolExecutor.submit(task); } else { // 默认采用多个并发线程拉取 - Future future = threadPoolExecutor.submit(task); - futures.add(future); + future = threadPoolExecutor.submit(task); } + futures.add(future); totalCount -= limit; } futures.forEach(it -> { diff --git a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/CmdbClientFactory.java b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/CmdbClientFactory.java index df99d2007e..f81e802147 100644 --- a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/CmdbClientFactory.java +++ b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/CmdbClientFactory.java @@ -24,53 +24,13 @@ package com.tencent.bk.job.common.cc.sdk; -import com.google.common.collect.Maps; -import com.tencent.bk.job.common.cc.config.CmdbConfig; -import com.tencent.bk.job.common.esb.config.EsbConfig; -import com.tencent.bk.job.common.esb.constants.EsbLang; -import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient; import com.tencent.bk.job.common.i18n.locale.LocaleUtils; import com.tencent.bk.job.common.util.ApplicationContextRegister; -import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.DependsOn; - -import java.util.Map; @Slf4j -@DependsOn({"applicationContextRegister", "cmdbConfigSetter"}) public class CmdbClientFactory { - private static final Map CMDB_CLIENT_MAPS = Maps.newHashMap(); - - static { - EsbConfig esbConfig = null; - CmdbConfig cmdbConfig = null; - QueryAgentStatusClient queryAgentStatusClient = null; - MeterRegistry meterRegistry = null; - try { - esbConfig = ApplicationContextRegister.getBean(EsbConfig.class); - cmdbConfig = ApplicationContextRegister.getBean(CmdbConfig.class); - queryAgentStatusClient = ApplicationContextRegister.getBean(QueryAgentStatusClient.class); - meterRegistry = ApplicationContextRegister.getBean(MeterRegistry.class); - } catch (Throwable e) { - log.error("Error while initialize bk config!", e); - throw e; - } - CMDB_CLIENT_MAPS.put(LocaleUtils.LANG_ZH_CN, - new BizCmdbClient(esbConfig, cmdbConfig, EsbLang.CN, queryAgentStatusClient, meterRegistry) - ); - CMDB_CLIENT_MAPS.put(LocaleUtils.LANG_EN, - new BizCmdbClient(esbConfig, cmdbConfig, EsbLang.EN, queryAgentStatusClient, meterRegistry) - ); - CMDB_CLIENT_MAPS.put(LocaleUtils.LANG_ZH, - new BizCmdbClient(esbConfig, cmdbConfig, EsbLang.CN, queryAgentStatusClient, meterRegistry) - ); - CMDB_CLIENT_MAPS.put(LocaleUtils.LANG_EN_US, - new BizCmdbClient(esbConfig, cmdbConfig, EsbLang.EN, queryAgentStatusClient, meterRegistry) - ); - } - public static IBizCmdbClient getCmdbClient() { return getCmdbClient(LocaleUtils.LANG_EN_US); } @@ -79,10 +39,12 @@ public static IBizCmdbClient getCmdbClient(String language) { if (language == null) { language = LocaleUtils.LANG_EN_US; } - IBizCmdbClient icmdbClient = CMDB_CLIENT_MAPS.get(language); - if (icmdbClient == null) { - icmdbClient = CMDB_CLIENT_MAPS.get(LocaleUtils.LANG_EN_US); + switch (language) { + case LocaleUtils.LANG_ZH: + case LocaleUtils.LANG_ZH_CN: + return ApplicationContextRegister.getBean("cnBizCmdbClient", IBizCmdbClient.class); + default: + return ApplicationContextRegister.getBean(IBizCmdbClient.class); } - return icmdbClient; } } diff --git a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/service/CloudAreaService.java b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/service/CloudAreaService.java index b7109ac766..c7ad08afde 100644 --- a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/service/CloudAreaService.java +++ b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/service/CloudAreaService.java @@ -24,19 +24,13 @@ package com.tencent.bk.job.common.cc.service; -import com.tencent.bk.job.common.cc.config.CmdbConfig; import com.tencent.bk.job.common.cc.model.CcCloudAreaInfoDTO; -import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; import com.tencent.bk.job.common.cc.sdk.IBizCmdbClient; -import com.tencent.bk.job.common.esb.config.EsbConfig; -import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient; -import com.tencent.bk.job.common.i18n.locale.LocaleUtils; -import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; -import org.springframework.context.annotation.DependsOn; +import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Iterator; @@ -53,18 +47,16 @@ * @since 23/12/2019 22:48 */ -@DependsOn({"cmdbConfigSetter"}) @Slf4j +@Service public class CloudAreaService { private static final Map CLOUD_AREA_NAME_MAP = new ConcurrentHashMap<>(); - private static IBizCmdbClient esbBizCmdbClient; + private static IBizCmdbClient bizCmdbClient; private static List fullCloudAreaInfoList; - public CloudAreaService(EsbConfig esbConfig, CmdbConfig cmdbConfig, QueryAgentStatusClient queryAgentStatusClient, - MeterRegistry meterRegistry) { + public CloudAreaService(IBizCmdbClient bizCmdbClient) { + CloudAreaService.bizCmdbClient = bizCmdbClient; CloudAreaNameCacheThread cloudAreaNameCacheThread = new CloudAreaNameCacheThread(); - esbBizCmdbClient = new BizCmdbClient(esbConfig, cmdbConfig, LocaleUtils.LANG_EN_US, queryAgentStatusClient, - meterRegistry); cloudAreaNameCacheThread.start(); } @@ -78,7 +70,7 @@ public static String getCloudAreaNameFromCache(Long cloudAreaId) { } private static List getCloudAreaListFromCc() { - List cloudAreaInfoList = esbBizCmdbClient.getCloudAreaList(); + List cloudAreaInfoList = bizCmdbClient.getCloudAreaList(); if (cloudAreaInfoList == null) { return new ArrayList<>(); } diff --git a/src/backend/commons/common-otel/build.gradle b/src/backend/commons/common-otel/build.gradle new file mode 100644 index 0000000000..08589eb630 --- /dev/null +++ b/src/backend/commons/common-otel/build.gradle @@ -0,0 +1,32 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software', to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +dependencies { + api "org.springframework.cloud:spring-cloud-sleuth-api" + api "org.springframework.cloud:spring-cloud-sleuth-otel" + api "org.springframework.cloud:spring-cloud-sleuth-otel-autoconfigure" + api "io.opentelemetry:opentelemetry-exporter-otlp-trace" + // https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-sdk-extension-resources + api 'io.opentelemetry:opentelemetry-sdk-extension-resources' +} diff --git a/src/backend/commons/common-otel/src/main/java/com/tencent/bk/job/common/tracing/OtelConfiguration.java b/src/backend/commons/common-otel/src/main/java/com/tencent/bk/job/common/tracing/OtelConfiguration.java new file mode 100644 index 0000000000..481b05efa1 --- /dev/null +++ b/src/backend/commons/common-otel/src/main/java/com/tencent/bk/job/common/tracing/OtelConfiguration.java @@ -0,0 +1,91 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.tracing; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder; +import io.opentelemetry.sdk.trace.SpanLimits; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.sleuth.autoconfig.otel.SpanProcessorProvider; +import org.springframework.cloud.sleuth.otel.bridge.SpanExporterCustomizer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +@Slf4j +@Configuration(proxyBeanMethods = false) +class OtelConfiguration { + + @Value("${spring.sleuth.otel.exporter.enabled:false}") + private boolean exporterEnabled; + + @Value("${spring.sleuth.otel.resource.bkDataToken:}") + private String bkDataToken; + + @Bean + Supplier otelBkDataTokenResourceProvider() { + return this::buildResource; + } + + Resource buildResource() { + AttributesBuilder attributes = Attributes.builder(); + attributes.put("bk.data.token", bkDataToken); + return Resource.create(attributes.build(), ResourceAttributes.SCHEMA_URL); + } + + @Bean + SdkTracerProvider otelTracerProvider(SpanLimits spanLimits, + ObjectProvider> spanProcessors, + SpanExporterCustomizer spanExporterCustomizer, + ObjectProvider> spanExporters, + Sampler sampler, + Resource resource, + SpanProcessorProvider spanProcessorProvider) { + log.debug("exporterEnabled={},bkDataToken={}", exporterEnabled, bkDataToken); + SdkTracerProviderBuilder sdkTracerProviderBuilder = SdkTracerProvider.builder().setResource(resource) + .setSampler(sampler).setSpanLimits(spanLimits); + List processors = spanProcessors.getIfAvailable(ArrayList::new); + if (exporterEnabled) { + processors.addAll(spanExporters.getIfAvailable(ArrayList::new).stream() + .map(e -> spanProcessorProvider.toSpanProcessor(spanExporterCustomizer.customize(e))) + .collect(Collectors.toList())); + } + processors.forEach(sdkTracerProviderBuilder::addSpanProcessor); + return sdkTracerProviderBuilder.build(); + } +} diff --git a/src/backend/commons/common-otel/src/main/resources/META-INF/spring.factories b/src/backend/commons/common-otel/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/backend/commons/common-service/build.gradle b/src/backend/commons/common-service/build.gradle index 67ca78de5f..2b8c10d50b 100644 --- a/src/backend/commons/common-service/build.gradle +++ b/src/backend/commons/common-service/build.gradle @@ -28,6 +28,7 @@ println("assembly mode: $assemblyMode") dependencies { api project(':commons:common') api project(':commons:common-security') + api project(':commons:common-otel') api 'org.hibernate.validator:hibernate-validator' api 'jakarta.validation:jakarta.validation-api' api 'org.springframework.boot:spring-boot-starter-actuator' @@ -54,9 +55,5 @@ dependencies { api 'io.jsonwebtoken:jjwt' api 'io.prometheus:simpleclient_pushgateway' api 'io.micrometer:micrometer-registry-prometheus' - api "org.springframework.cloud:spring-cloud-sleuth-api" - api "org.springframework.cloud:spring-cloud-sleuth-otel" - api "org.springframework.cloud:spring-cloud-sleuth-otel-autoconfigure" - api "io.opentelemetry:opentelemetry-exporter-otlp-trace" testImplementation 'org.springframework.boot:spring-boot-starter-test' } diff --git a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceableExecutorService.java b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceableExecutorService.java deleted file mode 100644 index 487c95a810..0000000000 --- a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceableExecutorService.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. - * - * License for BK-JOB蓝鲸智云作业平台: - * -------------------------------------------------------------------- - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and - * to permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of - * the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO - * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -package com.tencent.bk.job.common.trace.executors; - -import org.springframework.cloud.sleuth.Tracer; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class TraceableExecutorService implements ExecutorService { - - protected ExecutorService delegateExecutorService; - protected Tracer tracer; - - public TraceableExecutorService(ExecutorService executorService, Tracer tracer) { - this.delegateExecutorService = executorService; - this.tracer = tracer; - } - - public ExecutorService getDelegateExecutorService() { - return delegateExecutorService; - } - - @Override - public void shutdown() { - delegateExecutorService.shutdown(); - } - - @Override - public List shutdownNow() { - return delegateExecutorService.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return delegateExecutorService.isShutdown(); - } - - @Override - public boolean isTerminated() { - return delegateExecutorService.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return delegateExecutorService.awaitTermination(timeout, unit); - } - - @Override - public Future submit(Callable task) { - return delegateExecutorService.submit(new TraceCallable(task, tracer)); - } - - @Override - public Future submit(Runnable task, T result) { - return delegateExecutorService.submit(new TraceRunnable(task, tracer), result); - } - - @Override - public Future submit(Runnable task) { - return delegateExecutorService.submit(new TraceRunnable(task, tracer)); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return delegateExecutorService.invokeAll(tasks); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, - TimeUnit unit) throws InterruptedException { - return delegateExecutorService.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return delegateExecutorService.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, - TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return delegateExecutorService.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - delegateExecutorService.execute(new TraceRunnable(command, tracer)); - } -} diff --git a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceableScheduledExecutorService.java b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceableScheduledExecutorService.java deleted file mode 100644 index 03c487646f..0000000000 --- a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceableScheduledExecutorService.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. - * - * License for BK-JOB蓝鲸智云作业平台: - * -------------------------------------------------------------------- - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and - * to permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of - * the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO - * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -package com.tencent.bk.job.common.trace.executors; - - -import org.springframework.cloud.sleuth.Tracer; - -import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public class TraceableScheduledExecutorService extends TraceableExecutorService implements ScheduledExecutorService { - public TraceableScheduledExecutorService(final ScheduledExecutorService delegate, final Tracer tracer) { - super(delegate, tracer); - } - - private ScheduledExecutorService getScheduledExecutorService() { - return (ScheduledExecutorService) this.delegateExecutorService; - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - Runnable r = new TraceRunnable(command, tracer); - return getScheduledExecutorService().schedule(r, delay, unit); - } - - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - Callable c = new TraceCallable<>(callable, tracer); - return getScheduledExecutorService().schedule(c, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - Runnable r = new TraceRunnable(command, tracer); - return getScheduledExecutorService().scheduleAtFixedRate(r, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - Runnable r = new TraceRunnable(command, tracer); - return getScheduledExecutorService().scheduleWithFixedDelay(r, initialDelay, delay, unit); - } -} diff --git a/src/backend/commons/common-service/src/main/resources/logback-spring.xml b/src/backend/commons/common-service/src/main/resources/logback-spring.xml index b283f4080a..d7b1c4fe6d 100644 --- a/src/backend/commons/common-service/src/main/resources/logback-spring.xml +++ b/src/backend/commons/common-service/src/main/resources/logback-spring.xml @@ -5,8 +5,8 @@ - - + + diff --git a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/JobCommonInterceptor.java b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/JobCommonInterceptor.java index d0f23cca3c..0ce8cbbf38 100644 --- a/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/JobCommonInterceptor.java +++ b/src/backend/commons/common-web/src/main/java/com/tencent/bk/job/common/web/interceptor/JobCommonInterceptor.java @@ -122,7 +122,6 @@ private void initSpanAndAddRequestId() { currentSpan = tracer.nextSpan().start(); } spanInScope = tracer.withSpan(currentSpan); - log.debug("currentSpan={}", currentSpan); String traceId = currentSpan.context().traceId(); JobContextUtil.setRequestId(traceId); } diff --git a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/config/ExecutorConfiguration.java b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/config/ExecutorConfiguration.java new file mode 100644 index 0000000000..1b71f1959e --- /dev/null +++ b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/config/ExecutorConfiguration.java @@ -0,0 +1,72 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.analysis.config; + +import com.tencent.bk.job.analysis.task.statistics.StatisticsTaskScheduler; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Configuration +public class ExecutorConfiguration { + + @Bean("analysisScheduleExecutor") + public ThreadPoolExecutor analysisScheduleExecutor() { + return new ThreadPoolExecutor( + 5, + 10, + 60L, + TimeUnit.SECONDS, new LinkedBlockingQueue(2), (r, executor) -> + log.warn("analysisTask runnable rejected!") + ); + } + + @Bean("currentStatisticsTaskExecutor") + public ThreadPoolExecutor currentStatisticsTaskExecutor() { + return new ThreadPoolExecutor( + StatisticsTaskScheduler.defaultCorePoolSize, + StatisticsTaskScheduler.defaultMaximumPoolSize, + StatisticsTaskScheduler.defaultKeepAliveTime, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(StatisticsTaskScheduler.currentStatisticsTaskQueueSize), + (r, executor) -> log.error("statisticsTask runnable rejected! num:{}", + StatisticsTaskScheduler.rejectedStatisticsTaskNum.incrementAndGet())); + } + + @Bean("pastStatisticsTaskExecutor") + public ThreadPoolExecutor pastStatisticsTaskExecutor() { + return new ThreadPoolExecutor( + StatisticsTaskScheduler.defaultCorePoolSize, + StatisticsTaskScheduler.defaultMaximumPoolSize, + StatisticsTaskScheduler.defaultKeepAliveTime, + TimeUnit.SECONDS, new LinkedBlockingQueue<>(StatisticsTaskScheduler.pastStatisticsTaskQueueSize), + (r, executor) -> log.error("pastStatisticsTaskExecutor runnable rejected! num:{}", + StatisticsTaskScheduler.rejectedStatisticsTaskNum.incrementAndGet())); + } +} diff --git a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/analysis/AnalysisTaskScheduler.java b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/analysis/AnalysisTaskScheduler.java index cefd86329d..450acd827e 100644 --- a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/analysis/AnalysisTaskScheduler.java +++ b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/analysis/AnalysisTaskScheduler.java @@ -32,14 +32,17 @@ import io.micrometer.core.instrument.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; /** * @Description @@ -54,28 +57,25 @@ public class AnalysisTaskScheduler { public static final String NAME_ANALYSIS_TASK_SCHEDULE_POOL_SIZE = "analysisTask.schedule.pool.size"; public static final String NAME_ANALYSIS_TASK_SCHEDULE_QUEUE_SIZE = "analysisTask.schedule.queue.size"; - private static final ThreadPoolExecutor scheduleThreadPoolExecutor = new ThreadPoolExecutor(5, 10, 60L, - TimeUnit.SECONDS, new LinkedBlockingQueue(2), new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - log.warn("analysisTask runnable rejected!"); - } - }); + private final ThreadPoolExecutor analysisScheduleExecutor; public static List analysisTaskList = new ArrayList<>(); public static Map analysisTaskMap = new ConcurrentHashMap<>(); private static long count = -1L; + @Autowired - public AnalysisTaskScheduler(MeterRegistry meterRegistry) { + public AnalysisTaskScheduler(MeterRegistry meterRegistry, + @Qualifier("analysisScheduleExecutor") ThreadPoolExecutor analysisScheduleExecutor) { + this.analysisScheduleExecutor = analysisScheduleExecutor; meterRegistry.gauge( NAME_ANALYSIS_TASK_SCHEDULE_POOL_SIZE, - Arrays.asList(Tag.of(StatisticsConstants.TAG_KEY_MODULE, VALUE_MODULE_ANALYSIS_TASK)), - scheduleThreadPoolExecutor, + Collections.singletonList(Tag.of(StatisticsConstants.TAG_KEY_MODULE, VALUE_MODULE_ANALYSIS_TASK)), + this.analysisScheduleExecutor, ThreadPoolExecutor::getPoolSize ); meterRegistry.gauge( NAME_ANALYSIS_TASK_SCHEDULE_QUEUE_SIZE, - Arrays.asList(Tag.of(StatisticsConstants.TAG_KEY_MODULE, VALUE_MODULE_ANALYSIS_TASK)), - scheduleThreadPoolExecutor, + Collections.singletonList(Tag.of(StatisticsConstants.TAG_KEY_MODULE, VALUE_MODULE_ANALYSIS_TASK)), + this.analysisScheduleExecutor, threadPoolExecutor -> threadPoolExecutor.getQueue().size() ); } @@ -116,7 +116,7 @@ public void schedule() { } else { if (count % analysisTask.getPeriodSeconds() == 0) { //开一个新线程跑分析任务 - scheduleThreadPoolExecutor.submit(analysisTask); + analysisScheduleExecutor.submit(analysisTask); } } }); diff --git a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java index 2804e1f0bf..734eb1a955 100644 --- a/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java +++ b/src/backend/job-analysis/service-job-analysis/src/main/java/com/tencent/bk/job/analysis/task/statistics/StatisticsTaskScheduler.java @@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @@ -75,29 +76,23 @@ public class StatisticsTaskScheduler { private static final String machineIp = IpUtils.getFirstMachineIP(); private static final String REDIS_KEY_STATISTIC_JOB_LOCK = "statistic-job-lock"; private static final String REDIS_KEY_STATISTIC_JOB_RUNNING_MACHINE = "statistic-job-running-machine"; - private static final AtomicLong rejectedStatisticsTaskNum = new AtomicLong(0); + public static final AtomicLong rejectedStatisticsTaskNum = new AtomicLong(0); private static final String REDIS_KEY_STATISTIC_JOB_INIT_MACHINE = "statistic-job-init-machine"; + + // 线程池默认配置参数 + public static final long defaultKeepAliveTime = 60L; + public static final int defaultCorePoolSize = 10; + public static final int defaultMaximumPoolSize = 10; + public static final int currentStatisticsTaskQueueSize = 2000; + public static final int pastStatisticsTaskQueueSize = 200000; + public static List statisticsTaskList = new ArrayList<>(); public static Map statisticsTaskMap = new ConcurrentHashMap<>(); public static Map currentStatisticTaskFutureMap = new HashMap<>(); - // 线程池默认配置参数 - private static final long defaultKeepAliveTime = 60L; - private static final int defaultCorePoolSize = 10; - private static final int defaultMaximumPoolSize = 10; - private static final int currentStatisticsTaskQueueSize = 2000; - private static final int pastStatisticsTaskQueueSize = 200000; - private static ThreadPoolExecutor currentStatisticsTaskExecutor = new ThreadPoolExecutor( - defaultCorePoolSize, defaultMaximumPoolSize, defaultKeepAliveTime, - TimeUnit.SECONDS, new LinkedBlockingQueue<>(currentStatisticsTaskQueueSize), - (r, executor) -> log.error("statisticsTask runnable rejected! num:{}", - rejectedStatisticsTaskNum.incrementAndGet())); - private static ThreadPoolExecutor pastStatisticsTaskExecutor = new ThreadPoolExecutor( - defaultCorePoolSize, defaultMaximumPoolSize, defaultKeepAliveTime, - TimeUnit.SECONDS, new LinkedBlockingQueue<>(pastStatisticsTaskQueueSize), - (r, executor) -> log.error("pastStatisticsTaskExecutor runnable rejected! num:{}", - rejectedStatisticsTaskNum.incrementAndGet())); + private ThreadPoolExecutor currentStatisticsTaskExecutor; + private ThreadPoolExecutor pastStatisticsTaskExecutor; static { List keyList = Collections.singletonList(REDIS_KEY_STATISTIC_JOB_LOCK); @@ -118,9 +113,17 @@ public class StatisticsTaskScheduler { private int currentCycleHours = 0; @Autowired - public StatisticsTaskScheduler(MeterRegistry meterRegistry, StatisticConfig statisticConfig, RedisTemplate redisTemplate, PastStatisticsMakeupTask pastStatisticsMakeupTask, + public StatisticsTaskScheduler(MeterRegistry meterRegistry, + @Qualifier("currentStatisticsTaskExecutor") + ThreadPoolExecutor currentStatisticsTaskExecutor, + @Qualifier("pastStatisticsTaskExecutor") + ThreadPoolExecutor pastStatisticsTaskExecutor, + StatisticConfig statisticConfig, + RedisTemplate redisTemplate, + PastStatisticsMakeupTask pastStatisticsMakeupTask, ClearExpiredStatisticsTask clearExpiredStatisticsTask) { + this.currentStatisticsTaskExecutor = currentStatisticsTaskExecutor; + this.pastStatisticsTaskExecutor = pastStatisticsTaskExecutor; this.statisticConfig = statisticConfig; this.redisTemplate = redisTemplate; this.pastStatisticsMakeupTask = pastStatisticsMakeupTask; @@ -129,14 +132,14 @@ public StatisticsTaskScheduler(MeterRegistry meterRegistry, StatisticConfig stat StatisticsConstants.NAME_STATISTICS_TASK_SCHEDULE_POOL_SIZE, Collections.singletonList(Tag.of(StatisticsConstants.TAG_KEY_MODULE, StatisticsConstants.TAG_VALUE_MODULE_STATISTICS_TASK)), - currentStatisticsTaskExecutor, + this.currentStatisticsTaskExecutor, ThreadPoolExecutor::getPoolSize ); meterRegistry.gauge( StatisticsConstants.NAME_STATISTICS_TASK_SCHEDULE_QUEUE_SIZE, Collections.singletonList(Tag.of(StatisticsConstants.TAG_KEY_MODULE, StatisticsConstants.TAG_VALUE_MODULE_STATISTICS_TASK)), - currentStatisticsTaskExecutor, + this.currentStatisticsTaskExecutor, threadPoolExecutor -> threadPoolExecutor.getQueue().size() ); } diff --git a/src/backend/job-backup/boot-job-backup/src/main/java/com/tencent/bk/job/backup/config/ArchivistAutoConfig.java b/src/backend/job-backup/boot-job-backup/src/main/java/com/tencent/bk/job/backup/config/ArchivistAutoConfig.java index d3657bbf80..c76adfb93f 100644 --- a/src/backend/job-backup/boot-job-backup/src/main/java/com/tencent/bk/job/backup/config/ArchivistAutoConfig.java +++ b/src/backend/job-backup/boot-job-backup/src/main/java/com/tencent/bk/job/backup/config/ArchivistAutoConfig.java @@ -53,6 +53,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; +import java.util.concurrent.ExecutorService; + @Configuration @EnableScheduling @Slf4j @@ -217,6 +219,7 @@ public JobExecuteArchiveManage jobExecuteArchiveManage( @Autowired(required = false) RollingConfigRecordDAO rollingConfigRecordDAO, @Autowired(required = false) ExecuteArchiveDAO executeArchiveDAO, ArchiveProgressService archiveProgressService, + @Qualifier("archiveExecutor") ExecutorService archiveExecutor, ArchiveConfig archiveConfig) { log.info("Init JobExecuteArchiveManage"); @@ -239,7 +242,7 @@ public JobExecuteArchiveManage jobExecuteArchiveManage( rollingConfigRecordDAO, executeArchiveDAO, archiveProgressService, - archiveConfig - ); + archiveConfig, + archiveExecutor); } } diff --git a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java index 2738ace866..260b7eec85 100644 --- a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java @@ -24,7 +24,6 @@ package com.tencent.bk.job.backup.archive; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.tencent.bk.job.backup.archive.impl.FileSourceTaskLogArchivist; import com.tencent.bk.job.backup.archive.impl.GseFileAgentTaskArchivist; import com.tencent.bk.job.backup.archive.impl.GseScriptAgentTaskArchivist; @@ -68,19 +67,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; @Slf4j public class JobExecuteArchiveManage implements SmartLifecycle { private final ArchiveConfig archiveConfig; - private static final ExecutorService ARCHIVE_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(20, 20, - 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(20), - new ThreadFactoryBuilder().setNameFormat("archive-thread-pool-%d").build(), - new ThreadPoolExecutor.AbortPolicy()); + private final ExecutorService archiveExecutor; private final FileSourceTaskLogArchivist fileSourceTaskLogArchivist; private final StepInstanceArchivist stepInstanceArchivist; @@ -124,7 +117,8 @@ public JobExecuteArchiveManage(TaskInstanceRecordDAO taskInstanceRecordDAO, RollingConfigRecordDAO rollingConfigRecordDAO, ExecuteArchiveDAO executeArchiveDAO, ArchiveProgressService archiveProgressService, - ArchiveConfig archiveConfig) { + ArchiveConfig archiveConfig, + ExecutorService archiveExecutor) { log.info("Init JobExecuteArchiveManage! archiveConfig: {}", archiveConfig); this.archiveConfig = archiveConfig; this.archiveProgressService = archiveProgressService; @@ -159,6 +153,7 @@ public JobExecuteArchiveManage(TaskInstanceRecordDAO taskInstanceRecordDAO, executeArchiveDAO, archiveProgressService); this.rollingConfigArchivist = new RollingConfigArchivist(rollingConfigRecordDAO, executeArchiveDAO, archiveProgressService); + this.archiveExecutor = archiveExecutor; } @Scheduled(cron = "${job.execute.archive.cron:0 0 4 * * *}") @@ -273,52 +268,52 @@ private void archive(long maxNeedArchiveTaskInstanceId, long maxNeedArchiveStepI log.info("Submitting archive task..."); // task_instance - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> taskInstanceArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> taskInstanceArchivist.archive(archiveConfig, maxNeedArchiveTaskInstanceId, countDownLatch)); // step_instance - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> stepInstanceArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> stepInstanceArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // step_instance_confirm - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> stepInstanceConfirmArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> stepInstanceConfirmArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // step_instance_file - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> stepInstanceFileArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> stepInstanceFileArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // step_instance_script - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> stepInstanceScriptArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> stepInstanceScriptArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // gse_task_log - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> gseTaskLogArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> gseTaskLogArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // file_source_task 非正式发布功能,暂时不开启 // ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> fileSourceTaskLogArchivist.archive(archiveConfig, // maxNeedArchiveStepInstanceId, countDownLatch)); // gse_task_ip_log - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> gseTaskIpLogArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> gseTaskIpLogArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // task_instance_variable - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> taskInstanceVariableArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> taskInstanceVariableArchivist.archive(archiveConfig, maxNeedArchiveTaskInstanceId, countDownLatch)); // step_instance_variable - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> stepInstanceVariableArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> stepInstanceVariableArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // operation_log - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> operationLogArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> operationLogArchivist.archive(archiveConfig, maxNeedArchiveTaskInstanceId, countDownLatch)); // gse_task - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> gseTaskArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> gseTaskArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // gse_script_agent_task - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> gseScriptAgentTaskArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> gseScriptAgentTaskArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // gse_file_agent_task - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> gseFileAgentTaskArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> gseFileAgentTaskArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // step_instance_rolling_task - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> stepInstanceRollingTaskArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> stepInstanceRollingTaskArchivist.archive(archiveConfig, maxNeedArchiveStepInstanceId, countDownLatch)); // rolling_config - ARCHIVE_THREAD_POOL_EXECUTOR.execute(() -> rollingConfigArchivist.archive(archiveConfig, + archiveExecutor.execute(() -> rollingConfigArchivist.archive(archiveConfig, maxNeedArchiveTaskInstanceId, countDownLatch)); log.info("Archive task submitted. Waiting for complete..."); diff --git a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceRunnable.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/config/ExecutorConfiguration.java similarity index 60% rename from src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceRunnable.java rename to src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/config/ExecutorConfiguration.java index f0a87977e0..79565daf87 100644 --- a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceRunnable.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/config/ExecutorConfiguration.java @@ -22,44 +22,31 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.common.trace.executors; +package com.tencent.bk.job.backup.config; -import org.springframework.cloud.sleuth.Span; -import org.springframework.cloud.sleuth.Tracer; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -public class TraceRunnable implements Runnable { - /** - * 日志调用链tracer - */ - private final Tracer tracer; - /** - * 调用链父上下文 - */ - private final Span parent; - private Runnable delegate; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; - public TraceRunnable(Runnable runnable, Tracer tracer) { - this.delegate = runnable; - this.tracer = tracer; - this.parent = tracer.currentSpan(); - } +@Slf4j +@Configuration +public class ExecutorConfiguration { - @Override - public void run() { - Span span = null; - try { - span = tracer.nextSpan(parent).name("async"); - delegate.run(); - } catch (Throwable e) { - if (span != null) { - span.error(e); - } - throw e; - } finally { - if (span != null) { - span.end(); - } - } + @Bean("archiveExecutor") + public ThreadPoolExecutor archiveExecutor() { + return new ThreadPoolExecutor( + 20, + 20, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(20), + new ThreadFactoryBuilder().setNameFormat("archive-thread-pool-%d").build(), + new ThreadPoolExecutor.AbortPolicy() + ); } - } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java new file mode 100644 index 0000000000..eeae58ee69 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/ExecutorConfiguration.java @@ -0,0 +1,95 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.config; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Configuration +public class ExecutorConfiguration { + + @Bean("logExportExecutor") + public ThreadPoolExecutor logExportExecutor() { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("log-export-thread-%d").build(); + return new ThreadPoolExecutor( + 10, + 100, + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + threadFactory + ); + } + + @Bean("getHostsByTopoExecutor") + public ThreadPoolExecutor getHostsByTopoExecutor() { + return new ThreadPoolExecutor( + 50, + 100, + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>() + ); + } + + @Bean("localFilePrepareExecutor") + public ThreadPoolExecutor localFilePrepareExecutor(LocalFileConfigForExecute localFileConfigForExecute) { + int concurrency = localFileConfigForExecute.getDownloadConcurrency(); + return new ThreadPoolExecutor( + concurrency, + concurrency, + 180L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(100), + (r, executor) -> { + //使用请求的线程直接拉取数据 + log.error( + "download localupload file from artifactory runnable rejected," + + " use current thread({}), plz add more threads", + Thread.currentThread().getName()); + r.run(); + } + ); + } + + @Bean("shutdownExecutor") + public ThreadPoolExecutor shutdownExecutor() { + return new ThreadPoolExecutor( + 10, + 20, + 120, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>() + ); + } +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/JobExecuteAutoConfiguration.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/JobExecuteAutoConfiguration.java index 69ffb2eb9d..f1be73e260 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/JobExecuteAutoConfiguration.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/config/JobExecuteAutoConfiguration.java @@ -25,8 +25,6 @@ package com.tencent.bk.job.execute.config; import com.tencent.bk.job.common.artifactory.sdk.ArtifactoryClient; -import com.tencent.bk.job.common.cc.config.CmdbConfig; -import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; import com.tencent.bk.job.common.esb.metrics.EsbApiTimedAspect; import com.tencent.bk.job.common.service.AppScopeMappingService; import com.tencent.bk.job.execute.client.ApplicationResourceClient; @@ -43,13 +41,6 @@ public EsbApiTimedAspect esbApiTimedAspect(@Autowired MeterRegistry meterRegistr return new EsbApiTimedAspect(meterRegistry); } - @Bean - public CmdbConfigSetter cmdbConfigSetter(@Autowired CmdbConfig cmdbConfig) { - BizCmdbClient.setCcConfig(cmdbConfig); - BizCmdbClient.init(); - return new CmdbConfigSetter(); - } - @Bean public ArtifactoryClient artifactoryClient(@Autowired ArtifactoryConfig artifactoryConfig, @Autowired MeterRegistry meterRegistry) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java index ce7ccc7d57..f9b8d1b900 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/ArtifactoryLocalFilePrepareTask.java @@ -46,7 +46,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -67,28 +66,9 @@ public class ArtifactoryLocalFilePrepareTask implements JobTaskContext { private final String artifactoryRepo; private final String jobStorageRootPath; private final List> futureList = new ArrayList<>(); - public static ThreadPoolExecutor threadPoolExecutor = null; + private final ThreadPoolExecutor threadPoolExecutor; public static FinalResultHandler finalResultHandler = null; - public static void init(int concurrency) { - if (threadPoolExecutor == null) { - threadPoolExecutor = new ThreadPoolExecutor( - concurrency, - concurrency, - 180L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(100), - (r, executor) -> { - //使用请求的线程直接拉取数据 - log.error( - "download localupload file from artifactory runnable rejected," + - " use current thread({}), plz add more threads", - Thread.currentThread().getName()); - r.run(); - }); - } - } - public ArtifactoryLocalFilePrepareTask( Long stepInstanceId, boolean isForRetry, @@ -97,7 +77,8 @@ public ArtifactoryLocalFilePrepareTask( ArtifactoryClient artifactoryClient, String artifactoryProject, String artifactoryRepo, - String jobStorageRootPath + String jobStorageRootPath, + ThreadPoolExecutor threadPoolExecutor ) { this.stepInstanceId = stepInstanceId; this.isForRetry = isForRetry; @@ -107,6 +88,7 @@ public ArtifactoryLocalFilePrepareTask( this.artifactoryProject = artifactoryProject; this.artifactoryRepo = artifactoryRepo; this.jobStorageRootPath = jobStorageRootPath; + this.threadPoolExecutor = threadPoolExecutor; } @Override diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java index a60057e54a..a73da4c8bb 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/local/LocalFilePrepareService.java @@ -36,12 +36,13 @@ import com.tencent.bk.job.manage.common.consts.task.TaskFileTypeEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; @Slf4j @Service @@ -54,6 +55,7 @@ public class LocalFilePrepareService { private final TaskInstanceService taskInstanceService; private final ArtifactoryClient artifactoryClient; private final Map taskMap = new ConcurrentHashMap<>(); + private final ThreadPoolExecutor localFilePrepareExecutor; @Autowired public LocalFilePrepareService(StorageSystemConfig storageSystemConfig, @@ -61,18 +63,15 @@ public LocalFilePrepareService(StorageSystemConfig storageSystemConfig, LocalFileConfigForExecute localFileConfigForExecute, AgentService agentService, TaskInstanceService taskInstanceService, - ArtifactoryClient artifactoryClient) { + ArtifactoryClient artifactoryClient, + @Qualifier("localFilePrepareExecutor") ThreadPoolExecutor localFilePrepareExecutor) { this.storageSystemConfig = storageSystemConfig; this.artifactoryConfig = artifactoryConfig; this.localFileConfigForExecute = localFileConfigForExecute; this.agentService = agentService; this.taskInstanceService = taskInstanceService; this.artifactoryClient = artifactoryClient; - } - - @PostConstruct - private void init() { - ArtifactoryLocalFilePrepareTask.init(localFileConfigForExecute.getDownloadConcurrency()); + this.localFilePrepareExecutor = localFilePrepareExecutor; } public void stopPrepareLocalFilesAsync( @@ -111,7 +110,8 @@ public void prepareLocalFilesAsync( artifactoryClient, artifactoryConfig.getArtifactoryJobProject(), localFileConfigForExecute.getLocalUploadRepo(), - storageSystemConfig.getJobStorageRootPath() + storageSystemConfig.getJobStorageRootPath(), + localFilePrepareExecutor ); taskMap.put(stepInstanceId, task); task.execute(); diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java index 7f1d188fcf..109b27af2a 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java @@ -32,6 +32,7 @@ import com.tencent.bk.job.execute.monitor.metrics.ExecuteMonitor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.sleuth.Span; import org.springframework.cloud.sleuth.Tracer; import org.springframework.context.SmartLifecycle; @@ -45,8 +46,6 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -132,19 +131,20 @@ public class ResultHandleManager implements SmartLifecycle { * whether this component is currently running(Spring Lifecycle isRunning method) */ private volatile boolean running = false; - private final ExecutorService shutdownExecutorService = new ThreadPoolExecutor( - 10, 20, 120, TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); + private final ExecutorService shutdownExecutor; @Autowired public ResultHandleManager(Tracer tracer, ExecuteMonitor counters, ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager, - ResultHandleTaskSampler resultHandleTaskSampler, JobExecuteConfig jobExecuteConfig) { + ResultHandleTaskSampler resultHandleTaskSampler, + JobExecuteConfig jobExecuteConfig, + @Qualifier("shutdownExecutor") ExecutorService shutdownExecutor) { this.tracer = tracer; this.counters = counters; this.resultHandleTaskKeepaliveManager = resultHandleTaskKeepaliveManager; this.resultHandleTaskSampler = resultHandleTaskSampler; this.resultHandleLimiter = new ResultHandleLimiter(jobExecuteConfig.getResultHandleTasksLimit()); + this.shutdownExecutor = shutdownExecutor; } /** @@ -267,7 +267,7 @@ private void stopTasksGraceful() { stopTaskCounter.initCounter(scheduledTasks.keySet()); } for (ScheduledContinuousResultHandleTask task : scheduledTasks.values()) { - shutdownExecutorService.execute(new StopTask(task, tracer)); + shutdownExecutor.execute(new StopTask(task, tracer)); } } try { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ApplicationServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ApplicationServiceImpl.java index 0c5a4c01c4..101800e34e 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ApplicationServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ApplicationServiceImpl.java @@ -30,14 +30,12 @@ import com.tencent.bk.job.manage.model.inner.resp.ServiceApplicationDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Service; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -@DependsOn({"cmdbConfigSetter"}) @Service @Slf4j public class ApplicationServiceImpl implements ApplicationService { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/HostServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/HostServiceImpl.java index ce1c6fd5ca..e80b8f2aaa 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/HostServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/HostServiceImpl.java @@ -50,7 +50,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.DependsOn; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -64,7 +63,6 @@ import java.util.concurrent.TimeUnit; @Service -@DependsOn({"cmdbConfigSetter"}) @Slf4j public class HostServiceImpl implements HostService { private final WhiteIpResourceClient whiteIpResourceClient; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/LogExportServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/LogExportServiceImpl.java index b13fc61fb2..a4daf71ba1 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/LogExportServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/LogExportServiceImpl.java @@ -24,12 +24,10 @@ package com.tencent.bk.job.execute.service.impl; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.tencent.bk.job.common.artifactory.sdk.ArtifactoryClient; import com.tencent.bk.job.common.constant.JobConstants; import com.tencent.bk.job.common.model.dto.HostDTO; import com.tencent.bk.job.common.redis.util.LockUtils; -import com.tencent.bk.job.common.trace.executors.TraceableExecutorService; import com.tencent.bk.job.common.util.BatchUtil; import com.tencent.bk.job.common.util.date.DateUtils; import com.tencent.bk.job.common.util.file.ZipUtil; @@ -51,7 +49,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.helpers.MessageFormatter; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.sleuth.Tracer; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.StopWatch; @@ -66,10 +64,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; /** * @since 19/1/2021 12:01 @@ -79,7 +74,7 @@ public class LogExportServiceImpl implements LogExportService { private static final String EXPORT_KEY_PREFIX = "execute:log:export:"; private final LogService logService; - private final TraceableExecutorService logExportExecutor; + private final ExecutorService logExportExecutor; private final StringRedisTemplate redisTemplate; private final TaskInstanceService taskInstanceService; private final ArtifactoryClient artifactoryClient; @@ -89,13 +84,13 @@ public class LogExportServiceImpl implements LogExportService { @Autowired public LogExportServiceImpl(LogService logService, - Tracer tracer, StringRedisTemplate redisTemplate, TaskInstanceService taskInstanceService, ArtifactoryClient artifactoryClient, ArtifactoryConfig artifactoryConfig, LogExportConfig logExportConfig, - ScriptAgentTaskService scriptAgentTaskService) { + ScriptAgentTaskService scriptAgentTaskService, + @Qualifier("logExportExecutor") ExecutorService logExportExecutor) { this.logService = logService; this.redisTemplate = redisTemplate; this.taskInstanceService = taskInstanceService; @@ -103,9 +98,7 @@ public LogExportServiceImpl(LogService logService, this.artifactoryConfig = artifactoryConfig; this.logExportConfig = logExportConfig; this.scriptAgentTaskService = scriptAgentTaskService; - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("log-export-thread-%d").build(); - this.logExportExecutor = new TraceableExecutorService(new ThreadPoolExecutor(10, - 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory), tracer); + this.logExportExecutor = logExportExecutor; } @Override diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java index 497dd0568d..3daf722697 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java @@ -40,7 +40,6 @@ import com.tencent.bk.job.common.model.InternalResponse; import com.tencent.bk.job.common.model.dto.AppResourceScope; import com.tencent.bk.job.common.model.dto.HostDTO; -import com.tencent.bk.job.common.trace.executors.TraceableExecutorService; import com.tencent.bk.job.common.util.ArrayUtil; import com.tencent.bk.job.common.util.date.DateUtils; import com.tencent.bk.job.common.util.json.JsonUtils; @@ -118,7 +117,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.sleuth.Tracer; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.util.StopWatch; @@ -136,9 +135,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum.EXECUTE_SCRIPT; @@ -161,7 +157,7 @@ public class TaskExecuteServiceImpl implements TaskExecuteService { private final HostService hostService; private final ServiceUserResourceClient userResource; private final ExecuteAuthService executeAuthService; - private final ExecutorService GET_HOSTS_BY_TOPO_EXECUTOR; + private final ExecutorService getHostsByTopoExecutor; private final DangerousScriptCheckService dangerousScriptCheckService; private final RollingConfigService rollingConfigService; private final JobExecuteConfig jobExecuteConfig; @@ -182,7 +178,7 @@ public TaskExecuteServiceImpl(AccountService accountService, HostService hostService, ServiceUserResourceClient userResource, ExecuteAuthService executeAuthService, - Tracer tracer, + @Qualifier("getHostsByTopoExecutor") ExecutorService getHostsByTopoExecutor, DangerousScriptCheckService dangerousScriptCheckService, JobExecuteConfig jobExecuteConfig, TaskEvictPolicyExecutor taskEvictPolicyExecutor, @@ -199,8 +195,7 @@ public TaskExecuteServiceImpl(AccountService accountService, this.hostService = hostService; this.userResource = userResource; this.executeAuthService = executeAuthService; - this.GET_HOSTS_BY_TOPO_EXECUTOR = new TraceableExecutorService(new ThreadPoolExecutor(50, - 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()), tracer); + this.getHostsByTopoExecutor = getHostsByTopoExecutor; this.dangerousScriptCheckService = dangerousScriptCheckService; this.rollingConfigService = rollingConfigService; this.jobExecuteConfig = jobExecuteConfig; @@ -1869,7 +1864,7 @@ private void getTopoHostsConcurrent(long appId, List t CountDownLatch latch = new CountDownLatch(topoNodes.size()); List>>> futures = new ArrayList<>(topoNodes.size()); for (DynamicServerTopoNodeDTO topoNode : topoNodes) { - futures.add(GET_HOSTS_BY_TOPO_EXECUTOR.submit(new GetTopoHostTask(appId, topoNode, latch))); + futures.add(getHostsByTopoExecutor.submit(new GetTopoHostTask(appId, topoNode, latch))); } try { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TopoServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TopoServiceImpl.java index 6235552adc..7b0cf06f8a 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TopoServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TopoServiceImpl.java @@ -24,38 +24,28 @@ package com.tencent.bk.job.execute.service.impl; -import com.tencent.bk.job.common.cc.config.CmdbConfig; import com.tencent.bk.job.common.cc.model.InstanceTopologyDTO; import com.tencent.bk.job.common.cc.model.req.GetTopoNodePathReq; import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; -import com.tencent.bk.job.common.esb.config.EsbConfig; -import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient; import com.tencent.bk.job.common.util.CustomCollectionUtils; import com.tencent.bk.job.execute.model.DynamicServerTopoNodeDTO; import com.tencent.bk.job.execute.service.TopoService; -import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Service; import java.util.Collections; import java.util.List; -@DependsOn({"cmdbConfigSetter"}) @Service @Slf4j public class TopoServiceImpl implements TopoService { - private final BizCmdbClient ccClient; + + private final BizCmdbClient bizCmdbClient; @Autowired - public TopoServiceImpl( - EsbConfig esbConfig, - CmdbConfig cmdbConfig, - QueryAgentStatusClient queryAgentStatusClient, - MeterRegistry meterRegistry - ) { - ccClient = new BizCmdbClient(esbConfig, cmdbConfig, queryAgentStatusClient, meterRegistry); + public TopoServiceImpl(BizCmdbClient bizCmdbClient) { + this.bizCmdbClient = bizCmdbClient; } @Override @@ -63,7 +53,7 @@ public List batchGetTopoNodeHierarchy(long appId, List req.add(topoNode.getNodeType(), topoNode.getTopoNodeId())); - List hierarchyNodes = ccClient.getTopoInstancePath(req); + List hierarchyNodes = bizCmdbClient.getTopoInstancePath(req); log.debug("Get topo node hierarchy, req:{}, result:{}", req, hierarchyNodes); if (CustomCollectionUtils.isEmptyCollection(hierarchyNodes)) { return Collections.emptyList(); diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/build.gradle b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/build.gradle index c192ee89be..2200207fae 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/build.gradle +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/build.gradle @@ -24,6 +24,7 @@ dependencies { api project(":commons:common-i18n") + api project(":commons:common-otel") api project(":job-file-worker-sdk:api-job-file-worker-sdk") api "org.springframework.boot:spring-boot-starter-web" api "ch.qos.logback:logback-core" @@ -37,10 +38,6 @@ dependencies { api 'io.springfox:springfox-swagger2' api 'io.springfox:springfox-swagger-ui' api 'net.coobird:thumbnailator:0.4.14' - api "org.springframework.cloud:spring-cloud-sleuth-api" - api "org.springframework.cloud:spring-cloud-sleuth-otel" - api "org.springframework.cloud:spring-cloud-sleuth-otel-autoconfigure" - api "io.opentelemetry:opentelemetry-exporter-otlp-trace" testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.apache.commons:commons-lang3' } diff --git a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceCallable.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/ExecutorConfiguration.java similarity index 55% rename from src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceCallable.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/ExecutorConfiguration.java index 65ac6e9b17..0d763d5b42 100644 --- a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/trace/executors/TraceCallable.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/ExecutorConfiguration.java @@ -22,45 +22,42 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.common.trace.executors; +package com.tencent.bk.job.file.worker.config; -import org.springframework.cloud.sleuth.Span; -import org.springframework.cloud.sleuth.Tracer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; -public class TraceCallable implements Callable { - /** - * 日志调用链tracer - */ - private final Tracer tracer; - /** - * 调用链父上下文 - */ - private final Span parent; - private Callable delegate; +@Slf4j +@Configuration +public class ExecutorConfiguration { - public TraceCallable(Callable callable, Tracer tracer) { - this.delegate = callable; - this.tracer = tracer; - this.parent = tracer.currentSpan(); + @Bean("fileTaskExecutor") + public ThreadPoolExecutor fileTaskExecutor() { + return new ThreadPoolExecutor( + 20, + 50, + 1, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>(1000), (r, executor) -> { + log.error("fileTaskWorkerPool rejected a task, use current thread now, plz add more threads"); + r.run(); + }); } - @Override - public V call() throws Exception { - Span span = null; - try { - span = tracer.nextSpan(parent).name("async"); - return delegate.call(); - } catch (Throwable e) { - if (span != null) { - span.error(e); - } - throw e; - } finally { - if (span != null) { - span.end(); - } - } + @Bean("watchingTaskExecutor") + public ThreadPoolExecutor watchingTaskExecutor() { + return new ThreadPoolExecutor( + 20, + 50, + 1, + TimeUnit.MINUTES + , new LinkedBlockingQueue<>(1000), (r, executor) -> + log.error("watchingTaskExecutor rejected a task, ignore, plz add more threads") + ); } } diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileTaskService.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileTaskService.java index 00b5104636..91a5bd7031 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileTaskService.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileTaskService.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.tomcat.util.http.fileupload.FileUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.io.File; @@ -36,7 +37,9 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -44,27 +47,20 @@ @Service public class FileTaskService { - private static final ThreadPoolExecutor fileTaskExecutor = new ThreadPoolExecutor(20, 50, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(1000), new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - log.error("fileTaskWorkerPool rejected a task, use current thread now, plz add more threads"); - r.run(); - } - }); - private static final ThreadPoolExecutor watchingTaskExecutor = new ThreadPoolExecutor(20, 50, 1, TimeUnit.MINUTES - , new LinkedBlockingQueue<>(1000), new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - log.error("watchingTaskExecutor rejected a task, ignore, plz add more threads"); - } - }); + private final ThreadPoolExecutor fileTaskExecutor; + private final ThreadPoolExecutor watchingTaskExecutor; private static final ConcurrentHashMap> fileTaskMap = new ConcurrentHashMap<>(); private static final ConcurrentHashMap> watchingTaskMap = new ConcurrentHashMap<>(); private final WorkerConfig workerConfig; private final TaskReporter taskReporter; + @Autowired - public FileTaskService(WorkerConfig workerConfig, TaskReporter taskReporter) { + public FileTaskService(@Qualifier("fileTaskExecutor") ThreadPoolExecutor fileTaskExecutor, + @Qualifier("watchingTaskExecutor") ThreadPoolExecutor watchingTaskExecutor, + WorkerConfig workerConfig, + TaskReporter taskReporter) { + this.fileTaskExecutor = fileTaskExecutor; + this.watchingTaskExecutor = watchingTaskExecutor; this.workerConfig = workerConfig; this.taskReporter = taskReporter; } diff --git a/src/backend/job-file-worker/boot-job-file-worker/src/main/resources/bootstrap.yml b/src/backend/job-file-worker/boot-job-file-worker/src/main/resources/bootstrap.yml index cb75cb699c..67fbda41df 100644 --- a/src/backend/job-file-worker/boot-job-file-worker/src/main/resources/bootstrap.yml +++ b/src/backend/job-file-worker/boot-job-file-worker/src/main/resources/bootstrap.yml @@ -13,4 +13,4 @@ spring: name: ${JOB_COMMON_CONFIGMAP_NAME} logging: pattern: - level: "%5p [${spring.application.name:},%X{trace_id:-},%X{span_id:-}]" + level: "%5p [${spring.application.name:},%X{traceId:-},%X{spanId:-}]" diff --git a/src/backend/job-manage/boot-job-manage/src/main/java/com/tencent/bk/job/manage/JobManageBootApplication.java b/src/backend/job-manage/boot-job-manage/src/main/java/com/tencent/bk/job/manage/JobManageBootApplication.java index 60784b714c..08f4a2e0ce 100644 --- a/src/backend/job-manage/boot-job-manage/src/main/java/com/tencent/bk/job/manage/JobManageBootApplication.java +++ b/src/backend/job-manage/boot-job-manage/src/main/java/com/tencent/bk/job/manage/JobManageBootApplication.java @@ -24,22 +24,13 @@ package com.tencent.bk.job.manage; -import com.tencent.bk.job.common.cc.config.CmdbConfig; -import com.tencent.bk.job.common.cc.service.CloudAreaService; import com.tencent.bk.job.common.config.FeatureToggleConfig; -import com.tencent.bk.job.common.esb.config.EsbConfig; -import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient; -import com.tencent.bk.job.common.util.ApplicationContextRegister; -import io.micrometer.core.instrument.MeterRegistry; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.annotation.EnableCaching; import org.springframework.cloud.openfeign.EnableFeignClients; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.DependsOn; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication(scanBasePackages = "com.tencent.bk.job", exclude = {RedisAutoConfiguration.class}) @@ -47,18 +38,10 @@ @EnableCaching @EnableFeignClients @EnableScheduling -@DependsOn({"applicationContextRegister", "cmdbConfigSetter"}) public class JobManageBootApplication { public static void main(String[] args) { SpringApplication.run(JobManageBootApplication.class, args); } - @Bean - CloudAreaService buildCloudAreaService(@Autowired EsbConfig esbConfig, @Autowired CmdbConfig cmdbConfig) { - QueryAgentStatusClient queryAgentStatusClient = - ApplicationContextRegister.getBean(QueryAgentStatusClient.class); - MeterRegistry meterRegistry = ApplicationContextRegister.getBean(MeterRegistry.class); - return new CloudAreaService(esbConfig, cmdbConfig, queryAgentStatusClient, meterRegistry); - } } diff --git a/src/backend/job-manage/service-job-manage/build.gradle b/src/backend/job-manage/service-job-manage/build.gradle index 7a285b0511..6b7e21f646 100644 --- a/src/backend/job-manage/service-job-manage/build.gradle +++ b/src/backend/job-manage/service-job-manage/build.gradle @@ -52,9 +52,6 @@ dependencies { implementation "commons-io:commons-io" implementation "commons-codec:commons-codec" implementation 'joda-time:joda-time' - implementation "org.springframework.cloud:spring-cloud-sleuth-api" - implementation "org.springframework.cloud:spring-cloud-sleuth-otel-autoconfigure" - implementation "io.opentelemetry:opentelemetry-exporter-otlp-trace" testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.apache.commons:commons-lang3' } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/api/web/impl/WebGlobalSettingsQueryResourceImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/api/web/impl/WebGlobalSettingsQueryResourceImpl.java index 4c31f8fd99..7d6d6c4cb1 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/api/web/impl/WebGlobalSettingsQueryResourceImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/api/web/impl/WebGlobalSettingsQueryResourceImpl.java @@ -45,12 +45,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.web.bind.annotation.RestController; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -65,10 +65,7 @@ public class WebGlobalSettingsQueryResourceImpl implements WebGlobalSettingsQuer private final NoResourceScopeAuthService noResourceScopeAuthService; private final AppAuthService appAuthService; private final ScriptService scriptService; - - private final ThreadPoolExecutor executor = new ThreadPoolExecutor( - 5, 5, 30, TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); + private final ThreadPoolExecutor adminAuthExecutor; @Autowired public WebGlobalSettingsQueryResourceImpl(GlobalSettingsService globalSettingsService, @@ -76,13 +73,15 @@ public WebGlobalSettingsQueryResourceImpl(GlobalSettingsService globalSettingsSe JobManageConfig jobManageConfig, NoResourceScopeAuthService noResourceScopeAuthService, AppAuthService appAuthService, - ScriptService scriptService) { + ScriptService scriptService, + @Qualifier("adminAuthExecutor") ThreadPoolExecutor adminAuthExecutor) { this.globalSettingsService = globalSettingsService; this.applicationService = applicationService; this.jobManageConfig = jobManageConfig; this.noResourceScopeAuthService = noResourceScopeAuthService; this.appAuthService = appAuthService; this.scriptService = scriptService; + this.adminAuthExecutor = adminAuthExecutor; } @Override @@ -99,7 +98,7 @@ public Response getAccountNameRules(String userna public Response isAdmin(String username) { AtomicBoolean flag = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(9); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult createWhiteListAuthResultVO = noResourceScopeAuthService.authCreateWhiteList(username); flag.set(flag.get() || createWhiteListAuthResultVO.isPass()); @@ -109,7 +108,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult manageWhiteListAuthResultVO = noResourceScopeAuthService.authManageWhiteList(username); flag.set(flag.get() || manageWhiteListAuthResultVO.isPass()); @@ -119,7 +118,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult createPublicScriptAuthResultVO = noResourceScopeAuthService.authCreatePublicScript(username); flag.set(flag.get() || createPublicScriptAuthResultVO.isPass()); @@ -129,7 +128,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { // 是否能管理某些公共脚本 List canManagePublicScriptIds = @@ -144,7 +143,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult globalSettingsAuthResultVO = noResourceScopeAuthService.authGlobalSetting(username); flag.set(flag.get() || globalSettingsAuthResultVO.isPass()); @@ -154,7 +153,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult authResult = noResourceScopeAuthService.authViewDashBoard(username, AnalysisConsts.GLOBAL_DASHBOARD_VIEW_ID); @@ -165,7 +164,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult serviceInfoAuthResultVO = noResourceScopeAuthService.authViewServiceState(username); flag.set(flag.get() || serviceInfoAuthResultVO.isPass()); @@ -175,7 +174,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult highRiskRuleAuthResultVO = noResourceScopeAuthService.authHighRiskDetectRule(username); flag.set(flag.get() || highRiskRuleAuthResultVO.isPass()); @@ -185,7 +184,7 @@ public Response isAdmin(String username) { latch.countDown(); } }); - executor.submit(() -> { + adminAuthExecutor.submit(() -> { try { AuthResult highRiskRecordAuthResultVO = noResourceScopeAuthService.authHighRiskDetectRecord(username); flag.set(flag.get() || highRiskRecordAuthResultVO.isPass()); @@ -252,6 +251,6 @@ public Response> getJobConfig(String username) { @Override public void destroy() { - executor.shutdown(); + adminAuthExecutor.shutdown(); } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/config/ExecutorConfiguration.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/config/ExecutorConfiguration.java new file mode 100644 index 0000000000..b341bb74e0 --- /dev/null +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/config/ExecutorConfiguration.java @@ -0,0 +1,146 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.manage.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@Configuration +public class ExecutorConfiguration { + + @Bean("syncAgentStatusExecutor") + public ThreadPoolExecutor syncAgentStatusExecutor() { + ThreadPoolExecutor syncAgentStatusExecutor = new ThreadPoolExecutor( + 5, + 5, + 1L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(5000), (r, executor) -> + log.error( + "syncAgentStatusExecutor Runnable rejected! executor.poolSize={}, executor.queueSize={}", + executor.getPoolSize(), + executor.getQueue().size() + ) + ); + syncAgentStatusExecutor.setThreadFactory(getThreadFactoryByNameAndSeq("syncAgentStatusExecutor-", + new AtomicInteger(1))); + return syncAgentStatusExecutor; + } + + @Bean("syncHostExecutor") + public ThreadPoolExecutor syncHostExecutor() { + ThreadPoolExecutor syncHostExecutor = new ThreadPoolExecutor( + 5, + 5, + 1L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(5000), (r, executor) -> + log.error( + "syncHostExecutor Runnable rejected! executor.poolSize={}, executor.queueSize={}", + executor.getPoolSize(), + executor.getQueue().size() + ) + ); + syncHostExecutor.setThreadFactory(getThreadFactoryByNameAndSeq("syncHostExecutor-", + new AtomicInteger(1))); + return syncHostExecutor; + } + + @Bean("syncAppExecutor") + public ThreadPoolExecutor syncAppExecutor() { + ThreadPoolExecutor syncAppExecutor = new ThreadPoolExecutor( + 5, + 5, + 1L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(20), (r, executor) -> + log.error( + "syncAppExecutor Runnable rejected! executor.poolSize={}, executor.queueSize={}", + executor.getPoolSize(), + executor.getQueue().size() + ) + ); + syncAppExecutor.setThreadFactory( + getThreadFactoryByNameAndSeq("syncAppExecutor-", new AtomicInteger(1)) + ); + return syncAppExecutor; + } + + @Bean("notifySendExecutor") + public ThreadPoolExecutor notifySendExecutor() { + return new ThreadPoolExecutor( + 5, + 30, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(10) + ); + } + + @Bean("dangerousRuleCheckExecutor") + public ExecutorService dangerousRuleCheckExecutor() { + return new ThreadPoolExecutor( + 10, + 50, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>() + ); + } + + @Bean("adminAuthExecutor") + public ThreadPoolExecutor adminAuthExecutor() { + return new ThreadPoolExecutor( + 5, + 5, + 30, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>() + ); + } + + private ThreadFactory getThreadFactoryByNameAndSeq(String namePrefix, AtomicInteger seq) { + return r -> { + Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, + namePrefix + seq.getAndIncrement(), + 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + }; + } +} diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/config/JobManageAutoConfiguration.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/config/JobManageAutoConfiguration.java index 3ed44bdd44..790ab63dd7 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/config/JobManageAutoConfiguration.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/config/JobManageAutoConfiguration.java @@ -25,8 +25,6 @@ package com.tencent.bk.job.manage.config; import com.tencent.bk.job.common.artifactory.sdk.ArtifactoryClient; -import com.tencent.bk.job.common.cc.config.CmdbConfig; -import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; import com.tencent.bk.job.common.encrypt.Encryptor; import com.tencent.bk.job.common.encrypt.RSAEncryptor; import com.tencent.bk.job.common.esb.metrics.EsbApiTimedAspect; @@ -50,13 +48,6 @@ public ArtifactoryClient artifactoryClient(@Autowired ArtifactoryConfig artifact meterRegistry); } - @Bean - public CmdbConfigSetter cmdbConfigSetter(@Autowired CmdbConfig cmdbConfig) { - BizCmdbClient.setCcConfig(cmdbConfig); - BizCmdbClient.init(); - return new CmdbConfigSetter(); - } - static class CmdbConfigSetter { CmdbConfigSetter() { } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/metrics/StaticMetricsConfig.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/metrics/StaticMetricsConfig.java index a72a27c13f..b2cae84c34 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/metrics/StaticMetricsConfig.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/metrics/StaticMetricsConfig.java @@ -24,7 +24,6 @@ package com.tencent.bk.job.manage.metrics; -import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; import com.tencent.bk.job.common.redis.util.RedisSlideWindowFlowController; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; @@ -44,22 +43,22 @@ public class StaticMetricsConfig { @Autowired public StaticMetricsConfig( + ThreadPoolExecutor cmdbThreadPoolExecutor, MeterRegistry meterRegistry, RedisSlideWindowFlowController cmdbGlobalFlowController ) { // CMDB请求线程池大小 - ThreadPoolExecutor cmdbQueryThreadPool = BizCmdbClient.threadPoolExecutor; meterRegistry.gauge( MetricsConstants.NAME_CMDB_QUERY_POOL_SIZE, Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_CMDB)), - cmdbQueryThreadPool, + cmdbThreadPoolExecutor, ThreadPoolExecutor::getPoolSize ); // CMDB请求线程池队列大小 meterRegistry.gauge( MetricsConstants.NAME_CMDB_QUERY_QUEUE_SIZE, Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_CMDB)), - cmdbQueryThreadPool, + cmdbThreadPoolExecutor, threadPoolExecutor -> threadPoolExecutor.getQueue().size() ); try { diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/SyncService.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/SyncService.java index 656fbbacae..3411af98cf 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/SyncService.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/SyncService.java @@ -47,8 +47,6 @@ public interface SyncService { Long getLastFinishTimeSyncAgentStatus(); - ThreadPoolExecutor getSyncAppExecutor(); - ThreadPoolExecutor getSyncHostExecutor(); ThreadPoolExecutor getSyncAgentStatusExecutor(); diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/MeasureServiceImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/MeasureServiceImpl.java index d64442ee5b..44220d2599 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/MeasureServiceImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/MeasureServiceImpl.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.ThreadPoolExecutor; @Slf4j @Service @@ -56,13 +57,14 @@ public class MeasureServiceImpl implements MeasureService { private final TaskTemplateService taskTemplateService; private final TaskPlanService taskPlanService; private final SyncService syncService; + private final ThreadPoolExecutor syncAppExecutor; @Autowired public MeasureServiceImpl(MeterRegistry meterRegistry, AccountDAO accountDAO, ApplicationDAO applicationDAO, ApplicationHostDAO applicationHostDAO, ScriptDAO scriptDAO, WhiteIPRecordDAO whiteIPRecordDAO, TaskTemplateService taskTemplateService, TaskPlanService taskPlanService, - SyncService syncService) { + SyncService syncService, ThreadPoolExecutor syncAppExecutor) { this.meterRegistry = meterRegistry; this.accountDAO = accountDAO; this.applicationDAO = applicationDAO; @@ -72,6 +74,7 @@ public MeasureServiceImpl(MeterRegistry meterRegistry, AccountDAO accountDAO, this.taskTemplateService = taskTemplateService; this.taskPlanService = taskPlanService; this.syncService = syncService; + this.syncAppExecutor = syncAppExecutor; } @Override @@ -230,14 +233,14 @@ private void measureSyncAppExecutor() { meterRegistry.gauge( MetricsConstants.NAME_SYNC_APP_EXECUTOR_POOL_SIZE, Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_SYNC)), - this.syncService, - syncService1 -> syncService1.getSyncAppExecutor().getPoolSize() + this.syncAppExecutor, + ThreadPoolExecutor::getPoolSize ); meterRegistry.gauge( MetricsConstants.NAME_SYNC_APP_EXECUTOR_QUEUE_SIZE, Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_SYNC)), - this.syncService, - syncService1 -> syncService1.getSyncAppExecutor().getQueue().size() + this.syncAppExecutor, + syncAppExecutor -> syncAppExecutor.getQueue().size() ); } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/ScriptCheckServiceImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/ScriptCheckServiceImpl.java index 3ca7ad9fa4..914b71b02f 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/ScriptCheckServiceImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/ScriptCheckServiceImpl.java @@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.helpers.MessageFormatter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -51,8 +52,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @Slf4j @@ -60,13 +59,13 @@ public class ScriptCheckServiceImpl implements ScriptCheckService { private final DangerousRuleCache dangerousRuleCache; - private final ExecutorService executor = new ThreadPoolExecutor( - 10, 50, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>()); + private final ExecutorService dangerousRuleCheckExecutor; @Autowired - public ScriptCheckServiceImpl(DangerousRuleCache dangerousRuleCache) { + public ScriptCheckServiceImpl(DangerousRuleCache dangerousRuleCache, + @Qualifier("dangerousRuleCheckExecutor") ExecutorService dangerousRuleCheckExecutor) { this.dangerousRuleCache = dangerousRuleCache; + this.dangerousRuleCheckExecutor = dangerousRuleCheckExecutor; } @Override @@ -82,21 +81,24 @@ public List check(ScriptTypeEnum scriptType, String co int timeout = 5; ScriptCheckParam scriptCheckParam = new ScriptCheckParam(scriptType, content); Future> dangerousRuleCheckResultItems = - executor.submit(new DangerousRuleScriptChecker(scriptCheckParam, dangerousRules)); + dangerousRuleCheckExecutor.submit(new DangerousRuleScriptChecker(scriptCheckParam, dangerousRules)); if (ScriptTypeEnum.SHELL.equals(scriptType)) { - Future> grammar = executor.submit(new ScriptGrammarChecker(scriptType, - content)); + Future> grammar = dangerousRuleCheckExecutor.submit( + new ScriptGrammarChecker(scriptType, content) + ); Future> danger = - executor.submit(new BuildInDangerousScriptChecker(scriptCheckParam)); + dangerousRuleCheckExecutor.submit(new BuildInDangerousScriptChecker(scriptCheckParam)); Future> logic = - executor.submit(new ScriptLogicChecker(scriptCheckParam)); + dangerousRuleCheckExecutor.submit(new ScriptLogicChecker(scriptCheckParam)); - Future> io = executor.submit(new IOScriptChecker(scriptCheckParam)); + Future> io = dangerousRuleCheckExecutor.submit( + new IOScriptChecker(scriptCheckParam) + ); Future> device = - executor.submit(new DeviceCrashScriptChecker(scriptCheckParam)); + dangerousRuleCheckExecutor.submit(new DeviceCrashScriptChecker(scriptCheckParam)); checkResultList.addAll(grammar.get(timeout, TimeUnit.SECONDS)); checkResultList.addAll(logic.get(timeout, TimeUnit.SECONDS)); checkResultList.addAll(danger.get(timeout, TimeUnit.SECONDS)); @@ -133,7 +135,7 @@ public List checkScriptWithDangerousRule(ScriptTypeEnu if (CollectionUtils.isEmpty(dangerousRules)) { return Collections.emptyList(); } - Future> dangerousRuleCheckResultItems = executor.submit( + Future> dangerousRuleCheckResultItems = dangerousRuleCheckExecutor.submit( new DangerousRuleScriptChecker(scriptCheckParam, dangerousRules)); checkResultList.addAll(dangerousRuleCheckResultItems.get(timeout, TimeUnit.SECONDS)); checkResultList.sort(Comparator.comparingInt(ScriptCheckResultItemDTO::getLine)); diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/notify/NotifySendService.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/notify/NotifySendService.java index 93819840db..ad9b086574 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/notify/NotifySendService.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/notify/NotifySendService.java @@ -24,7 +24,6 @@ package com.tencent.bk.job.manage.service.impl.notify; -import com.tencent.bk.job.common.trace.executors.TraceableExecutorService; import com.tencent.bk.job.manage.dao.notify.EsbUserInfoDAO; import com.tencent.bk.job.manage.metrics.MetricsConstants; import com.tencent.bk.job.manage.service.impl.WatchableSendMsgService; @@ -33,66 +32,49 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.sleuth.Tracer; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; @Slf4j @Service public class NotifySendService { //发通知专用线程池 - private final TraceableExecutorService notifySendExecutor; - + private final ThreadPoolExecutor notifySendExecutor; private final WatchableSendMsgService watchableSendMsgService; private final EsbUserInfoDAO esbUserInfoDAO; @Autowired public NotifySendService(WatchableSendMsgService watchableSendMsgService, EsbUserInfoDAO esbUserInfoDAO, - Tracer tracer, + @Qualifier("notifySendExecutor") ThreadPoolExecutor notifySendExecutor, MeterRegistry meterRegistry) { this.watchableSendMsgService = watchableSendMsgService; this.esbUserInfoDAO = esbUserInfoDAO; - notifySendExecutor = new TraceableExecutorService( - new ThreadPoolExecutor( - 5, - 30, - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(10) - ), - tracer - ); + this.notifySendExecutor = notifySendExecutor; measureNotifySendExecutor(meterRegistry); } private void measureNotifySendExecutor(MeterRegistry meterRegistry) { - ExecutorService executorService = notifySendExecutor.getDelegateExecutorService(); - if (executorService instanceof ThreadPoolExecutor) { - ThreadPoolExecutor notifySendExecutor = (ThreadPoolExecutor) executorService; - meterRegistry.gauge( - MetricsConstants.NAME_NOTIFY_POOL_SIZE, - Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, - MetricsConstants.TAG_VALUE_MODULE_NOTIFY)), - notifySendExecutor, - ThreadPoolExecutor::getPoolSize - ); - meterRegistry.gauge( - MetricsConstants.NAME_NOTIFY_QUEUE_SIZE, - Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, - MetricsConstants.TAG_VALUE_MODULE_NOTIFY)), - notifySendExecutor, - threadPoolExecutor -> threadPoolExecutor.getQueue().size() - ); - } + meterRegistry.gauge( + MetricsConstants.NAME_NOTIFY_POOL_SIZE, + Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, + MetricsConstants.TAG_VALUE_MODULE_NOTIFY)), + notifySendExecutor, + ThreadPoolExecutor::getPoolSize + ); + meterRegistry.gauge( + MetricsConstants.NAME_NOTIFY_QUEUE_SIZE, + Collections.singletonList(Tag.of(MetricsConstants.TAG_KEY_MODULE, + MetricsConstants.TAG_VALUE_MODULE_NOTIFY)), + notifySendExecutor, + threadPoolExecutor -> threadPoolExecutor.getQueue().size() + ); } private SendNotifyTask buildSendTask(Long appId, diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BasicAppSyncService.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BasicAppSyncService.java index fa022c27dc..3f03d1048f 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BasicAppSyncService.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BasicAppSyncService.java @@ -24,7 +24,6 @@ package com.tencent.bk.job.manage.service.impl.sync; -import com.tencent.bk.job.common.cc.sdk.CmdbClientFactory; import com.tencent.bk.job.common.cc.sdk.IBizCmdbClient; import com.tencent.bk.job.common.constant.ResourceScopeTypeEnum; import com.tencent.bk.job.common.model.dto.ApplicationDTO; @@ -51,17 +50,19 @@ public class BasicAppSyncService { private final ApplicationDAO applicationDAO; private final ApplicationHostDAO applicationHostDAO; private final ApplicationService applicationService; - protected final IBizCmdbClient bizCmdbClient = CmdbClientFactory.getCmdbClient(); + protected final IBizCmdbClient bizCmdbClient; @Autowired public BasicAppSyncService(DSLContext dslContext, ApplicationDAO applicationDAO, ApplicationHostDAO applicationHostDAO, - ApplicationService applicationService) { + ApplicationService applicationService, + IBizCmdbClient bizCmdbClient) { this.dslContext = dslContext; this.applicationDAO = applicationDAO; this.applicationHostDAO = applicationHostDAO; this.applicationService = applicationService; + this.bizCmdbClient = bizCmdbClient; } protected Map genScopeAppIdMap(List appList) { diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSetSyncService.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSetSyncService.java index 9017f5591f..7c38b35208 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSetSyncService.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSetSyncService.java @@ -27,6 +27,7 @@ import com.tencent.bk.job.common.cc.model.bizset.BizInfo; import com.tencent.bk.job.common.cc.model.bizset.BizSetInfo; import com.tencent.bk.job.common.cc.model.bizset.BizSetScope; +import com.tencent.bk.job.common.cc.sdk.IBizCmdbClient; import com.tencent.bk.job.common.cc.sdk.IBizSetCmdbClient; import com.tencent.bk.job.common.constant.ResourceScopeTypeEnum; import com.tencent.bk.job.common.model.dto.ApplicationAttrsDO; @@ -63,9 +64,10 @@ public BizSetSyncService(DSLContext dslContext, ApplicationDAO applicationDAO, ApplicationHostDAO applicationHostDAO, ApplicationService applicationService, + IBizCmdbClient bizCmdbClient, IBizSetCmdbClient bizSetCmdbClient, BizSetService bizSetService) { - super(dslContext, applicationDAO, applicationHostDAO, applicationService); + super(dslContext, applicationDAO, applicationHostDAO, applicationService, bizCmdbClient); this.applicationDAO = applicationDAO; this.bizSetCmdbClient = bizSetCmdbClient; this.bizSetService = bizSetService; diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSyncService.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSyncService.java index de2adb6f35..e253569eb9 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSyncService.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/BizSyncService.java @@ -24,6 +24,7 @@ package com.tencent.bk.job.manage.service.impl.sync; +import com.tencent.bk.job.common.cc.sdk.IBizCmdbClient; import com.tencent.bk.job.common.model.dto.ApplicationDTO; import com.tencent.bk.job.manage.dao.ApplicationDAO; import com.tencent.bk.job.manage.dao.ApplicationHostDAO; @@ -50,8 +51,9 @@ public class BizSyncService extends BasicAppSyncService { public BizSyncService(DSLContext dslContext, ApplicationDAO applicationDAO, ApplicationHostDAO applicationHostDAO, - ApplicationService applicationService) { - super(dslContext, applicationDAO, applicationHostDAO, applicationService); + ApplicationService applicationService, + IBizCmdbClient bizCmdbClient) { + super(dslContext, applicationDAO, applicationHostDAO, applicationService, bizCmdbClient); this.applicationDAO = applicationDAO; } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java index 4305b4fa73..e73488a3cd 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java @@ -55,15 +55,12 @@ import java.util.Arrays; import java.util.List; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @Slf4j @@ -140,7 +137,10 @@ public SyncServiceImpl(@Qualifier("job-manage-dsl-context") DSLContext dslContex ApplicationCache applicationCache, HostCache hostCache, BizSetEventWatcher bizSetEventWatcher, - BizSetRelationEventWatcher bizSetRelationEventWatcher) { + BizSetRelationEventWatcher bizSetRelationEventWatcher, + @Qualifier("syncAppExecutor") ThreadPoolExecutor syncAppExecutor, + @Qualifier("syncHostExecutor") ThreadPoolExecutor syncHostExecutor, + @Qualifier("syncAgentStatusExecutor") ThreadPoolExecutor syncAgentStatusExecutor) { this.dslContext = dslContext; this.applicationDAO = applicationDAO; this.applicationHostDAO = applicationHostDAO; @@ -161,27 +161,11 @@ public SyncServiceImpl(@Qualifier("job-manage-dsl-context") DSLContext dslContex this.bizSetEventWatcher = bizSetEventWatcher; this.bizSetRelationEventWatcher = bizSetRelationEventWatcher; // 同步业务的线程池配置 - syncAppExecutor = new ThreadPoolExecutor(5, 5, 1L, - TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), (r, executor) -> - log.error( - "syncAppExecutor Runnable rejected! executor.poolSize={}, executor.queueSize={}", - executor.getPoolSize(), executor.getQueue().size())); - syncAppExecutor.setThreadFactory(getThreadFactoryByNameAndSeq("syncAppExecutor-", - new AtomicInteger(1))); + this.syncAppExecutor = syncAppExecutor; // 同步主机的线程池配置 - syncHostExecutor = new ThreadPoolExecutor(5, 5, 1L, - TimeUnit.SECONDS, new ArrayBlockingQueue<>(5000), (r, executor) -> - log.error("syncHostExecutor Runnable rejected! executor.poolSize={}, executor.queueSize={}", - executor.getPoolSize(), executor.getQueue().size())); - syncHostExecutor.setThreadFactory(getThreadFactoryByNameAndSeq("syncHostExecutor-", - new AtomicInteger(1))); + this.syncHostExecutor = syncHostExecutor; // 同步主机Agent状态的线程池配置 - syncAgentStatusExecutor = new ThreadPoolExecutor(5, 5, 1L, - TimeUnit.SECONDS, new ArrayBlockingQueue<>(5000), - (r, executor) -> log.error("syncAgentStatusExecutor Runnable rejected! executor.poolSize={}, executor" - + ".queueSize={}", executor.getPoolSize(), executor.getQueue().size())); - syncAgentStatusExecutor.setThreadFactory(getThreadFactoryByNameAndSeq("syncAgentStatusExecutor-", - new AtomicInteger(1))); + this.syncAgentStatusExecutor = syncAgentStatusExecutor; } @Override @@ -263,24 +247,6 @@ public boolean addExtraSyncBizHostsTask(Long bizId) { return false; } - private ThreadFactory getThreadFactoryByNameAndSeq(String namePrefix, AtomicInteger seq) { - return r -> { - Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, - namePrefix + seq.getAndIncrement(), - 0); - if (t.isDaemon()) - t.setDaemon(false); - if (t.getPriority() != Thread.NORM_PRIORITY) - t.setPriority(Thread.NORM_PRIORITY); - return t; - }; - } - - @Override - public ThreadPoolExecutor getSyncAppExecutor() { - return syncAppExecutor; - } - @Override public ThreadPoolExecutor getSyncHostExecutor() { return syncHostExecutor; diff --git a/src/backend/settings.gradle b/src/backend/settings.gradle index b2a0365bab..b220a66ccf 100644 --- a/src/backend/settings.gradle +++ b/src/backend/settings.gradle @@ -34,6 +34,7 @@ include 'commons:paas-sdk' include 'commons:gse-sdk' include 'commons:common-iam' include 'commons:common-utils' +include 'commons:common-otel' include 'commons:common-redis' include 'commons:common-security' include 'commons:common-discovery' diff --git a/support-files/javaagent/opentelemetry-javaagent.jar b/support-files/javaagent/opentelemetry-javaagent.jar deleted file mode 100644 index 94cdbb60a3..0000000000 Binary files a/support-files/javaagent/opentelemetry-javaagent.jar and /dev/null differ diff --git a/support-files/kubernetes/charts/bk-job/VALUES_LOG.md b/support-files/kubernetes/charts/bk-job/VALUES_LOG.md index 971f8c3159..592ecfa8bb 100644 --- a/support-files/kubernetes/charts/bk-job/VALUES_LOG.md +++ b/support-files/kubernetes/charts/bk-job/VALUES_LOG.md @@ -1,13 +1,11 @@ # chart values 更新日志 -## 0.3.0-rc.24 +## 0.3.0-rc.31 1.增加Trace及数据上报至APM相关配置 ```shell script ## Trace配置 job: trace: - # 是否开启Trace,默认开启,设置为false则jar启动参数中不含-javaagent:opentelemetry-javaagent.jar,且日志中无traceId - enabled: true report: # 是否上报Trace数据至监控平台APM应用,默认不上报 enabled: false @@ -15,6 +13,8 @@ job: pushUrl: "" # 监控平台中目标APM应用的SecureKey secureKey: "" + # Trace数据上报比率,取值范围为0~1,根据作业平台与监控平台负载适当调节该比率 + ratio: 0.1 ``` 2.fileWorker对应的Service端口默认值设置为与pod端口一致,避免混淆 ```shell script diff --git a/support-files/kubernetes/charts/bk-job/templates/_helpers.tpl b/support-files/kubernetes/charts/bk-job/templates/_helpers.tpl index 83d6953c73..b45d622405 100644 --- a/support-files/kubernetes/charts/bk-job/templates/_helpers.tpl +++ b/support-files/kubernetes/charts/bk-job/templates/_helpers.tpl @@ -512,26 +512,11 @@ Return the Job Storage Env Content value: {{ .Values.persistence.localStorage.path }}/local {{- end -}} -{{/* -Return the Job Trace Env Content -*/}} -{{- define "job.trace.env" -}} -- name: OTEL_TRACE_ENABLED - value: {{ .Values.job.trace.enabled | quote }} -- name: OTEL_TRACE_REPORT_ENABLED - value: {{ .Values.job.trace.report.enabled | quote }} -- name: OTEL_TRACE_REPORT_ENDPOINT_URL - value: {{ .Values.job.trace.report.pushUrl | quote }} -- name: OTEL_TRACE_REPORT_BK_DATA_TOKEN - value: {{ .Values.job.trace.report.secureKey | quote }} -{{- end -}} - {{/* Return the Job Common Env Content */}} {{- define "job.common.env" -}} {{ include "job.storage.env" . }} -{{ include "job.trace.env" . }} {{- end -}} {{/* diff --git a/support-files/kubernetes/charts/bk-job/templates/configmap-common.yaml b/support-files/kubernetes/charts/bk-job/templates/configmap-common.yaml index b790a2b902..ba82b3adce 100644 --- a/support-files/kubernetes/charts/bk-job/templates/configmap-common.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/configmap-common.yaml @@ -17,6 +17,16 @@ data: name: {{ .Values.job.security.actuator.user.name }} password: {{ .Values.job.security.actuator.user.password }} roles: ENDPOINT_ADMIN + sleuth: + otel: + exporter: + enabled: {{ .Values.job.trace.report.enabled }} + otlp: + endpoint: {{ .Values.job.trace.report.pushUrl }} + resource: + bkDataToken: {{ .Values.job.trace.report.secureKey }} + config: + traceIdRatioBased: {{ .Values.job.trace.report.ratio }} app: code: {{ .Values.appCode }} secret: {{ .Values.appSecret }} @@ -88,9 +98,6 @@ data: enabled: {{ .Values.job.feature.toggle.esbApiParamBkBizId.enabled }} storage: root-path: {{ .Values.persistence.localStorage.path }}/local - trace: - report: - enabled: {{ .Values.job.trace.report.enabled }} cmdb: default: supplier: diff --git a/support-files/kubernetes/charts/bk-job/values.yaml b/support-files/kubernetes/charts/bk-job/values.yaml index 2d1538f11e..fd34084826 100644 --- a/support-files/kubernetes/charts/bk-job/values.yaml +++ b/support-files/kubernetes/charts/bk-job/values.yaml @@ -571,8 +571,6 @@ job: # 是否兼容bk_biz_id参数 enabled: true trace: - # 是否开启Trace,默认开启,设置为false则jar启动参数中不含-javaagent:opentelemetry-javaagent.jar,且日志中无traceId - enabled: true report: # 是否上报Trace数据至监控平台APM应用,默认不上报 enabled: false @@ -580,6 +578,8 @@ job: pushUrl: "" # 监控平台中目标APM应用的SecureKey secureKey: "" + # Trace数据上报比率,取值范围为0~1,根据作业平台与监控平台负载适当调节该比率 + ratio: 0.1 migration: iamModel: diff --git a/support-files/kubernetes/images/backend/startup.sh b/support-files/kubernetes/images/backend/startup.sh index 4e905f4708..2c4a309fda 100644 --- a/support-files/kubernetes/images/backend/startup.sh +++ b/support-files/kubernetes/images/backend/startup.sh @@ -40,21 +40,7 @@ if [[ "$BK_JOB_FILE_WORKER_WORKSPACE_DIR" != "" ]];then chmod 666 "$BK_JOB_FILE_WORKER_WORKSPACE_DIR" fi -# OpenTelemetry相关参数 -OTEL_OPTS="" -if [[ "$OTEL_TRACE_ENABLED" == "true" ]];then - OTEL_OPTS="-javaagent:${BK_JOB_HOME}/opentelemetry-javaagent.jar" - OTEL_OPTS="$OTEL_OPTS -Dotel.metrics.exporter=none" - if [[ "$OTEL_TRACE_REPORT_ENABLED" == "true" ]];then - OTEL_OPTS="$OTEL_OPTS -Dotel.exporter.otlp.endpoint=$OTEL_TRACE_REPORT_ENDPOINT_URL" - OTEL_OPTS="$OTEL_OPTS -Dotel.resource.attributes=service.name=$BK_JOB_APP_NAME,bk.data.token=$OTEL_TRACE_REPORT_BK_DATA_TOKEN" - else - OTEL_OPTS="$OTEL_OPTS -Dotel.traces.exporter=none" - fi -fi - exec java -server \ - $OTEL_OPTS \ -Dfile.encoding=UTF-8 \ -Djob.log.dir=$BK_JOB_LOG_BASE_DIR \ -Xloggc:$BK_JOB_LOG_DIR/gc.log \ diff --git a/support-files/kubernetes/images/build.sh b/support-files/kubernetes/images/build.sh index b7662bb08c..71a258e803 100755 --- a/support-files/kubernetes/images/build.sh +++ b/support-files/kubernetes/images/build.sh @@ -197,7 +197,6 @@ build_backend_module () { fi rm -rf tmp/* cp $BACKEND_DIR/release/$SERVICE-$VERSION.jar tmp/$SERVICE.jar - cp $SUPPORT_FILES_DIR/javaagent/opentelemetry-javaagent.jar tmp/ cp backend/startup.sh backend/tini tmp/ docker build -f backend/backend.Dockerfile -t $REGISTRY/$SERVICE:$VERSION tmp --network=host if [[ $PUSH -eq 1 ]] ; then diff --git a/support-files/templates/#etc#job#job-common#application.yml b/support-files/templates/#etc#job#job-common#application.yml index 22c1f5a7a7..569ee1d801 100644 --- a/support-files/templates/#etc#job#job-common#application.yml +++ b/support-files/templates/#etc#job#job-common#application.yml @@ -8,6 +8,16 @@ spring: name: __BK_JOB_SECURITY_USERNAME__ password: __BK_JOB_SECURITY_PASSWORD__ roles: ADMIN + sleuth: + otel: + exporter: + enabled: __BK_JOB_TRACE_REPORT_ENABLED__ + otlp: + endpoint: __BK_JOB_TRACE_REPORT_PUSH_URL__ + resource: + bkDataToken: __BK_JOB_TRACE_REPORT_SECURE_KEY__ + config: + traceIdRatioBased: __BK_JOB_TRACE_REPORT_RATIO__ app: code: __BK_JOB_APP_CODE__ secret: __BK_JOB_APP_SECRET__ diff --git a/support-files/templates/job.env b/support-files/templates/job.env index 14e8a19700..6e0d88a381 100644 --- a/support-files/templates/job.env +++ b/support-files/templates/job.env @@ -25,6 +25,14 @@ BK_JOB_SECURITY_USERNAME=job BK_JOB_SECURITY_PASSWORD= BK_JOB_ACTUATOR_USERNAME=job BK_JOB_ACTUATOR_PASSWORD= +# 是否上报trace数据至蓝鲸监控APM +BK_JOB_TRACE_REPORT_ENABLED=false +# trace数据上报地址:监控平台中目标APM应用的PUSH URL +BK_JOB_TRACE_REPORT_PUSH_URL= +# trace数据上报密钥:监控平台中目标APM应用的SecureKey +BK_JOB_TRACE_REPORT_SECURE_KEY= +# trace数据上报比率,取值范围为0~1,根据作业平台与监控平台负载适当调节该比率 +BK_JOB_TRACE_REPORT_RATIO=0.1 # 3.3.3新增 Job Encryption and decryption key,一旦确认不能修改 BK_JOB_ENCRYPT_PASSWORD=