From a60f4adf9502b552b7965019b749382cf0e9ba7b Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Wed, 8 Apr 2015 12:20:34 -0700 Subject: [PATCH 1/4] agent: anti-entropy sync services/checks if they don't exist in the catalog --- command/agent/local.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/command/agent/local.go b/command/agent/local.go index 38b5ee1699a6..d5c6062f6074 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -316,6 +316,13 @@ func (l *localState) setSyncState() error { l.Lock() defer l.Unlock() + for id, _ := range l.services { + // If the local service doesn't exist remotely, then sync it + if _, ok := services.Services[id]; !ok { + l.serviceStatus[id] = syncStatus{inSync: false} + } + } + if services != nil { for id, service := range services.Services { // If we don't have the service locally, deregister it @@ -331,6 +338,16 @@ func (l *localState) setSyncState() error { } } + for id, _ := range l.checks { + // Sync any check which doesn't exist on the remote side + for _, check := range checks { + if check.CheckID == id { + continue + } + l.checkStatus[id] = syncStatus{inSync: false} + } + } + for _, check := range checks { // If we don't have the check locally, deregister it id := check.CheckID From f417279761e56b30833c87f96c2ef71501a347f5 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Wed, 8 Apr 2015 12:36:53 -0700 Subject: [PATCH 2/4] agent: test anti-entropy sync --- command/agent/local_test.go | 44 ++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/command/agent/local_test.go b/command/agent/local_test.go index d2cb7fa7ec51..480da00cd73a 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -88,6 +88,16 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } agent.state.AddService(srv5) + // Exists local, in sync, remote missing (create) + srv6 := &structs.NodeService{ + ID: "cache", + Service: "cache", + Tags: []string{}, + Port: 11211, + } + agent.state.AddService(srv6) + agent.state.serviceStatus["cache"] = syncStatus{inSync: true} + srv5_mod := new(structs.NodeService) *srv5_mod = *srv5 srv5_mod.Address = "127.0.0.1" @@ -110,8 +120,8 @@ func TestAgentAntiEntropy_Services(t *testing.T) { t.Fatalf("err: %v", err) } - // We should have 5 services (consul included) - if len(services.NodeServices.Services) != 5 { + // We should have 6 services (consul included) + if len(services.NodeServices.Services) != 6 { t.Fatalf("bad: %v", services.NodeServices.Services) } @@ -134,6 +144,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { if !reflect.DeepEqual(serv, srv5) { t.Fatalf("bad: %v %v", serv, srv5) } + case "cache": + if !reflect.DeepEqual(serv, srv6) { + t.Fatalf("bad: %v %v", serv, srv6) + } case "consul": // ignore default: @@ -142,10 +156,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) { } // Check the local state - if len(agent.state.services) != 5 { + if len(agent.state.services) != 6 { t.Fatalf("bad: %v", agent.state.services) } - if len(agent.state.serviceStatus) != 5 { + if len(agent.state.serviceStatus) != 6 { t.Fatalf("bad: %v", agent.state.serviceStatus) } for name, status := range agent.state.serviceStatus { @@ -439,6 +453,16 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { t.Fatalf("err: %v", err) } + // Exists local, in sync, remote missing (create) + chk5 := &structs.HealthCheck{ + Node: agent.config.NodeName, + CheckID: "cache", + Name: "cache", + Status: structs.HealthPassing, + } + agent.state.AddCheck(chk5) + agent.state.checkStatus["cache"] = syncStatus{inSync: true} + // Trigger anti-entropy run and wait agent.StartSync() time.Sleep(200 * time.Millisecond) @@ -453,8 +477,8 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { t.Fatalf("err: %v", err) } - // We should have 4 services (serf included) - if len(checks.HealthChecks) != 4 { + // We should have 5 checks (serf included) + if len(checks.HealthChecks) != 5 { t.Fatalf("bad: %v", checks) } @@ -473,6 +497,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { if !reflect.DeepEqual(chk, chk3) { t.Fatalf("bad: %v %v", chk, chk3) } + case "cache": + if !reflect.DeepEqual(chk, chk5) { + t.Fatalf("bad: %v %v", chk, chk5) + } case "serfHealth": // ignore default: @@ -481,10 +509,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) { } // Check the local state - if len(agent.state.checks) != 3 { + if len(agent.state.checks) != 4 { t.Fatalf("bad: %v", agent.state.checks) } - if len(agent.state.checkStatus) != 3 { + if len(agent.state.checkStatus) != 4 { t.Fatalf("bad: %v", agent.state.checkStatus) } for name, status := range agent.state.checkStatus { From 7e170b047ed1785a90c0bb9a6fb37284bb372088 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Thu, 9 Apr 2015 10:40:05 -0700 Subject: [PATCH 3/4] agent: fix anti-entropy check sync --- command/agent/local.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/command/agent/local.go b/command/agent/local.go index d5c6062f6074..eed995a1d3e6 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -340,10 +340,14 @@ func (l *localState) setSyncState() error { for id, _ := range l.checks { // Sync any check which doesn't exist on the remote side + found := false for _, check := range checks { if check.CheckID == id { - continue + found = true + break } + } + if !found { l.checkStatus[id] = syncStatus{inSync: false} } } From 60a6da213f49c4c5d5227eacc3447a047b872629 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Fri, 10 Apr 2015 11:04:15 -0700 Subject: [PATCH 4/4] agent: handle nil node services in anti-entropy --- command/agent/local.go | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index eed995a1d3e6..f85d6671ad85 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -310,32 +310,34 @@ func (l *localState) setSyncState() error { if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil { return err } - services := out1.NodeServices checks := out2.HealthChecks l.Lock() defer l.Unlock() + services := make(map[string]*structs.NodeService) + if out1.NodeServices != nil { + services = out1.NodeServices.Services + } + for id, _ := range l.services { // If the local service doesn't exist remotely, then sync it - if _, ok := services.Services[id]; !ok { + if _, ok := services[id]; !ok { l.serviceStatus[id] = syncStatus{inSync: false} } } - if services != nil { - for id, service := range services.Services { - // If we don't have the service locally, deregister it - existing, ok := l.services[id] - if !ok { - l.serviceStatus[id] = syncStatus{remoteDelete: true} - continue - } - - // If our definition is different, we need to update it - equal := reflect.DeepEqual(existing, service) - l.serviceStatus[id] = syncStatus{inSync: equal} + for id, service := range services { + // If we don't have the service locally, deregister it + existing, ok := l.services[id] + if !ok { + l.serviceStatus[id] = syncStatus{remoteDelete: true} + continue } + + // If our definition is different, we need to update it + equal := reflect.DeepEqual(existing, service) + l.serviceStatus[id] = syncStatus{inSync: equal} } for id, _ := range l.checks {