Skip to content

Commit

Permalink
wip wrap boltdb to get path information
Browse files Browse the repository at this point in the history
finished but doesn't handle deleting deeply nested buckets
  • Loading branch information
schmichael authored and nickethier committed Sep 14, 2018
1 parent 924a979 commit 2d40f59
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 98 deletions.
66 changes: 47 additions & 19 deletions client/state/kvcodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,36 @@ import (
)

// kvStore abstracts one bucket of a hierarchical key/value store such as
// boltdb so the codec can be exercised against real buckets or test doubles.
type kvStore interface {
// Path returns the path identifying this bucket; the codec uses it to
// group the hashes it tracks per bucket.
Path() []byte
// Bucket returns the child bucket with the given name. Callers treat a nil
// result as "bucket does not exist".
Bucket(name []byte) kvStore
// CreateBucket creates the named child bucket and returns it, or an error.
CreateBucket(key []byte) (kvStore, error)
// CreateBucketIfNotExists returns the named child bucket, creating it
// first when necessary.
CreateBucketIfNotExists(key []byte) (kvStore, error)
// DeleteBucket removes the named child bucket.
DeleteBucket(key []byte) error
// Get returns the raw value stored under key.
Get(key []byte) (val []byte)
// Put stores val under key.
Put(key, val []byte) error
// Writable reports whether the store may be written to; a Put through the
// codec on a non-writable bucket is rejected with an error.
Writable() bool
}

// keyValueCodec handles encoding and decoding values from a key/value store
// such as boltdb. It remembers the hash of the last content written so that
// a Put whose content is unchanged can skip the write to the store.
type keyValueCodec struct {
// hashes maps keys to the hash of the last content written
hashes map[string][]byte
// hashes maps buckets to keys to the hash of the last content written:
// bucket -> key -> hash for example:
// allocations/1234 -> alloc -> abcd
// allocations/1234/redis -> task_state -> efff
hashes map[string]map[string][]byte
// hashesLock guards every read and write of hashes.
hashesLock sync.Mutex
}

// newKeyValueCodec returns a keyValueCodec whose hash map is allocated and
// empty, so it can be used immediately.
func newKeyValueCodec() *keyValueCodec {
return &keyValueCodec{
hashes: make(map[string][]byte),
hashes: make(map[string]map[string][]byte),
}
}

// hashKey builds the identifier under which a value's hash is tracked: the
// bucket path and the key joined by a single "-".
func (c *keyValueCodec) hashKey(path string, key []byte) string {
	id := make([]byte, 0, len(path)+1+len(key))
	id = append(id, path...)
	id = append(id, '-')
	id = append(id, key...)
	return string(id)
}

// Put into kv store iff it has changed since the last write. A globally
// unique key is constructed for each value by concatenating the path and key
// passed in.
func (c *keyValueCodec) Put(bkt kvStore, path string, key []byte, val interface{}) error {
if !bkt.Writable() {
return fmt.Errorf("bucket must be writable")
}

func (c *keyValueCodec) Put(bkt kvStore, key []byte, val interface{}) error {
// buffer for writing serialized state to
var buf bytes.Buffer

Expand All @@ -63,24 +61,34 @@ func (c *keyValueCodec) Put(bkt kvStore, path string, key []byte, val interface{
}

// If the hashes are equal, skip the write
hashPath := string(bkt.Path())
hashKey := string(key)
hashVal := h.Sum(nil)
hashKey := c.hashKey(path, key)

// lastHash value or nil if it hasn't been hashed yet
var lastHash []byte

c.hashesLock.Lock()
persistedHash := c.hashes[hashKey]
if hashBkt, ok := c.hashes[hashPath]; ok {
lastHash = hashBkt[hashKey]
} else {
// Create hash bucket
c.hashes[hashPath] = make(map[string][]byte, 2)
}
c.hashesLock.Unlock()

if bytes.Equal(hashVal, persistedHash) {
if bytes.Equal(hashVal, lastHash) {
return nil
}

// New value: write it to the underlying store
if err := bkt.Put(key, buf.Bytes()); err != nil {
return fmt.Errorf("failed to write data at key %s: %v", key, err)
}

// New value written, store hash
// New value written, store hash (bucket path map was created above)
c.hashesLock.Lock()
c.hashes[hashKey] = hashVal
c.hashes[hashPath][hashKey] = hashVal
c.hashesLock.Unlock()

return nil
Expand All @@ -102,3 +110,23 @@ func (c *keyValueCodec) Get(bkt kvStore, key []byte, obj interface{}) error {

return nil
}

// DeleteBucket deletes the bucket bktName from parent and forgets every hash
// tracked for that bucket and for all buckets nested beneath it. It does
// nothing and returns nil if the bucket doesn't exist.
//
// Nested buckets are removed from the store along with their parent, so their
// hashes must go too: otherwise they leak, and a later Put into a recreated
// bucket could be skipped because a stale hash still matches.
//
// The error from the underlying DeleteBucket call, if any, is returned. The
// hashes are purged even when the delete fails; the only cost is that the
// next Put for those keys is written instead of skipped.
func (c *keyValueCodec) DeleteBucket(parent kvStore, bktName []byte) error {
	// Look the bucket up first: its path is the key into the hash map.
	bkt := parent.Bucket(bktName)
	if bkt == nil {
		// Doesn't exist! Nothing to delete
		return nil
	}

	// Capture the path before deleting so nothing is read from a bucket
	// that has already been removed.
	path := string(bkt.Path())

	// Nested bucket paths are the parent path followed by "/" and the child
	// name. Matching on the separator keeps siblings that merely share a name
	// prefix (e.g. "a/12" vs "a/123") untouched.
	prefix := path + "/"

	// Delete the bucket
	err := parent.DeleteBucket(bktName)

	// Always purge all corresponding hashes to prevent memory leaks
	c.hashesLock.Lock()
	delete(c.hashes, path)
	for p := range c.hashes {
		// Deleting entries while ranging over a map is safe in Go.
		if len(p) >= len(prefix) && p[:len(prefix)] == prefix {
			delete(c.hashes, p)
		}
	}
	c.hashesLock.Unlock()

	return err
}
42 changes: 26 additions & 16 deletions client/state/kvcodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,26 @@ type mockKVStore struct {
puts int
}

// Path reports an empty, non-nil path: the mock has no bucket hierarchy.
func (mockKVStore) Path() []byte {
	return make([]byte, 0)
}

// Bucket hands back the mock itself for any name, so lookups never miss.
func (m *mockKVStore) Bucket(name []byte) kvStore {
	var self kvStore = m
	return self
}

// CreateBucket always succeeds and yields the mock itself.
func (m *mockKVStore) CreateBucket(key []byte) (kvStore, error) {
	return m.Bucket(key), nil
}

// CreateBucketIfNotExists always succeeds and yields the mock itself.
func (m *mockKVStore) CreateBucketIfNotExists(key []byte) (kvStore, error) {
	return m.Bucket(key), nil
}

// DeleteBucket is a no-op that never fails.
func (m *mockKVStore) DeleteBucket(key []byte) error {
	return nil
}

// Get never finds anything: the result is always nil.
func (mockKVStore) Get(key []byte) (val []byte) {
	return val
}
Expand All @@ -21,10 +41,6 @@ func (m *mockKVStore) Put(key, val []byte) error {
return nil
}

// Writable always reports true so the mock accepts every write.
func (mockKVStore) Writable() bool {
return true
}

// TestKVCodec_PutHash asserts that Puts on the underlying kvstore only occur
// when the data actually changes.
func TestKVCodec_PutHash(t *testing.T) {
Expand All @@ -33,7 +49,6 @@ func TestKVCodec_PutHash(t *testing.T) {

// Create arguments for Put
kv := new(mockKVStore)
path := "path-path"
key := []byte("key1")
val := &struct {
Val int
Expand All @@ -42,29 +57,24 @@ func TestKVCodec_PutHash(t *testing.T) {
}

// Initial Put should be written
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(1, kv.puts)

// Writing the same values again should be a noop
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(1, kv.puts)

// Changing the value should write again
val.Val++
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(2, kv.puts)

// Changing the key should write again
key = []byte("key2")
require.NoError(codec.Put(kv, path, key, val))
require.NoError(codec.Put(kv, key, val))
require.Equal(3, kv.puts)

// Changing the path should write again
path = "new-path"
require.NoError(codec.Put(kv, path, key, val))
require.Equal(4, kv.puts)

// Writing the same values again should be a noop
require.NoError(codec.Put(kv, path, key, val))
require.Equal(4, kv.puts)
require.NoError(codec.Put(kv, key, val))
require.Equal(3, kv.puts)
}
123 changes: 123 additions & 0 deletions client/state/named_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package state

import "github.com/boltdb/bolt"

// namedBucket is a wrapper around a bolt.Bucket that preserves the bucket's
// path information and exposes it via the Path() method.
//
// Knowing the full bucket path to a key is necessary for tracking accesses in
// another data structure such as the hashing writer keyValueCodec.
type namedBucket struct {
// path is the full path to this bucket, including the bucket itself. For a
// root bucket it equals name; for a nested bucket it is the parent's path,
// a '/' separator, and name.
path []byte
// name is this bucket's own name within its parent.
name []byte
// bkt is the wrapped bolt bucket that all reads and writes are delegated to.
bkt *bolt.Bucket
}

// newNamedBucket wraps the existing top-level bucket root of the transaction
// tx. It returns nil when tx has no such bucket. For a root bucket both the
// path and the name are root itself.
func newNamedBucket(tx *bolt.Tx, root []byte) *namedBucket {
	if found := tx.Bucket(root); found != nil {
		return &namedBucket{bkt: found, name: root, path: root}
	}

	return nil
}

// createNamedBucketIfNotExists wraps the top-level bucket root of the
// transaction tx, creating the bucket first if it is missing. Any error from
// bolt is returned unchanged together with a nil bucket.
func createNamedBucketIfNotExists(tx *bolt.Tx, root []byte) (*namedBucket, error) {
	created, err := tx.CreateBucketIfNotExists(root)
	if err != nil {
		return nil, err
	}

	// A root bucket's path is just its own name.
	wrapped := &namedBucket{bkt: created}
	wrapped.path, wrapped.name = root, root
	return wrapped, nil
}

// Path returns the full path to this bucket (including this bucket). The
// returned slice is the one held by the namedBucket, not a copy.
func (n *namedBucket) Path() []byte {
return n.path
}

// Name returns the name of this bucket, without any parent path. The returned
// slice is the one held by the namedBucket, not a copy.
func (n *namedBucket) Name() []byte {
return n.name
}

// Bucket looks up the bucket called name inside the current one. The result
// carries the extended path; it is nil when no such bucket exists.
func (n *namedBucket) Bucket(name []byte) kvStore {
	if child := n.bkt.Bucket(name); child != nil {
		return &namedBucket{bkt: child, name: name, path: n.chBkt(name)}
	}

	// Return a literal nil so callers' == nil checks on the interface work.
	return nil
}

// CreateBucketIfNotExists returns the bucket called name inside the current
// one, creating it first when it is missing. Errors from bolt are returned
// unchanged together with a nil bucket.
func (n *namedBucket) CreateBucketIfNotExists(name []byte) (kvStore, error) {
	child, err := n.bkt.CreateBucketIfNotExists(name)
	if err != nil {
		return nil, err
	}

	wrapped := &namedBucket{bkt: child, name: name}
	wrapped.path = n.chBkt(name)
	return wrapped, nil
}

// CreateBucket creates the bucket called name inside the current one and
// returns it wrapped with its extended path. Errors from bolt are returned
// unchanged together with a nil bucket.
func (n *namedBucket) CreateBucket(name []byte) (kvStore, error) {
	child, err := n.bkt.CreateBucket(name)
	if err != nil {
		return nil, err
	}

	wrapped := &namedBucket{bkt: child, name: name}
	wrapped.path = n.chBkt(name)
	return wrapped, nil
}

// DeleteBucket deletes the child bucket called name by calling DeleteBucket
// on the underlying bolt.Bucket and returns its error unchanged.
func (n *namedBucket) DeleteBucket(name []byte) error {
return n.bkt.DeleteBucket(name)
}

// Get returns the value stored under key by calling Get on the underlying
// bolt.Bucket.
func (n *namedBucket) Get(key []byte) []byte {
return n.bkt.Get(key)
}

// Put stores value under key by calling Put on the underlying bolt.Bucket and
// returns its error unchanged.
func (n *namedBucket) Put(key, value []byte) error {
return n.bkt.Put(key, value)
}

// chBkt is the bucket equivalent of chdir: it yields this bucket's path
// followed by a '/' separator and name. The result is a freshly allocated
// slice, so neither n.path nor name is modified or aliased.
func (n *namedBucket) chBkt(name []byte) []byte {
	// Room for the existing path, one separator byte, and the new element.
	total := len(n.path) + 1 + len(name)

	joined := make([]byte, 0, total)
	joined = append(joined, n.path...)
	joined = append(joined, '/')
	joined = append(joined, name...)

	return joined
}
Loading

0 comments on commit 2d40f59

Please sign in to comment.