Skip to content

Commit

Permalink
fix: large files broke prefetch
Browse files Browse the repository at this point in the history
Files larger than 4 GiB lead to a prefetch panic, because the maximum blob
IO range is smaller than 4 GiB. This PR changes the type of the blob IO max
size from u32 to u64.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
  • Loading branch information
泰友 authored and imeoer committed Jul 11, 2023
1 parent 9bb5151 commit 31f2170
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 28 deletions.
22 changes: 11 additions & 11 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,29 +617,29 @@ impl FileSystem for Rafs {

let real_size = cmp::min(size as u64, inode_size - offset);
let mut result = 0;
let mut descs = inode.alloc_bio_vecs(&self.device, offset, real_size as usize, true)?;
assert!(!descs.is_empty() && !descs[0].is_empty());
let mut io_vecs = inode.alloc_bio_vecs(&self.device, offset, real_size as usize, true)?;
assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty());

// Try to amplify user io for Rafs v5, to improve performance.
if self.sb.meta.is_v5() && size < self.amplify_io {
let all_chunks_ready = self.device.all_chunks_ready(&descs);
let all_chunks_ready = self.device.all_chunks_ready(&io_vecs);
if !all_chunks_ready {
let chunk_mask = self.metadata().chunk_size as u64 - 1;
let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask;
let window_base = cmp::min(next_chunk_base, inode_size);
let actual_size = window_base - (offset & !chunk_mask);
if actual_size < self.amplify_io as u64 {
let window_size = self.amplify_io as u64 - actual_size;
let orig_cnt = descs.iter().fold(0, |s, d| s + d.len());
let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len());
self.sb.amplify_io(
&self.device,
self.amplify_io,
&mut descs,
&mut io_vecs,
&inode,
window_base,
window_size,
)?;
let new_cnt = descs.iter().fold(0, |s, d| s + d.len());
let new_cnt = io_vecs.iter().fold(0, |s, d| s + d.len());
trace!(
"amplify RAFS v5 read from {} to {} chunks",
orig_cnt,
Expand All @@ -650,15 +650,15 @@ impl FileSystem for Rafs {
}

let start = self.ios.latency_start();
for desc in descs.iter_mut() {
assert!(!desc.is_empty());
assert_ne!(desc.size(), 0);
for io_vec in io_vecs.iter_mut() {
assert!(!io_vec.is_empty());
assert_ne!(io_vec.size(), 0);

// Pass the IO vector by mutable reference to avoid copying it
let r = self.device.read_to(w, desc)?;
let r = self.device.read_to(w, io_vec)?;
result += r;
recorder.mark_success(r);
if r as u32 != desc.size() {
if r as u64 != io_vec.size() {
break;
}
}
Expand Down
5 changes: 4 additions & 1 deletion smoke/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ func (a *APIV1TestSuite) TestPrefetch(t *testing.T) {
ctx.PrepareWorkDir(t)
defer ctx.Destroy(t)

rootFs := texture.MakeLowerLayer(t, filepath.Join(ctx.Env.WorkDir, "root-fs"))
rootFs := texture.MakeLowerLayer(
t,
filepath.Join(ctx.Env.WorkDir, "root-fs"),
texture.LargerFileMaker("large-blob.bin", 5))

rafs := a.rootFsToRafs(t, ctx, rootFs)

Expand Down
26 changes: 20 additions & 6 deletions smoke/tests/texture/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ import (
"github.com/dragonflyoss/image-service/smoke/tests/tool"
)

func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer {
type LayerMaker func(t *testing.T, layer *tool.Layer)

// LargerFileMaker builds a LayerMaker that, when invoked, calls
// layer.CreateLargeFile with the captured path and sizeGB. It lets callers of
// the layer constructors opt in to a large file instead of always creating one.
func LargerFileMaker(path string, sizeGB int) LayerMaker {
	var maker LayerMaker = func(t *testing.T, layer *tool.Layer) {
		layer.CreateLargeFile(t, path, sizeGB)
	}
	return maker
}

func MakeChunkDictLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer {
layer := tool.NewLayer(t, workDir)

// Create regular file
Expand All @@ -24,14 +32,18 @@ func MakeChunkDictLayer(t *testing.T, workDir string) *tool.Layer {
layer.CreateFile(t, "chunk-dict-file-5", []byte("dir-1/file-2"))
layer.CreateFile(t, "chunk-dict-file-6", []byte("This is poetry"))
layer.CreateFile(t, "chunk-dict-file-7", []byte("My name is long"))
layer.CreateLargeFile(t, "chunk-dict-file-8", 13)
layer.CreateHoledFile(t, "chunk-dict-file-9", []byte("hello world"), 1024, 1024*1024)
layer.CreateFile(t, "chunk-dict-file-10", []byte(""))

// Customized files
for _, maker := range makers {
maker(t, layer)
}

return layer
}

func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer {
func MakeLowerLayer(t *testing.T, workDir string, makers ...LayerMaker) *tool.Layer {
layer := tool.NewLayer(t, workDir)

// Create regular file
Expand Down Expand Up @@ -66,9 +78,6 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer {
// Create symlink with non-existent source file
layer.CreateSymlink(t, "dir-1/file-deleted-symlink", "dir-1/file-deleted")

// Create large file
layer.CreateLargeFile(t, "large-blob.bin", 13)

// Create holed file
layer.CreateHoledFile(t, "file-hole-1", []byte("hello world"), 1024, 1024*1024)

Expand All @@ -79,6 +88,11 @@ func MakeLowerLayer(t *testing.T, workDir string) *tool.Layer {
// Set file xattr (only `security.capability` xattr is supported in OCI layer)
tool.Run(t, fmt.Sprintf("setcap CAP_NET_RAW+ep %s", filepath.Join(workDir, "dir-1/file-2")))

// Customized files
for _, maker := range makers {
maker(t, layer)
}

return layer
}

Expand Down
10 changes: 5 additions & 5 deletions smoke/tests/tool/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"compress/gzip"
"context"
"crypto/rand"
"io"
"io/ioutil"
"os"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/converter"
"github.com/opencontainers/go-digest"
"github.com/pkg/xattr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
)
Expand All @@ -44,17 +46,15 @@ func (l *Layer) CreateFile(t *testing.T, name string, data []byte) {
require.NoError(t, err)
}

func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeMB int) {
func (l *Layer) CreateLargeFile(t *testing.T, name string, sizeGB int) {
f, err := os.Create(filepath.Join(l.workDir, name))
require.NoError(t, err)
defer func() {
f.Close()
}()

for b := 1; b <= sizeMB; b++ {
_, err := f.Write(bytes.Repeat([]byte{byte(b)}, 1024*1024))
require.NoError(t, err)
}
_, err = io.CopyN(f, rand.Reader, int64(sizeGB)<<30)
assert.Nil(t, err)
}

func (l *Layer) CreateHoledFile(t *testing.T, name string, data []byte, offset, fileSize int64) {
Expand Down
2 changes: 1 addition & 1 deletion storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl AsyncWorkerMgr {
}

/// Consume network bandwidth budget for prefetching.
pub fn consume_prefetch_budget(&self, size: u32) {
pub fn consume_prefetch_budget(&self, size: u64) {
if self.prefetch_inflight.load(Ordering::Relaxed) > 0 {
self.prefetch_consumed
.fetch_add(size as usize, Ordering::AcqRel);
Expand Down
55 changes: 51 additions & 4 deletions storage/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ pub struct BlobIoVec {
/// The blob associated with the IO operation.
bi_blob: Arc<BlobInfo>,
/// Total size of blob IOs to be performed.
bi_size: u32,
bi_size: u64,
/// Array of blob IOs, these IOs should be executed sequentially.
pub(crate) bi_vec: Vec<BlobIoDesc>,
}
Expand All @@ -792,8 +792,8 @@ impl BlobIoVec {
pub fn push(&mut self, desc: BlobIoDesc) {
assert_eq!(self.bi_blob.blob_index(), desc.blob.blob_index());
assert_eq!(self.bi_blob.blob_id(), desc.blob.blob_id());
assert!(self.bi_size.checked_add(desc.size).is_some());
self.bi_size += desc.size;
assert!(self.bi_size.checked_add(desc.size as u64).is_some());
self.bi_size += desc.size as u64;
self.bi_vec.push(desc);
}

Expand Down Expand Up @@ -822,7 +822,7 @@ impl BlobIoVec {
}

/// Get size of pending IO data.
pub fn size(&self) -> u32 {
pub fn size(&self) -> u64 {
self.bi_size
}

Expand Down Expand Up @@ -1564,4 +1564,51 @@ mod tests {
iovec.append(iovec2);
assert_eq!(0x2000, iovec.bi_size);
}

#[test]
fn test_extend_large_blob_io_vec() {
    // An 8G blob split into 1M chunks: the accumulated IO size exceeds what a
    // `u32` can hold, so this exercises the 64-bit size accounting.
    let size = 0x2_0000_0000;
    let chunk_size = 0x10_0000;
    let chunk_count = (size / chunk_size as u64) as u32;
    let large_blob = Arc::new(BlobInfo::new(
        0,
        "blob_id".to_owned(),
        size,
        size,
        chunk_size,
        chunk_count,
        BlobFeatures::default(),
    ));

    let mut first_half = BlobIoVec::new(large_blob.clone());
    let mut second_half = BlobIoVec::new(large_blob.clone());
    let half = chunk_count / 2;

    // Build one IO descriptor per chunk; the first half of the chunks goes to
    // `first_half`, the remainder to `second_half`.
    for chunk_idx in 0..chunk_count {
        let idx = chunk_idx as u64;
        let stride = chunk_size as u64;
        let chunk = Arc::new(MockChunkInfo {
            block_id: Default::default(),
            blob_index: large_blob.blob_index,
            flags: BlobChunkFlags::empty(),
            compress_size: chunk_size,
            compress_offset: idx * stride,
            uncompress_size: 2 * chunk_size,
            uncompress_offset: 2 * idx * stride,
            file_offset: 2 * idx * stride,
            index: chunk_idx as u32,
            reserved: 0,
        }) as Arc<dyn BlobChunkInfo>;
        let desc = BlobIoDesc::new(large_blob.clone(), BlobIoChunk(chunk), 0, chunk_size, true);

        let target = if chunk_idx < half {
            &mut first_half
        } else {
            &mut second_half
        };
        target.push(desc);
    }

    // Merge the second half into the first; the result must cover the whole blob.
    first_half.append(second_half);

    assert_eq!(size, first_half.size());
    assert_eq!(chunk_count, first_half.len() as u32);
}
}

0 comments on commit 31f2170

Please sign in to comment.