Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUGFIX #7374 When a service does not exists in an alias, consider it failing #7384

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 154 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -584,7 +585,13 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st

services := make([]*structs.ServiceDefinition, numServices)
checkIDs := make([]types.CheckID, numServices)
for i := 0; i < numServices; i++ {
services[0] = &structs.ServiceDefinition{
ID: "fake",
Name: "fake",
Port: 8080,
Checks: []*structs.CheckType{},
}
for i := 1; i < numServices; i++ {
name := fmt.Sprintf("web-%d", i)

services[i] = &structs.ServiceDefinition{
Expand Down Expand Up @@ -621,6 +628,152 @@ func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL st
})
}

func test_createAlias(t *testing.T, agent *TestAgent, chk *structs.CheckType, expectedResult string) func(r *retry.R) {
t.Helper()
serviceNum := rand.Int()
srv := &structs.NodeService{
Service: fmt.Sprintf("serviceAlias-%d", serviceNum),
Tags: []string{"tag1"},
Port: 8900 + serviceNum,
}
if srv.ID == "" {
srv.ID = fmt.Sprintf("serviceAlias-%d", serviceNum)
}
chk.Status = api.HealthWarning
if chk.CheckID == "" {
chk.CheckID = types.CheckID(fmt.Sprintf("check-%d", serviceNum))
}
err := agent.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal)
assert.NoError(t, err)
return func(r *retry.R) {
t.Helper()
found := false
for _, c := range agent.State.CheckStates(structs.WildcardEnterpriseMeta()) {
if c.Check.CheckID == chk.CheckID {
found = true
assert.Equal(t, expectedResult, c.Check.Status, "Check state should be %s, was %s in %#v", expectedResult, c.Check.Status, c.Check)
var srvID structs.ServiceID
srvID.Init(srv.ID, structs.WildcardEnterpriseMeta())
if err := agent.Agent.State.RemoveService(structs.ServiceID(srvID)); err != nil {
fmt.Println("[DEBUG] Fail to remove service", srvID, ", err:=", err)
}
fmt.Println("[DEBUG] Service Removed", srvID, ", err:=", err)
break
}
}
assert.True(t, found)
}
}

// TestAgent_CheckAliasRPC test the Alias Check to be properly sync remotely
// and locally.
// It contains a few hacks such as unlockIndexOnNode because watch performed
// in CheckAlias.runQuery() waits for 1 min, so Shutdoww the agent might take time
// So, we ensure the agent will update regularilly the index
func TestAgent_CheckAliasRPC(t *testing.T) {
t.Helper()

a := NewTestAgent(t, `
node_name = "node1"
`)

srv := &structs.NodeService{
ID: "svcid1",
Service: "svcname1",
Tags: []string{"tag1"},
Port: 8100,
}
unlockIndexOnNode := func() {
// We ensure to not block and update Agent's index
srv.Tags = []string{fmt.Sprintf("tag-%s", time.Now())}
assert.NoError(t, a.waitForUp())
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
assert.NoError(t, err)
}
shutdownAgent := func() {
// This is to be sure Alias Checks on remote won't be blocked during 1 min
unlockIndexOnNode()
fmt.Println("[DEBUG] STARTING shutdown for TestAgent_CheckAliasRPC", time.Now())
go a.Shutdown()
unlockIndexOnNode()
fmt.Println("[DEBUG] DONE shutdown for TestAgent_CheckAliasRPC", time.Now())
}
defer shutdownAgent()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

assert.NoError(t, a.waitForUp())
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceLocal)
assert.NoError(t, err)

retry.Run(t, func(r *retry.R) {
t.Helper()
var args structs.NodeSpecificRequest
args.Datacenter = "dc1"
args.Node = "node1"
args.AllowStale = true
var out structs.IndexedNodeServices
err := a.RPC("Catalog.NodeServices", &args, &out)
assert.NoError(r, err)
foundService := false
var lookup structs.ServiceID
lookup.Init("svcid1", structs.WildcardEnterpriseMeta())
for _, srv := range out.NodeServices.Services {
sid := srv.CompoundServiceID()
if lookup.Matches(&sid) {
foundService = true
}
}
assert.True(r, foundService, "could not find svcid1 in %#v", out.NodeServices.Services)
})

checks := make([](func(*retry.R)), 0)

checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Local_Ok",
AliasService: "svcid1",
}, api.HealthPassing))

checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Local_Fail",
AliasService: "svcidNoExistingID",
}, api.HealthCritical))

checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Remote_Host_Ok",
AliasNode: "node1",
AliasService: "svcid1",
}, api.HealthPassing))

checks = append(checks, test_createAlias(t, a, &structs.CheckType{
Name: "Check_Remote_Host_Non_Existing_Service",
AliasNode: "node1",
AliasService: "svcidNoExistingID",
}, api.HealthCritical))

// We wait for max 5s for all checks to be in sync
{
for i := 0; i < 50; i++ {
unlockIndexOnNode()
allNonWarning := true
for _, chk := range a.State.Checks(structs.WildcardEnterpriseMeta()) {
if chk.Status == api.HealthWarning {
allNonWarning = false
}
}
if allNonWarning {
break
} else {
time.Sleep(100 * time.Millisecond)
}
}
}

for _, toRun := range checks {
unlockIndexOnNode()
retry.Run(t, toRun)
}
}

func TestAgent_AddServiceNoExec(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
Expand Down
81 changes: 64 additions & 17 deletions agent/checks/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
for _, chk := range checks {
checksList = append(checksList, chk)
}

c.processChecks(checksList)
c.processChecks(checksList, func(serviceID *structs.ServiceID) bool {
return c.Notify.ServiceExists(*serviceID)
})
extendRefreshTimer()
}

Expand All @@ -134,12 +135,44 @@ func (c *CheckAlias) runLocal(stopCh chan struct{}) {
}
}

// CheckIfServiceIDExists is used to determine if a service exists
type CheckIfServiceIDExists func(*structs.ServiceID) bool

func (c *CheckAlias) checkServiceExistsOnRemoteServer(serviceID *structs.ServiceID) (bool, error) {
args := c.RPCReq
args.Node = c.Node
args.AllowStale = true
args.EnterpriseMeta = c.EnterpriseMeta
// We are late at maximum of 15s compared to leader
args.MaxStaleDuration = time.Duration(15 * time.Second)
attempts := 0
RETRY_CALL:
var out structs.IndexedNodeServices
attempts++
if err := c.RPC.RPC("Catalog.NodeServices", &args, &out); err != nil {
if attempts <= 3 {
time.Sleep(time.Duration(attempts) * time.Second)
goto RETRY_CALL
}
return false, err
}
for _, srv := range out.NodeServices.Services {
sid := srv.CompoundServiceID()
if serviceID.Matches(&sid) {
return true, nil
}
}
return false, nil
}

func (c *CheckAlias) runQuery(stopCh chan struct{}) {
args := c.RPCReq
args.Node = c.Node
args.AllowStale = true
args.MaxQueryTime = 1 * time.Minute
args.EnterpriseMeta = c.EnterpriseMeta
// We are late at maximum of 15s compared to leader
args.MaxStaleDuration = time.Duration(15 * time.Second)

var attempt uint
for {
Expand Down Expand Up @@ -173,6 +206,7 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) {
// but for blocking queries isn't that much more efficient since the checks
// index is global to the cluster.
var out structs.IndexedHealthChecks

if err := c.RPC.RPC("Health.NodeChecks", &args, &out); err != nil {
attempt++
if attempt > 1 {
Expand All @@ -195,29 +229,37 @@ func (c *CheckAlias) runQuery(stopCh chan struct{}) {
if args.MinQueryIndex < 1 {
args.MinQueryIndex = 1
}

c.processChecks(out.HealthChecks)
c.processChecks(out.HealthChecks, func(serviceID *structs.ServiceID) bool {
ret, err := c.checkServiceExistsOnRemoteServer(serviceID)
if err != nil {
// We cannot determine if node has the check, let's assume it exists
return true
}
return ret
})
}
}

// processChecks is a common helper for taking a set of health checks and
// using them to update our alias. This is abstracted since the checks can
// come from both the remote server as well as local state.
func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) {
func (c *CheckAlias) processChecks(checks []*structs.HealthCheck, CheckIfServiceIDExists CheckIfServiceIDExists) {
health := api.HealthPassing
msg := "No checks found."
serviceFound := false
for _, chk := range checks {
if c.Node != "" && chk.Node != c.Node {
if c.Node != "" && c.Node != chk.Node {
continue
}

// We allow ServiceID == "" so that we also check node checks
sid := chk.CompoundServiceID()

if chk.ServiceID != "" && !c.ServiceID.Matches(&sid) {
serviceMatch := c.ServiceID.Matches(&sid)
if chk.ServiceID != "" && !serviceMatch {
continue
}

// We have at least one healthcheck for this service
if serviceMatch {
serviceFound = true
}
if chk.Status == api.HealthCritical || chk.Status == api.HealthWarning {
health = chk.Status
msg = fmt.Sprintf("Aliased check %q failing: %s", chk.Name, chk.Output)
Expand All @@ -228,13 +270,18 @@ func (c *CheckAlias) processChecks(checks []*structs.HealthCheck) {
if chk.Status == api.HealthCritical {
break
}
} else {
// if current health is warning, don't overwrite it
if health == api.HealthPassing {
msg = "All checks passing."
}
}
}
if !serviceFound {
if !CheckIfServiceIDExists(&c.ServiceID) {
msg = fmt.Sprintf("Service %s could not be found on node %s", c.ServiceID.ID, c.Node)
health = api.HealthCritical
}

msg = "All checks passing."
}

// TODO(rb): if no matching checks found should this default to critical?

// Update our check value
c.Notify.UpdateCheck(c.CheckID, health, msg)
}
Loading