Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:流水线/Job并发和排队数据落地 #10997 #11134

Merged
merged 9 commits into from
Nov 1, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.tencent.devops.common.event.enums

enum class PipelineBuildStatusBroadCastEventType {
BUILD_QUEUE, /*构建排队,包含并发超限时排队、并发组排队。*/
BUILD_START, /*构建开始,不包含并发超限时排队、并发组排队。*/
BUILD_END, /*构建结束*/
BUILD_STAGE_START, /*stage开始*/
BUILD_STAGE_END, /*stage结束*/
BUILD_JOB_QUEUE, /*job排队,包含互斥组排队、构建机复用互斥排队、最大job并发排队。*/
BUILD_JOB_START, /*job开始,不包含BUILD_JOB_QUEUE。如果job SKIP或没有可执行的插件,就不会有该事件。*/
BUILD_JOB_END, /*job结束,job SKIP或没有可执行的插件时会有该事件。*/
BUILD_AGENT_START, /*构建机启动,现在仅包含第三方构建机*/
BUILD_TASK_START, /*插件开始*/
BUILD_TASK_END, /*插件结束*/
BUILD_TASK_PAUSE; /*插件前置暂停*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package com.tencent.devops.common.event.pojo.pipeline

import com.tencent.devops.common.event.annotation.Event
import com.tencent.devops.common.event.enums.ActionType
import com.tencent.devops.common.event.enums.PipelineBuildStatusBroadCastEventType
import com.tencent.devops.common.stream.constants.StreamBinding
import java.time.LocalDateTime

Expand All @@ -51,6 +52,8 @@ data class PipelineBuildStatusBroadCastEvent(
val buildStatus: String?,
val atomCode: String? = null,
val eventTime: LocalDateTime? = LocalDateTime.now(),
val type: PipelineBuildStatusBroadCastEventType? = null,
val labels: Map<String, String>? = null,
override var actionType: ActionType,
override var delayMills: Int = 0
) : IPipelineEvent(actionType, source, projectId, pipelineId, userId, delayMills)
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,34 @@ class CallBackData<out T>(
val data: T
)

/**
*
* @see com.tencent.devops.common.event.enums.PipelineBuildStatusBroadCastEventType
*/
enum class CallBackEvent {
DELETE_PIPELINE,
CREATE_PIPELINE,
UPDATE_PIPELINE,
STREAM_ENABLED,
RESTORE_PIPELINE,
BUILD_START,
BUILD_END,
BUILD_TASK_START,
BUILD_TASK_END,
BUILD_STAGE_START,
BUILD_STAGE_END,
BUILD_JOB_START,
BUILD_JOB_END,
BUILD_TASK_PAUSE,
PROJECT_CREATE,
PROJECT_UPDATE,
PROJECT_ENABLE,
PROJECT_DISABLE
DELETE_PIPELINE, /*流水线删除*/
CREATE_PIPELINE, /*流水线创建*/
UPDATE_PIPELINE, /*流水线更新,包括model和setting。*/
STREAM_ENABLED, /*stream ci 开启/关闭*/
RESTORE_PIPELINE, /*流水线恢复*/

BUILD_QUEUE, /*构建排队,包含并发超限时排队、并发组排队。*/
BUILD_START, /*构建开始,不包含并发超限时排队、并发组排队。*/
BUILD_END, /*构建结束*/
BUILD_STAGE_START, /*stage开始*/
BUILD_STAGE_END, /*stage结束*/
BUILD_JOB_QUEUE, /*job排队,包含互斥组排队、构建机复用互斥排队、最大job并发排队。*/
BUILD_JOB_START, /*job开始,不包含BUILD_JOB_QUEUE。如果job SKIP或没有可执行的插件,就不会有该事件。*/
BUILD_JOB_END, /*job结束,job SKIP或没有可执行的插件时会有该事件。*/
BUILD_AGENT_START, /*构建机启动,现在仅包含第三方构建机*/
BUILD_TASK_START, /*插件开始*/
BUILD_TASK_END, /*插件结束*/
BUILD_TASK_PAUSE, /*插件前置暂停*/

PROJECT_CREATE, /*项目创建*/
PROJECT_UPDATE, /*项目更新*/
PROJECT_ENABLE, /*项目启用*/
PROJECT_DISABLE /*项目禁用*/
}

data class PipelineEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ import com.tencent.devops.common.pipeline.event.CallBackEvent

@Suppress("ComplexMethod")
object EventUtils {
private val callBackEventMap = CallBackEvent.values().associateBy { it.name }
fun PipelineBuildStatusBroadCastEvent.toEventType(): CallBackEvent? {
if (type != null && callBackEventMap[type!!.name] != null) {
return callBackEventMap[type!!.name]
}
if (!taskId.isNullOrBlank()) {
if (actionType == ActionType.START) {
return CallBackEvent.BUILD_TASK_START
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ import com.tencent.devops.common.api.util.timestampmilli
import com.tencent.devops.common.auth.api.AuthResourceType
import com.tencent.devops.common.client.Client
import com.tencent.devops.common.client.ClientTokenService
import com.tencent.devops.common.event.dispatcher.SampleEventDispatcher
import com.tencent.devops.common.event.enums.ActionType
import com.tencent.devops.common.event.enums.PipelineBuildStatusBroadCastEventType
import com.tencent.devops.common.event.pojo.pipeline.PipelineBuildStatusBroadCastEvent
import com.tencent.devops.common.notify.enums.NotifyType
import com.tencent.devops.common.pipeline.enums.BuildStatus
import com.tencent.devops.common.pipeline.type.agent.ThirdPartyAgentDockerInfoDispatch
import com.tencent.devops.common.redis.RedisOperation
import com.tencent.devops.common.service.utils.HomeHostUtil
Expand All @@ -69,12 +74,6 @@ import com.tencent.devops.model.dispatch.tables.records.TDispatchThirdpartyAgent
import com.tencent.devops.notify.api.service.ServiceNotifyMessageTemplateResource
import com.tencent.devops.notify.pojo.SendNotifyMessageTemplateRequest
import com.tencent.devops.process.api.service.ServiceBuildResource
import org.jooq.DSLContext
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.dao.DeadlockLoserDataAccessException
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletableFuture
Expand All @@ -83,6 +82,12 @@ import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import javax.ws.rs.NotFoundException
import org.jooq.DSLContext
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.dao.DeadlockLoserDataAccessException
import org.springframework.stereotype.Service

@Service
@Suppress("ALL")
Expand All @@ -94,7 +99,8 @@ class ThirdPartyAgentService @Autowired constructor(
private val thirdPartyAgentBuildDao: ThirdPartyAgentBuildDao,
private val thirdPartyAgentDockerService: ThirdPartyAgentDockerService,
private val tokenService: ClientTokenService,
private val commonUtil: TPACommonUtil
private val commonUtil: TPACommonUtil,
private val pipelineEventDispatcher: SampleEventDispatcher
) {
@Value("\${thirdagent.workerErrorTemplate:#{null}}")
val workerErrorRtxTemplate: String? = null
Expand Down Expand Up @@ -205,7 +211,7 @@ class ThirdPartyAgentService @Autowired constructor(
if (agentResult.data!!.secretKey != secretKey) {
logger.warn(
"The secretKey($secretKey) is not match the expect one(${agentResult.data!!.secretKey} " +
"of project($projectId) and agent($agentId)"
"of project($projectId) and agent($agentId)"
)
throw NotFoundException("Fail to get the agent")
}
Expand Down Expand Up @@ -238,9 +244,27 @@ class ThirdPartyAgentService @Autowired constructor(
} catch (e: RemoteServiceException) {
logger.warn(
"notify agent task[$build.projectId|${build.buildId}|${build.vmSeqId}|$agentId]" +
" claim failed, cause: ${e.message} agent project($projectId)"
" claim failed, cause: ${e.message} agent project($projectId)"
)
}
pipelineEventDispatcher.dispatch(
// 第三方构建机启动
PipelineBuildStatusBroadCastEvent(
source = "third-party-agent-start-$agentId", projectId = build.projectId,
pipelineId = build.pipelineId, userId = "",
buildId = build.buildId, taskId = null, actionType = ActionType.START,
containerHashId = build.containerHashId, jobId = build.jobId, stageId = null,
stepId = null, atomCode = null, executeCount = build.executeCount,
buildStatus = BuildStatus.RUNNING.name,
type = PipelineBuildStatusBroadCastEventType.BUILD_AGENT_START,
labels = mapOf(
"agentId" to build.agentId,
"envHashId" to (build.envId?.let { HashUtil.encodeLongId(it) } ?: ""),
"nodeHashId" to (build.nodeId?.let { HashUtil.encodeLongId(it) } ?: ""),
"agentIp" to build.agentIp
)
)
)

// 第三方构建机docker启动获取镜像凭据
val dockerInfo = if (build.dockerInfo == null) {
Expand All @@ -256,9 +280,9 @@ class ThirdPartyAgentService @Autowired constructor(
// 只有凭据ID的参与计算
if (dockerInfo != null) {
if ((
dockerInfo.credential?.user.isNullOrBlank() &&
dockerInfo.credential?.password.isNullOrBlank()
) &&
dockerInfo.credential?.user.isNullOrBlank() &&
dockerInfo.credential?.password.isNullOrBlank()
) &&
!(dockerInfo.credential?.credentialId.isNullOrBlank())
) {
val (userName, password) = try {
Expand Down Expand Up @@ -470,7 +494,7 @@ class ThirdPartyAgentService @Autowired constructor(
private fun finishBuild(record: TDispatchThirdpartyAgentBuildRecord, success: Boolean) {
logger.info(
"Finish the third party agent(${record.agentId}) build(${record.buildId}) " +
"of seq(${record.vmSeqId}) and status(${record.status})"
"of seq(${record.vmSeqId}) and status(${record.status})"
)
val agentResult = client.get(ServiceThirdPartyAgentResource::class)
.getAgentByIdGlobal(record.projectId, record.agentId)
Expand Down Expand Up @@ -514,9 +538,9 @@ class ThirdPartyAgentService @Autowired constructor(
// 有些并发情况可能会导致在finish时AgentBuild状态没有被置为Done在这里改一下
val buildRecord = thirdPartyAgentBuildDao.get(dslContext, buildInfo.buildId, buildInfo.vmSeqId)
if (buildRecord != null && (
buildRecord.status != PipelineTaskStatus.DONE.status ||
buildRecord.status != PipelineTaskStatus.FAILURE.status
)
buildRecord.status != PipelineTaskStatus.DONE.status ||
buildRecord.status != PipelineTaskStatus.FAILURE.status
)
) {
thirdPartyAgentBuildDao.updateStatus(
dslContext = dslContext,
Expand Down Expand Up @@ -572,9 +596,9 @@ class ThirdPartyAgentService @Autowired constructor(
}
// 构建需要使用构建的项目id跳转,防止是共享agent,agent链接使用上报的项目Id即可
val buildUrl = "${HomeHostUtil.innerServerHost()}/console/pipeline/${buildRecord.projectId}/" +
"${buildRecord.pipelineId}/detail/${buildRecord.buildId}/executeDetail"
"${buildRecord.pipelineId}/detail/${buildRecord.buildId}/executeDetail"
val agentUrl = "${HomeHostUtil.innerServerHost()}/console/environment/$projectId/" +
"nodeDetail/${agentResult.data!!.nodeId}"
"nodeDetail/${agentResult.data!!.nodeId}"
client.get(ServiceNotifyMessageTemplateResource::class).sendNotifyMessageByTemplate(
SendNotifyMessageTemplateRequest(
templateCode = workerErrorRtxTemplate!!,
Expand Down Expand Up @@ -819,7 +843,7 @@ class ThirdPartyAgentService @Autowired constructor(
"oldIp" to agent.ip,
"newIp" to newIp,
"url" to "${HomeHostUtil.innerServerHost()}/console/environment/$projectId/" +
"nodeDetail/$nodeHashId"
"nodeDetail/$nodeHashId"
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ data class MetricsUserPO(
val status: String,
val atomCode: String?,
val eventType: CallBackEvent,
var endTime: LocalDateTime?
var endTime: LocalDateTime?,
val labels: String?
) {
constructor(event: PipelineBuildStatusBroadCastEvent) : this(
startTime = event.eventTime ?: LocalDateTime.now(),
Expand All @@ -56,15 +57,16 @@ data class MetricsUserPO(
status = checkNotNull(event.buildStatus),
atomCode = event.atomCode,
eventType = checkNotNull(event.toEventType()),
endTime = null
endTime = null,
labels = event.labels?.entries?.joinToString(separator = ";") { "${it.key}=${it.value}" }
)

companion object {
const val DELIMITER = ","
fun load(str: String?): MetricsUserPO? {
if (str.isNullOrBlank()) return null
val list = str.split(DELIMITER)
if (list.size != 10) return null
if (list.size < 10) return null
return MetricsUserPO(
LocalDateTime.ofInstant(Instant.ofEpochSecond(list[0].toLong()), ZoneOffset.ofHours(8)),
list[1],
Expand All @@ -77,7 +79,8 @@ data class MetricsUserPO(
CallBackEvent.valueOf(list[8]),
list[9].ifEmpty { null }?.let {
LocalDateTime.ofInstant(Instant.ofEpochSecond(it.toLong()), ZoneOffset.ofHours(8))
}
},
list.getOrNull(10)?.ifEmpty { null }
)
}
}
Expand All @@ -92,6 +95,7 @@ data class MetricsUserPO(
status + DELIMITER +
(atomCode ?: "") + DELIMITER +
eventType.name + DELIMITER +
(endTime?.toInstant(ZoneOffset.ofHours(8))?.epochSecond?.toString() ?: "")
(endTime?.toInstant(ZoneOffset.ofHours(8))?.epochSecond?.toString() ?: "") + DELIMITER +
(labels ?: "")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ import org.springframework.context.annotation.Configuration
class MetricsUserConfig {

companion object {
const val gaugeBuildQueueKey = "pipeline_queue_time_seconds"
const val gaugeBuildKey = "pipeline_running_time_seconds"
const val gaugeBuildStatusKey = "pipeline_status_info"
const val gaugeBuildJobQueueKey = "pipeline_job_queue_time_seconds"
const val gaugeBuildJobKey = "pipeline_job_running_time_seconds"
const val gaugeBuildAgentKey = "pipeline_agent_running_time_seconds"
const val gaugeBuildStepKey = "pipeline_step_running_time_seconds"
const val gaugeBuildStepStatusKey = "pipeline_step_status_info"
}
Expand Down Expand Up @@ -85,9 +88,12 @@ class MetricsUserConfig {
fun scrape(): String {
return meterRegistry.scrape(
TextFormat.CONTENT_TYPE_004, setOf(
gaugeBuildQueueKey,
gaugeBuildKey,
gaugeBuildStatusKey,
gaugeBuildJobQueueKey,
gaugeBuildJobKey,
gaugeBuildAgentKey,
gaugeBuildStepKey,
gaugeBuildStepStatusKey
)
Expand Down
Loading
Loading