feat: large file download optimization #2656 #2665

Open · wants to merge 2 commits into master
DataSizeDeserializer.kt (new file)
@@ -0,0 +1,12 @@
package com.tencent.bkrepo.common.api.serializer

import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import org.springframework.util.unit.DataSize

class DataSizeDeserializer : StdDeserializer<DataSize>(DataSize::class.java) {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): DataSize? {
return if (p.text != null) DataSize.parse(p.text) else null
}
}
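
For orientation, a minimal round-trip sketch (not part of the diff) of what this deserializer enables once registered, as JsonUtils does further down; it assumes DataSizeSerializer emits a textual form that DataSize.parse accepts:

import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.util.unit.DataSize

// Hypothetical round trip: the JSON string "8MB" parses to 8 MiB.
val mapper = jacksonObjectMapper().registerModule(
    SimpleModule()
        .addSerializer(DataSize::class.java, DataSizeSerializer())
        .addDeserializer(DataSize::class.java, DataSizeDeserializer()),
)
val parsed: DataSize = mapper.readValue("\"8MB\"", DataSize::class.java)
check(parsed == DataSize.ofMegabytes(8))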
ChunkedInputStream.kt (new file)
@@ -0,0 +1,43 @@
package com.tencent.bkrepo.common.api.stream

import java.io.InputStream

/**
 * Chunked input stream: concatenates an iterator of input streams into a
 * single logical stream.
 */
class ChunkedInputStream(
val iterator: Iterator<InputStream>,
) : InputStream() {
private var cursor: InputStream = iterator.next()
override fun read(): Int {
    var read = cursor.read()
    // A chunk may be empty; keep advancing until a byte is read or all
    // chunks are exhausted.
    while (read == -1) {
        cursor.close()
        if (!iterator.hasNext()) return -1
        move2next()
        read = cursor.read()
    }
    return read
}

override fun read(b: ByteArray, off: Int, len: Int): Int {
    var read = cursor.read(b, off, len)
    while (read == -1) {
        cursor.close()
        if (!iterator.hasNext()) return -1
        move2next()
        read = cursor.read(b, off, len)
    }
    return read
}

private fun move2next() {
cursor = iterator.next()
}

override fun close() {
cursor.close()
}
}
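
Usage sketch (illustrative, not from the PR): the stream walks the iterator lazily, closing each exhausted chunk before moving to the next, so arbitrarily many chunks can be concatenated without holding them all open:

import java.io.InputStream

fun chunkedDemo() {
    val chunks: Iterator<InputStream> = listOf("foo", "bar", "baz")
        .map { it.byteInputStream() }
        .iterator()
    val merged = ChunkedInputStream(chunks)
    // Reads cross chunk boundaries transparently.
    check(merged.readBytes().decodeToString() == "foobarbaz")
    merged.close()
}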
JsonUtils.kt
@@ -45,6 +45,7 @@ import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonTypeRef
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
import com.tencent.bkrepo.common.api.serializer.DataSizeDeserializer
import com.tencent.bkrepo.common.api.serializer.DataSizeSerializer
import org.springframework.util.unit.DataSize
import java.io.InputStream
@@ -73,6 +74,7 @@ object JsonUtils {
registerModule(Jdk8Module())
var dateSizeModule = SimpleModule()
dateSizeModule.addSerializer(DataSize::class.java, DataSizeSerializer())
dateSizeModule.addDeserializer(DataSize::class.java, DataSizeDeserializer())
registerModule(dateSizeModule)
enable(SerializationFeature.INDENT_OUTPUT)
disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
ArtifactInputStream.kt
@@ -40,7 +40,7 @@ import java.io.InputStream
*/
open class ArtifactInputStream(
delegate: InputStream,
val range: Range
val range: Range,
) : DelegateInputStream(delegate) {

private val listenerList = mutableListOf<StreamReadListener>()
@@ -84,8 +84,8 @@ open class ArtifactInputStream(
/**
 * Add the stream read listener [listener]; set [allowPartial] to accept a partial-content stream
*/
fun addListener(listener: StreamReadListener) {
if (range.isPartialContent()) {
fun addListener(listener: StreamReadListener, allowPartial: Boolean = false) {
if (range.isPartialContent() && !allowPartial) {
listener.close()
throw IllegalArgumentException("ArtifactInputStream is partial content, maybe cause data inconsistent")
}
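
With the new allowPartial flag, a caller that knowingly caches a partial range (as loadFileChunk does later in this diff) can opt in, while the old fail-fast behavior stays the default. A hedged sketch, with path, name, partRange, and the CachedFileWriter arguments standing in as placeholders:

// Sketch only: attach a cache-writing listener to a partial-content stream.
val partStream = fileStorage.load(path, name, partRange, credentials)
    ?.artifactStream(partRange)
partStream?.addListener(CachedFileWriter(cachePath, chunkName, tempPath), allowPartial = true)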
CacheProperties.kt
@@ -31,6 +31,7 @@

package com.tencent.bkrepo.common.storage.config

import org.springframework.util.unit.DataSize
import java.time.Duration

/**
@@ -61,5 +62,15 @@ data class CacheProperties(
/**
 * Maximum total size of cached artifacts; 0 means unlimited
*/
var maxSize: Long = 0
var maxSize: Long = 0,

/**
 * Whether to enable large file download optimization
 */
var largeFileOptimization: Boolean = false,

/**
 * Size threshold above which a file is treated as large
 */
var largeFileSizeThreshold: DataSize = DataSize.ofMegabytes(8),
)
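
The two switches bind through the usual Spring configuration mechanism; the test resources at the end of this PR set storage.filesystem.cache.largeFileOptimization=true. Below is a sketch of programmatic construction, assuming the constructor parameters omitted from this hunk keep their defaults:

// Illustrative only: 16 MB threshold; as properties this would be
// storage.<type>.cache.largeFileOptimization=true and
// storage.<type>.cache.largeFileSizeThreshold=16MB.
val cache = CacheProperties(
    largeFileOptimization = true,
    largeFileSizeThreshold = DataSize.ofMegabytes(16),
)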
CacheStorageService.kt
@@ -27,10 +27,14 @@

package com.tencent.bkrepo.common.storage.core.cache

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.api.constant.StringPool.TEMP
import com.tencent.bkrepo.common.api.stream.ChunkedInputStream
import com.tencent.bkrepo.common.api.util.StreamUtils.drain
import com.tencent.bkrepo.common.artifact.api.ArtifactFile
import com.tencent.bkrepo.common.artifact.stream.ArtifactInputStream
import com.tencent.bkrepo.common.artifact.stream.ArtifactInputStream.Companion.METADATA_KEY_CACHE_ENABLED
import com.tencent.bkrepo.common.artifact.stream.BoundedInputStream
import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.artifact.stream.artifactStream
import com.tencent.bkrepo.common.storage.core.AbstractStorageService
@@ -44,13 +48,21 @@ import com.tencent.bkrepo.common.storage.filesystem.cleanup.BasedAtimeAndMTimeFi
import com.tencent.bkrepo.common.storage.filesystem.cleanup.CleanupFileVisitor
import com.tencent.bkrepo.common.storage.filesystem.cleanup.CleanupResult
import com.tencent.bkrepo.common.storage.filesystem.cleanup.FileRetainResolver
import com.tencent.bkrepo.common.storage.innercos.request.DownloadPartRequestFactory
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import org.slf4j.LoggerFactory
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.io.File
import java.io.FileNotFoundException
import java.io.InputStream
import java.lang.IllegalArgumentException
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.TimeUnit
import kotlin.math.min

/**
 * Storage service with cache support
@@ -125,15 +137,19 @@ class CacheStorageService(
return it.apply { putMetadata(METADATA_KEY_CACHE_ENABLED, true) }
}
}
val artifactInputStream = fileStorage.load(path, filename, range, credentials)?.artifactStream(range)
if (artifactInputStream != null && loadCacheFirst && range.isFullContent()) {
val cachePath = Paths.get(credentials.cache.path, path)
val tempPath = Paths.get(credentials.cache.path, TEMP)
val cacheFileLoadedEventPublisher = CacheFileLoadedEventPublisher(publisher, credentials)
val readListener = CachedFileWriter(cachePath, filename, tempPath, cacheFileLoadedEventPublisher)
artifactInputStream.addListener(readListener)
val artifactInputStream = if (enableLargeFileOptimization(range, credentials)) {
loadLargeFileByChunked(path, filename, range, credentials)
} else {
fileStorage.load(path, filename, range, credentials)?.artifactStream(range)?.apply {
if (loadCacheFirst && range.isFullContent()) {
val cachePath = Paths.get(credentials.cache.path, path)
val tempPath = Paths.get(credentials.cache.path, TEMP)
val cacheFileLoadedEventPublisher = CacheFileLoadedEventPublisher(publisher, credentials)
val readListener = CachedFileWriter(cachePath, filename, tempPath, cacheFileLoadedEventPublisher)
addListener(readListener)
}
}
}

return if (artifactInputStream == null && !loadCacheFirst) {
cacheClient.load(path, filename)?.artifactStream(range)
} else {
@@ -179,7 +195,7 @@ class CacheStorageService(
credentials,
resolver,
publisher,
fileRetainResolver
fileRetainResolver,
)
getCacheClient(credentials).walk(visitor)
val result = mutableMapOf<Path, CleanupResult>()
@@ -207,7 +223,7 @@ class CacheStorageService(
override fun copy(
digest: String,
fromCredentials: StorageCredentials?,
toCredentials: StorageCredentials?
toCredentials: StorageCredentials?,
) {
val path = fileLocator.locate(digest)
val from = getCredentialsOrDefault(fromCredentials)
@@ -301,6 +317,12 @@
return cacheFirst && isHealth && isExceedThreshold
}

private fun enableLargeFileOptimization(range: Range, credentials: StorageCredentials): Boolean {
val total = range.total ?: return false
val cache = credentials.cache
return cache.largeFileOptimization && total > cache.largeFileSizeThreshold.toBytes()
}

private fun getMonitor(credentials: StorageCredentials): StorageHealthMonitor {
return monitorHelper.getMonitor(storageProperties, credentials)
}
@@ -313,10 +335,18 @@
return FileSystemClient(getStagingPath(credentials))
}

private fun getChunksClient(credentials: StorageCredentials): FileSystemClient {
return FileSystemClient(getChunksPath(credentials))
}

private fun getStagingPath(credentials: StorageCredentials): Path {
return Paths.get(credentials.cache.path, STAGING)
}

private fun getChunksPath(credentials: StorageCredentials): Path {
return getTempPath(credentials).resolve(CHUNKS)
}

private fun stagingFile(credentials: StorageCredentials, path: String, filename: String, file: File) {
try {
getStagingClient(credentials).createLink(path, filename, file)
@@ -325,8 +355,98 @@
}
}

/**
 * Chunked download for large files
 *
 * This method optimizes large file downloads: splitting them into chunk
 * requests reduces bandwidth pressure on the backing storage and shortens
 * request time. Concretely:
 *
 * a large file download is translated into multiple part-download requests.
 * While the client reads one chunk, the server prefetches the next one;
 * if that chunk does not exist locally, it is downloaded from storage and
 * cached.
 */
private fun loadLargeFileByChunked(
path: String,
name: String,
range: Range,
storageCredentials: StorageCredentials,
): ArtifactInputStream? {
val optimalPartSize = storageCredentials.cache.largeFileSizeThreshold.toBytes()
val start = (range.start / optimalPartSize) * optimalPartSize
// end is inclusive, so cap it at total - 1
val end = ((range.end / optimalPartSize + 1) * optimalPartSize - 1).coerceAtMost(range.total!! - 1)
val factory = DownloadPartRequestFactory(name, optimalPartSize, start, end)
var request = factory.nextDownloadPartRequest()
var fileChunk = Range(request.rangeStart!!, request.rangeEnd!!, range.total)
var preFetchFuture: Future<InputStream?>? = loadFileChunk(path, name, fileChunk, storageCredentials)
if (preFetchFuture?.get() == null) return null
val it = object : Iterator<InputStream> {
override fun hasNext(): Boolean {
return preFetchFuture != null
}

override fun next(): InputStream {
val inputStream = getCurInputStream()
if (factory.hasMoreRequests()) {
request = factory.nextDownloadPartRequest()
fileChunk = Range(request.rangeStart!!, request.rangeEnd!!, range.total)
preFetchFuture = loadFileChunk(path, name, fileChunk, storageCredentials)
} else {
preFetchFuture = null
}
return inputStream
}

private fun getCurInputStream(): InputStream {
val inputStream = preFetchFuture?.get(CHUNK_DOWNLOAD_TIMEOUT, TimeUnit.SECONDS)
?: throw IllegalArgumentException("Load file[$name] chunk $range failed ")
return BoundedInputStream(inputStream, min(range.end - request.rangeStart!! + 1, optimalPartSize))
.apply {
if (range.start > request.rangeStart!!) {
skip(range.start - request.rangeStart!!)
}
}
}
}
return ChunkedInputStream(it).artifactStream(range)
}

/**
 * Load a file chunk, preferring the local chunk cache and falling back to
 * the backing storage.
 */
private fun loadFileChunk(
path: String,
name: String,
range: Range,
storageCredentials: StorageCredentials,
): Future<InputStream?> {
val largeFileSizeThreshold = storageCredentials.cache.largeFileSizeThreshold.toBytes()
val filename = "${name}_${range.start}_${range.end}"
val chunkRange = Range.full(largeFileSizeThreshold)
val chunksClient = getChunksClient(storageCredentials)
chunksClient.load(path, filename)?.artifactStream(chunkRange)?.let {
return FutureTask<InputStream> { it }.apply { run() }
}
return ioThreadPool.submit<InputStream> {
val cachePath = Paths.get(getChunksPath(storageCredentials).toString(), path)
val tempPath = Paths.get(storageCredentials.cache.path, TEMP)
val inputStream = fileStorage.load(path, name, range, storageCredentials)?.artifactStream(range)
if (inputStream != null && range.length >= largeFileSizeThreshold) {
inputStream.addListener(CachedFileWriter(cachePath, filename, tempPath), true)
inputStream.use { it.drain() }
val chunk = chunksClient.load(path, filename)
require(chunk != null && chunk.length() == largeFileSizeThreshold) { "file[$name] chunk cache failed." }
chunk.artifactStream(chunkRange)
} else {
inputStream
}
}
}

companion object {
private val logger = LoggerFactory.getLogger(CacheStorageService::class.java)
private const val STAGING = "staging"
private const val CHUNKS = "chunks"
private const val CHUNK_DOWNLOAD_TIMEOUT = 60L
private val ioThreadPool =
Executors.newCachedThreadPool(ThreadFactoryBuilder().setNameFormat("cache-io-%d").build())!!
}
}
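
To make the chunk alignment in loadLargeFileByChunked concrete, a worked example with illustrative numbers:

import org.springframework.util.unit.DataSize

fun alignmentExample() {
    val part = DataSize.ofMegabytes(8).toBytes()    // 8_388_608 bytes
    val reqStart = 5_000_000L
    val reqEnd = 20_000_000L
    val alignedStart = (reqStart / part) * part     // 0
    val alignedEnd = (reqEnd / part + 1) * part - 1 // 25_165_823
    // Three aligned chunks cover the request:
    //   [0, 8_388_607], [8_388_608, 16_777_215], [16_776_216.. correction: 16_777_216, 25_165_823]
    // Each chunk is cached under "<name>_<start>_<end>" (see loadFileChunk);
    // the first is skip()ed to 5_000_000 and the stream is bounded, so the
    // caller sees exactly bytes [5_000_000, 20_000_000].
    check(alignedStart == 0L && alignedEnd == 25_165_823L)
}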
CacheStorageServiceTest.kt
@@ -1,8 +1,10 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
*
* Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved.
*
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
@@ -60,6 +62,7 @@ import java.nio.charset.Charset
import java.util.concurrent.CyclicBarrier
import kotlin.concurrent.thread
import kotlin.random.Random
import kotlin.random.nextInt

@ExtendWith(SpringExtension::class)
@ImportAutoConfiguration(StorageAutoConfiguration::class, TaskExecutionAutoConfiguration::class)
@@ -82,7 +85,7 @@ internal class CacheStorageServiceTest {

@BeforeEach
fun beforeEach() {
// before each
storageProperties.defaultStorageCredentials().cache.loadCacheFirst = true
}

@AfterEach
@@ -371,6 +374,32 @@ internal class CacheStorageServiceTest {
Thread.sleep(2000)
}

@Test
fun largeFileOptimizationTest() {
storageProperties.defaultStorageCredentials().cache.loadCacheFirst = false
val size = 50 * 1024 * 1024
val data = Random.nextBytes(size)
val artifactFile = createTempArtifactFile(data)
val sha256 = artifactFile.getFileSha256()
storageService.store(sha256, artifactFile, null)
// Wait for the asynchronous store to complete
Thread.sleep(1000)
repeat(3) {
// end is inclusive, so keep it within [start + 1, size - 1]
val start = Random.nextInt(size - 1)
val end = Random.nextInt(start + 1, size)
println("${Thread.currentThread().name} $start ----- $end")
val artifactInputStream =
storageService.load(sha256, Range(start.toLong(), end.toLong(), size.toLong()), null)
Assertions.assertEquals(
data.copyOfRange(start, end + 1).inputStream().sha256(),
artifactInputStream!!.sha256(),
)
}
val artifactInputStream = storageService.load(sha256, Range.full(size.toLong()), null)

Assertions.assertEquals(sha256, artifactInputStream!!.sha256())
}

private fun createTempArtifactFile(size: Long): ArtifactFile {
val tempFile = createTempFile()
val content = StringPool.randomString(size.toInt())
Test storage properties
@@ -34,3 +34,4 @@ storage.filesystem.path=${java.io.tmpdir}/data/unittest/store
storage.filesystem.upload.location=${java.io.tmpdir}/data/unittest/temp
storage.filesystem.cache.enabled=true
storage.filesystem.cache.path=${java.io.tmpdir}/data/unittest/cached
storage.filesystem.cache.largeFileOptimization=true