Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#10848 from tangruotian/issue-10740
Browse files Browse the repository at this point in the history
feat:同一流水线多次构建时资源调度优先级优化 TencentBlueKing#9897
  • Loading branch information
bkci-bot authored Sep 11, 2024
2 parents 369a660 + 41083b4 commit 9a2e179
Show file tree
Hide file tree
Showing 39 changed files with 2,504 additions and 464 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,4 @@ import com.tencent.devops.common.api.util.UUIDUtil
open class UniqueIdException(
val msg: String?,
val uniqueId: String? = UUIDUtil.generate()
) :
RuntimeException("[uniqueId=$uniqueId]$msg")
) : RuntimeException("[uniqueId=$uniqueId]$msg")
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ interface BuildListener {
buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "${I18nUtil.getCodeLanMessage("$BK_FAILED_START_BUILD_MACHINE")}- ${e.message}",
message = "${I18nUtil.getCodeLanMessage(BK_FAILED_START_BUILD_MACHINE)}- ${e.message}",
executeCount = event.executeCount,
jobId = event.jobId
)
Expand All @@ -136,14 +136,14 @@ interface BuildListener {
errorMessage = e.formatErrorMessage
errorType = e.errorType

onFailure(dispatchService, event, e)
dispatchService.onFailure(event, e)
} catch (t: Throwable) {
logger.warn("Fail to handle the start up message - DispatchService($event)", t)
dispatchService.logRed(
buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "${I18nUtil.getCodeLanMessage("$BK_FAILED_START_BUILD_MACHINE")} - ${t.message}",
message = "${I18nUtil.getCodeLanMessage(BK_FAILED_START_BUILD_MACHINE)} - ${t.message}",
executeCount = event.executeCount,
jobId = event.jobId
)
Expand All @@ -152,8 +152,7 @@ interface BuildListener {
errorMessage = "Fail to handle the start up message"
errorType = ErrorType.SYSTEM

onFailure(
dispatchService = dispatchService,
dispatchService.onFailure(
event = event,
e = BuildFailureException(
errorType = ErrorType.SYSTEM,
Expand Down Expand Up @@ -361,15 +360,6 @@ interface BuildListener {

private fun getClient() = SpringContextUtil.getBean(Client::class.java)

private fun onFailure(
dispatchService: DispatchService,
event: PipelineAgentStartupEvent,
e: BuildFailureException
) {
dispatchService.onContainerFailure(event, e)
DispatchLogRedisUtils.removeRedisExecuteCount(event.buildId)
}

companion object {
private val logger = LoggerFactory.getLogger(BuildListener::class.java)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import com.tencent.devops.common.dispatch.sdk.pojo.docker.DockerConstants.ENV_KE
import com.tencent.devops.common.dispatch.sdk.pojo.docker.DockerConstants.ENV_KEY_BUILD_ID
import com.tencent.devops.common.dispatch.sdk.pojo.docker.DockerConstants.ENV_KEY_PROJECT_ID
import com.tencent.devops.common.dispatch.sdk.utils.ChannelUtils
import com.tencent.devops.common.dispatch.sdk.utils.DispatchLogRedisUtils
import com.tencent.devops.common.event.dispatcher.pipeline.PipelineEventDispatcher
import com.tencent.devops.common.event.pojo.pipeline.IPipelineEvent
import com.tencent.devops.common.log.utils.BuildLogPrinter
Expand Down Expand Up @@ -153,10 +154,33 @@ class DispatchService constructor(
}

fun checkRunning(event: PipelineAgentStartupEvent): Boolean {
val (startBuildTask, buildContainer) = getContainerStartupInfo(event)
return checkRunning(
projectId = event.projectId,
buildId = event.buildId,
containerId = event.containerId,
retryTime = event.retryTime,
executeCount = event.executeCount,
logTag = "$event"
)
}

fun checkRunning(
projectId: String,
buildId: String,
containerId: String,
retryTime: Int,
executeCount: Int?,
logTag: String?
): Boolean {
val (startBuildTask, buildContainer) = getContainerStartupInfo(
projectId = projectId,
buildId = buildId,
containerId = containerId,
logTag = logTag
)

var needStart = true
if (event.executeCount != startBuildTask.executeCount) {
if (executeCount != startBuildTask.executeCount) {
// 如果已经重试过或执行次数不匹配则直接丢弃
needStart = false
} else if (startBuildTask.status.isFinish() && buildContainer.status.isRunning()) {
Expand All @@ -167,9 +191,9 @@ class DispatchService constructor(
}

if (!needStart) {
logger.warn("The build event($event) is not running")
logger.warn("The build event($logTag) is not running")
// dispatch主动发起的重试或者用户已取消的流水线忽略异常报错
if (event.retryTime > 1 || buildContainer.status.isCancel()) {
if (retryTime > 1 || buildContainer.status.isCancel()) {
return false
}

Expand All @@ -184,26 +208,71 @@ class DispatchService constructor(
return true
}

fun onContainerFailure(event: PipelineAgentStartupEvent, e: BuildFailureException) {
logger.warn("[${event.buildId}|${event.vmSeqId}] Container startup failure")
/**
 * Event-based entry point for reporting a startup failure.
 *
 * Unpacks the project, pipeline, build and VM sequence identifiers from [event] and
 * forwards them, together with [e], to the identifier-based `onFailure` overload.
 * The string form of [event] is passed along as the log tag.
 *
 * @param event the agent startup event whose build failed to start
 * @param e the failure to report
 */
fun onFailure(event: PipelineAgentStartupEvent, e: BuildFailureException) {
    // Render the event once up front; it is only forwarded as the log tag.
    val eventTag = "$event"
    onFailure(
        projectId = event.projectId,
        pipelineId = event.pipelineId,
        buildId = event.buildId,
        vmSeqId = event.vmSeqId,
        e = e,
        logTag = eventTag
    )
}

/**
 * Identifier-based entry point for reporting a startup failure.
 *
 * Runs two steps in order: it delegates to `onContainerFailure` with the same
 * arguments, then calls `DispatchLogRedisUtils.removeRedisExecuteCount` for [buildId].
 * If the first step throws, the second step is not reached.
 *
 * @param projectId project the build belongs to
 * @param pipelineId pipeline the build belongs to
 * @param buildId build whose container failed to start
 * @param vmSeqId VM sequence id of the failed container
 * @param e the failure to report
 * @param logTag free-form tag forwarded to the delegate; may be null
 */
fun onFailure(
    projectId: String,
    pipelineId: String,
    buildId: String,
    vmSeqId: String,
    e: BuildFailureException,
    logTag: String?
) {
    // Every argument is named: four of the six parameters are plain Strings.
    onContainerFailure(
        projectId = projectId,
        pipelineId = pipelineId,
        buildId = buildId,
        vmSeqId = vmSeqId,
        e = e,
        logTag = logTag
    )

    DispatchLogRedisUtils.removeRedisExecuteCount(buildId)
}

private fun onContainerFailure(
projectId: String,
pipelineId: String,
buildId: String,
vmSeqId: String,
e: BuildFailureException,
logTag: String?
) {
logger.warn("[$buildId|$vmSeqId] Container startup failure")
try {
val (startBuildTask, buildContainer) = getContainerStartupInfo(event)
val (startBuildTask, buildContainer) = getContainerStartupInfo(
projectId = projectId,
buildId = buildId,
containerId = vmSeqId,
logTag = logTag
)
if (buildContainer.status.isCancel() || startBuildTask.status.isCancel()) {
return
}

client.get(ServiceBuildResource::class).setVMStatus(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = event.buildId,
vmSeqId = event.vmSeqId,
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
vmSeqId = vmSeqId,
status = BuildStatus.FAILED,
errorType = e.errorType,
errorCode = e.errorCode,
errorMsg = e.formatErrorMessage
)
} catch (ignore: ClientException) {
logger.error("SystemErrorLogMonitor|onContainerFailure|${event.buildId}|error=${e.message},${e.errorCode}")
logger.error("SystemErrorLogMonitor|onContainerFailure|$buildId|error=${e.message},${e.errorCode}")
}
}

Expand Down Expand Up @@ -250,20 +319,23 @@ class DispatchService constructor(
}

private fun getContainerStartupInfo(
event: PipelineAgentStartupEvent
projectId: String,
buildId: String,
containerId: String,
logTag: String?
): Pair<PipelineBuildTask, PipelineBuildContainer> {
// 判断流水线当前container是否在运行中
val statusResult = client.get(ServicePipelineTaskResource::class).getContainerStartupInfo(
projectId = event.projectId,
buildId = event.buildId,
containerId = event.containerId,
taskId = VMUtils.genStartVMTaskId(event.containerId)
projectId = projectId,
buildId = buildId,
containerId = containerId,
taskId = VMUtils.genStartVMTaskId(containerId)
)
val startBuildTask = statusResult.data?.startBuildTask
val buildContainer = statusResult.data?.buildContainer
if (statusResult.isNotOk() || startBuildTask == null || buildContainer == null) {
logger.warn(
"The build event($event) fail to check if pipeline task is running " +
"The build event($logTag) fail to check if pipeline task is running " +
"because of statusResult(${statusResult.message})"
)
val errorMessage = I18nUtil.getCodeLanMessage(UNABLE_GET_PIPELINE_JOB_STATUS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ object MQ {
const val ROUTE_AGENT_SHUTDOWN = "r.engine.pipeline.agent.shutdown"
const val QUEUE_AGENT_SHUTDOWN = "q.engine.pipeline.agent.shutdown"

// Third-party AGENT queueing message queue ====================================
const val EXCHANGE_THIRD_PARTY_AGENT_QUEUE = "e.dispatch.tp.agent.queue"
const val ROUTE_THIRD_PARTY_AGENT_QUEUE = "r.dispatch.tp.agent.queue"
const val QUEUE_THIRD_PARTY_AGENT_QUEUE = "q.dispatch.tp.agent.queue"

// Start/stop message queue for build-less (no build environment) Docker build machines ====================================
const val EXCHANGE_BUILD_LESS_AGENT_LISTENER_DIRECT = "e.engine.pipeline.bl.agent"
const val ROUTE_BUILD_LESS_AGENT_STARTUP_DISPATCH = "r.engine.pipeline.bl.agent.dispatch.startup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import com.tencent.devops.common.pipeline.type.DispatchType
abstract class ThirdPartyAgentDispatch(
override var value: String,
open val agentType: AgentType,
open var workspace: String?,
// 第三方构建机用docker作为构建机
open val dockerInfo: ThirdPartyAgentDockerInfo?,
// 类型为REUSE_JOB时,被复用的job的value,防止同一个stage并发下拿不到agent,启动时填充
open var reusedInfo: ReusedInfo?
) : DispatchType(value) {
Expand All @@ -19,6 +22,9 @@ abstract class ThirdPartyAgentDispatch(

/**
 * Whether this dispatch is on the reuse-lock chain.
 *
 * True when [agentType] reports `isReuse()` or when [reusedInfo] has been filled in.
 */
fun hasReuseMutex(): Boolean {
    return agentType.isReuse() || reusedInfo != null
}

/** True when this instance is a [ThirdPartyAgentEnvDispatchType]. */
fun isEnv(): Boolean {
    return this is ThirdPartyAgentEnvDispatchType
}
/** True when this instance is a [ThirdPartyAgentIDDispatchType]. */
fun isSingle(): Boolean {
    return this is ThirdPartyAgentIDDispatchType
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ package com.tencent.devops.common.pipeline.type.agent
import com.fasterxml.jackson.annotation.JsonProperty
import com.tencent.devops.common.api.util.EnvUtils
import com.tencent.devops.common.pipeline.type.BuildType
import io.swagger.v3.oas.annotations.media.Schema

data class ThirdPartyAgentEnvDispatchType(
@JsonProperty("value")
var envName: String,
@get:Schema(title = "共享环境时必填,值为提供共享环境的项目id")
override var workspace: String?,
// 共享环境时必填,值为提供共享环境的项目id
var envProjectId: String?,
@get:Schema(title = "工作空间")
var workspace: String?,
@get:Schema(title = "agent类型,默认NAME")
override val agentType: AgentType = AgentType.NAME,
// 第三方构建机用docker作为构建机
val dockerInfo: ThirdPartyAgentDockerInfo?,
override val dockerInfo: ThirdPartyAgentDockerInfo?,
override var reusedInfo: ReusedInfo?
) : ThirdPartyAgentDispatch(
value = envName,
workspace = workspace,
agentType = agentType,
dockerInfo = dockerInfo,
reusedInfo = reusedInfo
) {
override fun cleanDataBeforeSave() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import com.tencent.devops.common.api.util.EnvUtils
import com.tencent.devops.common.pipeline.type.BuildType

data class ThirdPartyAgentIDDispatchType(
@JsonProperty("value") var displayName: String,
var workspace: String?,
@JsonProperty("value")
var displayName: String,
override var workspace: String?,
override val agentType: AgentType = AgentType.NAME,
// 第三方构建机用docker作为构建机
val dockerInfo: ThirdPartyAgentDockerInfo?,
override val dockerInfo: ThirdPartyAgentDockerInfo?,
override var reusedInfo: ReusedInfo?
) : ThirdPartyAgentDispatch(
value = displayName,
agentType = agentType,
workspace = workspace,
dockerInfo = dockerInfo,
reusedInfo = reusedInfo
) {
override fun cleanDataBeforeSave() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ open class RedisLock(
private val redisOperation: RedisOperation,
private val lockKey: String,
private val expiredTimeInSeconds: Long,
private val sleepTime: Long = 100L
private val sleepTime: Long = 100L,
private var lockValue: String = UUID.randomUUID().toString()
) : AutoCloseable {
private val lockValue = UUID.randomUUID().toString()

/**
* 锁是否已经被占用
Expand Down Expand Up @@ -127,6 +127,12 @@ open class RedisLock(

/**
 * Looks up the entry stored in `localLock` under [lockKey].
 *
 * The lookup result is asserted non-null, so a missing entry surfaces as a
 * NullPointerException rather than a null return.
 */
private fun getLocalLock(): Any {
    val monitor = localLock.get(lockKey)
    return monitor!!
}

/** Returns the value currently held in this lock's `lockValue` property. */
fun getLockValue(): String {
    return lockValue
}

/**
 * Overwrites this lock's `lockValue` property.
 *
 * @param lockValue the new value to store; it replaces whatever was set at
 * construction time or by an earlier call
 */
fun setLockValue(lockValue: String) {
    // The parameter shadows the property, hence the explicit `this.` qualifier.
    this.lockValue = lockValue
}

// AutoCloseable hook: closing the lock simply calls unlock(), which lets callers
// scope a lock with `use { }`. Kept as a block body so that whatever unlock()
// returns is discarded and the override stays Unit-typed.
override fun close() {
unlock()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.tencent.devops.dispatch.api

import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.tags.Tag
import javax.ws.rs.Consumes
import javax.ws.rs.POST
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.QueryParam
import javax.ws.rs.core.MediaType

/**
 * JAX-RS resource interface for agent-related operations, rooted at `/op/agent`.
 *
 * Requests and responses are both declared as JSON.
 */
@Tag(name = "OP_AGENT", description = "agent相关")
@Path("/op/agent")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
interface OpAgentResource {

/**
 * `POST /op/agent/update_gray_queue`.
 *
 * Operation summary (translated): modify the projects or pipelines covered by the
 * gray-release queueing feature.
 *
 * @param projectId bound from the `projectId` query parameter
 * @param operate bound from the `operate` query parameter; the accepted values are
 * not visible in this interface — TODO confirm against the implementation
 * @param pipelineIds has no binding annotation, so presumably it is read from the
 * JSON request body; may be null
 */
@Operation(summary = "修改灰度排队功能的项目或者流水线")
@POST
@Path("/update_gray_queue")
fun updateGrayQueue(
@QueryParam("projectId")
projectId: String,
@QueryParam("operate")
operate: String,
pipelineIds: Set<String>?
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,9 @@ const val AGENT_REUSE_MUTEX_WAIT_REUSED_ENV = "agentReuseMuteXWaitReusedEnv"
const val BK_ENV_NODE_DISABLE = "bkEnvNodeDisable"
const val BK_THIRD_JOB_ENV_CURR = "bkThirdJobEnvCurr" // Concurrency {0} across all build machines in the current environment has exceeded the configured {1}; queued for {2} minutes
const val BK_THIRD_JOB_NODE_CURR = "bkThirdJobNodeCurr" // Running tasks on every node in the current environment have exceeded the configured {0}; queued for {1} minutes
// Agent reuse mutex: node {0} is already in use by build {1}, remaining schedulable capacity is insufficient, re-dispatching
const val AGENT_REUSE_MUTEX_RESERVE_REDISPATCH = "agentReuseMutexReserveRedispatch"
// Build environment dispatch finished; node {0} has been selected
const val BK_ENV_DISPATCH_AGENT = "bkEnvDispatchAgent"
// Trying to dispatch the task to node {0}
const val TRY_AGENT_DISPATCH = "tryAgentDispatch"
Loading

0 comments on commit 9a2e179

Please sign in to comment.