Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…o issue-10740

# Conflicts:
#	src/backend/ci/core/common/common-dispatch-sdk/src/main/kotlin/com/tencent/devops/common/dispatch.sdk/service/DispatchService.kt
  • Loading branch information
tangruotian committed Aug 23, 2024
2 parents 9c6f19d + 3ca3af4 commit 273aebb
Show file tree
Hide file tree
Showing 48 changed files with 781 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import com.tencent.devops.common.auth.api.pojo.ProjectConditionDTO
import com.tencent.devops.common.client.Client
import com.tencent.devops.common.redis.RedisOperation
import com.tencent.devops.common.service.trace.TraceTag
import com.tencent.devops.model.auth.tables.records.TAuthResourceGroupApplyRecord
import com.tencent.devops.project.api.service.ServiceProjectResource
import org.jooq.DSLContext
import org.jooq.impl.DSL
Expand Down Expand Up @@ -175,6 +176,9 @@ class RbacPermissionResourceGroupSyncService @Autowired constructor(
val limit = 100
var offset = 0
val startEpoch = System.currentTimeMillis()
val finalRecordIdsOfTimeOut = mutableListOf<Long>()
val finalRecordsOfPending = mutableListOf<TAuthResourceGroupApplyRecord>()
val finalRecordsOfSuccess = mutableListOf<TAuthResourceGroupApplyRecord>()
do {
logger.info("sync members of apply | start")
val records = authResourceGroupApplyDao.list(
Expand All @@ -197,35 +201,38 @@ class RbacPermissionResourceGroupSyncService @Autowired constructor(
false
}
}
if (recordIdsOfTimeOut.isNotEmpty()) {
authResourceGroupApplyDao.batchUpdate(
dslContext = dslContext,
ids = recordIdsOfTimeOut,
applyToGroupStatus = ApplyToGroupStatus.TIME_OUT
)
}
if (recordsOfPending.isNotEmpty()) {
authResourceGroupApplyDao.batchUpdate(
dslContext = dslContext,
ids = recordsOfPending.map { it.id },
applyToGroupStatus = ApplyToGroupStatus.PENDING
)
}
if (recordsOfSuccess.isNotEmpty()) {
recordsOfSuccess.forEach {
syncIamGroupMember(
projectCode = it.projectCode,
iamGroupId = it.iamGroupId
)
}
authResourceGroupApplyDao.batchUpdate(
dslContext = dslContext,
ids = recordsOfSuccess.map { it.id },
applyToGroupStatus = ApplyToGroupStatus.SUCCEED
)
}
finalRecordIdsOfTimeOut.addAll(recordIdsOfTimeOut)
finalRecordsOfPending.addAll(recordsOfPending)
finalRecordsOfSuccess.addAll(recordsOfSuccess)
offset += limit
} while (records.size == limit)
if (finalRecordIdsOfTimeOut.isNotEmpty()) {
authResourceGroupApplyDao.batchUpdate(
dslContext = dslContext,
ids = finalRecordIdsOfTimeOut,
applyToGroupStatus = ApplyToGroupStatus.TIME_OUT
)
}
if (finalRecordsOfPending.isNotEmpty()) {
authResourceGroupApplyDao.batchUpdate(
dslContext = dslContext,
ids = finalRecordsOfPending.map { it.id },
applyToGroupStatus = ApplyToGroupStatus.PENDING
)
}
if (finalRecordsOfSuccess.isNotEmpty()) {
finalRecordsOfSuccess.forEach {
syncIamGroupMember(
projectCode = it.projectCode,
iamGroupId = it.iamGroupId
)
}
authResourceGroupApplyDao.batchUpdate(
dslContext = dslContext,
ids = finalRecordsOfSuccess.map { it.id },
applyToGroupStatus = ApplyToGroupStatus.SUCCEED
)
}
logger.info("It take(${System.currentTimeMillis() - startEpoch})ms to sync members of apply")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ import com.tencent.devops.monitoring.pojo.DispatchStatus
import com.tencent.devops.process.api.service.ServiceBuildResource
import com.tencent.devops.process.api.service.ServicePipelineTaskResource
import com.tencent.devops.process.engine.common.VMUtils
import com.tencent.devops.process.engine.pojo.PipelineBuildContainer
import com.tencent.devops.process.engine.pojo.PipelineBuildTask
import com.tencent.devops.process.pojo.mq.PipelineAgentShutdownEvent
import com.tencent.devops.process.pojo.mq.PipelineAgentStartupEvent
import java.util.Date
Expand Down Expand Up @@ -170,28 +172,12 @@ class DispatchService constructor(
executeCount: Int?,
logTag: String?
): Boolean {
// 判断流水线当前container是否在运行中
val statusResult = client.get(ServicePipelineTaskResource::class).getContainerStartupInfo(
val (startBuildTask, buildContainer) = getContainerStartupInfo(
projectId = projectId,
buildId = buildId,
containerId = containerId,
taskId = VMUtils.genStartVMTaskId(containerId)
logTag = logTag
)
val startBuildTask = statusResult.data?.startBuildTask
val buildContainer = statusResult.data?.buildContainer
if (statusResult.isNotOk() || startBuildTask == null || buildContainer == null) {
logger.warn(
"The build event($logTag) fail to check if pipeline task is running " +
"because of statusResult(${statusResult.message})"
)
val errorMessage = I18nUtil.getCodeLanMessage(UNABLE_GET_PIPELINE_JOB_STATUS)
throw BuildFailureException(
errorType = ErrorType.SYSTEM,
errorCode = UNABLE_GET_PIPELINE_JOB_STATUS.toInt(),
formatErrorMessage = errorMessage,
errorMessage = errorMessage
)
}

var needStart = true
if (executeCount != startBuildTask.executeCount) {
Expand Down Expand Up @@ -231,7 +217,8 @@ class DispatchService constructor(
pipelineId = event.pipelineId,
buildId = event.buildId,
vmSeqId = event.vmSeqId,
e = e
e = e,
logTag = "$event"
)
}

Expand All @@ -240,14 +227,16 @@ class DispatchService constructor(
pipelineId: String,
buildId: String,
vmSeqId: String,
e: BuildFailureException
e: BuildFailureException,
logTag: String?
) {
onContainerFailure(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
vmSeqId = vmSeqId,
e = e
e = e,
logTag
)
DispatchLogRedisUtils.removeRedisExecuteCount(buildId)
}
Expand All @@ -257,10 +246,21 @@ class DispatchService constructor(
pipelineId: String,
buildId: String,
vmSeqId: String,
e: BuildFailureException
e: BuildFailureException,
logTag: String?
) {
logger.warn("[$buildId|$vmSeqId] Container startup failure")
try {
val (startBuildTask, buildContainer) = getContainerStartupInfo(
projectId = projectId,
buildId = buildId,
containerId = vmSeqId,
logTag = logTag
)
if (buildContainer.status.isCancel() || startBuildTask.status.isCancel()) {
return
}

client.get(ServiceBuildResource::class).setVMStatus(
projectId = projectId,
pipelineId = pipelineId,
Expand Down Expand Up @@ -318,6 +318,38 @@ class DispatchService constructor(
}
}

private fun getContainerStartupInfo(
projectId: String,
buildId: String,
containerId: String,
logTag: String?
): Pair<PipelineBuildTask, PipelineBuildContainer> {
// 判断流水线当前container是否在运行中
val statusResult = client.get(ServicePipelineTaskResource::class).getContainerStartupInfo(
projectId = projectId,
buildId = buildId,
containerId = containerId,
taskId = VMUtils.genStartVMTaskId(containerId)
)
val startBuildTask = statusResult.data?.startBuildTask
val buildContainer = statusResult.data?.buildContainer
if (statusResult.isNotOk() || startBuildTask == null || buildContainer == null) {
logger.warn(
"The build event($logTag) fail to check if pipeline task is running " +
"because of statusResult(${statusResult.message})"
)
val errorMessage = I18nUtil.getCodeLanMessage(UNABLE_GET_PIPELINE_JOB_STATUS)
throw BuildFailureException(
errorType = ErrorType.SYSTEM,
errorCode = UNABLE_GET_PIPELINE_JOB_STATUS.toInt(),
formatErrorMessage = errorMessage,
errorMessage = errorMessage
)
}

return Pair(startBuildTask, buildContainer)
}

private fun finishBuild(vmSeqId: String, buildId: String, executeCount: Int) {
val result = redisOperation.hget(secretInfoRedisKey(buildId), secretInfoRedisMapKey(vmSeqId, executeCount))
if (result != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.tencent.devops.common.api.pojo.Result
import com.tencent.devops.common.pipeline.pojo.JobHeartbeatRequest
import com.tencent.devops.common.web.annotation.BkField
import com.tencent.devops.engine.api.pojo.HeartBeatInfo
import com.tencent.devops.process.pojo.BuildJobResult
import com.tencent.devops.process.pojo.BuildTask
import com.tencent.devops.process.pojo.BuildTaskResult
import com.tencent.devops.process.pojo.BuildVariables
Expand Down Expand Up @@ -132,7 +133,9 @@ interface BuildJobResource {
vmSeqId: String,
@Parameter(description = "构建机名称", required = true)
@HeaderParam(AUTH_HEADER_DEVOPS_VM_NAME)
vmName: String
vmName: String,
@Parameter(description = "执行结果", required = false)
result: BuildJobResult? = null
): Result<Boolean>

@Operation(summary = "Job超时触发")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ object ProcessMessageCode {
const val ERROR_NO_PERMISSION_PLUGIN_IN_TEMPLATE = "2101176" // 模版下存在无权限的插件
const val PIPELINE_ORCHESTRATIONS_NUMBER_ILLEGAL = "2101177" // 流水线编排数量非法
const val MAXIMUM_NUMBER_CONCURRENCY_ILLEGAL = "2101178" // 最大并发数量非法
const val PIPELINE_BUILD_HAS_ENDED_CANNOT_BE_CANCELED = "2101179" // 流水线: 流水线构建已结束,不能取消
const val GET_PIPELINE_ATOM_INFO_NO_PERMISSION = "2101180" // 无权访问插件{0}的流水线信息,请联系组件管理员
const val GROUP_IS_EXIST = "2101181" // 分组({0})已存在/group ({0}) is already exist
const val GROUP_LABEL_IS_EXIST = "2101182" // 分组标签({0})已存在/group label ({0}) is already exist
const val GET_PIPELINE_ATOM_INFO_NO_PERMISSION = "2101179" // 无权访问插件{0}的流水线信息,请联系组件管理员
const val GROUP_IS_EXIST = "2101180" // 分组({0})已存在/group ({0}) is already exist
const val GROUP_LABEL_IS_EXIST = "2101181" // 分组标签({0})已存在/group label ({0}) is already exist
const val PIPELINE_BUILD_HAS_ENDED_CANNOT_BE_OPERATE = "2101182" // 流水线: 流水线构建已结束,不能操作
const val ERROR_NO_PERMISSION_OPERATION_TEMPLATE = "2101183" // 用户没有操作模板的权限

const val ERROR_NO_PIPELINE_VERSION_EXISTS_BY_ID = "2101184" // 流水线版本[{0}]不存在
Expand Down Expand Up @@ -334,6 +334,11 @@ object ProcessMessageCode {
const val ERROR_TASK_NOT_ALLOWED_TO_BE_SKIPPED = "2101221" // task不允许被跳过
const val ERROR_INCORRECT_NOTIFICATION_TYPE = "2101230" // 通知类型配置不正确,请检查
const val ERROR_INCORRECT_NOTIFICATION_MESSAGE_CONTENT = "2101231" // 通知内容为空,请检查
const val ERROR_AGENT_REUSE_MUTEX_JOB_NULL = "2101232" // {0}使用流水线构建机复用互斥组需要声明具体的JobId,不能为空
// 流水线构建机复用互斥组节点 {0} 复用的 {1} 不存在,或非第三方构建机节点
const val ERROR_AGENT_REUSE_MUTEX_DEP_NULL_NODE = "2101233"
// 在 {0} 下,构建机复用互斥组节点 {1} 与被复用的 {2} 节点调度类型不同,AgentId和AgentEnv不能互相复用
const val ERROR_AGENT_REUSE_MUTEX_DEP_ERROR = "2101234"
const val ERROR_YAML_PUSH_CREATE_BRANCH = "2101235" // 创建分支失败: {0}
const val ERROR_YAML_PUSH_CREATE_BRANCH_NO_PERMISSION = "2101236" // 用户{0}没有代码库{1}的创建分支权限
const val ERROR_YAML_PUSH_CREATE_FILE = "2101237" // 创建文件失败: {0}
Expand All @@ -343,26 +348,19 @@ object ProcessMessageCode {
const val ERROR_GIT_PROJECT_NOT_FOUND_OR_NOT_PERMISSION = "2101241" // 工蜂仓库({0})不存在或没有权限访问
const val ERROR_TGIT_SERVER_EXCEPTION = "2101242" // 工蜂服务异常

const val ERROR_AGENT_REUSE_MUTEX_JOB_NULL = "2101232" // {0}使用流水线构建机复用互斥组需要声明具体的JobId,不能为空
// 流水线构建机复用互斥组节点 {0} 复用的 {1} 不存在,或非第三方构建机节点
const val ERROR_AGENT_REUSE_MUTEX_DEP_NULL_NODE = "2101233"
// 在 {0} 下,构建机复用互斥组节点 {1} 与被复用的 {2} 节点调度类型不同,AgentId和AgentEnv不能互相复用
const val ERROR_AGENT_REUSE_MUTEX_DEP_ERROR = "2101234"

const val ERROR_TIMER_TRIGGER_SVN_BRANCH_NOT_EMPTY = "2101243" // 定时触发SVN分支不能为空
const val ERROR_PIPELINE_ELEMENT_CHECK_FAILED = "2101244" // 流水线有效性校验失败
const val ERROR_TIMER_TRIGGER_REPO_NOT_FOUND = "2101245" // 定时触发代码库不存在
const val ERROR_TIMER_TRIGGER_NEED_ENABLE_PAC = "2101246" // 定时触发需要流水线开启PAC
const val ERROR_PIPELINE_TIMER_BRANCH_IS_EMPTY = "2101247" // 流水线定时触发分支为空
const val ERROR_PIPELINE_TIMER_BRANCH_NO_CHANGE = "2101248" // 定时触发分支{0}代码没有变更
const val ERROR_PIPELINE_TIMER_BRANCH_NOT_FOUND = "2101249" // 定时触发分支{0}不存在
const val ERROR_PIPELINE_TIMER_BRANCH_UNKNOWN = "2101252" // 定时触发分支{0}未知错误

const val ERROR_PIPELINE_JOB_ID_FORMAT = "2101250" // 流水线Job:{0}的jobId为空或长度超过{1}位
const val ERROR_PIPELINE_JOB_CONTROL_NODECURR = "2101251" // 流水线Job:{0}的单节点或总结点并发配置需要为小于1000的正整数

const val ERROR_PIPELINE_TIMER_BRANCH_UNKNOWN = "2101252" // 定时触发分支{0}未知错误
const val ERROR_PIPELINE_CONDITION_EXPRESSION_TOO_LONG = "2101253" // 自定义条件表达式{0}的长度超过{1}位
const val ERROR_PIPELINE_BUILD_START_PARAM_NO_EMPTY = "2101254" // 构建启动参数如果必填,不能为空
const val ERROR_REPEATEDLY_START_VM = "2101255" // 重复启动构建机,当前构建机的状态为:{0}

const val BK_SUCCESSFULLY_DISTRIBUTED = "bkSuccessfullyDistributed" // 跨项目构件分发成功,共分发了{0}个文件
const val BK_SUCCESSFULLY_FAILED = "bkSuccessfullyFailed" // 跨项目构件分发失败,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.process.pojo

import io.swagger.v3.oas.annotations.media.Schema

@Schema(title = "流水线模型-job执行结果")
data class BuildJobResult(
@get:Schema(title = "错误原因", required = false)
val message: String? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ class BuildRecordContainerDao {
}
}

fun flushEndTimeWhenRetry(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
buildId: String,
containerId: String,
executeCount: Int
) {
with(TPipelineBuildRecordContainer.T_PIPELINE_BUILD_RECORD_CONTAINER) {
dslContext.update(this)
.setNull(END_TIME)
.where(
BUILD_ID.eq(buildId)
.and(PROJECT_ID.eq(projectId))
.and(PIPELINE_ID.eq(pipelineId))
.and(EXECUTE_COUNT.eq(executeCount))
.and(CONTAINER_ID.eq(containerId))
).execute()
}
}

fun getRecord(
dslContext: DSLContext,
projectId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,27 @@ class BuildRecordTaskDao {
}
}

fun flushEndTimeWhenRetry(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
buildId: String,
taskId: String,
executeCount: Int
) {
with(TPipelineBuildRecordTask.T_PIPELINE_BUILD_RECORD_TASK) {
dslContext.update(this)
.setNull(END_TIME)
.where(
BUILD_ID.eq(buildId)
.and(PROJECT_ID.eq(projectId))
.and(PIPELINE_ID.eq(pipelineId))
.and(EXECUTE_COUNT.eq(executeCount))
.and(TASK_ID.eq(taskId))
).execute()
}
}

class BuildRecordTaskJooqMapper : RecordMapper<TPipelineBuildRecordTaskRecord, BuildRecordTask> {
override fun map(record: TPipelineBuildRecordTaskRecord?): BuildRecordTask? {
return record?.run {
Expand Down
Loading

0 comments on commit 273aebb

Please sign in to comment.