Skip to content

Commit

Permalink
feat:流水线/Job并发和排队数据落地 TencentBlueKing#10997
Browse files Browse the repository at this point in the history
  • Loading branch information
yongyiduan committed Oct 24, 2024
1 parent 2106854 commit b7d3dfd
Show file tree
Hide file tree
Showing 20 changed files with 457 additions and 90 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.tencent.devops.common.event.enums

/**
 * Kinds of pipeline build status broadcast events, listed from build level
 * down to stage, job, agent and task (plugin) level.
 */
enum class PipelineBuildStatusBroadCastEventType {
    /** Build is queued; includes queuing on an exceeded concurrency limit and on a concurrency group. */
    BUILD_QUEUE,

    /** Build started; does not include the queuing cases covered by [BUILD_QUEUE]. */
    BUILD_START,

    /** Build finished. */
    BUILD_END,

    /** Stage started. */
    BUILD_STAGE_START,

    /** Stage finished. */
    BUILD_STAGE_END,

    /**
     * Job is queued; includes mutex-group queuing, agent-reuse mutex queuing
     * and max-job-concurrency queuing.
     */
    BUILD_JOB_QUEUE,

    /**
     * Job started; does not include [BUILD_JOB_QUEUE]. Not emitted when the job
     * is skipped or has no executable plugin.
     */
    BUILD_JOB_START,

    /** Job finished; still emitted when the job is skipped or has no executable plugin. */
    BUILD_JOB_END,

    /** Build agent started; currently only third-party build agents are covered. */
    BUILD_AGENT_START,

    /** Plugin (task) started. */
    BUILD_TASK_START,

    /** Plugin (task) finished. */
    BUILD_TASK_END,

    /** Plugin (task) paused before execution. */
    BUILD_TASK_PAUSE
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.devops.common.event.pojo.pipeline

import com.tencent.devops.common.event.annotation.Event
import com.tencent.devops.common.event.enums.ActionType
import com.tencent.devops.common.event.enums.PipelineBuildStatusBroadCastEventType
import com.tencent.devops.common.stream.constants.StreamBinding
import java.time.LocalDateTime

Expand All @@ -51,6 +52,8 @@ data class PipelineBuildStatusBroadCastEvent(
val buildStatus: String?,
val atomCode: String? = null,
val eventTime: LocalDateTime? = LocalDateTime.now(),
val type: PipelineBuildStatusBroadCastEventType? = null,
val labels: Map<String, String>? = null,
override var actionType: ActionType,
override var delayMills: Int = 0
) : IPipelineEvent(actionType, source, projectId, pipelineId, userId, delayMills)
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,34 @@ class CallBackData<out T>(
val data: T
)

/**
 * Events that can be delivered through the callback mechanism.
 *
 * Entries are grouped as pipeline lifecycle, build progress and project lifecycle.
 * The build progress entries use the same constant names as
 * `PipelineBuildStatusBroadCastEventType`, so a lookup by name between the two
 * enums resolves for every build-level event.
 *
 * Each constant is declared exactly once; declaring a name twice is a compile error.
 *
 * @see com.tencent.devops.common.event.enums.PipelineBuildStatusBroadCastEventType
 */
enum class CallBackEvent {
    /** Pipeline deleted. */
    DELETE_PIPELINE,

    /** Pipeline created. */
    CREATE_PIPELINE,

    /** Pipeline updated; covers both model and setting changes. */
    UPDATE_PIPELINE,

    /** Stream CI switched on or off. */
    STREAM_ENABLED,

    /** Pipeline restored. */
    RESTORE_PIPELINE,

    /** Build is queued; includes queuing on an exceeded concurrency limit and on a concurrency group. */
    BUILD_QUEUE,

    /** Build started; does not include the queuing cases covered by [BUILD_QUEUE]. */
    BUILD_START,

    /** Build finished. */
    BUILD_END,

    /** Stage started. */
    BUILD_STAGE_START,

    /** Stage finished. */
    BUILD_STAGE_END,

    /**
     * Job is queued; includes mutex-group queuing, agent-reuse mutex queuing
     * and max-job-concurrency queuing.
     */
    BUILD_JOB_QUEUE,

    /**
     * Job started; does not include [BUILD_JOB_QUEUE]. Not emitted when the job
     * is skipped or has no executable plugin.
     */
    BUILD_JOB_START,

    /** Job finished; still emitted when the job is skipped or has no executable plugin. */
    BUILD_JOB_END,

    /** Build agent started; currently only third-party build agents are covered. */
    BUILD_AGENT_START,

    /** Plugin (task) started. */
    BUILD_TASK_START,

    /** Plugin (task) finished. */
    BUILD_TASK_END,

    /** Plugin (task) paused before execution. */
    BUILD_TASK_PAUSE,

    /** Project created. */
    PROJECT_CREATE,

    /** Project updated. */
    PROJECT_UPDATE,

    /** Project enabled. */
    PROJECT_ENABLE,

    /** Project disabled. */
    PROJECT_DISABLE
}

data class PipelineEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ import com.tencent.devops.common.pipeline.event.CallBackEvent

@Suppress("ComplexMethod")
object EventUtils {
// Index of every CallBackEvent constant keyed by its enum name, built once when the object is initialized.
private val callBackEventMap = CallBackEvent.values().associateBy(CallBackEvent::name)
fun PipelineBuildStatusBroadCastEvent.toEventType(): CallBackEvent? {
if (type != null && callBackEventMap[type!!.name] != null) {
return callBackEventMap[type!!.name]
}
if (!taskId.isNullOrBlank()) {
if (actionType == ActionType.START) {
return CallBackEvent.BUILD_TASK_START
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ import com.tencent.devops.common.api.util.timestampmilli
import com.tencent.devops.common.auth.api.AuthResourceType
import com.tencent.devops.common.client.Client
import com.tencent.devops.common.client.ClientTokenService
import com.tencent.devops.common.event.dispatcher.SampleEventDispatcher
import com.tencent.devops.common.event.enums.ActionType
import com.tencent.devops.common.event.enums.PipelineBuildStatusBroadCastEventType
import com.tencent.devops.common.event.pojo.pipeline.PipelineBuildStatusBroadCastEvent
import com.tencent.devops.common.notify.enums.NotifyType
import com.tencent.devops.common.pipeline.type.agent.ThirdPartyAgentDockerInfoDispatch
import com.tencent.devops.common.redis.RedisOperation
Expand All @@ -69,12 +73,6 @@ import com.tencent.devops.model.dispatch.tables.records.TDispatchThirdpartyAgent
import com.tencent.devops.notify.api.service.ServiceNotifyMessageTemplateResource
import com.tencent.devops.notify.pojo.SendNotifyMessageTemplateRequest
import com.tencent.devops.process.api.service.ServiceBuildResource
import org.jooq.DSLContext
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.dao.DeadlockLoserDataAccessException
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletableFuture
Expand All @@ -83,6 +81,12 @@ import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import javax.ws.rs.NotFoundException
import org.jooq.DSLContext
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.dao.DeadlockLoserDataAccessException
import org.springframework.stereotype.Service

@Service
@Suppress("ALL")
Expand All @@ -94,7 +98,8 @@ class ThirdPartyAgentService @Autowired constructor(
private val thirdPartyAgentBuildDao: ThirdPartyAgentBuildDao,
private val thirdPartyAgentDockerService: ThirdPartyAgentDockerService,
private val tokenService: ClientTokenService,
private val commonUtil: TPACommonUtil
private val commonUtil: TPACommonUtil,
private val pipelineEventDispatcher: SampleEventDispatcher
) {
@Value("\${thirdagent.workerErrorTemplate:#{null}}")
val workerErrorRtxTemplate: String? = null
Expand Down Expand Up @@ -205,7 +210,7 @@ class ThirdPartyAgentService @Autowired constructor(
if (agentResult.data!!.secretKey != secretKey) {
logger.warn(
"The secretKey($secretKey) is not match the expect one(${agentResult.data!!.secretKey} " +
"of project($projectId) and agent($agentId)"
"of project($projectId) and agent($agentId)"
)
throw NotFoundException("Fail to get the agent")
}
Expand Down Expand Up @@ -238,9 +243,26 @@ class ThirdPartyAgentService @Autowired constructor(
} catch (e: RemoteServiceException) {
logger.warn(
"notify agent task[$build.projectId|${build.buildId}|${build.vmSeqId}|$agentId]" +
" claim failed, cause: ${e.message} agent project($projectId)"
" claim failed, cause: ${e.message} agent project($projectId)"
)
}
pipelineEventDispatcher.dispatch(
// 第三方构建机启动
PipelineBuildStatusBroadCastEvent(
source = "third-party-agent-start-$agentId", projectId = build.projectId,
pipelineId = build.pipelineId, userId = build.startUser,
buildId = build.buildId, taskId = null, actionType = ActionType.START,
containerHashId = build.containerHashId, jobId = build.jobId, stageId = null,
stepId = null, atomCode = null, executeCount = build.executeCount,
buildStatus = null, type = PipelineBuildStatusBroadCastEventType.BUILD_AGENT_START,
labels = mapOf(
"agentId" to build.agentId,
"envHashId" to (build.envId?.let { HashUtil.encodeLongId(it) } ?: ""),
"nodeHashId" to (build.nodeId?.let { HashUtil.encodeLongId(it) } ?: ""),
"agentIp" to build.agentIp
)
)
)

// 第三方构建机docker启动获取镜像凭据
val dockerInfo = if (build.dockerInfo == null) {
Expand All @@ -256,9 +278,9 @@ class ThirdPartyAgentService @Autowired constructor(
// 只有凭据ID的参与计算
if (dockerInfo != null) {
if ((
dockerInfo.credential?.user.isNullOrBlank() &&
dockerInfo.credential?.password.isNullOrBlank()
) &&
dockerInfo.credential?.user.isNullOrBlank() &&
dockerInfo.credential?.password.isNullOrBlank()
) &&
!(dockerInfo.credential?.credentialId.isNullOrBlank())
) {
val (userName, password) = try {
Expand Down Expand Up @@ -470,7 +492,7 @@ class ThirdPartyAgentService @Autowired constructor(
private fun finishBuild(record: TDispatchThirdpartyAgentBuildRecord, success: Boolean) {
logger.info(
"Finish the third party agent(${record.agentId}) build(${record.buildId}) " +
"of seq(${record.vmSeqId}) and status(${record.status})"
"of seq(${record.vmSeqId}) and status(${record.status})"
)
val agentResult = client.get(ServiceThirdPartyAgentResource::class)
.getAgentByIdGlobal(record.projectId, record.agentId)
Expand Down Expand Up @@ -514,9 +536,9 @@ class ThirdPartyAgentService @Autowired constructor(
// 有些并发情况可能会导致在finish时AgentBuild状态没有被置为Done在这里改一下
val buildRecord = thirdPartyAgentBuildDao.get(dslContext, buildInfo.buildId, buildInfo.vmSeqId)
if (buildRecord != null && (
buildRecord.status != PipelineTaskStatus.DONE.status ||
buildRecord.status != PipelineTaskStatus.FAILURE.status
)
buildRecord.status != PipelineTaskStatus.DONE.status ||
buildRecord.status != PipelineTaskStatus.FAILURE.status
)
) {
thirdPartyAgentBuildDao.updateStatus(
dslContext = dslContext,
Expand Down Expand Up @@ -572,9 +594,9 @@ class ThirdPartyAgentService @Autowired constructor(
}
// 构建需要使用构建的项目id跳转,防止是共享agent,agent链接使用上报的项目Id即可
val buildUrl = "${HomeHostUtil.innerServerHost()}/console/pipeline/${buildRecord.projectId}/" +
"${buildRecord.pipelineId}/detail/${buildRecord.buildId}/executeDetail"
"${buildRecord.pipelineId}/detail/${buildRecord.buildId}/executeDetail"
val agentUrl = "${HomeHostUtil.innerServerHost()}/console/environment/$projectId/" +
"nodeDetail/${agentResult.data!!.nodeId}"
"nodeDetail/${agentResult.data!!.nodeId}"
client.get(ServiceNotifyMessageTemplateResource::class).sendNotifyMessageByTemplate(
SendNotifyMessageTemplateRequest(
templateCode = workerErrorRtxTemplate!!,
Expand Down Expand Up @@ -819,7 +841,7 @@ class ThirdPartyAgentService @Autowired constructor(
"oldIp" to agent.ip,
"newIp" to newIp,
"url" to "${HomeHostUtil.innerServerHost()}/console/environment/$projectId/" +
"nodeDetail/$nodeHashId"
"nodeDetail/$nodeHashId"
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ data class MetricsUserPO(
val status: String,
val atomCode: String?,
val eventType: CallBackEvent,
var endTime: LocalDateTime?
var endTime: LocalDateTime?,
val labels: String?
) {
constructor(event: PipelineBuildStatusBroadCastEvent) : this(
startTime = event.eventTime ?: LocalDateTime.now(),
Expand All @@ -56,15 +57,16 @@ data class MetricsUserPO(
status = checkNotNull(event.buildStatus),
atomCode = event.atomCode,
eventType = checkNotNull(event.toEventType()),
endTime = null
endTime = null,
labels = event.labels?.entries?.joinToString(separator = ";") { "${it.key}=${it.value}" },
)

companion object {
const val DELIMITER = ","
fun load(str: String?): MetricsUserPO? {
if (str.isNullOrBlank()) return null
val list = str.split(DELIMITER)
if (list.size != 10) return null
if (list.size < 10) return null
return MetricsUserPO(
LocalDateTime.ofInstant(Instant.ofEpochSecond(list[0].toLong()), ZoneOffset.ofHours(8)),
list[1],
Expand All @@ -77,7 +79,8 @@ data class MetricsUserPO(
CallBackEvent.valueOf(list[8]),
list[9].ifEmpty { null }?.let {
LocalDateTime.ofInstant(Instant.ofEpochSecond(it.toLong()), ZoneOffset.ofHours(8))
}
},
list.getOrNull(10)?.ifEmpty { null },
)
}
}
Expand All @@ -92,6 +95,7 @@ data class MetricsUserPO(
status + DELIMITER +
(atomCode ?: "") + DELIMITER +
eventType.name + DELIMITER +
(endTime?.toInstant(ZoneOffset.ofHours(8))?.epochSecond?.toString() ?: "")
(endTime?.toInstant(ZoneOffset.ofHours(8))?.epochSecond?.toString() ?: "") + DELIMITER +
(labels ?: "")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ import org.springframework.context.annotation.Configuration
class MetricsUserConfig {

// String keys for the build metrics, one per measured level (pipeline, job, agent, step).
// NOTE(review): the "gauge" prefix suggests these are gauge metric names — confirm
// against the code that registers them; this block only declares the strings.
companion object {
// Pipeline-level queue time; the key text indicates seconds.
const val gaugeBuildQueueKey = "pipeline_queue_time_seconds"
// Pipeline-level running time; the key text indicates seconds.
const val gaugeBuildKey = "pipeline_running_time_seconds"
// Pipeline-level status info.
const val gaugeBuildStatusKey = "pipeline_status_info"
// Job-level queue time; the key text indicates seconds.
const val gaugeBuildJobQueueKey = "pipeline_job_queue_time_seconds"
// Job-level running time; the key text indicates seconds.
const val gaugeBuildJobKey = "pipeline_job_running_time_seconds"
// Agent-level running time; the key text indicates seconds.
const val gaugeBuildAgentKey = "pipeline_agent_running_time_seconds"
// Step-level running time; the key text indicates seconds.
const val gaugeBuildStepKey = "pipeline_step_running_time_seconds"
// Step-level status info.
const val gaugeBuildStepStatusKey = "pipeline_step_status_info"
}
Expand Down
Loading

0 comments on commit b7d3dfd

Please sign in to comment.