Skip to content

Commit

Permalink
Prevents watches from being orphaned when KVS blocking queries loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
James Phillips committed Jan 20, 2016
1 parent b6f03d3 commit 01da5a2
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 37 deletions.
129 changes: 129 additions & 0 deletions consul/kvs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,135 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
}
}

func TestKVS_Issue_1626(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

testutil.WaitForLeader(t, s1.RPC, "dc1")

// Set up the first key.
{
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/test",
Value: []byte("test"),
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}

// Retrieve the base key and snag the index.
var index uint64
{
getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "foo/test",
}
var dirent structs.IndexedDirEntries
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if string(d.Value) != "test" {
t.Fatalf("bad: %v", d)
}

index = dirent.Index
}

// Set up a blocking query on the base key.
doneCh := make(chan *structs.IndexedDirEntries, 1)
go func() {
codec := rpcClient(t, s1)
defer codec.Close()

getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "foo/test",
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: 3 * time.Second,
},
}
var dirent structs.IndexedDirEntries
if err := msgpackrpc.CallWithCodec(codec, "KVS.Get", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}
doneCh <- &dirent
}()

// Now update a second key with a prefix that has the first key name
// as part of it.
{
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/test2",
Value: []byte("test"),
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}

// Make sure the blocking query didn't wake up for this update.
select {
case <-doneCh:
t.Fatalf("Blocking query should not have completed")
case <-time.After(1 * time.Second):
}

// Now update the first key's payload.
{
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo/test",
Value: []byte("updated"),
},
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}

// Make sure the blocking query wakes up for the final update.
select {
case dirent := <-doneCh:
if dirent.Index <= index {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 1 {
t.Fatalf("Bad: %v", dirent)
}
d := dirent.Entries[0]
if string(d.Value) != "updated" {
t.Fatalf("bad: %v", d)
}
case <-time.After(1 * time.Second):
t.Fatalf("Blocking query should have completed")
}
}

var testListRules = `
key "" {
policy = "deny"
Expand Down
6 changes: 3 additions & 3 deletions consul/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type StateStore struct {
tableWatches map[string]*FullTableWatch

// kvsWatch holds the special prefix watch for the key value store.
kvsWatch *PrefixWatch
kvsWatch *PrefixWatchManager

// kvsGraveyard manages tombstones for the key value store.
kvsGraveyard *Graveyard
Expand Down Expand Up @@ -110,7 +110,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
schema: schema,
db: db,
tableWatches: tableWatches,
kvsWatch: NewPrefixWatch(),
kvsWatch: NewPrefixWatchManager(),
kvsGraveyard: NewGraveyard(gc),
lockDelay: NewDelay(),
}
Expand Down Expand Up @@ -448,7 +448,7 @@ func (s *StateStore) GetQueryWatch(method string) Watch {

// GetKVSWatch returns a watch for the given prefix in the key value store.
func (s *StateStore) GetKVSWatch(prefix string) Watch {
return s.kvsWatch.GetSubwatch(prefix)
return s.kvsWatch.NewPrefixWatch(prefix)
}

// EnsureRegistration is used to make sure a node, service, and check
Expand Down
70 changes: 56 additions & 14 deletions consul/state/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,47 +80,89 @@ func (d *DumbWatchManager) Notify() {
}
}

// PrefixWatch maintains a notify group for each prefix, allowing for much more
// fine-grained watches.
// PrefixWatch provides a Watch-compatible interface for a PrefixWatchManager,
// bound to a specific prefix.
type PrefixWatch struct {
// manager is the underlying watch manager.
manager *PrefixWatchManager

// prefix is the prefix we are watching.
prefix string
}

// Wait registers the given channel with the notify group for our prefix.
func (w *PrefixWatch) Wait(notifyCh chan struct{}) {
w.manager.Wait(w.prefix, notifyCh)
}

// Clear deregisters the given channel from the the notify group for our prefix.
func (w *PrefixWatch) Clear(notifyCh chan struct{}) {
w.manager.Clear(w.prefix, notifyCh)
}

// PrefixWatchManager maintains a notify group for each prefix, allowing for
// much more fine-grained watches.
type PrefixWatchManager struct {
// watches has the set of notify groups, organized by prefix.
watches *radix.Tree

// lock protects the watches tree.
lock sync.Mutex
}

// NewPrefixWatch returns a new prefix watch.
func NewPrefixWatch() *PrefixWatch {
return &PrefixWatch{
// NewPrefixWatchManager returns a new prefix watch manager.
func NewPrefixWatchManager() *PrefixWatchManager {
return &PrefixWatchManager{
watches: radix.New(),
}
}

// GetSubwatch returns the notify group for the given prefix.
func (w *PrefixWatch) GetSubwatch(prefix string) *NotifyGroup {
// NewPrefixWatch returns a Watch-compatible interface for watching the given
// prefix.
func (w *PrefixWatchManager) NewPrefixWatch(prefix string) Watch {
return &PrefixWatch{
manager: w,
prefix: prefix,
}
}

// Wait registers the given channel on a prefix.
func (w *PrefixWatchManager) Wait(prefix string, notifyCh chan struct{}) {
w.lock.Lock()
defer w.lock.Unlock()

var group *NotifyGroup
if raw, ok := w.watches.Get(prefix); ok {
return raw.(*NotifyGroup)
group = raw.(*NotifyGroup)
} else {
group = &NotifyGroup{}
w.watches.Insert(prefix, group)
}
group.Wait(notifyCh)
}

group := &NotifyGroup{}
w.watches.Insert(prefix, group)
return group
// Clear deregisters the given channel from the notify group for a prefix (if
// one exists).
func (w *PrefixWatchManager) Clear(prefix string, notifyCh chan struct{}) {
w.lock.Lock()
defer w.lock.Unlock()

if raw, ok := w.watches.Get(prefix); ok {
group := raw.(*NotifyGroup)
group.Clear(notifyCh)
}
}

// Notify wakes up all the watchers associated with the given prefix. If subtree
// is true then we will also notify all the tree under the prefix, such as when
// a key is being deleted.
func (w *PrefixWatch) Notify(prefix string, subtree bool) {
func (w *PrefixWatchManager) Notify(prefix string, subtree bool) {
w.lock.Lock()
defer w.lock.Unlock()

var cleanup []string
fn := func(k string, v interface{}) bool {
group := v.(*NotifyGroup)
fn := func(k string, raw interface{}) bool {
group := raw.(*NotifyGroup)
group.Notify()
if k != "" {
cleanup = append(cleanup, k)
Expand Down
Loading

0 comments on commit 01da5a2

Please sign in to comment.