Skip to content

Commit

Permalink
consul: add preflight check for created ACL tokens
Browse files Browse the repository at this point in the history
Nomad creates a Consul ACL token for each service for registering it in Consul
or bootstrapping the Envoy proxy (for service mesh workloads). Nomad always
talks to the local Consul agent and never directly to the Consul servers. But
the local Consul agent talks to the Consul servers in stale consistency mode to
reduce load on the servers. This can result in the Nomad client making the Envoy
bootstrap request with a token that has not yet replicated to the follower that
the local client is connected to. This request gets a 404 on the ACL token and
that negative entry gets cached, preventing any retries from succeeding.

To workaround this, we'll use a method described by our friends over on
`consul-k8s` where after creating the service token we try to read the token
from the local agent in stale consistency mode (which prevents a failed read
from being cached). This cannot completely eliminate this source of error
because it's possible that Consul cluster replication is unhealthy at the time
we need it, but this should make Envoy bootstrap significantly more robust.

In this changeset, we add the preflight check after we login via Workload
Identity and in the function we use to derive tokens in the legacy
workflow. We've added the timeouts to be configurable via node metadata rather
than the usual static configuration because for most cases, users should not
need to touch or even know these values are configurable; the configuration is
mostly available for testing.

Fixes: #9307
Fixes: #20516
Fixes: #10451

Ref: hashicorp/consul-k8s#887
Ref: https://hashicorp.atlassian.net/browse/NET-10051
  • Loading branch information
tgross committed Jun 26, 2024
1 parent 6df8537 commit c2cf0e3
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 33 deletions.
20 changes: 18 additions & 2 deletions client/allocrunner/consul_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package allocrunner

import (
"context"
"fmt"

consulapi "github.com/hashicorp/consul/api"
Expand All @@ -27,7 +28,9 @@ type consulHook struct {
hookResources *cstructs.AllocHookResources
envBuilder *taskenv.Builder

logger log.Logger
logger log.Logger
shutdownCtx context.Context
shutdownCancelFn context.CancelFunc
}

type consulHookConfig struct {
Expand All @@ -51,6 +54,7 @@ type consulHookConfig struct {
}

func newConsulHook(cfg consulHookConfig) *consulHook {
shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())
h := &consulHook{
alloc: cfg.alloc,
allocdir: cfg.allocdir,
Expand All @@ -59,6 +63,8 @@ func newConsulHook(cfg consulHookConfig) *consulHook {
consulClientConstructor: cfg.consulClientConstructor,
hookResources: cfg.hookResources,
envBuilder: cfg.envBuilder(),
shutdownCtx: shutdownCtx,
shutdownCancelFn: shutdownCancelFn,
}
h.logger = cfg.logger.Named(h.Name())
return h
Expand Down Expand Up @@ -225,7 +231,12 @@ func (h *consulHook) getConsulToken(cluster string, req consul.JWTLoginRequest)
return nil, fmt.Errorf("failed to retrieve Consul client for cluster %s: %v", cluster, err)
}

return client.DeriveTokenWithJWT(req)
t, err := client.DeriveTokenWithJWT(req)
if err == nil {
err = client.TokenPreflightCheck(h.shutdownCtx, t)
}

return t, err
}

func (h *consulHook) clientForCluster(cluster string) (consul.Client, error) {
Expand All @@ -248,6 +259,11 @@ func (h *consulHook) Postrun() error {
return nil
}

// Shutdown will get called when the client is gracefully stopping.
func (h *consulHook) Shutdown() {
h.shutdownCancelFn()
}

// Destroy cleans up any remaining Consul tokens if the alloc is GC'd or fails
// to restore after a client restart.
func (h *consulHook) Destroy() error {
Expand Down
5 changes: 3 additions & 2 deletions client/allocrunner/taskrunner/sids_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func (h *sidsHook) Prestart(
}
}

// need to ask for a new SI token & persist it to disk
// COMPAT(1.9): this code path exists only to support the legacy (non-WI)
// workflow. remove for 1.9.0.
if token == "" {
if token, err = h.deriveSIToken(ctx); err != nil {
return err
Expand Down Expand Up @@ -255,7 +256,7 @@ func (h *sidsHook) kill(ctx context.Context, reason error) {
func (h *sidsHook) tryDerive(ctx context.Context, ch chan<- siDerivationResult) {
for attempt := 0; backoff(ctx, attempt); attempt++ {

tokens, err := h.sidsClient.DeriveSITokens(h.alloc, []string{h.task.Name})
tokens, err := h.sidsClient.DeriveSITokens(ctx, h.alloc, []string{h.task.Name})

switch {
case err == nil:
Expand Down
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/sids_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestSIDSHook_deriveSIToken_timeout(t *testing.T) {
r := require.New(t)

siClient := consulclient.NewMockServiceIdentitiesClient()
siClient.DeriveTokenFn = func(allocation *structs.Allocation, strings []string) (m map[string]string, err error) {
siClient.DeriveTokenFn = func(context.Context, *structs.Allocation, []string) (m map[string]string, err error) {
select {
// block forever, hopefully triggering a timeout in the caller
}
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) {
trConfig.ClientConfig.GetDefaultConsul().Token = uuid.Generate()

// derive token works just fine
deriveFn := func(*structs.Allocation, []string) (map[string]string, error) {
deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) {
return map[string]string{task.Name: uuid.Generate()}, nil
}
siClient := trConfig.ConsulSI.(*consulclient.MockServiceIdentitiesClient)
Expand Down
4 changes: 2 additions & 2 deletions client/allocrunner/taskrunner/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) {
// control when we get a Consul SI token
token := uuid.Generate()
waitCh := make(chan struct{})
deriveFn := func(*structs.Allocation, []string) (map[string]string, error) {
deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) {
<-waitCh
return map[string]string{task.Name: token}, nil
}
Expand Down Expand Up @@ -1530,7 +1530,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) {
// control when we get a Consul SI token (recoverable failure on first call)
token := uuid.Generate()
deriveCount := 0
deriveFn := func(*structs.Allocation, []string) (map[string]string, error) {
deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) {
if deriveCount > 0 {

return map[string]string{task.Name: token}, nil
Expand Down
42 changes: 36 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package client

import (
"context"
"errors"
"fmt"
"maps"
Expand All @@ -28,7 +29,7 @@ import (
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/getter"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
consulApiShim "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/fingerprint"
Expand Down Expand Up @@ -232,7 +233,7 @@ type Client struct {

// consulProxiesFunc gets an interface to Nomad's custom Consul client for
// looking up supported envoy versions
consulProxiesFunc consulApi.SupportedProxiesAPIFunc
consulProxiesFunc consulApiShim.SupportedProxiesAPIFunc

// consulCatalog is the subset of Consul's Catalog API Nomad uses for self
// service discovery
Expand All @@ -256,7 +257,7 @@ type Client struct {

// tokensClient is Nomad Client's custom Consul client for requesting Consul
// Service Identity tokens through Nomad Server.
tokensClient consulApi.ServiceIdentityAPI
tokensClient consulApiShim.ServiceIdentityAPI

// vaultClients is used to interact with Vault for token and secret renewals
vaultClients map[string]vaultclient.VaultClient
Expand Down Expand Up @@ -348,7 +349,7 @@ var (
// registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place
// of the client's normal RPC handlers. This allows server tests to override
// the behavior of the client.
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxiesFunc consulApi.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) {
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxiesFunc consulApiShim.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {
Expand Down Expand Up @@ -2813,7 +2814,7 @@ func (c *Client) newAllocRunnerConfig(
// identity tokens.
// DEPRECATED: remove in 1.9.0
func (c *Client) setupConsulTokenClient() error {
tc := consulApi.NewIdentitiesClient(c.logger, c.deriveSIToken)
tc := consulApiShim.NewIdentitiesClient(c.logger, c.deriveSIToken)
c.tokensClient = tc
return nil
}
Expand Down Expand Up @@ -2960,7 +2961,7 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli
// deriveSIToken takes an allocation and a set of tasks and derives Consul
// Service Identity tokens for each of the tasks by requesting them from the
// Nomad Server.
func (c *Client) deriveSIToken(alloc *structs.Allocation, taskNames []string) (map[string]string, error) {
func (c *Client) deriveSIToken(ctx context.Context, alloc *structs.Allocation, taskNames []string) (map[string]string, error) {
tasks, err := verifiedTasks(c.logger, alloc, taskNames)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3001,7 +3002,36 @@ func (c *Client) deriveSIToken(alloc *structs.Allocation, taskNames []string) (m
// https://www.consul.io/api/acl/tokens.html#read-a-token
// https://www.consul.io/docs/internals/security.html

consulConfigs := c.config.GetConsulConfigs(c.logger)
consulClientConstructor := consulApiShim.NewConsulClientFactory(c.Node())

tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
tgNs := tg.Consul.GetNamespace()

for task, secretID := range resp.Tokens {
t := tg.LookupTask(task)
ns := t.Consul.GetNamespace()
if ns == "" {
ns = tgNs
}
cluster := tg.LookupTask(task).GetConsulClusterName(tg)
consulConfig := consulConfigs[cluster]
consulClient, err := consulClientConstructor(consulConfig, c.logger)
if err != nil {
return nil, err
}

err = consulClient.TokenPreflightCheck(ctx, &consulapi.ACLToken{
Namespace: ns,
SecretID: secretID,
})
if err != nil {
return nil, err
}
}

m := maps.Clone(resp.Tokens)

return m, nil
}

Expand Down
86 changes: 79 additions & 7 deletions client/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@
package consul

import (
"context"
"fmt"
"time"

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/useragent"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
)

// TokenDeriverFunc takes an allocation and a set of tasks and derives a
// service identity token for each. Requests go through nomad server.
type TokenDeriverFunc func(*structs.Allocation, []string) (map[string]string, error)
// TokenDeriverFunc takes an allocation and a set of tasks and derives a service
// identity token for each. Requests go through nomad server and the local
// Consul agent.
type TokenDeriverFunc func(context.Context, *structs.Allocation, []string) (map[string]string, error)

// ServiceIdentityAPI is the interface the Nomad Client uses to request Consul
// Service Identity tokens through Nomad Server.
Expand All @@ -27,7 +31,7 @@ type TokenDeriverFunc func(*structs.Allocation, []string) (map[string]string, er
type ServiceIdentityAPI interface {
// DeriveSITokens contacts the nomad server and requests consul service
// identity tokens be generated for tasks in the allocation.
DeriveSITokens(alloc *structs.Allocation, tasks []string) (map[string]string, error)
DeriveSITokens(ctx context.Context, alloc *structs.Allocation, tasks []string) (map[string]string, error)
}

// SupportedProxiesAPI is the interface the Nomad Client uses to request from
Expand Down Expand Up @@ -57,6 +61,10 @@ type Client interface {
DeriveTokenWithJWT(JWTLoginRequest) (*consulapi.ACLToken, error)

RevokeTokens([]*consulapi.ACLToken) error

// TokenPreflightCheck verifies that a token has been replicated before we
// try to use it for registering services or bootstrapping Envoy
TokenPreflightCheck(context.Context, *consulapi.ACLToken) error
}

type consulClient struct {
Expand All @@ -70,6 +78,12 @@ type consulClient struct {
config *config.ConsulConfig

logger hclog.Logger

// preflightCheckTimeout/BaseInterval control how long the client will wait
// for Consul ACLs tokens to be fully replicated before giving up on the
// allocation; these are configurable via node metadata
preflightCheckTimeout time.Duration
preflightCheckBaseInterval time.Duration
}

// ConsulClientFunc creates a new Consul client for the specific Consul config
Expand All @@ -78,7 +92,14 @@ type ConsulClientFunc func(config *config.ConsulConfig, logger hclog.Logger) (Cl
// NewConsulClientFactory returns a ConsulClientFunc that closes over the
// partition
func NewConsulClientFactory(node *structs.Node) ConsulClientFunc {

// these node values will be evaluated at the time we create the hooks, so
// we don't need to worry about them changing out from under us
partition := node.Attributes["consul.partition"]
preflightCheckTimeout := durationFromMeta(
node, "consul.token_preflight_check.timeout", time.Second*10)
preflightCheckBaseInterval := durationFromMeta(
node, "consul.token_preflight_check.base", time.Millisecond*500)

return func(config *config.ConsulConfig, logger hclog.Logger) (Client, error) {
if config == nil {
Expand All @@ -88,9 +109,11 @@ func NewConsulClientFactory(node *structs.Node) ConsulClientFunc {
logger = logger.Named("consul").With("name", config.Name)

c := &consulClient{
config: config,
logger: logger,
partition: partition,
config: config,
logger: logger,
partition: partition,
preflightCheckTimeout: preflightCheckTimeout,
preflightCheckBaseInterval: preflightCheckBaseInterval,
}

// Get the Consul API configuration
Expand All @@ -115,6 +138,18 @@ func NewConsulClientFactory(node *structs.Node) ConsulClientFunc {
}
}

func durationFromMeta(node *structs.Node, key string, defaultDur time.Duration) time.Duration {
val := node.Meta[key]
if key == "" {
return defaultDur
}
d, err := time.ParseDuration(val)
if err != nil || d == 0 {
return defaultDur
}
return d
}

// DeriveTokenWithJWT takes a JWT from request and returns a consul token.
func (c *consulClient) DeriveTokenWithJWT(req JWTLoginRequest) (*consulapi.ACLToken, error) {
t, _, err := c.client.ACL().Login(&consulapi.ACLLoginParams{
Expand All @@ -141,3 +176,40 @@ func (c *consulClient) RevokeTokens(tokens []*consulapi.ACLToken) error {

return mErr.ErrorOrNil()
}

// TokenPreflightCheck verifies that a token has been replicated before we
// try to use it for registering services or bootstrapping Envoy
func (c *consulClient) TokenPreflightCheck(pctx context.Context, t *consulapi.ACLToken) error {
timer, timerStop := helper.NewStoppedTimer()
defer timerStop()

var retry uint64
var err error
ctx, cancel := context.WithTimeout(pctx, c.preflightCheckTimeout)
defer cancel()

for {
_, _, err = c.client.ACL().TokenReadSelf(&consulapi.QueryOptions{
Namespace: t.Namespace,
Partition: c.partition,
AllowStale: true,
Token: t.SecretID,
})
if err == nil {
return nil
}

retry++
backoff := helper.Backoff(
c.preflightCheckBaseInterval, c.preflightCheckBaseInterval*2, retry)
c.logger.Trace("waiting for Consul stale query on token",
"error", err, "backoff", backoff)
timer.Reset(backoff)
select {
case <-ctx.Done():
return err
case <-timer.C:
continue
}
}
}
Loading

0 comments on commit c2cf0e3

Please sign in to comment.