Skip to content

Commit

Permalink
fix: large files broke prefetch
Browse files Browse the repository at this point in the history
Files larger than 4G lead to a prefetch panic, because the maximum blob IO
range is smaller than 4G. This PR changes the blob IO max size from u32 to
u64.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
  • Loading branch information
泰友 committed Jul 6, 2023
1 parent 7d5cb1a commit 56cb7e6
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 16 deletions.
6 changes: 5 additions & 1 deletion rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,11 @@ impl FileSystem for Rafs {
let r = self.device.read_to(w, desc)?;
result += r;
recorder.mark_success(r);
if r as u32 != desc.size() {

// A blob io can become large in two ways: a large `offset`, or amplification.
// Both rarely exceed 4G, so `device.read_to()` is left unchanged for now.
// FIXME: make device.read_to() handle blob io larger than 4G.
if r as u64 != desc.size() {
break;
}
}
Expand Down
5 changes: 4 additions & 1 deletion smoke/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ func (a *APIV1TestSuite) TestPrefetch(t *testing.T) {
ctx.PrepareWorkDir(t)
defer ctx.Destroy(t)

rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs"))
rootFs := texture.MakeLowerLayer(
t,
filepath.Join(ctx.Env.WorkDir, "root-fs"),
texture.LargerFileMaker("large-blob.bin", 5))

rafs := a.rootFsToRafs(t, ctx, rootFs)

Expand Down
26 changes: 20 additions & 6 deletions smoke/tests/texture/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ import (
"github.com/dragonflyoss/image-service/smoke/tests/tool"
)

func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer {
// LayerMaker is a callback that receives the test handle and a layer under
// construction, so callers can add their own customized files to that layer.
type LayerMaker func(t *testing.T, layer *tool.Layer)

// LargerFileMaker builds a LayerMaker which, when invoked, creates a large
// file at `path` inside the given layer by forwarding `sizeGB` to
// Layer.CreateLargeFile.
func LargerFileMaker(path string, sizeGB int) LayerMaker {
	createLargeFile := func(t *testing.T, layer *tool.Layer) {
		layer.CreateLargeFile(t, path, sizeGB)
	}
	return createLargeFile
}

func MakeChunkDictLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer {
layer := tool.NewLayer(t, workDir)

// Create regular file
Expand All @@ -24,14 +32,18 @@ func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer {
layer.CreateFile(t, "chunk-dict-file-5", []byte("dir-1/file-2"))
layer.CreateFile(t, "chunk-dict-file-6", []byte("This is poetry"))
layer.CreateFile(t, "chunk-dict-file-7", []byte("My name is long"))
layer.CreateLargeFile(t, "chunk-dict-file-8", 13)
layer.CreateHoledFile(t, "chunk-dict-file-9", []byte("hello world"), 1024, 1024*1024)
layer.CreateFile(t, "chunk-dict-file-10", []byte(""))

// customized files
for _, maker := range makers {
maker(t, layer)
}

return layer
}

func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer {
func MakeLowerLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer {
layer := tool.NewLayer(t, workDir)

// Create regular file
Expand Down Expand Up @@ -66,9 +78,6 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer {
// Create symlink with non-existed source file
layer.CreateSymlink(t, "dir-1/file-deleted-symlink", "dir-1/file-deleted")

// Create large file
layer.CreateLargeFile(t, "large-blob.bin", 13)

// Create holed file
layer.CreateHoledFile(t, "file-hole-1", []byte("hello world"), 1024, 1024*1024)

Expand All @@ -79,6 +88,11 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer {
// Set file xattr (only `security.capability` xattr is supported in OCI layer)
tool.Run(t, fmt.Sprintf("setcap CAP_NET_RAW+ep %s", filepath.Join(workDir, "dir-1/file-2")))

// customized files
for _, maker := range makers {
maker(t, layer)
}

return layer
}

Expand Down
11 changes: 8 additions & 3 deletions smoke/tests/tool/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,20 @@ func (l *Layer) CreateFile(t *testing.T, name string, data []byte) {
require.NoError(t, err)
}

func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeMB int) {
func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeGB int) {
f, err := os.Create(filepath.Join(l.workDir, name))
require.NoError(t, err)
defer func() {
f.Close()
}()

for b := 1; b <= sizeMB; b++ {
_, err := f.Write(bytes.Repeat([]byte{byte(b)}, 1024*1024))
buf := make([]byte, 0x1000)
for i := 0; i < len(buf); i++ {
buf[i] = byte(i)
}

for count := 1024 * 1024 * 1024 * sizeGB >> 12; count >= 0; count-- {
_, err := f.Write(buf)
require.NoError(t, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl AsyncWorkerMgr {
}

/// Consume network bandwidth budget for prefetching.
pub fn consume_prefetch_budget(&self, size: u32) {
pub fn consume_prefetch_budget(&self, size: u64) {
if self.prefetch_inflight.load(Ordering::Relaxed) > 0 {
self.prefetch_consumed
.fetch_add(size as usize, Ordering::AcqRel);
Expand Down
55 changes: 51 additions & 4 deletions storage/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ pub struct BlobIoVec {
/// The blob associated with the IO operation.
bi_blob: Arc<BlobInfo>,
/// Total size of blob IOs to be performed.
bi_size: u32,
bi_size: u64,
/// Array of blob IOs, these IOs should executed sequentially.
pub(crate) bi_vec: Vec<BlobIoDesc>,
}
Expand All @@ -762,8 +762,8 @@ impl BlobIoVec {
/// Append `desc` to the IO vector and add its size to the running total.
///
/// # Panics
///
/// Panics if `desc` targets a blob whose index or id differs from this vector's blob,
/// or if adding `desc.size` to the accumulated size would overflow.
pub fn push(&mut self, desc: BlobIoDesc) {
// A vector only ever carries IOs for a single blob.
assert_eq!(self.bi_blob.blob_index(), desc.blob.blob_index());
assert_eq!(self.bi_blob.blob_id(), desc.blob.blob_id());
assert!(self.bi_size.checked_add(desc.size).is_some());
self.bi_size += desc.size;
// Check for overflow first so the `+=` below cannot wrap.
assert!(self.bi_size.checked_add(desc.size as u64).is_some());
self.bi_size += desc.size as u64;
self.bi_vec.push(desc);
}

Expand Down Expand Up @@ -793,7 +793,7 @@ impl BlobIoVec {
}

/// Get the total size of pending IO data, i.e. the accumulated `bi_size` of this vector.
pub fn size(&self) -> u32 {
pub fn size(&self) -> u64 {
self.bi_size
}

Expand Down Expand Up @@ -1481,4 +1481,51 @@ mod tests {
assert!(desc2.is_continuous(&desc3, 0x800));
assert!(desc2.is_continuous(&desc3, 0x1000));
}

/// Describe an 8G blob as 1M chunks, distribute the per-chunk IO descriptors over two
/// `BlobIoVec`s, merge them, and verify that the accumulated size (which does not fit in a
/// `u32`) and the descriptor count are both reported correctly.
#[test]
fn test_extend_large_blob_io_vec() {
    let size = 0x2_0000_0000; // 8G blob
    let chunk_size = 0x10_0000; // 1M chunk
    let chunk_bytes = chunk_size as u64;
    let chunk_count = (size / chunk_bytes) as u32;
    let large_blob = Arc::new(BlobInfo::new(
        0,
        "blob_id".to_owned(),
        size,
        size,
        chunk_size,
        chunk_count,
        BlobFeatures::default(),
    ));

    let mut first_half = BlobIoVec::new(large_blob.clone());
    let mut second_half = BlobIoVec::new(large_blob.clone());
    let half = chunk_count / 2;

    // Build one descriptor per chunk; the lower half of the chunk indexes goes to
    // `first_half`, the upper half to `second_half`.
    for chunk_idx in 0..chunk_count {
        let compressed_offset = chunk_idx as u64 * chunk_bytes;
        // Every chunk is mocked as twice as large once uncompressed.
        let uncompressed_offset = 2 * compressed_offset;
        let chunk: Arc<dyn BlobChunkInfo> = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: large_blob.blob_index,
            flags: BlobChunkFlags::empty(),
            compress_size: chunk_size,
            compress_offset: compressed_offset,
            uncompress_size: 2 * chunk_size,
            uncompress_offset: uncompressed_offset,
            file_offset: uncompressed_offset,
            index: chunk_idx,
            reserved: 0,
        });
        let desc = BlobIoDesc::new(large_blob.clone(), BlobIoChunk(chunk), 0, chunk_size, true);

        let target = if chunk_idx < half {
            &mut first_half
        } else {
            &mut second_half
        };
        target.push(desc);
    }

    // Merge the upper half into the lower half so a single vector covers the whole blob.
    first_half.append(second_half);

    assert_eq!(size, first_half.size());
    assert_eq!(chunk_count, first_half.len() as u32);
}
}

0 comments on commit 56cb7e6

Please sign in to comment.