From e9116e3d856ba91a6bef9dadb9714f100baa3e0e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 26 Oct 2017 16:59:37 -0700 Subject: [PATCH 1/2] GetClientAllocs handles garbage collection events --- nomad/node_endpoint.go | 30 ++++++++++++- nomad/node_endpoint_test.go | 90 +++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index a8778216ffd..dc7e2d6a097 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -679,6 +679,12 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, return fmt.Errorf("missing node ID") } + // numOldAllocs is used to detect if there is a garbage collection event + // that effects the node. When an allocation is garbage collected, that does + // not change the modify index changes and thus the query won't unblock, + // even though the set of allocations on the node has changed. + var numOldAllocs int + // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, @@ -708,8 +714,17 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply.Allocs = make(map[string]uint64) reply.MigrateTokens = make(map[string]string) + // preferTableIndex is used to determine whether we should build the + // response index based on the full table indexes versus the modify + // indexes of the allocations on the specific node. This is prefered + // in the case that the node doesn't yet have allocations or when we + // detect a GC that effects the node. + preferTableIndex := true + // Setup the output - if len(allocs) != 0 { + if numAllocs := len(allocs); numAllocs != 0 { + preferTableIndex = false + for _, alloc := range allocs { reply.Allocs[alloc.ID] = alloc.AllocModifyIndex @@ -742,7 +757,18 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } - } else { + + // Determine if we have less allocations than before. This + // indicates there was a garbage collection + if numAllocs < numOldAllocs { + preferTableIndex = true + } + + // Store the new number of allocations + numOldAllocs = numAllocs + } + + if preferTableIndex { // Use the last index that affected the nodes table index, err := state.Index("allocs") if err != nil { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 772a40e3e04..f6ee39d98a4 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1328,6 +1328,96 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { } } +func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { + t.Parallel() + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + node.CreateIndex = resp.Index + node.ModifyIndex = resp.Index + + // Inject fake allocations async + alloc1 := mock.Alloc() + alloc1.NodeID = node.ID + alloc2 := mock.Alloc() + alloc2.NodeID = node.ID + state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) + start := time.Now() + time.AfterFunc(100*time.Millisecond, func() { + err := state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Lookup the allocs in a blocking query + req := &structs.NodeSpecificRequest{ + NodeID: node.ID, + SecretID: node.SecretID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 50, + MaxQueryTime: time.Second, + }, + } + var resp2 structs.NodeClientAllocsResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + // Should block at least 100ms + if time.Since(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + + if resp2.Index != 100 { + t.Fatalf("Bad index: %d %d", resp2.Index, 100) + } + + if len(resp2.Allocs) != 2 || resp2.Allocs[alloc1.ID] != 100 { + t.Fatalf("bad: %#v", resp2.Allocs) + } + + // Delete an allocation + time.AfterFunc(100*time.Millisecond, func() { + err := state.DeleteEval(200, nil, []string{alloc2.ID}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.QueryOptions.MinQueryIndex = 150 + var resp3 structs.NodeClientAllocsResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3); err != nil { + t.Fatalf("err: %v", err) + } + + if time.Since(start) < 100*time.Millisecond { + t.Fatalf("too fast") + } + if resp3.Index != 200 { + t.Fatalf("Bad index: %d %d", resp3.Index, 200) + } + if len(resp3.Allocs) != 1 || resp3.Allocs[alloc1.ID] != 100 { + t.Fatalf("bad: %#v", resp3.Allocs) + } +} + // A MigrateToken should not be created if an allocation shares the same node // with its previous allocation func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) { From 560acea31d2ae63124345d2c811b4672d60868cc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 27 Oct 2017 09:50:10 -0700 Subject: [PATCH 2/2] Changelog + assert --- CHANGELOG.md | 2 ++ nomad/node_endpoint_test.go | 40 +++++++++++-------------------------- 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0267b8ac15..61922276ab6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,8 @@ BUG FIXES: change [GH-3214] * api: Fix search handling of jobs with more than four hyphens and case were length could cause lookup error [GH-3203] + * client: Improve the speed at which clients detect garbage collection events + [GH_-3452] * client: Fix lock contention that could cause a node to miss a heartbeat and be marked as down [GH-3195] * driver/docker: Fix docker user specified syslogging [GH-3184] diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index f6ee39d98a4..ed04e82f99b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1330,6 +1330,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { t.Parallel() + assert := assert.New(t) s1 := testServer(t, nil) defer s1.Shutdown() codec := rpcClient(t, s1) @@ -1344,9 +1345,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { // Fetch the response var resp structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) node.CreateIndex = resp.Index node.ModifyIndex = resp.Index @@ -1359,10 +1358,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) start := time.Now() time.AfterFunc(100*time.Millisecond, func() { - err := state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2}) - if err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(state.UpsertAllocs(100, []*structs.Allocation{alloc1, alloc2})) }) // Lookup the allocs in a blocking query @@ -1376,45 +1372,33 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { }, } var resp2 structs.NodeClientAllocsResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2); err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2)) // Should block at least 100ms if time.Since(start) < 100*time.Millisecond { t.Fatalf("too fast") } - if resp2.Index != 100 { - t.Fatalf("Bad index: %d %d", resp2.Index, 100) - } - - if len(resp2.Allocs) != 2 || resp2.Allocs[alloc1.ID] != 100 { - t.Fatalf("bad: %#v", resp2.Allocs) + assert.EqualValues(100, resp2.Index) + if assert.Len(resp2.Allocs, 2) { + assert.EqualValues(100, resp2.Allocs[alloc1.ID]) } // Delete an allocation time.AfterFunc(100*time.Millisecond, func() { - err := state.DeleteEval(200, nil, []string{alloc2.ID}) - if err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID})) }) req.QueryOptions.MinQueryIndex = 150 var resp3 structs.NodeClientAllocsResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3); err != nil { - t.Fatalf("err: %v", err) - } + assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3)) if time.Since(start) < 100*time.Millisecond { t.Fatalf("too fast") } - if resp3.Index != 200 { - t.Fatalf("Bad index: %d %d", resp3.Index, 200) - } - if len(resp3.Allocs) != 1 || resp3.Allocs[alloc1.ID] != 100 { - t.Fatalf("bad: %#v", resp3.Allocs) + assert.EqualValues(200, resp3.Index) + if assert.Len(resp3.Allocs, 1) { + assert.EqualValues(100, resp3.Allocs[alloc1.ID]) } }