diff --git a/client/state/kvcodec.go b/client/state/kvcodec.go index 581ea528c10..61bca1a4fc2 100644 --- a/client/state/kvcodec.go +++ b/client/state/kvcodec.go @@ -12,38 +12,36 @@ import ( ) type kvStore interface { + Path() []byte + Bucket(name []byte) kvStore + CreateBucket(key []byte) (kvStore, error) + CreateBucketIfNotExists(key []byte) (kvStore, error) + DeleteBucket(key []byte) error Get(key []byte) (val []byte) Put(key, val []byte) error - Writable() bool } // keyValueCodec handles encoding and decoding values from a key/value store // such as boltdb. type keyValueCodec struct { - // hashes maps keys to the hash of the last content written - hashes map[string][]byte + // hashes maps buckets to keys to the hash of the last content written: + // bucket -> key -> hash for example: + // allocations/1234 -> alloc -> abcd + // allocations/1234/redis -> task_state -> efff + hashes map[string]map[string][]byte hashesLock sync.Mutex } func newKeyValueCodec() *keyValueCodec { return &keyValueCodec{ - hashes: make(map[string][]byte), + hashes: make(map[string]map[string][]byte), } } -// hashKey returns a unique key for each hashed boltdb value -func (c *keyValueCodec) hashKey(path string, key []byte) string { - return path + "-" + string(key) -} - // Put into kv store iff it has changed since the last write. A globally // unique key is constructed for each value by concatinating the path and key // passed in. 
-func (c *keyValueCodec) Put(bkt kvStore, path string, key []byte, val interface{}) error { - if !bkt.Writable() { - return fmt.Errorf("bucket must be writable") - } - +func (c *keyValueCodec) Put(bkt kvStore, key []byte, val interface{}) error { // buffer for writing serialized state to var buf bytes.Buffer @@ -63,24 +61,34 @@ func (c *keyValueCodec) Put(bkt kvStore, path string, key []byte, val interface{ } // If the hashes are equal, skip the write + hashPath := string(bkt.Path()) + hashKey := string(key) hashVal := h.Sum(nil) - hashKey := c.hashKey(path, key) + + // lastHash value or nil if it hasn't been hashed yet + var lastHash []byte c.hashesLock.Lock() - persistedHash := c.hashes[hashKey] + if hashBkt, ok := c.hashes[hashPath]; ok { + lastHash = hashBkt[hashKey] + } else { + // Create hash bucket + c.hashes[hashPath] = make(map[string][]byte, 2) + } c.hashesLock.Unlock() - if bytes.Equal(hashVal, persistedHash) { + if bytes.Equal(hashVal, lastHash) { return nil } + // New value: write it to the underlying store if err := bkt.Put(key, buf.Bytes()); err != nil { return fmt.Errorf("failed to write data at key %s: %v", key, err) } - // New value written, store hash + // New value written, store hash (bucket path map was created above) c.hashesLock.Lock() - c.hashes[hashKey] = hashVal + c.hashes[hashPath][hashKey] = hashVal c.hashesLock.Unlock() return nil @@ -102,3 +110,34 @@ func (c *keyValueCodec) Get(bkt kvStore, key []byte, obj interface{}) error { return nil } + +// DeleteBucket deletes bktName from parent, or does nothing if the bucket +// doesn't exist. Hashes cached for the bucket and for every bucket nested +// beneath it are purged so later Puts to a recreated bucket are not skipped. +func (c *keyValueCodec) DeleteBucket(parent kvStore, bktName []byte) error { + // Get the path of the bucket being deleted + bkt := parent.Bucket(bktName) + if bkt == nil { + // Doesn't exist! 
Nothing to delete + return nil + } + + // Delete the bucket + err := parent.DeleteBucket(bktName) + + // Always purge all corresponding hashes to prevent memory leaks. Bolt + // removes nested buckets together with their parent, so hashes recorded + // under any descendant path ("<path>/...") must be dropped as well. + path := string(bkt.Path()) + prefix := path + "/" + c.hashesLock.Lock() + delete(c.hashes, path) + for p := range c.hashes { + if len(p) > len(prefix) && p[:len(prefix)] == prefix { + delete(c.hashes, p) + } + } + c.hashesLock.Unlock() + + return err +} diff --git a/client/state/kvcodec_test.go b/client/state/kvcodec_test.go index 49c110b9185..ee625d0166f 100644 --- a/client/state/kvcodec_test.go +++ b/client/state/kvcodec_test.go @@ -12,6 +12,26 @@ type mockKVStore struct { puts int } +func (mockKVStore) Path() []byte { + return []byte{} +} + +func (m *mockKVStore) Bucket(name []byte) kvStore { + return m +} + +func (m *mockKVStore) CreateBucket(key []byte) (kvStore, error) { + return m, nil +} + +func (m *mockKVStore) CreateBucketIfNotExists(key []byte) (kvStore, error) { + return m, nil +} + +func (m *mockKVStore) DeleteBucket(key []byte) error { + return nil +} + func (mockKVStore) Get(key []byte) (val []byte) { return nil } @@ -21,10 +41,6 @@ func (m *mockKVStore) Put(key, val []byte) error { return nil } -func (mockKVStore) Writable() bool { - return true -} - // TestKVCodec_PutHash asserts that Puts on the underlying kvstore only occur // when the data actually changes. 
func TestKVCodec_PutHash(t *testing.T) { @@ -33,7 +49,6 @@ func TestKVCodec_PutHash(t *testing.T) { // Create arguments for Put kv := new(mockKVStore) - path := "path-path" key := []byte("key1") val := &struct { Val int @@ -42,29 +57,24 @@ func TestKVCodec_PutHash(t *testing.T) { } // Initial Put should be written - require.NoError(codec.Put(kv, path, key, val)) + require.NoError(codec.Put(kv, key, val)) require.Equal(1, kv.puts) // Writing the same values again should be a noop - require.NoError(codec.Put(kv, path, key, val)) + require.NoError(codec.Put(kv, key, val)) require.Equal(1, kv.puts) // Changing the value should write again val.Val++ - require.NoError(codec.Put(kv, path, key, val)) + require.NoError(codec.Put(kv, key, val)) require.Equal(2, kv.puts) // Changing the key should write again key = []byte("key2") - require.NoError(codec.Put(kv, path, key, val)) + require.NoError(codec.Put(kv, key, val)) require.Equal(3, kv.puts) - // Changing the path should write again - path = "new-path" - require.NoError(codec.Put(kv, path, key, val)) - require.Equal(4, kv.puts) - // Writing the same values again should be a noop - require.NoError(codec.Put(kv, path, key, val)) - require.Equal(4, kv.puts) + require.NoError(codec.Put(kv, key, val)) + require.Equal(3, kv.puts) } diff --git a/client/state/named_bucket.go b/client/state/named_bucket.go new file mode 100644 index 00000000000..b5383e8385b --- /dev/null +++ b/client/state/named_bucket.go @@ -0,0 +1,123 @@ +package state + +import "github.com/boltdb/bolt" + +// namedBucket is a wrapper around bolt.Buckets to preserve their path +// information and expose it via the Path() method. +// +// Knowing the full bucket path to a key is necessary for tracking accesses in +// another data structure such as the hashing writer keyValueCodec. +type namedBucket struct { + path []byte + name []byte + bkt *bolt.Bucket +} + +// newNamedBucket from a bolt transaction. 
+func newNamedBucket(tx *bolt.Tx, root []byte) *namedBucket { + b := tx.Bucket(root) + if b == nil { + return nil + } + + return &namedBucket{ + path: root, + name: root, + bkt: b, + } +} + +// createNamedBucketIfNotExists from a bolt transaction. +func createNamedBucketIfNotExists(tx *bolt.Tx, root []byte) (*namedBucket, error) { + b, err := tx.CreateBucketIfNotExists(root) + if err != nil { + return nil, err + } + + return &namedBucket{ + path: root, + name: root, + bkt: b, + }, nil +} + +// Path to this bucket (including this bucket). +func (n *namedBucket) Path() []byte { + return n.path +} + +// Name of this bucket. +func (n *namedBucket) Name() []byte { + return n.name +} + +// Bucket returns a bucket inside the current one or nil if the bucket does not +// exist. +func (n *namedBucket) Bucket(name []byte) kvStore { + b := n.bkt.Bucket(name) + if b == nil { + return nil + } + + return &namedBucket{ + path: n.chBkt(name), + name: name, + bkt: b, + } +} + +// CreateBucketIfNotExists creates a bucket if it doesn't exist and returns it +// or an error. +func (n *namedBucket) CreateBucketIfNotExists(name []byte) (kvStore, error) { + b, err := n.bkt.CreateBucketIfNotExists(name) + if err != nil { + return nil, err + } + + return &namedBucket{ + path: n.chBkt(name), + name: name, + bkt: b, + }, nil +} + +// CreateBucket creates a bucket and returns it. +func (n *namedBucket) CreateBucket(name []byte) (kvStore, error) { + b, err := n.bkt.CreateBucket(name) + if err != nil { + return nil, err + } + + return &namedBucket{ + path: n.chBkt(name), + name: name, + bkt: b, + }, nil +} + +// DeleteBucket calls DeleteBucket on the underlying bolt.Bucket. +func (n *namedBucket) DeleteBucket(name []byte) error { + return n.bkt.DeleteBucket(name) +} + +// Get calls Get on the underlying bolt.Bucket. +func (n *namedBucket) Get(key []byte) []byte { + return n.bkt.Get(key) +} + +// Put calls Put on the underlying bolt.Bucket. 
+func (n *namedBucket) Put(key, value []byte) error { + return n.bkt.Put(key, value) +} + +// chBkt is like chdir but for buckets: it appends the new name to the end of +// a copy of the path and returns it. +func (n *namedBucket) chBkt(name []byte) []byte { + // existing path + new path element + path separator + path := make([]byte, len(n.path)+len(name)+1) + copy(path[0:len(n.path)], n.path) + path[len(n.path)] = '/' + copy(path[len(n.path)+1:], name) + + return path +} diff --git a/client/state/named_bucket_test.go b/client/state/named_bucket_test.go new file mode 100644 index 00000000000..705483a1679 --- /dev/null +++ b/client/state/named_bucket_test.go @@ -0,0 +1,151 @@ +package state + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/boltdb/bolt" + "github.com/stretchr/testify/require" +) + +func setupBoltDB(t *testing.T) (*bolt.DB, func()) { + dir, err := ioutil.TempDir("", "nomadtest_") + require.NoError(t, err) + cleanup := func() { + if err := os.RemoveAll(dir); err != nil { + t.Logf("error removing test dir: %v", err) + } + } + + dbFilename := filepath.Join(dir, "nomadtest.db") + db, err := bolt.Open(dbFilename, 0600, nil) + if err != nil { + cleanup() + t.Fatalf("error creating boltdb: %v", err) + } + + return db, func() { + db.Close() + cleanup() + } +} + +// TestNamedBucket_Path asserts that creating and changing buckets are tracked +// properly by the namedBucket wrapper. +func TestNamedBucket_Path(t *testing.T) { + t.Parallel() + require := require.New(t) + + db, cleanup := setupBoltDB(t) + defer cleanup() + + parentBktName, childBktName := []byte("root"), []byte("child") + parentKey, parentVal := []byte("pkey"), []byte("pval") + childKey, childVal := []byte("ckey"), []byte("cval") + + require.NoError(db.Update(func(tx *bolt.Tx) error { + // Trying to open a named bucket from a nonexistent bucket + // should return nil. 
+ require.Nil(newNamedBucket(tx, []byte("nonexistent"))) + + // Creating a named bucket from a bolt tx should work and set + // the path and name properly. + b, err := createNamedBucketIfNotExists(tx, parentBktName) + require.NoError(err) + require.Equal(parentBktName, b.Name()) + require.Equal(parentBktName, b.Path()) + + // Trying to descend into a nonexistent bucket should return + // nil. + require.Nil(b.Bucket([]byte("nonexistent"))) + + // Descending into a new bucket should update the path. + childBkt, err := b.CreateBucket(childBktName) + require.NoError(err) + require.Equal(childBktName, childBkt.(*namedBucket).Name()) + require.Equal([]byte("root/child"), childBkt.Path()) + + // Assert the parent bucket did not get changed. + require.Equal(parentBktName, b.Name()) + require.Equal(parentBktName, b.Path()) + + // Add entries to both buckets + require.NoError(b.Put(parentKey, parentVal)) + require.NoError(childBkt.Put(childKey, childVal)) + return nil + })) + + // Read buckets and values back out + require.NoError(db.View(func(tx *bolt.Tx) error { + b := newNamedBucket(tx, parentBktName) + require.NotNil(b) + require.Equal(parentVal, b.Get(parentKey)) + require.Nil(b.Get(childKey)) + + childBkt := b.Bucket(childBktName) + require.NotNil(childBkt) + require.Nil(childBkt.Get(parentKey)) + require.Equal(childVal, childBkt.Get(childKey)) + return nil + })) +} + +// TestNamedBucket_DeleteBucket asserts that deleting a bucket properly purges +// all related keys from the internal hashes map. 
+func TestNamedBucket_DeleteBucket(t *testing.T) { + t.Parallel() + require := require.New(t) + + db, cleanup := setupBoltDB(t) + defer cleanup() + + // Create some nested buckets and keys (key values will just be their names) + b1Name, c1Name, c2Name, c1c1Name := []byte("b1"), []byte("c1"), []byte("c2"), []byte("c1c1") + b1k1, c1k1, c2k1, c1c1k1 := []byte("b1k1"), []byte("c1k1"), []byte("c2k1"), []byte("c1c1k1") + + codec := newKeyValueCodec() + + // Create initial db state + require.NoError(db.Update(func(tx *bolt.Tx) error { + // Create bucket 1 and key + b1, err := createNamedBucketIfNotExists(tx, b1Name) + require.NoError(err) + require.NoError(codec.Put(b1, b1k1, b1k1)) + + // Create child bucket 1 and key + c1, err := b1.CreateBucketIfNotExists(c1Name) + require.NoError(err) + require.NoError(codec.Put(c1, c1k1, c1k1)) + + // Create child-child bucket 1 and key + c1c1, err := c1.CreateBucketIfNotExists(c1c1Name) + require.NoError(err) + require.NoError(codec.Put(c1c1, c1c1k1, c1c1k1)) + + // Create child bucket 2 and key + c2, err := b1.CreateBucketIfNotExists(c2Name) + require.NoError(err) + require.NoError(codec.Put(c2, c2k1, c2k1)) + return nil + })) + + // codec should be tracking 4 hash buckets (b1, c1, c2, c1c1) + require.Len(codec.hashes, 4) + + // Delete c1 + require.NoError(db.Update(func(tx *bolt.Tx) error { + b1 := newNamedBucket(tx, b1Name) + return codec.DeleteBucket(b1, c1Name) + })) + + // Deleting c1 must also purge the hashes of its nested bucket c1c1, so the + // codec should be tracking 2 hash buckets (b1, c2) + require.Len(codec.hashes, 2) + + // Assert all of c1 is gone + require.NoError(db.View(func(tx *bolt.Tx) error { + return nil + })) +} diff --git a/client/state/state_database.go b/client/state/state_database.go index 8793d651eee..4e0e2d94623 100644 --- a/client/state/state_database.go +++ b/client/state/state_database.go @@ -3,7 +3,6 @@ package state import ( "fmt" "path/filepath" - "strings" "github.com/boltdb/bolt" trstate 
"github.com/hashicorp/nomad/client/allocrunnerv2/taskrunner/state" @@ -39,56 +38,6 @@ var ( taskStateKey = []byte("task_state") ) -//TODO delete from kvcodec -// DeleteAllocationBucket is used to delete an allocation bucket if it exists. -func DeleteAllocationBucket(tx *bolt.Tx, allocID string) error { - if !tx.Writable() { - return fmt.Errorf("transaction must be writable") - } - - // Retrieve the root allocations bucket - allocations := tx.Bucket(allocationsBucket) - if allocations == nil { - return nil - } - - // Check if the bucket exists - key := []byte(allocID) - if allocBkt := allocations.Bucket(key); allocBkt == nil { - return nil - } - - return allocations.DeleteBucket(key) -} - -//TODO delete from kvcodec -// DeleteTaskBucket is used to delete a task bucket if it exists. -func DeleteTaskBucket(tx *bolt.Tx, allocID, taskName string) error { - if !tx.Writable() { - return fmt.Errorf("transaction must be writable") - } - - // Retrieve the root allocations bucket - allocations := tx.Bucket(allocationsBucket) - if allocations == nil { - return nil - } - - // Retrieve the specific allocations bucket - alloc := allocations.Bucket([]byte(allocID)) - if alloc == nil { - return nil - } - - // Check if the bucket exists - key := []byte(taskName) - if taskBkt := alloc.Bucket(key); taskBkt == nil { - return nil - } - - return alloc.DeleteBucket(key) -} - // NewStateDBFunc creates a StateDB given a state directory. type NewStateDBFunc func(stateDir string) (StateDB, error) @@ -153,7 +102,7 @@ type allocEntry struct { } func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map[string]error) { - allocationsBkt := tx.Bucket(allocationsBucket) + allocationsBkt := newNamedBucket(tx, allocationsBucket) if allocationsBkt == nil { // No allocs return nil, nil @@ -163,7 +112,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map errs := map[string]error{} // Create a cursor for iteration. 
- c := allocationsBkt.Cursor() + c := allocationsBkt.bkt.Cursor() // Iterate over all the allocation buckets for k, _ := c.First(); k != nil; k, _ = c.Next() { @@ -190,7 +139,7 @@ func (s *BoltStateDB) getAllAllocations(tx *bolt.Tx) ([]*structs.Allocation, map func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { return s.db.Update(func(tx *bolt.Tx) error { // Retrieve the root allocations bucket - allocsBkt, err := tx.CreateBucketIfNotExists(allocationsBucket) + allocsBkt, err := createNamedBucketIfNotExists(tx, allocationsBucket) if err != nil { return err } @@ -205,7 +154,7 @@ func (s *BoltStateDB) PutAllocation(alloc *structs.Allocation) error { allocState := allocEntry{ Alloc: alloc, } - return s.codec.Put(allocBkt, alloc.ID, allocKey, &allocState) + return s.codec.Put(allocBkt, allocKey, &allocState) }) } @@ -256,8 +205,7 @@ func (s *BoltStateDB) PutTaskRunnerLocalState(allocID, taskName string, val inte return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - path := strings.Join([]string{allocID, taskName, string(taskLocalStateKey)}, "-") - if err := s.codec.Put(taskBkt, path, taskLocalStateKey, val); err != nil { + if err := s.codec.Put(taskBkt, taskLocalStateKey, val); err != nil { return fmt.Errorf("failed to write task_runner state: %v", err) } @@ -273,8 +221,42 @@ func (s *BoltStateDB) PutTaskState(allocID, taskName string, state *structs.Task return fmt.Errorf("failed to retrieve allocation bucket: %v", err) } - path := strings.Join([]string{allocID, taskName, string(taskStateKey)}, "-") - return s.codec.Put(taskBkt, path, taskStateKey, state) + return s.codec.Put(taskBkt, taskStateKey, state) + }) +} + +// DeleteTaskBucket is used to delete a task bucket if it exists. 
+func (s *BoltStateDB) DeleteTaskBucket(allocID, taskName string) error { + return s.db.Update(func(tx *bolt.Tx) error { + // Retrieve the root allocations bucket + allocations := newNamedBucket(tx, allocationsBucket) + if allocations == nil { + return nil + } + + // Retrieve the specific allocations bucket + alloc := allocations.Bucket([]byte(allocID)) + if alloc == nil { + return nil + } + + // Check if the bucket exists + key := []byte(taskName) + return s.codec.DeleteBucket(alloc, key) + }) +} + +// DeleteAllocationBucket is used to delete an allocation bucket if it exists. +func (s *BoltStateDB) DeleteAllocationBucket(allocID string) error { + return s.db.Update(func(tx *bolt.Tx) error { + // Retrieve the root allocations bucket + allocations := newNamedBucket(tx, allocationsBucket) + if allocations == nil { + return nil + } + + key := []byte(allocID) + return s.codec.DeleteBucket(allocations, key) }) } @@ -288,18 +270,18 @@ func (s *BoltStateDB) Close() error { // particular allocation. If the root allocation bucket or the specific // allocation bucket doesn't exist, it will be created as long as the // transaction is writable. -func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { +func getAllocationBucket(tx *bolt.Tx, allocID string) (kvStore, error) { var err error w := tx.Writable() // Retrieve the root allocations bucket - allocations := tx.Bucket(allocationsBucket) + allocations := newNamedBucket(tx, allocationsBucket) if allocations == nil { if !w { return nil, fmt.Errorf("Allocations bucket doesn't exist and transaction is not writable") } - allocations, err = tx.CreateBucket(allocationsBucket) + allocations, err = createNamedBucketIfNotExists(tx, allocationsBucket) if err != nil { return nil, err } @@ -326,7 +308,7 @@ func getAllocationBucket(tx *bolt.Tx, allocID string) (*bolt.Bucket, error) { // particular task. 
If the root allocation bucket, the specific // allocation or task bucket doesn't exist, they will be created as long as the // transaction is writable. -func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (*bolt.Bucket, error) { +func getTaskBucket(tx *bolt.Tx, allocID, taskName string) (kvStore, error) { alloc, err := getAllocationBucket(tx, allocID) if err != nil { return nil, err