Skip to content

Commit

Permalink
fileservice: add S3FS.restoreFromDiskCache
Browse files Browse the repository at this point in the history
fileservice: cache IOEntry if not caching full file in S3FS disk cache

fileservice: more tests for S3FS restore

fileservice: add Config.FixMissing

fileservice: add fixMissingFlag

fileservice: add MO_FS_FIX_MISSING env
  • Loading branch information
reusee committed Nov 13, 2023
1 parent 890d297 commit 30b341c
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 3 deletions.
9 changes: 9 additions & 0 deletions pkg/fileservice/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Config struct {
Cache CacheConfig `toml:"cache"`
// DataDir used to create fileservice using DISK as the backend
DataDir string `toml:"data-dir"`
// FixMissing indicates the file service to try its best to fix missing files
FixMissing bool `toml:"fix-missing"`
}

// NewFileServicesFunc creates a new *FileServices
Expand Down Expand Up @@ -136,6 +138,7 @@ func newMinioFileService(
func newS3FileService(
ctx context.Context, cfg Config, perfCounters []*perfcounter.CounterSet,
) (FileService, error) {

fs, err := NewS3FS(
ctx,
ObjectStorageArguments{
Expand All @@ -152,5 +155,11 @@ func newS3FileService(
if err != nil {
return nil, err
}

if cfg.FixMissing || *fixMissingFlag || fixMissingFromEnv {
//TODO use context.WithoutCancel(ctx)
go fs.restoreFromDiskCache(context.Background())
}

return fs, nil
}
14 changes: 14 additions & 0 deletions pkg/fileservice/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,20 @@ func (d *DiskCache) pathForFile(path string) string {
)
}

// ErrNotCacheFile is returned when a path inside the cache directory does
// not name a full-file cache entry.
var ErrNotCacheFile = errorStr("not a cache file")

// decodeFilePath maps a path inside the disk cache directory back to the
// original file-service path. It returns ErrNotCacheFile when diskPath is
// not a full-file cache entry.
func (d *DiskCache) decodeFilePath(diskPath string) (string, error) {
	rel, err := filepath.Rel(d.path, diskPath)
	if err != nil {
		return "", err
	}
	dir, name := filepath.Split(rel)
	// only full-file entries ("full" + cacheFileSuffix) are decodable
	if name != "full"+cacheFileSuffix {
		return "", ErrNotCacheFile
	}
	return fromOSPath(dir), nil
}

func (d *DiskCache) waitUpdateComplete(path string) {
d.updatingPaths.L.Lock()
for d.updatingPaths.m[path] {
Expand Down
6 changes: 6 additions & 0 deletions pkg/fileservice/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ func isRetryableError(err error) bool {
}
return false
}

// errorStr is a string that implements the error interface, allowing
// errors to be declared as constants or simple values.
type errorStr string

// Error returns the string itself as the error message.
func (e errorStr) Error() string {
	return string(e)
}
9 changes: 9 additions & 0 deletions pkg/fileservice/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package fileservice
import (
"encoding/csv"
"os"
"path/filepath"
"strings"
"unicode"

Expand Down Expand Up @@ -130,3 +131,11 @@ func toOSPath(filePath string) string {
}
return strings.ReplaceAll(filePath, "/", osPathSeparatorStr)
}

// fromOSPath converts an OS-specific disk path back to the slash-separated
// form used by file services. It is the inverse of toOSPath.
func fromOSPath(diskPath string) string {
	cleaned := filepath.Clean(diskPath)
	if os.PathSeparator == '/' {
		// already slash-separated on this platform
		return cleaned
	}
	return strings.ReplaceAll(cleaned, osPathSeparatorStr, "/")
}
9 changes: 9 additions & 0 deletions pkg/fileservice/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,12 @@ func (c Policy) Any(policies ...Policy) bool {
}
return false
}

// CacheIOEntry reports whether individual IOEntry data should be cached.
// IOEntries are cached only when full-file preloading is skipped, so that
// the two caching strategies do not duplicate each other.
func (c Policy) CacheIOEntry() bool {
	skipFullFile := c.Any(SkipFullFilePreloads)
	return skipFullFile
}

// CacheFullFile reports whether the whole file should be cached.
func (c Policy) CacheFullFile() bool {
	skipFullFile := c.Any(SkipFullFilePreloads)
	return !skipFullFile
}
13 changes: 10 additions & 3 deletions pkg/fileservice/s3_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,15 @@ func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error) {
if err := s.diskCache.Read(ctx, vector); err != nil {
return err
}
// we don't cache IOEntry to disk
// disk cache will be set by diskCache.SetFile in S3FS.read and S3FS.write
// try to cache IOEntry if not caching the full file
if vector.Policy.CacheIOEntry() {
defer func() {
if err != nil {
return
}
err = s.diskCache.Update(ctx, vector, s.asyncUpdate)
}()
}
}

if s.remoteCache != nil {
Expand Down Expand Up @@ -428,7 +435,7 @@ func (s *S3FS) read(ctx context.Context, vector *IOVector, bytesCounter *atomic.
return err
}

readFullObject := !vector.Policy.Any(SkipFullFilePreloads) &&
readFullObject := vector.Policy.CacheFullFile() &&
!vector.Policy.Any(SkipDiskCache)

var min, max *int64
Expand Down
125 changes: 125 additions & 0 deletions pkg/fileservice/s3_fs_restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileservice

import (
"context"
"flag"
"os"
"path/filepath"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"go.uber.org/zap"
)

// fixMissingFlag enables best-effort restoration of missing files via the
// -fs-fix-missing command line flag (see S3FS.restoreFromDiskCache).
var fixMissingFlag = flag.Bool(
	"fs-fix-missing",
	false,
	"indicates file services to try their best to fix missing files",
)

// fixMissingFromEnv enables the same behavior when the MO_FS_FIX_MISSING
// environment variable is set to any non-empty value.
var fixMissingFromEnv = os.Getenv("MO_FS_FIX_MISSING") != ""

// restoreFromDiskCache walks the disk cache directory and re-uploads any
// fully cached files that are missing from remote object storage.
// It is best-effort: per-entry errors are logged and skipped so a single
// bad entry does not abort the whole restore. The walk stops early when
// ctx is canceled (the original ignored ctx entirely, so the background
// goroutine started by newS3FileService could outlive shutdown).
func (s *S3FS) restoreFromDiskCache(ctx context.Context) {
	if s.diskCache == nil {
		// no disk cache, nothing to restore from
		return
	}
	cache := s.diskCache
	logutil.Info("restore S3FS from disk cache",
		zap.Any("fs name", s.Name()),
		zap.Any("cache dir", cache.path),
	)

	// track the S3 put counter so we can log only the entries that
	// actually triggered an upload (i.e. were missing remotely)
	counterSet := new(perfcounter.CounterSet)
	ctx = perfcounter.WithCounterSet(ctx, counterSet)
	numS3Write := counterSet.FileService.S3.Put.Load()

	err := filepath.WalkDir(cache.path, func(diskPath string, entry os.DirEntry, err error) error {

		// stop walking if the context has been canceled
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// ignore walk errors; keep going with the remaining entries
		if err != nil {
			logutil.Info("restore from disk cache error",
				zap.Any("do", "WalkDir entry"),
				zap.Any("error", err),
			)
			return nil
		}

		// ignore directories
		if entry.IsDir() {
			return nil
		}

		path, err := cache.decodeFilePath(diskPath)
		if err != nil {
			logutil.Info("restore from disk cache error",
				zap.Any("do", "decode file path"),
				zap.Any("error", err),
			)
			// not a full-file cache entry; ignore
			return nil
		}

		f, err := os.Open(diskPath)
		if err != nil {
			logutil.Info("restore from disk cache error",
				zap.Any("do", "open file"),
				zap.Any("error", err),
			)
			return nil
		}
		// the deferred close fires when this callback returns,
		// i.e. once per walked file
		defer f.Close()

		err = s.Write(ctx, IOVector{
			FilePath: path,
			Entries: []IOEntry{
				{
					Size:           -1, // size is determined from the reader
					ReaderForWrite: f,
				},
			},
		})
		if err != nil {
			if moerr.IsMoErrCode(err, moerr.ErrFileAlreadyExists) {
				// file already present remotely; nothing to do
				return nil
			}
			logutil.Info("restore from disk cache error",
				zap.Any("do", "write"),
				zap.Any("error", err),
			)
			return nil
		}

		// the put counter advanced, so this file was actually uploaded
		n := counterSet.FileService.S3.Put.Load()
		if n > numS3Write {
			logutil.Info("restore from disk cache",
				zap.Any("path", path),
			)
			numS3Write = n
		}

		return nil
	})
	if err != nil {
		logutil.Info("restore from disk cache error",
			zap.Any("do", "WalkDir"),
			zap.Any("error", err),
		)
	}

}
95 changes: 95 additions & 0 deletions pkg/fileservice/s3_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/util/toml"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

type _TestS3Config struct {
Expand Down Expand Up @@ -780,3 +782,96 @@ func TestSequentialS3Read(t *testing.T) {
}

}

// TestS3RestoreFromCache verifies S3FS.restoreFromDiskCache: a file that is
// present in the disk cache but deleted from object storage should be
// re-uploaded, while files written without a full-file cache entry
// (SkipFullFilePreloads) should not trigger extra uploads.
// Skipped when no S3 test configuration is available.
func TestS3RestoreFromCache(t *testing.T) {
	ctx := context.Background()

	config, err := loadS3TestConfig()
	assert.Nil(t, err)
	if config.Endpoint == "" {
		// no config
		t.Skip()
	}

	// credentials for the S3 client come from the environment
	t.Setenv("AWS_REGION", config.Region)
	t.Setenv("AWS_ACCESS_KEY_ID", config.APIKey)
	t.Setenv("AWS_SECRET_ACCESS_KEY", config.APISecret)

	cacheDir := t.TempDir()
	fs, err := NewS3FS(
		ctx,
		ObjectStorageArguments{
			Name:     "s3",
			Endpoint: config.Endpoint,
			Bucket:   config.Bucket,
			// unique prefix so concurrent test runs do not collide
			KeyPrefix:     time.Now().Format("2006-01-02.15:04:05.000000"),
			AssumeRoleARN: config.RoleARN,
		},
		CacheConfig{
			DiskPath: ptrTo(cacheDir),
		},
		nil,
		false,
	)
	assert.Nil(t, err)

	// write file; this populates a full-file entry in the disk cache
	err = fs.Write(ctx, IOVector{
		FilePath: "foo/bar",
		Entries: []IOEntry{
			{
				Size: 3,
				Data: []byte("foo"),
			},
		},
	})
	assert.Nil(t, err)

	// write file without full file cache; restore should not re-upload it
	err = fs.Write(ctx, IOVector{
		FilePath: "quux",
		Entries: []IOEntry{
			{
				Size: 3,
				Data: []byte("foo"),
			},
		},
		Policy: SkipFullFilePreloads,
	})
	assert.Nil(t, err)
	err = fs.Read(ctx, &IOVector{
		FilePath: "quux",
		Entries: []IOEntry{
			{
				Size: 3,
			},
		},
	})
	assert.Nil(t, err)

	// remove the remote copy so the disk cache is the only source
	err = fs.Delete(ctx, "foo/bar")
	assert.Nil(t, err)

	logutil.Info("cache dir", zap.Any("dir", cacheDir))

	// count S3 puts performed during the restore
	counterSet := new(perfcounter.CounterSet)
	ctx = perfcounter.WithCounterSet(ctx, counterSet)
	fs.restoreFromDiskCache(ctx)

	// exactly one upload: foo/bar restored, quux left alone
	if counterSet.FileService.S3.Put.Load() != 1 {
		t.Fatal()
	}

	// the restored file must be readable with its original content
	vec := &IOVector{
		FilePath: "foo/bar",
		Entries: []IOEntry{
			{
				Size: -1,
			},
		},
	}
	err = fs.Read(ctx, vec)
	assert.Nil(t, err)
	assert.Equal(t, []byte("foo"), vec.Entries[0].Data)

}

0 comments on commit 30b341c

Please sign in to comment.