diff --git a/common/types.pb.go b/common/types.pb.go index c301d28..047b7ae 100644 --- a/common/types.pb.go +++ b/common/types.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-gogo. DO NOT EDIT. // source: common/types.proto -// DO NOT EDIT! /* Package common is a generated protocol buffer package. diff --git a/db/c_level_db.go b/db/c_level_db.go index a591378..e3e6c1d 100644 --- a/db/c_level_db.go +++ b/db/c_level_db.go @@ -171,6 +171,14 @@ func (mBatch *cLevelDBBatch) Write() { } } +// Implements Batch. +func (mBatch *cLevelDBBatch) WriteSync() { + err := mBatch.db.db.Write(mBatch.db.woSync, mBatch.batch) + if err != nil { + panic(err) + } +} + //---------------------------------------- // Iterator // NOTE This is almost identical to db/go_level_db.Iterator diff --git a/db/common_test.go b/db/common_test.go index 1b0f004..5afec28 100644 --- a/db/common_test.go +++ b/db/common_test.go @@ -2,6 +2,7 @@ package db import ( "fmt" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -9,6 +10,14 @@ import ( cmn "github.com/tendermint/tmlibs/common" ) +//---------------------------------------- +// Helper functions. 
+ +func checkValue(t *testing.T, db DB, key []byte, valueWanted []byte) { + valueGot := db.Get(key) + assert.Equal(t, valueWanted, valueGot) +} + func checkValid(t *testing.T, itr Iterator, expected bool) { valid := itr.Valid() require.Equal(t, expected, valid) @@ -46,110 +55,131 @@ func checkValuePanics(t *testing.T, itr Iterator) { } func newTempDB(t *testing.T, backend DBBackendType) (db DB) { - dir, dirname := cmn.Tempdir("test_go_iterator") + dir, dirname := cmn.Tempdir("db_common_test") db = NewDB("testdb", backend, dirname) dir.Close() return db } -func TestDBIteratorSingleKey(t *testing.T) { - for backend, _ := range backends { - t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { - db := newTempDB(t, backend) - db.SetSync(bz("1"), bz("value_1")) - itr := db.Iterator(nil, nil) +//---------------------------------------- +// mockDB - checkValid(t, itr, true) - checkNext(t, itr, false) - checkValid(t, itr, false) - checkNextPanics(t, itr) +// NOTE: not actually goroutine safe. +// If you want something goroutine safe, maybe you just want a MemDB. +type mockDB struct { + mtx sync.Mutex + calls map[string]int +} - // Once invalid... 
- checkInvalid(t, itr) - }) +func newMockDB() *mockDB { + return &mockDB{ + calls: make(map[string]int), } } -func TestDBIteratorTwoKeys(t *testing.T) { - for backend, _ := range backends { - t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { - db := newTempDB(t, backend) - db.SetSync(bz("1"), bz("value_1")) - db.SetSync(bz("2"), bz("value_1")) +func (mdb *mockDB) Mutex() *sync.Mutex { + return &(mdb.mtx) +} - { // Fail by calling Next too much - itr := db.Iterator(nil, nil) - checkValid(t, itr, true) +func (mdb *mockDB) Get([]byte) []byte { + mdb.calls["Get"] += 1 + return nil +} - checkNext(t, itr, true) - checkValid(t, itr, true) +func (mdb *mockDB) Has([]byte) bool { + mdb.calls["Has"] += 1 + return false +} - checkNext(t, itr, false) - checkValid(t, itr, false) +func (mdb *mockDB) Set([]byte, []byte) { + mdb.calls["Set"] += 1 +} - checkNextPanics(t, itr) +func (mdb *mockDB) SetSync([]byte, []byte) { + mdb.calls["SetSync"] += 1 +} - // Once invalid... - checkInvalid(t, itr) - } - }) - } +func (mdb *mockDB) SetNoLock([]byte, []byte) { + mdb.calls["SetNoLock"] += 1 } -func TestDBIteratorMany(t *testing.T) { - for backend, _ := range backends { - t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { - db := newTempDB(t, backend) +func (mdb *mockDB) SetNoLockSync([]byte, []byte) { + mdb.calls["SetNoLockSync"] += 1 +} - keys := make([][]byte, 100) - for i := 0; i < 100; i++ { - keys[i] = []byte{byte(i)} - } +func (mdb *mockDB) Delete([]byte, []byte) { + mdb.calls["Delete"] += 1 +} - value := []byte{5} - for _, k := range keys { - db.Set(k, value) - } +func (mdb *mockDB) DeleteSync([]byte, []byte) { + mdb.calls["DeleteSync"] += 1 +} - itr := db.Iterator(nil, nil) - defer itr.Close() - for ; itr.Valid(); itr.Next() { - assert.Equal(t, db.Get(itr.Key()), itr.Value()) - } - }) - } +func (mdb *mockDB) DeleteNoLock([]byte) { + mdb.calls["DeleteNoLock"] += 1 } -func TestDBIteratorEmpty(t *testing.T) { - for backend, _ := range backends { - 
t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { - db := newTempDB(t, backend) - itr := db.Iterator(nil, nil) +func (mdb *mockDB) DeleteNoLockSync([]byte) { + mdb.calls["DeleteNoLockSync"] += 1 +} - checkInvalid(t, itr) - }) - } +func (mdb *mockDB) Iterator(start, end []byte) Iterator { + mdb.calls["Iterator"] += 1 + return &mockIterator{} } -func TestDBIteratorEmptyBeginAfter(t *testing.T) { - for backend, _ := range backends { - t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { - db := newTempDB(t, backend) - itr := db.Iterator(bz("1"), nil) +func (mdb *mockDB) ReverseIterator(start, end []byte) Iterator { + mdb.calls["ReverseIterator"] += 1 + return &mockIterator{} +} - checkInvalid(t, itr) - }) - } +func (mdb *mockDB) Close() { + mdb.calls["Close"] += 1 } -func TestDBIteratorNonemptyBeginAfter(t *testing.T) { - for backend, _ := range backends { - t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { - db := newTempDB(t, backend) - db.SetSync(bz("1"), bz("value_1")) - itr := db.Iterator(bz("2"), nil) +func (mdb *mockDB) NewBatch() Batch { + mdb.calls["NewBatch"] += 1 + return &memBatch{db: mdb} +} + +func (mdb *mockDB) Print() { + mdb.calls["Print"] += 1 + fmt.Sprintf("mockDB{%v}", mdb.Stats()) +} - checkInvalid(t, itr) - }) +func (mdb *mockDB) Stats() map[string]string { + mdb.calls["Stats"] += 1 + + res := make(map[string]string) + for key, count := range mdb.calls { + res[key] = fmt.Sprintf("%d", count) } + return res +} + +//---------------------------------------- +// mockIterator + +type mockIterator struct{} + +func (_ mockIterator) Domain() (start []byte, end []byte) { + return nil, nil +} + +func (_ mockIterator) Valid() bool { + return false +} + +func (_ mockIterator) Next() { +} + +func (_ mockIterator) Key() []byte { + return nil +} + +func (_ mockIterator) Value() []byte { + return nil +} + +func (_ mockIterator) Close() { } diff --git a/db/db_test.go b/db/db_test.go new file mode 100644 index 0000000..8884cea 
--- /dev/null +++ b/db/db_test.go @@ -0,0 +1,190 @@ +package db + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDBIteratorSingleKey(t *testing.T) { + for backend, _ := range backends { + t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { + db := newTempDB(t, backend) + db.SetSync(bz("1"), bz("value_1")) + itr := db.Iterator(nil, nil) + + checkValid(t, itr, true) + checkNext(t, itr, false) + checkValid(t, itr, false) + checkNextPanics(t, itr) + + // Once invalid... + checkInvalid(t, itr) + }) + } +} + +func TestDBIteratorTwoKeys(t *testing.T) { + for backend, _ := range backends { + t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { + db := newTempDB(t, backend) + db.SetSync(bz("1"), bz("value_1")) + db.SetSync(bz("2"), bz("value_1")) + + { // Fail by calling Next too much + itr := db.Iterator(nil, nil) + checkValid(t, itr, true) + + checkNext(t, itr, true) + checkValid(t, itr, true) + + checkNext(t, itr, false) + checkValid(t, itr, false) + + checkNextPanics(t, itr) + + // Once invalid... 
+ checkInvalid(t, itr) + } + }) + } +} + +func TestDBIteratorMany(t *testing.T) { + for backend, _ := range backends { + t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { + db := newTempDB(t, backend) + + keys := make([][]byte, 100) + for i := 0; i < 100; i++ { + keys[i] = []byte{byte(i)} + } + + value := []byte{5} + for _, k := range keys { + db.Set(k, value) + } + + itr := db.Iterator(nil, nil) + defer itr.Close() + for ; itr.Valid(); itr.Next() { + assert.Equal(t, db.Get(itr.Key()), itr.Value()) + } + }) + } +} + +func TestDBIteratorEmpty(t *testing.T) { + for backend, _ := range backends { + t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { + db := newTempDB(t, backend) + itr := db.Iterator(nil, nil) + + checkInvalid(t, itr) + }) + } +} + +func TestDBIteratorEmptyBeginAfter(t *testing.T) { + for backend, _ := range backends { + t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { + db := newTempDB(t, backend) + itr := db.Iterator(bz("1"), nil) + + checkInvalid(t, itr) + }) + } +} + +func TestDBIteratorNonemptyBeginAfter(t *testing.T) { + for backend, _ := range backends { + t.Run(fmt.Sprintf("Backend %s", backend), func(t *testing.T) { + db := newTempDB(t, backend) + db.SetSync(bz("1"), bz("value_1")) + itr := db.Iterator(bz("2"), nil) + + checkInvalid(t, itr) + }) + } +} + +func TestDBBatchWrite1(t *testing.T) { + mdb := newMockDB() + batch := mdb.NewBatch() + + batch.Set(bz("1"), bz("1")) + batch.Set(bz("2"), bz("2")) + batch.Delete(bz("3")) + batch.Set(bz("4"), bz("4")) + batch.Write() + + assert.Equal(t, 0, mdb.calls["Set"]) + assert.Equal(t, 0, mdb.calls["SetSync"]) + assert.Equal(t, 3, mdb.calls["SetNoLock"]) + assert.Equal(t, 0, mdb.calls["SetNoLockSync"]) + assert.Equal(t, 0, mdb.calls["Delete"]) + assert.Equal(t, 0, mdb.calls["DeleteSync"]) + assert.Equal(t, 1, mdb.calls["DeleteNoLock"]) + assert.Equal(t, 0, mdb.calls["DeleteNoLockSync"]) +} + +func TestDBBatchWrite2(t *testing.T) { + mdb := newMockDB() + batch := 
mdb.NewBatch() + + batch.Set(bz("1"), bz("1")) + batch.Set(bz("2"), bz("2")) + batch.Set(bz("4"), bz("4")) + batch.Delete(bz("3")) + batch.Write() + + assert.Equal(t, 0, mdb.calls["Set"]) + assert.Equal(t, 0, mdb.calls["SetSync"]) + assert.Equal(t, 3, mdb.calls["SetNoLock"]) + assert.Equal(t, 0, mdb.calls["SetNoLockSync"]) + assert.Equal(t, 0, mdb.calls["Delete"]) + assert.Equal(t, 0, mdb.calls["DeleteSync"]) + assert.Equal(t, 1, mdb.calls["DeleteNoLock"]) + assert.Equal(t, 0, mdb.calls["DeleteNoLockSync"]) +} + +func TestDBBatchWriteSync1(t *testing.T) { + mdb := newMockDB() + batch := mdb.NewBatch() + + batch.Set(bz("1"), bz("1")) + batch.Set(bz("2"), bz("2")) + batch.Delete(bz("3")) + batch.Set(bz("4"), bz("4")) + batch.WriteSync() + + assert.Equal(t, 0, mdb.calls["Set"]) + assert.Equal(t, 0, mdb.calls["SetSync"]) + assert.Equal(t, 2, mdb.calls["SetNoLock"]) + assert.Equal(t, 1, mdb.calls["SetNoLockSync"]) + assert.Equal(t, 0, mdb.calls["Delete"]) + assert.Equal(t, 0, mdb.calls["DeleteSync"]) + assert.Equal(t, 1, mdb.calls["DeleteNoLock"]) + assert.Equal(t, 0, mdb.calls["DeleteNoLockSync"]) +} + +func TestDBBatchWriteSync2(t *testing.T) { + mdb := newMockDB() + batch := mdb.NewBatch() + + batch.Set(bz("1"), bz("1")) + batch.Set(bz("2"), bz("2")) + batch.Set(bz("4"), bz("4")) + batch.Delete(bz("3")) + batch.WriteSync() + + assert.Equal(t, 0, mdb.calls["Set"]) + assert.Equal(t, 0, mdb.calls["SetSync"]) + assert.Equal(t, 3, mdb.calls["SetNoLock"]) + assert.Equal(t, 0, mdb.calls["SetNoLockSync"]) + assert.Equal(t, 0, mdb.calls["Delete"]) + assert.Equal(t, 0, mdb.calls["DeleteSync"]) + assert.Equal(t, 0, mdb.calls["DeleteNoLock"]) + assert.Equal(t, 1, mdb.calls["DeleteNoLockSync"]) +} diff --git a/db/go_level_db.go b/db/go_level_db.go index 9fed329..55ca36c 100644 --- a/db/go_level_db.go +++ b/db/go_level_db.go @@ -110,10 +110,10 @@ func (db *GoLevelDB) Print() { str, _ := db.db.GetProperty("leveldb.stats") fmt.Printf("%v\n", str) - iter := db.db.NewIterator(nil, 
nil) - for iter.Next() { - key := iter.Key() - value := iter.Value() + itr := db.db.NewIterator(nil, nil) + for itr.Next() { + key := itr.Key() + value := itr.Value() fmt.Printf("[%X]:\t[%X]\n", key, value) } } @@ -167,7 +167,15 @@ func (mBatch *goLevelDBBatch) Delete(key []byte) { // Implements Batch. func (mBatch *goLevelDBBatch) Write() { - err := mBatch.db.db.Write(mBatch.batch, nil) + err := mBatch.db.db.Write(mBatch.batch, &opt.WriteOptions{Sync: false}) + if err != nil { + panic(err) + } +} + +// Implements Batch. +func (mBatch *goLevelDBBatch) WriteSync() { + err := mBatch.db.db.Write(mBatch.batch, &opt.WriteOptions{Sync: true}) if err != nil { panic(err) } diff --git a/db/mem_batch.go b/db/mem_batch.go index 7072d93..756798d 100644 --- a/db/mem_batch.go +++ b/db/mem_batch.go @@ -5,7 +5,9 @@ import "sync" type atomicSetDeleter interface { Mutex() *sync.Mutex SetNoLock(key, value []byte) + SetNoLockSync(key, value []byte) DeleteNoLock(key []byte) + DeleteNoLockSync(key []byte) } type memBatch struct { @@ -35,16 +37,34 @@ func (mBatch *memBatch) Delete(key []byte) { } func (mBatch *memBatch) Write() { + mBatch.write(false) +} + +func (mBatch *memBatch) WriteSync() { + mBatch.write(true) +} + +func (mBatch *memBatch) write(doSync bool) { mtx := mBatch.db.Mutex() mtx.Lock() defer mtx.Unlock() - for _, op := range mBatch.ops { + for i, op := range mBatch.ops { + if doSync && i == (len(mBatch.ops)-1) { + switch op.opType { + case opTypeSet: + mBatch.db.SetNoLockSync(op.key, op.value) + case opTypeDelete: + mBatch.db.DeleteNoLockSync(op.key) + } + break // we're done. + } switch op.opType { case opTypeSet: mBatch.db.SetNoLock(op.key, op.value) case opTypeDelete: mBatch.db.DeleteNoLock(op.key) } + } } diff --git a/db/mem_db.go b/db/mem_db.go index f2c484f..5439d67 100644 --- a/db/mem_db.go +++ b/db/mem_db.go @@ -26,6 +26,11 @@ func NewMemDB() *MemDB { return database } +// Implements atomicSetDeleter. 
+func (db *MemDB) Mutex() *sync.Mutex { + return &(db.mtx) +} + // Implements DB. func (db *MemDB) Get(key []byte) []byte { db.mtx.Lock() @@ -63,6 +68,11 @@ func (db *MemDB) SetSync(key []byte, value []byte) { // Implements atomicSetDeleter. func (db *MemDB) SetNoLock(key []byte, value []byte) { + db.SetNoLockSync(key, value) +} + +// Implements atomicSetDeleter. +func (db *MemDB) SetNoLockSync(key []byte, value []byte) { key = nonNilBytes(key) value = nonNilBytes(value) @@ -87,6 +97,11 @@ func (db *MemDB) DeleteSync(key []byte) { // Implements atomicSetDeleter. func (db *MemDB) DeleteNoLock(key []byte) { + db.DeleteNoLockSync(key) +} + +// Implements atomicSetDeleter. +func (db *MemDB) DeleteNoLockSync(key []byte) { key = nonNilBytes(key) delete(db.db, string(key)) @@ -122,9 +137,6 @@ func (db *MemDB) Stats() map[string]string { return stats } -//---------------------------------------- -// Batch - // Implements DB. func (db *MemDB) NewBatch() Batch { db.mtx.Lock() @@ -133,10 +145,6 @@ func (db *MemDB) NewBatch() Batch { return &memBatch{db, nil} } -func (db *MemDB) Mutex() *sync.Mutex { - return &(db.mtx) -} - //---------------------------------------- // Iterator diff --git a/db/prefix_db.go b/db/prefix_db.go new file mode 100644 index 0000000..5947e7f --- /dev/null +++ b/db/prefix_db.go @@ -0,0 +1,263 @@ +package db + +import ( + "bytes" + "fmt" + "sync" +) + +// IteratePrefix is a convenience function for iterating over a key domain +// restricted by prefix. +func IteratePrefix(db DB, prefix []byte) Iterator { + var start, end []byte + if len(prefix) == 0 { + start = nil + end = nil + } else { + start = cp(prefix) + end = cpIncr(prefix) + } + return db.Iterator(start, end) +} + +/* +TODO: Make test, maybe rename. +// Like IteratePrefix but the iterator strips the prefix from the keys. 
+func IteratePrefixStripped(db DB, prefix []byte) Iterator { + return newUnprefixIterator(prefix, IteratePrefix(db, prefix)) +} +*/ + +//---------------------------------------- +// prefixDB + +type prefixDB struct { + mtx sync.Mutex + prefix []byte + db DB +} + +// NewPrefixDB lets you namespace multiple DBs within a single DB. +func NewPrefixDB(db DB, prefix []byte) *prefixDB { + return &prefixDB{ + prefix: prefix, + db: db, + } +} + +// Implements atomicSetDeleter. +func (pdb *prefixDB) Mutex() *sync.Mutex { + return &(pdb.mtx) +} + +// Implements DB. +func (pdb *prefixDB) Get(key []byte) []byte { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return pdb.db.Get(pdb.prefixed(key)) +} + +// Implements DB. +func (pdb *prefixDB) Has(key []byte) bool { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return pdb.db.Has(pdb.prefixed(key)) +} + +// Implements DB. +func (pdb *prefixDB) Set(key []byte, value []byte) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pdb.db.Set(pdb.prefixed(key), value) +} + +// Implements DB. +func (pdb *prefixDB) SetSync(key []byte, value []byte) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pdb.db.SetSync(pdb.prefixed(key), value) +} + +// Implements atomicSetDeleter. +func (pdb *prefixDB) SetNoLock(key []byte, value []byte) { + pdb.db.Set(pdb.prefixed(key), value) +} + +// Implements atomicSetDeleter. +func (pdb *prefixDB) SetNoLockSync(key []byte, value []byte) { + pdb.db.SetSync(pdb.prefixed(key), value) +} + +// Implements DB. +func (pdb *prefixDB) Delete(key []byte) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pdb.db.Delete(pdb.prefixed(key)) +} + +// Implements DB. +func (pdb *prefixDB) DeleteSync(key []byte) { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pdb.db.DeleteSync(pdb.prefixed(key)) +} + +// Implements atomicSetDeleter. +func (pdb *prefixDB) DeleteNoLock(key []byte) { + pdb.db.Delete(pdb.prefixed(key)) +} + +// Implements atomicSetDeleter. 
+func (pdb *prefixDB) DeleteNoLockSync(key []byte) { + pdb.db.DeleteSync(pdb.prefixed(key)) +} + +// Implements DB. +func (pdb *prefixDB) Iterator(start, end []byte) Iterator { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pstart := append([]byte(pdb.prefix), start...) + pend := []byte(nil) + if end != nil { + pend = append([]byte(pdb.prefix), end...) + } + return newUnprefixIterator( + pdb.prefix, + pdb.db.Iterator( + pstart, + pend, + ), + ) +} + +// Implements DB. +func (pdb *prefixDB) ReverseIterator(start, end []byte) Iterator { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pstart := []byte(nil) + if start != nil { + pstart = append([]byte(pdb.prefix), start...) + } + pend := []byte(nil) + if end != nil { + pend = append([]byte(pdb.prefix), end...) + } + return newUnprefixIterator( + pdb.prefix, + pdb.db.ReverseIterator( + pstart, + pend, + ), + ) +} + +// Implements DB. +func (pdb *prefixDB) NewBatch() Batch { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + return &memBatch{pdb, nil} +} + +// Implements DB. +func (pdb *prefixDB) Close() { + pdb.mtx.Lock() + defer pdb.mtx.Unlock() + + pdb.db.Close() +} + +// Implements DB. +func (pdb *prefixDB) Print() { + fmt.Printf("prefix: %X\n", pdb.prefix) + + itr := pdb.Iterator(nil, nil) + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } +} + +// Implements DB. +func (pdb *prefixDB) Stats() map[string]string { + stats := make(map[string]string) + stats["prefixdb.prefix.string"] = string(pdb.prefix) + stats["prefixdb.prefix.hex"] = fmt.Sprintf("%X", pdb.prefix) + source := pdb.db.Stats() + for key, value := range source { + stats["prefixdb.source."+key] = value + } + return stats +} + +func (pdb *prefixDB) prefixed(key []byte) []byte { + return append([]byte(pdb.prefix), key...) +} + +//---------------------------------------- + +// Strips prefix while iterating from Iterator. 
+type unprefixIterator struct {
+	prefix []byte
+	source Iterator
+}
+
+func newUnprefixIterator(prefix []byte, source Iterator) unprefixIterator {
+	return unprefixIterator{
+		prefix: prefix,
+		source: source,
+	}
+}
+
+func (itr unprefixIterator) Domain() (start []byte, end []byte) {
+	start, end = itr.source.Domain()
+	if len(start) > 0 {
+		start = stripPrefix(start, itr.prefix)
+	}
+	if len(end) > 0 {
+		end = stripPrefix(end, itr.prefix)
+	}
+	return
+}
+
+func (itr unprefixIterator) Valid() bool {
+	return itr.source.Valid()
+}
+
+func (itr unprefixIterator) Next() {
+	itr.source.Next()
+}
+
+func (itr unprefixIterator) Key() (key []byte) {
+	return stripPrefix(itr.source.Key(), itr.prefix)
+}
+
+func (itr unprefixIterator) Value() (value []byte) {
+	return itr.source.Value()
+}
+
+func (itr unprefixIterator) Close() {
+	itr.source.Close()
+}
+
+//----------------------------------------
+
+func stripPrefix(key []byte, prefix []byte) (stripped []byte) {
+	if len(key) < len(prefix) {
+		panic("should not happen")
+	}
+	if !bytes.Equal(key[:len(prefix)], prefix) {
+		panic("should not happen")
+	}
+	return key[len(prefix):]
+}
diff --git a/db/prefix_db_test.go b/db/prefix_db_test.go
new file mode 100644
index 0000000..fd44a7e
--- /dev/null
+++ b/db/prefix_db_test.go
@@ -0,0 +1,44 @@
+package db
+
+import "testing"
+
+func TestIteratePrefix(t *testing.T) {
+	db := NewMemDB()
+	// Under "key" prefix
+	db.Set(bz("key"), bz("value"))
+	db.Set(bz("key1"), bz("value1"))
+	db.Set(bz("key2"), bz("value2"))
+	db.Set(bz("key3"), bz("value3"))
+	db.Set(bz("something"), bz("else"))
+	db.Set(bz(""), bz(""))
+	db.Set(bz("k"), bz("val"))
+	db.Set(bz("ke"), bz("valu"))
+	db.Set(bz("kee"), bz("valuu"))
+	xitr := db.Iterator(nil, nil)
+	xitr.Key()
+
+	pdb := NewPrefixDB(db, bz("key"))
+	checkValue(t, pdb, bz("key"), nil)
+	checkValue(t, pdb, bz(""), bz("value"))
+	checkValue(t, pdb, bz("key1"), nil)
+	checkValue(t, pdb, bz("1"), bz("value1"))
+	checkValue(t, pdb, bz("key2"), nil)
+	checkValue(t, pdb, bz("2"), bz("value2"))
+	checkValue(t, pdb, bz("key3"), nil)
+	checkValue(t, pdb, bz("3"), bz("value3"))
+	checkValue(t, pdb, bz("something"), nil)
+	checkValue(t, pdb, bz("k"), nil)
+	checkValue(t, pdb, bz("ke"), nil)
+	checkValue(t, pdb, bz("kee"), nil)
+
+	itr := pdb.Iterator(nil, nil)
+	itr.Key()
+	checkItem(t, itr, bz(""), bz("value"))
+	checkNext(t, itr, true)
+	checkItem(t, itr, bz("1"), bz("value1"))
+	checkNext(t, itr, true)
+	checkItem(t, itr, bz("2"), bz("value2"))
+	checkNext(t, itr, true)
+	checkItem(t, itr, bz("3"), bz("value3"))
+	itr.Close()
+}
diff --git a/db/types.go b/db/types.go
index 0785808..4514694 100644
--- a/db/types.go
+++ b/db/types.go
@@ -1,5 +1,6 @@
 package db
 
+// DBs are goroutine safe.
 type DB interface {
 
 	// Get returns nil iff key doesn't exist.
@@ -35,7 +36,7 @@ type DB interface {
 	// Iterate over a domain of keys in descending order. End is exclusive.
 	// Start must be greater than end, or the Iterator is invalid.
 	// If start is nil, iterates from the last/greatest item (inclusive).
-	// If end is nil, iterates up to the first/least item (iclusive).
+	// If end is nil, iterates up to the first/least item (inclusive).
 	// CONTRACT: No writes may happen within a domain while an iterator exists over it.
// CONTRACT: start, end readonly []byte ReverseIterator(start, end []byte) Iterator @@ -59,6 +60,7 @@ type DB interface { type Batch interface { SetDeleter Write() + WriteSync() } type SetDeleter interface { diff --git a/db/util.go b/db/util.go index b0ab7f6..ecb392d 100644 --- a/db/util.go +++ b/db/util.go @@ -4,28 +4,20 @@ import ( "bytes" ) -func IteratePrefix(db DB, prefix []byte) Iterator { - var start, end []byte - if len(prefix) == 0 { - start = nil - end = nil - } else { - start = cp(prefix) - end = cpIncr(prefix) - } - return db.Iterator(start, end) -} - -//---------------------------------------- - func cp(bz []byte) (ret []byte) { ret = make([]byte, len(bz)) copy(ret, bz) return ret } +// Returns a slice of the same length (big endian) +// except incremented by one. +// Returns nil on overflow (e.g. if bz bytes are all 0xFF) // CONTRACT: len(bz) > 0 func cpIncr(bz []byte) (ret []byte) { + if len(bz) == 0 { + panic("cpIncr expects non-zero bz length") + } ret = cp(bz) for i := len(bz) - 1; i >= 0; i-- { if ret[i] < byte(0xFF) { @@ -33,6 +25,10 @@ func cpIncr(bz []byte) (ret []byte) { return } else { ret[i] = byte(0x00) + if i == 0 { + // Overflow + return nil + } } } return nil diff --git a/flowrate/io_test.go b/flowrate/io_test.go index db40337..c84029d 100644 --- a/flowrate/io_test.go +++ b/flowrate/io_test.go @@ -121,7 +121,15 @@ func TestWriter(t *testing.T) { w.SetBlocking(true) if n, err := w.Write(b[20:]); n != 80 || err != nil { t.Fatalf("w.Write(b[20:]) expected 80 (); got %v (%v)", n, err) - } else if rt := time.Since(start); rt < _400ms { + } else if rt := time.Since(start); rt < _300ms { + // Explanation for `rt < _300ms` (as opposed to `< _400ms`) + // + // |<-- start | | + // epochs: -----0ms|---100ms|---200ms|---300ms|---400ms + // sends: 20|20 |20 |20 |20# + // + // NOTE: The '#' symbol can thus happen before 400ms is up. + // Thus, we can only panic if rt < _300ms. t.Fatalf("w.Write(b[20:]) returned ahead of time (%v)", rt) }