Skip to content

Commit

Permalink
v0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
JameyWoo committed Jan 22, 2021
1 parent f08cb4d commit 62c90ce
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 72 deletions.
45 changes: 19 additions & 26 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ import (
"github.com/sirupsen/logrus"
"io/ioutil"
"os"
"sort"
"time"
)

type DB struct {
memory *Engine
wal *os.File
memDB *MemDB
wal *os.File

dir string
walPath string
Expand All @@ -43,7 +42,7 @@ func Open(dirPath string) (*DB, error) {
}
wal = wa

db := &DB{memory: NewEngine(), wal: wal, dir: dirPath, walPath: walPath}
db := &DB{memDB: NewEngine(), wal: wal, dir: dirPath, walPath: walPath}
err = db.recoveryDB()
if err != nil {
logrus.Error(err)
Expand Down Expand Up @@ -97,7 +96,7 @@ func (db *DB) Close() {
}

func (db *DB) Get(key string) (Value, error) {
value, err := db.memory.Get(key)
value, err := db.memDB.Get(key)
// 如果没有得到value
if err == nil {
return value, err
Expand All @@ -114,15 +113,15 @@ func (db *DB) Put(key, value string) error {
func (db *DB) put(kv KeyValue) error {
// TODO: 改进写日志的方式
db.writeAheadLog(kv)
db.memory.Put(kv)
db.memDB.Put(kv)
// 这个阈值用常量 MaxMemSize表示, MaxMemSize定义在配置文件中, 后续改为可配置的量
if db.memory.memSize >= config.MaxMemSize {
if db.memDB.memSize >= config.MaxMemSize {
// 刷到磁盘
db.flush()
// TODO: 之后实现异步的flush操作
db.memory.memStore = make(map[string]Value)
db.memDB.memStore = NewSkipList()
// ! 这个也要重置
db.memory.memSize = 0
db.memDB.memSize = 0
// 清空 wal.log 文件内容, 用直接创建的方式
// Create creates or truncates the named file. If the file already exists, it is truncated.
wal, err := os.Create(db.walPath)
Expand All @@ -140,14 +139,15 @@ func (db *DB) put(kv KeyValue) error {
func (db *DB) Delete(key string) error {
delTime := time.Now().UnixNano() / 1e6
db.writeAheadLog(KeyValue{Key: key, Val: Value{"", delTime, DEL}})
return db.memory.Delete(key, delTime)
return db.memDB.Delete(key, delTime)
}

// 暂时不需要区间扫描
// 扫描一个区间的key, 得到key value的结果slice
// 如果value为deleted, 那么不添加
func (db *DB) Scan(startKey, endKey string) ([]KeyValue, error) {
return db.memory.Scan(startKey, endKey)
}
//func (db *DB) Scan(startKey, endKey string) ([]KeyValue, error) {
// return db.memDB.Scan(startKey, endKey)
//}

func (db *DB) writeAheadLog(kv KeyValue) error {
write := bufio.NewWriter(db.wal)
Expand All @@ -172,20 +172,13 @@ func (db *DB) flush() error {

fileBytes := make([]byte, 0)

// 有序地flush
keys := make([]string, 0, len(db.memory.memStore))
for key, _ := range db.memory.memStore {
if key == "" {
continue
// 使用迭代器, 有序(从小到大)地编码
it := db.memDB.memStore.NewIterator()
for {
kv, ok := it.Next()
if !ok {
break
}
keys = append(keys, key)
}
sort.Strings(keys)

for _, key := range keys {
// 编码, [varint_key, key, varint_value, value, Timestamp, Op]
kv := KeyValue{Key: key, Val: db.memory.memStore[key]}
//logrus.Info(kv.Key)
kvBytes := kv.Encode()
fileBytes = append(fileBytes, kvBytes...)
}
Expand Down
84 changes: 41 additions & 43 deletions engine.go → memdb.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package gokv

import (
"sort"
)

// the memory Engine
type Engine struct {
memStore map[string]Value
// the memDB MemDB
type MemDB struct {
memStore *SkipList
memSize int // 记录mem存储的容量, put和del的时候进行计算
}

Expand Down Expand Up @@ -92,33 +88,34 @@ func VarIntDecode(bytes []byte) (int, []byte) {
return val, bytes[i + 1:]
}

func NewEngine() *Engine {
return &Engine{memStore: make(map[string]Value), memSize: 0}
func NewEngine() *MemDB {
return &MemDB{memStore: NewSkipList(), memSize: 0}
}

func (e *Engine) Get(key string) (Value, error) {
m, ok := e.memStore[key]
func (e *MemDB) Get(key string) (Value, error) {
m, ok := e.memStore.Get(key)
if !ok {
return Value{}, GetEmptyError
}
return m, nil
return m.Val, nil
}

// 这个Put只是在内存上的操作, 不涉及磁盘的操作.
func (e *Engine) Put(kv KeyValue) error {
e.memStore[kv.Key] = kv.Val
func (e *MemDB) Put(kv KeyValue) error {
e.memStore.Put(KeyValue{Key: kv.Key, Val: kv.Val})
// TODO: 需要调整一下, 加上8字节的时间戳和1字节的Op
e.memSize += len(kv.Key) + len(kv.Val.Value) + 8 + 1
return nil
}

// 删除的元素的value用特殊的字符串来代替
func (e *Engine) Delete(key string, delTime int64) error {
val, ok := e.memStore[key]
func (e *MemDB) Delete(key string, delTime int64) error {
kv, ok := e.memStore.Get(key)
val := kv.Val
if ok {
// 这里调整大小只需要调整字符串的大小
e.memSize = e.memSize - len(val.Value) + len(deleted)
e.memStore[key] = Value{Value: deleted, Timestamp: delTime, Op: DEL}
e.memStore.Put(KeyValue{Key: key, Val: Value{Value: deleted, Timestamp: delTime, Op: DEL}})
} else {
err := e.Put(KeyValue{
Key: key,
Expand All @@ -133,31 +130,32 @@ func (e *Engine) Delete(key string, delTime int64) error {
return nil
}

// 先注释掉, 这部分暂时不需要
// 扫描一个区间的key, 得到key value的结果slice
// 如果value为deleted, 那么不添加
func (e *Engine) Scan(startKey, endKey string) ([]KeyValue, error) {
keys := make([]string, len(e.memStore))
i := 0
for k := range e.memStore {
keys[i] = k
i += 1
}
// 排序
sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
kvs := make([]KeyValue, 0)
for _, k := range keys {
if k >= startKey && k <= endKey {
value := e.memStore[k]
if value.Value == deleted { // 如果已删除
continue
}
kvs = append(kvs, KeyValue{Key: k, Val: value})
}
if k > endKey {
break
}
}
return kvs, nil
}
//func (e *MemDB) Scan(startKey, endKey string) ([]KeyValue, error) {
// keys := make([]string, e.memStore.Len())
// i := 0
// for k := range e.memStore {
// keys[i] = k
// i += 1
// }
// // 排序
// sort.Slice(keys, func(i, j int) bool {
// return keys[i] < keys[j]
// })
// kvs := make([]KeyValue, 0)
// for _, k := range keys {
// if k >= startKey && k <= endKey {
// value := e.memStore[k]
// if value.Value == deleted { // 如果已删除
// continue
// }
// kvs = append(kvs, KeyValue{Key: k, Val: value})
// }
// if k > endKey {
// break
// }
// }
// return kvs, nil
//}
4 changes: 2 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ A persistent, LSM tree structured key value database engine implemented by go la
- [x] 实现完整的key-value结构, 包括时间戳
- [x] 实现基于WAL的故障恢复(启动时从内存恢复)
- [x] 设计实现varint可变长度编码
- [ ] SkipList版 Memstore
- [x] SkipList版 Memstore


## 全部待实现的feature
Expand All @@ -46,7 +46,7 @@ A persistent, LSM tree structured key value database engine implemented by go la
- [x] 实现WAL存储 (暂不实现故障恢复功能)
- [x] 实现完整的key-value结构, 包括时间戳
- [x] 读取一个配置文件
- [ ] SkipList版 Memstore
- [x] SkipList版 Memstore
- [ ] 支持并发的 SkipList
- [x] 有序SSTable结构
- [ ] 集成Bloom Filter
Expand Down
9 changes: 8 additions & 1 deletion skiplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ type SkipListNode struct {
// 一个完整的跳跃表结构
type SkipList struct {
maxLevel int // 最大的层级
len int // 元素的个数
head, tail *SkipListNode // 跳跃表的头和尾
}

// 这个maxLevel有一个默认值.
// 这个值应该是可以配置的
func NewSkipList() *SkipList {
sl := &SkipList{maxLevel: 3, head: &SkipListNode{key: "head"}, tail: &SkipListNode{key: "tail"}}
sl := &SkipList{maxLevel: 3, len: 0, head: &SkipListNode{key: "head"}, tail: &SkipListNode{key: "tail"}}
// 初始化 head和tail的指针
for i := 0; i <= sl.maxLevel; i++ {
sl.head.pointers = append(sl.head.pointers, sl.tail)
Expand All @@ -46,6 +47,7 @@ type pNode struct {

// 插入一个KeyValue
func (sl *SkipList) Put(kv KeyValue) {
sl.len++
key := kv.Key
p := sl.head
level := sl.maxLevel
Expand Down Expand Up @@ -145,6 +147,11 @@ func (sl *SkipList) FindGE(key string) KeyValue {
return KeyValue{}
}

// 元素的个数
func (sl *SkipList) Len() int {
return sl.len
}

// 获取一个随机的高度值
func (sl *SkipList) randHeight() int {
// 设置随机种子
Expand Down

0 comments on commit 62c90ce

Please sign in to comment.