From 9813ae512bc0a605dde6d97630af0c444b88a188 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Thu, 4 Jun 2020 14:50:52 +0200 Subject: [PATCH] checks: when a service does not exists in an alias, consider it failing (#7384) In the current implementation of Consul, an alias check cannot determine whether a service exists or not. Because a service without any check is semantically considered as passing, when no healthchecks are found for an agent, the check was considered as passing. But this makes little sense, as the current implementation does not make any distinction between: * a non-existing service (passing) * a service without any check (passing as well) In order to make it work, we have to ensure that when a check did not find any healthcheck, the service does indeed exist. If it does not, let's consider the check as failing. --- agent/agent_test.go | 155 ++++++++++++++++++++++++++++++++- agent/checks/alias.go | 81 +++++++++++++---- agent/checks/alias_test.go | 173 ++++++++++++++++++++++++++++++++++--- agent/checks/check.go | 2 + agent/local/state.go | 7 ++ agent/local/state_test.go | 2 + agent/mock/notify.go | 24 +++-- 7 files changed, 408 insertions(+), 36 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index e2dee448d814..d98a02f40641 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math/rand" "net" "net/http" "net/http/httptest" @@ -583,7 +584,13 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st services := make([]*structs.ServiceDefinition, numServices) checkIDs := make([]types.CheckID, numServices) - for i := 0; i < numServices; i++ { + services[0] = &structs.ServiceDefinition{ + ID: "fake", + Name: "fake", + Port: 8080, + Checks: []*structs.CheckType{}, + } + for i := 1; i < numServices; i++ { name := fmt.Sprintf("web-%d", i) services[i] = &structs.ServiceDefinition{ @@ -620,6 +627,152 @@ func 
testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st }) } +func test_createAlias(t *testing.T, agent *TestAgent, chk *structs.CheckType, expectedResult string) func(r *retry.R) { + t.Helper() + serviceNum := rand.Int() + srv := &structs.NodeService{ + Service: fmt.Sprintf("serviceAlias-%d", serviceNum), + Tags: []string{"tag1"}, + Port: 8900 + serviceNum, + } + if srv.ID == "" { + srv.ID = fmt.Sprintf("serviceAlias-%d", serviceNum) + } + chk.Status = api.HealthWarning + if chk.CheckID == "" { + chk.CheckID = types.CheckID(fmt.Sprintf("check-%d", serviceNum)) + } + err := agent.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal) + assert.NoError(t, err) + return func(r *retry.R) { + t.Helper() + found := false + for _, c := range agent.State.CheckStates(structs.WildcardEnterpriseMeta()) { + if c.Check.CheckID == chk.CheckID { + found = true + assert.Equal(t, expectedResult, c.Check.Status, "Check state should be %s, was %s in %#v", expectedResult, c.Check.Status, c.Check) + var srvID structs.ServiceID + srvID.Init(srv.ID, structs.WildcardEnterpriseMeta()) + if err := agent.Agent.State.RemoveService(structs.ServiceID(srvID)); err != nil { + fmt.Println("[DEBUG] Fail to remove service", srvID, ", err:=", err) + } + fmt.Println("[DEBUG] Service Removed", srvID, ", err:=", err) + break + } + } + assert.True(t, found) + } +} + +// TestAgent_CheckAliasRPC test the Alias Check to be properly sync remotely +// and locally. 
+// It contains a few hacks such as unlockIndexOnNode because watch performed +// in CheckAlias.runQuery() waits for 1 min, so Shutdoww the agent might take time +// So, we ensure the agent will update regularilly the index +func TestAgent_CheckAliasRPC(t *testing.T) { + t.Helper() + + a := NewTestAgent(t, ` + node_name = "node1" + `) + + srv := &structs.NodeService{ + ID: "svcid1", + Service: "svcname1", + Tags: []string{"tag1"}, + Port: 8100, + } + unlockIndexOnNode := func() { + // We ensure to not block and update Agent's index + srv.Tags = []string{fmt.Sprintf("tag-%s", time.Now())} + assert.NoError(t, a.waitForUp()) + err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal) + assert.NoError(t, err) + } + shutdownAgent := func() { + // This is to be sure Alias Checks on remote won't be blocked during 1 min + unlockIndexOnNode() + fmt.Println("[DEBUG] STARTING shutdown for TestAgent_CheckAliasRPC", time.Now()) + go a.Shutdown() + unlockIndexOnNode() + fmt.Println("[DEBUG] DONE shutdown for TestAgent_CheckAliasRPC", time.Now()) + } + defer shutdownAgent() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + assert.NoError(t, a.waitForUp()) + err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal) + assert.NoError(t, err) + + retry.Run(t, func(r *retry.R) { + t.Helper() + var args structs.NodeSpecificRequest + args.Datacenter = "dc1" + args.Node = "node1" + args.AllowStale = true + var out structs.IndexedNodeServices + err := a.RPC("Catalog.NodeServices", &args, &out) + assert.NoError(r, err) + foundService := false + var lookup structs.ServiceID + lookup.Init("svcid1", structs.WildcardEnterpriseMeta()) + for _, srv := range out.NodeServices.Services { + sid := srv.CompoundServiceID() + if lookup.Matches(&sid) { + foundService = true + } + } + assert.True(r, foundService, "could not find svcid1 in %#v", out.NodeServices.Services) + }) + + checks := make([](func(*retry.R)), 0) + + checks = append(checks, test_createAlias(t, 
a, &structs.CheckType{ + Name: "Check_Local_Ok", + AliasService: "svcid1", + }, api.HealthPassing)) + + checks = append(checks, test_createAlias(t, a, &structs.CheckType{ + Name: "Check_Local_Fail", + AliasService: "svcidNoExistingID", + }, api.HealthCritical)) + + checks = append(checks, test_createAlias(t, a, &structs.CheckType{ + Name: "Check_Remote_Host_Ok", + AliasNode: "node1", + AliasService: "svcid1", + }, api.HealthPassing)) + + checks = append(checks, test_createAlias(t, a, &structs.CheckType{ + Name: "Check_Remote_Host_Non_Existing_Service", + AliasNode: "node1", + AliasService: "svcidNoExistingID", + }, api.HealthCritical)) + + // We wait for max 5s for all checks to be in sync + { + for i := 0; i < 50; i++ { + unlockIndexOnNode() + allNonWarning := true + for _, chk := range a.State.Checks(structs.WildcardEnterpriseMeta()) { + if chk.Status == api.HealthWarning { + allNonWarning = false + } + } + if allNonWarning { + break + } else { + time.Sleep(100 * time.Millisecond) + } + } + } + + for _, toRun := range checks { + unlockIndexOnNode() + retry.Run(t, toRun) + } +} + func TestAgent_AddServiceNoExec(t *testing.T) { t.Run("normal", func(t *testing.T) { t.Parallel() diff --git a/agent/checks/alias.go b/agent/checks/alias.go index 2ecc39ebb2bb..0ef7c0c4796c 100644 --- a/agent/checks/alias.go +++ b/agent/checks/alias.go @@ -114,8 +114,9 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) { for _, chk := range checks { checksList = append(checksList, chk) } - - c.processChecks(checksList) + c.processChecks(checksList, func(serviceID *structs.ServiceID) bool { + return c.Notify.ServiceExists(*serviceID) + }) extendRefreshTimer() } @@ -134,12 +135,44 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) { } } +// CheckIfServiceIDExists is used to determine if a service exists +type CheckIfServiceIDExists func(*structs.ServiceID) bool + +func (c *CheckAlias) checkServiceExistsOnRemoteServer(serviceID *structs.ServiceID) (bool, error) { + args := c.RPCReq + 
args.Node = c.Node + args.AllowStale = true + args.EnterpriseMeta = c.EnterpriseMeta + // We are late at maximum of 15s compared to leader + args.MaxStaleDuration = time.Duration(15 * time.Second) + attempts := 0 +RETRY_CALL: + var out structs.IndexedNodeServices + attempts++ + if err := c.RPC.RPC("Catalog.NodeServices", &args, &out); err != nil { + if attempts <= 3 { + time.Sleep(time.Duration(attempts) * time.Second) + goto RETRY_CALL + } + return false, err + } + for _, srv := range out.NodeServices.Services { + sid := srv.CompoundServiceID() + if serviceID.Matches(&sid) { + return true, nil + } + } + return false, nil +} + func (c *CheckAlias) runQuery(stopCh chan struct{}) { args := c.RPCReq args.Node = c.Node args.AllowStale = true args.MaxQueryTime = 1 * time.Minute args.EnterpriseMeta = c.EnterpriseMeta + // We are late at maximum of 15s compared to leader + args.MaxStaleDuration = time.Duration(15 * time.Second) var attempt uint for { @@ -173,6 +206,7 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) { // but for blocking queries isn't that much more efficient since the checks // index is global to the cluster. var out structs.IndexedHealthChecks + if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil { attempt++ if attempt > 1 { @@ -195,29 +229,37 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) { if args.MinQueryIndex < 1 { args.MinQueryIndex = 1 } - - c.processChecks(out.HealthChecks) + c.processChecks(out.HealthChecks, func(serviceID *structs.ServiceID) bool { + ret, err := c.checkServiceExistsOnRemoteServer(serviceID) + if err != nil { + // We cannot determine if node has the check, let's assume it exists + return true + } + return ret + }) } } // processChecks is a common helper for taking a set of health checks and // using them to update our alias. This is abstracted since the checks can // come from both the remote server as well as local state. 
-func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { +func (c *CheckAlias) processChecks(checks []*structs.HealthCheck, CheckIfServiceIDExists CheckIfServiceIDExists) { health := api.HealthPassing msg := "No checks found." + serviceFound := false for _, chk := range checks { - if c.Node != "" && chk.Node != c.Node { + if c.Node != "" && c.Node != chk.Node { continue } - - // We allow ServiceID == "" so that we also check node checks sid := chk.CompoundServiceID() - - if chk.ServiceID != "" && !c.ServiceID.Matches(&sid) { + serviceMatch := c.ServiceID.Matches(&sid) + if chk.ServiceID != "" && !serviceMatch { continue } - + // We have at least one healthcheck for this service + if serviceMatch { + serviceFound = true + } if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning { health = chk.Status msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output) @@ -228,13 +270,18 @@ func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) { if chk.Status == api.HealthCritical { break } + } else { + // if current health is warning, don't overwrite it + if health == api.HealthPassing { + msg = "All checks passing." + } + } + } + if !serviceFound { + if !CheckIfServiceIDExists(&c.ServiceID) { + msg = fmt.Sprintf("Service %s could not be found on node %s", c.ServiceID.ID, c.Node) + health = api.HealthCritical } - - msg = "All checks passing." } - - // TODO(rb): if no matching checks found should this default to critical? 
- - // Update our check value c.Notify.UpdateCheck(c.CheckID, health, msg) } diff --git a/agent/checks/alias_test.go b/agent/checks/alias_test.go index 4992c098a84f..24147b2c1924 100644 --- a/agent/checks/alias_test.go +++ b/agent/checks/alias_test.go @@ -30,7 +30,7 @@ func TestCheckAlias_remoteErrBackoff(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(fmt.Errorf("failure")) + rpc.AddReply("Health.NodeChecks", fmt.Errorf("failure")) chk.Start() defer chk.Stop() @@ -62,7 +62,7 @@ func TestCheckAlias_remoteNoChecks(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{}) + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{}) chk.Start() defer chk.Stop() @@ -88,7 +88,7 @@ func TestCheckAlias_remoteNodeFailure(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -137,7 +137,7 @@ func TestCheckAlias_remotePassing(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -171,6 +171,116 @@ func TestCheckAlias_remotePassing(t *testing.T) { }) } +// Remote service has no healtchecks, but service exists on remote host +func TestCheckAlias_remotePassingWithoutChecksButWithService(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := structs.NewCheckID("foo", nil) + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: structs.ServiceID{ID: "web"}, + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should 
ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + }, + }) + + injected := structs.IndexedNodeServices{ + NodeServices: &structs.NodeServices{ + Node: &structs.Node{ + Node: "remote", + }, + Services: make(map[string]*structs.NodeService), + }, + QueryMeta: structs.QueryMeta{}, + } + injected.NodeServices.Services["web"] = &structs.NodeService{ + Service: "web", + ID: "web", + } + rpc.AddReply("Catalog.NodeServices", injected) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthPassing; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + +// Remote service has no healtchecks, service does not exists on remote host +func TestCheckAlias_remotePassingWithoutChecksAndWithoutService(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := structs.NewCheckID("foo", nil) + rpc := &mockRPC{} + chk := &CheckAlias{ + Node: "remote", + ServiceID: structs.ServiceID{ID: "web"}, + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ + HealthChecks: []*structs.HealthCheck{ + // Should ignore non-matching node + &structs.HealthCheck{ + Node: "A", + ServiceID: "web", + Status: api.HealthCritical, + }, + + // Should ignore non-matching service + &structs.HealthCheck{ + Node: "remote", + ServiceID: "db", + Status: api.HealthCritical, + }, + }, + }) + + injected := structs.IndexedNodeServices{ + NodeServices: &structs.NodeServices{ + Node: &structs.Node{ + Node: "remote", + }, + Services: make(map[string]*structs.NodeService), + }, + QueryMeta: structs.QueryMeta{}, + } + rpc.AddReply("Catalog.NodeServices", injected) + + chk.Start() + defer chk.Stop() + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} + // If any checks are 
critical, it should be critical func TestCheckAlias_remoteCritical(t *testing.T) { t.Parallel() @@ -186,7 +296,7 @@ func TestCheckAlias_remoteCritical(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -241,7 +351,7 @@ func TestCheckAlias_remoteWarning(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -295,7 +405,7 @@ func TestCheckAlias_remoteNodeOnlyPassing(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -342,7 +452,7 @@ func TestCheckAlias_remoteNodeOnlyCritical(t *testing.T) { RPC: rpc, } - rpc.Reply.Store(structs.IndexedHealthChecks{ + rpc.AddReply("Health.NodeChecks", structs.IndexedHealthChecks{ HealthChecks: []*structs.HealthCheck{ // Should ignore non-matching node &structs.HealthCheck{ @@ -403,9 +513,19 @@ type mockRPC struct { Calls uint32 // Read-only, number of RPC calls Args atomic.Value // Read-only, the last args sent - // Write-only, the reply to send. If of type "error" then an error will + // Write-only, the replies to send, indexed per method. If of type "error" then an error will // be returned from the RPC call. 
- Reply atomic.Value + Replies map[string]*atomic.Value +} + +func (m *mockRPC) AddReply(method string, reply interface{}) { + if m.Replies == nil { + m.Replies = make(map[string]*atomic.Value) + } + val := &atomic.Value{} + val.Store(reply) + m.Replies[method] = val + } func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error { @@ -424,12 +544,15 @@ func (m *mockRPC) RPC(method string, args interface{}, reply interface{}) error } replyv = replyv.Elem() // Get pointer value replyv.Set(reflect.Zero(replyv.Type())) // Reset to zero value - if v := m.Reply.Load(); v != nil { + repl := m.Replies[method] + if repl == nil { + return fmt.Errorf("No Such Method: %s", method) + } + if v := m.Replies[method].Load(); v != nil { // Return an error if the reply is an error type if err, ok := v.(error); ok { return err } - replyv.Set(reflect.ValueOf(v)) // Set to reply value if non-nil } @@ -442,6 +565,8 @@ func TestCheckAlias_localInitialStatus(t *testing.T) { t.Parallel() notify := newMockAliasNotify() + // We fake a local service web to ensure check if passing works + notify.Notify.AddServiceID(structs.ServiceID{ID: "web"}) chkID := structs.NewCheckID(types.CheckID("foo"), nil) rpc := &mockRPC{} chk := &CheckAlias{ @@ -463,3 +588,27 @@ func TestCheckAlias_localInitialStatus(t *testing.T) { } }) } + +// Local check on non-existing service +func TestCheckAlias_localInitialStatusShouldFailBecauseNoService(t *testing.T) { + t.Parallel() + + notify := newMockAliasNotify() + chkID := structs.NewCheckID(types.CheckID("foo"), nil) + rpc := &mockRPC{} + chk := &CheckAlias{ + ServiceID: structs.ServiceID{ID: "web"}, + CheckID: chkID, + Notify: notify, + RPC: rpc, + } + + chk.Start() + defer chk.Stop() + + retry.Run(t, func(r *retry.R) { + if got, want := notify.State(chkID), api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) +} diff --git a/agent/checks/check.go b/agent/checks/check.go index 9c04ca423389..1bebeffa9853 100644 
--- a/agent/checks/check.go +++ b/agent/checks/check.go @@ -52,6 +52,8 @@ type RPC interface { // should take care to be idempotent. type CheckNotifier interface { UpdateCheck(checkID structs.CheckID, status, output string) + // ServiceExists return true if the given service does exists + ServiceExists(serviceID structs.ServiceID) bool } // CheckMonitor is used to periodically invoke a script to diff --git a/agent/local/state.go b/agent/local/state.go index 460009a0d6bc..793cab3f0a79 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -485,6 +485,13 @@ func (l *State) AddAliasCheck(checkID structs.CheckID, srcServiceID structs.Serv return nil } +// ServiceExists return true if the given service does exists +func (l *State) ServiceExists(serviceID structs.ServiceID) bool { + l.Lock() + defer l.Unlock() + return l.services[serviceID] != nil +} + // RemoveAliasCheck removes the mapping for the alias check. func (l *State) RemoveAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID) { l.Lock() diff --git a/agent/local/state_test.go b/agent/local/state_test.go index f73757b071bb..872876d8de8a 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -48,7 +48,9 @@ func TestAgentAntiEntropy_Services(t *testing.T) { }, EnterpriseMeta: *structs.DefaultEnterpriseMeta(), } + assert.False(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID})) a.State.AddService(srv1, "") + assert.True(t, a.State.ServiceExists(structs.ServiceID{ID: srv1.ID})) args.Service = srv1 if err := a.RPC("Catalog.Register", args, &out); err != nil { t.Fatalf("err: %v", err) diff --git a/agent/mock/notify.go b/agent/mock/notify.go index 866786c5a1fe..ccbee20c4ce1 100644 --- a/agent/mock/notify.go +++ b/agent/mock/notify.go @@ -14,19 +14,31 @@ type Notify struct { // of the notification mock in order to prevent panics // raised by the race conditions detector. 
sync.RWMutex - state map[structs.CheckID]string - updates map[structs.CheckID]int - output map[structs.CheckID]string + state map[structs.CheckID]string + updates map[structs.CheckID]int + output map[structs.CheckID]string + serviceIDs map[structs.ServiceID]bool } func NewNotify() *Notify { return &Notify{ - state: make(map[structs.CheckID]string), - updates: make(map[structs.CheckID]int), - output: make(map[structs.CheckID]string), + state: make(map[structs.CheckID]string), + updates: make(map[structs.CheckID]int), + output: make(map[structs.CheckID]string), + serviceIDs: make(map[structs.ServiceID]bool), } } +// ServiceExists mock +func (c *Notify) ServiceExists(serviceID structs.ServiceID) bool { + return c.serviceIDs[serviceID] +} + +// AddServiceID will mock a service being present locally +func (c *Notify) AddServiceID(serviceID structs.ServiceID) { + c.serviceIDs[serviceID] = true +} + func NewNotifyChan() (*Notify, chan int) { n := &Notify{ updated: make(chan int),