Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Consul Syncer into new ServiceClient #2467

Merged
merged 38 commits into from
Apr 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
10cb924
Refactor Consul Syncer into new ServiceClient
schmichael Feb 1, 2017
e13b149
Remove unused syncInterval
schmichael Mar 30, 2017
7930d35
Remove some lies
schmichael Mar 30, 2017
b693791
Add UpdateTask method instead of Remove/Add
schmichael Apr 4, 2017
80882f3
Switch ServiceClient to synchronizing state
schmichael Apr 8, 2017
b4e40ef
Fix shutdown when consul is down
schmichael Apr 12, 2017
63a8307
Move ScriptExecutor to driver
schmichael Apr 12, 2017
a3fc76b
Fix comment to reflect reality
schmichael Apr 12, 2017
fd69d48
Move removal from Consul into TaskRunner cleanup
schmichael Apr 12, 2017
b4c7d92
Explain cleanup defer in test
schmichael Apr 12, 2017
4002d87
Add comments, clarify names, fix PR comments
schmichael Apr 12, 2017
0e0845e
Use a DriverAbility to expose Exec functionality
schmichael Apr 13, 2017
7ac9215
Use nifty testtask sleep command for xplat compat
schmichael Apr 13, 2017
80617a9
Stop being lazy and just type out struct{}{}
schmichael Apr 13, 2017
48269c3
Plumb alloc id + task name into script check logs
schmichael Apr 13, 2017
5d75efc
Explain PortLabel handling in RegisterAgent
schmichael Apr 13, 2017
6bb7d8b
Backoff on Consul lookup failures
schmichael Apr 14, 2017
db8aabe
Fix circular test imports
schmichael Apr 14, 2017
4f22413
Remove stale comment
schmichael Apr 17, 2017
7de3ada
Remove commits return value
schmichael Apr 18, 2017
927b265
Rework to account for ports not being in IDs
schmichael Apr 18, 2017
6a56b0e
Follow _testing.go convention for testing tools
schmichael Apr 18, 2017
465cc51
Test script check exit codes
schmichael Apr 18, 2017
d3f3af8
Always fail script checks when deadline exceeded
schmichael Apr 18, 2017
de3d783
Metricsify new Consul client
schmichael Apr 18, 2017
f0bec63
Explain weird timer logic
schmichael Apr 18, 2017
8e0c97e
Unregister from Consul when waiting for restart
schmichael Apr 19, 2017
3468383
Only register HTTPS agent check when Consul>=0.7.2
schmichael Apr 19, 2017
5948daf
Forgot an important word
schmichael Apr 19, 2017
6d9e61b
Use spiffy new Go 1.8 subtest feature
schmichael Apr 19, 2017
64057d4
Use go-version instead of manual version parsing
schmichael Apr 19, 2017
86bc2fb
Fix diff test + bonus upgrade to subtests
schmichael Apr 19, 2017
7c67166
Add TLSSkipVerify support to api and parser
schmichael Apr 19, 2017
4cf34ed
Skip checks with TLSSkipVerify if it's unsupported
schmichael Apr 19, 2017
a5dcf6b
Document `tls_skip_verify`
schmichael Apr 19, 2017
b9ea276
Thanks go vet!
schmichael Apr 19, 2017
fb3b30b
Fix Windows build.
schmichael Apr 19, 2017
58430bf
Fix consul test build on Windows
schmichael Apr 19, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type ServiceCheck struct {
Interval time.Duration
Timeout time.Duration
InitialStatus string `mapstructure:"initial_status"`
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
}

// The Service model represents a Consul service definition
Expand Down
36 changes: 20 additions & 16 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type AllocRunner struct {

updateCh chan *structs.Allocation

vaultClient vaultclient.VaultClient
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI

otherAllocDir *allocdir.AllocDir

Expand Down Expand Up @@ -96,20 +97,23 @@ type allocRunnerState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient) *AllocRunner {
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {

ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
}
return ar
}
Expand Down Expand Up @@ -174,7 +178,7 @@ func (r *AllocRunner) RestoreState() error {
}

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand Down Expand Up @@ -512,7 +516,7 @@ func (r *AllocRunner) Run() {
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()

tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
r.tasks[task.Name] = tr
tr.MarkReceived()

Expand Down
10 changes: 6 additions & 4 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient())
return upd, ar
}

Expand Down Expand Up @@ -323,7 +323,8 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -415,7 +416,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
if err != nil {
Expand Down Expand Up @@ -573,7 +574,8 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient)
defer ar.Destroy()

// RestoreState should fail on the task state since we only test the
Expand Down
8 changes: 6 additions & 2 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ var (
// included in snapshots.
SharedDataDir = "data"

// TmpDirName is the name of the temporary directory in each alloc and
// task.
TmpDirName = "tmp"

// The set of directories that exist inside eache shared alloc directory.
SharedAllocDirs = []string{LogDirName, "tmp", SharedDataDir}
SharedAllocDirs = []string{LogDirName, TmpDirName, SharedDataDir}

// The name of the directory that exists inside each task directory
// regardless of driver.
Expand All @@ -46,7 +50,7 @@ var (
TaskSecrets = "secrets"

// TaskDirs is the set of directories created in each tasks directory.
TaskDirs = map[string]os.FileMode{"tmp": os.ModeSticky | 0777}
TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777}
)

type AllocDir struct {
Expand Down
77 changes: 13 additions & 64 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ const (
// datacenters looking for the Nomad server service.
datacenterQueryLimit = 9

// consulReaperIntv is the interval at which the Consul reaper will
// run.
consulReaperIntv = 5 * time.Second

// registerRetryIntv is minimum interval on which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 15 * time.Second
Expand Down Expand Up @@ -142,8 +138,12 @@ type Client struct {
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation

// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService ConsulServiceAPI

// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI

// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
Expand Down Expand Up @@ -196,7 +196,7 @@ var (
)

// NewClient is used to create a new client from the given configuration
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService ConsulServiceAPI, logger *log.Logger) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {
Expand All @@ -210,7 +210,8 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
// Create the client
c := &Client{
config: cfg,
consulSyncer: consulSyncer,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
Expand Down Expand Up @@ -285,9 +286,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
}
}

// Start Consul reaper
go c.consulReaper()

// Setup the vault client for token and secret renewals
if err := c.setupVaultClient(); err != nil {
return nil, fmt.Errorf("failed to setup vault client: %v", err)
Expand Down Expand Up @@ -606,7 +604,7 @@ func (c *Client) restoreState() error {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
Expand Down Expand Up @@ -1894,7 +1892,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
defer c.allocLock.Unlock()

c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()
go ar.Run()
Expand Down Expand Up @@ -2047,8 +2045,7 @@ func (c *Client) consulDiscoveryImpl() error {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()

consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
dcs, err := c.consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
Expand Down Expand Up @@ -2084,7 +2081,7 @@ DISCOLOOP:
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
consulServices, _, err := c.consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
Expand Down Expand Up @@ -2143,54 +2140,6 @@ DISCOLOOP:
}
}

// consulReaper periodically reaps unmatched domains from Consul. Intended to
// be called in its own goroutine. See consulReaperIntv for interval.
func (c *Client) consulReaper() {
ticker := time.NewTicker(consulReaperIntv)
defer ticker.Stop()
lastok := true
for {
select {
case <-ticker.C:
if err := c.consulReaperImpl(); err != nil {
if lastok {
c.logger.Printf("[ERR] client.consul: error reaping services in consul: %v", err)
lastok = false
}
} else {
lastok = true
}
case <-c.shutdownCh:
return
}
}
}

// consulReaperImpl reaps unmatched domains from Consul.
func (c *Client) consulReaperImpl() error {
const estInitialExecutorDomains = 8

// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain

for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
}
}

return c.consulSyncer.ReapUnmatched(domains)
}

// emitStats collects host resource usage stats periodically
func (c *Client) emitStats() {
// Start collecting host stats right away and then keep collecting every
Expand Down
30 changes: 10 additions & 20 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,11 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}

shutdownCh := make(chan struct{})
logger := log.New(config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
catalog := consul.NewMockCatalog(logger)

// Create server
server, err := nomad.NewServer(config, consulSyncer, logger)
server, err := nomad.NewServer(config, catalog, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -105,14 +101,11 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client {
cb(conf)
}

shutdownCh := make(chan struct{})
consulSyncer, err := consul.NewSyncer(conf.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags))
if err != nil {
t.Fatalf("err: %v", err)
}

logger := log.New(conf.LogOutput, "", log.LstdFlags)
client, err := NewClient(conf, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
client, err := NewClient(conf, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -754,14 +747,11 @@ func TestClient_SaveRestoreState(t *testing.T) {
}

// Create a new client
shutdownCh := make(chan struct{})
logger := log.New(c1.config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(c1.config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}

c2, err := NewClient(c1.config, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
c2, err := NewClient(c1.config, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
14 changes: 14 additions & 0 deletions client/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package client

import (
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)

// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor) error
RemoveTask(allocID string, task *structs.Task)
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error
}
Loading