From 31f2170bb9b87110c7c461eb08ff0caa86b818e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B3=B0=E5=8F=8B?= Date: Wed, 5 Jul 2023 14:14:09 +0800 Subject: [PATCH] fix: large files broke prefetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Files larger than 4G lead to a prefetch panic, because the max blob IO range is smaller than 4G. This PR changes the blob IO max size from u32 to u64. Signed-off-by: 泰友 --- rafs/src/fs.rs | 22 +++++++-------- smoke/tests/api_test.go | 5 +++- smoke/tests/texture/layer.go | 26 +++++++++++++---- smoke/tests/tool/layer.go | 10 +++---- storage/src/cache/worker.rs | 2 +- storage/src/device.rs | 55 +++++++++++++++++++++++++++++++++--- 6 files changed, 92 insertions(+), 28 deletions(-) diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index aafede3a72b..cad27019b4a 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -617,12 +617,12 @@ impl FileSystem for Rafs { let real_size = cmp::min(size as u64, inode_size - offset); let mut result = 0; - let mut descs = inode.alloc_bio_vecs(&self.device, offset, real_size as usize, true)?; - assert!(!descs.is_empty() && !descs[0].is_empty()); + let mut io_vecs = inode.alloc_bio_vecs(&self.device, offset, real_size as usize, true)?; + assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty()); // Try to amplify user io for Rafs v5, to improve performance. 
if self.sb.meta.is_v5() && size < self.amplify_io { - let all_chunks_ready = self.device.all_chunks_ready(&descs); + let all_chunks_ready = self.device.all_chunks_ready(&io_vecs); if !all_chunks_ready { let chunk_mask = self.metadata().chunk_size as u64 - 1; let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask; @@ -630,16 +630,16 @@ impl FileSystem for Rafs { let actual_size = window_base - (offset & !chunk_mask); if actual_size < self.amplify_io as u64 { let window_size = self.amplify_io as u64 - actual_size; - let orig_cnt = descs.iter().fold(0, |s, d| s + d.len()); + let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); self.sb.amplify_io( &self.device, self.amplify_io, - &mut descs, + &mut io_vecs, &inode, window_base, window_size, )?; - let new_cnt = descs.iter().fold(0, |s, d| s + d.len()); + let new_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); trace!( "amplify RAFS v5 read from {} to {} chunks", orig_cnt, @@ -650,15 +650,15 @@ impl FileSystem for Rafs { } let start = self.ios.latency_start(); - for desc in descs.iter_mut() { - assert!(!desc.is_empty()); - assert_ne!(desc.size(), 0); + for io_vec in io_vecs.iter_mut() { + assert!(!io_vec.is_empty()); + assert_ne!(io_vec.size(), 0); // Avoid copying `desc` - let r = self.device.read_to(w, desc)?; + let r = self.device.read_to(w, io_vec)?; result += r; recorder.mark_success(r); - if r as u32 != desc.size() { + if r as u64 != io_vec.size() { break; } } diff --git a/smoke/tests/api_test.go b/smoke/tests/api_test.go index 1ab2ccab5c5..48bd598b4f2 100644 --- a/smoke/tests/api_test.go +++ b/smoke/tests/api_test.go @@ -159,7 +159,10 @@ func (a *APIV1TestSuite) TestPrefetch(t *testing.T) { ctx.PrepareWorkDir(t) defer ctx.Destroy(t) - rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs")) + rootFs := texture.MakeLowerLayer( + t, + filepath.Join(ctx.Env.WorkDir, "root-fs"), + texture.LargerFileMaker("large-blob.bin", 5)) rafs := a.rootFsToRafs(t, ctx, rootFs) diff 
--git a/smoke/tests/texture/layer.go b/smoke/tests/texture/layer.go index 90aa8fa3ec5..0bfcfbb61eb 100644 --- a/smoke/tests/texture/layer.go +++ b/smoke/tests/texture/layer.go @@ -13,7 +13,15 @@ import ( "github.com/dragonflyoss/image-service/smoke/tests/tool" ) -func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer { +type LayerMaker func(t *testing.T, layer *tool.Layer) + +func LargerFileMaker(path string, sizeGB int) LayerMaker { + return func(t *testing.T, layer *tool.Layer) { + layer.CreateLargeFile(t, path, sizeGB) + } +} + +func MakeChunkDictLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer { layer := tool.NewLayer(t, workDir) // Create regular file @@ -24,14 +32,18 @@ func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer { layer.CreateFile(t, "chunk-dict-file-5", []byte("dir-1/file-2")) layer.CreateFile(t, "chunk-dict-file-6", []byte("This is poetry")) layer.CreateFile(t, "chunk-dict-file-7", []byte("My name is long")) - layer.CreateLargeFile(t, "chunk-dict-file-8", 13) layer.CreateHoledFile(t, "chunk-dict-file-9", []byte("hello world"), 1024, 1024*1024) layer.CreateFile(t, "chunk-dict-file-10", []byte("")) + // Customized files + for _, maker := range makers { + maker(t, layer) + } + return layer } -func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer { +func MakeLowerLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer { layer := tool.NewLayer(t, workDir) // Create regular file @@ -66,9 +78,6 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer { // Create symlink with non-existed source file layer.CreateSymlink(t, "dir-1/file-deleted-symlink", "dir-1/file-deleted") - // Create large file - layer.CreateLargeFile(t, "large-blob.bin", 13) - // Create holed file layer.CreateHoledFile(t, "file-hole-1", []byte("hello world"), 1024, 1024*1024) @@ -79,6 +88,11 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer { // Set file xattr (only `security.capability` xattr is 
supported in OCI layer) tool.Run(t, fmt.Sprintf("setcap CAP_NET_RAW+ep %s", filepath.Join(workDir, "dir-1/file-2"))) + // Customized files + for _, maker := range makers { + maker(t, layer) + } + return layer } diff --git a/smoke/tests/tool/layer.go b/smoke/tests/tool/layer.go index 1262921aa37..7902d5d38ae 100644 --- a/smoke/tests/tool/layer.go +++ b/smoke/tests/tool/layer.go @@ -8,6 +8,7 @@ import ( "bytes" "compress/gzip" "context" + "crypto/rand" "io" "io/ioutil" "os" @@ -21,6 +22,7 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/converter" "github.com/opencontainers/go-digest" "github.com/pkg/xattr" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" ) @@ -44,17 +46,15 @@ func (l *Layer) CreateFile(t *testing.T, name string, data []byte) { require.NoError(t, err) } -func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeMB int) { +func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeGB int) { f, err := os.Create(filepath.Join(l.workDir, name)) require.NoError(t, err) defer func() { f.Close() }() - for b := 1; b <= sizeMB; b++ { - _, err := f.Write(bytes.Repeat([]byte{byte(b)}, 1024*1024)) - require.NoError(t, err) - } + _, err = io.CopyN(f, rand.Reader, int64(sizeGB)<<30) + assert.Nil(t, err) } func (l *Layer) CreateHoledFile(t *testing.T, name string, data []byte, offset, fileSize int64) { diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index b5412172ee8..965a09ea0fc 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -187,7 +187,7 @@ impl AsyncWorkerMgr { } /// Consume network bandwidth budget for prefetching. 
- pub fn consume_prefetch_budget(&self, size: u32) { + pub fn consume_prefetch_budget(&self, size: u64) { if self.prefetch_inflight.load(Ordering::Relaxed) > 0 { self.prefetch_consumed .fetch_add(size as usize, Ordering::AcqRel); diff --git a/storage/src/device.rs b/storage/src/device.rs index 9a7d2fce7fa..6c559cf481c 100644 --- a/storage/src/device.rs +++ b/storage/src/device.rs @@ -773,7 +773,7 @@ pub struct BlobIoVec { /// The blob associated with the IO operation. bi_blob: Arc, /// Total size of blob IOs to be performed. - bi_size: u32, + bi_size: u64, /// Array of blob IOs, these IOs should executed sequentially. pub(crate) bi_vec: Vec, } @@ -792,8 +792,8 @@ impl BlobIoVec { pub fn push(&mut self, desc: BlobIoDesc) { assert_eq!(self.bi_blob.blob_index(), desc.blob.blob_index()); assert_eq!(self.bi_blob.blob_id(), desc.blob.blob_id()); - assert!(self.bi_size.checked_add(desc.size).is_some()); - self.bi_size += desc.size; + assert!(self.bi_size.checked_add(desc.size as u64).is_some()); + self.bi_size += desc.size as u64; self.bi_vec.push(desc); } @@ -822,7 +822,7 @@ impl BlobIoVec { } /// Get size of pending IO data. 
- pub fn size(&self) -> u32 { + pub fn size(&self) -> u64 { self.bi_size } @@ -1564,4 +1564,51 @@ mod tests { iovec.append(iovec2); assert_eq!(0x2000, iovec.bi_size); } + + #[test] + fn test_extend_large_blob_io_vec() { + let size = 0x2_0000_0000; // 8G blob + let chunk_size = 0x10_0000; // 1M chunk + let chunk_count = (size / chunk_size as u64) as u32; + let large_blob = Arc::new(BlobInfo::new( + 0, + "blob_id".to_owned(), + size, + size, + chunk_size, + chunk_count, + BlobFeatures::default(), + )); + + let mut iovec = BlobIoVec::new(large_blob.clone()); + let mut iovec2 = BlobIoVec::new(large_blob.clone()); + + // Extend half of blob + for chunk_idx in 0..chunk_count { + let chunk = Arc::new(MockChunkInfo { + block_id: Default::default(), + blob_index: large_blob.blob_index, + flags: BlobChunkFlags::empty(), + compress_size: chunk_size, + compress_offset: chunk_idx as u64 * chunk_size as u64, + uncompress_size: 2 * chunk_size, + uncompress_offset: 2 * chunk_idx as u64 * chunk_size as u64, + file_offset: 2 * chunk_idx as u64 * chunk_size as u64, + index: chunk_idx as u32, + reserved: 0, + }) as Arc; + let desc = BlobIoDesc::new(large_blob.clone(), BlobIoChunk(chunk), 0, chunk_size, true); + if chunk_idx < chunk_count / 2 { + iovec.push(desc); + } else { + iovec2.push(desc) + } + } + + // Extend other half of blob + iovec.append(iovec2); + + assert_eq!(size, iovec.size()); + assert_eq!(chunk_count, iovec.len() as u32); + } }