diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index aafede3a72b..cad27019b4a 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -617,12 +617,12 @@ impl FileSystem for Rafs { let real_size = cmp::min(size as u64, inode_size - offset); let mut result = 0; - let mut descs = inode.alloc_bio_vecs(&self.device, offset, real_size as usize, true)?; - assert!(!descs.is_empty() && !descs[0].is_empty()); + let mut io_vecs = inode.alloc_bio_vecs(&self.device, offset, real_size as usize, true)?; + assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty()); // Try to amplify user io for Rafs v5, to improve performance. if self.sb.meta.is_v5() && size < self.amplify_io { - let all_chunks_ready = self.device.all_chunks_ready(&descs); + let all_chunks_ready = self.device.all_chunks_ready(&io_vecs); if !all_chunks_ready { let chunk_mask = self.metadata().chunk_size as u64 - 1; let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask; @@ -630,16 +630,16 @@ impl FileSystem for Rafs { let actual_size = window_base - (offset & !chunk_mask); if actual_size < self.amplify_io as u64 { let window_size = self.amplify_io as u64 - actual_size; - let orig_cnt = descs.iter().fold(0, |s, d| s + d.len()); + let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); self.sb.amplify_io( &self.device, self.amplify_io, - &mut descs, + &mut io_vecs, &inode, window_base, window_size, )?; - let new_cnt = descs.iter().fold(0, |s, d| s + d.len()); + let new_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); trace!( "amplify RAFS v5 read from {} to {} chunks", orig_cnt, @@ -650,15 +650,15 @@ impl FileSystem for Rafs { } let start = self.ios.latency_start(); - for desc in descs.iter_mut() { - assert!(!desc.is_empty()); - assert_ne!(desc.size(), 0); + for io_vec in io_vecs.iter_mut() { + assert!(!io_vec.is_empty()); + assert_ne!(io_vec.size(), 0); // Avoid copying `desc` - let r = self.device.read_to(w, desc)?; + let r = self.device.read_to(w, io_vec)?; result += r; recorder.mark_success(r); - if r as u32 != desc.size() { + if r as u64 != io_vec.size() { break; } } diff --git a/smoke/tests/api_test.go b/smoke/tests/api_test.go index 1ab2ccab5c5..48bd598b4f2 100644 --- a/smoke/tests/api_test.go +++ b/smoke/tests/api_test.go @@ -159,7 +159,10 @@ func (a *APIV1TestSuite) TestPrefetch(t *testing.T) { ctx.PrepareWorkDir(t) defer ctx.Destroy(t) - rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs")) + rootFs := texture.MakeLowerLayer( + t, + filepath.Join(ctx.Env.WorkDir, "root-fs"), + texture.LargerFileMaker("large-blob.bin", 5)) rafs := a.rootFsToRafs(t, ctx, rootFs) diff --git a/smoke/tests/texture/layer.go b/smoke/tests/texture/layer.go index 90aa8fa3ec5..0bfcfbb61eb 100644 --- a/smoke/tests/texture/layer.go +++ b/smoke/tests/texture/layer.go @@ -13,7 +13,15 @@ import ( "github.com/dragonflyoss/image-service/smoke/tests/tool" ) -func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer { +type LayerMaker func(t *testing.T, layer *tool.Layer) + +func LargerFileMaker(path string, sizeGB int) LayerMaker { + return func(t *testing.T, layer *tool.Layer) { + layer.CreateLargeFile(t, path, sizeGB) + } +} + +func MakeChunkDictLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer { layer := tool.NewLayer(t, workDir) // Create regular file @@ -24,14 +32,18 @@ func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer { layer.CreateFile(t, "chunk-dict-file-5", []byte("dir-1/file-2")) layer.CreateFile(t, "chunk-dict-file-6", []byte("This is poetry")) layer.CreateFile(t, "chunk-dict-file-7", []byte("My name is long")) - layer.CreateLargeFile(t, "chunk-dict-file-8", 13) layer.CreateHoledFile(t, "chunk-dict-file-9", []byte("hello world"), 1024, 1024*1024) layer.CreateFile(t, "chunk-dict-file-10", []byte("")) + // Customized files + for _, maker := range makers { + maker(t, layer) + } + return layer } -func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer { +func MakeLowerLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer { layer := tool.NewLayer(t, workDir) // Create regular file @@ -66,9 +78,6 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer { // Create symlink with non-existed source file layer.CreateSymlink(t, "dir-1/file-deleted-symlink", "dir-1/file-deleted") - // Create large file - layer.CreateLargeFile(t, "large-blob.bin", 13) - // Create holed file layer.CreateHoledFile(t, "file-hole-1", []byte("hello world"), 1024, 1024*1024) @@ -79,6 +88,11 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer { // Set file xattr (only `security.capability` xattr is supported in OCI layer) tool.Run(t, fmt.Sprintf("setcap CAP_NET_RAW+ep %s", filepath.Join(workDir, "dir-1/file-2"))) + // Customized files + for _, maker := range makers { + maker(t, layer) + } + return layer } diff --git a/smoke/tests/tool/layer.go b/smoke/tests/tool/layer.go index 1262921aa37..7902d5d38ae 100644 --- a/smoke/tests/tool/layer.go +++ b/smoke/tests/tool/layer.go @@ -8,6 +8,7 @@ import ( "bytes" "compress/gzip" "context" + "crypto/rand" "io" "io/ioutil" "os" @@ -21,6 +22,7 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/converter" "github.com/opencontainers/go-digest" "github.com/pkg/xattr" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" ) @@ -44,17 +46,15 @@ func (l *Layer) CreateFile(t *testing.T, name string, data []byte) { require.NoError(t, err) } -func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeMB int) { +func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeGB int) { f, err := os.Create(filepath.Join(l.workDir, name)) require.NoError(t, err) defer func() { f.Close() }() - for b := 1; b <= sizeMB; b++ { - _, err := f.Write(bytes.Repeat([]byte{byte(b)}, 1024*1024)) - require.NoError(t, err) - } + _, err = io.CopyN(f, rand.Reader, int64(sizeGB)<<30) + assert.Nil(t, err) } func (l *Layer) CreateHoledFile(t *testing.T, name string, data []byte, offset, fileSize int64) { diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index b5412172ee8..965a09ea0fc 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -187,7 +187,7 @@ impl AsyncWorkerMgr { } /// Consume network bandwidth budget for prefetching. - pub fn consume_prefetch_budget(&self, size: u32) { + pub fn consume_prefetch_budget(&self, size: u64) { if self.prefetch_inflight.load(Ordering::Relaxed) > 0 { self.prefetch_consumed .fetch_add(size as usize, Ordering::AcqRel); diff --git a/storage/src/device.rs b/storage/src/device.rs index 9a7d2fce7fa..6c559cf481c 100644 --- a/storage/src/device.rs +++ b/storage/src/device.rs @@ -773,7 +773,7 @@ pub struct BlobIoVec { /// The blob associated with the IO operation. bi_blob: Arc, /// Total size of blob IOs to be performed. - bi_size: u32, + bi_size: u64, /// Array of blob IOs, these IOs should executed sequentially. pub(crate) bi_vec: Vec, } @@ -792,8 +792,8 @@ impl BlobIoVec { pub fn push(&mut self, desc: BlobIoDesc) { assert_eq!(self.bi_blob.blob_index(), desc.blob.blob_index()); assert_eq!(self.bi_blob.blob_id(), desc.blob.blob_id()); - assert!(self.bi_size.checked_add(desc.size).is_some()); - self.bi_size += desc.size; + assert!(self.bi_size.checked_add(desc.size as u64).is_some()); + self.bi_size += desc.size as u64; self.bi_vec.push(desc); } @@ -822,7 +822,7 @@ impl BlobIoVec { } /// Get size of pending IO data. - pub fn size(&self) -> u32 { + pub fn size(&self) -> u64 { self.bi_size } @@ -1564,4 +1564,51 @@ mod tests { iovec.append(iovec2); assert_eq!(0x2000, iovec.bi_size); } + + #[test] + fn test_extend_large_blob_io_vec() { + let size = 0x2_0000_0000; // 8G blob + let chunk_size = 0x10_0000; // 1M chunk + let chunk_count = (size / chunk_size as u64) as u32; + let large_blob = Arc::new(BlobInfo::new( + 0, + "blob_id".to_owned(), + size, + size, + chunk_size, + chunk_count, + BlobFeatures::default(), + )); + + let mut iovec = BlobIoVec::new(large_blob.clone()); + let mut iovec2 = BlobIoVec::new(large_blob.clone()); + + // Extend half of blob + for chunk_idx in 0..chunk_count { + let chunk = Arc::new(MockChunkInfo { + block_id: Default::default(), + blob_index: large_blob.blob_index, + flags: BlobChunkFlags::empty(), + compress_size: chunk_size, + compress_offset: chunk_idx as u64 * chunk_size as u64, + uncompress_size: 2 * chunk_size, + uncompress_offset: 2 * chunk_idx as u64 * chunk_size as u64, + file_offset: 2 * chunk_idx as u64 * chunk_size as u64, + index: chunk_idx as u32, + reserved: 0, + }) as Arc; + let desc = BlobIoDesc::new(large_blob.clone(), BlobIoChunk(chunk), 0, chunk_size, true); + if chunk_idx < chunk_count / 2 { + iovec.push(desc); + } else { + iovec2.push(desc) + } + } + + // Extend other half of blob + iovec.append(iovec2); + + assert_eq!(size, iovec.size()); + assert_eq!(chunk_count, iovec.len() as u32); + } }