From b395479d52af24ed7521226e22da6b5dae63963f Mon Sep 17 00:00:00 2001 From: sawyersong2 Date: Thu, 14 Oct 2021 15:11:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B5=81=E6=B0=B4=E7=BA=BFJob=E9=85=8D?= =?UTF-8?q?=E9=A2=9D=E7=AE=A1=E7=90=86=20#5154?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatch.sdk/config/MQConfiguration.kt | 51 ++++++ .../dispatch.sdk/listener/BuildListener.kt | 166 ++++++++++-------- .../dispatch.sdk/service/JobQuotaService.kt | 32 ++-- .../pipeline/type/DispatchRouteKeySuffix.kt | 1 + .../docker/listener/DockerVMListener.kt | 13 +- 5 files changed, 179 insertions(+), 84 deletions(-) diff --git a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt index b10effb42eb..52f35cf02d8 100644 --- a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt +++ b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/config/MQConfiguration.kt @@ -158,6 +158,9 @@ class MQConfiguration @Autowired constructor() { return directExchange } + /** + * 启动构建队列 + */ @Bean fun buildAgentStartQueue(@Autowired buildListener: BuildListener): Queue { return Queue(ROUTE_AGENT_STARTUP + getStartQueue(buildListener)) @@ -197,6 +200,45 @@ class MQConfiguration @Autowired constructor() { ) } + /** + * 啓動构建降级队列 + */ + @Bean + fun buildAgentStartDemoteQueue(@Autowired buildListener: BuildListener): Queue { + return Queue(ROUTE_AGENT_STARTUP + getStartDemoteQueue(buildListener)) + } + + @Bean + fun buildAgentStartDemoteQueueBind( + @Autowired buildAgentStartDemoteQueue: Queue, + @Autowired exchange: DirectExchange + ): Binding { + return BindingBuilder.bind(buildAgentStartDemoteQueue).to(exchange).with(buildAgentStartDemoteQueue.name) + } + + @Bean + fun startDemoteListener( + @Autowired connectionFactory: ConnectionFactory, + @Autowired buildAgentStartDemoteQueue: Queue, + @Autowired rabbitAdmin: RabbitAdmin, + @Autowired buildListener: BuildListener, + @Autowired messageConverter: Jackson2JsonMessageConverter + ): SimpleMessageListenerContainer { + val adapter = MessageListenerAdapter(buildListener, buildListener::handleStartDemoteMessage.name) + adapter.setMessageConverter(messageConverter) + return Tools.createSimpleMessageListenerContainerByAdapter( + connectionFactory = connectionFactory, + queue = buildAgentStartDemoteQueue, + rabbitAdmin = rabbitAdmin, + startConsumerMinInterval = 10000, + consecutiveActiveTrigger = 5, + concurrency = 10, + maxConcurrency = 20, + adapter = adapter, + prefetchCount = 1 + ) + } + @Bean fun buildAgentShutdownQueue(@Autowired buildListener: BuildListener): Queue { return Queue(ROUTE_AGENT_SHUTDOWN + getShutdownQueue(buildListener)) @@ -246,6 +288,15 @@ class MQConfiguration @Autowired constructor() { return startupQueue } + private fun getStartDemoteQueue(buildListener: BuildListener): String { + val startupDemoteQueue = buildListener.getStartupDemoteQueue() + logger.info("Get the startupDemoteQueue ($startupDemoteQueue)") + if (startupDemoteQueue.isBlank()) { + throw RuntimeException("The startupDemoteQueue is blank") + } + return startupDemoteQueue + } + private fun getShutdownQueue(buildListener: BuildListener): String { val shutdownQueue = buildListener.getShutdownQueue() logger.info("Get the shutdown queue ($shutdownQueue)") diff --git a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/listener/BuildListener.kt b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/listener/BuildListener.kt index 2ec195a0f6d..d4cde8a4f89 100644 --- a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/listener/BuildListener.kt +++ b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/listener/BuildListener.kt @@ -54,6 +54,8 @@ interface BuildListener { fun getStartupQueue(): String + fun getStartupDemoteQueue(): String + fun getShutdownQueue(): String fun onPipelineStartup(event: PipelineBuildStartBroadCastEvent) { @@ -66,81 +68,16 @@ interface BuildListener { fun onStartup(dispatchMessage: DispatchMessage) + fun onStartupDemote(dispatchMessage: DispatchMessage) + fun onShutdown(event: PipelineAgentShutdownEvent) fun handleStartMessage(event: PipelineAgentStartupEvent) { - DispatcherContext.setEvent(event) - val dispatchService = getDispatchService() - val jobQuotaService = getJobQuotaService() - - var startTime = 0L - var errorCode = 0 - var errorMessage = "" - var errorType: ErrorType? = null - - try { - logger.info("Start to handle the startup message -(${DispatcherContext.getEvent()})") - - startTime = System.currentTimeMillis() - - // 校验流水线是否还在运行中 - dispatchService.checkRunning(event) - // 校验构建资源配额是否超限,配额超限后会放进延迟队列 - if (!jobQuotaService.checkAndAddRunningJob(event, getVmType())) { - return - } - - val dispatchMessage = dispatchService.buildDispatchMessage(event) - onStartup(dispatchMessage) - } catch (e: BuildFailureException) { - dispatchService.logRed(buildId = event.buildId, - containerHashId = event.containerHashId, - vmSeqId = event.vmSeqId, - message = "启动构建机失败 - ${e.message}", - executeCount = event.executeCount) - - errorCode = e.errorCode - errorMessage = e.formatErrorMessage - errorType = e.errorType - - onFailure(dispatchService, event, e) - } catch (t: Throwable) { - logger.warn("Fail to handle the start up message - DispatchService($event)", t) - dispatchService.logRed(buildId = event.buildId, - containerHashId = event.containerHashId, - vmSeqId = event.vmSeqId, - message = "启动构建机失败 - ${t.message}", - executeCount = event.executeCount) - - errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR - errorMessage = "Fail to handle the start up message" - errorType = ErrorType.SYSTEM - - onFailure(dispatchService = dispatchService, - event = event, - e = BuildFailureException(errorType = ErrorType.SYSTEM, - errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR, - formatErrorMessage = "Fail to handle the start up message", - errorMessage = "Fail to handle the start up message")) - } finally { - DispatcherContext.removeEvent() + handleStartCommon(event) { doStartHandler() } + } - // 上报monitoring,做SLA统计 - dispatchService.sendDispatchMonitoring( - projectId = event.projectId, - pipelineId = event.pipelineId, - buildId = event.buildId, - vmSeqId = event.vmSeqId, - actionType = "start", - retryTime = event.retryTime, - routeKeySuffix = event.routeKeySuffix!!, - startTime = startTime, - stopTime = 0L, - errorCode = errorCode, - errorMessage = errorMessage, - errorType = errorType - ) - } + fun handleStartDemoteMessage(event: PipelineAgentStartupEvent) { + handleStartCommon(event) { doStartDemoteHandler() } } fun handleShutdownMessage(event: PipelineAgentShutdownEvent) { @@ -280,6 +217,93 @@ interface BuildListener { return newValue.toString() } + private fun handleStartCommon(event: PipelineAgentStartupEvent, startup: () -> Unit) { + DispatcherContext.setEvent(event) + val dispatchService = getDispatchService() + + var startTime = 0L + var errorCode = 0 + var errorMessage = "" + var errorType: ErrorType? = null + + try { + logger.info("Start to handle the startup message -(${DispatcherContext.getEvent()})") + + startTime = System.currentTimeMillis() + + startup() + } catch (e: BuildFailureException) { + dispatchService.logRed(buildId = event.buildId, + containerHashId = event.containerHashId, + vmSeqId = event.vmSeqId, + message = "启动构建机失败 - ${e.message}", + executeCount = event.executeCount) + + errorCode = e.errorCode + errorMessage = e.formatErrorMessage + errorType = e.errorType + + onFailure(dispatchService, event, e) + } catch (t: Throwable) { + logger.warn("Fail to handle the start up message - DispatchService($event)", t) + dispatchService.logRed(buildId = event.buildId, + containerHashId = event.containerHashId, + vmSeqId = event.vmSeqId, + message = "启动构建机失败 - ${t.message}", + executeCount = event.executeCount) + + errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR + errorMessage = "Fail to handle the start up message" + errorType = ErrorType.SYSTEM + + onFailure(dispatchService = dispatchService, + event = event, + e = BuildFailureException(errorType = ErrorType.SYSTEM, + errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR, + formatErrorMessage = "Fail to handle the start up message", + errorMessage = "Fail to handle the start up message")) + } finally { + DispatcherContext.removeEvent() + + // 上报monitoring,做SLA统计 + dispatchService.sendDispatchMonitoring( + projectId = event.projectId, + pipelineId = event.pipelineId, + buildId = event.buildId, + vmSeqId = event.vmSeqId, + actionType = "start", + retryTime = event.retryTime, + routeKeySuffix = event.routeKeySuffix!!, + startTime = startTime, + stopTime = 0L, + errorCode = errorCode, + errorMessage = errorMessage, + errorType = errorType + ) + } + } + + private fun doStartHandler() { + val dispatchService = getDispatchService() + val jobQuotaService = getJobQuotaService() + val event = DispatcherContext.getEvent()!! + + // 校验流水线是否还在运行中 + dispatchService.checkRunning(event) + // 校验构建资源配额是否超限,配额超限后会放进延迟队列 + if (!jobQuotaService.checkAndAddRunningJob(event, getVmType(), getStartupDemoteQueue())) { + return + } + + val dispatchMessage = dispatchService.buildDispatchMessage(event) + onStartup(dispatchMessage) + } + + private fun doStartDemoteHandler() { + val dispatchMessage = getDispatchService().buildDispatchMessage(DispatcherContext.getEvent()!!) + onStartupDemote(dispatchMessage) + } + private fun getDispatchService(): DispatchService { return SpringContextUtil.getBean(DispatchService::class.java) } diff --git a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/service/JobQuotaService.kt b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/service/JobQuotaService.kt index 90ffad425b8..b1f670fecaf 100644 --- a/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/service/JobQuotaService.kt +++ b/src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/service/JobQuotaService.kt @@ -27,10 +27,7 @@ package com.tencent.devops.common.dispatch.sdk.service -import com.tencent.devops.common.api.pojo.ErrorType import com.tencent.devops.common.client.Client -import com.tencent.devops.common.dispatch.sdk.BuildFailureException -import com.tencent.devops.common.dispatch.sdk.DispatchSdkErrorCode import com.tencent.devops.common.log.utils.BuildLogPrinter import com.tencent.devops.common.service.utils.SpringContextUtil import com.tencent.devops.dispatch.api.ServiceJobQuotaBusinessResource @@ -54,12 +51,18 @@ class JobQuotaService constructor( @Value("\${dispatch.jobQuota.enable:false}") private val jobQuotaEnable: Boolean = false - fun checkAndAddRunningJob(startupEvent: PipelineAgentStartupEvent, vmType: JobQuotaVmType?): Boolean { + fun checkAndAddRunningJob( + startupEvent: PipelineAgentStartupEvent, + vmType: JobQuotaVmType?, + demoteQueueRouteKeySuffix: String + ): Boolean { if (null == vmType || !jobQuotaEnable) { logger.info("JobQuota not enabled or VmType is null, job quota check will be skipped.") return true } + val dispatchService = SpringContextUtil.getBean(DispatchService::class.java) + with(startupEvent) { val checkResult = checkAndAddRunningJob( projectId = projectId, @@ -77,13 +80,20 @@ class JobQuotaService constructor( } if (!checkResult && startupEvent.retryTime > RETRY_TIME) { - logger.error("$projectId|$vmType|$buildId|$vmSeqId|$executeCount Job quota excess.") - throw BuildFailureException( - errorType = ErrorType.USER, - errorCode = DispatchSdkErrorCode.JOB_QUOTA_EXCESS, - formatErrorMessage = "JOB配额超限", - errorMessage = "JOB配额超限" + logger.error("$projectId|$vmType|$buildId|$vmSeqId|$executeCount Job quota excess. Send event to demoteQueue.") + + buildLogPrinter.addYellowLine( + buildId = buildId, + message = "当前项目下正在执行的【${vmType.displayName}】JOB数量已经达到配额最大值并已延迟等待${retryTime}次," + + "将放入降级队列执行.", + tag = VMUtils.genStartVMTaskId(containerId), + jobId = containerHashId, + executeCount = executeCount ?: 1 ) + + dispatchService.redispatch(startupEvent.copy(routeKeySuffix = demoteQueueRouteKeySuffix)) + + return false } else { logger.info("$projectId|$vmType|$buildId|$vmSeqId|$executeCount Job quota excess. delay: $RETRY_DELTA " + "and retry. retryTime: ${startupEvent.retryTime}") @@ -99,7 +109,7 @@ class JobQuotaService constructor( startupEvent.retryTime += 1 startupEvent.delayMills = RETRY_DELTA - SpringContextUtil.getBean(DispatchService::class.java).redispatch(startupEvent) + dispatchService.redispatch(startupEvent) return false } diff --git a/src/backend/ci/core/common/common-pipeline/src/main/kotlin/com/tencent/devops/common/pipeline/type/DispatchRouteKeySuffix.kt b/src/backend/ci/core/common/common-pipeline/src/main/kotlin/com/tencent/devops/common/pipeline/type/DispatchRouteKeySuffix.kt index ebc77719d2e..28d9479099d 100644 --- a/src/backend/ci/core/common/common-pipeline/src/main/kotlin/com/tencent/devops/common/pipeline/type/DispatchRouteKeySuffix.kt +++ b/src/backend/ci/core/common/common-pipeline/src/main/kotlin/com/tencent/devops/common/pipeline/type/DispatchRouteKeySuffix.kt @@ -29,6 +29,7 @@ package com.tencent.devops.common.pipeline.type enum class DispatchRouteKeySuffix(val routeKeySuffix: String) { DOCKER_VM(".docker.vm"), + DOCKER_VM_DEMOTE(".docker.vm.demote"), PCG(".pcg.sumeru"), DEVCLOUD(".devcloud.public"), IDC(".idc.public"), diff --git a/src/backend/ci/core/dispatch-docker/biz-dispatch-docker/src/main/kotlin/com/tencent/devops/dispatch/docker/listener/DockerVMListener.kt b/src/backend/ci/core/dispatch-docker/biz-dispatch-docker/src/main/kotlin/com/tencent/devops/dispatch/docker/listener/DockerVMListener.kt index e0e3954944f..db6caef15a8 100644 --- a/src/backend/ci/core/dispatch-docker/biz-dispatch-docker/src/main/kotlin/com/tencent/devops/dispatch/docker/listener/DockerVMListener.kt +++ b/src/backend/ci/core/dispatch-docker/biz-dispatch-docker/src/main/kotlin/com/tencent/devops/dispatch/docker/listener/DockerVMListener.kt @@ -32,6 +32,7 @@ import com.tencent.devops.common.api.util.JsonUtil import com.tencent.devops.common.dispatch.sdk.listener.BuildListener import com.tencent.devops.common.dispatch.sdk.pojo.DispatchMessage import com.tencent.devops.common.log.utils.BuildLogPrinter +import com.tencent.devops.common.pipeline.type.DispatchRouteKeySuffix import com.tencent.devops.common.pipeline.type.docker.DockerDispatchType import com.tencent.devops.dispatch.docker.client.DockerHostClient import com.tencent.devops.dispatch.docker.common.ErrorCodeEnum @@ -69,11 +70,15 @@ class DockerVMListener @Autowired constructor( } override fun getShutdownQueue(): String { - return ".docker.vm" + return DispatchRouteKeySuffix.DOCKER_VM.routeKeySuffix } override fun getStartupQueue(): String { - return ".docker.vm" + return DispatchRouteKeySuffix.DOCKER_VM.routeKeySuffix + } + + override fun getStartupDemoteQueue(): String { + return DispatchRouteKeySuffix.DOCKER_VM_DEMOTE.routeKeySuffix } override fun getVmType(): JobQuotaVmType? { @@ -209,6 +214,10 @@ class DockerVMListener @Autowired constructor( } } + override fun onStartupDemote(dispatchMessage: DispatchMessage) { + TODO("Not yet implemented") + } + override fun onShutdown(event: PipelineAgentShutdownEvent) { logger.info("On shutdown - ($event)")