Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:流水线并发运行时,支持限制并发个数和排队 #10718 #10898

Merged
merged 16 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
cb96736
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 2, 2024
56e3e9b
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 2, 2024
eaf02eb
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 2, 2024
9a3354d
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 12, 2024
de0f0be
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 13, 2024
b1653ca
Merge remote-tracking branch 'github/master' into issue_10718
yongyiduan Sep 13, 2024
a930ccc
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 13, 2024
3e20617
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 19, 2024
9dfa97d
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 19, 2024
05af55f
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 19, 2024
f3a214d
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 24, 2024
16dcb96
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Sep 24, 2024
f0db948
feat:流水线并发运行时,支持限制并发个数和排队 #10718 修改个openapi的优化
yongyiduan Oct 10, 2024
25e52a7
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Oct 10, 2024
b247dab
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Oct 10, 2024
682061a
feat:流水线并发运行时,支持限制并发个数和排队 #10718
yongyiduan Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.tencent.devops.common.pipeline.pojo.setting.PipelineSetting
import com.tencent.devops.common.pipeline.pojo.setting.Subscription
import com.tencent.devops.common.pipeline.pojo.transfer.IfType
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_CONCURRENCY_GROUP_DEFAULT
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX
import com.tencent.devops.process.yaml.pojo.YamlVersion
import com.tencent.devops.process.yaml.transfer.VariableDefault.nullIfDefault
import com.tencent.devops.process.yaml.transfer.aspect.PipelineTransferAspectWrapper
Expand Down Expand Up @@ -96,6 +97,7 @@ class ModelTransfer @Autowired constructor(
waitQueueTimeMinute = yaml.concurrency?.queueTimeoutMinutes
?: VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE,
maxQueueSize = yaml.concurrency?.queueLength ?: VariableDefault.DEFAULT_PIPELINE_SETTING_MAX_QUEUE_SIZE,
maxConRunningQueueSize = yaml.concurrency?.maxParallel ?: PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX,
labels = yaml2Labels(yamlInput),
pipelineAsCodeSettings = yamlInput.asCodeSettings,
successSubscriptionList = yamlNotice2Setting(
Expand Down Expand Up @@ -313,7 +315,18 @@ class ModelTransfer @Autowired constructor(
queueLength = setting.maxQueueSize
.nullIfDefault(VariableDefault.DEFAULT_PIPELINE_SETTING_MAX_QUEUE_SIZE),
queueTimeoutMinutes = setting.waitQueueTimeMinute
.nullIfDefault(VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE)
.nullIfDefault(VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE),
maxParallel = null
)
}
if (setting.runLockType == PipelineRunLockType.MULTIPLE) {
return Concurrency(
group = null,
cancelInProgress = null,
queueLength = null,
queueTimeoutMinutes = setting.waitQueueTimeMinute
.nullIfDefault(VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE),
maxParallel = setting.maxConRunningQueueSize.nullIfDefault(PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX)
)
}
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ data class Concurrency(
@JsonProperty("queue-length")
val queueLength: Int?,
@JsonProperty("queue-timeout-minutes")
val queueTimeoutMinutes: Int?
val queueTimeoutMinutes: Int?,
@JsonProperty("max-parallel")
val maxParallel: Int?
)
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,9 @@
"additionalProperties" : false,
"required" : [ "jobs" ],
"properties" : {
"enable" : {
"type" : "boolean"
},
"name" : {
"type" : "string"
},
Expand Down Expand Up @@ -1875,6 +1878,9 @@
"required" : [ "steps" ],
"additionalProperties" : false,
"properties" : {
"enable" : {
"type" : "boolean"
},
"name" : {
"type" : "string"
},
Expand Down Expand Up @@ -2110,6 +2116,11 @@
"type" : "integer",
"minimum" : 1,
"maximum" : 1440
},
"max-parallel" : {
"type" : "integer",
"minimum" : 1,
"maximum" : 200
}
}
},
Expand Down Expand Up @@ -2318,6 +2329,9 @@
"required" : [ "steps" ],
"additionalProperties" : false,
"properties" : {
"enable" : {
"type" : "boolean"
},
"name" : {
"type" : "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
package com.tencent.devops.common.pipeline.pojo.setting

import com.tencent.devops.common.api.pojo.PipelineAsCodeSettings
import com.tencent.devops.common.api.util.DateTimeUtil
import com.tencent.devops.common.pipeline.utils.PIPELINE_RES_NUM_MIN
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_CONCURRENCY_GROUP_DEFAULT
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_QUEUE_SIZE_DEFAULT
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT
import com.tencent.devops.common.web.annotation.BkField
Expand Down Expand Up @@ -87,7 +85,7 @@ data class PipelineSetting(

// 平台系统控制相关配置 —— 不作为生成版本的配置
@get:Schema(title = "并发构建数量限制", required = false)
var maxConRunningQueueSize: Int? = PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX, // MULTIPLE类型时,并发构建数量限制
var maxConRunningQueueSize: Int? = null, // MULTIPLE类型时,并发构建数量限制
@get:Schema(title = "保存流水线编排的最大个数", required = false)
val maxPipelineResNum: Int = PIPELINE_RES_NUM_MIN, // 保存流水线编排的最大个数
@get:Schema(title = "重试时清理引擎变量表", required = false)
Expand All @@ -112,9 +110,7 @@ data class PipelineSetting(
version = 1,
desc = pipelineName,
maxPipelineResNum = maxPipelineResNum ?: PIPELINE_RES_NUM_MIN,
waitQueueTimeMinute = DateTimeUtil.minuteToSecond(
PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT
),
waitQueueTimeMinute = PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT,
maxQueueSize = PIPELINE_SETTING_MAX_QUEUE_SIZE_DEFAULT,
runLockType = PipelineRunLockType.MULTIPLE,
successSubscription = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const val PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX = 200
/**
* 流水线设置-最大排队时间-默认值 单位:分钟
*/
const val PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT = 1
const val PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT = 10

/**
* 流水线设置-CONCURRENCY GROUP 并发组-默认值
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ import com.tencent.devops.process.pojo.BuildStageStatus
import com.tencent.devops.process.pojo.PipelineBuildMaterial
import com.tencent.devops.process.pojo.app.StartBuildContext
import com.tencent.devops.process.pojo.code.WebhookInfo
import java.sql.Timestamp
import java.time.LocalDateTime
import javax.ws.rs.core.Response
import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.DatePart
Expand All @@ -62,9 +65,6 @@ import org.jooq.RecordMapper
import org.jooq.SelectConditionStep
import org.jooq.impl.DSL
import org.springframework.stereotype.Repository
import java.sql.Timestamp
import java.time.LocalDateTime
import javax.ws.rs.core.Response

@Suppress("ALL")
@Repository
Expand Down Expand Up @@ -268,6 +268,29 @@ class PipelineBuildDao {
} else normal
}

fun countAllBuildWithStatus(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
status: Set<BuildStatus>
): Int {
val normal = with(T_PIPELINE_BUILD_HISTORY) {
val where = dslContext.selectCount().from(this)
.where(PROJECT_ID.eq(projectId))
.and(PIPELINE_ID.eq(pipelineId))
.and(STATUS.`in`(status.map { it.ordinal }))
where.fetchOne(0, Int::class.java)!!
}
val debug = with(T_PIPELINE_BUILD_HISTORY_DEBUG) {
val where = dslContext.selectCount().from(this)
.where(PROJECT_ID.eq(projectId))
.and(PIPELINE_ID.eq(pipelineId))
.and(STATUS.`in`(status.map { it.ordinal }))
where.fetchOne(0, Int::class.java)!!
}
return normal + debug
}

fun getBuildTasksByConcurrencyGroup(
dslContext: DSLContext,
projectId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data class InterceptData(
@get:Schema(title = "并发时,是否相同group取消正在执行的流水线", required = false)
val concurrencyCancelInProgress: Boolean = false,
@get:Schema(title = "并发构建数量限制", required = false)
val maxConRunningQueueSize: Int?, // MULTIPLE类型时,并发构建数量限制
val maxConRunningQueueSize: Int, // MULTIPLE类型时,并发构建数量限制
@get:Schema(title = "是否为重试操作", required = false)
val retry: Boolean? = false
)
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.tencent.devops.common.event.dispatcher.pipeline.PipelineEventDispatch
import com.tencent.devops.common.log.utils.BuildLogPrinter
import com.tencent.devops.common.pipeline.enums.BuildStatus
import com.tencent.devops.common.pipeline.pojo.setting.PipelineRunLockType
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX
import com.tencent.devops.common.redis.RedisOperation
import com.tencent.devops.common.web.utils.I18nUtil
import com.tencent.devops.process.bean.PipelineUrlBean
Expand All @@ -45,7 +46,7 @@ import com.tencent.devops.process.engine.pojo.event.PipelineBuildCancelEvent
import com.tencent.devops.process.engine.service.PipelineRedisService
import com.tencent.devops.process.engine.service.PipelineRuntimeExtService
import com.tencent.devops.process.engine.service.PipelineRuntimeService
import com.tencent.devops.process.engine.service.PipelineTaskService
import kotlin.math.max
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
Expand All @@ -61,7 +62,6 @@ class QueueInterceptor @Autowired constructor(
private val pipelineRuntimeExtService: PipelineRuntimeExtService,
private val pipelineEventDispatcher: PipelineEventDispatcher,
private val buildLogPrinter: BuildLogPrinter,
private val pipelineTaskService: PipelineTaskService,
private val redisOperation: RedisOperation,
private val pipelineRedisService: PipelineRedisService,
private val pipelineUrlBean: PipelineUrlBean
Expand All @@ -86,6 +86,7 @@ class QueueInterceptor @Autowired constructor(
language = I18nUtil.getDefaultLocaleLanguage()
)
)

runLockType == PipelineRunLockType.SINGLE || runLockType == PipelineRunLockType.SINGLE_LOCK ->
checkRunLockWithSingleType(
task = task,
Expand All @@ -94,21 +95,27 @@ class QueueInterceptor @Autowired constructor(
runningCount = buildSummaryRecord.runningCount,
queueCount = buildSummaryRecord.queueCount
)

runLockType == PipelineRunLockType.GROUP_LOCK ->
checkRunLockWithGroupType(
task = task,
latestBuildId = buildSummaryRecord.latestBuildId,
latestStartUser = buildSummaryRecord.latestStartUser,
runningCount = buildSummaryRecord.runningCount
)
task.maxConRunningQueueSize!! <= (buildSummaryRecord.queueCount + buildSummaryRecord.runningCount) ->

(buildSummaryRecord.queueCount + buildSummaryRecord.runningCount) >= max(
PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX,
task.maxConRunningQueueSize
) ->
Response(
status = ERROR_PIPELINE_QUEUE_FULL.toInt(),
message = MessageUtil.getMessageByLocale(
messageCode = BK_MAX_PARALLEL,
language = I18nUtil.getDefaultLocaleLanguage()
) + " ${task.maxConRunningQueueSize}"
) + " ${max(PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX, task.maxConRunningQueueSize)}"
)

else -> Response(data = BuildStatus.RUNNING)
}
}
Expand All @@ -130,6 +137,7 @@ class QueueInterceptor @Autowired constructor(
// 设置了最大排队数量限制为0,但此时没有构建正在执行
task.maxQueueSize == 0 && runningCount == 0 && queueCount == 0 ->
Response(data = BuildStatus.RUNNING)

task.maxQueueSize == 0 && (runningCount > 0 || queueCount > 0) ->
Response(
status = ERROR_PIPELINE_QUEUE_FULL.toInt(),
Expand All @@ -138,6 +146,7 @@ class QueueInterceptor @Autowired constructor(
language = I18nUtil.getDefaultLocaleLanguage()
)
)

queueCount >= task.maxQueueSize -> {
if (groupName == null) {
outQueueCancelBySingle(
Expand Down Expand Up @@ -228,7 +237,8 @@ class QueueInterceptor @Autowired constructor(
buildId = buildInfo.buildId,
message = I18nUtil.getCodeLanMessage(
messageCode = ProcessMessageCode.BK_BUILD_QUEUE_WAIT_FOR_CONCURRENCY,
params = arrayOf(groupName,
params = arrayOf(
groupName,
"<a target='_blank' href='$detailUrl'>${task.buildId}</a>"
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ class PipelineRepositoryService constructor(
)
}
if (runLockType == PipelineRunLockType.SINGLE ||
runLockType == PipelineRunLockType.SINGLE_LOCK || runLockType == PipelineRunLockType.GROUP_LOCK
runLockType == PipelineRunLockType.SINGLE_LOCK ||
runLockType == PipelineRunLockType.GROUP_LOCK ||
runLockType == PipelineRunLockType.MULTIPLE
) {
if (waitQueueTimeMinute < PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_MIN ||
waitQueueTimeMinute > PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_MAX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,12 @@ class PipelineRuntimeExtService @Autowired constructor(
}

fun existQueue(projectId: String, pipelineId: String, buildId: String, buildStatus: BuildStatus): Boolean {
val redisLock = PipelineNextQueueLock(redisOperation, pipelineId, buildId)
try {
redisLock.lock()
return pipelineBuildDao.updateStatus(
dslContext = dslContext,
projectId = projectId,
buildId = buildId,
oldBuildStatus = buildStatus,
newBuildStatus = BuildStatus.UNEXEC
)
} finally {
redisLock.unlock()
}
return pipelineBuildDao.updateStatus(
dslContext = dslContext,
projectId = projectId,
buildId = buildId,
oldBuildStatus = buildStatus,
newBuildStatus = BuildStatus.UNEXEC
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ import com.tencent.devops.process.utils.PIPELINE_NAME
import com.tencent.devops.process.utils.PIPELINE_RETRY_COUNT
import com.tencent.devops.process.utils.PIPELINE_START_TASK_ID
import com.tencent.devops.process.utils.PipelineVarUtil
import java.time.LocalDateTime
import java.util.Date
import java.util.concurrent.TimeUnit
import org.jooq.DSLContext
import org.jooq.Result
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import java.util.Date
import java.util.concurrent.TimeUnit

/**
* 流水线运行时相关的服务
Expand Down Expand Up @@ -227,6 +227,13 @@ class PipelineRuntimeService @Autowired constructor(
return pipelineBuildDao.getBuildInfo(dslContext, projectId, pipelineId, buildId)
}

fun getRunningBuildCount(
projectId: String,
pipelineId: String
): Int {
return pipelineBuildDao.countAllBuildWithStatus(dslContext, projectId, pipelineId, setOf(BuildStatus.RUNNING))
}

/** 根据状态信息获取并发组构建列表
* @return Pair( PIPELINE_ID , BUILD_ID )
*/
Expand Down Expand Up @@ -535,6 +542,7 @@ class PipelineRuntimeService @Autowired constructor(
}
result.distinct()
}

else -> emptyList()
}
return if (search.isNullOrBlank()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class PipelineSettingService @Autowired constructor(
TimeUnit.HOURS.toMillis(1)
}
setting.runLockType == PipelineRunLockType.SINGLE ||
setting.runLockType == PipelineRunLockType.GROUP_LOCK -> {
setting.runLockType == PipelineRunLockType.GROUP_LOCK ||
setting.runLockType == PipelineRunLockType.MULTIPLE -> {
TimeUnit.MINUTES.toMillis(setting.waitQueueTimeMinute.toLong())
}
else -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import com.tencent.devops.common.pipeline.enums.BuildFormPropertyType
import com.tencent.devops.common.pipeline.enums.ChannelCode
import com.tencent.devops.common.pipeline.enums.StartType
import com.tencent.devops.common.pipeline.pojo.BuildParameters
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX
import com.tencent.devops.common.redis.concurrent.SimpleRateLimiter
import com.tencent.devops.common.service.trace.TraceTag
import com.tencent.devops.process.bean.PipelineUrlBean
Expand Down Expand Up @@ -244,7 +245,7 @@ class PipelineBuildService(
maxQueueSize = setting.maxQueueSize,
concurrencyGroup = context.concurrencyGroup,
concurrencyCancelInProgress = setting.concurrencyCancelInProgress,
maxConRunningQueueSize = setting.maxConRunningQueueSize,
maxConRunningQueueSize = setting.maxConRunningQueueSize ?: PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX,
retry = pipelineParamMap[PIPELINE_RETRY_COUNT] != null
)
)
Expand Down
Loading
Loading