Skip to content

Commit

Permalink
Consul: add preflight checks for Envoy bootstrap (#23381)
Browse files Browse the repository at this point in the history
Nomad creates Consul ACL tokens and service registrations to support Consul
service mesh workloads, before bootstrapping the Envoy proxy. Nomad always talks
to the local Consul agent and never directly to the Consul servers. But the
local Consul agent talks to the Consul servers in stale consistency mode to
reduce load on the servers. This can result in the Nomad client making the Envoy
bootstrap request with tokens or services that have not yet replicated to the
follower that the local client is connected to. This request gets a 404 on the
ACL token and that negative entry gets cached, preventing any retries from
succeeding.

To work around this, we'll use a method described by our friends over on
`consul-k8s` where after creating the objects in Consul we try to read them from
the local agent in stale consistency mode (which prevents a failed read from
being cached). This cannot completely eliminate this source of error because
it's possible that Consul cluster replication is unhealthy at the time we need
it, but this should make Envoy bootstrap significantly more robust.

This changeset adds preflight checks for the objects we create in Consul:
* We add a preflight check for ACL tokens after we log in via Workload
  Identity and in the function we use to derive tokens in the legacy
  workflow. We do this check early because we also want to use this token for
  registering group services in the allocrunner hooks.
* We add a preflight check for services right before we bootstrap Envoy in the
  taskrunner hook, so that we have time for our service client to batch updates
  to the local Consul agent in addition to the local agent sync.

We've made the timeouts configurable via node metadata rather than the
usual static configuration because for most cases, users should not need to
touch or even know these values are configurable; the configuration is mostly
available for testing.


Fixes: #9307
Fixes: #10451
Fixes: #20516

Ref: hashicorp/consul-k8s#887
Ref: https://hashicorp.atlassian.net/browse/NET-10051
Ref: https://hashicorp.atlassian.net/browse/NET-9273
Follow-up: https://hashicorp.atlassian.net/browse/NET-10138
  • Loading branch information
tgross authored Jun 27, 2024
1 parent 54aafa5 commit df67e74
Show file tree
Hide file tree
Showing 18 changed files with 580 additions and 128 deletions.
3 changes: 3 additions & 0 deletions .changelog/23381.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
consul: Fixed a bug where service registration and Envoy bootstrap would not wait for Consul ACL tokens and services to be replicated to the local agent
```
2 changes: 1 addition & 1 deletion client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
allocdir: ar.allocDir,
widmgr: ar.widmgr,
consulConfigs: ar.clientConfig.GetConsulConfigs(hookLogger),
consulClientConstructor: consul.NewConsulClientFactory(config.Node),
consulClientConstructor: consul.NewConsulClientFactory(config),
hookResources: ar.hookResources,
envBuilder: newEnvBuilder,
logger: hookLogger,
Expand Down
20 changes: 18 additions & 2 deletions client/allocrunner/consul_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package allocrunner

import (
"context"
"fmt"

consulapi "github.com/hashicorp/consul/api"
Expand All @@ -27,7 +28,9 @@ type consulHook struct {
hookResources *cstructs.AllocHookResources
envBuilder *taskenv.Builder

logger log.Logger
logger log.Logger
shutdownCtx context.Context
shutdownCancelFn context.CancelFunc
}

type consulHookConfig struct {
Expand All @@ -51,6 +54,7 @@ type consulHookConfig struct {
}

func newConsulHook(cfg consulHookConfig) *consulHook {
shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())
h := &consulHook{
alloc: cfg.alloc,
allocdir: cfg.allocdir,
Expand All @@ -59,6 +63,8 @@ func newConsulHook(cfg consulHookConfig) *consulHook {
consulClientConstructor: cfg.consulClientConstructor,
hookResources: cfg.hookResources,
envBuilder: cfg.envBuilder(),
shutdownCtx: shutdownCtx,
shutdownCancelFn: shutdownCancelFn,
}
h.logger = cfg.logger.Named(h.Name())
return h
Expand Down Expand Up @@ -225,7 +231,12 @@ func (h *consulHook) getConsulToken(cluster string, req consul.JWTLoginRequest)
return nil, fmt.Errorf("failed to retrieve Consul client for cluster %s: %v", cluster, err)
}

return client.DeriveTokenWithJWT(req)
t, err := client.DeriveTokenWithJWT(req)
if err == nil {
err = client.TokenPreflightCheck(h.shutdownCtx, t)
}

return t, err
}

func (h *consulHook) clientForCluster(cluster string) (consul.Client, error) {
Expand All @@ -248,6 +259,11 @@ func (h *consulHook) Postrun() error {
return nil
}

// Shutdown will get called when the client is gracefully stopping. It invokes
// the cancel function paired with the hook's shutdown context, which is the
// context getConsulToken hands to the Consul token preflight check.
func (h *consulHook) Shutdown() {
h.shutdownCancelFn()
}

// Destroy cleans up any remaining Consul tokens if the alloc is GC'd or fails
// to restore after a client restart.
func (h *consulHook) Destroy() error {
Expand Down
122 changes: 103 additions & 19 deletions client/allocrunner/taskrunner/envoy_bootstrap_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"oss.indeed.com/go/libtime"
"oss.indeed.com/go/libtime/decay"
)

Expand All @@ -38,7 +39,7 @@ const (
// envoyBootstrapInitialGap is the initial amount of time the envoy bootstrap
// retry loop will wait, exponentially increasing each iteration, not including
// jitter.
envoyBoostrapInitialGap = 1 * time.Second
envoyBootstrapInitialGap = 1 * time.Second

// envoyBootstrapMaxJitter is the maximum amount of jitter applied to the
// wait gap each iteration of the envoy bootstrap retry loop.
Expand Down Expand Up @@ -76,10 +77,16 @@ func newConsulTransportConfig(cc *config.ConsulConfig) consulTransportConfig {
}
}

// allocServicesClient is the narrow interface the Envoy bootstrap hook uses to
// look up the service registrations recorded for an allocation, by allocation
// ID. The service preflight check calls it to see whether the proxy service
// has been registered yet.
type allocServicesClient interface {
AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error)
}

type envoyBootstrapHookConfig struct {
alloc *structs.Allocation
consul consulTransportConfig
consulNamespace string
consulServices allocServicesClient
node *structs.Node
logger hclog.Logger
}

Expand All @@ -94,11 +101,13 @@ func decodeTriState(b *bool) string {
}
}

func newEnvoyBootstrapHookConfig(alloc *structs.Allocation, consul *config.ConsulConfig, consulNamespace string, logger hclog.Logger) *envoyBootstrapHookConfig {
func newEnvoyBootstrapHookConfig(alloc *structs.Allocation, consul *config.ConsulConfig, consulNamespace string, consulServices allocServicesClient, node *structs.Node, logger hclog.Logger) *envoyBootstrapHookConfig {
return &envoyBootstrapHookConfig{
alloc: alloc,
consul: newConsulTransportConfig(consul),
consulNamespace: consulNamespace,
consulServices: consulServices,
node: node,
logger: logger,
}
}
Expand Down Expand Up @@ -134,28 +143,38 @@ type envoyBootstrapHook struct {
envoyBootstrapWaitTime time.Duration

// envoyBootstrapInitialGap is the initial wait gap when retrying
envoyBoostrapInitialGap time.Duration
envoyBootstrapInitialGap time.Duration

// envoyBootstrapMaxJitter is the maximum amount of jitter applied to retries
envoyBootstrapMaxJitter time.Duration

// envoyBootstrapExpSleep controls exponential waiting
envoyBootstrapExpSleep func(time.Duration)
envoyBootstrapExpSleep libtime.Sleeper

// consulServices queries the Consul service catalog for preflight checks
consulServices allocServicesClient

// logger is used to log things
logger hclog.Logger
}

func newEnvoyBootstrapHook(c *envoyBootstrapHookConfig) *envoyBootstrapHook {

waitTime := durationFromMeta(c.node,
"consul.service_preflight_check.timeout", envoyBootstrapWaitTime)
initialGap := durationFromMeta(c.node,
"consul.service_preflight_check.base", envoyBootstrapInitialGap)

return &envoyBootstrapHook{
alloc: c.alloc,
consulConfig: c.consul,
consulNamespace: c.consulNamespace,
envoyBootstrapWaitTime: envoyBootstrapWaitTime,
envoyBoostrapInitialGap: envoyBoostrapInitialGap,
envoyBootstrapMaxJitter: envoyBootstrapMaxJitter,
envoyBootstrapExpSleep: time.Sleep,
logger: c.logger.Named(envoyBootstrapHookName),
alloc: c.alloc,
consulConfig: c.consul,
consulNamespace: c.consulNamespace,
envoyBootstrapWaitTime: waitTime,
envoyBootstrapInitialGap: initialGap,
envoyBootstrapMaxJitter: envoyBootstrapMaxJitter,
envoyBootstrapExpSleep: libtime.NewSleeper(),
consulServices: c.consulServices,
logger: c.logger.Named(envoyBootstrapHookName),
}
}

Expand Down Expand Up @@ -281,7 +300,8 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *ifs.TaskPrestart
}
h.logger.Debug("check for SI token for task", "task", req.Task.Name, "exists", siToken != "")

bootstrap := h.newEnvoyBootstrapArgs(h.alloc.TaskGroup, service, grpcAddr, envoyAdminBind, envoyReadyBind, siToken, bootstrapFilePath)
proxyID := h.proxyServiceID(h.alloc.TaskGroup, service)
bootstrap := h.newEnvoyBootstrapArgs(service, grpcAddr, envoyAdminBind, envoyReadyBind, siToken, bootstrapFilePath, proxyID)

// Create command line arguments
bootstrapArgs := bootstrap.args()
Expand Down Expand Up @@ -316,13 +336,20 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *ifs.TaskPrestart
// keep track of latest error returned from exec-ing consul envoy bootstrap
var cmdErr error

// Since Consul services are registered asynchronously with this task
// hook running, retry until timeout or success.
backoffOpts := decay.BackoffOptions{
MaxSleepTime: h.envoyBootstrapWaitTime,
InitialGapSize: h.envoyBoostrapInitialGap,
InitialGapSize: h.envoyBootstrapInitialGap,
MaxJitterSize: h.envoyBootstrapMaxJitter,
Sleeper: h.envoyBootstrapExpSleep,
}

err = h.servicePreflightCheck(ctx, backoffOpts, proxyID)
if err != nil {
return err
}

// Since Consul services are registered asynchronously with this task
// hook running, retry until timeout or success.
backoffErr := decay.Backoff(func() (bool, error) {
// If hook is killed, just stop.
select {
Expand Down Expand Up @@ -486,12 +513,11 @@ func (h *envoyBootstrapHook) proxyServiceID(group string, service *structs.Servi
//
// https://www.consul.io/commands/connect/envoy#consul-connect-envoy
func (h *envoyBootstrapHook) newEnvoyBootstrapArgs(
group string, service *structs.Service,
grpcAddr, envoyAdminBind, envoyReadyBind, siToken, filepath string,
service *structs.Service,
grpcAddr, envoyAdminBind, envoyReadyBind, siToken, filepath, proxyID string,
) envoyBootstrapArgs {

namespace := h.getConsulNamespace()
proxyID := h.proxyServiceID(group, service)

var gateway string
switch {
Expand Down Expand Up @@ -608,3 +634,61 @@ func (h *envoyBootstrapHook) maybeLoadSIToken(task, dir string) (string, error)
h.logger.Trace("recovered pre-existing SI token", "task", task)
return string(token), nil
}

// servicePreflightCheck repeatedly asks the service client for this
// allocation's registrations, using the supplied backoff options, until one of
// the allocation's tasks lists a service whose ID equals proxyServiceID.
//
// It returns nil once the service is found, and also when ctx is done (the
// hook was killed). If the backoff itself gives up, it returns a recoverable
// error wrapping errEnvoyBootstrapError together with the last failure seen.
func (h *envoyBootstrapHook) servicePreflightCheck(
	ctx context.Context, backoffOpts decay.BackoffOptions, proxyServiceID string) error {

	// lastErr remembers the most recent lookup failure (an API error or a
	// "missing" error) so it can be reported if the backoff is exhausted.
	var lastErr error

	// hasProxy reports whether any task in regs carries the proxy service ID.
	hasProxy := func(regs *serviceregistration.AllocRegistration) bool {
		if regs == nil {
			return false
		}
		for _, taskRegs := range regs.Tasks {
			if _, found := taskRegs.Services[proxyServiceID]; found {
				return true
			}
		}
		return false
	}

	// attempt is a single try; its boolean result is true when the lookup
	// should be tried again and false when polling should stop.
	attempt := func() (bool, error) {
		// A killed hook stops polling without reporting an error.
		if ctx.Err() != nil {
			return false, nil
		}

		regs, err := h.consulServices.AllocRegistrations(h.alloc.ID)
		lastErr = err
		if err != nil {
			return true, err
		}

		if hasProxy(regs) {
			return false, nil
		}

		lastErr = fmt.Errorf("missing %q", proxyServiceID)
		return true, lastErr
	}

	if err := decay.Backoff(attempt, backoffOpts); err == nil {
		return nil
	}

	// The backoff gave up: surface the last failure we recorded, marked as
	// recoverable.
	wrapped := fmt.Errorf("%w: %v; see: <https://developer.hashicorp.com/nomad/s/envoy-bootstrap-error>",
		errEnvoyBootstrapError,
		lastErr,
	)
	return structs.NewRecoverableError(wrapped, true)
}

// durationFromMeta looks up key in the node's metadata and parses the value as
// a time.Duration (e.g. "30s", "1m").
//
// It returns defaultDur when node is nil, when the key is unset or its value
// is empty, when the value does not parse as a duration, or when the parsed
// duration is zero or negative.
func durationFromMeta(node *structs.Node, key string, defaultDur time.Duration) time.Duration {
	if node == nil {
		return defaultDur
	}

	// Check the looked-up value, not the key: an unset metadata entry yields
	// the empty string and should fall back to the default without being
	// handed to the parser.
	val := node.Meta[key]
	if val == "" {
		return defaultDur
	}

	d, err := time.ParseDuration(val)
	// A zero or negative duration is not a usable timeout or backoff gap, so
	// treat it the same as a missing value.
	if err != nil || d <= 0 {
		return defaultDur
	}
	return d
}
Loading

0 comments on commit df67e74

Please sign in to comment.