Skip to content

Commit

Permalink
Merge pull request #497 from hashicorp/f-persist
Browse files Browse the repository at this point in the history
Persist locally registered services and checks
  • Loading branch information
armon committed Nov 24, 2014
2 parents 50e21b0 + 2ebe854 commit f74d3db
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 2 deletions.
171 changes: 169 additions & 2 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"encoding/json"
"fmt"
"io"
"log"
Expand All @@ -15,6 +16,14 @@ import (
"github.com/hashicorp/serf/serf"
)

const (
	// servicesDir is the directory name, joined onto the agent's data
	// directory, under which service definitions are persisted.
	servicesDir = "services"

	// checksDir is the directory name, joined onto the agent's data
	// directory, under which local check definitions are persisted.
	checksDir = "checks"
)

/*
The agent is the long running process that is run on every machine.
It exposes an RPC interface that is used by the CLI to control the
Expand Down Expand Up @@ -132,6 +141,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err
}

// Load any persisted services and checks
if err := agent.restoreServices(); err != nil {
return nil, err
}
if err := agent.restoreChecks(); err != nil {
return nil, err
}

// Start handling events
go agent.handleEvents()

Expand Down Expand Up @@ -472,6 +489,144 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}

// persistService saves a service definition to a JSON file in the data dir.
// The file is named after the service ID and lives under servicesDir inside
// the agent's data directory. The definition is only written when no file
// exists yet for that ID; an existing file is left untouched.
//
// It returns any error hit while encoding the service, creating the
// directory, or writing and closing the file.
func (a *Agent) persistService(service *structs.NodeService) error {
	svcPath := filepath.Join(a.config.DataDir, servicesDir, service.ID)
	if _, err := os.Stat(svcPath); !os.IsNotExist(err) {
		// Either the file is already there or stat failed for some other
		// reason; in both cases nothing is written.
		return nil
	}

	encoded, err := json.Marshal(service)
	if err != nil {
		// Report the failure instead of pretending the service was saved.
		return err
	}
	if err := os.MkdirAll(filepath.Dir(svcPath), 0700); err != nil {
		return err
	}
	fh, err := os.OpenFile(svcPath, os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	if _, err := fh.Write(encoded); err != nil {
		fh.Close()
		return err
	}
	// A failed close can mean the data never reached disk, so surface it.
	return fh.Close()
}

// purgeService deletes the persisted definition file for serviceID from the
// services directory under the data dir. When the file cannot be stat'ed
// (for example because it was never persisted) nothing is done and nil is
// returned.
func (a *Agent) purgeService(serviceID string) error {
	target := filepath.Join(a.config.DataDir, servicesDir, serviceID)
	_, statErr := os.Stat(target)
	if statErr != nil {
		return nil
	}
	return os.Remove(target)
}

// restoreServices is used to load previously persisted service definitions
// into the agent during startup. It walks the services directory under the
// data dir, decodes every regular entry as a JSON-encoded service and
// registers it through AddService with no associated check type.
//
// A missing services directory is not an error. Any failure to read or
// decode a file, or to add the resulting service, aborts the walk and is
// returned.
func (a *Agent) restoreServices() error {
	svcDir := filepath.Join(a.config.DataDir, servicesDir)
	if _, err := os.Stat(svcDir); os.IsNotExist(err) {
		return nil
	}

	return filepath.Walk(svcDir, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		// Skip the root (and any nested) directory entries by type rather
		// than by name, so a service whose ID equals the directory name is
		// still restored.
		if fi.IsDir() {
			return nil
		}

		// Use the path handed to us by Walk; rebuilding it from the base
		// name would point at the wrong file for nested entries.
		fh, err := os.Open(path)
		if err != nil {
			return err
		}
		content := make([]byte, fi.Size())
		// ReadFull keeps reading until the buffer is filled; a single Read
		// is allowed to return fewer bytes.
		_, err = io.ReadFull(fh, content)
		// Close right away instead of deferring: this closure runs once per
		// file and the handle is no longer needed.
		fh.Close()
		if err != nil {
			return err
		}

		var svc *structs.NodeService
		if err := json.Unmarshal(content, &svc); err != nil {
			return err
		}
		// A file holding JSON "null" decodes without error but leaves the
		// pointer nil.
		if svc == nil {
			return fmt.Errorf("empty service definition in %q", path)
		}

		a.logger.Printf("[DEBUG] Restored service definition: %s", svc.ID)
		return a.AddService(svc, nil)
	})
}

// persistCheck saves a check definition to a JSON file in the data dir.
// The file is named after the check ID and lives under checksDir inside the
// agent's data directory. The definition is only written when no file
// exists yet for that ID; an existing file is left untouched.
//
// It returns any error hit while encoding the check, creating the
// directory, or writing and closing the file.
func (a *Agent) persistCheck(check *structs.HealthCheck) error {
	checkPath := filepath.Join(a.config.DataDir, checksDir, check.CheckID)
	if _, err := os.Stat(checkPath); !os.IsNotExist(err) {
		// Either the file is already there or stat failed for some other
		// reason; in both cases nothing is written.
		return nil
	}

	encoded, err := json.Marshal(check)
	if err != nil {
		// Report the failure instead of pretending the check was saved.
		return err
	}
	if err := os.MkdirAll(filepath.Dir(checkPath), 0700); err != nil {
		return err
	}
	fh, err := os.OpenFile(checkPath, os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	if _, err := fh.Write(encoded); err != nil {
		fh.Close()
		return err
	}
	// A failed close can mean the data never reached disk, so surface it.
	return fh.Close()
}

// purgeCheck deletes the persisted definition file for checkID from the
// checks directory under the data dir. When the file cannot be stat'ed
// (for example because it was never persisted) nothing is done and nil is
// returned.
func (a *Agent) purgeCheck(checkID string) error {
	target := filepath.Join(a.config.DataDir, checksDir, checkID)
	_, statErr := os.Stat(target)
	if statErr != nil {
		return nil
	}
	return os.Remove(target)
}

// restoreChecks is used to load previously persisted health check
// definitions into the agent during startup. It walks the checks directory
// under the data dir, decodes every regular entry as a JSON-encoded health
// check, forces its status to critical and registers it through AddCheck
// with no check type.
//
// A missing checks directory is not an error. Any failure to read or decode
// a file, or to add the resulting check, aborts the walk and is returned.
func (a *Agent) restoreChecks() error {
	checkDir := filepath.Join(a.config.DataDir, checksDir)
	if _, err := os.Stat(checkDir); os.IsNotExist(err) {
		return nil
	}

	return filepath.Walk(checkDir, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		// Skip the root (and any nested) directory entries by type rather
		// than by name, so a check whose ID equals the directory name is
		// still restored.
		if fi.IsDir() {
			return nil
		}

		// Use the path handed to us by Walk; rebuilding it from the base
		// name would point at the wrong file for nested entries.
		fh, err := os.Open(path)
		if err != nil {
			return err
		}
		content := make([]byte, fi.Size())
		// ReadFull keeps reading until the buffer is filled; a single Read
		// is allowed to return fewer bytes.
		_, err = io.ReadFull(fh, content)
		// Close right away instead of deferring: this closure runs once per
		// file and the handle is no longer needed.
		fh.Close()
		if err != nil {
			return err
		}

		var check *structs.HealthCheck
		if err := json.Unmarshal(content, &check); err != nil {
			return err
		}
		// A file holding JSON "null" decodes without error but leaves the
		// pointer nil.
		if check == nil {
			return fmt.Errorf("empty check definition in %q", path)
		}

		// Default check to critical to avoid placing potentially unhealthy
		// services into the active pool
		check.Status = structs.HealthCritical

		a.logger.Printf("[DEBUG] Restored health check: %s", check.CheckID)
		return a.AddCheck(check, nil)
	})
}

// AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
Expand All @@ -489,6 +644,11 @@ func (a *Agent) AddService(service *structs.NodeService, chkType *CheckType) err
// Add the service
a.state.AddService(service)

// Persist the service to a file
if err := a.persistService(service); err != nil {
return err
}

// Create an associated health check
if chkType != nil {
check := &structs.HealthCheck{
Expand Down Expand Up @@ -520,6 +680,11 @@ func (a *Agent) RemoveService(serviceID string) error {
// Remove service immediately
a.state.RemoveService(serviceID)

// Remove the service from the data dir
if err := a.purgeService(serviceID); err != nil {
return err
}

// Deregister any associated health checks
checkID := fmt.Sprintf("service:%s", serviceID)
return a.RemoveCheck(checkID)
Expand Down Expand Up @@ -580,7 +745,9 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType) error {

// Add to the local state for anti-entropy
a.state.AddCheck(check)
return nil

// Persist the check
return a.persistCheck(check)
}

// RemoveCheck is used to remove a health check.
Expand All @@ -601,7 +768,7 @@ func (a *Agent) RemoveCheck(checkID string) error {
check.Stop()
delete(a.checkTTLs, checkID)
}
return nil
return a.purgeCheck(checkID)
}

// UpdateCheck is used to update the status of a check.
Expand Down
116 changes: 116 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package agent

import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -380,3 +382,117 @@ func TestAgent_ConsulService(t *testing.T) {
t.Fatalf("%s service should be in sync", consul.ConsulServiceID)
}
}

// TestAgent_PersistService checks the on-disk life cycle of a service
// definition: adding a service writes its JSON encoding under servicesDir,
// a second agent created from the same config loads it back into local
// state, and removing the service deletes the file again.
func TestAgent_PersistService(t *testing.T) {
	config := nextConfig()
	dir, agent := makeAgent(t, config)
	defer os.RemoveAll(dir)

	svc := &structs.NodeService{
		ID:      "redis",
		Service: "redis",
		Tags:    []string{"foo"},
		Port:    8000,
	}

	if err := agent.AddService(svc, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The definition file must exist and hold exactly the JSON encoding
	file := filepath.Join(agent.config.DataDir, servicesDir, svc.ID)
	if _, err := os.Stat(file); err != nil {
		t.Fatalf("err: %s", err)
	}

	expected, err := json.Marshal(svc)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	content, err := ioutil.ReadFile(file)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	if !bytes.Equal(expected, content) {
		t.Fatalf("bad: %s", string(content))
	}
	agent.Shutdown()

	// Should load it back during later start
	agent2, err := Create(config, nil)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer agent2.Shutdown()

	if _, ok := agent2.state.services[svc.ID]; !ok {
		t.Fatalf("bad: %#v", agent2.state.services)
	}

	// Should remove the service file
	if err := agent2.RemoveService(svc.ID); err != nil {
		t.Fatalf("err: %s", err)
	}
	// err is nil when the file is still present, so say what went wrong
	// rather than printing a bare nil error.
	if _, err := os.Stat(file); !os.IsNotExist(err) {
		t.Fatalf("service file %q should have been removed (stat err: %v)", file, err)
	}
}

// TestAgent_PersistCheck checks the on-disk life cycle of a health check
// definition: adding a check writes its JSON encoding under checksDir, a
// second agent created from the same config loads it back with its status
// forced to critical, and removing the check deletes the file again.
func TestAgent_PersistCheck(t *testing.T) {
	config := nextConfig()
	dir, agent := makeAgent(t, config)
	defer os.RemoveAll(dir)

	check := &structs.HealthCheck{
		Node:        config.NodeName,
		CheckID:     "service:redis1",
		Name:        "redischeck",
		Status:      structs.HealthPassing,
		ServiceID:   "redis",
		ServiceName: "redis",
	}

	if err := agent.AddCheck(check, nil); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The definition file must exist and hold exactly the JSON encoding
	file := filepath.Join(agent.config.DataDir, checksDir, check.CheckID)
	if _, err := os.Stat(file); err != nil {
		t.Fatalf("err: %s", err)
	}

	expected, err := json.Marshal(check)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	content, err := ioutil.ReadFile(file)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	if !bytes.Equal(expected, content) {
		t.Fatalf("bad: %s", string(content))
	}
	agent.Shutdown()

	// Should load it back during later start
	agent2, err := Create(config, nil)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer agent2.Shutdown()

	result, ok := agent2.state.checks[check.CheckID]
	if !ok {
		t.Fatalf("bad: %#v", agent2.state.checks)
	}
	// The check was persisted as passing but must come back as critical
	if result.Status != structs.HealthCritical {
		t.Fatalf("bad: %#v", result)
	}

	// Should remove the check file
	if err := agent2.RemoveCheck(check.CheckID); err != nil {
		t.Fatalf("err: %s", err)
	}
	// err is nil when the file is still present, so say what went wrong
	// rather than printing a bare nil error.
	if _, err := os.Stat(file); !os.IsNotExist(err) {
		t.Fatalf("check file %q should have been removed (stat err: %v)", file, err)
	}
}

0 comments on commit f74d3db

Please sign in to comment.