Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize performance of KV watchers #578

Merged
merged 5 commits into from
Jan 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er

// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSGet"),
func() error {
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
kvWatch: true,
kvPrefix: args.Key,
run: func() error {
index, ent, err := state.KVSGet(args.Key)
if err != nil {
return err
Expand All @@ -115,7 +117,9 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
reply.Entries = structs.DirEntries{ent}
}
return nil
})
},
}
return k.srv.blockingRPCOpt(&opts)
}

// List is used to list all keys with a given prefix
Expand All @@ -131,10 +135,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e

// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSList"),
func() error {
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
kvWatch: true,
kvPrefix: args.Key,
run: func() error {
tombIndex, index, ent, err := state.KVSList(args.Key)
if err != nil {
return err
Expand Down Expand Up @@ -166,7 +172,9 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
reply.Entries = ent
}
return nil
})
},
}
return k.srv.blockingRPCOpt(&opts)
}

// ListKeys is used to list all keys with a given prefix to a seperator
Expand All @@ -182,16 +190,21 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi

// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSListKeys"),
func() error {
opts := blockingRPCOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
kvWatch: true,
kvPrefix: args.Prefix,
run: func() error {
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator)
reply.Index = index
if acl != nil {
keys = FilterKeys(acl, keys)
}
reply.Keys = keys
return err
})

},
}
return k.srv.blockingRPCOpt(&opts)
}
95 changes: 95 additions & 0 deletions consul/kvs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,101 @@ func TestKVSEndpoint_List(t *testing.T) {
}
}

func TestKVSEndpoint_List_Blocking(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()

testutil.WaitForLeader(t, client.Call, "dc1")

keys := []string{
"/test/key1",
"/test/key2",
"/test/sub/key3",
}

for _, key := range keys {
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: key,
Flags: 1,
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}

getR := structs.KeyRequest{
Datacenter: "dc1",
Key: "/test",
}
var dirent structs.IndexedDirEntries
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}

// Setup a blocking query
getR.MinQueryIndex = dirent.Index
getR.MaxQueryTime = time.Second

// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
client := rpcClient(t, s1)
defer client.Close()
arg := structs.KVSRequest{
Datacenter: "dc1",
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "/test/sub/key3",
},
}
var out bool
if err := client.Call("KVS.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
}()

// Re-run the query
dirent = structs.IndexedDirEntries{}
if err := client.Call("KVS.List", &getR, &dirent); err != nil {
t.Fatalf("err: %v", err)
}

// Should block at least 100ms
if time.Now().Sub(start) < 100*time.Millisecond {
t.Fatalf("too fast")
}

if dirent.Index == 0 {
t.Fatalf("Bad: %v", dirent)
}
if len(dirent.Entries) != 2 {
for _, ent := range dirent.Entries {
t.Errorf("Bad: %#v", *ent)
}
}
for i := 0; i < len(dirent.Entries); i++ {
d := dirent.Entries[i]
if d.Key != keys[i] {
t.Fatalf("bad: %v", d)
}
if d.Flags != 1 {
t.Fatalf("bad: %v", d)
}
if d.Value != nil {
t.Fatalf("bad: %v", d)
}
}
}

func TestKVSEndpoint_List_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
Expand Down
55 changes: 41 additions & 14 deletions consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,57 +289,84 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
// minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(b *structs.QueryOptions, m *structs.QueryMeta,
tables MDBTables, run func() error) error {
opts := blockingRPCOptions{
queryOpts: b,
queryMeta: m,
tables: tables,
run: run,
}
return s.blockingRPCOpt(&opts)
}

// blockingRPCOptions is used to parameterize blockingRPCOpt since
// it takes so many options. It should be prefered over blockingRPC.
type blockingRPCOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
tables MDBTables
kvWatch bool
kvPrefix string
run func() error
}

// blockingRPCOpt is the replacement for blockingRPC as it allows
// for more parameterization easily. It should be prefered over blockingRPC.
func (s *Server) blockingRPCOpt(opts *blockingRPCOptions) error {
var timeout <-chan time.Time
var notifyCh chan struct{}

// Fast path non-blocking
if b.MinQueryIndex == 0 {
if opts.queryOpts.MinQueryIndex == 0 {
goto RUN_QUERY
}

// Sanity check that we have tables to block on
if len(tables) == 0 {
if len(opts.tables) == 0 && !opts.kvWatch {
panic("no tables to block on")
}

// Restrict the max query time
if b.MaxQueryTime > maxQueryTime {
b.MaxQueryTime = maxQueryTime
if opts.queryOpts.MaxQueryTime > maxQueryTime {
opts.queryOpts.MaxQueryTime = maxQueryTime
}

// Ensure a time limit is set if we have an index
if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 {
b.MaxQueryTime = maxQueryTime
if opts.queryOpts.MinQueryIndex > 0 && opts.queryOpts.MaxQueryTime == 0 {
opts.queryOpts.MaxQueryTime = maxQueryTime
}

// Setup a query timeout
if b.MaxQueryTime > 0 {
timeout = time.After(b.MaxQueryTime)
if opts.queryOpts.MaxQueryTime > 0 {
timeout = time.After(opts.queryOpts.MaxQueryTime)
}

// Setup a notification channel for changes
SETUP_NOTIFY:
if b.MinQueryIndex > 0 {
if opts.queryOpts.MinQueryIndex > 0 {
notifyCh = make(chan struct{}, 1)
s.fsm.State().Watch(tables, notifyCh)
state := s.fsm.State()
state.Watch(opts.tables, notifyCh)
if opts.kvWatch {
state.WatchKV(opts.kvPrefix, notifyCh)
}
}

RUN_QUERY:
// Update the query meta data
s.setQueryMeta(m)
s.setQueryMeta(opts.queryMeta)

// Check if query must be consistent
if b.RequireConsistent {
if opts.queryOpts.RequireConsistent {
if err := s.consistentRead(); err != nil {
return err
}
}

// Run the query function
err := run()
err := opts.run()

// Check for minimum query time
if err == nil && m.Index > 0 && m.Index <= b.MinQueryIndex {
if err == nil && opts.queryMeta.Index > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto SETUP_NOTIFY
Expand Down
Loading