Skip to content

Commit

Permalink
start implementing vlog compression
Browse files Browse the repository at this point in the history
  • Loading branch information
strogiyotec committed Jul 5, 2021
1 parent 91bdf60 commit f198f56
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 39 deletions.
36 changes: 22 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,24 @@ paper

# Description

Dead simple lsm implementation which stores values in the vlog which
decreases write amplification of lsm tree during merging
Dead simple LSM tree implementation that stores values in the vlog, which
decreases the write amplification of the LSM tree during merging

# Things that are implemented

1. [X] SSTable
- [X] Create sstable
- [X] Read from sstable
2. [X] Memtable(in memory redblack tree that stores the data and flushes it once memory is full)
2. [X] Memtable(in memory redblack tree that stores the data and flushes it once
memory is full)
- [X] Put
- [X] Delete
- [X] Get
3. [X] Lsm tree
- [X] Put
- [X] Put
- [X] Get
- [X] Delete
4. [X] Http interface
4. [X] Http interface
- [X] Http Get
- [X] Http Put
- [X] Http Delete
Expand All @@ -31,37 +32,44 @@ decreases write amplification of lsm tree during merging
- [X] Store all values from head to tail into the memtable during recovery
6. [X] Merge sstable files
7. [X] Cli interface
- [X] specify sstable path
- [X] specify sstable path
- [X] specify vlog path
- [X] specify checkpoint path
- [X] specify memtable size
8. [ ] Cleanup
8. [ ] Reclaim space
- [X] Merge sstables
- [ ] Garbage collect vlog

## Install
In order to install the binary run `go get github.com/strogiyotec/go-wiskey` , it will be installed in `$HOME/go/bin/wiskey`

To install the binary, run `go get github.com/strogiyotec/go-wiskey`;
it will be installed in `$HOME/go/bin/wiskey`

## Usage

## Usage
In order to start the app run
`wiskey -s ../go-wiskey/sstable -v vlog -c checkpoint -m 20`
In order to start the app run
`wiskey -s ../go-wiskey/sstable -v vlog -c checkpoint -m 20`
where :

1. `-s` - directory with sstables
2. `-v` - path to vlog file(vlog doesn't have to exist)
3. `-c` - path to checkpoint (checkpoint doesn't have to exist)
4. `-m` - memtable size in bytes(the size of in memory red black tree that keeps keys , when full will flush this tree to sstable)
4. `-m` - memtable size in bytes (the size of the in-memory red-black tree that
keeps keys; when it is full, the tree is flushed to an sstable)

It will start an http server

### Http server

In order to GET/UPDATE/DELETE you can use http endpoints
1. Save key value - `curl -X POST -H "Content-Type: application/json" -d '{"value":"Developer"}' http://localhost:8080/anita` it will save value `Developer` with a key `anita`

1. Save key value
- `curl -X POST -H "Content-Type: application/json" -d '{"value":"Developer"}' http://localhost:8080/anita`
it will save value `Developer` with a key `anita`
2. Get by key - `curl -i localhost:8080/fetch/anita`
3. Delete by key - `curl -i localhost:8080/fetch/anita`


### How it works

Here is a general diagram of how the storage works
![storage](https://raw.githubusercontent.com/strogiyotec/go-wiskey/master/images/Architecture.jpg)
22 changes: 22 additions & 0 deletions pkg/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ func NewLsmTree(log *vlog, sstableDir string, memtable *Memtable, gc uint) *LsmT
return lsm
}

// TableWithIndex pairs an sstable file with the index at which a key was
// found in that table. Values of this type are produced by LsmTree.Exists.
type TableWithIndex struct {
// index is the value returned by SSTable.KeyAtIndex for the looked-up key.
index int
// tablePath is the path of the sstable file in which the key was found.
tablePath string
}

// Exists looks the given key up in every sstable known to the tree and
// reports each table that contains it.
//
// It returns one TableWithIndex per matching sstable, holding the table's
// path and the index reported by SSTable.KeyAtIndex. The result is nil when
// no sstable contains the key, so callers can use len(result) == 0 to detect
// a key that is no longer present in any table.
//
// lsm.rwm is read-locked for the whole scan.
func (lsm *LsmTree) Exists(key []byte) []TableWithIndex {
	lsm.rwm.RLock()
	defer lsm.rwm.RUnlock()
	var tableWithIndexes []TableWithIndex
	for _, tablePath := range lsm.sstables {
		reader, err := os.Open(tablePath)
		if err != nil {
			// The signature has no error result, so a table that cannot be
			// opened is skipped instead of passing a nil file to ReadTable.
			// NOTE(review): a skipped table makes its keys look absent;
			// confirm this is acceptable for the vlog garbage collector.
			continue
		}
		sstable := ReadTable(reader, lsm.log)
		found, index := sstable.KeyAtIndex(key)
		// Close immediately rather than with defer: a defer inside the loop
		// would keep every sstable file open until Exists returns. The file
		// is read-only, so a Close error carries nothing actionable here.
		_ = reader.Close()
		if found {
			tableWithIndexes = append(tableWithIndexes, TableWithIndex{index: index, tablePath: tablePath})
		}
	}
	return tableWithIndexes
}

//Merge sstables; as a result the number of sstable files is halved
func (lsm *LsmTree) Merge() error {
lsm.rwm.Lock()
Expand Down Expand Up @@ -197,6 +218,7 @@ func (lsm *LsmTree) save(entry *TableEntry) error {
}
return nil
}

func (lsm *LsmTree) findInSStables(key []byte) (*SearchEntry, bool) {
var latestEntry *SearchEntry
for _, tablePath := range lsm.sstables {
Expand Down
27 changes: 16 additions & 11 deletions pkg/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type SSTable struct {
log *vlog
}


//Constructor
func ReadTable(reader *os.File, log *vlog) *SSTable {
stats, _ := reader.Stat()
Expand Down Expand Up @@ -56,14 +55,20 @@ func (table *SSTable) Get(key []byte) (*SearchEntry, bool) {
if compare > 0 {
return nil, false
}
return table.binarySearch(key)
search, found, _ := table.binarySearch(key)
return search, found
}

// KeyAtIndex reports whether key is stored in this sstable and, if so, at
// which index the search located it.
//
// It delegates to binarySearch and discards the fetched entry, returning only
// the found flag and the index. When the key is not found the flag is false
// and the index is whatever binarySearch reports for a miss.
func (table *SSTable) KeyAtIndex(key []byte) (bool, int) {
	_, ok, position := table.binarySearch(key)
	return ok, position
}

//Tries to find given key in the sstable
//Returns value byte array or nil if not found
//timestamp of this value or 0 if not found
//bool true if found,false otherwise
func (table *SSTable) binarySearch(key []byte) (*SearchEntry, bool) {
//Returns 1. value byte array or nil if not found
//2. bool true if found,false otherwise
//3. at which index this key was found
func (table *SSTable) binarySearch(key []byte) (*SearchEntry, bool, int) {
left := 0
right := len(table.indexes) - 1
for left < right {
Expand All @@ -76,7 +81,7 @@ func (table *SSTable) binarySearch(key []byte) (*SearchEntry, bool) {
keyBuffer := tableReader.readKey(fileKeyLength)
compare := bytes.Compare(key, keyBuffer)
if compare == 0 {
return table.fetchFromVlog(tableReader), true
return table.fetchFromVlog(tableReader), true, middle
} else if compare > 0 {
left = middle + 1
} else {
Expand All @@ -87,12 +92,12 @@ func (table *SSTable) binarySearch(key []byte) (*SearchEntry, bool) {
tableReader := NewReader(table.reader, int64(index.Offset))
for tableReader.offset != index.BlockLength {
keyLength := tableReader.readKeyLength()
fileKey := tableReader.readKey(keyLength)
if bytes.Compare(key, fileKey) == 0 {
return table.fetchFromVlog(tableReader), true
keyFromFile := tableReader.readKey(keyLength)
if bytes.Compare(key, keyFromFile) == 0 {
return table.fetchFromVlog(tableReader), true, left
}
}
return nil, false
return nil, false, -1
}

func (table *SSTable) fetchFromVlog(tableReader *SSTableReader) *SearchEntry {
Expand Down
50 changes: 36 additions & 14 deletions pkg/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ func (log *vlog) Get(meta ValueMeta) (*TableEntry, error) {
return &TableEntry{key: key, value: value}, nil
}

//Run garbage collector
//Read tailLength entries from the start of vlog
//Check if they were deleted, if no append them to head
//Params: entries - how many entries to read from gc
//Need to implement sstable merge first
/*func (log *vlog) Gc(entries uint, sstable *SSTable) error {
func (log *vlog) RunGc(entries int, lsm *LsmTree) error {
type toAppend struct {
keyLength uint32
valueLength uint32
key []byte
value []byte
tableWithIndexes []TableWithIndex
}
file, err := os.OpenFile(log.file, os.O_RDONLY|os.O_APPEND, 0666)
if err != nil {
return err
Expand All @@ -67,21 +69,41 @@ func (log *vlog) Get(meta ValueMeta) (*TableEntry, error) {
if err != nil {
return err
}
totalSize := stat.Size()
currentSize := int64(0)
for currentSize < totalSize {
logFileSize := stat.Size()
if logFileSize == 0 {
return nil
}
readBytesSize := int64(0) //how many bytes were read from file
counter := 0
//list of entries that are still exist
var stillExist []toAppend
for readBytesSize < logFileSize && counter < entries {
keyLengthBuffer := make([]byte, uint32Size)
//read key length
file.Read(keyLengthBuffer)
_, _ = file.Read(keyLengthBuffer)
keyLength := binary.BigEndian.Uint32(keyLengthBuffer)
//read value length
valueLengthBuffer := make([]byte, uint32Size)
//read key length
file.Read(valueLengthBuffer)
_, _ = file.Read(valueLengthBuffer)
valueLength := binary.BigEndian.Uint32(valueLengthBuffer)
keyBuffer := make([]byte, keyLength)
valueBuffer := make([]byte, valueLength)
_, _ = file.Read(keyBuffer)
_, _ = file.Read(valueBuffer)
tableWithIndexes := lsm.Exists(keyBuffer)
if len(tableWithIndexes) != 0 {
stillExist = append(stillExist, toAppend{keyLength: keyLength, valueLength: valueLength, key: keyBuffer, value: valueBuffer, tableWithIndexes: tableWithIndexes})
}
readBytesSize += int64(uint32Size + keyLength + uint32Size + valueLength)
counter++
}
return binary.BigEndian.Uint32(keyLengthBuffer)
//TODO : So we have a list of entries that still exist in lsm tree
// 1. we need to append them to the end of vlog
// 2. truncate the beginning of vlog by readBytesSize
// 3. Update sstable path to vlog using new offset that was appended to end of file
return nil
}
*/

//Restore vlog to given memtable
func (log *vlog) RestoreTo(headOffset uint32, memtable *Memtable) error {
reader, err := os.OpenFile(log.file, os.O_RDONLY|os.O_CREATE, 0666)
Expand Down

0 comments on commit f198f56

Please sign in to comment.