From df67e74615476dc95cf759a520d644a9daeb9cb5 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Thu, 27 Jun 2024 10:15:37 -0400
Subject: [PATCH] Consul: add preflight checks for Envoy bootstrap (#23381)

Nomad creates Consul ACL tokens and service registrations to support Consul service mesh workloads, before bootstrapping the Envoy proxy. Nomad always talks to the local Consul agent and never directly to the Consul servers. But the local Consul agent talks to the Consul servers in stale consistency mode to reduce load on the servers. This can result in the Nomad client making the Envoy bootstrap request with tokens or services that have not yet replicated to the follower that the local agent is connected to. This request gets a 404 on the ACL token and that negative entry gets cached, preventing any retries from succeeding.

To work around this, we'll use a method described by our friends over on `consul-k8s`: after creating the objects in Consul, we try to read them back from the local agent in stale consistency mode (which prevents a failed read from being cached). This cannot completely eliminate this source of error, because it's possible that Consul cluster replication is unhealthy at the time we need it, but it should make Envoy bootstrap significantly more robust.

This changeset adds preflight checks for the objects we create in Consul:

* We add a preflight check for ACL tokens after we log in via Workload Identity, and in the function we use to derive tokens in the legacy workflow. We do this check early because we also want to use this token for registering group services in the allocrunner hooks.

* We add a preflight check for services right before we bootstrap Envoy in the taskrunner hook, so that we have time for our service client to batch updates to the local Consul agent, in addition to the local agent sync.

We've made the timeouts configurable via node metadata rather than the usual static configuration because in most cases users should not need to touch, or even know about, these values; the configuration exists mostly for testing.
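In pseudocode, the token preflight check amounts to polling the local agent with a stale read until the new token is visible or a deadline passes. A minimal sketch of the pattern, assuming the `github.com/hashicorp/consul/api` client; the hardcoded timeout and base interval stand in for the node-metadata-derived values described above:

```go
package preflight

import (
	"context"
	"time"

	consulapi "github.com/hashicorp/consul/api"
)

// tokenPreflightCheck polls the local Consul agent until the freshly created
// ACL token is readable, or until the timeout expires. This is a sketch of
// the approach, not the exact code in this changeset.
func tokenPreflightCheck(pctx context.Context, client *consulapi.Client, token *consulapi.ACLToken) error {
	ctx, cancel := context.WithTimeout(pctx, 10*time.Second) // consul.token_preflight_check.timeout
	defer cancel()

	interval := 500 * time.Millisecond // consul.token_preflight_check.base
	var err error
	for {
		// Read the token back as itself. AllowStale lets the follower the
		// local agent is connected to answer directly, and a failed stale
		// read is not added to the agent's negative cache.
		_, _, err = client.ACL().TokenReadSelf(&consulapi.QueryOptions{
			Namespace:  token.Namespace,
			AllowStale: true,
			Token:      token.SecretID,
		})
		if err == nil {
			return nil // token has replicated
		}
		select {
		case <-ctx.Done():
			return err // give up; the caller treats this as recoverable
		case <-time.After(interval):
			interval *= 2 // exponential backoff between attempts
		}
	}
}
```

The service preflight check in the taskrunner hook follows the same backoff pattern, polling the service client's `AllocRegistrations` until the sidecar proxy's service ID appears.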
Fixes: https://github.com/hashicorp/nomad/issues/9307 Fixes: https://github.com/hashicorp/nomad/issues/10451 Fixes: https://github.com/hashicorp/nomad/issues/20516 Ref: https://github.com/hashicorp/consul-k8s/pull/887 Ref: https://hashicorp.atlassian.net/browse/NET-10051 Ref: https://hashicorp.atlassian.net/browse/NET-9273 Follow-up: https://hashicorp.atlassian.net/browse/NET-10138 --- .changelog/23381.txt | 3 + client/allocrunner/alloc_runner_hooks.go | 2 +- client/allocrunner/consul_hook.go | 20 +- .../taskrunner/envoy_bootstrap_hook.go | 122 ++++++++-- .../taskrunner/envoy_bootstrap_hook_test.go | 210 ++++++++++++------ client/allocrunner/taskrunner/sids_hook.go | 5 +- .../allocrunner/taskrunner/sids_hook_test.go | 4 +- .../taskrunner/task_runner_hooks.go | 11 +- .../taskrunner/task_runner_test.go | 4 +- client/client.go | 42 +++- client/config/config.go | 4 + client/consul/consul.go | 99 ++++++++- client/consul/consul_test.go | 131 +++++++++++ client/consul/consul_testing.go | 5 + client/consul/identities.go | 6 +- client/consul/identities_test.go | 17 +- client/consul/identities_testing.go | 5 +- .../docs/integrations/consul/service-mesh.mdx | 18 +- 18 files changed, 580 insertions(+), 128 deletions(-) create mode 100644 .changelog/23381.txt create mode 100644 client/consul/consul_test.go diff --git a/.changelog/23381.txt b/.changelog/23381.txt new file mode 100644 index 00000000000..4aa458af39b --- /dev/null +++ b/.changelog/23381.txt @@ -0,0 +1,3 @@ +```release-note:bug +consul: Fixed a bug where service registration and Envoy bootstrap would not wait for Consul ACL tokens and services to be replicated to the local agent +``` diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index ae57c4c7982..21e7aef053f 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -130,7 +130,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { allocdir: ar.allocDir, widmgr: ar.widmgr, consulConfigs: ar.clientConfig.GetConsulConfigs(hookLogger), - consulClientConstructor: consul.NewConsulClientFactory(config.Node), + consulClientConstructor: consul.NewConsulClientFactory(config), hookResources: ar.hookResources, envBuilder: newEnvBuilder, logger: hookLogger, diff --git a/client/allocrunner/consul_hook.go b/client/allocrunner/consul_hook.go index 54a64c8f2d2..b4e087e06f8 100644 --- a/client/allocrunner/consul_hook.go +++ b/client/allocrunner/consul_hook.go @@ -4,6 +4,7 @@ package allocrunner import ( + "context" "fmt" consulapi "github.com/hashicorp/consul/api" @@ -27,7 +28,9 @@ type consulHook struct { hookResources *cstructs.AllocHookResources envBuilder *taskenv.Builder - logger log.Logger + logger log.Logger + shutdownCtx context.Context + shutdownCancelFn context.CancelFunc } type consulHookConfig struct { @@ -51,6 +54,7 @@ type consulHookConfig struct { } func newConsulHook(cfg consulHookConfig) *consulHook { + shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background()) h := &consulHook{ alloc: cfg.alloc, allocdir: cfg.allocdir, @@ -59,6 +63,8 @@ func newConsulHook(cfg consulHookConfig) *consulHook { consulClientConstructor: cfg.consulClientConstructor, hookResources: cfg.hookResources, envBuilder: cfg.envBuilder(), + shutdownCtx: shutdownCtx, + shutdownCancelFn: shutdownCancelFn, } h.logger = cfg.logger.Named(h.Name()) return h @@ -225,7 +231,12 @@ func (h *consulHook) getConsulToken(cluster string, req consul.JWTLoginRequest) return nil, fmt.Errorf("failed to 
retrieve Consul client for cluster %s: %v", cluster, err) } - return client.DeriveTokenWithJWT(req) + t, err := client.DeriveTokenWithJWT(req) + if err == nil { + err = client.TokenPreflightCheck(h.shutdownCtx, t) + } + + return t, err } func (h *consulHook) clientForCluster(cluster string) (consul.Client, error) { @@ -248,6 +259,11 @@ func (h *consulHook) Postrun() error { return nil } +// Shutdown will get called when the client is gracefully stopping. +func (h *consulHook) Shutdown() { + h.shutdownCancelFn() +} + // Destroy cleans up any remaining Consul tokens if the alloc is GC'd or fails // to restore after a client restart. func (h *consulHook) Destroy() error { diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go index 483157af203..5797f9a22eb 100644 --- a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go +++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" + "oss.indeed.com/go/libtime" "oss.indeed.com/go/libtime/decay" ) @@ -38,7 +39,7 @@ const ( // envoyBootstrapInitialGap is the initial amount of time the envoy bootstrap // retry loop will wait, exponentially increasing each iteration, not including // jitter. - envoyBoostrapInitialGap = 1 * time.Second + envoyBootstrapInitialGap = 1 * time.Second // envoyBootstrapMaxJitter is the maximum amount of jitter applied to the // wait gap each iteration of the envoy bootstrap retry loop. @@ -76,10 +77,16 @@ func newConsulTransportConfig(cc *config.ConsulConfig) consulTransportConfig { } } +type allocServicesClient interface { + AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) +} + type envoyBootstrapHookConfig struct { alloc *structs.Allocation consul consulTransportConfig consulNamespace string + consulServices allocServicesClient + node *structs.Node logger hclog.Logger } @@ -94,11 +101,13 @@ func decodeTriState(b *bool) string { } } -func newEnvoyBootstrapHookConfig(alloc *structs.Allocation, consul *config.ConsulConfig, consulNamespace string, logger hclog.Logger) *envoyBootstrapHookConfig { +func newEnvoyBootstrapHookConfig(alloc *structs.Allocation, consul *config.ConsulConfig, consulNamespace string, consulServices allocServicesClient, node *structs.Node, logger hclog.Logger) *envoyBootstrapHookConfig { return &envoyBootstrapHookConfig{ alloc: alloc, consul: newConsulTransportConfig(consul), consulNamespace: consulNamespace, + consulServices: consulServices, + node: node, logger: logger, } } @@ -134,28 +143,38 @@ type envoyBootstrapHook struct { envoyBootstrapWaitTime time.Duration // envoyBootstrapInitialGap is the initial wait gap when retrying - envoyBoostrapInitialGap time.Duration + envoyBootstrapInitialGap time.Duration // envoyBootstrapMaxJitter is the maximum amount of jitter applied to retries envoyBootstrapMaxJitter time.Duration // envoyBootstrapExpSleep controls exponential waiting - envoyBootstrapExpSleep func(time.Duration) + envoyBootstrapExpSleep libtime.Sleeper + + // consulServices queries the Consul service catalog for preflight checks + consulServices allocServicesClient // logger is used to log things logger hclog.Logger } func newEnvoyBootstrapHook(c *envoyBootstrapHookConfig) *envoyBootstrapHook { + + waitTime := durationFromMeta(c.node, + "consul.service_preflight_check.timeout", envoyBootstrapWaitTime) + initialGap := 
durationFromMeta(c.node, + "consul.service_preflight_check.base", envoyBootstrapInitialGap) + return &envoyBootstrapHook{ - alloc: c.alloc, - consulConfig: c.consul, - consulNamespace: c.consulNamespace, - envoyBootstrapWaitTime: envoyBootstrapWaitTime, - envoyBoostrapInitialGap: envoyBoostrapInitialGap, - envoyBootstrapMaxJitter: envoyBootstrapMaxJitter, - envoyBootstrapExpSleep: time.Sleep, - logger: c.logger.Named(envoyBootstrapHookName), + alloc: c.alloc, + consulConfig: c.consul, + consulNamespace: c.consulNamespace, + envoyBootstrapWaitTime: waitTime, + envoyBootstrapInitialGap: initialGap, + envoyBootstrapMaxJitter: envoyBootstrapMaxJitter, + envoyBootstrapExpSleep: libtime.NewSleeper(), + consulServices: c.consulServices, + logger: c.logger.Named(envoyBootstrapHookName), } } @@ -281,7 +300,8 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *ifs.TaskPrestart } h.logger.Debug("check for SI token for task", "task", req.Task.Name, "exists", siToken != "") - bootstrap := h.newEnvoyBootstrapArgs(h.alloc.TaskGroup, service, grpcAddr, envoyAdminBind, envoyReadyBind, siToken, bootstrapFilePath) + proxyID := h.proxyServiceID(h.alloc.TaskGroup, service) + bootstrap := h.newEnvoyBootstrapArgs(service, grpcAddr, envoyAdminBind, envoyReadyBind, siToken, bootstrapFilePath, proxyID) // Create command line arguments bootstrapArgs := bootstrap.args() @@ -316,13 +336,20 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *ifs.TaskPrestart // keep track of latest error returned from exec-ing consul envoy bootstrap var cmdErr error - // Since Consul services are registered asynchronously with this task - // hook running, retry until timeout or success. backoffOpts := decay.BackoffOptions{ MaxSleepTime: h.envoyBootstrapWaitTime, - InitialGapSize: h.envoyBoostrapInitialGap, + InitialGapSize: h.envoyBootstrapInitialGap, MaxJitterSize: h.envoyBootstrapMaxJitter, + Sleeper: h.envoyBootstrapExpSleep, + } + + err = h.servicePreflightCheck(ctx, backoffOpts, proxyID) + if err != nil { + return err } + + // Since Consul services are registered asynchronously with this task + // hook running, retry until timeout or success. backoffErr := decay.Backoff(func() (bool, error) { // If hook is killed, just stop. select { @@ -486,12 +513,11 @@ func (h *envoyBootstrapHook) proxyServiceID(group string, service *structs.Servi // // https://www.consul.io/commands/connect/envoy#consul-connect-envoy func (h *envoyBootstrapHook) newEnvoyBootstrapArgs( - group string, service *structs.Service, - grpcAddr, envoyAdminBind, envoyReadyBind, siToken, filepath string, + service *structs.Service, + grpcAddr, envoyAdminBind, envoyReadyBind, siToken, filepath, proxyID string, ) envoyBootstrapArgs { namespace := h.getConsulNamespace() - proxyID := h.proxyServiceID(group, service) var gateway string switch { @@ -608,3 +634,61 @@ func (h *envoyBootstrapHook) maybeLoadSIToken(task, dir string) (string, error) h.logger.Trace("recovered pre-existing SI token", "task", task) return string(token), nil } + +func (h *envoyBootstrapHook) servicePreflightCheck( + ctx context.Context, backoffOpts decay.BackoffOptions, proxyServiceID string) error { + + // keep track of latest error returned from Consul or from missing service + var apiErr error + var allocServices *serviceregistration.AllocRegistration + + backoffErr := decay.Backoff(func() (bool, error) { + // If hook is killed, just stop. 
+		select {
+		case <-ctx.Done():
+			return false, nil
+		default:
+		}
+
+		allocServices, apiErr = h.consulServices.AllocRegistrations(h.alloc.ID)
+		if apiErr != nil {
+			return true, apiErr
+		}
+
+		if allocServices != nil {
+			for _, taskServices := range allocServices.Tasks {
+				for id := range taskServices.Services {
+					if id == proxyServiceID {
+						return false, nil
+					}
+				}
+			}
+		}
+		apiErr = fmt.Errorf("missing %q", proxyServiceID)
+		return true, apiErr
+	}, backoffOpts)
+
+	// Wrap the last error we saw and set it as our status.
+	if backoffErr != nil {
+		return structs.NewRecoverableError(
+			fmt.Errorf("%w: %v; see: ",
+				errEnvoyBootstrapError,
+				apiErr,
+			),
+			true)
+	}
+
+	return nil
+}
+
+func durationFromMeta(node *structs.Node, key string, defaultDur time.Duration) time.Duration {
+	val := node.Meta[key]
+	if val == "" {
+		return defaultDur
+	}
+	d, err := time.ParseDuration(val)
+	if err != nil || d == 0 {
+		return defaultDur
+	}
+	return d
+}
diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go
index 0e6b32d2915..e5095e3d1aa 100644
--- a/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go
+++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go
@@ -15,6 +15,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
@@ -22,6 +23,7 @@ import (
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/client/allocdir"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/client/testutil"
 	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
@@ -33,6 +35,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
 	"github.com/hashicorp/nomad/plugins/drivers/fsisolation"
+	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sys/unix"
 )
@@ -66,7 +69,8 @@ func TestEnvoyBootstrapHook_maybeLoadSIToken(t *testing.T) {
 	}
 	t.Run("file does not exist", func(t *testing.T) {
-		h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)})
+		h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
+			logger: testlog.HCLogger(t), node: mock.Node()})
 		cfg, err := h.maybeLoadSIToken("task1", "/does/not/exist")
 		require.NoError(t, err) // absence of token is not an error
 		require.Equal(t, "", cfg)
@@ -76,7 +80,8 @@
 		token := uuid.Generate()
 		f := writeTmp(t, token, 0440)
-		h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)})
+		h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
+			logger: testlog.HCLogger(t), node: mock.Node()})
 		cfg, err := h.maybeLoadSIToken("task1", f)
 		require.NoError(t, err)
 		require.Equal(t, token, cfg)
@@ -86,7 +91,8 @@
 		token := uuid.Generate()
 		f := writeTmp(t, token, 0200)
-		h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)})
+		h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{
+			logger: testlog.HCLogger(t), node: mock.Node()})
 		cfg, err := h.maybeLoadSIToken("task1", f)
 		require.Error(t, err)
 		require.False(t, os.IsNotExist(err))
@@ -342,15 +348,15 @@ func TestEnvoyBootstrapHook_with_SI_token(t *testing.T) {
 	require.NoError(t, err)
 	namespacesClient := agentconsul.NewNamespacesClient(consulAPIClient.Namespaces(), consulAPIClient.Agent())
-	consulClient =
agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) - go consulClient.Run() - defer consulClient.Shutdown() - require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) + serviceClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) + go serviceClient.Run() + defer serviceClient.Shutdown() + must.NoError(t, serviceClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) // Run Connect bootstrap Hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) + }, consulNamespace, serviceClient, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: sidecarTask, TaskDir: allocDir.NewTaskDir(sidecarTask.Name), @@ -388,10 +394,10 @@ func TestEnvoyBootstrapHook_with_SI_token(t *testing.T) { require.Equal(t, token, value) } -// TestTaskRunner_EnvoyBootstrapHook_sidecar_ok asserts the EnvoyBootstrapHook +// TestEnvoyBootstrapHook_sidecar_ok asserts the EnvoyBootstrapHook // creates Envoy's bootstrap.json configuration based on Connect proxy sidecars // registered for the task. -func TestTaskRunner_EnvoyBootstrapHook_sidecar_ok(t *testing.T) { +func TestEnvoyBootstrapHook_sidecar_ok(t *testing.T) { ci.Parallel(t) testutil.RequireConsul(t) @@ -440,15 +446,15 @@ func TestTaskRunner_EnvoyBootstrapHook_sidecar_ok(t *testing.T) { require.NoError(t, err) namespacesClient := agentconsul.NewNamespacesClient(consulAPIClient.Namespaces(), consulAPIClient.Agent()) - consulClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) - go consulClient.Run() - defer consulClient.Shutdown() - require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) + serviceClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) + go serviceClient.Run() + defer serviceClient.Shutdown() + require.NoError(t, serviceClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) // Run Connect bootstrap Hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) + }, consulNamespace, serviceClient, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: sidecarTask, TaskDir: allocDir.NewTaskDir(sidecarTask.Name), @@ -483,7 +489,7 @@ func TestTaskRunner_EnvoyBootstrapHook_sidecar_ok(t *testing.T) { require.Equal(t, "", value) } -func TestTaskRunner_EnvoyBootstrapHook_gateway_ok(t *testing.T) { +func TestEnvoyBootstrapHook_gateway_ok(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) @@ -527,7 +533,7 @@ func TestTaskRunner_EnvoyBootstrapHook_gateway_ok(t *testing.T) { // Run Connect bootstrap hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) + }, consulNamespace, serviceClient, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: alloc.Job.TaskGroups[0].Tasks[0], @@ -563,9 +569,9 @@ func TestTaskRunner_EnvoyBootstrapHook_gateway_ok(t *testing.T) { require.Equal(t, "ingress-gateway", out.Node.Cluster) } -// TestTaskRunner_EnvoyBootstrapHook_Noop asserts that the Envoy bootstrap hook +// TestEnvoyBootstrapHook_Noop asserts that the Envoy 
bootstrap hook // is a noop for non-Connect proxy sidecar / gateway tasks. -func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) { +func TestEnvoyBootstrapHook_Noop(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) @@ -578,7 +584,7 @@ func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) { // not get hit. h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: "http://127.0.0.2:1", - }, consulNamespace, logger)) + }, consulNamespace, nil, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: task, TaskDir: allocDir.NewTaskDir(task.Name), @@ -596,10 +602,10 @@ func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) { require.True(t, os.IsNotExist(err)) } -// TestTaskRunner_EnvoyBootstrapHook_RecoverableError asserts the Envoy -// bootstrap hook returns a Recoverable error if the bootstrap command runs but -// fails. -func TestTaskRunner_EnvoyBootstrapHook_RecoverableError(t *testing.T) { +// TestEnvoyBootstrapHook_CommandFailed asserts the Envoy bootstrap +// hook returns a Recoverable error if the bootstrap command runs but fails, and +// that we retry the appropriate number of times +func TestEnvoyBootstrapHook_CommandFailed(t *testing.T) { ci.Parallel(t) testutil.RequireConsul(t) @@ -641,40 +647,63 @@ func TestTaskRunner_EnvoyBootstrapHook_RecoverableError(t *testing.T) { allocDir, cleanup := allocdir.TestAllocDir(t, logger, "EnvoyBootstrap", alloc.ID) defer cleanup() + begin := time.Now() + // Unlike the successful test above, do NOT register the group services - // yet. This should cause a recoverable error similar to if Consul was - // not running. + // yet. This should cause a recoverable error similar to if Consul was not + // running. We're adding a mock services client here so that the preflight + // check passes, so that we can exercise the retry logic specific to the + // bootstrap command. 
// Run Connect bootstrap Hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: testConsul.HTTPAddr, - }, consulNamespace, logger)) + }, consulNamespace, newMockAllocServicesClient(tg.Services[0], nil), mock.Node(), logger)) - // Lower the allowable wait time for testing + // Lower the allowable wait time for testing and keep track of retry backoff + // iterations h.envoyBootstrapWaitTime = 1 * time.Second - h.envoyBoostrapInitialGap = 100 * time.Millisecond + h.envoyBootstrapInitialGap = 100 * time.Millisecond + sleeper := &mockSleeper{} + h.envoyBootstrapExpSleep = sleeper req := &interfaces.TaskPrestartRequest{ Task: sidecarTask, TaskDir: allocDir.NewTaskDir(sidecarTask.Name), TaskEnv: taskenv.NewEmptyTaskEnv(), } - require.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) + must.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) resp := &interfaces.TaskPrestartResponse{} // Run the hook err := h.Prestart(context.Background(), req, resp) - require.ErrorIs(t, err, errEnvoyBootstrapError) - require.True(t, structs.IsRecoverable(err)) + must.ErrorIs(t, err, errEnvoyBootstrapError) + must.True(t, structs.IsRecoverable(err)) + must.False(t, resp.Done) - // Assert no file was written + // Current time should be at least start time + total wait time, and we + // should hit at least 2 iterations + minimum := begin.Add(h.envoyBootstrapWaitTime) + must.True(t, time.Now().After(minimum)) + must.GreaterEq(t, 2, sleeper.iterations) + + // No bootstrap config file should be written _, err = os.Open(filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")) - require.Error(t, err) - require.True(t, os.IsNotExist(err)) + must.Error(t, err) + must.True(t, os.IsNotExist(err)) +} + +type mockSleeper struct { + iterations int } -func TestTaskRunner_EnvoyBootstrapHook_retryTimeout(t *testing.T) { +func (m *mockSleeper) Sleep(d time.Duration) { + m.iterations++ + time.Sleep(d) +} + +func TestEnvoyBootstrapHook_PreflightFailed(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) @@ -720,23 +749,26 @@ func TestTaskRunner_EnvoyBootstrapHook_retryTimeout(t *testing.T) { consulConfig := consulapi.DefaultConfig() consulConfig.Address = testConsul.HTTPAddr - // Do NOT register group services, causing the hook to retry until timeout + consulAPIClient, err := consulapi.NewClient(consulConfig) + must.NoError(t, err) + namespacesClient := agentconsul.NewNamespacesClient(consulAPIClient.Namespaces(), consulAPIClient.Agent()) + + serviceClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) + + // Do NOT register group services, causing the hook to retry until timeout. 
+ // Note that here we expect the preflight check timeout to happen // Run Connect bootstrap hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) - - // Keep track of the retry backoff iterations - iterations := 0 + }, consulNamespace, serviceClient, mock.Node(), logger)) - // Lower the allowable wait time for testing - h.envoyBootstrapWaitTime = 3 * time.Second - h.envoyBoostrapInitialGap = 1 * time.Second - h.envoyBootstrapExpSleep = func(d time.Duration) { - iterations++ - time.Sleep(d) - } + // Lower the allowable wait time for testing and keep track of retry backoff + // iterations + h.envoyBootstrapWaitTime = 1 * time.Second + h.envoyBootstrapInitialGap = 100 * time.Millisecond + sleeper := &mockSleeper{} + h.envoyBootstrapExpSleep = sleeper // Create the prestart request req := &interfaces.TaskPrestartRequest{ @@ -744,30 +776,29 @@ func TestTaskRunner_EnvoyBootstrapHook_retryTimeout(t *testing.T) { TaskDir: allocDir.NewTaskDir(sidecarTask.Name), TaskEnv: taskenv.NewEmptyTaskEnv(), } - require.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) + must.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) var resp interfaces.TaskPrestartResponse // Run the hook and get the error - err := h.Prestart(context.Background(), req, &resp) - require.ErrorIs(t, err, errEnvoyBootstrapError) + err = h.Prestart(context.Background(), req, &resp) + must.ErrorIs(t, err, errEnvoyBootstrapError) + must.True(t, structs.IsRecoverable(err)) + must.False(t, resp.Done) - // Current time should be at least start time + total wait time + // Current time should be at least start time + total wait time, and we + // should hit at least 2 iterations minimum := begin.Add(h.envoyBootstrapWaitTime) - require.True(t, time.Now().After(minimum)) + must.True(t, time.Now().After(minimum)) + must.GreaterEq(t, 2, sleeper.iterations) - // Should hit at least 2 iterations - require.Greater(t, 2, iterations) - - // Make sure we captured the recoverable-ness of the error - _, ok := err.(*structs.RecoverableError) - require.True(t, ok) - - // Assert the hook is not done (it failed) - require.False(t, resp.Done) + // No bootstrap config file should be written + _, err = os.Open(filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")) + must.Error(t, err) + must.True(t, os.IsNotExist(err)) } -func TestTaskRunner_EnvoyBootstrapHook_extractNameAndKind(t *testing.T) { +func TestEnvoyBootstrapHook_extractNameAndKind(t *testing.T) { t.Run("connect sidecar", func(t *testing.T) { kind, name, err := (*envoyBootstrapHook)(nil).extractNameAndKind( structs.NewTaskKind(structs.ConnectProxyPrefix, "foo"), @@ -801,13 +832,15 @@ func TestTaskRunner_EnvoyBootstrapHook_extractNameAndKind(t *testing.T) { }) } -func TestTaskRunner_EnvoyBootstrapHook_grpcAddress(t *testing.T) { +func TestEnvoyBootstrapHook_grpcAddress(t *testing.T) { ci.Parallel(t) bridgeH := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig( mock.ConnectIngressGatewayAlloc("bridge"), new(config.ConsulConfig), consulNamespace, + nil, + mock.Node(), testlog.HCLogger(t), )) @@ -815,6 +848,8 @@ func TestTaskRunner_EnvoyBootstrapHook_grpcAddress(t *testing.T) { mock.ConnectIngressGatewayAlloc("host"), new(config.ConsulConfig), consulNamespace, + nil, + mock.Node(), testlog.HCLogger(t), )) @@ -832,7 +867,7 @@ func TestTaskRunner_EnvoyBootstrapHook_grpcAddress(t *testing.T) { }) } -func TestTaskRunner_EnvoyBootstrapHook_isConnectKind(t *testing.T) { 
+func TestEnvoyBootstrapHook_isConnectKind(t *testing.T) {
 	ci.Parallel(t)
 	require.True(t, isConnectKind(structs.ConnectProxyPrefix))
@@ -842,3 +877,48 @@ func TestTaskRunner_EnvoyBootstrapHook_isConnectKind(t *testing.T) {
 	require.False(t, isConnectKind(""))
 	require.False(t, isConnectKind("something"))
 }
+
+type mockAllocServicesClient struct {
+	service      *structs.Service
+	lock         sync.RWMutex
+	requestCount int
+	err          error
+}
+
+func (m *mockAllocServicesClient) resetError(err error) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.requestCount = 0
+	m.err = err
+}
+
+func (m *mockAllocServicesClient) AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.requestCount++
+	if m.err != nil {
+		return nil, m.err
+	}
+	reg := &serviceregistration.AllocRegistration{
+		Tasks: map[string]*serviceregistration.ServiceRegistrations{
+			"foo": &serviceregistration.ServiceRegistrations{
+				Services: map[string]*serviceregistration.ServiceRegistration{
+					m.service.Name: &serviceregistration.ServiceRegistration{
+						ServiceID:      "",
+						CheckIDs:       map[string]struct{}{},
+						CheckOnUpdate:  map[string]string{},
+						Service:        &consulapi.AgentService{},
+						Checks:         []*consulapi.AgentCheck{},
+						SidecarService: &consulapi.AgentService{},
+						SidecarChecks:  []*consulapi.AgentCheck{},
+					},
+				},
+			},
+		},
+	}
+	return reg, nil
+}
+
+func newMockAllocServicesClient(service *structs.Service, err error) allocServicesClient {
+	return &mockAllocServicesClient{service: service, err: err}
+}
diff --git a/client/allocrunner/taskrunner/sids_hook.go b/client/allocrunner/taskrunner/sids_hook.go
index 7d5e780a045..99b47a09b73 100644
--- a/client/allocrunner/taskrunner/sids_hook.go
+++ b/client/allocrunner/taskrunner/sids_hook.go
@@ -151,7 +151,8 @@ func (h *sidsHook) Prestart(
 		}
 	}
-	// need to ask for a new SI token & persist it to disk
+	// COMPAT(1.9): this code path exists only to support the legacy (non-WI)
+	// workflow. remove in 1.9.0.
if token == "" { if token, err = h.deriveSIToken(ctx); err != nil { return err @@ -255,7 +256,7 @@ func (h *sidsHook) kill(ctx context.Context, reason error) { func (h *sidsHook) tryDerive(ctx context.Context, ch chan<- siDerivationResult) { for attempt := 0; backoff(ctx, attempt); attempt++ { - tokens, err := h.sidsClient.DeriveSITokens(h.alloc, []string{h.task.Name}) + tokens, err := h.sidsClient.DeriveSITokens(ctx, h.alloc, []string{h.task.Name}) switch { case err == nil: diff --git a/client/allocrunner/taskrunner/sids_hook_test.go b/client/allocrunner/taskrunner/sids_hook_test.go index c93a303624c..0df13f3cb5d 100644 --- a/client/allocrunner/taskrunner/sids_hook_test.go +++ b/client/allocrunner/taskrunner/sids_hook_test.go @@ -191,7 +191,7 @@ func TestSIDSHook_deriveSIToken_timeout(t *testing.T) { r := require.New(t) siClient := consulclient.NewMockServiceIdentitiesClient() - siClient.DeriveTokenFn = func(allocation *structs.Allocation, strings []string) (m map[string]string, err error) { + siClient.DeriveTokenFn = func(context.Context, *structs.Allocation, []string) (m map[string]string, err error) { select { // block forever, hopefully triggering a timeout in the caller } @@ -288,7 +288,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) { trConfig.ClientConfig.GetDefaultConsul().Token = uuid.Generate() // derive token works just fine - deriveFn := func(*structs.Allocation, []string) (map[string]string, error) { + deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { return map[string]string{task.Name: uuid.Generate()}, nil } siClient := trConfig.ConsulSI.(*consulclient.MockServiceIdentitiesClient) diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 737e05f8f84..b0fdcea1a4d 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -145,9 +145,11 @@ func (tr *TaskRunner) initHooks() { if task.UsesConnect() { tg := tr.Alloc().Job.LookupTaskGroup(tr.Alloc().TaskGroup) + consulCfg := tr.clientConfig.GetConsulConfigs(tr.logger)[task.GetConsulClusterName(tg)] + // Enable the Service Identity hook only if the Nomad client is configured // with a consul token, indicating that Consul ACLs are enabled - if tr.clientConfig.GetConsulConfigs(tr.logger)[task.GetConsulClusterName(tg)].Token != "" { + if consulCfg != nil && consulCfg.Token != "" { tr.runnerHooks = append(tr.runnerHooks, newSIDSHook(sidsHookConfig{ alloc: tr.Alloc(), task: tr.Task(), @@ -162,14 +164,15 @@ func (tr *TaskRunner) initHooks() { tr.runnerHooks = append(tr.runnerHooks, newEnvoyVersionHook(newEnvoyVersionHookConfig(alloc, tr.consulProxiesClientFunc, hookLogger)), newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, - tr.clientConfig.ConsulConfigs[task.GetConsulClusterName(tg)], + consulCfg, consulNamespace, + tr.consulServiceClient, + tr.clientConfig.Node, hookLogger)), ) } else if task.Kind.IsConnectNative() { tr.runnerHooks = append(tr.runnerHooks, newConnectNativeHook( - newConnectNativeHookConfig(alloc, - tr.clientConfig.ConsulConfigs[task.GetConsulClusterName(tg)], hookLogger), + newConnectNativeHookConfig(alloc, consulCfg, hookLogger), )) } } diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 7e820d9cc41..73061acb5fb 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -1466,7 
+1466,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) { // control when we get a Consul SI token token := uuid.Generate() waitCh := make(chan struct{}) - deriveFn := func(*structs.Allocation, []string) (map[string]string, error) { + deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { <-waitCh return map[string]string{task.Name: token}, nil } @@ -1530,7 +1530,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) { // control when we get a Consul SI token (recoverable failure on first call) token := uuid.Generate() deriveCount := 0 - deriveFn := func(*structs.Allocation, []string) (map[string]string, error) { + deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { if deriveCount > 0 { return map[string]string{task.Name: token}, nil diff --git a/client/client.go b/client/client.go index e1fdc98fcf0..3a765c82b92 100644 --- a/client/client.go +++ b/client/client.go @@ -4,6 +4,7 @@ package client import ( + "context" "errors" "fmt" "maps" @@ -28,7 +29,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/getter" "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" - consulApi "github.com/hashicorp/nomad/client/consul" + consulApiShim "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/fingerprint" @@ -232,7 +233,7 @@ type Client struct { // consulProxiesFunc gets an interface to Nomad's custom Consul client for // looking up supported envoy versions - consulProxiesFunc consulApi.SupportedProxiesAPIFunc + consulProxiesFunc consulApiShim.SupportedProxiesAPIFunc // consulCatalog is the subset of Consul's Catalog API Nomad uses for self // service discovery @@ -256,7 +257,7 @@ type Client struct { // tokensClient is Nomad Client's custom Consul client for requesting Consul // Service Identity tokens through Nomad Server. - tokensClient consulApi.ServiceIdentityAPI + tokensClient consulApiShim.ServiceIdentityAPI // vaultClients is used to interact with Vault for token and secret renewals vaultClients map[string]vaultclient.VaultClient @@ -348,7 +349,7 @@ var ( // registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place // of the client's normal RPC handlers. This allows server tests to override // the behavior of the client. -func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxiesFunc consulApi.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) { +func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxiesFunc consulApiShim.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) { // Create the tls wrapper var tlsWrap tlsutil.RegionWrapper if cfg.TLSConfig.EnableRPC { @@ -2813,7 +2814,7 @@ func (c *Client) newAllocRunnerConfig( // identity tokens. 
// DEPRECATED: remove in 1.9.0
 func (c *Client) setupConsulTokenClient() error {
-	tc := consulApi.NewIdentitiesClient(c.logger, c.deriveSIToken)
+	tc := consulApiShim.NewIdentitiesClient(c.logger, c.deriveSIToken)
 	c.tokensClient = tc
 	return nil
 }
@@ -2960,7 +2961,7 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli
 // deriveSIToken takes an allocation and a set of tasks and derives Consul
 // Service Identity tokens for each of the tasks by requesting them from the
 // Nomad Server.
-func (c *Client) deriveSIToken(alloc *structs.Allocation, taskNames []string) (map[string]string, error) {
+func (c *Client) deriveSIToken(ctx context.Context, alloc *structs.Allocation, taskNames []string) (map[string]string, error) {
 	tasks, err := verifiedTasks(c.logger, alloc, taskNames)
 	if err != nil {
 		return nil, err
@@ -3001,7 +3002,36 @@ func (c *Client) deriveSIToken(alloc *structs.Allocation, taskNames []string) (m
 	// https://www.consul.io/api/acl/tokens.html#read-a-token
 	// https://www.consul.io/docs/internals/security.html
+	consulConfigs := c.config.GetConsulConfigs(c.logger)
+	consulClientConstructor := consulApiShim.NewConsulClientFactory(c.config)
+
+	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+	tgNs := tg.Consul.GetNamespace()
+
+	for task, secretID := range resp.Tokens {
+		t := tg.LookupTask(task)
+		ns := t.Consul.GetNamespace()
+		if ns == "" {
+			ns = tgNs
+		}
+		cluster := t.GetConsulClusterName(tg)
+		consulConfig := consulConfigs[cluster]
+		consulClient, err := consulClientConstructor(consulConfig, c.logger)
+		if err != nil {
+			return nil, err
+		}
+
+		err = consulClient.TokenPreflightCheck(ctx, &consulapi.ACLToken{
+			Namespace: ns,
+			SecretID:  secretID,
+		})
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	m := maps.Clone(resp.Tokens)
+
 	return m, nil
 }
diff --git a/client/config/config.go b/client/config/config.go
index 13db700b5bc..1628a86d1fa 100644
--- a/client/config/config.go
+++ b/client/config/config.go
@@ -1019,3 +1019,7 @@ func (c *Config) GetDefaultConsul() *structsc.ConsulConfig {
 func (c *Config) GetDefaultVault() *structsc.VaultConfig {
 	return c.VaultConfigs[structs.VaultDefaultCluster]
 }
+
+func (c *Config) GetNode() *structs.Node {
+	return c.Node
+}
diff --git a/client/consul/consul.go b/client/consul/consul.go
index 26096319cef..0de503964fb 100644
--- a/client/consul/consul.go
+++ b/client/consul/consul.go
@@ -4,30 +4,33 @@ package consul
 import (
+	"context"
 	"fmt"
+	"time"
 	consulapi "github.com/hashicorp/consul/api"
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-multierror"
-
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/useragent"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/nomad/structs/config"
 )
-// TokenDeriverFunc takes an allocation and a set of tasks and derives a
-// service identity token for each. Requests go through nomad server.
-type TokenDeriverFunc func(*structs.Allocation, []string) (map[string]string, error)
+// TokenDeriverFunc takes an allocation and a set of tasks and derives a service
+// identity token for each. Requests go through nomad server and the local
+// Consul agent.
+type TokenDeriverFunc func(context.Context, *structs.Allocation, []string) (map[string]string, error)
 // ServiceIdentityAPI is the interface the Nomad Client uses to request Consul
-// Service Identity tokens through Nomad Server.
+// Service Identity tokens through Nomad Server. (Deprecated: will be removed
+// in 1.9.0)
 //
 // ACL requirements
 // - acl:write (used by Server only)
 type ServiceIdentityAPI interface {
 	// DeriveSITokens contacts the nomad server and requests consul service
 	// identity tokens be generated for tasks in the allocation.
-	DeriveSITokens(alloc *structs.Allocation, tasks []string) (map[string]string, error)
+	DeriveSITokens(ctx context.Context, alloc *structs.Allocation, tasks []string) (map[string]string, error)
 }
 // SupportedProxiesAPI is the interface the Nomad Client uses to request from
@@ -50,13 +53,17 @@ type JWTLoginRequest struct {
 }
 // Client is the interface that the nomad client uses to interact with
-// Consul.
+// Consul tokens.
 type Client interface {
 	// DeriveTokenWithJWT logs into Consul using JWT and retrieves a Consul ACL
 	// token.
 	DeriveTokenWithJWT(JWTLoginRequest) (*consulapi.ACLToken, error)
 	RevokeTokens([]*consulapi.ACLToken) error
+
+	// TokenPreflightCheck verifies that a token has been replicated before we
+	// try to use it for registering services or bootstrapping Envoy
+	TokenPreflightCheck(context.Context, *consulapi.ACLToken) error
 }
 type consulClient struct {
@@ -70,15 +77,26 @@ type consulClient struct {
 	config *config.ConsulConfig
 	logger hclog.Logger
+
+	// preflightCheckTimeout/BaseInterval control how long the client will wait
+	// for Consul ACL tokens to be fully replicated before giving up on the
+	// allocation; these are configurable via node metadata
+	preflightCheckTimeout      time.Duration
+	preflightCheckBaseInterval time.Duration
 }
 // ConsulClientFunc creates a new Consul client for the specific Consul config
 type ConsulClientFunc func(config *config.ConsulConfig, logger hclog.Logger) (Client, error)
+// NodeGetter breaks a circular dependency between client/config.Config and this
+// package
+type NodeGetter interface {
+	GetNode() *structs.Node
+}
+
 // NewConsulClientFactory returns a ConsulClientFunc that closes over the
 // partition
-func NewConsulClientFactory(node *structs.Node) ConsulClientFunc {
-	partition := node.Attributes["consul.partition"]
+func NewConsulClientFactory(nodeGetter NodeGetter) ConsulClientFunc {
 	return func(config *config.ConsulConfig, logger hclog.Logger) (Client, error) {
 		if config == nil {
@@ -87,10 +105,19 @@ func NewConsulClientFactory(node *structs.Node) ConsulClientFunc {
 		logger = logger.Named("consul").With("name", config.Name)
+		node := nodeGetter.GetNode()
+		partition := node.Attributes["consul.partition"]
+		preflightCheckTimeout := durationFromMeta(
+			node, "consul.token_preflight_check.timeout", time.Second*10)
+		preflightCheckBaseInterval := durationFromMeta(
+			node, "consul.token_preflight_check.base", time.Millisecond*500)
+
 		c := &consulClient{
-			config:    config,
-			logger:    logger,
-			partition: partition,
+			config:                     config,
+			logger:                     logger,
+			partition:                  partition,
+			preflightCheckTimeout:      preflightCheckTimeout,
+			preflightCheckBaseInterval: preflightCheckBaseInterval,
 		}
 		// Get the Consul API configuration
@@ -115,6 +142,18 @@
 	}
 }
+func durationFromMeta(node *structs.Node, key string, defaultDur time.Duration) time.Duration {
+	val := node.Meta[key]
+	if val == "" {
+		return defaultDur
+	}
+	d, err := time.ParseDuration(val)
+	if err != nil || d == 0 {
+		return defaultDur
+	}
+	return d
+}
+
 // DeriveTokenWithJWT takes a JWT from the request and returns a Consul token.
 func (c *consulClient) DeriveTokenWithJWT(req JWTLoginRequest) (*consulapi.ACLToken, error) {
 	t, _, err := c.client.ACL().Login(&consulapi.ACLLoginParams{
@@ -141,3 +180,39 @@ func (c *consulClient) RevokeTokens(tokens []*consulapi.ACLToken) error {
 	return mErr.ErrorOrNil()
 }
+
+// TokenPreflightCheck verifies that a token has been replicated before we
+// try to use it for registering services or bootstrapping Envoy
+func (c *consulClient) TokenPreflightCheck(pctx context.Context, t *consulapi.ACLToken) error {
+	timer, timerStop := helper.NewStoppedTimer()
+	defer timerStop()
+
+	var retry uint64
+	var err error
+	ctx, cancel := context.WithTimeout(pctx, c.preflightCheckTimeout)
+	defer cancel()
+
+	for {
+		_, _, err = c.client.ACL().TokenReadSelf(&consulapi.QueryOptions{
+			Namespace:  t.Namespace,
+			Partition:  c.partition,
+			AllowStale: true,
+			Token:      t.SecretID,
+		})
+		if err == nil {
+			return nil
+		}
+
+		retry++
+		backoff := helper.Backoff(
+			c.preflightCheckBaseInterval, c.preflightCheckBaseInterval*2, retry)
+		c.logger.Trace("Consul token not ready", "error", err, "backoff", backoff)
+		timer.Reset(backoff)
+		select {
+		case <-ctx.Done():
+			return err
+		case <-timer.C:
+			continue
+		}
+	}
+}
diff --git a/client/consul/consul_test.go b/client/consul/consul_test.go
new file mode 100644
index 00000000000..1cd25fcc07b
--- /dev/null
+++ b/client/consul/consul_test.go
@@ -0,0 +1,131 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package consul
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"sync"
+	"testing"
+	"time"
+
+	consulapi "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/nomad/structs/config"
+	"github.com/shoenig/test/must"
+)
+
+type mockConsulServer struct {
+	httpSrv *httptest.Server
+
+	lock                 sync.RWMutex
+	errorCodeOnTokenSelf int
+	countTokenSelf       int
+}
+
+func (m *mockConsulServer) resetTokenSelf(errNo int) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.countTokenSelf = 0
+	m.errorCodeOnTokenSelf = errNo
+}
+
+func newMockConsulServer() *mockConsulServer {
+
+	srv := &mockConsulServer{}
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/v1/acl/token/self", func(w http.ResponseWriter, r *http.Request) {
+
+		srv.lock.Lock()
+		defer srv.lock.Unlock()
+		srv.countTokenSelf++
+
+		if srv.errorCodeOnTokenSelf == 0 {
+			secretID := r.Header.Get("X-Consul-Token")
+			token := &consulapi.ACLToken{
+				SecretID: secretID,
+			}
+			buf, _ := json.Marshal(token)
+			w.Write(buf)
+			return
+		}
+
+		w.WriteHeader(srv.errorCodeOnTokenSelf)
+		fmt.Fprint(w, "{}")
+	})
+
+	srv.httpSrv = httptest.NewServer(mux)
+	return srv
+}
+
+type testClientCfg struct{ node *structs.Node }
+
+func (c *testClientCfg) GetNode() *structs.Node {
+	return c.node
+}
+
+// TestConsul_TokenPreflightCheck verifies the retry logic for the token
+// preflight check.
+func TestConsul_TokenPreflightCheck(t *testing.T) {
+
+	consulSrv := newMockConsulServer()
+	consulSrv.resetTokenSelf(404)
+
+	node := mock.Node()
+	node.Meta["consul.token_preflight_check.timeout"] = "100ms"
+	node.Meta["consul.token_preflight_check.base"] = "10ms"
+	clientCfg := &testClientCfg{node}
+
+	factory := NewConsulClientFactory(clientCfg)
+
+	cfg := &config.ConsulConfig{
+		Addr: consulSrv.httpSrv.URL,
+	}
+	client, err := factory(cfg, testlog.HCLogger(t))
+	must.NoError(t, err)
+
+	token := &consulapi.ACLToken{
SecretID: uuid.Generate(), + Namespace: "foo", + } + + preflightErrorCh := make(chan error) + + ctx1, cancel1 := context.WithTimeout(context.TODO(), time.Second*5) + defer cancel1() + + go func() { + preflightErrorCh <- client.TokenPreflightCheck(ctx1, token) + }() + + select { + case <-ctx1.Done(): + t.Fatal("test timed out before check timed out") + case err := <-preflightErrorCh: + must.EqError(t, err, "Unexpected response code: 404 ({})") + must.GreaterEq(t, 5, consulSrv.countTokenSelf) + } + + consulSrv.resetTokenSelf(0) + ctx2, cancel2 := context.WithTimeout(context.TODO(), time.Second*5) + defer cancel2() + + go func() { + preflightErrorCh <- client.TokenPreflightCheck(ctx2, token) + }() + + select { + case <-ctx2.Done(): + t.Fatal("test timed out and check should not have timed out") + case err := <-preflightErrorCh: + must.NoError(t, err, must.Sprintf("preflight should pass: %v", err)) + must.Eq(t, 1, consulSrv.countTokenSelf) + } +} diff --git a/client/consul/consul_testing.go b/client/consul/consul_testing.go index b0e281c5839..4ea9403366b 100644 --- a/client/consul/consul_testing.go +++ b/client/consul/consul_testing.go @@ -4,6 +4,7 @@ package consul import ( + "context" "crypto/md5" "encoding/hex" @@ -48,3 +49,7 @@ func (mc *MockConsulClient) RevokeTokens(tokens []*consulapi.ACLToken) error { } return nil } + +func (mc *MockConsulClient) TokenPreflightCheck(_ context.Context, _ *consulapi.ACLToken) error { + return nil +} diff --git a/client/consul/identities.go b/client/consul/identities.go index e7cf3669456..47125daae22 100644 --- a/client/consul/identities.go +++ b/client/consul/identities.go @@ -4,6 +4,8 @@ package consul import ( + "context" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -25,8 +27,8 @@ func NewIdentitiesClient(logger hclog.Logger, tokenDeriver TokenDeriverFunc) *id } } -func (c *identitiesClient) DeriveSITokens(alloc *structs.Allocation, tasks []string) (map[string]string, error) { - tokens, err := c.tokenDeriver(alloc, tasks) +func (c *identitiesClient) DeriveSITokens(ctx context.Context, alloc *structs.Allocation, tasks []string) (map[string]string, error) { + tokens, err := c.tokenDeriver(ctx, alloc, tasks) if err != nil { c.logger.Error("error deriving SI token", "error", err, "alloc_id", alloc.ID, "task_names", tasks) return nil, err diff --git a/client/consul/identities_test.go b/client/consul/identities_test.go index e307efc3214..3b17087d788 100644 --- a/client/consul/identities_test.go +++ b/client/consul/identities_test.go @@ -4,36 +4,37 @@ package consul import ( + "context" "errors" "testing" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/require" + "github.com/shoenig/test/must" ) func TestSI_DeriveTokens(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) - dFunc := func(alloc *structs.Allocation, taskNames []string) (map[string]string, error) { + dFunc := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { return map[string]string{"a": "b"}, nil } tc := NewIdentitiesClient(logger, dFunc) - tokens, err := tc.DeriveSITokens(nil, nil) - require.NoError(t, err) - require.Equal(t, map[string]string{"a": "b"}, tokens) + tokens, err := tc.DeriveSITokens(context.TODO(), nil, nil) + must.NoError(t, err) + must.Eq(t, map[string]string{"a": "b"}, tokens) } func TestSI_DeriveTokens_error(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) - dFunc := func(alloc 
*structs.Allocation, taskNames []string) (map[string]string, error) {
+	dFunc := func(context.Context, *structs.Allocation, []string) (map[string]string, error) {
 		return nil, errors.New("some failure")
 	}
 	tc := NewIdentitiesClient(logger, dFunc)
-	_, err := tc.DeriveSITokens(&structs.Allocation{ID: "a1"}, nil)
-	require.Error(t, err)
+	_, err := tc.DeriveSITokens(context.TODO(), &structs.Allocation{ID: "a1"}, nil)
+	must.Error(t, err)
 }
diff --git a/client/consul/identities_testing.go b/client/consul/identities_testing.go
index b2a3e51a2b4..05eabb0d8f5 100644
--- a/client/consul/identities_testing.go
+++ b/client/consul/identities_testing.go
@@ -4,6 +4,7 @@ package consul
 import (
+	"context"
 	"sync"
 	"github.com/hashicorp/nomad/helper/uuid"
@@ -35,13 +36,13 @@ func NewMockServiceIdentitiesClient() *MockServiceIdentitiesClient {
 	}
 }
-func (mtc *MockServiceIdentitiesClient) DeriveSITokens(alloc *structs.Allocation, tasks []string) (map[string]string, error) {
+func (mtc *MockServiceIdentitiesClient) DeriveSITokens(ctx context.Context, alloc *structs.Allocation, tasks []string) (map[string]string, error) {
 	mtc.lock.Lock()
 	defer mtc.lock.Unlock()
 	// if the DeriveTokenFn is explicitly set, use that
 	if mtc.DeriveTokenFn != nil {
-		return mtc.DeriveTokenFn(alloc, tasks)
+		return mtc.DeriveTokenFn(ctx, alloc, tasks)
 	}
 	// generate a token for each task, unless the mock has an error ready for
diff --git a/website/content/docs/integrations/consul/service-mesh.mdx b/website/content/docs/integrations/consul/service-mesh.mdx
index a39f6751874..6bd63c2757d 100644
--- a/website/content/docs/integrations/consul/service-mesh.mdx
+++ b/website/content/docs/integrations/consul/service-mesh.mdx
@@ -188,7 +188,7 @@ Consul service mesh sidecar proxy. All Nomad client nodes using network namespac
 must have these CNI plugins [installed][cni_install].
 To use [`transparent_proxy`][] mode, Nomad client nodes will also need the
-[`consul-cni`][] plugin installed. See the Linux post-installation [steps](/nomad/docs/install#post-installation-steps) for more detail on how to install CNI plugins.
+[`consul-cni`][] plugin installed. See the Linux post-installation [steps](/nomad/docs/install#post-installation-steps) for more detail on how to install CNI plugins.
 ## Run the Service Mesh-enabled Services
@@ -481,6 +481,21 @@ only be accessible from the host filesystem. However, the sidecar task secrets
 directory may not be available in systems where it is mounted in a temporary
 filesystem.
+Bootstrapping the Envoy proxy requires that the Consul ACL token and service
+registration have successfully replicated to whichever Consul server the local
+Consul agent is connected to. Nomad clients poll the local agent with
+exponential backoff until these objects are visible, up to a timeout. You can
+adjust the timeouts on a given node by setting node metadata values via the
+command line or in the [`client.meta`][] agent configuration block. The
+default values are shown below:
+
+```shell-session
+nomad node meta apply -node-id $nodeID \
+  consul.token_preflight_check.timeout=10s \
+  consul.token_preflight_check.base=500ms \
+  consul.service_preflight_check.timeout=60s \
+  consul.service_preflight_check.base=1s
+```
+
 [count-dashboard]: /img/count-dashboard.png
 [consul_acl]: https://github.com/hashicorp/consul/issues/7414
 [gh-9907]: https://github.com/hashicorp/nomad/issues/9907
@@ -499,3 +514,4 @@ filesystem.
[cni_plugins]: /nomad/docs/networking/cni#cni-reference-plugins [consul_dns_port]: /consul/docs/agent/config/config-files#dns_port [`network.dns`]: /nomad/docs/job-specification/network#dns-parameters +[`client.meta`]: /nomad/docs/configuration/client#meta
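For reference, the same settings can also be set statically in the Nomad agent configuration rather than with `nomad node meta apply`. A sketch of what the [`client.meta`] block might look like with this changeset's defaults (the surrounding block layout is standard Nomad client configuration; only the four `consul.*_preflight_check` keys come from this change):

```hcl
client {
  enabled = true

  meta {
    # Consul ACL token preflight check: overall timeout and base backoff interval
    "consul.token_preflight_check.timeout" = "10s"
    "consul.token_preflight_check.base"    = "500ms"

    # Consul service preflight check: overall timeout and base backoff interval
    "consul.service_preflight_check.timeout" = "60s"
    "consul.service_preflight_check.base"    = "1s"
  }
}
```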