Skip to content

Commit

Permalink
sstable merge now skips deleted entries
Browse files Browse the repository at this point in the history
  • Loading branch information
strogiyotec committed Jul 8, 2021
1 parent 7a9a36e commit 380d3ab
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 12 deletions.
29 changes: 19 additions & 10 deletions pkg/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type LsmTree struct {
rwm sync.RWMutex
gcMutex sync.RWMutex
sstableDir string //directory with sstables
log *vlog //vlog
memtable *Memtable //in memory table
Expand Down Expand Up @@ -45,15 +46,15 @@ func NewLsmTree(log *vlog, sstableDir string, memtable *Memtable, gc uint) *LsmT
//run job to periodically merge sstables
go func(tree *LsmTree, gc uint) {
fmt.Println("Gc thread was initialized")
for true {
//for true {
time.Sleep(time.Duration(gc) * time.Second)
fmt.Println("SSTABLE GC started")
err := lsm.Merge()
if err != nil {
fmt.Println("Gc encountered an error " + err.Error() + " Stop gc thread")
return
}
}
//}
}(lsm, gc)
return lsm
}
Expand All @@ -63,6 +64,13 @@ type TableWithIndex struct {
tablePath string
}


func (lsm *LsmTree) CompressVlog() error {
//TODO: hard coded value, let's make it configurable
size := 2
return lsm.log.RunGc(size, lsm)
}

//Check if given key was deleted
func (lsm *LsmTree) Exists(key []byte) []TableWithIndex {
var tableWithIndexes []TableWithIndex
Expand All @@ -85,6 +93,10 @@ func (lsm *LsmTree) Merge() error {
var newSstableFiles []string
index := 0
if len(lsm.sstables)%2 == 0 {
for _, sstable := range lsm.sstables {
_, err := os.Stat(sstable)
fmt.Printf("%v exists %v\n", sstable, !os.IsNotExist(err))
}
for index < len(lsm.sstables) {
firstReader, _ := os.Open(lsm.sstables[index])
secondReader, _ := os.Open(lsm.sstables[index+1])
Expand All @@ -101,15 +113,13 @@ func (lsm *LsmTree) Merge() error {
}
firstSStable.Close()
secondSStable.Close()
err = os.Remove(lsm.sstables[index])
if err != nil {
return err
}
err = os.Remove(lsm.sstables[index+1])
index += 2
}
for _, sstable := range lsm.sstables {
err := os.Remove(sstable)
if err != nil {
return err
}
index += 2
}
lsm.sstables = newSstableFiles
}
Expand Down Expand Up @@ -221,9 +231,8 @@ func (lsm *LsmTree) save(entry *TableEntry) error {
func (lsm *LsmTree) findInSStables(key []byte) (*SearchEntry, bool) {
var latestEntry *SearchEntry
for _, tablePath := range lsm.sstables {
//TODO: test case fails because it's null ?
reader, e := os.Open(tablePath)
if e != nil{
if e != nil {
panic(e)
}
sstable := ReadTable(reader, lsm.log)
Expand Down
15 changes: 14 additions & 1 deletion pkg/lsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestLsmTree_Merge(t *testing.T) {
amount := len(tree.sstables)
t.Logf("Lsm has %d files before merge", amount)
//wait for merge
time.Sleep(100 * time.Second)
time.Sleep(6 * time.Second)
sizeAfterGc := len(tree.sstables)
if sizeAfterGc != 2 {
t.Fatal("Amount of sstables after merge had to be decreased by 2 times")
Expand All @@ -166,6 +166,19 @@ func TestLsmTree_Merge(t *testing.T) {
t.Fatal("Wasn't able to find key after merge")
}
}
stat, err := os.Stat(tree.log.file)
if err != nil{
t.Fatal(err)
}
sizeBefore := uint32(stat.Size())
err = tree.CompressVlog()
if err != nil{
t.Fatal(err)
}
sizeAfter := tree.log.size
if sizeBefore >= sizeAfter{
t.Fatal("The size of vlog had to decrease after compression")
}
}

func TestLsmTree_Restore(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (table *SSTable) binarySearch(key []byte) (*SearchEntry, bool, int) {
if bytes.Compare(key, keyFromFile) == 0 {
return table.fetchFromVlog(tableReader), true, left
}
tableReader.readTimestamp()
tableReader.readValueOffset()
tableReader.readValueLength()
}
return nil, false, -1
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ type vlog struct {
}

func NewVlog(file string, checkpoint string) *vlog {
stat, err := os.Stat(file)
if err != nil {
panic(err)
}
return &vlog{
file: file,
checkpoint: checkpoint,
size: 0,
size: uint32(stat.Size()),
}
}

Expand Down Expand Up @@ -107,6 +111,14 @@ func (log *vlog) RunGc(entries int, lsm *LsmTree) error {
readBytesSize += int64(uint32Size + keyLength + uint32Size + valueLength)
counter++
}
//TODO: so we skipped deleted entries
//now we have to remove the beginning of the file
//starting from readBytesSize position
info, err := os.Stat(log.file)
if err != nil{
return err
}
log.size = uint32(info.Size())
return nil
}

Expand Down

0 comments on commit 380d3ab

Please sign in to comment.