Skip to content

Commit

Permalink
feat: 流水线Job配额管理 TencentBlueKing#5154
Browse files Browse the repository at this point in the history
  • Loading branch information
sawyersong2 committed Oct 14, 2021
1 parent 4ba5e2f commit b395479
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ class MQConfiguration @Autowired constructor() {
return directExchange
}

/**
* 启动构建队列
*/
@Bean
fun buildAgentStartQueue(@Autowired buildListener: BuildListener): Queue {
return Queue(ROUTE_AGENT_STARTUP + getStartQueue(buildListener))
Expand Down Expand Up @@ -197,6 +200,45 @@ class MQConfiguration @Autowired constructor() {
)
}

/**
* 啓動构建降级队列
*/
@Bean
fun buildAgentStartDemoteQueue(@Autowired buildListener: BuildListener): Queue {
return Queue(ROUTE_AGENT_STARTUP + getStartDemoteQueue(buildListener))
}

@Bean
fun buildAgentStartDemoteQueueBind(
@Autowired buildAgentStartDemoteQueue: Queue,
@Autowired exchange: DirectExchange
): Binding {
return BindingBuilder.bind(buildAgentStartDemoteQueue).to(exchange).with(buildAgentStartDemoteQueue.name)
}

@Bean
fun startDemoteListener(
@Autowired connectionFactory: ConnectionFactory,
@Autowired buildAgentStartDemoteQueue: Queue,
@Autowired rabbitAdmin: RabbitAdmin,
@Autowired buildListener: BuildListener,
@Autowired messageConverter: Jackson2JsonMessageConverter
): SimpleMessageListenerContainer {
val adapter = MessageListenerAdapter(buildListener, buildListener::handleStartDemoteMessage.name)
adapter.setMessageConverter(messageConverter)
return Tools.createSimpleMessageListenerContainerByAdapter(
connectionFactory = connectionFactory,
queue = buildAgentStartDemoteQueue,
rabbitAdmin = rabbitAdmin,
startConsumerMinInterval = 10000,
consecutiveActiveTrigger = 5,
concurrency = 10,
maxConcurrency = 20,
adapter = adapter,
prefetchCount = 1
)
}

@Bean
fun buildAgentShutdownQueue(@Autowired buildListener: BuildListener): Queue {
return Queue(ROUTE_AGENT_SHUTDOWN + getShutdownQueue(buildListener))
Expand Down Expand Up @@ -246,6 +288,15 @@ class MQConfiguration @Autowired constructor() {
return startupQueue
}

private fun getStartDemoteQueue(buildListener: BuildListener): String {
val startupDemoteQueue = buildListener.getStartupDemoteQueue()
logger.info("Get the startupDemoteQueue ($startupDemoteQueue)")
if (startupDemoteQueue.isBlank()) {
throw RuntimeException("The startupDemoteQueue is blank")
}
return startupDemoteQueue
}

private fun getShutdownQueue(buildListener: BuildListener): String {
val shutdownQueue = buildListener.getShutdownQueue()
logger.info("Get the shutdown queue ($shutdownQueue)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ interface BuildListener {

fun getStartupQueue(): String

fun getStartupDemoteQueue(): String

fun getShutdownQueue(): String

fun onPipelineStartup(event: PipelineBuildStartBroadCastEvent) {
Expand All @@ -66,81 +68,16 @@ interface BuildListener {

fun onStartup(dispatchMessage: DispatchMessage)

fun onStartupDemote(dispatchMessage: DispatchMessage)

fun onShutdown(event: PipelineAgentShutdownEvent)

fun handleStartMessage(event: PipelineAgentStartupEvent) {
DispatcherContext.setEvent(event)
val dispatchService = getDispatchService()
val jobQuotaService = getJobQuotaService()

var startTime = 0L
var errorCode = 0
var errorMessage = ""
var errorType: ErrorType? = null

try {
logger.info("Start to handle the startup message -(${DispatcherContext.getEvent()})")

startTime = System.currentTimeMillis()

// 校验流水线是否还在运行中
dispatchService.checkRunning(event)
// 校验构建资源配额是否超限,配额超限后会放进延迟队列
if (!jobQuotaService.checkAndAddRunningJob(event, getVmType())) {
return
}

val dispatchMessage = dispatchService.buildDispatchMessage(event)
onStartup(dispatchMessage)
} catch (e: BuildFailureException) {
dispatchService.logRed(buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "启动构建机失败 - ${e.message}",
executeCount = event.executeCount)

errorCode = e.errorCode
errorMessage = e.formatErrorMessage
errorType = e.errorType

onFailure(dispatchService, event, e)
} catch (t: Throwable) {
logger.warn("Fail to handle the start up message - DispatchService($event)", t)
dispatchService.logRed(buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "启动构建机失败 - ${t.message}",
executeCount = event.executeCount)

errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR
errorMessage = "Fail to handle the start up message"
errorType = ErrorType.SYSTEM

onFailure(dispatchService = dispatchService,
event = event,
e = BuildFailureException(errorType = ErrorType.SYSTEM,
errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR,
formatErrorMessage = "Fail to handle the start up message",
errorMessage = "Fail to handle the start up message"))
} finally {
DispatcherContext.removeEvent()
handleStartCommon(event) { doStartHandler() }
}

// 上报monitoring,做SLA统计
dispatchService.sendDispatchMonitoring(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = event.buildId,
vmSeqId = event.vmSeqId,
actionType = "start",
retryTime = event.retryTime,
routeKeySuffix = event.routeKeySuffix!!,
startTime = startTime,
stopTime = 0L,
errorCode = errorCode,
errorMessage = errorMessage,
errorType = errorType
)
}
fun handleStartDemoteMessage(event: PipelineAgentStartupEvent) {
handleStartCommon(event) { doStartDemoteHandler() }
}

fun handleShutdownMessage(event: PipelineAgentShutdownEvent) {
Expand Down Expand Up @@ -280,6 +217,93 @@ interface BuildListener {
return newValue.toString()
}

private fun handleStartCommon(event: PipelineAgentStartupEvent, startup: () -> Unit) {
DispatcherContext.setEvent(event)
val dispatchService = getDispatchService()

var startTime = 0L
var errorCode = 0
var errorMessage = ""
var errorType: ErrorType? = null

try {
logger.info("Start to handle the startup message -(${DispatcherContext.getEvent()})")

startTime = System.currentTimeMillis()

startup()
} catch (e: BuildFailureException) {
dispatchService.logRed(buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "启动构建机失败 - ${e.message}",
executeCount = event.executeCount)

errorCode = e.errorCode
errorMessage = e.formatErrorMessage
errorType = e.errorType

onFailure(dispatchService, event, e)
} catch (t: Throwable) {
logger.warn("Fail to handle the start up message - DispatchService($event)", t)
dispatchService.logRed(buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "启动构建机失败 - ${t.message}",
executeCount = event.executeCount)

errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR
errorMessage = "Fail to handle the start up message"
errorType = ErrorType.SYSTEM

onFailure(dispatchService = dispatchService,
event = event,
e = BuildFailureException(errorType = ErrorType.SYSTEM,
errorCode = DispatchSdkErrorCode.SDK_SYSTEM_ERROR,
formatErrorMessage = "Fail to handle the start up message",
errorMessage = "Fail to handle the start up message"))
} finally {
DispatcherContext.removeEvent()

// 上报monitoring,做SLA统计
dispatchService.sendDispatchMonitoring(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = event.buildId,
vmSeqId = event.vmSeqId,
actionType = "start",
retryTime = event.retryTime,
routeKeySuffix = event.routeKeySuffix!!,
startTime = startTime,
stopTime = 0L,
errorCode = errorCode,
errorMessage = errorMessage,
errorType = errorType
)
}
}

private fun doStartHandler() {
val dispatchService = getDispatchService()
val jobQuotaService = getJobQuotaService()
val event = DispatcherContext.getEvent()!!

// 校验流水线是否还在运行中
dispatchService.checkRunning(event)
// 校验构建资源配额是否超限,配额超限后会放进延迟队列
if (!jobQuotaService.checkAndAddRunningJob(event, getVmType(), getStartupDemoteQueue())) {
return
}

val dispatchMessage = dispatchService.buildDispatchMessage(event)
onStartup(dispatchMessage)
}

private fun doStartDemoteHandler() {
val dispatchMessage = getDispatchService().buildDispatchMessage(DispatcherContext.getEvent()!!)
onStartupDemote(dispatchMessage)
}

private fun getDispatchService(): DispatchService {
return SpringContextUtil.getBean(DispatchService::class.java)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@

package com.tencent.devops.common.dispatch.sdk.service

import com.tencent.devops.common.api.pojo.ErrorType
import com.tencent.devops.common.client.Client
import com.tencent.devops.common.dispatch.sdk.BuildFailureException
import com.tencent.devops.common.dispatch.sdk.DispatchSdkErrorCode
import com.tencent.devops.common.log.utils.BuildLogPrinter
import com.tencent.devops.common.service.utils.SpringContextUtil
import com.tencent.devops.dispatch.api.ServiceJobQuotaBusinessResource
Expand All @@ -54,12 +51,18 @@ class JobQuotaService constructor(
@Value("\${dispatch.jobQuota.enable:false}")
private val jobQuotaEnable: Boolean = false

fun checkAndAddRunningJob(startupEvent: PipelineAgentStartupEvent, vmType: JobQuotaVmType?): Boolean {
fun checkAndAddRunningJob(
startupEvent: PipelineAgentStartupEvent,
vmType: JobQuotaVmType?,
demoteQueueRouteKeySuffix: String
): Boolean {
if (null == vmType || !jobQuotaEnable) {
logger.info("JobQuota not enabled or VmType is null, job quota check will be skipped.")
return true
}

val dispatchService = SpringContextUtil.getBean(DispatchService::class.java)

with(startupEvent) {
val checkResult = checkAndAddRunningJob(
projectId = projectId,
Expand All @@ -77,13 +80,20 @@ class JobQuotaService constructor(
}

if (!checkResult && startupEvent.retryTime > RETRY_TIME) {
logger.error("$projectId|$vmType|$buildId|$vmSeqId|$executeCount Job quota excess.")
throw BuildFailureException(
errorType = ErrorType.USER,
errorCode = DispatchSdkErrorCode.JOB_QUOTA_EXCESS,
formatErrorMessage = "JOB配额超限",
errorMessage = "JOB配额超限"
logger.error("$projectId|$vmType|$buildId|$vmSeqId|$executeCount Job quota excess. Send event to demoteQueue.")

buildLogPrinter.addYellowLine(
buildId = buildId,
message = "当前项目下正在执行的【${vmType.displayName}】JOB数量已经达到配额最大值并已延迟等待${retryTime}次," +
"将放入降级队列执行.",
tag = VMUtils.genStartVMTaskId(containerId),
jobId = containerHashId,
executeCount = executeCount ?: 1
)

dispatchService.redispatch(startupEvent.copy(routeKeySuffix = demoteQueueRouteKeySuffix))

return false
} else {
logger.info("$projectId|$vmType|$buildId|$vmSeqId|$executeCount Job quota excess. delay: $RETRY_DELTA " +
"and retry. retryTime: ${startupEvent.retryTime}")
Expand All @@ -99,7 +109,7 @@ class JobQuotaService constructor(

startupEvent.retryTime += 1
startupEvent.delayMills = RETRY_DELTA
SpringContextUtil.getBean(DispatchService::class.java).redispatch(startupEvent)
dispatchService.redispatch(startupEvent)

return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.devops.common.pipeline.type

enum class DispatchRouteKeySuffix(val routeKeySuffix: String) {
DOCKER_VM(".docker.vm"),
DOCKER_VM_DEMOTE(".docker.vm.demote"),
PCG(".pcg.sumeru"),
DEVCLOUD(".devcloud.public"),
IDC(".idc.public"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.tencent.devops.common.api.util.JsonUtil
import com.tencent.devops.common.dispatch.sdk.listener.BuildListener
import com.tencent.devops.common.dispatch.sdk.pojo.DispatchMessage
import com.tencent.devops.common.log.utils.BuildLogPrinter
import com.tencent.devops.common.pipeline.type.DispatchRouteKeySuffix
import com.tencent.devops.common.pipeline.type.docker.DockerDispatchType
import com.tencent.devops.dispatch.docker.client.DockerHostClient
import com.tencent.devops.dispatch.docker.common.ErrorCodeEnum
Expand Down Expand Up @@ -69,11 +70,15 @@ class DockerVMListener @Autowired constructor(
}

override fun getShutdownQueue(): String {
return ".docker.vm"
return DispatchRouteKeySuffix.DOCKER_VM.routeKeySuffix
}

override fun getStartupQueue(): String {
return ".docker.vm"
return DispatchRouteKeySuffix.DOCKER_VM.routeKeySuffix
}

override fun getStartupDemoteQueue(): String {
return DispatchRouteKeySuffix.DOCKER_VM_DEMOTE.routeKeySuffix
}

override fun getVmType(): JobQuotaVmType? {
Expand Down Expand Up @@ -209,6 +214,10 @@ class DockerVMListener @Autowired constructor(
}
}

override fun onStartupDemote(dispatchMessage: DispatchMessage) {
TODO("Not yet implemented")
}

override fun onShutdown(event: PipelineAgentShutdownEvent) {
logger.info("On shutdown - ($event)")

Expand Down

0 comments on commit b395479

Please sign in to comment.