Skip to content

Commit

Permalink
Merge pull request #527 from ipfs-force-community/fix/import-slow
Browse files Browse the repository at this point in the history
fix: import slow
  • Loading branch information
LinZexiao authored Apr 26, 2024
2 parents ceb9ea0 + d4507bc commit ff7aecd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 193 deletions.
95 changes: 22 additions & 73 deletions piecestorage/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"os"
"path"
"path/filepath"

"github.com/filecoin-project/dagstore/mount"

Expand All @@ -23,44 +22,28 @@ type fsPieceStorage struct {
}

func (f *fsPieceStorage) Len(_ context.Context, resourceId string) (int64, error) {
size := int64(-1)
err := filepath.Walk(f.baseUrl, func(path string, info os.FileInfo, _ error) error {
if info.Name() == resourceId {
if info.IsDir() {
return fmt.Errorf("resource %s expect to be a file but found directory", resourceId)
}
fi, err := os.Stat(path)
if err != nil {
return err
}
size = fi.Size()

return filepath.SkipDir
}
return nil
})
st, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
return 0, err
}
if size == -1 {
return 0, fmt.Errorf("resource %s not found", resourceId)
}

return size, nil
if st.IsDir() {
return 0, fmt.Errorf("resource %s expect to be a file but found directory", resourceId)
}
return st.Size(), err
}

func (f *fsPieceStorage) ListResourceIds(_ context.Context) ([]string, error) {
var resources []string
err := filepath.Walk(f.baseUrl, func(path string, info os.FileInfo, _ error) error {
if !info.IsDir() {
resources = append(resources, info.Name())
}
return nil
})
entries, err := os.ReadDir(f.baseUrl)
if err != nil {
return nil, err
}

var resources []string
for _, entry := range entries {
if !entry.IsDir() {
resources = append(resources, entry.Name())
}
}
return resources, nil
}

Expand All @@ -84,33 +67,8 @@ func (f *fsPieceStorage) SaveTo(_ context.Context, resourceId string, r io.Reade
return wlen, err
}

func (f *fsPieceStorage) findFile(resourceId string) (string, error) {
var dstPath string
err := filepath.Walk(f.baseUrl, func(path string, info os.FileInfo, _ error) error {
if info.Name() == resourceId {
if info.IsDir() {
return fmt.Errorf("resource %s expect to be a file but found directory", resourceId)
}
dstPath = path
return filepath.SkipDir
}
return nil
})
if err != nil {
return "", err
}
if len(dstPath) == 0 {
return "", fmt.Errorf("resource %s not found", resourceId)
}

return dstPath, nil
}

func (f *fsPieceStorage) GetReaderCloser(_ context.Context, resourceId string) (io.ReadCloser, error) {
dstPath, err := f.findFile(resourceId)
if err != nil {
return nil, err
}
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
return nil, fmt.Errorf("unable to open file %s %w", dstPath, err)
Expand All @@ -119,10 +77,7 @@ func (f *fsPieceStorage) GetReaderCloser(_ context.Context, resourceId string) (
}

func (f *fsPieceStorage) GetMountReader(_ context.Context, resourceId string) (mount.Reader, error) {
dstPath, err := f.findFile(resourceId)
if err != nil {
return nil, err
}
dstPath := path.Join(f.baseUrl, resourceId)
fs, err := os.Open(dstPath)
if err != nil {
return nil, fmt.Errorf("unable to open file %s %w", dstPath, err)
Expand All @@ -146,24 +101,18 @@ func (f *fsPieceStorage) GetPieceTransfer(_ context.Context, pieceCid string) (s
}

func (f *fsPieceStorage) Has(_ context.Context, resourceId string) (bool, error) {
var has bool
err := filepath.Walk(f.baseUrl, func(path string, info os.FileInfo, _ error) error {
if info.Name() == resourceId {
if info.IsDir() {
return fmt.Errorf("resource %s expect to be a file but found directory", resourceId)
}
if info.Mode().IsRegular() {
has = true
}
return filepath.SkipDir
}
return nil
})
s, err := os.Stat(path.Join(f.baseUrl, resourceId))
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
if !s.Mode().IsRegular() {
return false, nil
}

return has, nil
return true, nil
}

func (f *fsPieceStorage) Validate(_ string) error {
Expand Down
124 changes: 4 additions & 120 deletions piecestorage/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package piecestorage
import (
"context"
"crypto/rand"
"fmt"
"io"
rand2 "math/rand"
"os"
path2 "path"
"path/filepath"
"testing"
"time"

"github.com/google/uuid"

Expand Down Expand Up @@ -54,132 +50,20 @@ func TestReWrite(t *testing.T) {
require.Equal(t, int64(len(buf)), int64(100))
}

files := []string{"f1", "f2", "f3", "f4"}
data, err := io.ReadAll(io.LimitReader(rand.Reader, 100))
require.NoError(t, err)
for i, f := range files {
if i%2 == 0 {
_ = os.MkdirAll(filepath.Join(path, "tmp"), os.ModePerm)
assert.NoError(t, os.WriteFile(filepath.Join(path, "tmp", f), data, os.ModePerm))
continue
}
wlen, err := ifs.SaveTo(ctx, f, io.LimitReader(rand.Reader, 100))
assert.NoErrorf(t, err, "expect to write file ")
assert.Equal(t, int64(100), wlen)
}

for _, name := range append(files, name) {
_, err = ifs.GetStorageStatus()
require.Nil(t, err)
require.Equal(t, FS, ifs.Type())

length, err := ifs.Len(ctx, name)
require.NoError(t, err, "fail to get length")
require.Equal(t, int64(100), length)

has, err := ifs.Has(ctx, name)
require.NoError(t, err, "fail to check file exit")
require.True(t, has)
require.False(t, ifs.ReadOnly())

resources, err := ifs.ListResourceIds(ctx)
require.NoErrorf(t, err, "expect resource but got err")
require.Len(t, resources, 6)
require.Contains(t, resources, name)

readerCloser, err := ifs.GetReaderCloser(ctx, name)
require.NoError(t, err, "fail to get reader closer")
buf, err = io.ReadAll(readerCloser)
require.NoErrorf(t, err, "expect read file success")
if len(buf) != 100 {
require.Equal(t, int64(len(buf)), int64(100))
}

mounterReader, err := ifs.GetMountReader(ctx, name)
require.NoError(t, err, "fail to get mount reader")
buf, err = io.ReadAll(mounterReader)
require.NoErrorf(t, err, "expect read file success")
if len(buf) != 100 {
require.Equal(t, int64(len(buf)), int64(100))
}
}

dir := "tmp"
expectErr := fmt.Errorf("resource %s expect to be a file but found directory", dir)
has, err := ifs.Has(ctx, dir)
require.Equal(t, expectErr, err)
require.False(t, has)

length, err := ifs.Len(ctx, dir)
require.Equal(t, expectErr, err)
assert.Equal(t, int64(0), length)

readerCloser, err := ifs.GetReaderCloser(ctx, dir)
require.Equal(t, expectErr, err)
assert.Nil(t, readerCloser)

mounterReader, err := ifs.GetMountReader(ctx, dir)
require.Equal(t, expectErr, err)
assert.Nil(t, mounterReader)

noExistFile := "f111"
has, err = ifs.Has(ctx, noExistFile)
has, err := ifs.Has(ctx, noExistFile)
require.NoError(t, err)
require.False(t, has)

length, err = ifs.Len(ctx, noExistFile)
length, err := ifs.Len(ctx, noExistFile)
require.Error(t, err)
assert.Equal(t, int64(0), length)

readerCloser, err = ifs.GetReaderCloser(ctx, noExistFile)
readerCloser, err := ifs.GetReaderCloser(ctx, noExistFile)
require.Error(t, err)
assert.Nil(t, readerCloser)

mounterReader, err = ifs.GetMountReader(ctx, noExistFile)
mounterReader, err := ifs.GetMountReader(ctx, noExistFile)
require.Error(t, err)
assert.Nil(t, mounterReader)
}

func TestLargeFile(t *testing.T) {
path := path2.Join(os.TempDir(), "market-test-tmp")
_ = os.MkdirAll(path, os.ModePerm)
defer os.RemoveAll(path) // nolint

ctx := context.TODO()
ifs, err := NewFsPieceStorage(&config.FsPieceStorage{ReadOnly: false, Path: path})
require.NoErrorf(t, err, "open file storage")

for i := 0; i < 100; i++ {
if i%2 == 0 {
tmpDir := fmt.Sprintf("tmp_%d", i)
require.NoError(t, os.MkdirAll(filepath.Join(path, tmpDir), os.ModePerm))
for j := 0; j < 3000; j++ {
assert.NoError(t, os.WriteFile(filepath.Join(path, tmpDir, fmt.Sprintf("tmp_%d_%d", i, j)), []byte("test"), os.ModePerm))
}
continue
}
for j := 0; j < 600; j++ {
assert.NoError(t, os.WriteFile(filepath.Join(path, fmt.Sprintf("%d_%d", i, j)), []byte("test"), os.ModePerm))
}
}

var fileName string
for i := 0; i < 100; i += 3 {
if i%2 == 0 {
fileName = fmt.Sprintf("tmp_%d_%d", i, rand2.Intn(3000))
} else {
fileName = fmt.Sprintf("%d_%d", i, rand2.Intn(600))
}

start := time.Now()
_, err = ifs.GetMountReader(ctx, fileName)
require.NoError(t, err)
fmt.Println("file name", fileName, "took", time.Since(start))
}

start := time.Now()
list, err := ifs.ListResourceIds(ctx)
require.NoError(t, err)
require.Len(t, list, 180000, fmt.Sprintf("not match %d", len(list)))
fmt.Println("list took", time.Since(start))
}

0 comments on commit ff7aecd

Please sign in to comment.