Skip to content

Commit

Permalink
Merge pull request #850 from hashicorp/f-ae-missing
Browse files Browse the repository at this point in the history
Anti-entropy sync services/checks missing entirely from Consul
  • Loading branch information
ryanuber committed Apr 10, 2015
2 parents e582024 + 60a6da2 commit 585fd2a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 20 deletions.
47 changes: 35 additions & 12 deletions command/agent/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,24 +310,47 @@ func (l *localState) setSyncState() error {
if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil {
return err
}
services := out1.NodeServices
checks := out2.HealthChecks

l.Lock()
defer l.Unlock()

if services != nil {
for id, service := range services.Services {
// If we don't have the service locally, deregister it
existing, ok := l.services[id]
if !ok {
l.serviceStatus[id] = syncStatus{remoteDelete: true}
continue
}
services := make(map[string]*structs.NodeService)
if out1.NodeServices != nil {
services = out1.NodeServices.Services
}

// If our definition is different, we need to update it
equal := reflect.DeepEqual(existing, service)
l.serviceStatus[id] = syncStatus{inSync: equal}
for id, _ := range l.services {
// If the local service doesn't exist remotely, then sync it
if _, ok := services[id]; !ok {
l.serviceStatus[id] = syncStatus{inSync: false}
}
}

for id, service := range services {
// If we don't have the service locally, deregister it
existing, ok := l.services[id]
if !ok {
l.serviceStatus[id] = syncStatus{remoteDelete: true}
continue
}

// If our definition is different, we need to update it
equal := reflect.DeepEqual(existing, service)
l.serviceStatus[id] = syncStatus{inSync: equal}
}

for id, _ := range l.checks {
// Sync any check which doesn't exist on the remote side
found := false
for _, check := range checks {
if check.CheckID == id {
found = true
break
}
}
if !found {
l.checkStatus[id] = syncStatus{inSync: false}
}
}

Expand Down
44 changes: 36 additions & 8 deletions command/agent/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}
agent.state.AddService(srv5)

// Exists local, in sync, remote missing (create)
srv6 := &structs.NodeService{
ID: "cache",
Service: "cache",
Tags: []string{},
Port: 11211,
}
agent.state.AddService(srv6)
agent.state.serviceStatus["cache"] = syncStatus{inSync: true}

srv5_mod := new(structs.NodeService)
*srv5_mod = *srv5
srv5_mod.Address = "127.0.0.1"
Expand All @@ -110,8 +120,8 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
t.Fatalf("err: %v", err)
}

// We should have 5 services (consul included)
if len(services.NodeServices.Services) != 5 {
// We should have 6 services (consul included)
if len(services.NodeServices.Services) != 6 {
t.Fatalf("bad: %v", services.NodeServices.Services)
}

Expand All @@ -134,6 +144,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
if !reflect.DeepEqual(serv, srv5) {
t.Fatalf("bad: %v %v", serv, srv5)
}
case "cache":
if !reflect.DeepEqual(serv, srv6) {
t.Fatalf("bad: %v %v", serv, srv6)
}
case "consul":
// ignore
default:
Expand All @@ -142,10 +156,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
}

// Check the local state
if len(agent.state.services) != 5 {
if len(agent.state.services) != 6 {
t.Fatalf("bad: %v", agent.state.services)
}
if len(agent.state.serviceStatus) != 5 {
if len(agent.state.serviceStatus) != 6 {
t.Fatalf("bad: %v", agent.state.serviceStatus)
}
for name, status := range agent.state.serviceStatus {
Expand Down Expand Up @@ -439,6 +453,16 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Exists local, in sync, remote missing (create)
chk5 := &structs.HealthCheck{
Node: agent.config.NodeName,
CheckID: "cache",
Name: "cache",
Status: structs.HealthPassing,
}
agent.state.AddCheck(chk5)
agent.state.checkStatus["cache"] = syncStatus{inSync: true}

// Trigger anti-entropy run and wait
agent.StartSync()
time.Sleep(200 * time.Millisecond)
Expand All @@ -453,8 +477,8 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
t.Fatalf("err: %v", err)
}

// We should have 4 services (serf included)
if len(checks.HealthChecks) != 4 {
// We should have 5 checks (serf included)
if len(checks.HealthChecks) != 5 {
t.Fatalf("bad: %v", checks)
}

Expand All @@ -473,6 +497,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
if !reflect.DeepEqual(chk, chk3) {
t.Fatalf("bad: %v %v", chk, chk3)
}
case "cache":
if !reflect.DeepEqual(chk, chk5) {
t.Fatalf("bad: %v %v", chk, chk5)
}
case "serfHealth":
// ignore
default:
Expand All @@ -481,10 +509,10 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
}

// Check the local state
if len(agent.state.checks) != 3 {
if len(agent.state.checks) != 4 {
t.Fatalf("bad: %v", agent.state.checks)
}
if len(agent.state.checkStatus) != 3 {
if len(agent.state.checkStatus) != 4 {
t.Fatalf("bad: %v", agent.state.checkStatus)
}
for name, status := range agent.state.checkStatus {
Expand Down

0 comments on commit 585fd2a

Please sign in to comment.