From b23521216c275604a9a494269a664c75a2d70edc Mon Sep 17 00:00:00 2001 From: xujian Date: Thu, 25 Jan 2024 17:18:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=86=E5=8F=91=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E4=B8=AD=E4=BB=A5=E5=88=B6=E5=93=81=E7=BB=B4=E5=BA=A6=E6=9D=A5?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E8=AF=A6=E7=BB=86=E6=97=A5=E5=BF=97=20#1675?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/apidoc/replication/record.md | 182 ++++++++++----- docs/apidoc/replication/replication.md | 24 +- .../com/tencent/bkrepo/job/Constants.kt | 4 +- .../job/batch/ReplicaRecordCleanupJob.kt | 152 ++++++++++++ .../ReplicaRecordCleanupJobProperties.kt | 9 + .../pojo/record/ExecutionResult.kt | 8 +- .../pojo/record/ReplicaOverview.kt | 16 ++ .../pojo/record/ReplicaProgress.kt | 19 +- .../pojo/record/ReplicaRecordDetail.kt | 11 + .../record/ReplicaRecordDetailListOption.kt | 6 +- .../pojo/record/ReplicaRecordInfo.kt | 4 +- .../replication/pojo/record/ResultsSummary.kt | 7 + .../request/RecordDetailInitialRequest.kt | 13 +- .../replication/pojo/task/ReplicaTaskInfo.kt | 4 + .../task/request/ReplicaTaskCreateRequest.kt | 6 +- .../task/request/ReplicaTaskUpdateRequest.kt | 6 +- .../controller/api/ReplicaRecordController.kt | 7 + .../replication/model/TReplicaRecord.kt | 7 +- .../replication/model/TReplicaRecordDetail.kt | 21 ++ .../bkrepo/replication/model/TReplicaTask.kt | 8 + .../replica/context/ReplicaContext.kt | 11 + .../context/ReplicaExecutionContext.kt | 14 +- .../executor/AbstractReplicaJobExecutor.kt | 90 ++++--- .../standalone/ClusterReplicator.kt | 30 --- .../replica/type/AbstractReplicaService.kt | 219 +++++++++++------- .../type/edge/EdgePullReplicaTaskJob.kt | 2 + .../event/EventBasedReplicaJobExecutor.kt | 9 +- .../type/manual/ManualReplicaJobExecutor.kt | 15 +- .../schedule/ScheduledReplicaJobExecutor.kt | 14 +- .../service/ReplicaRecordService.kt | 21 +- .../service/impl/ReplicaRecordServiceImpl.kt | 64 +++-- .../service/impl/ReplicaTaskServiceImpl.kt | 18 +- .../replication/util/TaskRecordQueryHelper.kt | 4 + 33 files changed, 753 insertions(+), 272 deletions(-) create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ReplicaRecordCleanupJob.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ReplicaRecordCleanupJobProperties.kt create mode 100644 src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaOverview.kt create mode 100644 src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ResultsSummary.kt diff --git a/docs/apidoc/replication/record.md b/docs/apidoc/replication/record.md index 64f993f50c..ec722b4015 100644 --- a/docs/apidoc/replication/record.md +++ b/docs/apidoc/replication/record.md @@ -1,3 +1,5 @@ + + # Replication仓库同步执行日志接口 [toc] @@ -240,58 +242,62 @@ 此接口无请求体 - 请求字段说明 - |字段|类型|是否必须|默认值|说明|Description| - |---|---|---|---|---|---| - |recordId|string|是|无|任务执行日志唯一id|record id| - |pageNumber|int|是|无|当前页|page number| - |pageSize|int|是|无|分页数量|page size| - |packageName|string|否|无|包名称,支持前缀模糊匹配|package name| - |repoName|string|否|无|仓库名称|repo name| - |clusterName|string|否|无|远程节点名称|cluster node name| - |path|string|否|无|路径名称,支持前缀模糊匹配|file path| - |status|enum|否|无|[SUCCESS,RUNNING,FAILED]|execute status| - + |字段| 类型 |是否必须|默认值| 说明 | Description | + |---|--------|---|---|--------------------------|-------------------| + |recordId| string |是|无| 任务执行日志唯一id | record id | + |pageNumber| int |是|无| 当前页 | page number | + |pageSize| int |是|无| 分页数量 | page size | + |packageName| string |否|无| 包名称,支持前缀模糊匹配 | package name | + |repoName| string |否|无| 仓库名称 | repo name | + |clusterName| string |否|无| 远程节点名称 | cluster node name | + |path| string |否|无| 路径名称,支持前缀模糊匹配 | file path | + |status| enum |否|无| [SUCCESS,RUNNING,FAILED] | execute status | + |artifactName| string |否|无| 制品名称(Generic为fullPath) | artifact name | + |version| string |否|无| 版本 | artifact version | + + - 响应体 ```json { - "code": 0, - "message": null, - "data": { - "pageNumber": 0, - "pageSize": 1, - "totalRecords": 18, - "totalPages": 2, - "records": [ - { - "id": "979b573d53efcd752bf9b762", - "recordId": "609b573d53ccce752bf9b860", - "localCluster": "wuxi", - "remoteCluster": "wuhan", - "localRepoName": "npm-local", - "repoType": "NPM", - "packageConstraint": { - "packageKey": "npm://helloworld", - "versions": ["1.1.0","1.3.0"] - }, - "pathConstraint": { - "path": "/busy/box.txt" - }, - "status": "SUCCESS", - "progress": { - "success": 10, - "skip": 0, - "failed": 0, - "totalSize": 10 - }, - "startTime": "2021-05-12T12:19:08.813", - "endTime": "2021-05-12T12:19:37.967", - "errorReason": null - } - ] - }, - "traceId": null - } + "code" : 0, + "message" : null, + "data" : { + "pageNumber" : 1, + "pageSize" : 20, + "totalRecords" : 2, + "totalPages" : 1, + "records" : [ { + "id" : "65b07d80b69e4404f9ba4a50", + "recordId" : "65b07d7eb69e4404f9ba4a4e", + "localCluster" : "center", + "remoteCluster" : "dev", + "localRepoName" : "generic", + "repoType" : "GENERIC", + "packageConstraint" : null, + "pathConstraint" : null, + "artifactName" : "/bkrepo.op.conf.bak", + "version" : null, + "conflictStrategy" : "SKIP", + "size" : 646, + "sha256" : "bcdf4256783b6d9cf3cb7bcd666e59c6e0a5d7e58e45c9469bb29a60a69ffff4", + "status" : "SUCCESS", + "progress" : { + "success" : 0, + "skip" : 0, + "failed" : 0, + "totalSize" : 0, + "conflict" : 0 + }, + "startTime" : "2024-01-24T11:01:20.534", + "endTime" : "2024-01-24T11:01:20.556", + "errorReason" : "" + } ], + "count" : 2, + "page" : 1 + }, + "traceId" : "2f6803fcf2b5d1ab8cbbb97fa1d20b52" +} ``` - data字段说明 @@ -304,19 +310,83 @@ |remoteCluster|string|远程集群名称|remote cluster node name| |localRepoName|string|本地仓库名称|local repository name| |repoType|enum|[DOCKER,NPM,RPM,...]|local repository type| - |packageConstraints|object|否|无|包限制|package constraints| - |pathConstraints|object|否|无|路径限制|path constraints| + |packageConstraints|object|否|无| + |pathConstraints|object|否|无| + |artifactName|string|制品名称(历史数据为null)|artifact name| + |version|string|版本(Generic制品和历史数据为null)|artifact version| + |conflictStrategy|string|冲突策略(没有冲突和历史数据为null)|conflict strategy| + |size|long|制品大小|artifact size| + |sha256|string|制品摘要(依赖源和历史数据为null)|artifact digest| |status|enum|[RUNNING,SUCCESS,FAILED]|task execute status| - |progress|object|同步进度|task execute progress| + |~~progress~~|~~object~~|~~同步进度~~|~~task execute progress~~| |startTime|date|任务开始执行时间|task execute start time| |endTime|date|任务结束执行时间|task execute end time| |errorReason|string|错误原因,未执行或执行成功则为null|task failed error reason| -- progress字段说明 +- ~~progress字段说明(废弃)~~ |字段|类型|说明|Description| |---|---|---|---| - |success|long|成功数量|success size| - |skip|long|跳过数量|skip size| - |failed|long|失败数量|failed size| - |totalSize|long|数据总量|total size| + |~~success~~|~~long~~|~~成功数量~~|~~success size~~| + |~~skip~~|~~long~~|~~跳过数量~~|~~skip size~~| + |~~failed~~|~~long~~|~~失败数量~~|~~failed size~~| + |~~totalSize~~|~~long~~|~~数据总量~~|~~total size~~| + + + +## 根据recordId查询任务执行总览 + +- API: GET /replication/api/task/record/overview/{recordId} + +- API 名称: get_task_record_overview + +- 功能说明: + + - 中文:根据recordId查询任务执行总览 + - English:get task record overview + +- 请求体 + 此接口无请求体 + +- 请求字段说明 + + | 字段 | 类型 | 是否必须 | 默认值 | 说明 | Description | + | ----------- | ------ | -------- | ------ | -------------------------- | ----------------- | + | recordId | string | 是 | 无 | 任务执行日志唯一id | record id | + + +- 响应体 + + ```json + { + "code": 0, + "message": null, + "data": { + "success": 0, + "failed": 0, + "conflict": 0 + }, + "traceId": "6c4376c1f002127e9444d9897d0ee7ce" + } + ``` + +- 历史数据响应体 + + ```json + { + "code": 0, + "message": null, + "data": null, + "traceId": "3df4fdf97a77684e352fd17973064ece" + } + ``` + + + +- data字段说明(历史数据为null) + + | 字段 | 类型 | 说明 | Description | + | -------- | ---- | -------- | ------------- | + | success | long | 成功数量 | success size | + | failed | long | 失败数量 | failed size | + | conflict | long | 冲突数量 | conflict size | \ No newline at end of file diff --git a/docs/apidoc/replication/replication.md b/docs/apidoc/replication/replication.md index 131649110a..833c1ad10e 100644 --- a/docs/apidoc/replication/replication.md +++ b/docs/apidoc/replication/replication.md @@ -44,7 +44,9 @@ }, "remoteClusterIds": ["651095dfe0524ce9b3ab53d13532361c","329fbcda45944fb9ae5c2573acd7bd2a"], "enabled": true, - "description": "test replica task" + "description": "test replica task", + "record" : true, + "recordReserveDays" : 30 } ``` @@ -61,6 +63,8 @@ |remoteClusterIds|list|是|无|远程集成节点id|the remote cluster node ids| |enabled|bool|是|true|计划是否启动|do task enabled| |description|sting|否|无|描述|description| + |record|bool|是|true|是否记录分发详细日志|is record replica detail| + |recordReserveDays|long|是|30|分发详细日志保留天数|replica detail reserve days| - replicaTaskObjects对象说明 @@ -132,6 +136,8 @@ "nextExecutionTime": "2020-03-17T12:00:00.000", "executionTimes": 5, "enabled": true, + "record" : true, + "recordReserveDays" : 30, "createdBy" : "system", "createdDate" : "2020-03-16T12:13:03.371", "lastModifiedBy" : "system", @@ -195,6 +201,8 @@ "nextExecutionTime": "2020-03-17T12:00:00.000", "executionTimes": 5, "enabled": true, + "record" : true, + "recordReserveDays" : 30, "createdBy" : "system", "createdDate" : "2020-03-16T12:13:03.371", "lastModifiedBy" : "system", @@ -211,8 +219,8 @@ |id|string|任务唯一id|task id| |key|string|任务唯一key|task key| |name|string|任务名称|task name| - |projectId|string/所属项目id|task projectId| - |replicaObjectType|enum|是|无|[REPOSITORY,PACKAGE,PATH]|replication object type| + |projectId|string/所属项目id|task projectId|| + |replicaObjectType|enum|是|无| |replicaType|enum|[SCHEDULED,REAL_TIME]|replica type| |setting|object|计划相关设置|task setting| |remoteClusters|set|远程集成节点信息|the remote cluster node info| @@ -221,7 +229,9 @@ |lastExecutionTime|date|上次执行时间|task last execution time| |nextExecutionTime|date|下次执行时间|task next execution time| |executionTimes|long|执行次数|execution times| - |enabled|bool|是|true|计划是否启动|do task enabled| + |enabled|bool|是|true| + |record|bool|是否记录分发详细日志|is record replica detail| + |recordReserveDays|long|分发详细日志保留天数|replica detail reserve days| |createdBy|string|创建者|create user| |createdDate|string|创建时间|create time| |lastModifiedBy|string|上次修改者|last modify user| @@ -528,7 +538,9 @@ } ], "remoteClusterIds": ["651095dfe0524ce9b3ab53d13532361c","329fbcda45944fb9ae5c2573acd7bd2a"], - "description": "test replica task" + "description": "test replica task", + "record" : true, + "recordReserveDays" : 30 } ``` @@ -543,6 +555,8 @@ |replicaTaskObjects|object|是|无|同步对象信息|replication object info| |remoteClusterIds|list|是|无|远程集成节点id|the remote cluster node ids| |description|sting|否|无|描述|description| + |record|bool|是|true|是否记录分发详细日志|is record replica detail| + |recordReserveDays|long|是|30|分发详细日志保留天数|replica detail reserve days| - replicaTaskObjects对象说明 diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt index 9452ad2fa1..8b2c17d82b 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt @@ -55,7 +55,9 @@ const val DELETED_DATE = "deleted" const val FULL_PATH = "fullPath" const val PATH = "path" const val LAST_MODIFIED_BY = "lastModifiedBy" - +const val KEY = "key" +const val RECORD_RESERVE_DAYS = "recordReserveDays" +const val REPLICA_TYPE = "replicaType" /** * 缓存类型 */ diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ReplicaRecordCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ReplicaRecordCleanupJob.kt new file mode 100644 index 0000000000..da48a055c1 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/ReplicaRecordCleanupJob.kt @@ -0,0 +1,152 @@ +package com.tencent.bkrepo.job.batch + +import com.tencent.bkrepo.common.mongo.constant.ID +import com.tencent.bkrepo.common.service.log.LoggerHolder +import com.tencent.bkrepo.job.KEY +import com.tencent.bkrepo.job.RECORD_RESERVE_DAYS +import com.tencent.bkrepo.job.REPLICA_TYPE +import com.tencent.bkrepo.job.batch.base.DefaultContextMongoDbJob +import com.tencent.bkrepo.job.batch.base.JobContext +import com.tencent.bkrepo.job.config.properties.ReplicaRecordCleanupJobProperties +import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.data.domain.PageRequest +import org.springframework.data.mongodb.core.query.Criteria +import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.and +import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.data.mongodb.core.query.where +import org.springframework.stereotype.Component +import java.time.Duration +import java.time.LocalDateTime +import kotlin.reflect.KClass + +/** + * 清理超过保留时间的分发记录 + * 根据分发计划设置的 保留天数,历史数据默认为 60 天 + */ +@Component +@EnableConfigurationProperties(ReplicaRecordCleanupJobProperties::class) +class ReplicaRecordCleanupJob( + properties: ReplicaRecordCleanupJobProperties, +) : DefaultContextMongoDbJob(properties) { + override fun getLockAtMostFor(): Duration = Duration.ofDays(7) + + override fun collectionNames(): List { + return listOf(COLLECTION_REPLICA_TASK) + } + + override fun buildQuery(): Query { + return Query() + } + + override fun mapToEntity(row: Map): ReplicaTask { + return ReplicaTask( + id = row[ID].toString(), + key = row[KEY].toString(), + replicaType = row[REPLICA_TYPE].toString(), + recordReserveDays = row[RECORD_RESERVE_DAYS]?.toString()?.toLong() + ) + } + + override fun entityClass(): KClass { + return ReplicaTask::class + } + + override fun run(row: ReplicaTask, collectionName: String, context: JobContext) { + val expireDate = if (row.recordReserveDays != null) { + LocalDateTime.now().minusDays(row.recordReserveDays) + } else { + LocalDateTime.now().minusDays(60) + } + cleanUpReplicaTask(row.key, row.replicaType, expireDate) + } + + + private fun cleanUpReplicaTask( + key: String, + replicaType: String, + expireDate: LocalDateTime, + ) { + if (replicaType == "REAL_TIME") { + val recordQuery = Query(where(Record::taskKey).isEqualTo(key)) + var page = 0 + var recordList = mongoTemplate.find( + recordQuery.with(PageRequest.of(page, PAGE_SIZE)), + Record::class.java, + COLLECTION_REPLICA_RECORD + ) + while (recordList.isNotEmpty()) { + recordList.forEach { + cleanUpRecordDetail(it.id, expireDate) + logger.info("cleanup replica record:[$key/${it.id}]") + } + page++ + recordList = mongoTemplate.find( + recordQuery.with(PageRequest.of(page, PAGE_SIZE)), + Record::class.java, + COLLECTION_REPLICA_RECORD + ) + } + } else { + val criteria = where(Record::taskKey).isEqualTo(key) + .and(RecordDetail::status).ne(ExecutionStatus.RUNNING) + .and(Record::endTime).lt(expireDate) + val recordQuery = Query(criteria).with(PageRequest.of(0, PAGE_SIZE)) + while (true) { + val recordList = mongoTemplate.find( + recordQuery, Record::class.java, COLLECTION_REPLICA_RECORD + ).takeIf { it.isNotEmpty() } ?: break + recordList.forEach { + cleanUpRecordDetail(it.id) + logger.info("cleanup replica record:[$key/${it.id}]") + } + } + } + } + + private fun cleanUpRecordDetail(recordId: String) { + val recordQuery = Query.query(Criteria.where(ID).isEqualTo(recordId)) + val recordDetailQuery = Query.query(Criteria.where(RecordDetail::recordId.name).isEqualTo(recordId)) + mongoTemplate.remove(recordDetailQuery, COLLECTION_REPLICA_RECORD_DETAIL) + mongoTemplate.remove(recordQuery, COLLECTION_REPLICA_RECORD) + } + + private fun cleanUpRecordDetail(recordId: String, expireDate: LocalDateTime) { + val recordDetailQuery = Query.query( + Criteria.where(RecordDetail::recordId.name).isEqualTo(recordId) + .and(RecordDetail::status).ne(ExecutionStatus.RUNNING) + .and(RecordDetail::endTime).lt(expireDate) + ) + mongoTemplate.remove(recordDetailQuery, COLLECTION_REPLICA_RECORD_DETAIL) + } + + data class ReplicaTask( + val id: String, + val key: String, + val replicaType: String, + val recordReserveDays: Long? + ) + + data class Record( + val id: String, + val taskKey: String, + val status: ExecutionStatus, + val endTime: LocalDateTime? + ) + + data class RecordDetail( + val id: String, + val recordId: String, + val status: ExecutionStatus, + val endTime: LocalDateTime? + ) + + companion object { + private val logger = LoggerHolder.jobLogger + private const val COLLECTION_REPLICA_TASK = "replica_task" + private const val COLLECTION_REPLICA_RECORD = "replica_record" + private const val COLLECTION_REPLICA_RECORD_DETAIL = "replica_record_detail" + private const val PAGE_SIZE = 1000 + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ReplicaRecordCleanupJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ReplicaRecordCleanupJobProperties.kt new file mode 100644 index 0000000000..52a2eb2c51 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/ReplicaRecordCleanupJobProperties.kt @@ -0,0 +1,9 @@ +package com.tencent.bkrepo.job.config.properties + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(value = "job.replica-record-cleanup") +class ReplicaRecordCleanupJobProperties( + override var enabled: Boolean = true, + override var cron: String = "0 0 3 * * ?", +) : MongodbJobProperties() diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ExecutionResult.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ExecutionResult.kt index 5930043b58..4fb94fd1d8 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ExecutionResult.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ExecutionResult.kt @@ -36,8 +36,12 @@ data class ExecutionResult( val errorReason: String? = null ) { companion object { - fun fail(errorReason: String?): ExecutionResult { - return ExecutionResult(status = ExecutionStatus.FAILED, errorReason = errorReason.orEmpty()) + fun fail(errorReason: String?, progress: ReplicaProgress? = null): ExecutionResult { + return ExecutionResult( + status = ExecutionStatus.FAILED, + errorReason = errorReason.orEmpty(), + progress = progress + ) } fun success(): ExecutionResult { diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaOverview.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaOverview.kt new file mode 100644 index 0000000000..d009502bcb --- /dev/null +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaOverview.kt @@ -0,0 +1,16 @@ +package com.tencent.bkrepo.replication.pojo.record + +data class ReplicaOverview( + /** + * 成功数量 + */ + var success: Long = 0, + /** + * 失败数量 + */ + var failed: Long = 0, + /** + * 冲突数量 + */ + var conflict: Long = 0 +) diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaProgress.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaProgress.kt index a0eb127435..8176830763 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaProgress.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaProgress.kt @@ -46,5 +46,20 @@ data class ReplicaProgress( /** * 数据大小, 单位bytes */ - var totalSize: Long = 0 -) + var totalSize: Long = 0, + /** + * 冲突数量 + */ + var conflict: Long = 0 +) { + + operator fun plus(replicaProgress: ReplicaProgress): ReplicaProgress { + return ReplicaProgress( + success = this.success + replicaProgress.success, + skip = this.skip + replicaProgress.skip, + failed = this.failed + replicaProgress.failed, + totalSize = this.totalSize + replicaProgress.totalSize, + conflict = this.conflict + replicaProgress.conflict + ) + } +} diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetail.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetail.kt index 4a17362ed3..a804f2b9a7 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetail.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetail.kt @@ -30,6 +30,7 @@ package com.tencent.bkrepo.replication.pojo.record import com.tencent.bkrepo.common.artifact.pojo.RepositoryType import com.tencent.bkrepo.replication.pojo.task.objects.PackageConstraint import com.tencent.bkrepo.replication.pojo.task.objects.PathConstraint +import com.tencent.bkrepo.replication.pojo.task.setting.ConflictStrategy import io.swagger.annotations.ApiModel import io.swagger.annotations.ApiModelProperty import java.time.LocalDateTime @@ -52,6 +53,16 @@ data class ReplicaRecordDetail( var packageConstraint: PackageConstraint? = null, @ApiModelProperty("路径名称") var pathConstraint: PathConstraint? = null, + @ApiModelProperty("制品名称") + var artifactName: String? = null, + @ApiModelProperty("包版本") + var version: String? = null, + @ApiModelProperty("冲突策略") + var conflictStrategy: ConflictStrategy? = null, + @ApiModelProperty("制品大小") + var size: Long? = null, + @ApiModelProperty("制品sha256") + var sha256: String? = null, @ApiModelProperty("运行状态") var status: ExecutionStatus, @ApiModelProperty("同步进度") diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetailListOption.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetailListOption.kt index 6a46d2f48d..8c31aa7fa4 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetailListOption.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordDetailListOption.kt @@ -47,5 +47,9 @@ class ReplicaRecordDetailListOption( @ApiModelProperty("路径名称, 根据该字段前缀匹配") val path: String? = null, @ApiModelProperty("执行状态") - val status: ExecutionStatus? = null + val status: ExecutionStatus? = null, + @ApiModelProperty("制品名称") + var artifactName: String? = null, + @ApiModelProperty("版本") + var version: String? = null ) diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordInfo.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordInfo.kt index 7a38186cda..d47d7a61d9 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordInfo.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ReplicaRecordInfo.kt @@ -48,5 +48,7 @@ data class ReplicaRecordInfo( @ApiModelProperty("已同步字节数") var replicatedBytes: Long? = 0, @ApiModelProperty("总字节数") - var totalBytes: Long? = 0 + var totalBytes: Long? = 0, + @ApiModelProperty("执行结果总览") + var replicaOverview: ReplicaOverview? = null ) diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ResultsSummary.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ResultsSummary.kt new file mode 100644 index 0000000000..fcddeff234 --- /dev/null +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/ResultsSummary.kt @@ -0,0 +1,7 @@ +package com.tencent.bkrepo.replication.pojo.record + +data class ResultsSummary( + val replicaOverview: ReplicaOverview, + val errorReason: String?, + val status: ExecutionStatus +) diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/request/RecordDetailInitialRequest.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/request/RecordDetailInitialRequest.kt index 13f2ea58b5..442c93291a 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/request/RecordDetailInitialRequest.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/record/request/RecordDetailInitialRequest.kt @@ -30,6 +30,7 @@ package com.tencent.bkrepo.replication.pojo.record.request import com.tencent.bkrepo.common.artifact.pojo.RepositoryType import com.tencent.bkrepo.replication.pojo.task.objects.PackageConstraint import com.tencent.bkrepo.replication.pojo.task.objects.PathConstraint +import com.tencent.bkrepo.replication.pojo.task.setting.ConflictStrategy import io.swagger.annotations.ApiModel import io.swagger.annotations.ApiModelProperty @@ -46,5 +47,15 @@ data class RecordDetailInitialRequest( @ApiModelProperty("包限制") var packageConstraint: PackageConstraint? = null, @ApiModelProperty("路径名称") - var pathConstraint: PathConstraint? = null + var pathConstraint: PathConstraint? = null, + @ApiModelProperty("制品名称") + var artifactName: String? = null, + @ApiModelProperty("版本") + var version: String? = null, + @ApiModelProperty("冲突策略") + var conflictStrategy: ConflictStrategy? = null, + @ApiModelProperty("制品大小") + var size: Long? = null, + @ApiModelProperty("制品sha256值") + var sha256: String? = null ) diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/ReplicaTaskInfo.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/ReplicaTaskInfo.kt index 7580b92633..de16eba788 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/ReplicaTaskInfo.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/ReplicaTaskInfo.kt @@ -72,6 +72,10 @@ data class ReplicaTaskInfo( var executionTimes: Long, @ApiModelProperty("是否启用") var enabled: Boolean = true, + @ApiModelProperty("是否记录分发日志") + val record: Boolean? = null, + @ApiModelProperty("分发日志保留天数") + val recordReserveDays: Long? = null, @ApiModelProperty("创建者") val createdBy: String, @ApiModelProperty("创建日期") diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskCreateRequest.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskCreateRequest.kt index 337f8c07d3..693ff82b67 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskCreateRequest.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskCreateRequest.kt @@ -53,5 +53,9 @@ data class ReplicaTaskCreateRequest( @ApiModelProperty("是否启用", required = true) val enabled: Boolean = true, @ApiModelProperty("任务描述", required = false) - val description: String? = null + val description: String? = null, + @ApiModelProperty("是否记录分发日志", required = true) + val record: Boolean = true, + @ApiModelProperty("分发日志保留天数", required = true) + val recordReserveDays: Long = 30, ) diff --git a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskUpdateRequest.kt b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskUpdateRequest.kt index e44abaa604..9cec4edc79 100644 --- a/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskUpdateRequest.kt +++ b/src/backend/replication/api-replication/src/main/kotlin/com/tencent/bkrepo/replication/pojo/task/request/ReplicaTaskUpdateRequest.kt @@ -50,5 +50,9 @@ data class ReplicaTaskUpdateRequest( @ApiModelProperty("远程集群集合", required = true) val remoteClusterIds: Set, @ApiModelProperty("任务描述", required = false) - val description: String? = null + val description: String? = null, + @ApiModelProperty("是否记录分发日志", required = true) + val record: Boolean = true, + @ApiModelProperty("分发日志保留天数", required = true) + val recordReserveDays: Long = 30, ) diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/controller/api/ReplicaRecordController.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/controller/api/ReplicaRecordController.kt index 0e2cd1439e..c4b1e5b9fa 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/controller/api/ReplicaRecordController.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/controller/api/ReplicaRecordController.kt @@ -32,6 +32,7 @@ import com.tencent.bkrepo.common.api.pojo.Response import com.tencent.bkrepo.common.security.permission.Principal import com.tencent.bkrepo.common.security.permission.PrincipalType import com.tencent.bkrepo.common.service.util.ResponseBuilder +import com.tencent.bkrepo.replication.pojo.record.ReplicaOverview import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordDetail import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordDetailListOption import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordInfo @@ -92,4 +93,10 @@ class ReplicaRecordController( ): Response> { return ResponseBuilder.success(replicaRecordService.listRecordDetailPage(recordId, option)) } + + @ApiOperation("根据recordId查询任务执行总览信息") + @GetMapping("/overview/{recordId}") + fun getRecordOverviewByRecordId(@PathVariable recordId: String): Response { + return ResponseBuilder.success(replicaRecordService.getRecordById(recordId)?.replicaOverview) + } } diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecord.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecord.kt index b97beef3be..3fd024194e 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecord.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecord.kt @@ -28,6 +28,7 @@ package com.tencent.bkrepo.replication.model import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import com.tencent.bkrepo.replication.pojo.record.ReplicaOverview import org.springframework.data.mongodb.core.index.Indexed import org.springframework.data.mongodb.core.mapping.Document import java.time.LocalDateTime @@ -58,5 +59,9 @@ data class TReplicaRecord( /** * 错误原因 */ - var errorReason: String? = null + var errorReason: String? = null, + /** + * 执行结果总览 + */ + var replicaOverview: ReplicaOverview? = null ) diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecordDetail.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecordDetail.kt index 045b27b4a5..85e35e11e1 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecordDetail.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaRecordDetail.kt @@ -32,6 +32,7 @@ import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus import com.tencent.bkrepo.replication.pojo.record.ReplicaProgress import com.tencent.bkrepo.replication.pojo.task.objects.PackageConstraint import com.tencent.bkrepo.replication.pojo.task.objects.PathConstraint +import com.tencent.bkrepo.replication.pojo.task.setting.ConflictStrategy import org.springframework.data.mongodb.core.index.Indexed import org.springframework.data.mongodb.core.mapping.Document import java.time.LocalDateTime @@ -72,6 +73,26 @@ data class TReplicaRecordDetail( * 路径限制 */ val pathConstraint: PathConstraint? = null, + /** + * 制品名称,Generic 为 fullPath + */ + val artifactName: String? = null, + /** + * 依赖源包版本 + */ + val version: String? = null, + /** + * 冲突策略 + */ + val conflictStrategy: ConflictStrategy? = null, + /** + * 制品大小 + */ + val size: Long? = null, + /** + * 制品sha256 + */ + val sha256: String? = null, /** * 运行状态 */ diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaTask.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaTask.kt index 1f362c46a4..263adfe974 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaTask.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/model/TReplicaTask.kt @@ -110,6 +110,14 @@ data class TReplicaTask( * 是否启用 */ var enabled: Boolean = true, + /** + * 是否记录详细日志 + */ + val record: Boolean?, + /** + * 不记录制品的分发记录 + */ + val recordReserveDays: Long?, /** * 审计信息 */ diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaContext.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaContext.kt index c0153585ad..35002e1ca7 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaContext.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaContext.kt @@ -41,6 +41,7 @@ import com.tencent.bkrepo.replication.api.BlobReplicaClient import com.tencent.bkrepo.replication.config.ReplicationProperties import com.tencent.bkrepo.replication.pojo.cluster.ClusterNodeInfo import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import com.tencent.bkrepo.replication.pojo.record.ReplicaProgress import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordInfo import com.tencent.bkrepo.replication.pojo.task.ReplicaTaskDetail import com.tencent.bkrepo.replication.pojo.task.objects.ReplicaObjectInfo @@ -97,6 +98,8 @@ class ReplicaContext( val httpClient: OkHttpClient + var replicaProgress = ReplicaProgress() + init { cluster = ClusterInfo( name = remoteCluster.name, @@ -174,6 +177,14 @@ class ReplicaContext( return taskObject.packageConstraints!!.first().targetVersions } + fun updateProgress(executed: Boolean) { + if (executed) { + replicaProgress.success++ + } else { + replicaProgress.skip++ + } + } + companion object { private val logger = LoggerFactory.getLogger(ReplicaContext::class.java) const val READ_TIMEOUT = 60 * 60 * 1000L diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaExecutionContext.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaExecutionContext.kt index ceafe53d81..2a047fe638 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaExecutionContext.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/context/ReplicaExecutionContext.kt @@ -48,7 +48,7 @@ class ReplicaExecutionContext( /** * 同步进度 */ - + @Deprecated("以制品维度记录RecordDetail") val progress = ReplicaProgress() /** @@ -69,16 +69,4 @@ class ReplicaExecutionContext( fun buildErrorReason(): String { return errorReason.toString() } - - /** - * 更新进度 - * @param executed 是否执行了同步 - */ - fun updateProgress(executed: Boolean) { - if (executed) { - progress.success += 1 - } else { - progress.skip += 1 - } - } } diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/executor/AbstractReplicaJobExecutor.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/executor/AbstractReplicaJobExecutor.kt index 3a27b84c6a..ad49741fcf 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/executor/AbstractReplicaJobExecutor.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/executor/AbstractReplicaJobExecutor.kt @@ -34,7 +34,10 @@ import com.tencent.bkrepo.replication.manager.LocalDataManager import com.tencent.bkrepo.replication.pojo.cluster.ClusterNodeName import com.tencent.bkrepo.replication.pojo.record.ExecutionResult import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import com.tencent.bkrepo.replication.pojo.record.ReplicaOverview +import com.tencent.bkrepo.replication.pojo.record.ReplicaProgress import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordInfo +import com.tencent.bkrepo.replication.pojo.record.ResultsSummary import com.tencent.bkrepo.replication.pojo.task.ReplicaTaskDetail import com.tencent.bkrepo.replication.replica.type.ReplicaService import com.tencent.bkrepo.replication.replica.context.ReplicaContext @@ -69,42 +72,67 @@ open class AbstractReplicaJobExecutor( clusterNodeName: ClusterNodeName, event: ArtifactEvent? = null ): Future { - return threadPoolExecutor.submit( Callable { - try { - val clusterNode = clusterNodeService.getByClusterId(clusterNodeName.id) - require(clusterNode != null) { "Cluster[${clusterNodeName.id}] does not exist." } - var status = ExecutionStatus.SUCCESS - var message: String? = null - taskDetail.objects.map { taskObject -> - val localRepo = localDataManager.findRepoByName( - taskDetail.task.projectId, - taskObject.localRepoName, - taskObject.repoType.toString() - ) - val context = ReplicaContext( - taskDetail = taskDetail, - taskObject = taskObject, - taskRecord = taskRecord, - localRepo = localRepo, - remoteCluster = clusterNode, - replicationProperties = replicationProperties - ) - event?.let { context.event = it } - replicaService.replica(context) - if (context.status == ExecutionStatus.FAILED) { - status = context.status - message = context.errorMessage + return threadPoolExecutor.submit( + Callable { + var replicaProgress = ReplicaProgress() + try { + val clusterNode = clusterNodeService.getByClusterId(clusterNodeName.id) + require(clusterNode != null) { "Cluster[${clusterNodeName.id}] does not exist." } + var status = ExecutionStatus.SUCCESS + var message: String? = null + taskDetail.objects.map { taskObject -> + val localRepo = localDataManager.findRepoByName( + taskDetail.task.projectId, + taskObject.localRepoName, + taskObject.repoType.toString() + ) + val context = ReplicaContext( + taskDetail = taskDetail, + taskObject = taskObject, + taskRecord = taskRecord, + localRepo = localRepo, + remoteCluster = clusterNode, + replicationProperties = replicationProperties + ) + event?.let { context.event = it } + replicaService.replica(context) + replicaProgress = replicaProgress.plus(context.replicaProgress) + if (context.status == ExecutionStatus.FAILED) { + status = context.status + message = context.errorMessage + } } + ExecutionResult(status = status, errorReason = message, progress = replicaProgress) + } catch (exception: Throwable) { + logger.error("${taskDetail.task.name}/$clusterNodeName] replica exception:${exception}") + ExecutionResult.fail("${clusterNodeName.name}:${exception.message}\n", replicaProgress) } - ExecutionResult(status = status, errorReason = message) - } catch (exception: Throwable) { - logger.error("同步任务执行失败", exception) - ExecutionResult.fail(exception.message) - } - }.trace() + }.trace() ) } + /** + * 以Task维度,汇总线程执行结果 + */ + protected fun getResultsSummary(results: List): ResultsSummary { + val replicaOverview = ReplicaOverview() + var status = ExecutionStatus.SUCCESS + var errorReason = "" + results.forEach { result -> + if (result.status == ExecutionStatus.FAILED) { + status = ExecutionStatus.FAILED + errorReason = "部分数据同步失败 " + errorReason += result.errorReason ?: "" + } + result.progress?.let { progress -> + replicaOverview.success += (progress.success + progress.skip) + replicaOverview.failed += progress.failed + replicaOverview.conflict += progress.conflict + } + } + return ResultsSummary(replicaOverview, errorReason, status) + } + companion object { private val logger = LoggerFactory.getLogger(AbstractReplicaJobExecutor::class.java) } diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/replicator/standalone/ClusterReplicator.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/replicator/standalone/ClusterReplicator.kt index 8b06e46df6..a0c7eb838a 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/replicator/standalone/ClusterReplicator.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/replicator/standalone/ClusterReplicator.kt @@ -41,8 +41,6 @@ import com.tencent.bkrepo.replication.constant.RETRY_COUNT import com.tencent.bkrepo.replication.enums.WayOfPushArtifact import com.tencent.bkrepo.replication.exception.ArtifactPushException import com.tencent.bkrepo.replication.manager.LocalDataManager -import com.tencent.bkrepo.replication.pojo.request.PackageVersionExistCheckRequest -import com.tencent.bkrepo.replication.pojo.task.setting.ConflictStrategy import com.tencent.bkrepo.replication.replica.replicator.base.internal.ClusterArtifactReplicationHandler import com.tencent.bkrepo.replication.replica.repository.internal.PackageNodeMappings import com.tencent.bkrepo.replication.replica.context.FilePushContext @@ -139,23 +137,6 @@ class ClusterReplicator( with(context) { // 外部集群仓库没有project/repoName if (remoteProjectId.isNullOrBlank() || remoteRepoName.isNullOrBlank()) return true - // 包版本冲突检查 - val fullPath = "${packageSummary.name}-${packageVersion.name}" - val checkRequest = PackageVersionExistCheckRequest( - projectId = remoteProjectId, - repoName = remoteRepoName, - packageKey = packageSummary.key, - versionName = packageVersion.name - ) - if (artifactReplicaClient!!.checkPackageVersionExist(checkRequest).data == true) { - when (task.setting.conflictStrategy) { - ConflictStrategy.SKIP -> return false - ConflictStrategy.FAST_FAIL -> throw IllegalArgumentException("Package [$fullPath] conflict.") - else -> { - // do nothing - } - } - } // 文件数据 PackageNodeMappings.map( packageSummary = packageSummary, @@ -298,17 +279,6 @@ class ClusterReplicator( with(context) { // 外部集群仓库没有project/repoName if (remoteProjectId.isNullOrBlank() || remoteRepoName.isNullOrBlank()) return null - val fullPath = "${node.projectId}/${node.repoName}${node.fullPath}" - // 节点冲突检查 - if (artifactReplicaClient!!.checkNodeExist(remoteProjectId, remoteRepoName, node.fullPath).data == true) { - when (task.setting.conflictStrategy) { - ConflictStrategy.SKIP -> return null - ConflictStrategy.FAST_FAIL -> throw IllegalArgumentException("File[$fullPath] conflict.") - else -> { - // do nothing - } - } - } // 查询元数据 val metadata = if (task.setting.includeMetadata) node.nodeMetadata else emptyList() return NodeCreateRequest( diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/AbstractReplicaService.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/AbstractReplicaService.kt index cd534dfab2..2b53dd1b84 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/AbstractReplicaService.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/AbstractReplicaService.kt @@ -36,9 +36,11 @@ import com.tencent.bkrepo.replication.pojo.metrics.ReplicationRecord import com.tencent.bkrepo.replication.pojo.record.ExecutionResult import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus import com.tencent.bkrepo.replication.pojo.record.request.RecordDetailInitialRequest +import com.tencent.bkrepo.replication.pojo.request.PackageVersionExistCheckRequest import com.tencent.bkrepo.replication.pojo.task.ReplicaTaskInfo import com.tencent.bkrepo.replication.pojo.task.objects.PackageConstraint import com.tencent.bkrepo.replication.pojo.task.objects.PathConstraint +import com.tencent.bkrepo.replication.pojo.task.setting.ConflictStrategy import com.tencent.bkrepo.replication.pojo.task.setting.ErrorStrategy import com.tencent.bkrepo.replication.replica.context.ReplicaContext import com.tencent.bkrepo.replication.replica.context.ReplicaExecutionContext @@ -114,67 +116,53 @@ abstract class AbstractReplicaService( * 同步整个仓库数据 */ protected fun replicaByRepo(replicaContext: ReplicaContext) { - val context = initialExecutionContext(replicaContext) - try { - if (replicaContext.taskObject.repoType == RepositoryType.GENERIC) { - // 同步generic节点 - val root = localDataManager.findNodeDetail( - projectId = replicaContext.localProjectId, - repoName = replicaContext.localRepoName, - fullPath = PathUtils.ROOT - ).nodeInfo - replicaByPath(context, root) - logger.info( - "replicaByRepo for generic finished" + - " ${replicaContext.localProjectId}|${replicaContext.localRepoName}" - ) - return + if (replicaContext.taskObject.repoType == RepositoryType.GENERIC) { + // 同步generic节点 + val root = localDataManager.findNodeDetail( + projectId = replicaContext.localProjectId, + repoName = replicaContext.localRepoName, + fullPath = PathUtils.ROOT + ).nodeInfo + replicaByPath(replicaContext, root) + logger.info( + "replicaByRepo for generic finished" + + " ${replicaContext.localProjectId}|${replicaContext.localRepoName}" + ) + return + } + // 同步包 + val option = PackageListOption(pageNumber = 1, pageSize = PAGE_SIZE) + var packages = localDataManager.listPackagePage( + projectId = replicaContext.localProjectId, + repoName = replicaContext.localRepoName, + option = option + ) + while (packages.isNotEmpty()) { + packages.forEach { + replicaByPackage(replicaContext, it) } - // 同步包 - val option = PackageListOption(pageNumber = 1, pageSize = PAGE_SIZE) - var packages = localDataManager.listPackagePage( + option.pageNumber += 1 + packages = localDataManager.listPackagePage( projectId = replicaContext.localProjectId, repoName = replicaContext.localRepoName, option = option ) - while (packages.isNotEmpty()) { - packages.forEach { - replicaByPackage(context, it) - } - option.pageNumber += 1 - packages = localDataManager.listPackagePage( - projectId = replicaContext.localProjectId, - repoName = replicaContext.localRepoName, - option = option - ) - } - logger.info("replicaByRepo finished ${replicaContext.localProjectId}|${replicaContext.localRepoName}") - } catch (throwable: Throwable) { - setErrorStatus(context, throwable) - logger.error("replicaByRepo failed,error is ${Throwables.getStackTraceAsString(throwable)}") - } finally { - completeRecordDetail(context) } + logger.info("replicaByRepo finished ${replicaContext.localProjectId}|${replicaContext.localRepoName}") } /** * 同步指定包的数据 */ protected fun replicaByPackageConstraint(replicaContext: ReplicaContext, constraint: PackageConstraint) { - val context = initialExecutionContext(replicaContext, packageConstraint = constraint) - try { + with(replicaContext) { // 查询本地包信息 val packageSummary = localDataManager.findPackageByKey( - projectId = replicaContext.localProjectId, - repoName = replicaContext.taskObject.localRepoName, + projectId = localProjectId, + repoName = taskObject.localRepoName, packageKey = constraint.packageKey!! ) - replicaByPackage(context, packageSummary, constraint.versions) - } catch (throwable: Throwable) { - setErrorStatus(context, throwable) - setRunOnceTaskFailedRecordMetrics(context, throwable, packageConstraint = constraint) - } finally { - completeRecordDetail(context) + replicaByPackage(this, packageSummary, constraint.versions) } } @@ -182,20 +170,13 @@ abstract class AbstractReplicaService( * 同步指定路径的数据 */ protected fun replicaByPathConstraint(replicaContext: ReplicaContext, constraint: PathConstraint) { - val context = initialExecutionContext(replicaContext, pathConstraint = constraint) - try { + with(replicaContext) { val nodeInfo = localDataManager.findNodeDetail( - projectId = replicaContext.localProjectId, - repoName = replicaContext.localRepoName, + projectId = localProjectId, + repoName = localRepoName, fullPath = constraint.path!! ).nodeInfo - replicaByPath(context, nodeInfo) - } catch (throwable: Throwable) { - logger.error("replicaByPathConstraint ${constraint.path} failed, error is ${throwable.message}") - setErrorStatus(context, throwable) - setRunOnceTaskFailedRecordMetrics(context, throwable, pathConstraint = constraint) - } finally { - completeRecordDetail(context) + replicaByPath(this, nodeInfo) } } @@ -203,10 +184,26 @@ abstract class AbstractReplicaService( * 同步路径 * 采用广度优先遍历 */ - private fun replicaByPath(context: ReplicaExecutionContext, node: NodeInfo) { - with(context) { + private fun replicaByPath(replicaContext: ReplicaContext, node: NodeInfo) { + with(replicaContext) { if (!node.folder) { - replicaFile(context, node) + // 存在冲突:记录冲突策略 + // 外部集群仓库没有project/repoName + val conflictStrategy = if ( + !remoteProjectId.isNullOrBlank() && !remoteRepoName.isNullOrBlank() && + artifactReplicaClient!!.checkNodeExist(remoteProjectId, remoteRepoName, node.fullPath).data == true + ) { + replicaProgress.conflict++ + task.setting.conflictStrategy + } else null + val replicaExecutionContext = initialExecutionContext( + context = replicaContext, + artifactName = node.fullPath, + conflictStrategy = conflictStrategy, + size = node.size, + sha256 = node.sha256 + ) + replicaFile(replicaExecutionContext, node) return } // 查询子节点 @@ -230,7 +227,14 @@ abstract class AbstractReplicaService( sha256 = node.sha256, size = node.size.toString() ) - runActionAndPrintLog(context, record) { replicaContext.replicator.replicaFile(replicaContext, node) } + val fullPath = "${node.projectId}/${node.repoName}${node.fullPath}" + runActionAndPrintLog(context, record) { + when (context.detail.conflictStrategy) { + ConflictStrategy.SKIP -> false + ConflictStrategy.FAST_FAIL -> throw IllegalArgumentException("File[$fullPath] conflict.") + else -> replicaContext.replicator.replicaFile(replicaContext, node) + } + } } } @@ -238,36 +242,57 @@ abstract class AbstractReplicaService( * 根据[packageSummary]和版本列表[versionNames]执行同步 */ private fun replicaByPackage( - context: ReplicaExecutionContext, + replicaContext: ReplicaContext, packageSummary: PackageSummary, versionNames: List? = null ) { - with(context) { - replicator.replicaPackage(replicaContext, packageSummary) - // 同步package功能: 对应内部集群配置是当version不存在时则同步全部的package version - // 而对于外部集群配置而言,当version不存在时,则不进行同步 - val versions = versionNames?.map { - localDataManager.findPackageVersion( + replicaContext.replicator.replicaPackage(replicaContext, packageSummary) + // 同步package功能: 对应内部集群配置是当version不存在时则同步全部的package version + // 而对于外部集群配置而言,当version不存在时,则不进行同步 + val versions = versionNames?.map { + localDataManager.findPackageVersion( + projectId = replicaContext.localProjectId, + repoName = replicaContext.localRepoName, + packageKey = packageSummary.key, + version = it + ) + } ?: run { + if (replicaContext.remoteCluster.type == ClusterNodeType.REMOTE) { + emptyList() + } else { + localDataManager.listAllVersion( projectId = replicaContext.localProjectId, repoName = replicaContext.localRepoName, packageKey = packageSummary.key, - version = it + option = VersionListOption() ) - } ?: run { - if (replicaContext.remoteCluster.type == ClusterNodeType.REMOTE) { - emptyList() - } else { - localDataManager.listAllVersion( - projectId = replicaContext.localProjectId, - repoName = replicaContext.localRepoName, + } + } + versions.forEach { + // 存在冲突:记录冲突策略 + // 外部集群仓库没有project/repoName + val conflictStrategy = if ( + !replicaContext.remoteProjectId.isNullOrBlank() && !replicaContext.remoteRepoName.isNullOrBlank() && + replicaContext.artifactReplicaClient!!.checkPackageVersionExist( + PackageVersionExistCheckRequest( + projectId = replicaContext.remoteProjectId, + repoName = replicaContext.remoteRepoName, packageKey = packageSummary.key, - option = VersionListOption() + versionName = it.name ) - } - } - versions.forEach { - replicaPackageVersion(this, packageSummary, it) - } + ).data == true + ) { + replicaContext.replicaProgress.conflict++ + replicaContext.task.setting.conflictStrategy + } else null + val replicaExecutionContext = initialExecutionContext( + context = replicaContext, + artifactName = packageSummary.name, + version = it.name, + conflictStrategy = conflictStrategy, + size = it.size + ) + replicaPackageVersion(replicaExecutionContext, packageSummary, it) } } @@ -285,8 +310,13 @@ abstract class AbstractReplicaService( version = version.name, size = version.size.toString() ) + val fullPath = "${packageSummary.name}-${version.name}" runActionAndPrintLog(context, record) { - replicator.replicaPackageVersion(replicaContext, packageSummary, version) + when (context.detail.conflictStrategy) { + ConflictStrategy.SKIP -> false + ConflictStrategy.FAST_FAIL -> throw IllegalArgumentException("File[$fullPath] conflict.") + else -> replicator.replicaPackageVersion(replicaContext, packageSummary, version) + } } } } @@ -302,20 +332,25 @@ abstract class AbstractReplicaService( var errorReason: String? = null try { val executed = action() - updateProgress(executed) + replicaContext.updateProgress(executed) } catch (throwable: Throwable) { status = ExecutionStatus.FAILED errorReason = throwable.message.orEmpty() logger.error( "replica file failed, " + - "error is ${Throwables.getStackTraceAsString(throwable)}" + "error is ${Throwables.getStackTraceAsString(throwable)}" ) - progress.failed += 1 + replicaContext.replicaProgress.failed++ setErrorStatus(this, throwable) if (replicaContext.task.setting.errorStrategy == ErrorStrategy.FAST_FAIL) { throw throwable } } finally { + if (context.replicaContext.task.record == false) { + replicaRecordService.deleteRecordDetailById(detail.id) + } else { + completeRecordDetail(context) + } setRunOnceTaskRecordMetrics( task = replicaContext.task, recordId = detail.recordId, @@ -389,7 +424,12 @@ abstract class AbstractReplicaService( private fun initialExecutionContext( context: ReplicaContext, packageConstraint: PackageConstraint? = null, - pathConstraint: PathConstraint? = null + pathConstraint: PathConstraint? = null, + artifactName: String, + version: String? = null, + conflictStrategy: ConflictStrategy? = null, + size: Long? = null, + sha256: String? = null ): ReplicaExecutionContext { // 创建详情 val request = RecordDetailInitialRequest( @@ -398,7 +438,12 @@ abstract class AbstractReplicaService( localRepoName = context.localRepoName, repoType = context.localRepoType, packageConstraint = packageConstraint, - pathConstraint = pathConstraint + pathConstraint = pathConstraint, + artifactName = artifactName, + version = version, + conflictStrategy = conflictStrategy, + size = size, + sha256 = sha256 ) val recordDetail = replicaRecordService.initialRecordDetail(request) return ReplicaExecutionContext(context, recordDetail) diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/edge/EdgePullReplicaTaskJob.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/edge/EdgePullReplicaTaskJob.kt index 3b71958920..5496d26305 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/edge/EdgePullReplicaTaskJob.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/edge/EdgePullReplicaTaskJob.kt @@ -131,6 +131,8 @@ class EdgePullReplicaTaskJob( nextExecutionTime = nextExecutionTime, executionTimes = executionTimes, enabled = enabled, + record = record, + recordReserveDays = recordReserveDays, createdBy = createdBy, createdDate = LocalDateTime.parse(createdDate, DateTimeFormatter.ISO_DATE_TIME), lastModifiedBy = lastModifiedBy, diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventBasedReplicaJobExecutor.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventBasedReplicaJobExecutor.kt index 582f894310..aff26c8ee0 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventBasedReplicaJobExecutor.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventBasedReplicaJobExecutor.kt @@ -58,7 +58,14 @@ class EventBasedReplicaJobExecutor( val task = taskDetail.task val taskRecord: ReplicaRecordInfo = replicaRecordService.findOrCreateLatestRecord(task.key) try { - task.remoteClusters.map { submit(taskDetail, taskRecord, it, event) }.map { it.get() } + val results = task.remoteClusters.map { submit(taskDetail, taskRecord, it, event) }.map { it.get() } + val replicaOverview = getResultsSummary(results).replicaOverview + taskRecord.replicaOverview?.let { overview -> + replicaOverview.success += overview.success + replicaOverview.failed += overview.failed + replicaOverview.conflict += overview.conflict + } + replicaRecordService.updateRecordReplicaOverview(taskRecord.id, replicaOverview) logger.info("Replica ${event.getFullResourceKey()} completed.") } catch (exception: Exception) { logger.error("Replica ${event.getFullResourceKey()}} failed: $exception", exception) diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/manual/ManualReplicaJobExecutor.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/manual/ManualReplicaJobExecutor.kt index 1b099b6057..76c8d7c3ca 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/manual/ManualReplicaJobExecutor.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/manual/ManualReplicaJobExecutor.kt @@ -30,6 +30,7 @@ package com.tencent.bkrepo.replication.replica.type.manual import com.tencent.bkrepo.replication.config.ReplicationProperties import com.tencent.bkrepo.replication.manager.LocalDataManager import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import com.tencent.bkrepo.replication.pojo.record.ReplicaOverview import com.tencent.bkrepo.replication.pojo.task.ReplicaStatus import com.tencent.bkrepo.replication.pojo.task.ReplicaTaskDetail import com.tencent.bkrepo.replication.replica.executor.AbstractReplicaJobExecutor @@ -62,6 +63,7 @@ class ManualReplicaJobExecutor( logger.info("The run once replication task[${taskDetail.task.key}] will be manually executed.") var status = ExecutionStatus.SUCCESS var errorReason: String? = null + var replicaOverview: ReplicaOverview? = null val taskRecord = replicaRecordService.findOrCreateLatestRecord(taskDetail.task.key) .copy(startTime = LocalDateTime.now()) try { @@ -74,20 +76,17 @@ class ManualReplicaJobExecutor( )) ) val result = taskDetail.task.remoteClusters.map { submit(taskDetail, taskRecord, it) }.map { it.get() } - result.forEach { - if (it.status == ExecutionStatus.FAILED) { - status = ExecutionStatus.FAILED - errorReason = it.errorReason - return@forEach - } - } + val resultsSummary = getResultsSummary(result) + status = resultsSummary.status + replicaOverview = resultsSummary.replicaOverview + errorReason = resultsSummary.errorReason } catch (exception: Exception) { // 记录异常 status = ExecutionStatus.FAILED errorReason = exception.message.orEmpty() } finally { // 保存结果 - replicaRecordService.completeRecord(taskRecord.id, status, errorReason) + replicaRecordService.completeRecord(taskRecord.id, status, errorReason, replicaOverview) val taskStatus = if (isCronJob(taskDetail.task.setting, taskDetail.task.replicaType)) ReplicaStatus.WAITING else ReplicaStatus.COMPLETED logger.info( diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/schedule/ScheduledReplicaJobExecutor.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/schedule/ScheduledReplicaJobExecutor.kt index c76a213e5b..58e5769778 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/schedule/ScheduledReplicaJobExecutor.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/schedule/ScheduledReplicaJobExecutor.kt @@ -30,6 +30,7 @@ package com.tencent.bkrepo.replication.replica.type.schedule import com.tencent.bkrepo.replication.config.ReplicationProperties import com.tencent.bkrepo.replication.manager.LocalDataManager import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import com.tencent.bkrepo.replication.pojo.record.ReplicaOverview import com.tencent.bkrepo.replication.pojo.task.ReplicaTaskInfo import com.tencent.bkrepo.replication.replica.executor.AbstractReplicaJobExecutor import com.tencent.bkrepo.replication.service.ClusterNodeService @@ -65,18 +66,17 @@ class ScheduledReplicaJobExecutor( var status = ExecutionStatus.SUCCESS var errorReason: String? = null var recordId: String? = null + var replicaOverview: ReplicaOverview? = null try { // 查询同步对象 val taskDetail = replicaTaskService.getDetailByTaskKey(task.key) // 开启新的同步记录 val taskRecord = replicaRecordService.startNewRecord(task.key).apply { recordId = id } val result = task.remoteClusters.map { submit(taskDetail, taskRecord, it) }.map { it.get() } - result.forEach { - if (it.status == ExecutionStatus.FAILED) { - status = ExecutionStatus.FAILED - errorReason = "部分数据同步失败" - } - } + val resultsSummary = getResultsSummary(result) + status = resultsSummary.status + errorReason = resultsSummary.errorReason + replicaOverview = resultsSummary.replicaOverview } catch (exception: Exception) { logger.error("提交同步任务失败", exception) // 记录异常 @@ -84,7 +84,7 @@ class ScheduledReplicaJobExecutor( errorReason = exception.message.orEmpty() } finally { // 保存结果 - replicaRecordService.completeRecord(recordId!!, status, errorReason) + replicaRecordService.completeRecord(recordId!!, status, errorReason, replicaOverview) logger.info("Replica task[$taskId], record[$recordId] finished") } } diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/ReplicaRecordService.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/ReplicaRecordService.kt index eea4476835..a87adaa1dc 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/ReplicaRecordService.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/ReplicaRecordService.kt @@ -30,6 +30,7 @@ package com.tencent.bkrepo.replication.service import com.tencent.bkrepo.common.api.pojo.Page import com.tencent.bkrepo.replication.pojo.record.ExecutionResult import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import com.tencent.bkrepo.replication.pojo.record.ReplicaOverview import com.tencent.bkrepo.replication.pojo.record.ReplicaProgress import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordDetail import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordDetailListOption @@ -63,8 +64,14 @@ interface ReplicaRecordService { * @param recordId 记录id * @param status 执行状态 * @param errorReason 错误原因,当status为失败情况下才设置 + * @param replicaOverview 执行结果总览 */ - fun completeRecord(recordId: String, status: ExecutionStatus, errorReason: String? = null) + fun completeRecord( + recordId: String, + status: ExecutionStatus, + errorReason: String? = null, + replicaOverview: ReplicaOverview? = null + ) /** * 初始化一条同步详情 @@ -129,6 +136,13 @@ interface ReplicaRecordService { */ fun getRecordDetailById(id: String): ReplicaRecordDetail? + /** + * 根据[id]删除执行记录详情 + * + * @param id 记录详情id + */ + fun deleteRecordDetailById(id: String) + /** * 根据任务[key]删除执行记录 * @param key 任务key @@ -160,4 +174,9 @@ interface ReplicaRecordService { * 边缘节点回写记录到中心节点 */ fun writeBack(replicaRecordInfo: ReplicaRecordInfo) + + /** + * 更新ReplicaRecord的replicaOverview + */ + fun updateRecordReplicaOverview(recordId: String, replicaOverview: ReplicaOverview) } diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaRecordServiceImpl.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaRecordServiceImpl.kt index 9558d32e7a..5d2b32a133 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaRecordServiceImpl.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaRecordServiceImpl.kt @@ -29,6 +29,7 @@ package com.tencent.bkrepo.replication.service.impl import com.tencent.bkrepo.common.api.exception.ErrorCodeException import com.tencent.bkrepo.common.api.pojo.Page +import com.tencent.bkrepo.common.mongo.constant.ID import com.tencent.bkrepo.common.mongo.dao.util.Pages import com.tencent.bkrepo.common.service.cluster.ClusterProperties import com.tencent.bkrepo.common.service.cluster.DefaultCondition @@ -40,6 +41,7 @@ import com.tencent.bkrepo.replication.model.TReplicaRecord import com.tencent.bkrepo.replication.model.TReplicaRecordDetail import com.tencent.bkrepo.replication.pojo.record.ExecutionResult import com.tencent.bkrepo.replication.pojo.record.ExecutionStatus +import com.tencent.bkrepo.replication.pojo.record.ReplicaOverview import com.tencent.bkrepo.replication.pojo.record.ReplicaProgress import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordDetail import com.tencent.bkrepo.replication.pojo.record.ReplicaRecordDetailListOption @@ -57,6 +59,9 @@ import com.tencent.bkrepo.replication.util.TaskRecordQueryHelper import org.slf4j.LoggerFactory import org.springframework.context.annotation.Conditional import org.springframework.dao.DuplicateKeyException +import org.springframework.data.mongodb.core.query.Criteria +import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.Update import org.springframework.stereotype.Service import java.time.LocalDateTime @@ -99,7 +104,12 @@ class ReplicaRecordServiceImpl( } } - override fun completeRecord(recordId: String, status: ExecutionStatus, errorReason: String?) { + override fun completeRecord( + recordId: String, + status: ExecutionStatus, + errorReason: String?, + replicaOverview: ReplicaOverview? + ) { val replicaRecordInfo = getRecordById(recordId) ?: throw ErrorCodeException(ReplicationMessageCode.REPLICA_TASK_NOT_FOUND, recordId) val record = with(replicaRecordInfo) { @@ -109,7 +119,8 @@ class ReplicaRecordServiceImpl( status = status, startTime = startTime, endTime = LocalDateTime.now(), - errorReason = errorReason + errorReason = errorReason, + replicaOverview = replicaOverview ) } val tReplicaTask = replicaTaskDao.findByKey(record.taskKey) @@ -136,6 +147,11 @@ class ReplicaRecordServiceImpl( repoType = repoType, packageConstraint = packageConstraint, pathConstraint = pathConstraint, + artifactName = artifactName, + version = version, + conflictStrategy = conflictStrategy, + size = size, + sha256 = sha256, status = ExecutionStatus.RUNNING, progress = ReplicaProgress(), startTime = LocalDateTime.now() @@ -161,25 +177,14 @@ class ReplicaRecordServiceImpl( } override fun completeRecordDetail(detailId: String, result: ExecutionResult) { - val replicaRecordDetail = getRecordDetailById(detailId) + val tReplicaRecordDetail = replicaRecordDetailDao.findById(detailId) ?: throw ErrorCodeException(ReplicationMessageCode.REPLICA_TASK_NOT_FOUND, detailId) - val recordDetail = with(replicaRecordDetail) { - TReplicaRecordDetail( - id = detailId, - recordId = recordId, - localCluster = localCluster, - remoteCluster = remoteCluster, - localRepoName = localRepoName, - repoType = repoType, - packageConstraint = packageConstraint, - pathConstraint = pathConstraint, - status = result.status, - progress = result.progress!!, - startTime = startTime, - endTime = LocalDateTime.now(), - errorReason = result.errorReason - ) - } + val recordDetail = tReplicaRecordDetail.copy( + status = result.status, + progress = result.progress!!, + errorReason = result.errorReason, + endTime = LocalDateTime.now() + ) replicaRecordDetailDao.save(recordDetail) } @@ -218,6 +223,10 @@ class ReplicaRecordServiceImpl( return convert(findRecordDetailById(id)) } + override fun deleteRecordDetailById(id: String) { + replicaRecordDetailDao.removeById(id) + } + private fun findRecordDetailById(id: String): TReplicaRecordDetail? { return replicaRecordDetailDao.findById(id) } @@ -256,6 +265,13 @@ class ReplicaRecordServiceImpl( return } + override fun updateRecordReplicaOverview(recordId: String, replicaOverview: ReplicaOverview) { + replicaRecordDao.updateFirst( + Query(Criteria.where(ID).`is`(recordId)), + Update.update(TReplicaRecord::replicaOverview.name, replicaOverview) + ) + } + companion object { private val logger = LoggerFactory.getLogger(ReplicaRecordServiceImpl::class.java) @@ -272,7 +288,8 @@ class ReplicaRecordServiceImpl( status = it.status, startTime = it.startTime, endTime = it.endTime, - errorReason = it.errorReason + errorReason = it.errorReason, + replicaOverview = it.replicaOverview ) } } @@ -288,6 +305,11 @@ class ReplicaRecordServiceImpl( repoType = it.repoType, packageConstraint = it.packageConstraint, pathConstraint = it.pathConstraint, + artifactName = it.artifactName, + version = it.version, + conflictStrategy = it.conflictStrategy, + size = it.size, + sha256 = it.sha256, status = it.status, progress = it.progress, startTime = it.startTime, diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaTaskServiceImpl.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaTaskServiceImpl.kt index 83ae95b220..0d981e515a 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaTaskServiceImpl.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/service/impl/ReplicaTaskServiceImpl.kt @@ -207,6 +207,8 @@ class ReplicaTaskServiceImpl( nextExecutionTime = null, executionTimes = 0L, enabled = enabled, + record = record, + recordReserveDays = recordReserveDays, createdBy = userId, createdDate = LocalDateTime.now(), lastModifiedBy = userId, @@ -317,6 +319,10 @@ class ReplicaTaskServiceImpl( if (replicaType != ReplicaType.EDGE_PULL) { Preconditions.checkNotBlank(remoteClusterIds, this::remoteClusterIds.name) } + Preconditions.checkArgument( + recordReserveDays in RECORD_RESERVE_DAYS_MIN..RECORD_RESERVE_DAYS_MAX, + "recordReserveDays" + ) // 校验计划名称长度 if (name.length < TASK_NAME_LENGTH_MIN || name.length > TASK_NAME_LENGTH_MAX) { throw ErrorCodeException(CommonMessageCode.PARAMETER_INVALID, request::name.name) @@ -457,6 +463,10 @@ class ReplicaTaskServiceImpl( throw ErrorCodeException(ReplicationMessageCode.TASK_DISABLE_UPDATE, key) } } + Preconditions.checkArgument( + recordReserveDays in RECORD_RESERVE_DAYS_MIN..RECORD_RESERVE_DAYS_MAX, + "recordReserveDays" + ) // 更新任务 val userId = SecurityUtils.getUserId() // 查询集群节点信息 @@ -485,7 +495,9 @@ class ReplicaTaskServiceImpl( ), description = description, lastModifiedBy = userId, - lastModifiedDate = LocalDateTime.now() + lastModifiedDate = LocalDateTime.now(), + record = record, + recordReserveDays = recordReserveDays ) // 创建replicaObject val replicaObjectList = replicaTaskObjects.map { @@ -561,6 +573,8 @@ class ReplicaTaskServiceImpl( private val logger = LoggerFactory.getLogger(ReplicaTaskServiceImpl::class.java) private const val TASK_NAME_LENGTH_MIN = 2 private const val TASK_NAME_LENGTH_MAX = 256 + private const val RECORD_RESERVE_DAYS_MIN = 1 + private const val RECORD_RESERVE_DAYS_MAX = 60 private fun convert(tReplicaTask: TReplicaTask?): ReplicaTaskInfo? { return tReplicaTask?.let { @@ -582,6 +596,8 @@ class ReplicaTaskServiceImpl( nextExecutionTime = it.nextExecutionTime, executionTimes = it.executionTimes, enabled = it.enabled, + record = it.record, + recordReserveDays = it.recordReserveDays, createdBy = it.createdBy, createdDate = it.createdDate.format(DateTimeFormatter.ISO_DATE_TIME), lastModifiedBy = it.lastModifiedBy, diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/util/TaskRecordQueryHelper.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/util/TaskRecordQueryHelper.kt index 1a9206421b..9b2e9e39a7 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/util/TaskRecordQueryHelper.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/util/TaskRecordQueryHelper.kt @@ -58,6 +58,10 @@ object TaskRecordQueryHelper { path?.let { and("pathConstraint.path").isEqualTo("^$it") } }.apply { status?.let { and(TReplicaRecordDetail::status).isEqualTo(it) } + }.apply { + artifactName?.let { and(TReplicaRecordDetail::artifactName).regex("^$it") } + }.apply { + version?.let { and(TReplicaRecordDetail::version).isEqualTo(it) } } return Query(criteria) .with(Sort.by(Sort.Order(Sort.Direction.DESC, TReplicaRecordDetail::startTime.name)))