services: fix data integrity errors for Nomad native services
This changeset fixes three potential data integrity issues between allocations
and their Nomad native service registrations.

* When a node is marked down because it missed heartbeats, we remove Vault and
  Consul tokens (for the pre-Workload Identity workflows) after we've written
  the node update to Raft. This is unavoidably non-transactional because the
  Consul and Vault servers aren't in the same Raft cluster as Nomad itself. But
  we've unnecessarily mirrored this same behavior to deregister Nomad
  services. This makes it possible for the leader to successfully write the node
  update to Raft without removing services.

  To address this, move the delete into the same Raft transaction, as
  illustrated in the first sketch after this list. One minor
  caveat with this approach is the upgrade path: if the leader is upgraded first
  and a node is marked down during this window, older followers will have stale
  information until they are also upgraded. This is unavoidable without
  requiring the leader to unconditionally make an extra Raft write for every
  down node until 2 LTS versions after Nomad 1.8.0. This temporary reduction in
  data integrity for stale reads seems like a reasonable tradeoff.

* When an allocation is marked client-terminal from the client in
  `UpdateAllocsFromClient`, we have an opportunity to ensure data integrity by
  deregistering services for that allocation.

* When an allocation is deleted during eval garbage collection, we have an
  opportunity to ensure data integrity by deregistering services for that
  allocation. This is a cheap no-op if the allocation has been previously marked
  client-terminal; the second sketch after this list illustrates why the
  repeated delete is safe.
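
For the first bullet, a minimal sketch of the atomicity argument is below. It
is illustrative only: `stateStore` and `applyNodeStatusUpdate` are hypothetical
stand-ins for Nomad's memdb-backed state store and FSM handler, with a mutex
standing in for a single applied Raft entry. Once the status write and the
service delete commit together, a reader can never observe a down node that
still has live service registrations.

    // Illustrative sketch only. stateStore and applyNodeStatusUpdate are
    // hypothetical stand-ins; the mutex stands in for one memdb transaction
    // applied from a single Raft log entry.
    package main

    import (
        "fmt"
        "sync"
    )

    type stateStore struct {
        mu       sync.Mutex
        nodes    map[string]string   // nodeID -> status
        services map[string][]string // nodeID -> registered service IDs
    }

    // applyNodeStatusUpdate models the FSM handler for one Raft entry:
    // marking the node down and dropping its services commit together.
    func (s *stateStore) applyNodeStatusUpdate(nodeID, status string) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.nodes[nodeID] = status
        if status == "down" {
            delete(s.services, nodeID) // cheap no-op if already gone
        }
    }

    func main() {
        s := &stateStore{
            nodes:    map[string]string{"n1": "ready"},
            services: map[string][]string{"n1": {"web", "api"}},
        }
        s.applyNodeStatusUpdate("n1", "down")
        fmt.Println(s.nodes["n1"], len(s.services["n1"])) // down 0
    }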
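
For the second and third bullets, the safety of repeating the delete rests on
per-allocation deregistration being idempotent. A minimal sketch with
hypothetical types (not Nomad's service registration API) of why replaying the
delete during eval garbage collection is a cheap no-op:

    // Illustrative sketch only; registry and deregisterTerminal are
    // hypothetical, not Nomad's service registration API.
    package main

    import "fmt"

    type registry struct {
        byAllocID map[string][]string // allocID -> service IDs
    }

    // deregisterTerminal drops services for every client-terminal alloc in
    // the batch. Deleting an absent key does nothing, so replaying the same
    // batch (client update first, then GC) is safe and cheap.
    func (r *registry) deregisterTerminal(terminalAllocs []string) {
        for _, allocID := range terminalAllocs {
            delete(r.byAllocID, allocID)
        }
    }

    func main() {
        r := &registry{byAllocID: map[string][]string{"a1": {"web"}}}
        r.deregisterTerminal([]string{"a1"}) // client marks a1 terminal
        r.deregisterTerminal([]string{"a1"}) // eval GC replays: no-op
        fmt.Println(len(r.byAllocID))        // 0
    }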

This changeset does not address client-side retries for the originally reported
issue, which will be done in a separate PR.

Ref: #16616
tgross committed May 15, 2024
1 parent c9fd93c commit 4d220da
Showing 4 changed files with 209 additions and 91 deletions.
25 changes: 0 additions & 25 deletions nomad/node_endpoint.go
@@ -692,31 +692,6 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 			_ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true)
 		}
 
-		// Identify the service registrations current placed on the downed
-		// node.
-		serviceRegistrations, err := n.srv.State().GetServiceRegistrationsByNodeID(ws, args.NodeID)
-		if err != nil {
-			n.logger.Error("looking up service registrations for node failed",
-				"node_id", args.NodeID, "error", err)
-			return err
-		}
-
-		// If the node has service registrations assigned to it, delete these
-		// via Raft.
-		if l := len(serviceRegistrations); l > 0 {
-			n.logger.Debug("deleting service registrations on node due to down state",
-				"num_service_registrations", l, "node_id", args.NodeID)
-
-			deleteRegReq := structs.ServiceRegistrationDeleteByNodeIDRequest{NodeID: args.NodeID}
-
-			_, index, err = n.srv.raftApply(structs.ServiceRegistrationDeleteByNodeIDRequestType, &deleteRegReq)
-			if err != nil {
-				n.logger.Error("failed to delete service registrations for node",
-					"node_id", args.NodeID, "error", err)
-				return err
-			}
-		}
-
	default:
		ttl, err := n.srv.resetHeartbeatTimer(args.NodeID)
		if err != nil {
204 changes: 144 additions & 60 deletions nomad/node_endpoint_test.go
@@ -1487,7 +1487,7 @@ func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {

 	// Create a node and upsert this into state.
 	node := mock.Node()
-	require.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))
+	must.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))
 
 	// Generate service registrations, ensuring the nodeID is set to the
 	// generated node from above.
@@ -1498,16 +1498,16 @@ func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {
 	}
 
 	// Upsert the service registrations into state.
-	require.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
+	must.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
 
 	// Check the service registrations are in state as we expect, so we can
 	// have confidence in the rest of the test.
 	ws := memdb.NewWatchSet()
 	nodeRegs, err := testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-	require.NoError(t, err)
-	require.Len(t, nodeRegs, 2)
-	require.Equal(t, nodeRegs[0].NodeID, node.ID)
-	require.Equal(t, nodeRegs[1].NodeID, node.ID)
+	must.NoError(t, err)
+	must.Len(t, 2, nodeRegs)
+	must.Eq(t, nodeRegs[0].NodeID, node.ID)
+	must.Eq(t, nodeRegs[1].NodeID, node.ID)
 
 	// Generate and trigger a node down status update. This mimics what happens
 	// when the node fails its heart-beating.
@@ -1520,13 +1520,17 @@ func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {
 	var reply structs.NodeUpdateResponse
 
 	nodeEndpoint := NewNodeEndpoint(testServer, nil)
-	require.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
+	must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
 
 	// Query our state, to ensure the node service registrations have been
 	// removed.
 	nodeRegs, err = testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-	require.NoError(t, err)
-	require.Len(t, nodeRegs, 0)
+	must.NoError(t, err)
+	must.Len(t, 0, nodeRegs)
+
+	// Re-send the status update, to ensure we get no error if service
+	// registrations have already been removed
+	must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
 }
 
 // TestClientEndpoint_UpdateDrain asserts the ability to initiate drain
@@ -3044,7 +3048,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
 	}
 }
 
-func TestClientEndpoint_UpdateAlloc(t *testing.T) {
+func TestNode_UpdateAlloc(t *testing.T) {
 	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
@@ -3057,7 +3061,6 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
 	defer cleanupS1()
 	codec := rpcClient(t, s1)
 	testutil.WaitForLeader(t, s1.RPC)
-	require := require.New(t)
 
 	// Create the register request
 	node := mock.Node()
@@ -3068,34 +3071,28 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {

 	// Fetch the response
 	var resp structs.GenericResponse
-	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
 
-	state := s1.fsm.State()
+	store := s1.fsm.State()
 	// Inject mock job
 	job := mock.Job()
 	job.ID = "mytestjob"
-	err := state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)
-	require.Nil(err)
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job))
 
 	// Inject fake allocations
 	alloc := mock.Alloc()
 	alloc.JobID = job.ID
 	alloc.NodeID = node.ID
-	err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
-	require.Nil(err)
+	must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)))
 	alloc.TaskGroup = job.TaskGroups[0].Name
 
 	alloc2 := mock.Alloc()
 	alloc2.JobID = job.ID
 	alloc2.NodeID = node.ID
-	err = state.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID))
-	require.Nil(err)
+	must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID)))
 	alloc2.TaskGroup = job.TaskGroups[0].Name
 
-	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2})
-	require.Nil(err)
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2}))
 
 	// Attempt updates of more than one alloc for the same job
 	clientAlloc1 := new(structs.Allocation)
@@ -3113,36 +3110,31 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
 	}
 	var resp2 structs.NodeAllocsResponse
 	start := time.Now()
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2)
-	require.Nil(err)
-	require.NotEqual(uint64(0), resp2.Index)
-
-	if diff := time.Since(start); diff < batchUpdateInterval {
-		t.Fatalf("too fast: %v", diff)
-	}
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2))
+	must.Greater(t, 0, resp2.Index)
+	must.GreaterEq(t, batchUpdateInterval, time.Since(start))
 
 	// Lookup the alloc
 	ws := memdb.NewWatchSet()
-	out, err := state.AllocByID(ws, alloc.ID)
-	require.Nil(err)
-	require.Equal(structs.AllocClientStatusFailed, out.ClientStatus)
-	require.True(out.ModifyTime > 0)
+	out, err := store.AllocByID(ws, alloc.ID)
+	must.NoError(t, err)
+	must.Eq(t, structs.AllocClientStatusFailed, out.ClientStatus)
+	must.Greater(t, 0, out.ModifyTime)
 
 	// Assert that exactly one eval with TriggeredBy EvalTriggerRetryFailedAlloc exists
-	evaluations, err := state.EvalsByJob(ws, job.Namespace, job.ID)
-	require.Nil(err)
-	require.True(len(evaluations) != 0)
+	evaluations, err := store.EvalsByJob(ws, job.Namespace, job.ID)
+	must.NoError(t, err)
+	must.Greater(t, 0, len(evaluations))
 	foundCount := 0
 	for _, resultEval := range evaluations {
 		if resultEval.TriggeredBy == structs.EvalTriggerRetryFailedAlloc && resultEval.WaitUntil.IsZero() {
 			foundCount++
 		}
 	}
-	require.Equal(1, foundCount, "Should create exactly one eval for failed allocs")
-
+	must.Eq(t, 1, foundCount, must.Sprint("Should create exactly one eval for failed allocs"))
 }
 
-func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
+func TestNode_UpdateAlloc_NodeNotReady(t *testing.T) {
 	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, nil)
@@ -3157,30 +3149,26 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var resp structs.GenericResponse
-	err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
-	require.NoError(t, err)
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
 
 	// Inject mock job and allocation.
-	state := s1.fsm.State()
+	store := s1.fsm.State()
 
 	job := mock.Job()
-	err = state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)
-	require.NoError(t, err)
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job))
 
 	alloc := mock.Alloc()
 	alloc.JobID = job.ID
 	alloc.NodeID = node.ID
 	alloc.TaskGroup = job.TaskGroups[0].Name
 	alloc.ClientStatus = structs.AllocClientStatusRunning
 
-	err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
-	require.NoError(t, err)
-	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
-	require.NoError(t, err)
+	must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}))
 
 	// Mark node as down.
-	err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil)
-	require.NoError(t, err)
+	must.NoError(t, store.UpdateNodeStatus(
+		structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil))
 
 	// Try to update alloc.
 	updatedAlloc := new(structs.Allocation)
@@ -3192,31 +3180,127 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var allocUpdateResp structs.NodeAllocsResponse
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "not allowed to update allocs")
+	err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
+	must.ErrorContains(t, err, "not allowed to update allocs")
 
 	// Send request without an explicit node ID.
 	updatedAlloc.NodeID = ""
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "missing node ID")
+	must.ErrorContains(t, err, "missing node ID")
 
 	// Send request with invalid node ID.
 	updatedAlloc.NodeID = "not-valid"
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "node lookup failed")
+	must.ErrorContains(t, err, "node lookup failed")
 
 	// Send request with non-existing node ID.
 	updatedAlloc.NodeID = uuid.Generate()
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "not found")
+	must.ErrorContains(t, err, "not found")
 
 	// Mark node as ready and try again.
-	err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil)
-	require.NoError(t, err)
+	must.NoError(t, store.UpdateNodeStatus(
+		structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil))
 
 	updatedAlloc.NodeID = node.ID
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.NoError(t, err)
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp))
 }
+
+func TestNode_UpdateAllocServiceRegistrations(t *testing.T) {
+	ci.Parallel(t)
+
+	srv, cleanup := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+
+	defer cleanup()
+	codec := rpcClient(t, srv)
+	testutil.WaitForLeader(t, srv.RPC)
+
+	store := srv.fsm.State()
+	index := uint64(100)
+
+	// Inject mock node, job, allocations for that job, and service
+	// registrations for those allocs
+	node := mock.Node()
+	index++
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
+
+	job := mock.Job()
+	job.ID = "mytestjob"
+	index++
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
+
+	alloc0 := mock.Alloc()
+	alloc0.JobID = job.ID
+	alloc0.NodeID = node.ID
+	index++
+	must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc0.JobID)))
+	alloc0.TaskGroup = job.TaskGroups[0].Name
+
+	alloc1 := mock.Alloc()
+	alloc1.JobID = job.ID
+	alloc1.NodeID = node.ID
+	index++
+	must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID)))
+	alloc1.TaskGroup = job.TaskGroups[0].Name
+
+	alloc2 := mock.Alloc() // will have no service registration
+	alloc2.JobID = job.ID
+	alloc2.NodeID = node.ID
+	index++
+	must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID)))
+	alloc2.TaskGroup = job.TaskGroups[0].Name
+
+	index++
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc0, alloc1, alloc2}))
+
+	serviceFor := func(allocID string, port int) *structs.ServiceRegistration {
+		return &structs.ServiceRegistration{
+			ID:          fmt.Sprintf("_nomad-task-%s-group-api-countdash-api-http", allocID),
+			ServiceName: "countdash-api",
+			Namespace:   job.Namespace,
+			NodeID:      node.ID,
+			Datacenter:  node.Datacenter,
+			JobID:       job.ID,
+			AllocID:     allocID,
+			Tags:        []string{"bar"},
+			Address:     "192.168.200.200",
+			Port:        port,
+		}
+	}
+
+	service0 := serviceFor(alloc0.ID, 29001)
+	service1 := serviceFor(alloc1.ID, 29001)
+	index++
+	must.NoError(t, store.UpsertServiceRegistrations(
+		structs.MsgTypeTestSetup, index, []*structs.ServiceRegistration{service0, service1}))
+
+	// no-op
+	update := &structs.AllocUpdateRequest{
+		Alloc:        []*structs.Allocation{alloc0, alloc1, alloc2},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.NodeAllocsResponse
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp))
+
+	services, err := store.GetServiceRegistrationsByNodeID(nil, node.ID)
+	must.NoError(t, err)
+	must.Len(t, 2, services, must.Sprint("no-op update should not have deleted services"))
+
+	// fail one allocation
+	alloc0 = alloc0.Copy()
+	alloc0.ClientStatus = structs.AllocClientStatusFailed
+	update = &structs.AllocUpdateRequest{
+		Alloc:        []*structs.Allocation{alloc0, alloc1, alloc2},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp))
+
+	services, err = store.GetServiceRegistrationsByNodeID(nil, node.ID)
+	must.NoError(t, err)
+	must.Eq(t, []*structs.ServiceRegistration{service1}, services,
+		must.Sprint("failing an allocation should result in its service being deleted"))
+}
 
 func TestClientEndpoint_BatchUpdate(t *testing.T) {