Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow a way to disable the use of O_DIRECT in parallel downloads #2372

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type DebugConfig struct {
type FileCacheConfig struct {
CacheFileForRangeRead bool `yaml:"cache-file-for-range-read,omitempty" json:"cache-file-for-range-read,omitempty"`

DisableODirect bool `yaml:"disable-o-direct,omitempty" json:"disable-o-direct,omitempty"`

DownloadChunkSizeMb int64 `yaml:"download-chunk-size-mb,omitempty" json:"download-chunk-size-mb,omitempty"`

EnableCrc bool `yaml:"enable-crc,omitempty" json:"enable-crc,omitempty"`
Expand Down Expand Up @@ -317,6 +319,16 @@ func BindFlags(v *viper.Viper, flagSet *pflag.FlagSet) error {
return err
}

flagSet.BoolP("disable-o-direct", "", false, "Whether to disable using O_DIRECT while writing to file-cache in case of parallel downloads.")

if err := flagSet.MarkHidden("disable-o-direct"); err != nil {
return err
}

if err := v.BindPFlag("file-cache.disable-o-direct", flagSet.Lookup("disable-o-direct")); err != nil {
return err
}

flagSet.BoolP("disable-parallel-dirops", "", false, "Specifies whether to allow parallel dir operations (lookups and readers)")

if err := flagSet.MarkHidden("disable-parallel-dirops"); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cfg/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func validFileCacheConfig(t *testing.T) FileCacheConfig {
MaxSizeMb: -1,
ParallelDownloadsPerFile: 16,
WriteBufferSize: 4 * 1024 * 1024,
DisableODirect: true,
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/config_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestValidateConfigFile_FileCacheConfigSuccessful(t *testing.T) {
MaxSizeMb: 40,
ParallelDownloadsPerFile: 10,
WriteBufferSize: 8192,
DisableODirect: true,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions cmd/testdata/valid_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ file-cache:
max-size-mb: 40
parallel-downloads-per-file: 10
write-buffer-size: 8192
disable-o-direct: true
gcs-auth:
anonymous-access: true
key-file: "~/key.file"
Expand Down
12 changes: 12 additions & 0 deletions internal/cache/file/downloader/jm_parallel_downloads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func TestParallelDownloads(t *testing.T) {
maxParallelDownloads int64
downloadOffset int64
subscribedOffset int64
disableODirect bool
}{
{
name: "download the entire object when object size > no of goroutines * readReqSize",
Expand All @@ -118,6 +119,16 @@ func TestParallelDownloads(t *testing.T) {
subscribedOffset: 7,
downloadOffset: 10,
},
{
name: "download the entire object with O_DIRECT disabled.",
objectSize: 16 * util.MiB,
readReqSize: 4,
parallelDownloadsPerFile: 100,
maxParallelDownloads: 3,
subscribedOffset: 7,
downloadOffset: 10,
disableODirect: true,
},
}
for _, tc := range tbl {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -133,6 +144,7 @@ func TestParallelDownloads(t *testing.T) {
DownloadChunkSizeMb: tc.readReqSize, EnableCrc: true,
MaxParallelDownloads: tc.maxParallelDownloads,
WriteBufferSize: 4 * 1024 * 1024,
DisableODirect: tc.disableODirect,
}
jm := NewJobManager(cache, util.DefaultFilePerm, util.DefaultDirPerm, cacheDir, 2, fileCacheConfig)
job := jm.CreateJobIfNotExists(&minObj, bucket)
Expand Down
5 changes: 3 additions & 2 deletions internal/cache/file/downloader/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,9 @@ func (job *Job) createCacheFile() (*os.File, error) {
openFileFlags := os.O_TRUNC | os.O_WRONLY
var cacheFile *os.File
var err error
// Try using O_DIRECT while opening file in case of parallel downloads.
if job.fileCacheConfig.EnableParallelDownloads {
// Try using O_DIRECT while opening file when parallel downloads are enabled
// and O_DIRECT use is not disabled.
if job.fileCacheConfig.EnableParallelDownloads && !job.fileCacheConfig.DisableODirect {
cacheFile, err = cacheutil.CreateFile(job.fileSpec, openFileFlags|syscall.O_DIRECT)
if errors.Is(err, fs.ErrInvalid) || errors.Is(err, syscall.EINVAL) {
logger.Warnf("downloadObjectAsync: failure in opening file with O_DIRECT, falling back to without O_DIRECT")
Expand Down
13 changes: 13 additions & 0 deletions internal/cache/file/downloader/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,3 +1042,16 @@ func (dt *downloaderTest) Test_createCacheFile_WhenParallelDownloads() {
_ = cacheFile.Close()
}()
}

func (dt *downloaderTest) Test_createCacheFile_WhenParallelDownloadsEnabledAndODirectDisabled() {
//Arrange - initJobTest is being called in setup of downloader.go
dt.job.fileCacheConfig.EnableParallelDownloads = true
dt.job.fileCacheConfig.DisableODirect = true

cacheFile, err := dt.job.createCacheFile()

AssertEq(nil, err)
defer func() {
_ = cacheFile.Close()
}()
}
20 changes: 13 additions & 7 deletions internal/cache/file/downloader/parallel_downloads_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,20 @@ func (job *Job) downloadRange(ctx context.Context, dstWriter io.Writer, start, e

monitor.CaptureGCSReadMetrics(ctx, util.Parallel, end-start)

_, err = cacheutil.CopyUsingMemoryAlignedBuffer(ctx, newReader, dstWriter, end-start,
job.fileCacheConfig.WriteBufferSize)
// If context is canceled while reading/writing in CopyUsingMemoryAlignedBuffer
// then it returns error different from context cancelled (invalid argument),
// and we need to report that error as context cancelled.
if !errors.Is(err, context.Canceled) && errors.Is(ctx.Err(), context.Canceled) {
err = errors.Join(err, ctx.Err())
// Use of memory aligned buffer is not required if use of O_DIRECT is disabled.
if job.fileCacheConfig.DisableODirect {
_, err = io.CopyN(dstWriter, newReader, end-start)
} else {
_, err = cacheutil.CopyUsingMemoryAlignedBuffer(ctx, newReader, dstWriter, end-start,
job.fileCacheConfig.WriteBufferSize)
// If context is canceled while reading/writing in CopyUsingMemoryAlignedBuffer
// then it returns error different from context cancelled (invalid argument),
// and we need to report that error as context cancelled.
if !errors.Is(err, context.Canceled) && errors.Is(ctx.Err(), context.Canceled) {
err = errors.Join(err, ctx.Err())
}
}

if err != nil {
err = fmt.Errorf("downloadRange: error at the time of copying content to cache file %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/config/mount_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
DefaultDownloadChunkSizeMB = 50
DefaultParallelDownloadsPerFile = 16
DefaultWriteBufferSize = int64(4 * util.MiB)
DefaultDisableODirect = false
)

type LogConfig struct {
Expand Down Expand Up @@ -109,6 +110,7 @@ type FileCacheConfig struct {
DownloadChunkSizeMB int `yaml:"download-chunk-size-mb,omitempty"`
EnableCRC bool `yaml:"enable-crc"`
WriteBufferSize int64 `yaml:"write-buffer-size,omitempty"`
DisableODirect bool `yaml:"disable-o-direct"`
}

type MetadataCacheConfig struct {
Expand Down Expand Up @@ -188,6 +190,7 @@ func NewMountConfig() *MountConfig {
DownloadChunkSizeMB: DefaultDownloadChunkSizeMB,
EnableCRC: DefaultEnableCRC,
WriteBufferSize: DefaultWriteBufferSize,
DisableODirect: DefaultDisableODirect,
}
mountConfig.MetadataCacheConfig = MetadataCacheConfig{
TtlInSeconds: TtlInSecsUnsetSentinel,
Expand Down
1 change: 1 addition & 0 deletions internal/config/testdata/valid_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ file-cache:
download-chunk-size-mb: 100
enable-crc: false
write-buffer-size: 8192
disable-o-direct: true
metadata-cache:
ttl-secs: 5
type-cache-max-size-mb: 1
Expand Down
2 changes: 2 additions & 0 deletions internal/config/yaml_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func validateDefaultConfig(t *testing.T, mountConfig *MountConfig) {
assert.Equal(t, 50, mountConfig.FileCacheConfig.DownloadChunkSizeMB)
assert.False(t, mountConfig.FileCacheConfig.EnableCRC)
assert.Equal(t, int64(4*1024*1024), mountConfig.FileCacheConfig.WriteBufferSize)
assert.Equal(t, false, mountConfig.FileCacheConfig.DisableODirect)
assert.Equal(t, 1, mountConfig.GCSConnection.GRPCConnPoolSize)
assert.False(t, mountConfig.GCSAuth.AnonymousAccess)
assert.True(t, bool(mountConfig.EnableHNS))
Expand Down Expand Up @@ -133,6 +134,7 @@ func (t *YamlParserTest) TestReadConfigFile_ValidConfig() {
assert.Equal(t.T(), 100, mountConfig.DownloadChunkSizeMB)
assert.False(t.T(), mountConfig.FileCacheConfig.EnableCRC)
assert.Equal(t.T(), int64(8192), mountConfig.FileCacheConfig.WriteBufferSize)
assert.Equal(t.T(), true, mountConfig.FileCacheConfig.DisableODirect)

// gcs-retries
assert.Equal(t.T(), int64(6), mountConfig.GCSRetries.MaxRetryAttempts)
Expand Down
7 changes: 7 additions & 0 deletions tools/config-gen/params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@
usage: "Whether to cache file for range reads."
default: false

- config-path: "file-cache.disable-o-direct"
flag-name: "disable-o-direct"
type: "bool"
usage: "Whether to disable using O_DIRECT while writing to file-cache in case of parallel downloads."
default: "false"
hide-flag: true

- config-path: "file-cache.download-chunk-size-mb"
flag-name: "download-chunk-size-mb"
type: "int"
Expand Down
Loading