diff --git a/db.go b/db.go index 2cbc349..38fabe2 100644 --- a/db.go +++ b/db.go @@ -15,13 +15,12 @@ import ( "github.com/sirupsen/logrus" "io/ioutil" "os" - "sort" "time" ) type DB struct { - memory *Engine - wal *os.File + memDB *MemDB + wal *os.File dir string walPath string @@ -43,7 +42,7 @@ func Open(dirPath string) (*DB, error) { } wal = wa - db := &DB{memory: NewEngine(), wal: wal, dir: dirPath, walPath: walPath} + db := &DB{memDB: NewEngine(), wal: wal, dir: dirPath, walPath: walPath} err = db.recoveryDB() if err != nil { logrus.Error(err) @@ -97,7 +96,7 @@ func (db *DB) Close() { } func (db *DB) Get(key string) (Value, error) { - value, err := db.memory.Get(key) + value, err := db.memDB.Get(key) // 如果没有得到value if err == nil { return value, err @@ -114,15 +113,15 @@ func (db *DB) Put(key, value string) error { func (db *DB) put(kv KeyValue) error { // TODO: 改进写日志的方式 db.writeAheadLog(kv) - db.memory.Put(kv) + db.memDB.Put(kv) // 这个阈值用常量 MaxMemSize表示, MaxMemSize定义在配置文件中, 后续改为可配置的量 - if db.memory.memSize >= config.MaxMemSize { + if db.memDB.memSize >= config.MaxMemSize { // 刷到磁盘 db.flush() // TODO: 之后实现异步的flush操作 - db.memory.memStore = make(map[string]Value) + db.memDB.memStore = NewSkipList() // ! 这个也要重置 - db.memory.memSize = 0 + db.memDB.memSize = 0 // 清空 wal.log 文件内容, 用直接创建的方式 // Create creates or truncates the named file. If the file already exists, it is truncated. wal, err := os.Create(db.walPath) @@ -140,14 +139,15 @@ func (db *DB) put(kv KeyValue) error { func (db *DB) Delete(key string) error { delTime := time.Now().UnixNano() / 1e6 db.writeAheadLog(KeyValue{Key: key, Val: Value{"", delTime, DEL}}) - return db.memory.Delete(key, delTime) + return db.memDB.Delete(key, delTime) } +// 暂时不需要区间扫描 // 扫描一个区间的key, 得到key value的结果slice // 如果value为deleted, 那么不添加 -func (db *DB) Scan(startKey, endKey string) ([]KeyValue, error) { - return db.memory.Scan(startKey, endKey) -} +//func (db *DB) Scan(startKey, endKey string) ([]KeyValue, error) { +// return db.memDB.Scan(startKey, endKey) +//} func (db *DB) writeAheadLog(kv KeyValue) error { write := bufio.NewWriter(db.wal) @@ -172,20 +172,13 @@ func (db *DB) flush() error { fileBytes := make([]byte, 0) - // 有序地flush - keys := make([]string, 0, len(db.memory.memStore)) - for key, _ := range db.memory.memStore { - if key == "" { - continue + // 使用迭代器, 有序(从小到大)地编码 + it := db.memDB.memStore.NewIterator() + for { + kv, ok := it.Next() + if !ok { + break } - keys = append(keys, key) - } - sort.Strings(keys) - - for _, key := range keys { - // 编码, [varint_key, key, varint_value, value, Timestamp, Op] - kv := KeyValue{Key: key, Val: db.memory.memStore[key]} - //logrus.Info(kv.Key) kvBytes := kv.Encode() fileBytes = append(fileBytes, kvBytes...) } diff --git a/engine.go b/memdb.go similarity index 70% rename from engine.go rename to memdb.go index 3340028..4592d06 100644 --- a/engine.go +++ b/memdb.go @@ -1,12 +1,8 @@ package gokv -import ( - "sort" -) - -// the memory Engine -type Engine struct { - memStore map[string]Value +// the memDB MemDB +type MemDB struct { + memStore *SkipList memSize int // 记录mem存储的容量, put和del的时候进行计算 } @@ -92,33 +88,34 @@ func VarIntDecode(bytes []byte) (int, []byte) { return val, bytes[i + 1:] } -func NewEngine() *Engine { - return &Engine{memStore: make(map[string]Value), memSize: 0} +func NewEngine() *MemDB { + return &MemDB{memStore: NewSkipList(), memSize: 0} } -func (e *Engine) Get(key string) (Value, error) { - m, ok := e.memStore[key] +func (e *MemDB) Get(key string) (Value, error) { + m, ok := e.memStore.Get(key) if !ok { return Value{}, GetEmptyError } - return m, nil + return m.Val, nil } // 这个Put只是在内存上的操作, 不涉及磁盘的操作. -func (e *Engine) Put(kv KeyValue) error { - e.memStore[kv.Key] = kv.Val +func (e *MemDB) Put(kv KeyValue) error { + e.memStore.Put(KeyValue{Key: kv.Key, Val: kv.Val}) // TODO: 需要调整一下, 加上8字节的时间戳和1字节的Op e.memSize += len(kv.Key) + len(kv.Val.Value) + 8 + 1 return nil } // 删除的元素的value用特殊的字符串来代替 -func (e *Engine) Delete(key string, delTime int64) error { - val, ok := e.memStore[key] +func (e *MemDB) Delete(key string, delTime int64) error { + kv, ok := e.memStore.Get(key) + val := kv.Val if ok { // 这里调整大小只需要调整字符串的大小 e.memSize = e.memSize - len(val.Value) + len(deleted) - e.memStore[key] = Value{Value: deleted, Timestamp: delTime, Op: DEL} + e.memStore.Put(KeyValue{Key: key, Val: Value{Value: deleted, Timestamp: delTime, Op: DEL}}) } else { err := e.Put(KeyValue{ Key: key, @@ -133,31 +130,32 @@ func (e *Engine) Delete(key string, delTime int64) error { return nil } +// 先注释掉, 这部分暂时不需要 // 扫描一个区间的key, 得到key value的结果slice // 如果value为deleted, 那么不添加 -func (e *Engine) Scan(startKey, endKey string) ([]KeyValue, error) { - keys := make([]string, len(e.memStore)) - i := 0 - for k := range e.memStore { - keys[i] = k - i += 1 - } - // 排序 - sort.Slice(keys, func(i, j int) bool { - return keys[i] < keys[j] - }) - kvs := make([]KeyValue, 0) - for _, k := range keys { - if k >= startKey && k <= endKey { - value := e.memStore[k] - if value.Value == deleted { // 如果已删除 - continue - } - kvs = append(kvs, KeyValue{Key: k, Val: value}) - } - if k > endKey { - break - } - } - return kvs, nil -} \ No newline at end of file +//func (e *MemDB) Scan(startKey, endKey string) ([]KeyValue, error) { +// keys := make([]string, e.memStore.Len()) +// i := 0 +// for k := range e.memStore { +// keys[i] = k +// i += 1 +// } +// // 排序 +// sort.Slice(keys, func(i, j int) bool { +// return keys[i] < keys[j] +// }) +// kvs := make([]KeyValue, 0) +// for _, k := range keys { +// if k >= startKey && k <= endKey { +// value := e.memStore[k] +// if value.Value == deleted { // 如果已删除 +// continue +// } +// kvs = append(kvs, KeyValue{Key: k, Val: value}) +// } +// if k > endKey { +// break +// } +// } +// return kvs, nil +//} \ No newline at end of file diff --git a/readme.md b/readme.md index 62e5744..201207c 100644 --- a/readme.md +++ b/readme.md @@ -33,7 +33,7 @@ A persistent, LSM tree structured key value database engine implemented by go la - [x] 实现完整的key-value结构, 包括时间戳 - [x] 实现基于WAL的故障恢复(启动时从内存恢复) - [x] 设计实现varint可变长度编码 -- [ ] SkipList版 Memstore +- [x] SkipList版 Memstore ## 全部待实现的feature @@ -46,7 +46,7 @@ A persistent, LSM tree structured key value database engine implemented by go la - [x] 实现WAL存储 (暂不实现故障恢复功能) - [x] 实现完整的key-value结构, 包括时间戳 - [x] 读取一个配置文件 -- [ ] SkipList版 Memstore +- [x] SkipList版 Memstore - [ ] 支持并发的 SkipList - [x] 有序SSTable结构 - [ ] 集成Bloom Filter diff --git a/skiplist.go b/skiplist.go index ebadc93..106f052 100644 --- a/skiplist.go +++ b/skiplist.go @@ -25,13 +25,14 @@ type SkipListNode struct { // 一个完整的跳跃表结构 type SkipList struct { maxLevel int // 最大的层级 + len int // 元素的个数 head, tail *SkipListNode // 跳跃表的头和尾 } // 这个maxLevel有一个默认值. // 这个值应该是可以配置的 func NewSkipList() *SkipList { - sl := &SkipList{maxLevel: 3, head: &SkipListNode{key: "head"}, tail: &SkipListNode{key: "tail"}} + sl := &SkipList{maxLevel: 3, len: 0, head: &SkipListNode{key: "head"}, tail: &SkipListNode{key: "tail"}} // 初始化 head和tail的指针 for i := 0; i <= sl.maxLevel; i++ { sl.head.pointers = append(sl.head.pointers, sl.tail) @@ -46,6 +47,7 @@ type pNode struct { // 插入一个KeyValue func (sl *SkipList) Put(kv KeyValue) { + sl.len++ key := kv.Key p := sl.head level := sl.maxLevel @@ -145,6 +147,11 @@ func (sl *SkipList) FindGE(key string) KeyValue { return KeyValue{} } +// 元素的个数 +func (sl *SkipList) Len() int { + return sl.len +} + // 获取一个随机的高度值 func (sl *SkipList) randHeight() int { // 设置随机种子