Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add btree Ascend、Descend method and unitest. #257

Merged
merged 4 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,34 @@ func (db *DB) Watch() (chan *Event, error) {
return db.watchCh, nil
}

// Ascend calls handleFn for each key/value pair in the db in ascending order.
func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()

db.index.Ascend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
val, err := db.dataFiles.Read(pos)
if err != nil {
return false, nil
}
return handleFn(key, val)
})
}

// Descend calls handleFn for each key/value pair in the db in descending order.
func (db *DB) Descend(handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()

db.index.Descend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
val, err := db.dataFiles.Read(pos)
if err != nil {
return false, nil
}
return handleFn(key, val)
})
}

func checkOptions(options Options) error {
if options.DirPath == "" {
return errors.New("database dir path is empty")
Expand Down
98 changes: 98 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,101 @@ func TestDB_Concurrent_Put(t *testing.T) {
})
assert.Equal(t, count, db.index.Size())
}

func TestDB_Ascend(t *testing.T) {
// Create a test database instance
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}

// Insert some test data
data := []struct {
key []byte
value []byte
}{
{[]byte("key1"), []byte("value1")},
{[]byte("key2"), []byte("value2")},
{[]byte("key3"), []byte("value3")},
}

for _, d := range data {
if err := db.Put(d.key, d.value); err != nil {
t.Fatalf("Failed to put data: %v", err)
}
}

// Test Ascend function
var result []string
db.Ascend(func(k []byte, v []byte) (bool, error) {
result = append(result, string(k))
return true, nil // No error here
})

if err != nil {
t.Errorf("Ascend returned an error: %v", err)
}

expected := []string{"key1", "key2", "key3"}
if len(result) != len(expected) {
t.Errorf("Unexpected number of results. Expected: %v, Got: %v", expected, result)
} else {
for i, val := range expected {
if result[i] != val {
t.Errorf("Unexpected result at index %d. Expected: %v, Got: %v", i, val, result[i])
}
}
}
}

func TestDB_Descend(t *testing.T) {
// Create a test database instance
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}

// Insert some test data
data := []struct {
key []byte
value []byte
}{
{[]byte("key1"), []byte("value1")},
{[]byte("key2"), []byte("value2")},
{[]byte("key3"), []byte("value3")},
}

for _, d := range data {
if err := db.Put(d.key, d.value); err != nil {
t.Fatalf("Failed to put data: %v", err)
}
}

// Test Descend function
var result []string
db.Descend(func(k []byte, v []byte) (bool, error) {
result = append(result, string(k))
return true, nil
})

if err != nil {
t.Errorf("Descend returned an error: %v", err)
}

expected := []string{"key3", "key2", "key1"}
if len(result) != len(expected) {
t.Errorf("Unexpected number of results. Expected: %v, Got: %v", expected, result)
} else {
for i, val := range expected {
if result[i] != val {
t.Errorf("Unexpected result at index %d. Expected: %v, Got: %v", i, val, result[i])
}
}
}
}
26 changes: 26 additions & 0 deletions index/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,29 @@ func (mt *MemoryBTree) Delete(key []byte) (*wal.ChunkPosition, bool) {
func (mt *MemoryBTree) Size() int {
return mt.tree.Len()
}

func (mt *MemoryBTree) Ascend(handleFn func(key []byte, position *wal.ChunkPosition) (bool, error)) {
mt.lock.RLock()
defer mt.lock.RUnlock()

mt.tree.Ascend(func(i btree.Item) bool {
cont, err := handleFn(i.(*item).key, i.(*item).pos)
if err != nil {
return false
}
return cont
})
}

func (mt *MemoryBTree) Descend(handleFn func(key []byte, position *wal.ChunkPosition) (bool, error)) {
mt.lock.RLock()
defer mt.lock.RUnlock()

mt.tree.Descend(func(i btree.Item) bool {
cont, err := handleFn(i.(*item).key, i.(*item).pos)
if err != nil {
return false
}
return cont
})
}
125 changes: 125 additions & 0 deletions index/btree_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package index

import (
"bytes"
"fmt"
"testing"

"github.com/rosedblabs/wal"
)

func TestMemoryBTree_Put_Get(t *testing.T) {
mt := newBTree()
w, _ := wal.Open(wal.DefaultOptions)

key := []byte("testKey")
chunkPosition, _ := w.Write([]byte("some data 1"))

// Test Put
oldPos := mt.Put(key, chunkPosition)
if oldPos != nil {
t.Fatalf("expected nil, got %+v", oldPos)
}

// Test Get
gotPos := mt.Get(key)
if chunkPosition.ChunkOffset != gotPos.ChunkOffset {
t.Fatalf("expected %+v, got %+v", chunkPosition, gotPos)
}
}

func TestMemoryBTree_Delete(t *testing.T) {
mt := newBTree()
w, _ := wal.Open(wal.DefaultOptions)

key := []byte("testKey")
chunkPosition, _ := w.Write([]byte("some data 2"))

mt.Put(key, chunkPosition)

// Test Delete
delPos, ok := mt.Delete(key)
if !ok {
t.Fatal("expected item to be deleted")
}
if chunkPosition.ChunkOffset != delPos.ChunkOffset {
t.Fatalf("expected %+v, got %+v", chunkPosition, delPos)
}

// Ensure the key is deleted
if mt.Get(key) != nil {
t.Fatal("expected nil, got value")
}
}

func TestMemoryBTree_Size(t *testing.T) {
mt := newBTree()

if mt.Size() != 0 {
t.Fatalf("expected size to be 0, got %d", mt.Size())
}

w, _ := wal.Open(wal.DefaultOptions)
key := []byte("testKey")
chunkPosition, _ := w.Write([]byte("some data 3"))

mt.Put(key, chunkPosition)

if mt.Size() != 1 {
t.Fatalf("expected size to be 1, got %d", mt.Size())
}
}

func TestMemoryBTree_Ascend_Descend(t *testing.T) {
mt := newBTree()
w, _ := wal.Open(wal.DefaultOptions)

data := map[string][]byte{
"apple": []byte("some data 4"),
"banana": []byte("some data 5"),
"cherry": []byte("some data 6"),
}

positionMap := make(map[string]*wal.ChunkPosition)

for k, v := range data {
chunkPosition, _ := w.Write(v)
positionMap[k] = chunkPosition
mt.Put([]byte(k), chunkPosition)
}

// Test Ascend
prevKey := ""

// Define the Ascend handler function
ascendHandler := func(key []byte, pos *wal.ChunkPosition) (bool, error) {
if prevKey != "" && bytes.Compare([]byte(prevKey), key) >= 0 {
return false, fmt.Errorf("items are not in ascending order")
}
expectedPos := positionMap[string(key)]
if expectedPos.ChunkOffset != pos.ChunkOffset {
return false, fmt.Errorf("expected position %+v, got %+v", expectedPos, pos)
}
prevKey = string(key)
return true, nil
}

mt.Ascend(ascendHandler)

// Define the Descend handler function
descendHandler := func(key []byte, pos *wal.ChunkPosition) (bool, error) {
if bytes.Compare([]byte(prevKey), key) <= 0 {
return false, fmt.Errorf("items are not in descending order")
}
expectedPos := positionMap[string(key)]
if expectedPos.ChunkOffset != pos.ChunkOffset {
return false, fmt.Errorf("expected position %+v, got %+v", expectedPos, pos)
}
prevKey = string(key)
return true, nil
}

// Test Descend
prevKey = "zzzzzz"
mt.Descend(descendHandler)
}
8 changes: 8 additions & 0 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ type Indexer interface {

// Size represents the number of keys in the index.
Size() int

// Ascend iterates over items in ascending order and invokes the handler function for each item.
// If the handler function returns false, iteration stops.
Ascend(handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))

// Descend iterates over items in descending order and invokes the handler function for each item.
// If the handler function returns false, iteration stops.
Descend(handleFn func(key []byte, pos *wal.ChunkPosition) (bool, error))
}

type IndexerType = byte
Expand Down