diff --git a/.changelog/23381.txt b/.changelog/23381.txt new file mode 100644 index 00000000000..4aa458af39b --- /dev/null +++ b/.changelog/23381.txt @@ -0,0 +1,3 @@ +```release-note:bug +consul: Fixed a bug where service registration and Envoy bootstrap would not wait for Consul ACL tokens and services to be replicated to the local agent +``` diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index ae57c4c7982..21e7aef053f 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -130,7 +130,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { allocdir: ar.allocDir, widmgr: ar.widmgr, consulConfigs: ar.clientConfig.GetConsulConfigs(hookLogger), - consulClientConstructor: consul.NewConsulClientFactory(config.Node), + consulClientConstructor: consul.NewConsulClientFactory(config), hookResources: ar.hookResources, envBuilder: newEnvBuilder, logger: hookLogger, diff --git a/client/allocrunner/consul_hook.go b/client/allocrunner/consul_hook.go index 54a64c8f2d2..b4e087e06f8 100644 --- a/client/allocrunner/consul_hook.go +++ b/client/allocrunner/consul_hook.go @@ -4,6 +4,7 @@ package allocrunner import ( + "context" "fmt" consulapi "github.com/hashicorp/consul/api" @@ -27,7 +28,9 @@ type consulHook struct { hookResources *cstructs.AllocHookResources envBuilder *taskenv.Builder - logger log.Logger + logger log.Logger + shutdownCtx context.Context + shutdownCancelFn context.CancelFunc } type consulHookConfig struct { @@ -51,6 +54,7 @@ type consulHookConfig struct { } func newConsulHook(cfg consulHookConfig) *consulHook { + shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background()) h := &consulHook{ alloc: cfg.alloc, allocdir: cfg.allocdir, @@ -59,6 +63,8 @@ func newConsulHook(cfg consulHookConfig) *consulHook { consulClientConstructor: cfg.consulClientConstructor, hookResources: cfg.hookResources, envBuilder: cfg.envBuilder(), + shutdownCtx: shutdownCtx, + shutdownCancelFn: shutdownCancelFn, } h.logger = cfg.logger.Named(h.Name()) return h @@ -225,7 +231,12 @@ func (h *consulHook) getConsulToken(cluster string, req consul.JWTLoginRequest) return nil, fmt.Errorf("failed to retrieve Consul client for cluster %s: %v", cluster, err) } - return client.DeriveTokenWithJWT(req) + t, err := client.DeriveTokenWithJWT(req) + if err == nil { + err = client.TokenPreflightCheck(h.shutdownCtx, t) + } + + return t, err } func (h *consulHook) clientForCluster(cluster string) (consul.Client, error) { @@ -248,6 +259,11 @@ func (h *consulHook) Postrun() error { return nil } +// Shutdown will get called when the client is gracefully stopping. +func (h *consulHook) Shutdown() { + h.shutdownCancelFn() +} + // Destroy cleans up any remaining Consul tokens if the alloc is GC'd or fails // to restore after a client restart. func (h *consulHook) Destroy() error { diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go index 483157af203..5797f9a22eb 100644 --- a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go +++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" + "oss.indeed.com/go/libtime" "oss.indeed.com/go/libtime/decay" ) @@ -38,7 +39,7 @@ const ( // envoyBootstrapInitialGap is the initial amount of time the envoy bootstrap // retry loop will wait, exponentially increasing each iteration, not including // jitter. - envoyBoostrapInitialGap = 1 * time.Second + envoyBootstrapInitialGap = 1 * time.Second // envoyBootstrapMaxJitter is the maximum amount of jitter applied to the // wait gap each iteration of the envoy bootstrap retry loop. @@ -76,10 +77,16 @@ func newConsulTransportConfig(cc *config.ConsulConfig) consulTransportConfig { } } +type allocServicesClient interface { + AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) +} + type envoyBootstrapHookConfig struct { alloc *structs.Allocation consul consulTransportConfig consulNamespace string + consulServices allocServicesClient + node *structs.Node logger hclog.Logger } @@ -94,11 +101,13 @@ func decodeTriState(b *bool) string { } } -func newEnvoyBootstrapHookConfig(alloc *structs.Allocation, consul *config.ConsulConfig, consulNamespace string, logger hclog.Logger) *envoyBootstrapHookConfig { +func newEnvoyBootstrapHookConfig(alloc *structs.Allocation, consul *config.ConsulConfig, consulNamespace string, consulServices allocServicesClient, node *structs.Node, logger hclog.Logger) *envoyBootstrapHookConfig { return &envoyBootstrapHookConfig{ alloc: alloc, consul: newConsulTransportConfig(consul), consulNamespace: consulNamespace, + consulServices: consulServices, + node: node, logger: logger, } } @@ -134,28 +143,38 @@ type envoyBootstrapHook struct { envoyBootstrapWaitTime time.Duration // envoyBootstrapInitialGap is the initial wait gap when retrying - envoyBoostrapInitialGap time.Duration + envoyBootstrapInitialGap time.Duration // envoyBootstrapMaxJitter is the maximum amount of jitter applied to retries envoyBootstrapMaxJitter time.Duration // envoyBootstrapExpSleep controls exponential waiting - envoyBootstrapExpSleep func(time.Duration) + envoyBootstrapExpSleep libtime.Sleeper + + // consulServices queries the Consul service catalog for preflight checks + consulServices allocServicesClient // logger is used to log things logger hclog.Logger } func newEnvoyBootstrapHook(c *envoyBootstrapHookConfig) *envoyBootstrapHook { + + waitTime := durationFromMeta(c.node, + "consul.service_preflight_check.timeout", envoyBootstrapWaitTime) + initialGap := durationFromMeta(c.node, + "consul.service_preflight_check.base", envoyBootstrapInitialGap) + return &envoyBootstrapHook{ - alloc: c.alloc, - consulConfig: c.consul, - consulNamespace: c.consulNamespace, - envoyBootstrapWaitTime: envoyBootstrapWaitTime, - envoyBoostrapInitialGap: envoyBoostrapInitialGap, - envoyBootstrapMaxJitter: envoyBootstrapMaxJitter, - envoyBootstrapExpSleep: time.Sleep, - logger: c.logger.Named(envoyBootstrapHookName), + alloc: c.alloc, + consulConfig: c.consul, + consulNamespace: c.consulNamespace, + envoyBootstrapWaitTime: waitTime, + envoyBootstrapInitialGap: initialGap, + envoyBootstrapMaxJitter: envoyBootstrapMaxJitter, + envoyBootstrapExpSleep: libtime.NewSleeper(), + consulServices: c.consulServices, + logger: c.logger.Named(envoyBootstrapHookName), } } @@ -281,7 +300,8 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *ifs.TaskPrestart } h.logger.Debug("check for SI token for task", "task", req.Task.Name, "exists", siToken != "") - bootstrap := h.newEnvoyBootstrapArgs(h.alloc.TaskGroup, service, grpcAddr, envoyAdminBind, envoyReadyBind, siToken, bootstrapFilePath) + proxyID := h.proxyServiceID(h.alloc.TaskGroup, service) + bootstrap := h.newEnvoyBootstrapArgs(service, grpcAddr, envoyAdminBind, envoyReadyBind, siToken, bootstrapFilePath, proxyID) // Create command line arguments bootstrapArgs := bootstrap.args() @@ -316,13 +336,20 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *ifs.TaskPrestart // keep track of latest error returned from exec-ing consul envoy bootstrap var cmdErr error - // Since Consul services are registered asynchronously with this task - // hook running, retry until timeout or success. backoffOpts := decay.BackoffOptions{ MaxSleepTime: h.envoyBootstrapWaitTime, - InitialGapSize: h.envoyBoostrapInitialGap, + InitialGapSize: h.envoyBootstrapInitialGap, MaxJitterSize: h.envoyBootstrapMaxJitter, + Sleeper: h.envoyBootstrapExpSleep, + } + + err = h.servicePreflightCheck(ctx, backoffOpts, proxyID) + if err != nil { + return err } + + // Since Consul services are registered asynchronously with this task + // hook running, retry until timeout or success. backoffErr := decay.Backoff(func() (bool, error) { // If hook is killed, just stop. select { @@ -486,12 +513,11 @@ func (h *envoyBootstrapHook) proxyServiceID(group string, service *structs.Servi // // https://www.consul.io/commands/connect/envoy#consul-connect-envoy func (h *envoyBootstrapHook) newEnvoyBootstrapArgs( - group string, service *structs.Service, - grpcAddr, envoyAdminBind, envoyReadyBind, siToken, filepath string, + service *structs.Service, + grpcAddr, envoyAdminBind, envoyReadyBind, siToken, filepath, proxyID string, ) envoyBootstrapArgs { namespace := h.getConsulNamespace() - proxyID := h.proxyServiceID(group, service) var gateway string switch { @@ -608,3 +634,61 @@ func (h *envoyBootstrapHook) maybeLoadSIToken(task, dir string) (string, error) h.logger.Trace("recovered pre-existing SI token", "task", task) return string(token), nil } + +func (h *envoyBootstrapHook) servicePreflightCheck( + ctx context.Context, backoffOpts decay.BackoffOptions, proxyServiceID string) error { + + // keep track of latest error returned from Consul or from missing service + var apiErr error + var allocServices *serviceregistration.AllocRegistration + + backoffErr := decay.Backoff(func() (bool, error) { + // If hook is killed, just stop. + select { + case <-ctx.Done(): + return false, nil + default: + } + + allocServices, apiErr = h.consulServices.AllocRegistrations(h.alloc.ID) + if apiErr != nil { + return true, apiErr + } + + if allocServices != nil { + for _, taskServices := range allocServices.Tasks { + for id := range taskServices.Services { + if id == proxyServiceID { + return false, nil + } + } + } + } + apiErr = fmt.Errorf("missing %q", proxyServiceID) + return true, apiErr + }, backoffOpts) + + // Wrap the last error we saw set that as our status. + if backoffErr != nil { + return structs.NewRecoverableError( + fmt.Errorf("%w: %v; see: ", + errEnvoyBootstrapError, + apiErr, + ), + true) + } + + return nil +} + +func durationFromMeta(node *structs.Node, key string, defaultDur time.Duration) time.Duration { + val := node.Meta[key] + if key == "" { + return defaultDur + } + d, err := time.ParseDuration(val) + if err != nil || d == 0 { + return defaultDur + } + return d +} diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go index 0e6b32d2915..e5095e3d1aa 100644 --- a/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go +++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go @@ -15,6 +15,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "testing" "time" @@ -22,6 +23,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/serviceregistration" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/client/testutil" agentconsul "github.com/hashicorp/nomad/command/agent/consul" @@ -33,6 +35,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/plugins/drivers/fsisolation" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" ) @@ -66,7 +69,8 @@ func TestEnvoyBootstrapHook_maybeLoadSIToken(t *testing.T) { } t.Run("file does not exist", func(t *testing.T) { - h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)}) + h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{ + logger: testlog.HCLogger(t), node: mock.Node()}) cfg, err := h.maybeLoadSIToken("task1", "/does/not/exist") require.NoError(t, err) // absence of token is not an error require.Equal(t, "", cfg) @@ -76,7 +80,8 @@ func TestEnvoyBootstrapHook_maybeLoadSIToken(t *testing.T) { token := uuid.Generate() f := writeTmp(t, token, 0440) - h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)}) + h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{ + logger: testlog.HCLogger(t), node: mock.Node()}) cfg, err := h.maybeLoadSIToken("task1", f) require.NoError(t, err) require.Equal(t, token, cfg) @@ -86,7 +91,8 @@ func TestEnvoyBootstrapHook_maybeLoadSIToken(t *testing.T) { token := uuid.Generate() f := writeTmp(t, token, 0200) - h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{logger: testlog.HCLogger(t)}) + h := newEnvoyBootstrapHook(&envoyBootstrapHookConfig{ + logger: testlog.HCLogger(t), node: mock.Node()}) cfg, err := h.maybeLoadSIToken("task1", f) require.Error(t, err) require.False(t, os.IsNotExist(err)) @@ -342,15 +348,15 @@ func TestEnvoyBootstrapHook_with_SI_token(t *testing.T) { require.NoError(t, err) namespacesClient := agentconsul.NewNamespacesClient(consulAPIClient.Namespaces(), consulAPIClient.Agent()) - consulClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) - go consulClient.Run() - defer consulClient.Shutdown() - require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) + serviceClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) + go serviceClient.Run() + defer serviceClient.Shutdown() + must.NoError(t, serviceClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) // Run Connect bootstrap Hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) + }, consulNamespace, serviceClient, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: sidecarTask, TaskDir: allocDir.NewTaskDir(sidecarTask.Name), @@ -388,10 +394,10 @@ func TestEnvoyBootstrapHook_with_SI_token(t *testing.T) { require.Equal(t, token, value) } -// TestTaskRunner_EnvoyBootstrapHook_sidecar_ok asserts the EnvoyBootstrapHook +// TestEnvoyBootstrapHook_sidecar_ok asserts the EnvoyBootstrapHook // creates Envoy's bootstrap.json configuration based on Connect proxy sidecars // registered for the task. -func TestTaskRunner_EnvoyBootstrapHook_sidecar_ok(t *testing.T) { +func TestEnvoyBootstrapHook_sidecar_ok(t *testing.T) { ci.Parallel(t) testutil.RequireConsul(t) @@ -440,15 +446,15 @@ func TestTaskRunner_EnvoyBootstrapHook_sidecar_ok(t *testing.T) { require.NoError(t, err) namespacesClient := agentconsul.NewNamespacesClient(consulAPIClient.Namespaces(), consulAPIClient.Agent()) - consulClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) - go consulClient.Run() - defer consulClient.Shutdown() - require.NoError(t, consulClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) + serviceClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) + go serviceClient.Run() + defer serviceClient.Shutdown() + require.NoError(t, serviceClient.RegisterWorkload(agentconsul.BuildAllocServices(mock.Node(), alloc, agentconsul.NoopRestarter()))) // Run Connect bootstrap Hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) + }, consulNamespace, serviceClient, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: sidecarTask, TaskDir: allocDir.NewTaskDir(sidecarTask.Name), @@ -483,7 +489,7 @@ func TestTaskRunner_EnvoyBootstrapHook_sidecar_ok(t *testing.T) { require.Equal(t, "", value) } -func TestTaskRunner_EnvoyBootstrapHook_gateway_ok(t *testing.T) { +func TestEnvoyBootstrapHook_gateway_ok(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) @@ -527,7 +533,7 @@ func TestTaskRunner_EnvoyBootstrapHook_gateway_ok(t *testing.T) { // Run Connect bootstrap hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) + }, consulNamespace, serviceClient, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: alloc.Job.TaskGroups[0].Tasks[0], @@ -563,9 +569,9 @@ func TestTaskRunner_EnvoyBootstrapHook_gateway_ok(t *testing.T) { require.Equal(t, "ingress-gateway", out.Node.Cluster) } -// TestTaskRunner_EnvoyBootstrapHook_Noop asserts that the Envoy bootstrap hook +// TestEnvoyBootstrapHook_Noop asserts that the Envoy bootstrap hook // is a noop for non-Connect proxy sidecar / gateway tasks. -func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) { +func TestEnvoyBootstrapHook_Noop(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) @@ -578,7 +584,7 @@ func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) { // not get hit. h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: "http://127.0.0.2:1", - }, consulNamespace, logger)) + }, consulNamespace, nil, mock.Node(), logger)) req := &interfaces.TaskPrestartRequest{ Task: task, TaskDir: allocDir.NewTaskDir(task.Name), @@ -596,10 +602,10 @@ func TestTaskRunner_EnvoyBootstrapHook_Noop(t *testing.T) { require.True(t, os.IsNotExist(err)) } -// TestTaskRunner_EnvoyBootstrapHook_RecoverableError asserts the Envoy -// bootstrap hook returns a Recoverable error if the bootstrap command runs but -// fails. -func TestTaskRunner_EnvoyBootstrapHook_RecoverableError(t *testing.T) { +// TestEnvoyBootstrapHook_CommandFailed asserts the Envoy bootstrap +// hook returns a Recoverable error if the bootstrap command runs but fails, and +// that we retry the appropriate number of times +func TestEnvoyBootstrapHook_CommandFailed(t *testing.T) { ci.Parallel(t) testutil.RequireConsul(t) @@ -641,40 +647,63 @@ func TestTaskRunner_EnvoyBootstrapHook_RecoverableError(t *testing.T) { allocDir, cleanup := allocdir.TestAllocDir(t, logger, "EnvoyBootstrap", alloc.ID) defer cleanup() + begin := time.Now() + // Unlike the successful test above, do NOT register the group services - // yet. This should cause a recoverable error similar to if Consul was - // not running. + // yet. This should cause a recoverable error similar to if Consul was not + // running. We're adding a mock services client here so that the preflight + // check passes, so that we can exercise the retry logic specific to the + // bootstrap command. // Run Connect bootstrap Hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: testConsul.HTTPAddr, - }, consulNamespace, logger)) + }, consulNamespace, newMockAllocServicesClient(tg.Services[0], nil), mock.Node(), logger)) - // Lower the allowable wait time for testing + // Lower the allowable wait time for testing and keep track of retry backoff + // iterations h.envoyBootstrapWaitTime = 1 * time.Second - h.envoyBoostrapInitialGap = 100 * time.Millisecond + h.envoyBootstrapInitialGap = 100 * time.Millisecond + sleeper := &mockSleeper{} + h.envoyBootstrapExpSleep = sleeper req := &interfaces.TaskPrestartRequest{ Task: sidecarTask, TaskDir: allocDir.NewTaskDir(sidecarTask.Name), TaskEnv: taskenv.NewEmptyTaskEnv(), } - require.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) + must.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) resp := &interfaces.TaskPrestartResponse{} // Run the hook err := h.Prestart(context.Background(), req, resp) - require.ErrorIs(t, err, errEnvoyBootstrapError) - require.True(t, structs.IsRecoverable(err)) + must.ErrorIs(t, err, errEnvoyBootstrapError) + must.True(t, structs.IsRecoverable(err)) + must.False(t, resp.Done) - // Assert no file was written + // Current time should be at least start time + total wait time, and we + // should hit at least 2 iterations + minimum := begin.Add(h.envoyBootstrapWaitTime) + must.True(t, time.Now().After(minimum)) + must.GreaterEq(t, 2, sleeper.iterations) + + // No bootstrap config file should be written _, err = os.Open(filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")) - require.Error(t, err) - require.True(t, os.IsNotExist(err)) + must.Error(t, err) + must.True(t, os.IsNotExist(err)) +} + +type mockSleeper struct { + iterations int } -func TestTaskRunner_EnvoyBootstrapHook_retryTimeout(t *testing.T) { +func (m *mockSleeper) Sleep(d time.Duration) { + m.iterations++ + time.Sleep(d) +} + +func TestEnvoyBootstrapHook_PreflightFailed(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) @@ -720,23 +749,26 @@ func TestTaskRunner_EnvoyBootstrapHook_retryTimeout(t *testing.T) { consulConfig := consulapi.DefaultConfig() consulConfig.Address = testConsul.HTTPAddr - // Do NOT register group services, causing the hook to retry until timeout + consulAPIClient, err := consulapi.NewClient(consulConfig) + must.NoError(t, err) + namespacesClient := agentconsul.NewNamespacesClient(consulAPIClient.Namespaces(), consulAPIClient.Agent()) + + serviceClient := agentconsul.NewServiceClient(consulAPIClient.Agent(), namespacesClient, logger, true) + + // Do NOT register group services, causing the hook to retry until timeout. + // Note that here we expect the preflight check timeout to happen // Run Connect bootstrap hook h := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, &config.ConsulConfig{ Addr: consulConfig.Address, - }, consulNamespace, logger)) - - // Keep track of the retry backoff iterations - iterations := 0 + }, consulNamespace, serviceClient, mock.Node(), logger)) - // Lower the allowable wait time for testing - h.envoyBootstrapWaitTime = 3 * time.Second - h.envoyBoostrapInitialGap = 1 * time.Second - h.envoyBootstrapExpSleep = func(d time.Duration) { - iterations++ - time.Sleep(d) - } + // Lower the allowable wait time for testing and keep track of retry backoff + // iterations + h.envoyBootstrapWaitTime = 1 * time.Second + h.envoyBootstrapInitialGap = 100 * time.Millisecond + sleeper := &mockSleeper{} + h.envoyBootstrapExpSleep = sleeper // Create the prestart request req := &interfaces.TaskPrestartRequest{ @@ -744,30 +776,29 @@ func TestTaskRunner_EnvoyBootstrapHook_retryTimeout(t *testing.T) { TaskDir: allocDir.NewTaskDir(sidecarTask.Name), TaskEnv: taskenv.NewEmptyTaskEnv(), } - require.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) + must.NoError(t, req.TaskDir.Build(fsisolation.None, nil, sidecarTask.User)) var resp interfaces.TaskPrestartResponse // Run the hook and get the error - err := h.Prestart(context.Background(), req, &resp) - require.ErrorIs(t, err, errEnvoyBootstrapError) + err = h.Prestart(context.Background(), req, &resp) + must.ErrorIs(t, err, errEnvoyBootstrapError) + must.True(t, structs.IsRecoverable(err)) + must.False(t, resp.Done) - // Current time should be at least start time + total wait time + // Current time should be at least start time + total wait time, and we + // should hit at least 2 iterations minimum := begin.Add(h.envoyBootstrapWaitTime) - require.True(t, time.Now().After(minimum)) + must.True(t, time.Now().After(minimum)) + must.GreaterEq(t, 2, sleeper.iterations) - // Should hit at least 2 iterations - require.Greater(t, 2, iterations) - - // Make sure we captured the recoverable-ness of the error - _, ok := err.(*structs.RecoverableError) - require.True(t, ok) - - // Assert the hook is not done (it failed) - require.False(t, resp.Done) + // No bootstrap config file should be written + _, err = os.Open(filepath.Join(req.TaskDir.SecretsDir, "envoy_bootstrap.json")) + must.Error(t, err) + must.True(t, os.IsNotExist(err)) } -func TestTaskRunner_EnvoyBootstrapHook_extractNameAndKind(t *testing.T) { +func TestEnvoyBootstrapHook_extractNameAndKind(t *testing.T) { t.Run("connect sidecar", func(t *testing.T) { kind, name, err := (*envoyBootstrapHook)(nil).extractNameAndKind( structs.NewTaskKind(structs.ConnectProxyPrefix, "foo"), @@ -801,13 +832,15 @@ func TestTaskRunner_EnvoyBootstrapHook_extractNameAndKind(t *testing.T) { }) } -func TestTaskRunner_EnvoyBootstrapHook_grpcAddress(t *testing.T) { +func TestEnvoyBootstrapHook_grpcAddress(t *testing.T) { ci.Parallel(t) bridgeH := newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig( mock.ConnectIngressGatewayAlloc("bridge"), new(config.ConsulConfig), consulNamespace, + nil, + mock.Node(), testlog.HCLogger(t), )) @@ -815,6 +848,8 @@ func TestTaskRunner_EnvoyBootstrapHook_grpcAddress(t *testing.T) { mock.ConnectIngressGatewayAlloc("host"), new(config.ConsulConfig), consulNamespace, + nil, + mock.Node(), testlog.HCLogger(t), )) @@ -832,7 +867,7 @@ func TestTaskRunner_EnvoyBootstrapHook_grpcAddress(t *testing.T) { }) } -func TestTaskRunner_EnvoyBootstrapHook_isConnectKind(t *testing.T) { +func TestEnvoyBootstrapHook_isConnectKind(t *testing.T) { ci.Parallel(t) require.True(t, isConnectKind(structs.ConnectProxyPrefix)) @@ -842,3 +877,48 @@ func TestTaskRunner_EnvoyBootstrapHook_isConnectKind(t *testing.T) { require.False(t, isConnectKind("")) require.False(t, isConnectKind("something")) } + +type mockAllocServicesClient struct { + service *structs.Service + lock sync.RWMutex + requestCount int + err error +} + +func (m *mockAllocServicesClient) resetError(err error) { + m.lock.Lock() + defer m.lock.Unlock() + m.requestCount = 0 + m.err = err +} + +func (m *mockAllocServicesClient) AllocRegistrations(allocID string) (*serviceregistration.AllocRegistration, error) { + m.lock.RLock() + defer m.lock.RUnlock() + m.requestCount++ + if m.err != nil { + return nil, m.err + } + reg := &serviceregistration.AllocRegistration{ + Tasks: map[string]*serviceregistration.ServiceRegistrations{ + "foo": &serviceregistration.ServiceRegistrations{ + Services: map[string]*serviceregistration.ServiceRegistration{ + m.service.Name: &serviceregistration.ServiceRegistration{ + ServiceID: "", + CheckIDs: map[string]struct{}{}, + CheckOnUpdate: map[string]string{}, + Service: &consulapi.AgentService{}, + Checks: []*consulapi.AgentCheck{}, + SidecarService: &consulapi.AgentService{}, + SidecarChecks: []*consulapi.AgentCheck{}, + }, + }, + }, + }, + } + return reg, nil +} + +func newMockAllocServicesClient(service *structs.Service, err error) allocServicesClient { + return &mockAllocServicesClient{service: service, err: err} +} diff --git a/client/allocrunner/taskrunner/sids_hook.go b/client/allocrunner/taskrunner/sids_hook.go index 7d5e780a045..99b47a09b73 100644 --- a/client/allocrunner/taskrunner/sids_hook.go +++ b/client/allocrunner/taskrunner/sids_hook.go @@ -151,7 +151,8 @@ func (h *sidsHook) Prestart( } } - // need to ask for a new SI token & persist it to disk + // COMPAT(1.9): this code path exists only to support the legacy (non-WI) + // workflow. remove for 1.9.0. if token == "" { if token, err = h.deriveSIToken(ctx); err != nil { return err @@ -255,7 +256,7 @@ func (h *sidsHook) kill(ctx context.Context, reason error) { func (h *sidsHook) tryDerive(ctx context.Context, ch chan<- siDerivationResult) { for attempt := 0; backoff(ctx, attempt); attempt++ { - tokens, err := h.sidsClient.DeriveSITokens(h.alloc, []string{h.task.Name}) + tokens, err := h.sidsClient.DeriveSITokens(ctx, h.alloc, []string{h.task.Name}) switch { case err == nil: diff --git a/client/allocrunner/taskrunner/sids_hook_test.go b/client/allocrunner/taskrunner/sids_hook_test.go index c93a303624c..0df13f3cb5d 100644 --- a/client/allocrunner/taskrunner/sids_hook_test.go +++ b/client/allocrunner/taskrunner/sids_hook_test.go @@ -191,7 +191,7 @@ func TestSIDSHook_deriveSIToken_timeout(t *testing.T) { r := require.New(t) siClient := consulclient.NewMockServiceIdentitiesClient() - siClient.DeriveTokenFn = func(allocation *structs.Allocation, strings []string) (m map[string]string, err error) { + siClient.DeriveTokenFn = func(context.Context, *structs.Allocation, []string) (m map[string]string, err error) { select { // block forever, hopefully triggering a timeout in the caller } @@ -288,7 +288,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) { trConfig.ClientConfig.GetDefaultConsul().Token = uuid.Generate() // derive token works just fine - deriveFn := func(*structs.Allocation, []string) (map[string]string, error) { + deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { return map[string]string{task.Name: uuid.Generate()}, nil } siClient := trConfig.ConsulSI.(*consulclient.MockServiceIdentitiesClient) diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 737e05f8f84..b0fdcea1a4d 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -145,9 +145,11 @@ func (tr *TaskRunner) initHooks() { if task.UsesConnect() { tg := tr.Alloc().Job.LookupTaskGroup(tr.Alloc().TaskGroup) + consulCfg := tr.clientConfig.GetConsulConfigs(tr.logger)[task.GetConsulClusterName(tg)] + // Enable the Service Identity hook only if the Nomad client is configured // with a consul token, indicating that Consul ACLs are enabled - if tr.clientConfig.GetConsulConfigs(tr.logger)[task.GetConsulClusterName(tg)].Token != "" { + if consulCfg != nil && consulCfg.Token != "" { tr.runnerHooks = append(tr.runnerHooks, newSIDSHook(sidsHookConfig{ alloc: tr.Alloc(), task: tr.Task(), @@ -162,14 +164,15 @@ func (tr *TaskRunner) initHooks() { tr.runnerHooks = append(tr.runnerHooks, newEnvoyVersionHook(newEnvoyVersionHookConfig(alloc, tr.consulProxiesClientFunc, hookLogger)), newEnvoyBootstrapHook(newEnvoyBootstrapHookConfig(alloc, - tr.clientConfig.ConsulConfigs[task.GetConsulClusterName(tg)], + consulCfg, consulNamespace, + tr.consulServiceClient, + tr.clientConfig.Node, hookLogger)), ) } else if task.Kind.IsConnectNative() { tr.runnerHooks = append(tr.runnerHooks, newConnectNativeHook( - newConnectNativeHookConfig(alloc, - tr.clientConfig.ConsulConfigs[task.GetConsulClusterName(tg)], hookLogger), + newConnectNativeHookConfig(alloc, consulCfg, hookLogger), )) } } diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 7e820d9cc41..73061acb5fb 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -1466,7 +1466,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) { // control when we get a Consul SI token token := uuid.Generate() waitCh := make(chan struct{}) - deriveFn := func(*structs.Allocation, []string) (map[string]string, error) { + deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { <-waitCh return map[string]string{task.Name: token}, nil } @@ -1530,7 +1530,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) { // control when we get a Consul SI token (recoverable failure on first call) token := uuid.Generate() deriveCount := 0 - deriveFn := func(*structs.Allocation, []string) (map[string]string, error) { + deriveFn := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { if deriveCount > 0 { return map[string]string{task.Name: token}, nil diff --git a/client/client.go b/client/client.go index e1fdc98fcf0..3a765c82b92 100644 --- a/client/client.go +++ b/client/client.go @@ -4,6 +4,7 @@ package client import ( + "context" "errors" "fmt" "maps" @@ -28,7 +29,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/getter" "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" - consulApi "github.com/hashicorp/nomad/client/consul" + consulApiShim "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/fingerprint" @@ -232,7 +233,7 @@ type Client struct { // consulProxiesFunc gets an interface to Nomad's custom Consul client for // looking up supported envoy versions - consulProxiesFunc consulApi.SupportedProxiesAPIFunc + consulProxiesFunc consulApiShim.SupportedProxiesAPIFunc // consulCatalog is the subset of Consul's Catalog API Nomad uses for self // service discovery @@ -256,7 +257,7 @@ type Client struct { // tokensClient is Nomad Client's custom Consul client for requesting Consul // Service Identity tokens through Nomad Server. - tokensClient consulApi.ServiceIdentityAPI + tokensClient consulApiShim.ServiceIdentityAPI // vaultClients is used to interact with Vault for token and secret renewals vaultClients map[string]vaultclient.VaultClient @@ -348,7 +349,7 @@ var ( // registered via https://golang.org/pkg/net/rpc/#Server.RegisterName in place // of the client's normal RPC handlers. This allows server tests to override // the behavior of the client. -func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxiesFunc consulApi.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) { +func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxiesFunc consulApiShim.SupportedProxiesAPIFunc, consulServices serviceregistration.Handler, rpcs map[string]interface{}) (*Client, error) { // Create the tls wrapper var tlsWrap tlsutil.RegionWrapper if cfg.TLSConfig.EnableRPC { @@ -2813,7 +2814,7 @@ func (c *Client) newAllocRunnerConfig( // identity tokens. // DEPRECATED: remove in 1.9.0 func (c *Client) setupConsulTokenClient() error { - tc := consulApi.NewIdentitiesClient(c.logger, c.deriveSIToken) + tc := consulApiShim.NewIdentitiesClient(c.logger, c.deriveSIToken) c.tokensClient = tc return nil } @@ -2960,7 +2961,7 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli // deriveSIToken takes an allocation and a set of tasks and derives Consul // Service Identity tokens for each of the tasks by requesting them from the // Nomad Server. -func (c *Client) deriveSIToken(alloc *structs.Allocation, taskNames []string) (map[string]string, error) { +func (c *Client) deriveSIToken(ctx context.Context, alloc *structs.Allocation, taskNames []string) (map[string]string, error) { tasks, err := verifiedTasks(c.logger, alloc, taskNames) if err != nil { return nil, err @@ -3001,7 +3002,36 @@ func (c *Client) deriveSIToken(alloc *structs.Allocation, taskNames []string) (m // https://www.consul.io/api/acl/tokens.html#read-a-token // https://www.consul.io/docs/internals/security.html + consulConfigs := c.config.GetConsulConfigs(c.logger) + consulClientConstructor := consulApiShim.NewConsulClientFactory(c.config) + + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + tgNs := tg.Consul.GetNamespace() + + for task, secretID := range resp.Tokens { + t := tg.LookupTask(task) + ns := t.Consul.GetNamespace() + if ns == "" { + ns = tgNs + } + cluster := tg.LookupTask(task).GetConsulClusterName(tg) + consulConfig := consulConfigs[cluster] + consulClient, err := consulClientConstructor(consulConfig, c.logger) + if err != nil { + return nil, err + } + + err = consulClient.TokenPreflightCheck(ctx, &consulapi.ACLToken{ + Namespace: ns, + SecretID: secretID, + }) + if err != nil { + return nil, err + } + } + m := maps.Clone(resp.Tokens) + return m, nil } diff --git a/client/config/config.go b/client/config/config.go index 13db700b5bc..1628a86d1fa 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -1019,3 +1019,7 @@ func (c *Config) GetDefaultConsul() *structsc.ConsulConfig { func (c *Config) GetDefaultVault() *structsc.VaultConfig { return c.VaultConfigs[structs.VaultDefaultCluster] } + +func (c *Config) GetNode() *structs.Node { + return c.Node +} diff --git a/client/consul/consul.go b/client/consul/consul.go index 26096319cef..0de503964fb 100644 --- a/client/consul/consul.go +++ b/client/consul/consul.go @@ -4,30 +4,33 @@ package consul import ( + "context" "fmt" + "time" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" - + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/useragent" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" ) -// TokenDeriverFunc takes an allocation and a set of tasks and derives a -// service identity token for each. Requests go through nomad server. -type TokenDeriverFunc func(*structs.Allocation, []string) (map[string]string, error) +// TokenDeriverFunc takes an allocation and a set of tasks and derives a service +// identity token for each. Requests go through nomad server and the local +// Consul agent. +type TokenDeriverFunc func(context.Context, *structs.Allocation, []string) (map[string]string, error) // ServiceIdentityAPI is the interface the Nomad Client uses to request Consul -// Service Identity tokens through Nomad Server. +// Service Identity tokens through Nomad Server. (Deprecated: will be removed in 1.9.0) // // ACL requirements // - acl:write (used by Server only) type ServiceIdentityAPI interface { // DeriveSITokens contacts the nomad server and requests consul service // identity tokens be generated for tasks in the allocation. - DeriveSITokens(alloc *structs.Allocation, tasks []string) (map[string]string, error) + DeriveSITokens(ctx context.Context, alloc *structs.Allocation, tasks []string) (map[string]string, error) } // SupportedProxiesAPI is the interface the Nomad Client uses to request from @@ -50,13 +53,17 @@ type JWTLoginRequest struct { } // Client is the interface that the nomad client uses to interact with -// Consul. +// Consul tokens type Client interface { // DeriveTokenWithJWT logs into Consul using JWT and retrieves a Consul ACL // token. DeriveTokenWithJWT(JWTLoginRequest) (*consulapi.ACLToken, error) RevokeTokens([]*consulapi.ACLToken) error + + // TokenPreflightCheck verifies that a token has been replicated before we + // try to use it for registering services or bootstrapping Envoy + TokenPreflightCheck(context.Context, *consulapi.ACLToken) error } type consulClient struct { @@ -70,15 +77,26 @@ type consulClient struct { config *config.ConsulConfig logger hclog.Logger + + // preflightCheckTimeout/BaseInterval control how long the client will wait + // for Consul ACLs tokens to be fully replicated before giving up on the + // allocation; these are configurable via node metadata + preflightCheckTimeout time.Duration + preflightCheckBaseInterval time.Duration } // ConsulClientFunc creates a new Consul client for the specific Consul config type ConsulClientFunc func(config *config.ConsulConfig, logger hclog.Logger) (Client, error) +// NodeGetter breaks a circular dependency between client/config.Config and this +// package +type NodeGetter interface { + GetNode() *structs.Node +} + // NewConsulClientFactory returns a ConsulClientFunc that closes over the // partition -func NewConsulClientFactory(node *structs.Node) ConsulClientFunc { - partition := node.Attributes["consul.partition"] +func NewConsulClientFactory(nodeGetter NodeGetter) ConsulClientFunc { return func(config *config.ConsulConfig, logger hclog.Logger) (Client, error) { if config == nil { @@ -87,10 +105,19 @@ func NewConsulClientFactory(node *structs.Node) ConsulClientFunc { logger = logger.Named("consul").With("name", config.Name) + node := nodeGetter.GetNode() + partition := node.Attributes["consul.partition"] + preflightCheckTimeout := durationFromMeta( + node, "consul.token_preflight_check.timeout", time.Second*10) + preflightCheckBaseInterval := durationFromMeta( + node, "consul.token_preflight_check.base", time.Millisecond*500) + c := &consulClient{ - config: config, - logger: logger, - partition: partition, + config: config, + logger: logger, + partition: partition, + preflightCheckTimeout: preflightCheckTimeout, + preflightCheckBaseInterval: preflightCheckBaseInterval, } // Get the Consul API configuration @@ -115,6 +142,18 @@ func NewConsulClientFactory(node *structs.Node) ConsulClientFunc { } } +func durationFromMeta(node *structs.Node, key string, defaultDur time.Duration) time.Duration { + val := node.Meta[key] + if key == "" { + return defaultDur + } + d, err := time.ParseDuration(val) + if err != nil || d == 0 { + return defaultDur + } + return d +} + // DeriveTokenWithJWT takes a JWT from request and returns a consul token. func (c *consulClient) DeriveTokenWithJWT(req JWTLoginRequest) (*consulapi.ACLToken, error) { t, _, err := c.client.ACL().Login(&consulapi.ACLLoginParams{ @@ -141,3 +180,39 @@ func (c *consulClient) RevokeTokens(tokens []*consulapi.ACLToken) error { return mErr.ErrorOrNil() } + +// TokenPreflightCheck verifies that a token has been replicated before we +// try to use it for registering services or bootstrapping Envoy +func (c *consulClient) TokenPreflightCheck(pctx context.Context, t *consulapi.ACLToken) error { + timer, timerStop := helper.NewStoppedTimer() + defer timerStop() + + var retry uint64 + var err error + ctx, cancel := context.WithTimeout(pctx, c.preflightCheckTimeout) + defer cancel() + + for { + _, _, err = c.client.ACL().TokenReadSelf(&consulapi.QueryOptions{ + Namespace: t.Namespace, + Partition: c.partition, + AllowStale: true, + Token: t.SecretID, + }) + if err == nil { + return nil + } + + retry++ + backoff := helper.Backoff( + c.preflightCheckBaseInterval, c.preflightCheckBaseInterval*2, retry) + c.logger.Trace("Consul token not ready", "error", err, "backoff", backoff) + timer.Reset(backoff) + select { + case <-ctx.Done(): + return err + case <-timer.C: + continue + } + } +} diff --git a/client/consul/consul_test.go b/client/consul/consul_test.go new file mode 100644 index 00000000000..1cd25fcc07b --- /dev/null +++ b/client/consul/consul_test.go @@ -0,0 +1,131 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package consul + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + consulapi "github.com/hashicorp/consul/api" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/structs/config" + "github.com/shoenig/test/must" +) + +type mockConsulServer struct { + httpSrv *httptest.Server + + lock sync.RWMutex + errorCodeOnTokenSelf int + countTokenSelf int +} + +func (m *mockConsulServer) resetTokenSelf(errNo int) { + m.lock.Lock() + defer m.lock.Unlock() + m.countTokenSelf = 0 + m.errorCodeOnTokenSelf = errNo +} + +func newMockConsulServer() *mockConsulServer { + + srv := &mockConsulServer{} + + mux := http.NewServeMux() + mux.HandleFunc("/v1/acl/token/self", func(w http.ResponseWriter, r *http.Request) { + + srv.lock.RLock() + defer srv.lock.RUnlock() + srv.countTokenSelf++ + + if srv.errorCodeOnTokenSelf == 0 { + secretID := r.Header.Get("X-Consul-Token") + token := &consulapi.ACLToken{ + SecretID: secretID, + } + buf, _ := json.Marshal(token) + fmt.Fprintf(w, string(buf)) + return + } + + w.WriteHeader(srv.errorCodeOnTokenSelf) + fmt.Fprintf(w, "{}") + }) + + srv.httpSrv = httptest.NewServer(mux) + return srv +} + +type testClientCfg struct{ node *structs.Node } + +func (c *testClientCfg) GetNode() *structs.Node { + return c.node +} + +// TestConsul_TokenPreflightCheck verifies the retry logic for +func TestConsul_TokenPreflightCheck(t *testing.T) { + + consulSrv := newMockConsulServer() + consulSrv.resetTokenSelf(404) + + node := mock.Node() + node.Meta["consul.token_preflight_check.timeout"] = "100ms" + node.Meta["consul.token_preflight_check.base"] = "10ms" + clientCfg := &testClientCfg{node} + + factory := NewConsulClientFactory(clientCfg) + + cfg := &config.ConsulConfig{ + Addr: consulSrv.httpSrv.URL, + } + client, err := factory(cfg, testlog.HCLogger(t)) + must.NoError(t, err) + + token := &consulapi.ACLToken{ + SecretID: uuid.Generate(), + Namespace: "foo", + } + + preflightErrorCh := make(chan error) + + ctx1, cancel1 := context.WithTimeout(context.TODO(), time.Second*5) + defer cancel1() + + go func() { + preflightErrorCh <- client.TokenPreflightCheck(ctx1, token) + }() + + select { + case <-ctx1.Done(): + t.Fatal("test timed out before check timed out") + case err := <-preflightErrorCh: + must.EqError(t, err, "Unexpected response code: 404 ({})") + must.GreaterEq(t, 5, consulSrv.countTokenSelf) + } + + consulSrv.resetTokenSelf(0) + ctx2, cancel2 := context.WithTimeout(context.TODO(), time.Second*5) + defer cancel2() + + go func() { + preflightErrorCh <- client.TokenPreflightCheck(ctx2, token) + }() + + select { + case <-ctx2.Done(): + t.Fatal("test timed out and check should not have timed out") + case err := <-preflightErrorCh: + must.NoError(t, err, must.Sprintf("preflight should pass: %v", err)) + must.Eq(t, 1, consulSrv.countTokenSelf) + } +} diff --git a/client/consul/consul_testing.go b/client/consul/consul_testing.go index b0e281c5839..4ea9403366b 100644 --- a/client/consul/consul_testing.go +++ b/client/consul/consul_testing.go @@ -4,6 +4,7 @@ package consul import ( + "context" "crypto/md5" "encoding/hex" @@ -48,3 +49,7 @@ func (mc *MockConsulClient) RevokeTokens(tokens []*consulapi.ACLToken) error { } return nil } + +func (mc *MockConsulClient) TokenPreflightCheck(_ context.Context, _ *consulapi.ACLToken) error { + return nil +} diff --git a/client/consul/identities.go b/client/consul/identities.go index e7cf3669456..47125daae22 100644 --- a/client/consul/identities.go +++ b/client/consul/identities.go @@ -4,6 +4,8 @@ package consul import ( + "context" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" ) @@ -25,8 +27,8 @@ func NewIdentitiesClient(logger hclog.Logger, tokenDeriver TokenDeriverFunc) *id } } -func (c *identitiesClient) DeriveSITokens(alloc *structs.Allocation, tasks []string) (map[string]string, error) { - tokens, err := c.tokenDeriver(alloc, tasks) +func (c *identitiesClient) DeriveSITokens(ctx context.Context, alloc *structs.Allocation, tasks []string) (map[string]string, error) { + tokens, err := c.tokenDeriver(ctx, alloc, tasks) if err != nil { c.logger.Error("error deriving SI token", "error", err, "alloc_id", alloc.ID, "task_names", tasks) return nil, err diff --git a/client/consul/identities_test.go b/client/consul/identities_test.go index e307efc3214..3b17087d788 100644 --- a/client/consul/identities_test.go +++ b/client/consul/identities_test.go @@ -4,36 +4,37 @@ package consul import ( + "context" "errors" "testing" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" - "github.com/stretchr/testify/require" + "github.com/shoenig/test/must" ) func TestSI_DeriveTokens(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) - dFunc := func(alloc *structs.Allocation, taskNames []string) (map[string]string, error) { + dFunc := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { return map[string]string{"a": "b"}, nil } tc := NewIdentitiesClient(logger, dFunc) - tokens, err := tc.DeriveSITokens(nil, nil) - require.NoError(t, err) - require.Equal(t, map[string]string{"a": "b"}, tokens) + tokens, err := tc.DeriveSITokens(context.TODO(), nil, nil) + must.NoError(t, err) + must.Eq(t, map[string]string{"a": "b"}, tokens) } func TestSI_DeriveTokens_error(t *testing.T) { ci.Parallel(t) logger := testlog.HCLogger(t) - dFunc := func(alloc *structs.Allocation, taskNames []string) (map[string]string, error) { + dFunc := func(context.Context, *structs.Allocation, []string) (map[string]string, error) { return nil, errors.New("some failure") } tc := NewIdentitiesClient(logger, dFunc) - _, err := tc.DeriveSITokens(&structs.Allocation{ID: "a1"}, nil) - require.Error(t, err) + _, err := tc.DeriveSITokens(context.TODO(), &structs.Allocation{ID: "a1"}, nil) + must.Error(t, err) } diff --git a/client/consul/identities_testing.go b/client/consul/identities_testing.go index b2a3e51a2b4..05eabb0d8f5 100644 --- a/client/consul/identities_testing.go +++ b/client/consul/identities_testing.go @@ -4,6 +4,7 @@ package consul import ( + "context" "sync" "github.com/hashicorp/nomad/helper/uuid" @@ -35,13 +36,13 @@ func NewMockServiceIdentitiesClient() *MockServiceIdentitiesClient { } } -func (mtc *MockServiceIdentitiesClient) DeriveSITokens(alloc *structs.Allocation, tasks []string) (map[string]string, error) { +func (mtc *MockServiceIdentitiesClient) DeriveSITokens(ctx context.Context, alloc *structs.Allocation, tasks []string) (map[string]string, error) { mtc.lock.Lock() defer mtc.lock.Unlock() // if the DeriveTokenFn is explicitly set, use that if mtc.DeriveTokenFn != nil { - return mtc.DeriveTokenFn(alloc, tasks) + return mtc.DeriveTokenFn(ctx, alloc, tasks) } // generate a token for each task, unless the mock has an error ready for diff --git a/website/content/docs/integrations/consul/service-mesh.mdx b/website/content/docs/integrations/consul/service-mesh.mdx index a39f6751874..6bd63c2757d 100644 --- a/website/content/docs/integrations/consul/service-mesh.mdx +++ b/website/content/docs/integrations/consul/service-mesh.mdx @@ -188,7 +188,7 @@ Consul service mesh sidecar proxy. All Nomad client nodes using network namespac must have these CNI plugins [installed][cni_install]. To use [`transparent_proxy`][] mode, Nomad client nodes will also need the -[`consul-cni`][] plugin installed. See the Linux post-installation [steps](/nomad/docs/install#post-installation-steps) for more detail on how to install CNI plugins. +[`consul-cni`][] plugin installed. See the Linux post-installation [steps](/nomad/docs/install#post-installation-steps) for more detail on how to install CNI plugins. ## Run the Service Mesh-enabled Services @@ -481,6 +481,21 @@ only be accessible from the host filesystem. However, the sidecar task secrets directory may not be available in systems where it is mounted in a temporary filesystem. +Bootstrapping the Envoy proxy requires that the Consul ACL token and service +registration have successfully replicated to whichever Consul server the local +Consul agent is connected to. Nomad clients poll for this value with exponential +backoff and a timeout. You can adjust the timeouts on a given node by setting +node metadata values via the command line or in the [`client.meta`][] agent +configuration block. The default values are shown below: + +```shell-session +nomad node meta apply -node-id $nodeID \ + consul.token_preflight_check.timeout=10s \ + consul.token_preflight_check.base=500ms \ + consul.service_preflight_check.timeout=60s \ + consul.service_preflight_check.base=1s +``` + [count-dashboard]: /img/count-dashboard.png [consul_acl]: https://github.com/hashicorp/consul/issues/7414 [gh-9907]: https://github.com/hashicorp/nomad/issues/9907 @@ -499,3 +514,4 @@ filesystem. [cni_plugins]: /nomad/docs/networking/cni#cni-reference-plugins [consul_dns_port]: /consul/docs/agent/config/config-files#dns_port [`network.dns`]: /nomad/docs/job-specification/network#dns-parameters +[`client.meta`]: /nomad/docs/configuration/client#meta