Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] Avoid returning empty data on startup of a non-leader server #4554

Merged
merged 5 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 53 additions & 11 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,39 +1467,81 @@ func TestCatalog_ListServices_Timeout(t *testing.T) {

func TestCatalog_ListServices_Stale(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1" // Enable ACLs!
c.Bootstrap = false // Disable bootstrap
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()

args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
args.AllowStale = true
var out structs.IndexedServices

// Inject a fake service
if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
// Inject a node
if err := s1.fsm.State().EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {

codec := rpcClient(t, s2)
defer codec.Close()

// Run the query, do not wait for leader, never any contact with leader, should fail
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
}

// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
waitForLeader(s1, s2)

testrpc.WaitForLeader(t, s2.RPC, "dc1")
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Run the query, do not wait for leader!
// Should find the services
if len(out.Services) != 1 {
t.Fatalf("bad: %#v", out.Services)
}

if !out.KnownLeader {
t.Fatalf("should have a leader: %v", out)
}

s1.Leave()
s1.Shutdown()

testrpc.WaitUntilNoLeader(t, s2.RPC, "dc1")

args.AllowStale = false
// Since the leader is now down, non-stale query should fail now
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err == nil || err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("expected %v but got err: %v and %v", structs.ErrNoLeader, err, out)
}

// With stale, request should still work
args.AllowStale = true
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}

// Should find the service
// Should find old service
if len(out.Services) != 1 {
t.Fatalf("bad: %v", out)
t.Fatalf("bad: %#v", out)
}

// Should not have a leader! Stale read
if out.KnownLeader {
t.Fatalf("bad: %v", out)
t.Fatalf("should not have a leader anymore: %#v", out)
}
}

Expand Down
4 changes: 2 additions & 2 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
return true, err
}

// Check if we can allow a stale read
if info.IsRead() && info.AllowStaleRead() {
// Check if we can allow a stale read, ensure our local DB is initialized
if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
return false, nil
}

Expand Down
42 changes: 42 additions & 0 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,48 @@ func TestRPC_NoLeader_Fail(t *testing.T) {
}
}

func TestRPC_NoLeader_Fail_on_stale_read(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.RPCHoldTimeout = 1 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var out struct{}

// Make sure we eventually fail with a no leader error, which we should
// see given the short timeout.
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err == nil || err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("bad: %v", err)
}

// Until leader has never been known, stale should fail
getKeysReq := structs.KeyListRequest{
Datacenter: "dc1",
Prefix: "",
Seperator: "/",
QueryOptions: structs.QueryOptions{AllowStale: true},
}
var keyList structs.IndexedKeyList
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("expected %v but got err: %v", structs.ErrNoLeader, err)
}

testrpc.WaitForLeader(t, s1.RPC, "dc1")
if err := msgpackrpc.CallWithCodec(codec, "KVS.ListKeys", &getKeysReq, &keyList); err != nil {
t.Fatalf("Did not expect any error but got err: %v", err)
}
}

func TestRPC_NoLeader_Retry(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
Expand Down
14 changes: 14 additions & 0 deletions testrpc/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string) {
})
}

// WaitUntilNoLeader ensures no leader is present, useful for testing lost leadership.
func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string) {
var out structs.IndexedNodes
retry.Run(t, func(r *retry.R) {
args := &structs.DCSpecificRequest{Datacenter: dc}
if err := rpc("Catalog.ListNodes", args, &out); err == nil {
r.Fatalf("It still has a leader: %#q", out)
}
if out.QueryMeta.KnownLeader {
r.Fatalf("Has still a leader")
}
})
}

// WaitForTestAgent ensures we have a node with serfHealth check registered
func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string) {
var nodes structs.IndexedNodes
Expand Down