From 2598e4f00091f25b9f4224c30c872e996bd8a83c Mon Sep 17 00:00:00 2001
From: hc-github-team-nomad-core <82989552+hc-github-team-nomad-core@users.noreply.github.com>
Date: Fri, 17 Mar 2023 12:12:10 -0400
Subject: [PATCH] nsd: always set deregister flag after deregistration of
 group (#16289) (#16536)

(manual cherry-pick of ed498f8ddb0579cc4e7b01b485c0608428274a39)

* services: always set deregister flag after deregistration of group

This PR fixes a bug where the group service hook's deregister flag was not
set in some cases, causing the hook to attempt deregistrations twice during
job updates (alloc replacement). In the tests ... we used to assert on the
wrong behavior (remove twice), which has now been corrected to assert that
we remove only once.

This bug was "silent" in the Consul provider world because the error logs
for double deregistration only show up in Consul logs; with the Nomad
provider the error logs are in the Nomad agent logs.

* services: cleanup group service hook tests

Co-authored-by: Seth Hoenig
---
 .changelog/16289.txt                         |  3 +
 client/allocrunner/groupservice_hook.go      | 62 +++++++------
 client/allocrunner/groupservice_hook_test.go | 94 ++++++++++----------
 helper/funcs.go                              |  8 ++
 4 files changed, 92 insertions(+), 75 deletions(-)
 create mode 100644 .changelog/16289.txt

diff --git a/.changelog/16289.txt b/.changelog/16289.txt
new file mode 100644
index 000000000000..f11e0dd46882
--- /dev/null
+++ b/.changelog/16289.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where a service would be deregistered twice
+```
diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go
index 3e567537bb6a..99b61015aa42 100644
--- a/client/allocrunner/groupservice_hook.go
+++ b/client/allocrunner/groupservice_hook.go
@@ -5,12 +5,13 @@ import (
 	"sync"
 	"time"
 
-	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
 	"github.com/hashicorp/nomad/client/taskenv"
 	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -42,7 +43,7 @@ type groupServiceHook struct {
 	// and check registration and deregistration.
 	serviceRegWrapper *wrapper.HandlerWrapper
 
-	logger log.Logger
+	logger hclog.Logger
 
 	// The following fields may be updated
 	canary bool
@@ -59,11 +60,11 @@ type groupServiceHookConfig struct {
 	alloc               *structs.Allocation
-	restarter           agentconsul.WorkloadRestarter
+	restarter           serviceregistration.WorkloadRestarter
 	taskEnvBuilder      *taskenv.Builder
 	networkStatusGetter networkStatusGetter
 	shutdownDelayCtx    context.Context
-	logger              log.Logger
+	logger              hclog.Logger
 
 	// namespace is the Nomad or Consul namespace in which service
 	// registrations will be made.
@@ -120,23 +121,26 @@ func (h *groupServiceHook) Prerun() error {
 		h.prerun = true
 		h.mu.Unlock()
 	}()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
-func (h *groupServiceHook) prerunLocked() error {
+// caller must hold h.lock
+func (h *groupServiceHook) preRunLocked() error {
 	if len(h.services) == 0 {
 		return nil
 	}
 
-	services := h.getWorkloadServices()
+	services := h.getWorkloadServicesLocked()
 	return h.serviceRegWrapper.RegisterWorkload(services)
 }
 
+// Update is run when a job submitter modifies service(s) (but not much else -
+// otherwise a full alloc replacement would occur).
 func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
-	oldWorkloadServices := h.getWorkloadServices()
+	oldWorkloadServices := h.getWorkloadServicesLocked()
 
 	// Store new updated values out of request
 	canary := false
@@ -168,7 +172,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.namespace = req.Alloc.ServiceProviderNamespace()
 
 	// Create new task services struct with those new values
-	newWorkloadServices := h.getWorkloadServices()
+	newWorkloadServices := h.getWorkloadServicesLocked()
 
 	if !h.prerun {
 		// Update called before Prerun. Update alloc and exit to allow
@@ -188,21 +192,20 @@ func (h *groupServiceHook) PreTaskRestart() error {
 	}()
 
 	h.preKillLocked()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
 func (h *groupServiceHook) PreKill() {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.preKillLocked()
+	helper.WithLock(&h.mu, h.preKillLocked)
 }
 
-// implements the PreKill hook but requires the caller hold the lock
+// implements the PreKill hook
+//
+// caller must hold h.lock
 func (h *groupServiceHook) preKillLocked() {
 	// If we have a shutdown delay deregister group services and then wait
 	// before continuing to kill tasks.
-	h.deregister()
-	h.deregistered = true
+	h.deregisterLocked()
 
 	if h.delay == 0 {
 		return
@@ -219,24 +222,31 @@
 }
 
 func (h *groupServiceHook) Postrun() error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-
-	if !h.deregistered {
-		h.deregister()
-	}
+	helper.WithLock(&h.mu, h.deregisterLocked)
 	return nil
 }
 
-// deregister services from Consul.
-func (h *groupServiceHook) deregister() {
+// deregisterLocked will deregister services from Consul/Nomad service provider.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) deregisterLocked() {
+	if h.deregistered {
+		return
+	}
+
 	if len(h.services) > 0 {
-		workloadServices := h.getWorkloadServices()
+		workloadServices := h.getWorkloadServicesLocked()
 		h.serviceRegWrapper.RemoveWorkload(workloadServices)
 	}
+
+	h.deregistered = true
 }
 
-func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
+// getWorkloadServicesLocked returns the set of workload services currently
+// on the hook.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices {
 	// Interpolate with the task's environment
 	interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
diff --git a/client/allocrunner/groupservice_hook_test.go b/client/allocrunner/groupservice_hook_test.go
index e05df8cbc19e..606dab8f37a3 100644
--- a/client/allocrunner/groupservice_hook_test.go
+++ b/client/allocrunner/groupservice_hook_test.go
@@ -14,7 +14,7 @@ import (
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )
 
 var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
@@ -50,22 +50,21 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 // TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
@@ -92,23 +91,23 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Incease shutdown Delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second)
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 15*time.Second)
+	must.Eq(t, h.delay, 15*time.Second)
 
 	// Remove shutdown delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = nil
 	req = &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 0*time.Second)
+	must.Eq(t, h.delay, 0*time.Second)
 }
 
 // TestGroupServiceHook_GroupServices asserts group service hooks with group
@@ -133,22 +132,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 // TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with
@@ -179,25 +177,24 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Trigger our hook requests.
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
-	require.NoError(t, h.Postrun())
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.Update(req))
+	must.NoError(t, h.Postrun())
+	must.NoError(t, h.PreTaskRestart())
 
 	// Ensure the Nomad mock provider has the expected operations.
 	ops := nomadMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 
 	// Ensure the Consul mock provider has zero operations.
-	require.Len(t, consulMockClient.GetOps(), 0)
+	must.SliceEmpty(t, consulMockClient.GetOps())
 }
 
 // TestGroupServiceHook_Error asserts group service hooks with group
@@ -234,22 +231,21 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
@@ -284,6 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
 		logger: logger,
 	})
 
-	services := h.getWorkloadServices()
-	require.Len(t, services.Services, 1)
+	services := h.getWorkloadServicesLocked()
+	must.Len(t, 1, services.Services)
 }
diff --git a/helper/funcs.go b/helper/funcs.go
index fb962e9bf51e..7d8f1d3bd162 100644
--- a/helper/funcs.go
+++ b/helper/funcs.go
@@ -9,6 +9,7 @@ import (
 	"reflect"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	multierror "github.com/hashicorp/go-multierror"
@@ -477,3 +478,10 @@ OUTER:
 	}
 	return true
 }
+
+// WithLock executes a function while holding a lock.
+func WithLock(lock sync.Locker, f func()) {
+	lock.Lock()
+	defer lock.Unlock()
+	f()
+}
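
The following is a minimal, self-contained Go sketch (not part of the patch) of the two ideas the change combines: the new helper.WithLock wrapper, and a deregister flag that is always set inside the deregistration path itself, so that whichever hook runs second (PreKill then Postrun, or vice versa) becomes a no-op. The hook, fakeRegistry, and RemoveWorkload names are illustrative stand-ins, not the real groupServiceHook or service registration wrapper types.

package main

import (
	"fmt"
	"sync"
)

// WithLock executes a function while holding a lock (mirrors helper.WithLock).
func WithLock(lock sync.Locker, f func()) {
	lock.Lock()
	defer lock.Unlock()
	f()
}

// fakeRegistry is a hypothetical stand-in for the service registration
// wrapper; it just counts deregistrations.
type fakeRegistry struct{ removals int }

func (r *fakeRegistry) RemoveWorkload() { r.removals++ }

// hook mimics the relevant slice of groupServiceHook state.
type hook struct {
	mu           sync.Mutex
	deregistered bool
	registry     *fakeRegistry
}

// deregisterLocked is idempotent: the flag is checked on entry and always
// set on exit, so a second call never reaches the registry.
// caller must hold h.mu.
func (h *hook) deregisterLocked() {
	if h.deregistered {
		return
	}
	h.registry.RemoveWorkload()
	h.deregistered = true
}

func (h *hook) PreKill() { WithLock(&h.mu, h.deregisterLocked) }
func (h *hook) Postrun() { WithLock(&h.mu, h.deregisterLocked) }

func main() {
	h := &hook{registry: &fakeRegistry{}}
	h.PreKill() // first call deregisters
	h.Postrun() // second call is a no-op thanks to the flag
	fmt.Println("removals:", h.registry.removals) // prints: removals: 1
}

Putting the check-and-set inside deregisterLocked, rather than having callers such as Postrun test the flag themselves (the pre-patch shape), is what guarantees at-most-once removal regardless of which hook fires first; that is the behavior the updated tests assert with a single "remove" op.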