Fix excessive database load caused by fileRetainResolver refreshing itself on startup #2685

Open · wants to merge 2 commits into base: master
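In short, the patch removes the ThreadPoolTaskScheduler-driven refresh that BasedRepositoryNodeRetainResolver started in its init block, makes refreshRetainNode() public, and throttles it with a lastUpdateTime check against expireConfig.cacheTime; ExpiredCacheFileCleanupJob and StorageCacheIndexEvictJob now call refreshRetainNode() when they run, so the MongoDB scan happens only on demand and at most once per cache window. A minimal standalone sketch of that throttling pattern (illustrative names only, not the project's actual classes):

// Sketch of the time-guarded refresh pattern this change applies.
// ThrottledRefresher and cacheTimeMillis are illustrative stand-ins for
// BasedRepositoryNodeRetainResolver and expireConfig.cacheTime.
class ThrottledRefresher(private val cacheTimeMillis: Long) {

    @Volatile
    private var lastUpdateTime: Long = -1

    // Callers (e.g. batch jobs) invoke this before running; the expensive
    // reload only executes when the cached data is older than cacheTimeMillis.
    fun refreshIfStale(reload: () -> Unit) {
        val now = System.currentTimeMillis()
        if (now < lastUpdateTime + cacheTimeMillis) {
            return // still fresh, skip the database query
        }
        reload()
        lastUpdateTime = now
    }
}

fun main() {
    val refresher = ThrottledRefresher(cacheTimeMillis = 60_000)
    refresher.refreshIfStale { println("reloading retain nodes from MongoDB") }
    refresher.refreshIfStale { println("this will not print: called again within 60s") }
}

Repeated calls inside the configured window return immediately without touching the database, which is what relieves the startup pressure described in the title.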
@@ -23,7 +23,6 @@
 import org.springframework.data.mongodb.core.find
 import org.springframework.data.mongodb.core.query.Criteria
 import org.springframework.data.mongodb.core.query.Query
 import org.springframework.data.mongodb.core.query.isEqualTo
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
 import org.springframework.util.unit.DataSize
 import java.time.LocalDateTime

@@ -32,16 +31,13 @@ import java.time.LocalDateTime
  * */
 class BasedRepositoryNodeRetainResolver(
     private val expireConfig: RepositoryExpireConfig,
-    taskScheduler: ThreadPoolTaskScheduler,
     private val fileCacheService: FileCacheService,
     private val mongoTemplate: MongoTemplate,
 ) : NodeRetainResolver {

     private var retainNodes = HashMap<String, RetainNode>()

-    init {
-        taskScheduler.scheduleWithFixedDelay(this::refreshRetainNode, expireConfig.cacheTime)
-    }
+    private var lastUpdateTime: Long = -1

     override fun retain(sha256: String): Boolean {
         return retainNodes.contains(sha256)
@@ -51,8 +47,12 @@ class BasedRepositoryNodeRetainResolver(
         return retainNodes[sha256]
     }

-    private fun refreshRetainNode() {
+    fun refreshRetainNode() {
         logger.info("Refresh retain nodes start. size of nodes ${retainNodes.size}")
+        if (System.currentTimeMillis() < lastUpdateTime + expireConfig.cacheTime) {
+            logger.info("BasedRepositoryNodeRetainResolver was refreshed")
+            return
+        }
         try {
             val temp = HashMap<String, RetainNode>()
             val configs = expireConfig.repos.map { convertRepoConfigToFileCache(it) } + fileCacheService.list()
@@ -63,7 +63,7 @@ class BasedRepositoryNodeRetainResolver(
                     repoName = config.repoName,
                     fullPath = node[FULL_PATH].toString(),
                     sha256 = node[SHA256].toString(),
-                    size = node[SIZE].toString().toLong()
+                    size = node[SIZE].toString().toLong(),
                 )
                 temp[retainNode.sha256] = retainNode
                 logger.info("Retain node[$retainNode]")
@@ -73,17 +73,18 @@ class BasedRepositoryNodeRetainResolver(
         } catch (e: Exception) {
             logger.warn("An error occurred while refreshing retain node $e")
         }
+        lastUpdateTime = System.currentTimeMillis()
         logger.info("Refresh retain nodes finished. size of nodes ${retainNodes.size}")
     }

-    private fun convertRepoConfigToFileCache(repoConfig: RepoConfig):TFileCache {
+    private fun convertRepoConfigToFileCache(repoConfig: RepoConfig): TFileCache {
         return TFileCache(
             id = null,
             projectId = repoConfig.projectId,
             repoName = repoConfig.repoName,
             pathPrefix = repoConfig.pathPrefix,
             days = repoConfig.days,
-            size = expireConfig.size.toMegabytes()
+            size = expireConfig.size.toMegabytes(),
         )
     }

@@ -97,7 +98,7 @@ class BasedRepositoryNodeRetainResolver(
             size = tFileCache.size,
             dateTime = dateTime,
             collection = collectionName,
-            pathPrefixs = tFileCache.pathPrefix
+            pathPrefixs = tFileCache.pathPrefix,
         )
     }

@@ -108,7 +109,7 @@ class BasedRepositoryNodeRetainResolver(
         dateTime: LocalDateTime,
         collection: String,
         batchSize: Int = 20000,
-        pathPrefixs: List<String>
+        pathPrefixs: List<String>,
     ): Set<Map<String, Any?>> {
         val temp = mutableSetOf<Map<String, Any?>>()
         val prefixCri = pathPrefixs.map {
@@ -119,7 +120,7 @@ class BasedRepositoryNodeRetainResolver(
             Criteria.where(PROJECT).isEqualTo(projectId).and(REPO).isEqualTo(repoName)
                 .and(FOLDER).isEqualTo(false).and(SIZE).gte(DataSize.ofMegabytes(size).toBytes())
                 .and(LAST_ACCESS_DATE).gt(dateTime).andOperator(Criteria().orOperator(prefixCri))
-                .and(DELETED_DATE).isEqualTo(null)
+                .and(DELETED_DATE).isEqualTo(null),
         )

         val fields = query.fields()
@@ -145,10 +146,8 @@ class BasedRepositoryNodeRetainResolver(
         return temp
     }

-
     companion object {
         private val logger = LoggerFactory.getLogger(BasedRepositoryNodeRetainResolver::class.java)
         private const val COLLECTION_NODE_PREFIX = "node_"
-
     }
 }
@@ -6,20 +6,17 @@ import com.tencent.bkrepo.job.service.FileCacheService
 import org.springframework.context.annotation.Bean
 import org.springframework.context.annotation.Configuration
 import org.springframework.data.mongodb.core.MongoTemplate
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler

 @Configuration
 class ExpireFileResolverConfig {
     @Bean
     fun fileRetainResolver(
         expiredCacheFileCleanupJobProperties: ExpiredCacheFileCleanupJobProperties,
-        scheduler: ThreadPoolTaskScheduler,
         fileCacheService: FileCacheService,
         mongoTemplate: MongoTemplate
     ): FileRetainResolver {
         return BasedRepositoryNodeRetainResolver(
             expiredCacheFileCleanupJobProperties.repoConfig,
-            scheduler,
             fileCacheService,
             mongoTemplate
         )
@@ -34,9 +34,11 @@ import com.tencent.bkrepo.common.service.cluster.properties.ClusterProperties
 import com.tencent.bkrepo.common.storage.config.StorageProperties
 import com.tencent.bkrepo.common.storage.core.StorageService
 import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
+import com.tencent.bkrepo.common.storage.filesystem.cleanup.FileRetainResolver
 import com.tencent.bkrepo.common.storage.util.toPath
 import com.tencent.bkrepo.job.batch.base.DefaultContextJob
 import com.tencent.bkrepo.job.batch.base.JobContext
+import com.tencent.bkrepo.job.batch.file.BasedRepositoryNodeRetainResolver
 import com.tencent.bkrepo.job.config.properties.ExpiredCacheFileCleanupJobProperties
 import com.tencent.bkrepo.job.metrics.StorageCacheMetrics
 import org.slf4j.LoggerFactory
@@ -59,6 +61,7 @@ class ExpiredCacheFileCleanupJob(
     private val clusterProperties: ClusterProperties,
     private val storageProperties: StorageProperties,
     private val storageCacheMetrics: StorageCacheMetrics,
+    private val fileRetainResolver: BasedRepositoryNodeRetainResolver,
 ) : DefaultContextJob(properties) {

     data class TStorageCredentials(
@@ -74,6 +77,7 @@ class ExpiredCacheFileCleanupJob(
     override fun getLockAtMostFor(): Duration = Duration.ofDays(1)

     override fun doStart0(jobContext: JobContext) {
+        fileRetainResolver.refreshRetainNode()
         // cleanup default storage
         if (DEFAULT_STORAGE_KEY !in properties.ignoredStorageCredentialsKeys) {
             cleanupStorage(storageProperties.defaultStorageCredentials())
@@ -33,6 +33,7 @@ import com.tencent.bkrepo.common.storage.config.StorageProperties
 import com.tencent.bkrepo.common.storage.core.cache.indexer.StorageCacheIndexProperties
 import com.tencent.bkrepo.common.storage.core.cache.indexer.StorageCacheIndexerManager
 import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
+import com.tencent.bkrepo.job.batch.file.BasedRepositoryNodeRetainResolver
 import com.tencent.bkrepo.job.config.properties.StorageCacheIndexEvictJobProperties
 import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.data.mongodb.core.MongoTemplate
@@ -49,7 +50,8 @@ class StorageCacheIndexEvictJob(
     clusterProperties: ClusterProperties,
     mongoTemplate: MongoTemplate,
     storageCacheIndexProperties: StorageCacheIndexProperties?,
-    indexerManager: StorageCacheIndexerManager?
+    indexerManager: StorageCacheIndexerManager?,
+    private val fileRetainResolver: BasedRepositoryNodeRetainResolver,
 ) : StorageCacheIndexJob(
     properties,
     storageProperties,
@@ -60,6 +62,7 @@ class StorageCacheIndexEvictJob(
 ) {

     override fun doWithCredentials(credentials: StorageCredentials) {
+        fileRetainResolver.refreshRetainNode()
         val evicted = indexerManager?.evict(credentials, Int.MAX_VALUE)
         logger.info("credential[${credentials.key}] evict[$evicted]")
     }