Skip to content

Commit

Permalink
Refactor Consul Syncer into new ServiceClient
Browse files Browse the repository at this point in the history
Fixes #2478 #2474 #1995 #2294

The new client only handles agent and task service advertisement. Server
discovery is mostly unchanged.

The Nomad client agent now handles all Consul operations instead of the
executor handling task related operations. When upgrading from an
earlier version of Nomad existing executors will be told to deregister
from Consul so that the Nomad agent can re-register the task's services
and checks.

Drivers - other than qemu - now support an Exec method for executing
abritrary commands in a task's environment. This is used to implement
script checks.

Interfaces are used extensively to avoid interacting with Consul in
tests that don't assert any Consul related behavior.
  • Loading branch information
schmichael committed Mar 30, 2017
1 parent aaf7343 commit 99e7126
Show file tree
Hide file tree
Showing 44 changed files with 2,360 additions and 2,509 deletions.
36 changes: 20 additions & 16 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type AllocRunner struct {

updateCh chan *structs.Allocation

vaultClient vaultclient.VaultClient
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI

otherAllocDir *allocdir.AllocDir

Expand Down Expand Up @@ -96,20 +97,23 @@ type allocRunnerState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient) *AllocRunner {
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {

ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
}
return ar
}
Expand Down Expand Up @@ -174,7 +178,7 @@ func (r *AllocRunner) RestoreState() error {
}

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand Down Expand Up @@ -500,7 +504,7 @@ func (r *AllocRunner) Run() {
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()

tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
r.tasks[task.Name] = tr
tr.MarkReceived()

Expand Down
10 changes: 6 additions & 4 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient())
return upd, ar
}

Expand Down Expand Up @@ -323,7 +323,8 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -415,7 +416,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
if err != nil {
Expand Down Expand Up @@ -573,7 +574,8 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient)
defer ar.Destroy()

// RestoreState should fail on the task state since we only test the
Expand Down
8 changes: 6 additions & 2 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ var (
// included in snapshots.
SharedDataDir = "data"

// TmpDirName is the name of the temporary directory in each alloc and
// task.
TmpDirName = "tmp"

// The set of directories that exist inside eache shared alloc directory.
SharedAllocDirs = []string{LogDirName, "tmp", SharedDataDir}
SharedAllocDirs = []string{LogDirName, TmpDirName, SharedDataDir}

// The name of the directory that exists inside each task directory
// regardless of driver.
Expand All @@ -40,7 +44,7 @@ var (
TaskSecrets = "secrets"

// TaskDirs is the set of directories created in each tasks directory.
TaskDirs = []string{"tmp"}
TaskDirs = []string{TmpDirName}
)

type AllocDir struct {
Expand Down
77 changes: 13 additions & 64 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ const (
// datacenters looking for the Nomad server service.
datacenterQueryLimit = 9

// consulReaperIntv is the interval at which the Consul reaper will
// run.
consulReaperIntv = 5 * time.Second

// registerRetryIntv is minimum interval on which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 15 * time.Second
Expand Down Expand Up @@ -142,8 +138,12 @@ type Client struct {
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation

// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService ConsulServiceAPI

// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI

// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
Expand Down Expand Up @@ -196,7 +196,7 @@ var (
)

// NewClient is used to create a new client from the given configuration
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService ConsulServiceAPI, logger *log.Logger) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {
Expand All @@ -210,7 +210,8 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
// Create the client
c := &Client{
config: cfg,
consulSyncer: consulSyncer,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
Expand Down Expand Up @@ -285,9 +286,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
}
}

// Start Consul reaper
go c.consulReaper()

// Setup the vault client for token and secret renewals
if err := c.setupVaultClient(); err != nil {
return nil, fmt.Errorf("failed to setup vault client: %v", err)
Expand Down Expand Up @@ -596,7 +594,7 @@ func (c *Client) restoreState() error {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
Expand Down Expand Up @@ -1880,7 +1878,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
defer c.allocLock.Unlock()

c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()
go ar.Run()
Expand Down Expand Up @@ -2033,8 +2031,7 @@ func (c *Client) consulDiscoveryImpl() error {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()

consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
dcs, err := c.consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
Expand Down Expand Up @@ -2070,7 +2067,7 @@ DISCOLOOP:
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
consulServices, _, err := c.consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
Expand Down Expand Up @@ -2129,54 +2126,6 @@ DISCOLOOP:
}
}

// consulReaper periodically reaps unmatched domains from Consul. Intended to
// be called in its own goroutine. See consulReaperIntv for interval.
func (c *Client) consulReaper() {
ticker := time.NewTicker(consulReaperIntv)
defer ticker.Stop()
lastok := true
for {
select {
case <-ticker.C:
if err := c.consulReaperImpl(); err != nil {
if lastok {
c.logger.Printf("[ERR] client.consul: error reaping services in consul: %v", err)
lastok = false
}
} else {
lastok = true
}
case <-c.shutdownCh:
return
}
}
}

// consulReaperImpl reaps unmatched domains from Consul.
func (c *Client) consulReaperImpl() error {
const estInitialExecutorDomains = 8

// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain

for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
}
}

return c.consulSyncer.ReapUnmatched(domains)
}

// emitStats collects host resource usage stats periodically
func (c *Client) emitStats() {
// Start collecting host stats right away and then keep collecting every
Expand Down
30 changes: 10 additions & 20 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,11 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}

shutdownCh := make(chan struct{})
logger := log.New(config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
catalog := consul.NewMockCatalog(logger)

// Create server
server, err := nomad.NewServer(config, consulSyncer, logger)
server, err := nomad.NewServer(config, catalog, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -104,14 +100,11 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client {
cb(conf)
}

shutdownCh := make(chan struct{})
consulSyncer, err := consul.NewSyncer(conf.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags))
if err != nil {
t.Fatalf("err: %v", err)
}

logger := log.New(conf.LogOutput, "", log.LstdFlags)
client, err := NewClient(conf, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
client, err := NewClient(conf, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -651,14 +644,11 @@ func TestClient_SaveRestoreState(t *testing.T) {
}

// Create a new client
shutdownCh := make(chan struct{})
logger := log.New(c1.config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(c1.config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}

c2, err := NewClient(c1.config, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
c2, err := NewClient(c1.config, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
13 changes: 13 additions & 0 deletions client/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package client

import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)

// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec consul.ScriptExecutor) error
RemoveTask(allocID string, task *structs.Task)
}
56 changes: 56 additions & 0 deletions client/consul_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package client

import (
"io/ioutil"
"log"
"os"
"sync"
"testing"

"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
)

// mockConsulOp represents the register/deregister operations.
type mockConsulOp struct {
allocID string
task *structs.Task
exec consul.ScriptExecutor
}

// mockConsulServiceClient implements the ConsulServiceAPI interface to record
// and log task registration/deregistration.
type mockConsulServiceClient struct {
registers []mockConsulOp
removes []mockConsulOp
mu sync.Mutex

logger *log.Logger
}

func newMockConsulServiceClient() *mockConsulServiceClient {
m := mockConsulServiceClient{
registers: make([]mockConsulOp, 0, 10),
removes: make([]mockConsulOp, 0, 10),
logger: log.New(ioutil.Discard, "", 0),
}
if testing.Verbose() {
m.logger = log.New(os.Stderr, "", log.LstdFlags)
}
return &m
}

func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec consul.ScriptExecutor) error {
m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T)", allocID, task.Name, exec)
m.mu.Lock()
defer m.mu.Unlock()
m.registers = append(m.registers, mockConsulOp{allocID, task, exec})
return nil
}

func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) {
m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", allocID, task.Name)
m.mu.Lock()
defer m.mu.Unlock()
m.removes = append(m.removes, mockConsulOp{allocID, task, nil})
}
Loading

0 comments on commit 99e7126

Please sign in to comment.