Skip to content

Commit

Permalink
fix: amplify io is too large to hold in fuse buffer (dragonflyoss#1311)
Browse files Browse the repository at this point in the history
* fix: amplify io is too large to hold in fuse buffer

Fuse request buffer is fixed by `FUSE_KERN_BUF_SIZE * pagesize() + FUSE_HEADER_ SIZE`. When amplify io is larger than it, FuseDevWriter suffers from smaller buffer. As a result, invalid data error is returned.

Reproduction:
    run nydusd with 3MB amplify_io
    error from random io:
        reply error header OutHeader { len: 16, error: -5, unique: 108 }, error Custom { kind: InvalidData, error: "data out of range, available 1052656 requested 1250066" }

Details:
    size of fuse buffer = 1052656 + 16 (size of inner header) = 256(page number) * 4096(page size) + 4096(fuse header)
    let amplify_io = min(user_specified, fuseWriter.available_bytes())

Resolution:
    This pr is not best implements, but independent of modification to [fuse-backend-rs]("https://github.com/cloud-hypervisor/fuse-backend-rs").
    In future, evalucation of amplify_io will be replaced with [ZeroCopyWriter.available_bytes()]("cloud-hypervisor/fuse-backend-rs#135").

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>

* feat: e2e for amplify io larger than fuse buffer

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>

---------

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
Co-authored-by: 泰友 <cuichengxu.ccx@antgroup.com>
  • Loading branch information
ccx1024cc and 泰友 authored Jul 12, 2023
1 parent 31f2170 commit c8a39c8
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ path = "src/lib.rs"
anyhow = "1"
clap = { version = "4.0.18", features = ["derive", "cargo"] }
flexi_logger = { version = "0.25", features = ["compress"] }
fuse-backend-rs = "^0.10.3"
fuse-backend-rs = "^0.10.4"
hex = "0.4.3"
hyper = "0.14.11"
hyperlocal = "0.8.0"
Expand Down
9 changes: 5 additions & 4 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,19 +621,20 @@ impl FileSystem for Rafs {
assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty());

// Try to amplify user io for Rafs v5, to improve performance.
if self.sb.meta.is_v5() && size < self.amplify_io {
let amplify_io = cmp::min(self.amplify_io as usize, w.available_bytes()) as u32;
if self.sb.meta.is_v5() && size < amplify_io {
let all_chunks_ready = self.device.all_chunks_ready(&io_vecs);
if !all_chunks_ready {
let chunk_mask = self.metadata().chunk_size as u64 - 1;
let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask;
let window_base = cmp::min(next_chunk_base, inode_size);
let actual_size = window_base - (offset & !chunk_mask);
if actual_size < self.amplify_io as u64 {
let window_size = self.amplify_io as u64 - actual_size;
if actual_size < amplify_io as u64 {
let window_size = amplify_io as u64 - actual_size;
let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len());
self.sb.amplify_io(
&self.device,
self.amplify_io,
amplify_io,
&mut io_vecs,
&inode,
window_base,
Expand Down
2 changes: 1 addition & 1 deletion smoke/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ build:
# NYDUS_NYDUSIFY=/path/to/latest/nydusify \
# make test
test: build
sudo -E ./smoke.test -test.v -test.timeout 10m -test.parallel=8 -test.run=$(TESTS)
sudo -E ./smoke.test -test.v -test.timeout 10m -test.parallel=16 -test.run=$(TESTS)

# WORK_DIR=/tmp \
# NYDUS_BUILDER=/path/to/latest/nydus-image \
Expand Down
2 changes: 1 addition & 1 deletion smoke/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (a *APIV1TestSuite) TestPrefetch(t *testing.T) {
config.RafsMode = ctx.Runtime.RafsMode
err = nydusd.MountByAPI(config)
require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 15)

bcm, err := nydusd.GetBlobCacheMetrics("")
require.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions smoke/tests/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
)

const (
paramZran = "zran"
paramBatch = "batch"
paramEncrypt = "encrypt"
paramZran = "zran"
paramBatch = "batch"
paramEncrypt = "encrypt"
paramAmplifyIO = "amplify_io"
)

type ImageTestSuite struct {
Expand Down
62 changes: 62 additions & 0 deletions smoke/tests/native_layer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (n *NativeLayerTestSuite) TestMakeLayers() test.Generator {
Dimension(paramEnablePrefetch, []interface{}{false, true}).
Dimension(paramBatch, []interface{}{"0", "0x100000"}).
Dimension(paramEncrypt, []interface{}{false, true}).
Dimension(paramAmplifyIO, []interface{}{uint64(0x100000)}).
Skip(func(param *tool.DescartesItem) bool {

// rafs v6 not support cached mode nor dummy cache
Expand Down Expand Up @@ -77,6 +78,67 @@ func (n *NativeLayerTestSuite) TestMakeLayers() test.Generator {
ctx.Runtime.CacheCompressed = scenario.GetBool(paramCacheCompressed)
ctx.Runtime.RafsMode = scenario.GetString(paramRafsMode)
ctx.Runtime.EnablePrefetch = scenario.GetBool(paramEnablePrefetch)
ctx.Runtime.AmplifyIO = scenario.GetUInt64(paramAmplifyIO)

return scenario.Str(), func(t *testing.T) {
n.testMakeLayers(*ctx, t)
}
}
}

func (n *NativeLayerTestSuite) TestAmplifyIO() test.Generator {

scenarios := tool.DescartesIterator{}
scenarios.

/* Common params */
Dimension(paramCompressor, []interface{}{"lz4_block"}).
Dimension(paramFSVersion, []interface{}{"5", "6"}).
Dimension(paramChunkSize, []interface{}{"0x100000"}).
Dimension(paramCacheType, []interface{}{"blobcache"}).
Dimension(paramCacheCompressed, []interface{}{false}).
Dimension(paramRafsMode, []interface{}{"direct"}).
Dimension(paramEnablePrefetch, []interface{}{true}).
Dimension(paramBatch, []interface{}{"0x100000"}).
Dimension(paramEncrypt, []interface{}{false}).

/* Amplify io - target param */
Dimension(paramAmplifyIO, []interface{}{uint64(0x0), uint64(0x100000), uint64(0x10000000)}).
Skip(func(param *tool.DescartesItem) bool {

// Rafs v6 not support cached mode nor dummy cache
if param.GetString(paramFSVersion) == "6" {
return param.GetString(paramRafsMode) == "cached" || param.GetString(paramCacheType) == ""
}

// Dummy cache not support prefetch
if param.GetString(paramCacheType) == "" && param.GetBool(paramEnablePrefetch) {
return true
}

// Batch or encrypt not work with rafs v5.
if param.GetString(paramFSVersion) == "5" && (param.GetString(paramBatch) != "0" || param.GetBool(paramEncrypt)) {
return true
}

return false
})

return func() (name string, testCase test.Case) {
if !scenarios.HasNext() {
return
}
scenario := scenarios.Next()

ctx := tool.DefaultContext(n.t)
ctx.Build.Compressor = scenario.GetString(paramCompressor)
ctx.Build.FSVersion = scenario.GetString(paramFSVersion)
ctx.Build.ChunkSize = scenario.GetString(paramChunkSize)
ctx.Runtime.CacheType = scenario.GetString(paramCacheType)
ctx.Runtime.CacheCompressed = scenario.GetBool(paramCacheCompressed)
ctx.Runtime.RafsMode = scenario.GetString(paramRafsMode)
ctx.Runtime.EnablePrefetch = scenario.GetBool(paramEnablePrefetch)
ctx.Runtime.AmplifyIO = scenario.GetUInt64(paramAmplifyIO)

return scenario.Str(), func(t *testing.T) {
n.testMakeLayers(*ctx, t)
Expand Down
2 changes: 2 additions & 0 deletions smoke/tests/tool/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type RuntimeContext struct {
CacheCompressed bool
RafsMode string
EnablePrefetch bool
AmplifyIO uint64
}

type EnvContext struct {
Expand Down Expand Up @@ -75,6 +76,7 @@ func DefaultContext(t *testing.T) *Context {
CacheCompressed: false,
RafsMode: "direct",
EnablePrefetch: true,
AmplifyIO: uint64(0x100000),
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions smoke/tests/tool/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (d *DescartesItem) Str() string {
return sb.String()
}

func (d *DescartesItem) GetUInt64(name string) uint64 {
return d.vals[name].(uint64)
}

// Generator of Cartesian product.
//
// An example is below:
Expand Down
4 changes: 3 additions & 1 deletion smoke/tests/tool/nydusd.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type NydusdConfig struct {
LatestReadFiles bool
AccessPattern bool
PrefetchFiles []string
AmplifyIO uint64
}

type Nydusd struct {
Expand Down Expand Up @@ -104,7 +105,8 @@ var configTpl = `
"digest_validate": {{.DigestValidate}},
"enable_xattr": true,
"latest_read_files": {{.LatestReadFiles}},
"access_pattern": {{.AccessPattern}}
"access_pattern": {{.AccessPattern}},
"amplify_io": {{.AmplifyIO}}
}
`

Expand Down
1 change: 1 addition & 0 deletions smoke/tests/tool/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func Verify(t *testing.T, ctx Context, expectedFiles map[string]*File) {
CacheCompressed: ctx.Runtime.CacheCompressed,
RafsMode: ctx.Runtime.RafsMode,
DigestValidate: false,
AmplifyIO: ctx.Runtime.AmplifyIO,
}

nydusd, err := NewNydusd(config)
Expand Down

0 comments on commit c8a39c8

Please sign in to comment.