
Commit

change batch num
Signed-off-by: jiefenghuang <jiefeng@juicedata.io>
jiefenghuang committed Sep 30, 2024
1 parent b1f7432 commit 551c183
Showing 5 changed files with 119 additions and 85 deletions.
89 changes: 47 additions & 42 deletions pkg/meta/base.go
@@ -48,7 +48,11 @@ const (
)

var (
DirBatchNum = 4096
DirBatchNum = map[string]int{
"redis": 4096,
"kv": 4096,
"db": 40960,
}
maxCompactSlices = 1000
maxSlices = 2500
inodeNeedPrefetch = uint64(utils.JitterIt(inodeBatch * 0.1)) // Add jitter to reduce probability of txn conflicts
@@ -2971,17 +2975,18 @@ type dirHandler struct {
batch *dirBatch
fetcher dirFetcher
readOff int
batchNum int
}

func (s *dirHandler) fetch(ctx Context, offset int) (*dirBatch, error) {
func (h *dirHandler) fetch(ctx Context, offset int) (*dirBatch, error) {
var cursor interface{}
if s.batch != nil && s.batch.predecessor(offset) {
if s.batch.isEnd {
return s.batch, nil
if h.batch != nil && h.batch.predecessor(offset) {
if h.batch.isEnd {
return h.batch, nil
}
cursor = s.batch.cursor
cursor = h.batch.cursor
}
nextCursor, entries, err := s.fetcher(ctx, s.inode, cursor, offset, DirBatchNum, s.plus)
nextCursor, entries, err := h.fetcher(ctx, h.inode, cursor, offset, h.batchNum, h.plus)
if err != nil {
return nil, err
}
@@ -2990,7 +2995,7 @@ func (s *dirHandler) fetch(ctx Context, offset int) (*dirBatch, error) {
nextCursor = cursor
}
isEnd := false
if len(entries) < DirBatchNum {
if len(entries) < h.batchNum {
isEnd = true
}
indexes := make(map[string]int, len(entries))
@@ -3004,70 +3009,70 @@ func (s *dirHandler) fetch(ctx Context, offset int) (*dirBatch, error) {
return &dirBatch{isEnd: isEnd, offset: offset, cursor: nextCursor, entries: entries, indexes: indexes, maxName: maxName}, nil
}

func (s *dirHandler) List(ctx Context, offset int) ([]*Entry, syscall.Errno) {
func (h *dirHandler) List(ctx Context, offset int) ([]*Entry, syscall.Errno) {
var prefix []*Entry
if offset < len(s.initEntries) {
prefix = s.initEntries[offset:]
if offset < len(h.initEntries) {
prefix = h.initEntries[offset:]
offset = 0
} else {
offset -= len(s.initEntries)
offset -= len(h.initEntries)
}

var err error
s.Lock()
defer s.Unlock()
if !s.batch.contain(offset) {
s.batch, err = s.fetch(ctx, offset)
h.Lock()
defer h.Unlock()
if !h.batch.contain(offset) {
h.batch, err = h.fetch(ctx, offset)
}

if err != nil {
return nil, errno(err)
}

s.readOff = s.batch.offset + len(s.batch.entries)
h.readOff = h.batch.offset + len(h.batch.entries)
if len(prefix) > 0 {
return append(prefix, s.batch.entries...), 0
return append(prefix, h.batch.entries...), 0
}
return s.batch.entries[offset-s.batch.offset:], 0
return h.batch.entries[offset-h.batch.offset:], 0
}

func (s *dirHandler) delete(name string) {
if s.batch == nil || len(s.batch.entries) == 0 {
func (h *dirHandler) delete(name string) {
if h.batch == nil || len(h.batch.entries) == 0 {
return
}

if idx, ok := s.batch.indexes[name]; ok && idx >= s.readOff {
delete(s.batch.indexes, name)
if n := len(s.batch.entries); n == 1 {
s.batch.entries = s.batch.entries[:0]
if idx, ok := h.batch.indexes[name]; ok && idx >= h.readOff {
delete(h.batch.indexes, name)
if n := len(h.batch.entries); n == 1 {
h.batch.entries = h.batch.entries[:0]
} else {
s.batch.entries[idx] = s.batch.entries[n-1]
s.batch.indexes[string(s.batch.entries[idx].Name)] = idx
s.batch.entries = s.batch.entries[:n-1]
h.batch.entries[idx] = h.batch.entries[n-1]
h.batch.indexes[string(h.batch.entries[idx].Name)] = idx
h.batch.entries = h.batch.entries[:n-1]
}
}
}

func (s *dirHandler) Insert(inode Ino, name string, attr *Attr) {
s.Lock()
defer s.Unlock()
if s.batch == nil {
func (h *dirHandler) Insert(inode Ino, name string, attr *Attr) {
h.Lock()
defer h.Unlock()
if h.batch == nil {
return
}
if s.batch.isEnd || bytes.Compare([]byte(name), s.batch.maxName) < 0 {
if h.batch.isEnd || bytes.Compare([]byte(name), h.batch.maxName) < 0 {
// TODO: not sorted
s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr})
s.batch.indexes[name] = len(s.batch.entries) - 1
h.batch.entries = append(h.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr})
h.batch.indexes[name] = len(h.batch.entries) - 1
}
}

func (s *dirHandler) Read(offset int) {
s.readOff = offset - len(s.initEntries)
func (h *dirHandler) Read(offset int) {
h.readOff = offset - len(h.initEntries)
}

func (s *dirHandler) Close() {
s.Lock()
s.batch = nil
s.readOff = 0
s.Unlock()
func (h *dirHandler) Close() {
h.Lock()
h.batch = nil
h.readOff = 0
h.Unlock()
}
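
For context (not part of the diff): a minimal standalone sketch of how the new per-engine map can be consumed, with a hypothetical fallback for engine names that are not in it.

package main

import "fmt"

// Mirror of the per-engine batch sizes introduced above in pkg/meta/base.go.
var dirBatchNum = map[string]int{
    "redis": 4096,
    "kv":    4096,
    "db":    40960,
}

// batchNumFor is a hypothetical helper (not in the commit): it resolves the
// batch size for an engine name and falls back to a default for unknown keys.
// The commit itself only ever looks up the three keys defined above.
func batchNumFor(engine string) int {
    if n, ok := dirBatchNum[engine]; ok {
        return n
    }
    return 4096 // assumed default
}

func main() {
    fmt.Println(batchNumFor("db"))   // 40960
    fmt.Println(batchNumFor("tikv")) // 4096 (fallback)
}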
6 changes: 4 additions & 2 deletions pkg/meta/redis.go
@@ -4566,6 +4566,7 @@ func (m *redisMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler {
inode: inode,
plus: plus,
initEntries: entries,
batchNum: DirBatchNum["redis"],
}
}

@@ -4578,6 +4579,7 @@ type redisDirHandler struct {
entries []*Entry
indexes map[string]int
readOff int
batchNum int
}

func (s *redisDirHandler) Close() {
@@ -4676,8 +4678,8 @@ func (s *redisDirHandler) List(ctx Context, offset int) ([]*Entry, syscall.Errno) {
s.Unlock()

size := len(s.entries) - offset
if size > DirBatchNum {
size = DirBatchNum
if size > s.batchNum {
size = s.batchNum
}
s.readOff = offset + size
entries := s.entries[offset : offset+size]
31 changes: 17 additions & 14 deletions pkg/meta/sql.go
@@ -4451,37 +4451,40 @@ type dbDirHandler struct {
dirHandler
}

func (s *dbDirHandler) Insert(inode Ino, name string, attr *Attr) {
s.Lock()
defer s.Unlock()
if s.batch == nil {
func (h *dbDirHandler) Insert(inode Ino, name string, attr *Attr) {
h.Lock()
defer h.Unlock()
if h.batch == nil {
return
}
if s.batch.isEnd || bytes.Compare([]byte(name), s.batch.maxName) < 0 {
s.batch.entries = append(s.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr})
s.batch.indexes[name] = len(s.batch.entries) - 1
if h.batch.isEnd || bytes.Compare([]byte(name), h.batch.maxName) < 0 {
h.batch.entries = append(h.batch.entries, &Entry{Inode: inode, Name: []byte(name), Attr: attr})
h.batch.indexes[name] = len(h.batch.entries) - 1
}
}

func (s *dbDirHandler) Delete(name string) {
s.Lock()
defer s.Unlock()
func (h *dbDirHandler) Delete(name string) {
h.Lock()
defer h.Unlock()

s.dirHandler.delete(name)
if s.batch != nil && !s.batch.isEnd && bytes.Compare(s.batch.maxName, []byte(name)) > 0 && s.batch.cursor != nil {
s.batch.cursor = s.batch.cursor.(int) - 1
h.dirHandler.delete(name)
if h.batch != nil && !h.batch.isEnd && bytes.Compare(h.batch.maxName, []byte(name)) > 0 && h.batch.cursor != nil {
h.batch.cursor = h.batch.cursor.(int) - 1
}
}

func (m *dbMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler {
return &dbDirHandler{
h := &dbDirHandler{
dirHandler: dirHandler{
inode: inode,
plus: plus,
initEntries: entries,
fetcher: m.getDirFetcher(),
batchNum: DirBatchNum["db"],
},
}
h.batch, _ = h.fetch(Background, 0)
return h
}

func (m *dbMeta) getDirFetcher() dirFetcher {
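
Aside (not from the commit): the integer-cursor decrement in dbDirHandler.Delete can be illustrated with a small self-contained sketch, assuming the db fetcher pages by a count of rows already returned, so removing an already-returned row shifts every later row down by one.

package main

import "fmt"

func main() {
    rows := []string{"a", "b", "c", "d", "e"} // sorted directory entries
    cursor := 3                               // "a", "b", "c" already returned

    // Delete "b", an entry that was already returned (it sorts before maxName).
    rows = append(rows[:1], rows[2:]...) // rows is now: a c d e

    fmt.Println(rows[cursor:]) // without adjustment: [e] -- "d" would be skipped
    cursor--                   // the fixup mirrored by dbDirHandler.Delete
    fmt.Println(rows[cursor:]) // [d e] -- the next page resumes correctly
}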
9 changes: 5 additions & 4 deletions pkg/meta/tkv.go
@@ -3785,10 +3785,10 @@ type kvDirHandler struct {
dirHandler
}

func (s *kvDirHandler) Delete(name string) {
s.Lock()
defer s.Unlock()
s.dirHandler.delete(name)
func (h *kvDirHandler) Delete(name string) {
h.Lock()
defer h.Unlock()
h.dirHandler.delete(name)
}

func (m *kvMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler {
@@ -3798,6 +3798,7 @@ func (m *kvMeta) newDirHandler(inode Ino, plus bool, entries []*Entry) DirHandler {
plus: plus,
initEntries: entries,
fetcher: m.getDirFetcher(),
batchNum: DirBatchNum["kv"],
},
}
s.batch, _ = s.fetch(Background, 0)
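
Aside (not from the commit): a sketch of how a caller might drain one of these handlers batch by batch, using only the methods shown above. The offset bookkeeping is simplified, readAll is a hypothetical helper, and the import path is assumed to be the repository's meta package.

import (
    "syscall"

    "github.com/juicedata/juicefs/pkg/meta"
)

// Illustrative only: List returns at most one batch (h.batchNum entries for
// the engine), so an empty result is treated here as the end of the directory.
func readAll(ctx meta.Context, h meta.DirHandler) ([]*meta.Entry, syscall.Errno) {
    var all []*meta.Entry
    for off := 0; ; {
        entries, st := h.List(ctx, off)
        if st != 0 {
            return nil, st
        }
        if len(entries) == 0 {
            return all, 0
        }
        all = append(all, entries...)
        off += len(entries)
        h.Read(off) // record how far the caller has consumed (assumed semantics)
    }
}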
69 changes: 46 additions & 23 deletions pkg/vfs/vfs_test.go
@@ -874,15 +874,20 @@ func TestInternalFile(t *testing.T) {
}

func TestReaddirCache(t *testing.T) {
for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} {
testReaddirCache(t, metaUri, 20)
testReaddirCache(t, metaUri, 4096)
engines := map[string]string{
"kv": "",
"db": "sqlite3://",
"redis": "redis://127.0.0.1:6379/2",
}
for typ, metaUri := range engines {
testReaddirCache(t, metaUri, typ, 20)
testReaddirCache(t, metaUri, typ, 4096)
}
}

func testReaddirCache(t *testing.T, metaUri string, batchNum int) {
func testReaddirCache(t *testing.T, metaUri string, typ string, batchNum int) {
old := meta.DirBatchNum
meta.DirBatchNum = batchNum
meta.DirBatchNum[typ] = batchNum
defer func() {
meta.DirBatchNum = old
}()
@@ -1011,7 +1016,13 @@ func testVFSReadDirSort(t *testing.T, metaUri string) {
v.Releasedir(ctx, parent, fh2)
}

func testReaddirBatch(t *testing.T, metaUri string) {
func testReaddirBatch(t *testing.T, metaUri string, typ string, batchNum int) {
old := meta.DirBatchNum
meta.DirBatchNum[typ] = batchNum
defer func() {
meta.DirBatchNum = old
}()

n, extra := 5, 40

v, _ := createTestVFS(nil, metaUri)
@@ -1023,11 +1034,11 @@ }
}

parent := entry.Inode
for i := 0; i < n*meta.DirBatchNum+extra; i++ {
for i := 0; i < n*batchNum+extra; i++ {
_, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022)
}
defer func() {
for i := 0; i < n*meta.DirBatchNum+extra; i++ {
for i := 0; i < n*batchNum+extra; i++ {
_ = v.Rmdir(ctx, parent, fmt.Sprintf("d%d", i))
}
v.Rmdir(ctx, 1, "testdir")
@@ -1037,23 +1048,23 @@ func testReaddirBatch(t *testing.T, metaUri string) {
defer v.Releasedir(ctx, parent, fh)
entries1, _, _ := v.Readdir(ctx, parent, 0, 0, fh, true)
require.NotNil(t, entries1)
require.Equal(t, 2+meta.DirBatchNum, len(entries1)) // init entries: "." and ".."
require.Equal(t, 2+batchNum, len(entries1)) // init entries: "." and ".."

entries2, _, _ := v.Readdir(ctx, parent, 0, 2, fh, true)
require.NotNil(t, entries2)
require.Equal(t, meta.DirBatchNum, len(entries2))
require.Equal(t, batchNum, len(entries2))

entries3, _, _ := v.Readdir(ctx, parent, 0, 2+meta.DirBatchNum, fh, true)
entries3, _, _ := v.Readdir(ctx, parent, 0, 2+batchNum, fh, true)
require.NotNil(t, entries3)
require.Equal(t, meta.DirBatchNum, len(entries3))
require.Equal(t, batchNum, len(entries3))

// reach the end
entries4, _, _ := v.Readdir(ctx, parent, 0, n*meta.DirBatchNum+extra+2, fh, true)
entries4, _, _ := v.Readdir(ctx, parent, 0, n*batchNum+extra+2, fh, true)
require.NotNil(t, entries4)
require.Equal(t, 0, len(entries4))

// skip-style readdir
entries5, _, _ := v.Readdir(ctx, parent, 0, n*meta.DirBatchNum+2, fh, true)
entries5, _, _ := v.Readdir(ctx, parent, 0, n*batchNum+2, fh, true)
require.NotNil(t, entries5)
require.Equal(t, extra, len(entries5))

@@ -1064,25 +1075,37 @@ func testReaddirBatch(t *testing.T, metaUri string) {
}

// dir seek
entries7, _, _ := v.Readdir(ctx, parent, 0, n*meta.DirBatchNum+2-20, fh, true)
entries7, _, _ := v.Readdir(ctx, parent, 0, n*batchNum+2-20, fh, true)
require.True(t, reflect.DeepEqual(entries5, entries7[20:]))
}

func TestReadDirSteaming(t *testing.T) {
for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} {
testReaddirBatch(t, metaUri)
engines := map[string]string{
"kv": "",
"db": "sqlite3://",
"redis": "redis://127.0.0.1:6379/2",
}
for typ, metaUri := range engines {
testReaddirBatch(t, metaUri, typ, 100)
testReaddirBatch(t, metaUri, typ, 4096)
}
}

func TestReaddir(t *testing.T) {
extra := rand.Intn(meta.DirBatchNum)
for _, metaUri := range []string{"", "sqlite3://", "redis://127.0.0.1:6379/2"} {
engines := map[string]string{
"kv": "",
"db": "sqlite3://",
"redis": "redis://127.0.0.1:6379/2",
}
for typ, metaUri := range engines {
batchNum := meta.DirBatchNum[typ]
extra := rand.Intn(batchNum)
testReaddir(t, metaUri, 20, 0)
testReaddir(t, metaUri, 20, 5)
testReaddir(t, metaUri, 4*meta.DirBatchNum, 0)
testReaddir(t, metaUri, 4*meta.DirBatchNum, extra)
testReaddir(t, metaUri, 5*meta.DirBatchNum+extra, 0)
testReaddir(t, metaUri, 5*meta.DirBatchNum+extra, 2*meta.DirBatchNum)
testReaddir(t, metaUri, 2*batchNum, 0)
testReaddir(t, metaUri, 2*batchNum, extra)
testReaddir(t, metaUri, 4*batchNum, 0)
testReaddir(t, metaUri, 4*batchNum, 2*batchNum+extra)
}
}

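Aside (not from the commit): since meta.DirBatchNum is now a map, a test override can snapshot the single key it changes instead of the map variable. A minimal sketch of that pattern, with withBatchNum being a hypothetical helper:

// Hypothetical helper: override one engine's batch size for the duration of fn,
// then restore the original value. Saving the single value avoids aliasing the
// live map through a second variable.
func withBatchNum(typ string, n int, fn func()) {
    old := meta.DirBatchNum[typ]
    meta.DirBatchNum[typ] = n
    defer func() { meta.DirBatchNum[typ] = old }()
    fn()
}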
