Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist locally registered services and checks #497

Merged
merged 3 commits into from
Nov 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 169 additions & 2 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"encoding/json"
"fmt"
"io"
"log"
Expand All @@ -15,6 +16,14 @@ import (
"github.com/hashicorp/serf/serf"
)

const (
// Path to save agent service definitions
servicesDir = "services"

// Path to save local agent checks
checksDir = "checks"
)

/*
The agent is the long running process that is run on every machine.
It exposes an RPC interface that is used by the CLI to control the
Expand Down Expand Up @@ -132,6 +141,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err
}

// Load any persisted services and services
if err := agent.restoreServices(); err != nil {
return nil, err
}
if err := agent.restoreChecks(); err != nil {
return nil, err
}

// Start handling events
go agent.handleEvents()

Expand Down Expand Up @@ -472,6 +489,144 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}

// persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, service.ID)
if _, err := os.Stat(svcPath); os.IsNotExist(err) {
encoded, err := json.Marshal(service)
if err != nil {
return nil
}
if err := os.MkdirAll(filepath.Dir(svcPath), 0700); err != nil {
return err
}
fh, err := os.OpenFile(svcPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
defer fh.Close()
if _, err := fh.Write(encoded); err != nil {
return err
}
}
return nil
}

// purgeService removes a persisted service definition file from the data dir
func (a *Agent) purgeService(serviceID string) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, serviceID)
if _, err := os.Stat(svcPath); err == nil {
return os.Remove(svcPath)
}
return nil
}

// restoreServices is used to load previously persisted service definitions
// into the agent during startup.
func (a *Agent) restoreServices() error {
svcDir := filepath.Join(a.config.DataDir, servicesDir)
if _, err := os.Stat(svcDir); os.IsNotExist(err) {
return nil
}

err := filepath.Walk(svcDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == servicesDir {
return nil
}
fh, err := os.Open(filepath.Join(svcDir, fi.Name()))
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}

var svc *structs.NodeService
if err := json.Unmarshal(content, &svc); err != nil {
return err
}

a.logger.Printf("[DEBUG] Restored service definition: %s", svc.ID)
return a.AddService(svc, nil)
})
return err
}

// persistCheck saves a check definition to the local agent's state directory
func (a *Agent) persistCheck(check *structs.HealthCheck) error {
checkPath := filepath.Join(a.config.DataDir, checksDir, check.CheckID)
if _, err := os.Stat(checkPath); os.IsNotExist(err) {
encoded, err := json.Marshal(check)
if err != nil {
return nil
}
if err := os.MkdirAll(filepath.Dir(checkPath), 0700); err != nil {
return err
}
fh, err := os.OpenFile(checkPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
defer fh.Close()
if _, err := fh.Write(encoded); err != nil {
return err
}
}
return nil
}

// purgeCheck removes a persisted check definition file from the data dir
func (a *Agent) purgeCheck(checkID string) error {
checkPath := filepath.Join(a.config.DataDir, checksDir, checkID)
if _, err := os.Stat(checkPath); err == nil {
return os.Remove(checkPath)
}
return nil
}

// restoreChecks is used to load previously persisted health check definitions
// into the agent during startup.
func (a *Agent) restoreChecks() error {
checkDir := filepath.Join(a.config.DataDir, checksDir)
if _, err := os.Stat(checkDir); os.IsNotExist(err) {
return nil
}

err := filepath.Walk(checkDir, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.Name() == checksDir {
return nil
}
fh, err := os.Open(filepath.Join(checkDir, fi.Name()))
if err != nil {
return err
}
content := make([]byte, fi.Size())
if _, err := fh.Read(content); err != nil {
return err
}

var check *structs.HealthCheck
if err := json.Unmarshal(content, &check); err != nil {
return err
}

// Default check to critical to avoid placing potentially unhealthy
// services into the active pool
check.Status = structs.HealthCritical

a.logger.Printf("[DEBUG] Restored health check: %s", check.CheckID)
return a.AddCheck(check, nil)
})
return err
}

// AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
Expand All @@ -489,6 +644,11 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) err
// Add the service
a.state.AddService(service)

// Persist the service to a file
if err := a.persistService(service); err != nil {
return err
}

// Create an associated health check
if chkType != nil {
check := &structs.HealthCheck{
Expand Down Expand Up @@ -520,6 +680,11 @@ func (a *Agent) RemoveService(serviceID string) error {
// Remove service immeidately
a.state.RemoveService(serviceID)

// Remove the service from the data dir
if err := a.purgeService(serviceID); err != nil {
return err
}

// Deregister any associated health checks
checkID := fmt.Sprintf("service:%s", serviceID)
return a.RemoveCheck(checkID)
Expand Down Expand Up @@ -580,7 +745,9 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error {

// Add to the local state for anti-entropy
a.state.AddCheck(check)
return nil

// Persist the check
return a.persistCheck(check)
}

// RemoveCheck is used to remove a health check.
Expand All @@ -601,7 +768,7 @@ func (a *Agent) RemoveCheck(checkID string) error {
check.Stop()
delete(a.checkTTLs, checkID)
}
return nil
return a.purgeCheck(checkID)
}

// UpdateCheck is used to update the status of a check.
Expand Down
116 changes: 116 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package agent

import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -380,3 +382,117 @@ func TestAgent_ConsulService(t *testing.T) {
t.Fatalf("%s service should be in sync", consul.ConsulServiceID)
}
}

func TestAgent_PersistService(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)

svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}

if err := agent.AddService(svc, nil); err != nil {
t.Fatalf("err: %v", err)
}

file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID)
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}

expected, err := json.Marshal(svc)
if err != nil {
t.Fatalf("err: %s", err)
}
content, err := ioutil.ReadFile(file)
if err != nil {
t.Fatalf("err: %s", err)
}
if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content))
}
agent.Shutdown()

// Should load it back during later start
agent2, err := Create(config, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
defer agent2.Shutdown()

if _, ok := agent2.state.services[svc.ID]; !ok {
t.Fatalf("bad: %#v", agent2.state.services)
}

// Should remove the service file
if err := agent2.RemoveService(svc.ID); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("err: %s", err)
}
}

func TestAgent_PersistCheck(t *testing.T) {
config := nextConfig()
dir, agent := makeAgent(t, config)
defer os.RemoveAll(dir)

check := &structs.HealthCheck{
Node: config.NodeName,
CheckID: "service:redis1",
Name: "redischeck",
Status: structs.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}

if err := agent.AddCheck(check, nil); err != nil {
t.Fatalf("err: %v", err)
}

file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID)
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}

expected, err := json.Marshal(check)
if err != nil {
t.Fatalf("err: %s", err)
}
content, err := ioutil.ReadFile(file)
if err != nil {
t.Fatalf("err: %s", err)
}
if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content))
}
agent.Shutdown()

// Should load it back during later start
agent2, err := Create(config, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
defer agent2.Shutdown()

result, ok := agent2.state.checks[check.CheckID]
if !ok {
t.Fatalf("bad: %#v", agent2.state.checks)
}
if result.Status != structs.HealthCritical {
t.Fatalf("bad: %#v", result)
}

// Should remove the service file
if err := agent2.RemoveCheck(check.CheckID); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("err: %s", err)
}
}