Skip to content

Commit

Permalink
Merge pull request #4646 from carlyin0801/issue_4645_misc_mutl_thread…
Browse files Browse the repository at this point in the history
…_clear_fix

perf:misc多线程清理构建数据优化 #4645
  • Loading branch information
irwinsun authored Jul 12, 2021
2 parents 3159152 + fffa126 commit e052a9f
Showing 1 changed file with 68 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ import java.time.LocalDateTime
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

@Component
@Suppress("ALL")
Expand All @@ -73,22 +76,38 @@ class PipelineBuildHistoryDataClearJob @Autowired constructor(
"pipeline:build:history:data:clear:project:id"
private const val PIPELINE_BUILD_HISTORY_DATA_CLEAR_PROJECT_LIST_KEY =
"pipeline:build:history:data:clear:project:list"
private const val PIPELINE_BUILD_HISTORY_DATA_CLEAR_THREAD_SET_KEY =
"pipeline:build:history:data:clear:thread:set"
private var executor: ThreadPoolExecutor? = null
}

@Value("\${process.deletedPipelineStoreDays:30}")
private val deletedPipelineStoreDays: Long = 30 // 回收站已删除流水线保存天数

private val executor = Executors.newFixedThreadPool(miscBuildDataClearConfig.maxThreadHandleProjectNum)

@Scheduled(initialDelay = 10000, fixedDelay = 12000)
fun pipelineBuildHistoryDataClear() {
if (!miscBuildDataClearConfig.switch.toBoolean()) {
// 如果清理构建历史数据开关关闭,则不清理
return
}
logger.info("pipelineBuildHistoryDataClear start")
val lock = RedisLock(redisOperation,
LOCK_KEY, 3000)
if (executor == null) {
// 创建带有边界队列的线程池,防止内存爆掉
logger.info("pipelineBuildHistoryDataClear create executor")
executor = ThreadPoolExecutor(
miscBuildDataClearConfig.maxThreadHandleProjectNum,
miscBuildDataClearConfig.maxThreadHandleProjectNum,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(10),
Executors.defaultThreadFactory(),
ThreadPoolExecutor.DiscardPolicy()
)
}
val lock = RedisLock(
redisOperation,
LOCK_KEY, 3000
)
try {
if (!lock.tryLock()) {
logger.info("get lock failed, skip")
Expand All @@ -108,7 +127,6 @@ class PipelineBuildHistoryDataClearJob @Autowired constructor(
}
// 获取清理项目构建数据的线程数量
val maxThreadHandleProjectNum = miscBuildDataClearConfig.maxThreadHandleProjectNum
val futureList = mutableListOf<Future<Boolean>>()
val avgProjectNum = maxProjectNum / maxThreadHandleProjectNum
for (index in 1..maxThreadHandleProjectNum) {
// 计算线程能处理的最大项目主键ID
Expand All @@ -117,17 +135,15 @@ class PipelineBuildHistoryDataClearJob @Autowired constructor(
} else {
index * avgProjectNum + maxProjectNum % maxThreadHandleProjectNum
}
futureList.add(
// 判断线程是否正在处理任务,如正在处理则不分配新任务(定时任务12秒执行一次,线程启动到往set集合设置编号耗费时间很短,故不加锁)
if (!redisOperation.isMember(PIPELINE_BUILD_HISTORY_DATA_CLEAR_THREAD_SET_KEY, index.toString())) {
doClearBus(
threadNo = index,
projectIdList = projectIdList,
minThreadProjectPrimaryId = (index - 1) * avgProjectNum,
maxThreadProjectPrimaryId = maxThreadProjectPrimaryId
)
)
}
futureList.forEachIndexed { index, future ->
logger.info("future-$index doClearBus result:${future.get()}")
}
}
} catch (t: Throwable) {
logger.warn("pipelineBuildHistoryDataClear failed", t)
Expand All @@ -143,7 +159,7 @@ class PipelineBuildHistoryDataClearJob @Autowired constructor(
maxThreadProjectPrimaryId: Long
): Future<Boolean> {
val threadName = "Thread-$threadNo"
return executor.submit(Callable<Boolean> {
return executor!!.submit(Callable<Boolean> {
var handleProjectPrimaryId =
redisOperation.get("$threadName:$PIPELINE_BUILD_HISTORY_DATA_CLEAR_PROJECT_ID_KEY")?.toLong()
if (handleProjectPrimaryId == null) {
Expand All @@ -152,43 +168,52 @@ class PipelineBuildHistoryDataClearJob @Autowired constructor(
if (handleProjectPrimaryId >= maxThreadProjectPrimaryId) {
// 已经清理完全部项目的流水线的过期构建记录,再重新开始清理
redisOperation.delete("$threadName:$PIPELINE_BUILD_HISTORY_DATA_CLEAR_PROJECT_ID_KEY")
logger.info("pipelineBuildHistoryDataClear reStart")
logger.info("pipelineBuildHistoryDataClear $threadName reStart")
return@Callable true
}
}
val maxEveryProjectHandleNum = miscBuildDataClearConfig.maxEveryProjectHandleNum
var maxHandleProjectPrimaryId = handleProjectPrimaryId ?: 0L
val projectInfoList = if (projectIdList.isNullOrEmpty()) {
val channelCodeList = miscBuildDataClearConfig.clearChannelCodes.split(",")
maxHandleProjectPrimaryId = handleProjectPrimaryId + maxEveryProjectHandleNum
projectMiscService.getProjectInfoList(
minId = handleProjectPrimaryId,
maxId = maxHandleProjectPrimaryId,
channelCodeList = channelCodeList
)
} else {
projectMiscService.getProjectInfoList(projectIdList = projectIdList)
}
// 根据项目依次查询T_PIPELINE_INFO表中的流水线数据处理
projectInfoList?.forEach { projectInfo ->
val channel = projectInfo.channel
// 获取项目对应的流水线数据清理配置类,如果不存在说明无需清理该项目下的构建记录
val projectDataClearConfigService =
ProjectDataClearConfigFactory.getProjectDataClearConfigService(channel) ?: return@forEach
val projectPrimaryId = projectInfo.id
if (projectPrimaryId > maxHandleProjectPrimaryId) {
maxHandleProjectPrimaryId = projectPrimaryId
// 将线程编号存入redis集合
redisOperation.sadd(PIPELINE_BUILD_HISTORY_DATA_CLEAR_THREAD_SET_KEY, threadNo.toString())
try {
val maxEveryProjectHandleNum = miscBuildDataClearConfig.maxEveryProjectHandleNum
var maxHandleProjectPrimaryId = handleProjectPrimaryId ?: 0L
val projectInfoList = if (projectIdList.isNullOrEmpty()) {
val channelCodeList = miscBuildDataClearConfig.clearChannelCodes.split(",")
maxHandleProjectPrimaryId = handleProjectPrimaryId + maxEveryProjectHandleNum
projectMiscService.getProjectInfoList(
minId = handleProjectPrimaryId,
maxId = maxHandleProjectPrimaryId,
channelCodeList = channelCodeList
)
} else {
projectMiscService.getProjectInfoList(projectIdList = projectIdList)
}
val projectId = projectInfo.projectId
// 清理流水线构建数据
clearPipelineBuildData(projectId, projectDataClearConfigService)
// 根据项目依次查询T_PIPELINE_INFO表中的流水线数据处理
projectInfoList?.forEach { projectInfo ->
val channel = projectInfo.channel
// 获取项目对应的流水线数据清理配置类,如果不存在说明无需清理该项目下的构建记录
val projectDataClearConfigService =
ProjectDataClearConfigFactory.getProjectDataClearConfigService(channel) ?: return@forEach
val projectPrimaryId = projectInfo.id
if (projectPrimaryId > maxHandleProjectPrimaryId) {
maxHandleProjectPrimaryId = projectPrimaryId
}
val projectId = projectInfo.projectId
// 清理流水线构建数据
clearPipelineBuildData(projectId, projectDataClearConfigService)
}
// 将当前已处理完的最大项目Id存入redis
redisOperation.set(
key = "$threadName:$PIPELINE_BUILD_HISTORY_DATA_CLEAR_PROJECT_ID_KEY",
value = maxHandleProjectPrimaryId.toString(),
expired = false
)
} catch (ignore: Exception) {
logger.warn("pipelineBuildHistoryDataClear doClearBus failed", ignore)
} finally {
// 释放redis集合中的线程编号
redisOperation.sremove(PIPELINE_BUILD_HISTORY_DATA_CLEAR_THREAD_SET_KEY, threadNo.toString())
}
// 将当前已处理完的最大项目Id存入redis
redisOperation.set(
key = "$threadName:$PIPELINE_BUILD_HISTORY_DATA_CLEAR_PROJECT_ID_KEY",
value = maxHandleProjectPrimaryId.toString(),
expired = false
)
return@Callable true
})
}
Expand Down

0 comments on commit e052a9f

Please sign in to comment.