Skip to content

Commit

Permalink
Merge pull request #10898 from yongyiduan/issue_10718
Browse files Browse the repository at this point in the history
feat:流水线并发运行时,支持限制并发个数和排队 #10718
  • Loading branch information
bkci-bot authored Oct 15, 2024
2 parents c770eba + 682061a commit a1ad335
Show file tree
Hide file tree
Showing 24 changed files with 186 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.tencent.devops.common.pipeline.pojo.setting.PipelineSetting
import com.tencent.devops.common.pipeline.pojo.setting.Subscription
import com.tencent.devops.common.pipeline.pojo.transfer.IfType
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_CONCURRENCY_GROUP_DEFAULT
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX
import com.tencent.devops.process.yaml.pojo.YamlVersion
import com.tencent.devops.process.yaml.transfer.VariableDefault.nullIfDefault
import com.tencent.devops.process.yaml.transfer.aspect.PipelineTransferAspectWrapper
Expand Down Expand Up @@ -96,6 +97,7 @@ class ModelTransfer @Autowired constructor(
waitQueueTimeMinute = yaml.concurrency?.queueTimeoutMinutes
?: VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE,
maxQueueSize = yaml.concurrency?.queueLength ?: VariableDefault.DEFAULT_PIPELINE_SETTING_MAX_QUEUE_SIZE,
maxConRunningQueueSize = yaml.concurrency?.maxParallel ?: PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX,
labels = yaml2Labels(yamlInput),
pipelineAsCodeSettings = yamlInput.asCodeSettings,
successSubscriptionList = yamlNotice2Setting(
Expand Down Expand Up @@ -313,7 +315,18 @@ class ModelTransfer @Autowired constructor(
queueLength = setting.maxQueueSize
.nullIfDefault(VariableDefault.DEFAULT_PIPELINE_SETTING_MAX_QUEUE_SIZE),
queueTimeoutMinutes = setting.waitQueueTimeMinute
.nullIfDefault(VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE)
.nullIfDefault(VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE),
maxParallel = null
)
}
if (setting.runLockType == PipelineRunLockType.MULTIPLE) {
return Concurrency(
group = null,
cancelInProgress = null,
queueLength = null,
queueTimeoutMinutes = setting.waitQueueTimeMinute
.nullIfDefault(VariableDefault.DEFAULT_WAIT_QUEUE_TIME_MINUTE),
maxParallel = setting.maxConRunningQueueSize.nullIfDefault(PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX)
)
}
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ data class Concurrency(
@JsonProperty("queue-length")
val queueLength: Int?,
@JsonProperty("queue-timeout-minutes")
val queueTimeoutMinutes: Int?
val queueTimeoutMinutes: Int?,
@JsonProperty("max-parallel")
val maxParallel: Int?
)
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,9 @@
"additionalProperties" : false,
"required" : [ "jobs" ],
"properties" : {
"enable" : {
"type" : "boolean"
},
"name" : {
"type" : "string"
},
Expand Down Expand Up @@ -1875,6 +1878,9 @@
"required" : [ "steps" ],
"additionalProperties" : false,
"properties" : {
"enable" : {
"type" : "boolean"
},
"name" : {
"type" : "string"
},
Expand Down Expand Up @@ -2110,6 +2116,11 @@
"type" : "integer",
"minimum" : 1,
"maximum" : 1440
},
"max-parallel" : {
"type" : "integer",
"minimum" : 1,
"maximum" : 200
}
}
},
Expand Down Expand Up @@ -2318,6 +2329,9 @@
"required" : [ "steps" ],
"additionalProperties" : false,
"properties" : {
"enable" : {
"type" : "boolean"
},
"name" : {
"type" : "string"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
package com.tencent.devops.common.pipeline.pojo.setting

import com.tencent.devops.common.api.pojo.PipelineAsCodeSettings
import com.tencent.devops.common.api.util.DateTimeUtil
import com.tencent.devops.common.pipeline.utils.PIPELINE_RES_NUM_MIN
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_CONCURRENCY_GROUP_DEFAULT
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_MAX_QUEUE_SIZE_DEFAULT
import com.tencent.devops.common.pipeline.utils.PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT
import com.tencent.devops.common.web.annotation.BkField
Expand Down Expand Up @@ -84,10 +82,10 @@ data class PipelineSetting(
var concurrencyGroup: String? = PIPELINE_SETTING_CONCURRENCY_GROUP_DEFAULT,
@get:Schema(title = "并发时,是否相同group取消正在执行的流水线", required = false)
var concurrencyCancelInProgress: Boolean = false,
@get:Schema(title = "并发构建数量限制", required = false)
var maxConRunningQueueSize: Int? = null, // MULTIPLE类型时,并发构建数量限制

// 平台系统控制相关配置 —— 不作为生成版本的配置
@get:Schema(title = "并发构建数量限制", required = false)
var maxConRunningQueueSize: Int? = PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX, // MULTIPLE类型时,并发构建数量限制
@get:Schema(title = "保存流水线编排的最大个数", required = false)
val maxPipelineResNum: Int = PIPELINE_RES_NUM_MIN, // 保存流水线编排的最大个数
@get:Schema(title = "重试时清理引擎变量表", required = false)
Expand All @@ -112,9 +110,7 @@ data class PipelineSetting(
version = 1,
desc = pipelineName,
maxPipelineResNum = maxPipelineResNum ?: PIPELINE_RES_NUM_MIN,
waitQueueTimeMinute = DateTimeUtil.minuteToSecond(
PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT
),
waitQueueTimeMinute = PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT,
maxQueueSize = PIPELINE_SETTING_MAX_QUEUE_SIZE_DEFAULT,
runLockType = PipelineRunLockType.MULTIPLE,
successSubscription = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const val PIPELINE_SETTING_MAX_CON_QUEUE_SIZE_MAX = 200
/**
* 流水线设置-最大排队时间-默认值 单位:分钟
*/
const val PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT = 1
const val PIPELINE_SETTING_WAIT_QUEUE_TIME_MINUTE_DEFAULT = 10

/**
* 流水线设置-CONCURRENCY GROUP 并发组-默认值
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.tencent.devops.common.es.ESClient
import com.tencent.devops.common.es.client.LogClient
import com.tencent.devops.common.redis.RedisLock
import com.tencent.devops.common.redis.RedisOperation
import com.tencent.devops.common.service.utils.RetryUtils
import com.tencent.devops.openapi.es.ESIndexUtils
import com.tencent.devops.openapi.es.ESMessage
import com.tencent.devops.openapi.es.IESService
Expand Down Expand Up @@ -122,27 +123,39 @@ class ESServiceImpl constructor(
while (true) {
val message = queue.take() ?: continue
buf.add(message)
if (buf.size == BULK_BUFFER_SIZE) {
val currentEpoch = System.currentTimeMillis()
try {
prepareIndex()
if (doAddMultiLines(buf) == 0) {
throw ExecuteException(
"None of lines is inserted successfully to ES "
)
} else {
buf.clear()
if (buf.size >= BULK_BUFFER_SIZE) {
RetryUtils.execute(action = object : RetryUtils.Action<Unit> {
override fun execute() {
doAdd()
}
} finally {
val elapse = System.currentTimeMillis() - currentEpoch
// #4265 当日志消息处理时间过长时打印消息内容
if (elapse >= INDEX_STORAGE_WARN_MILLIS) logger.warn(
" addBatchLogEvent spent too much time($elapse)"
)
}

override fun fail(e: Throwable) {
logger.error("add to es failed", e)
}
}, retryTime = 6, retryPeriodMills = 10000)
}
}
}

private fun doAdd() {
val currentEpoch = System.currentTimeMillis()
try {
prepareIndex()
if (doAddMultiLines(buf) == 0) {
throw ExecuteException(
"None of lines is inserted successfully to ES "
)
} else {
buf.clear()
}
} finally {
val elapse = System.currentTimeMillis() - currentEpoch
// #4265 当日志消息处理时间过长时打印消息内容
if (elapse >= INDEX_STORAGE_WARN_MILLIS) logger.warn(
" addBatchLogEvent spent too much time($elapse)"
)
}
}
}

private fun doAddMultiLines(logMessages: List<ESMessage>): Int {
Expand Down Expand Up @@ -269,7 +282,7 @@ class ESServiceImpl constructor(
return try {
logger.info(
"[${createClient.clusterName}][$index]|createIndex|: shards[${createClient.shards}]" +
" replicas[${createClient.replicas}] shardsPerNode[${createClient.shardsPerNode}]"
" replicas[${createClient.replicas}] shardsPerNode[${createClient.shardsPerNode}]"
)
val request = CreateIndexRequest(index)
.settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ data class PipelineSettingVersion(
@get:Schema(title = "并发时,设定的group", required = false)
var concurrencyGroup: String?,
@get:Schema(title = "并发时,是否相同group取消正在执行的流水线", required = false)
var concurrencyCancelInProgress: Boolean?
var concurrencyCancelInProgress: Boolean?,
@get:Schema(title = "并发构建数量限制", required = false)
var maxConRunningQueueSize: Int? = null // MULTIPLE类型时,并发构建数量限制
) {
companion object {

Expand All @@ -90,7 +92,8 @@ data class PipelineSettingVersion(
maxQueueSize = setting.maxQueueSize,
buildNumRule = setting.buildNumRule,
concurrencyCancelInProgress = setting.concurrencyCancelInProgress,
concurrencyGroup = setting.concurrencyGroup
concurrencyGroup = setting.concurrencyGroup,
maxConRunningQueueSize = setting.maxConRunningQueueSize
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,11 @@ class PipelineSettingDao {
.set(SUCCESS_SUBSCRIPTION, JsonUtil.toJson(successSubscriptionList, false))
.set(FAILURE_SUBSCRIPTION, JsonUtil.toJson(failSubscriptionList, false))
.set(VERSION, setting.version)
.set(MAX_CON_RUNNING_QUEUE_SIZE, setting.maxConRunningQueueSize)
// pipelineAsCodeSettings 默认传空不更新
setting.pipelineAsCodeSettings?.let { self ->
insert.set(PIPELINE_AS_CODE_SETTINGS, JsonUtil.toJson(self, false))
}
// maxConRunningQueueSize 默认传空不更新
if (setting.maxConRunningQueueSize != null) {
insert.set(MAX_CON_RUNNING_QUEUE_SIZE, setting.maxConRunningQueueSize)
}
return insert.execute()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class PipelineSettingVersionDao {
SUCCESS_SUBSCRIPTION,
FAILURE_SUBSCRIPTION,
PIPELINE_AS_CODE_SETTINGS,
VERSION
VERSION,
MAX_CON_RUNNING_QUEUE_SIZE
).values(
id,
setting.projectId,
Expand All @@ -98,7 +99,8 @@ class PipelineSettingVersionDao {
setting.pipelineAsCodeSettings?.let { self ->
JsonUtil.toJson(self, false)
},
version
version,
setting.maxConRunningQueueSize ?: -1
).onDuplicateKeyUpdate()
.set(NAME, setting.pipelineName)
.set(DESC, setting.desc)
Expand All @@ -111,6 +113,7 @@ class PipelineSettingVersionDao {
.set(CONCURRENCY_CANCEL_IN_PROGRESS, setting.concurrencyCancelInProgress)
.set(SUCCESS_SUBSCRIPTION, JsonUtil.toJson(successSubscriptionList, false))
.set(FAILURE_SUBSCRIPTION, JsonUtil.toJson(failSubscriptionList, false))
.set(MAX_CON_RUNNING_QUEUE_SIZE, setting.maxConRunningQueueSize ?: -1)
.execute()
}
}
Expand Down Expand Up @@ -219,7 +222,8 @@ class PipelineSettingVersionDao {
maxQueueSize = t.maxQueueSize,
buildNumRule = t.buildNumRule,
concurrencyCancelInProgress = t.concurrencyCancelInProgress,
concurrencyGroup = t.concurrencyGroup
concurrencyGroup = t.concurrencyGroup,
maxConRunningQueueSize = t.maxConRunningQueueSize
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ import com.tencent.devops.process.pojo.BuildStageStatus
import com.tencent.devops.process.pojo.PipelineBuildMaterial
import com.tencent.devops.process.pojo.app.StartBuildContext
import com.tencent.devops.process.pojo.code.WebhookInfo
import java.sql.Timestamp
import java.time.LocalDateTime
import javax.ws.rs.core.Response
import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.DatePart
Expand All @@ -62,9 +65,6 @@ import org.jooq.RecordMapper
import org.jooq.SelectConditionStep
import org.jooq.impl.DSL
import org.springframework.stereotype.Repository
import java.sql.Timestamp
import java.time.LocalDateTime
import javax.ws.rs.core.Response

@Suppress("ALL")
@Repository
Expand Down Expand Up @@ -268,6 +268,29 @@ class PipelineBuildDao {
} else normal
}

fun countAllBuildWithStatus(
dslContext: DSLContext,
projectId: String,
pipelineId: String,
status: Set<BuildStatus>
): Int {
val normal = with(T_PIPELINE_BUILD_HISTORY) {
val where = dslContext.selectCount().from(this)
.where(PROJECT_ID.eq(projectId))
.and(PIPELINE_ID.eq(pipelineId))
.and(STATUS.`in`(status.map { it.ordinal }))
where.fetchOne(0, Int::class.java)!!
}
val debug = with(T_PIPELINE_BUILD_HISTORY_DEBUG) {
val where = dslContext.selectCount().from(this)
.where(PROJECT_ID.eq(projectId))
.and(PIPELINE_ID.eq(pipelineId))
.and(STATUS.`in`(status.map { it.ordinal }))
where.fetchOne(0, Int::class.java)!!
}
return normal + debug
}

fun getBuildTasksByConcurrencyGroup(
dslContext: DSLContext,
projectId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ data class InterceptData(
@get:Schema(title = "并发时,是否相同group取消正在执行的流水线", required = false)
val concurrencyCancelInProgress: Boolean = false,
@get:Schema(title = "并发构建数量限制", required = false)
val maxConRunningQueueSize: Int?, // MULTIPLE类型时,并发构建数量限制
val maxConRunningQueueSize: Int, // MULTIPLE类型时,并发构建数量限制
@get:Schema(title = "是否为重试操作", required = false)
val retry: Boolean? = false
)
Loading

0 comments on commit a1ad335

Please sign in to comment.