diff --git a/.changelog/20590.txt b/.changelog/20590.txt
new file mode 100644
index 000000000000..4406f131d281
--- /dev/null
+++ b/.changelog/20590.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where Nomad services might not be deregistered when nodes are marked down or allocations are terminal
+```
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index f009f41b9e8b..2b0bacc4f26f 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -37,8 +37,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
 	eval := mock.Eval()
 	eval.Status = structs.EvalStatusFailed
 	store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
-	err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
-	require.Nil(t, err)
+	must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval}))
 
 	// Insert mock job with rescheduling disabled
 	job := mock.Job()
@@ -47,8 +46,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
 		Attempts: 0,
 		Interval: 0 * time.Second,
 	}
-	err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job)
-	require.Nil(t, err)
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job))
 
 	// Insert "dead" alloc
 	alloc := mock.Alloc()
@@ -64,10 +62,22 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
 	alloc2.ClientStatus = structs.AllocClientStatusLost
 	alloc2.JobID = eval.JobID
 	alloc2.TaskGroup = job.TaskGroups[0].Name
-	err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2})
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2}))
+
+	// Insert service for "dead" alloc
+	service := &structs.ServiceRegistration{
+		ID:          fmt.Sprintf("_nomad-task-%s-group-api-countdash-api-http", alloc.ID),
+		ServiceName: "countdash-api",
+		Namespace:   eval.Namespace,
+		NodeID:      alloc.NodeID,
+		Datacenter:  "dc1",
+		JobID:       eval.JobID,
+		AllocID:     alloc.ID,
+		Address:     "192.168.200.200",
+		Port:        29001,
+	}
+	must.NoError(t, store.UpsertServiceRegistrations(
+		structs.MsgTypeTestSetup, 1002, []*structs.ServiceRegistration{service}))
 
 	// Update the time tables to make this work
 	tt := s1.fsm.TimeTable()
@@ -75,43 +85,30 @@
 
 	// Create a core scheduler
 	snap, err := store.Snapshot()
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	must.NoError(t, err)
 	core := NewCoreScheduler(s1, snap)
 
 	// Attempt the GC
 	gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
-	err = core.Process(gc)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	must.NoError(t, core.Process(gc))
 
 	// Should be gone
 	ws := memdb.NewWatchSet()
 	out, err := store.EvalByID(ws, eval.ID)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if out != nil {
-		t.Fatalf("bad: %v", out)
-	}
+	must.NoError(t, err)
+	must.Nil(t, out, must.Sprint("expected eval to be GC'd"))
 
 	outA, err := store.AllocByID(ws, alloc.ID)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if outA != nil {
-		t.Fatalf("bad: %v", outA)
-	}
+	must.NoError(t, err)
+	must.Nil(t, outA, must.Sprint("expected alloc to be GC'd"))
 
 	outA2, err := store.AllocByID(ws, alloc2.ID)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if outA2 != nil {
-		t.Fatalf("bad: %v", outA2)
-	}
+	must.NoError(t, err)
+	must.Nil(t, outA2, must.Sprint("expected alloc to be GC'd"))
+
+	services, err := store.GetServiceRegistrationsByNodeID(nil, alloc.NodeID)
+	must.NoError(t, err)
+	must.Len(t, 0, services)
 }
 
 // Tests GC behavior on allocations being rescheduled
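The test conversions in this patch (here and in node_endpoint_test.go below) swap testify's `require` assertions for the `must` package from shoenig/test. The main trap when reviewing them is that the two libraries disagree on argument order for length checks. The sketch below is not part of the patch; it is a side-by-side illustration of the mappings used above, assuming the standard import paths for both libraries.

```go
package example

// Illustration only: equivalents for the require -> must swaps made in this
// patch. Assumes github.com/stretchr/testify/require and
// github.com/shoenig/test/must.
import (
	"testing"

	"github.com/shoenig/test/must"
	"github.com/stretchr/testify/require"
)

func TestAssertionEquivalents(t *testing.T) {
	var err error
	regs := []string{"svc-a", "svc-b"}

	// Error checks translate one-to-one.
	require.NoError(t, err)
	must.NoError(t, err)

	// Length checks swap argument order: require takes (collection, length),
	// must takes (length, collection).
	require.Len(t, regs, 2)
	must.Len(t, 2, regs)

	// Equality keeps (expected, actual) ordering.
	require.Equal(t, "svc-a", regs[0])
	must.Eq(t, "svc-a", regs[0])

	// Custom failure messages: trailing message args for require,
	// a must.Sprint(...) setting for must.
	require.Nil(t, err, "expected nil error")
	must.Nil(t, err, must.Sprint("expected nil error"))
}
```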
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index fb670962b5d9..b858a7f2032c 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -686,31 +686,6 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
 			_ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true)
 		}
 
-		// Identify the service registrations current placed on the downed
-		// node.
-		serviceRegistrations, err := n.srv.State().GetServiceRegistrationsByNodeID(ws, args.NodeID)
-		if err != nil {
-			n.logger.Error("looking up service registrations for node failed",
-				"node_id", args.NodeID, "error", err)
-			return err
-		}
-
-		// If the node has service registrations assigned to it, delete these
-		// via Raft.
-		if l := len(serviceRegistrations); l > 0 {
-			n.logger.Debug("deleting service registrations on node due to down state",
-				"num_service_registrations", l, "node_id", args.NodeID)
-
-			deleteRegReq := structs.ServiceRegistrationDeleteByNodeIDRequest{NodeID: args.NodeID}
-
-			_, index, err = n.srv.raftApply(structs.ServiceRegistrationDeleteByNodeIDRequestType, &deleteRegReq)
-			if err != nil {
-				n.logger.Error("failed to delete service registrations for node",
-					"node_id", args.NodeID, "error", err)
-				return err
-			}
-		}
-
 	default:
 		ttl, err := n.srv.resetHeartbeatTimer(args.NodeID)
 		if err != nil {
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 7a1d54d80c98..730b3abc3fa2 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -1354,7 +1354,7 @@ func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {
 
 	// Create a node and upsert this into state.
 	node := mock.Node()
-	require.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))
+	must.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))
 
 	// Generate service registrations, ensuring the nodeID is set to the
 	// generated node from above.
@@ -1365,16 +1365,16 @@
 	}
 
 	// Upsert the service registrations into state.
-	require.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
+	must.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
 
 	// Check the service registrations are in state as we expect, so we can
 	// have confidence in the rest of the test.
 	ws := memdb.NewWatchSet()
 	nodeRegs, err := testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-	require.NoError(t, err)
-	require.Len(t, nodeRegs, 2)
-	require.Equal(t, nodeRegs[0].NodeID, node.ID)
-	require.Equal(t, nodeRegs[1].NodeID, node.ID)
+	must.NoError(t, err)
+	must.Len(t, 2, nodeRegs)
+	must.Eq(t, nodeRegs[0].NodeID, node.ID)
+	must.Eq(t, nodeRegs[1].NodeID, node.ID)
 
 	// Generate and trigger a node down status update. This mimics what happens
 	// when the node fails its heart-beating.
@@ -1387,13 +1387,17 @@
 
 	var reply structs.NodeUpdateResponse
 	nodeEndpoint := NewNodeEndpoint(testServer, nil)
-	require.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
+	must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
 
 	// Query our state, to ensure the node service registrations have been
 	// removed.
 	nodeRegs, err = testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-	require.NoError(t, err)
-	require.Len(t, nodeRegs, 0)
+	must.NoError(t, err)
+	must.Len(t, 0, nodeRegs)
+
+	// Re-send the status update, to ensure we get no error if service
+	// registrations have already been removed
+	must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
 }
 
 // TestClientEndpoint_UpdateDrain asserts the ability to initiate drain
@@ -2911,7 +2915,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
 	}
 }
 
-func TestClientEndpoint_UpdateAlloc(t *testing.T) {
+func TestNode_UpdateAlloc(t *testing.T) {
 	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
@@ -2924,7 +2928,6 @@
 	defer cleanupS1()
 	codec := rpcClient(t, s1)
 	testutil.WaitForLeader(t, s1.RPC)
-	require := require.New(t)
 
 	// Create the register request
 	node := mock.Node()
@@ -2935,34 +2938,28 @@
 
 	// Fetch the response
 	var resp structs.GenericResponse
-	if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
-		t.Fatalf("err: %v", err)
-	}
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
 
-	state := s1.fsm.State()
+	store := s1.fsm.State()
 	// Inject mock job
 	job := mock.Job()
 	job.ID = "mytestjob"
-	err := state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)
-	require.Nil(err)
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job))
 
 	// Inject fake allocations
 	alloc := mock.Alloc()
 	alloc.JobID = job.ID
 	alloc.NodeID = node.ID
-	err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
-	require.Nil(err)
+	must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)))
 	alloc.TaskGroup = job.TaskGroups[0].Name
 
 	alloc2 := mock.Alloc()
 	alloc2.JobID = job.ID
 	alloc2.NodeID = node.ID
-	err = state.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID))
-	require.Nil(err)
+	must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID)))
 	alloc2.TaskGroup = job.TaskGroups[0].Name
 
-	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2})
-	require.Nil(err)
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2}))
 
 	// Attempt updates of more than one alloc for the same job
 	clientAlloc1 := new(structs.Allocation)
@@ -2980,36 +2977,31 @@
 	}
 	var resp2 structs.NodeAllocsResponse
 	start := time.Now()
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2)
-	require.Nil(err)
-	require.NotEqual(uint64(0), resp2.Index)
-
-	if diff := time.Since(start); diff < batchUpdateInterval {
-		t.Fatalf("too fast: %v", diff)
-	}
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2))
+	must.Greater(t, 0, resp2.Index)
+	must.GreaterEq(t, batchUpdateInterval, time.Since(start))
 
 	// Lookup the alloc
 	ws := memdb.NewWatchSet()
-	out, err := state.AllocByID(ws, alloc.ID)
-	require.Nil(err)
-	require.Equal(structs.AllocClientStatusFailed, out.ClientStatus)
-	require.True(out.ModifyTime > 0)
+	out, err := store.AllocByID(ws, alloc.ID)
+	must.NoError(t, err)
+	must.Eq(t, structs.AllocClientStatusFailed, out.ClientStatus)
+	must.Greater(t, 0, out.ModifyTime)
 
 	// Assert that exactly one eval with TriggeredBy EvalTriggerRetryFailedAlloc exists
-	evaluations, err := state.EvalsByJob(ws, job.Namespace, job.ID)
-	require.Nil(err)
-	require.True(len(evaluations) != 0)
+	evaluations, err := store.EvalsByJob(ws, job.Namespace, job.ID)
+	must.NoError(t, err)
+	must.Greater(t, 0, len(evaluations))
 	foundCount := 0
 	for _, resultEval := range evaluations {
 		if resultEval.TriggeredBy == structs.EvalTriggerRetryFailedAlloc && resultEval.WaitUntil.IsZero() {
 			foundCount++
 		}
 	}
-	require.Equal(1, foundCount, "Should create exactly one eval for failed allocs")
-
+	must.Eq(t, 1, foundCount, must.Sprint("Should create exactly one eval for failed allocs"))
 }
 
-func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
+func TestNode_UpdateAlloc_NodeNotReady(t *testing.T) {
 	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, nil)
@@ -3024,15 +3016,13 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var resp structs.GenericResponse
-	err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
-	require.NoError(t, err)
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
 
 	// Inject mock job and allocation.
-	state := s1.fsm.State()
+	store := s1.fsm.State()
 
 	job := mock.Job()
-	err = state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)
-	require.NoError(t, err)
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job))
 
 	alloc := mock.Alloc()
 	alloc.JobID = job.ID
@@ -3040,14 +3030,12 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
 	alloc.TaskGroup = job.TaskGroups[0].Name
 	alloc.ClientStatus = structs.AllocClientStatusRunning
 
-	err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
-	require.NoError(t, err)
-	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
-	require.NoError(t, err)
+	must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)))
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}))
 
 	// Mark node as down.
-	err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil)
-	require.NoError(t, err)
+	must.NoError(t, store.UpdateNodeStatus(
+		structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil))
 
 	// Try to update alloc.
 	updatedAlloc := new(structs.Allocation)
@@ -3059,31 +3047,127 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
 	var allocUpdateResp structs.NodeAllocsResponse
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "not allowed to update allocs")
+	err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
+	must.ErrorContains(t, err, "not allowed to update allocs")
 
 	// Send request without an explicit node ID.
 	updatedAlloc.NodeID = ""
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "missing node ID")
+	must.ErrorContains(t, err, "missing node ID")
 
 	// Send request with invalid node ID.
 	updatedAlloc.NodeID = "not-valid"
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "node lookup failed")
+	must.ErrorContains(t, err, "node lookup failed")
 
 	// Send request with non-existing node ID.
 	updatedAlloc.NodeID = uuid.Generate()
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.ErrorContains(t, err, "not found")
+	must.ErrorContains(t, err, "not found")
 
 	// Mark node as ready and try again.
-	err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil)
-	require.NoError(t, err)
+	must.NoError(t, store.UpdateNodeStatus(
+		structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil))
 
 	updatedAlloc.NodeID = node.ID
-	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
-	require.NoError(t, err)
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp))
+}
+
+func TestNode_UpdateAllocServiceRegistrations(t *testing.T) {
+	ci.Parallel(t)
+
+	srv, cleanup := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+
+	defer cleanup()
+	codec := rpcClient(t, srv)
+	testutil.WaitForLeader(t, srv.RPC)
+
+	store := srv.fsm.State()
+	index := uint64(100)
+
+	// Inject mock node, job, allocations for that job, and service
+	// registrations for those allocs
+	node := mock.Node()
+	index++
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
+
+	job := mock.Job()
+	job.ID = "mytestjob"
+	index++
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job))
+
+	alloc0 := mock.Alloc()
+	alloc0.JobID = job.ID
+	alloc0.NodeID = node.ID
+	index++
+	must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc0.JobID)))
+	alloc0.TaskGroup = job.TaskGroups[0].Name
+
+	alloc1 := mock.Alloc()
+	alloc1.JobID = job.ID
+	alloc1.NodeID = node.ID
+	index++
+	must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID)))
+	alloc1.TaskGroup = job.TaskGroups[0].Name
+
+	alloc2 := mock.Alloc() // will have no service registration
+	alloc2.JobID = job.ID
+	alloc2.NodeID = node.ID
+	index++
+	must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID)))
+	alloc2.TaskGroup = job.TaskGroups[0].Name
+
+	index++
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc0, alloc1, alloc2}))
+
+	serviceFor := func(allocID string, port int) *structs.ServiceRegistration {
+		return &structs.ServiceRegistration{
+			ID:          fmt.Sprintf("_nomad-task-%s-group-api-countdash-api-http", allocID),
+			ServiceName: "countdash-api",
+			Namespace:   job.Namespace,
+			NodeID:      node.ID,
+			Datacenter:  node.Datacenter,
+			JobID:       job.ID,
+			AllocID:     allocID,
+			Tags:        []string{"bar"},
+			Address:     "192.168.200.200",
+			Port:        port,
+		}
+	}
+
+	service0 := serviceFor(alloc0.ID, 29001)
+	service1 := serviceFor(alloc1.ID, 29002)
+	index++
+	must.NoError(t, store.UpsertServiceRegistrations(
+		structs.MsgTypeTestSetup, index, []*structs.ServiceRegistration{service0, service1}))
+
+	// no-op
+	update := &structs.AllocUpdateRequest{
+		Alloc:        []*structs.Allocation{alloc0, alloc1, alloc2},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.NodeAllocsResponse
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp))
+
+	services, err := store.GetServiceRegistrationsByNodeID(nil, node.ID)
+	must.NoError(t, err)
+	must.Len(t, 2, services, must.Sprint("no-op update should not have deleted services"))
+
+	// fail one allocation
+	alloc0 = alloc0.Copy()
+	alloc0.ClientStatus = structs.AllocClientStatusFailed
+	update = &structs.AllocUpdateRequest{
+		Alloc:        []*structs.Allocation{alloc0, alloc1, alloc2},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp))
+
+	services, err = store.GetServiceRegistrationsByNodeID(nil, node.ID)
+	must.NoError(t, err)
+	must.Eq(t, []*structs.ServiceRegistration{service1}, services,
+		must.Sprint("failing an allocation should result in its service being deleted"))
 }
 
 func TestClientEndpoint_BatchUpdate(t *testing.T) {
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index ac1fbeeecf26..abeb5147928c 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1131,6 +1131,14 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
 	if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil {
 		return fmt.Errorf("index update failed: %v", err)
 	}
+
+	// Deregister any services on the node in the same transaction
+	if copyNode.Status == structs.NodeStatusDown {
+		if err := s.deleteServiceRegistrationByNodeIDTxn(txn, txn.Index, copyNode.ID); err != nil {
+			return fmt.Errorf("service registration delete for node failed: %v", err)
+		}
+	}
+
 	return nil
 }
 
@@ -3615,6 +3621,10 @@ func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitia
 		// Mark that we have made a successful modification to the allocs
 		// table.
 		allocsTableUpdated = true
+
+		if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, alloc); err != nil {
+			return fmt.Errorf("service registration delete for alloc failed: %v", err)
+		}
 	}
 
 	// Update the indexes
@@ -3961,6 +3971,13 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
 	if err := s.setJobStatuses(index, txn, jobs, false); err != nil {
 		return fmt.Errorf("setting job status failed: %v", err)
 	}
+
+	if copyAlloc.ClientTerminalStatus() {
+		if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, copyAlloc.ID); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
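Both deletion helpers added in state_store_service_registration.go below, as well as the node-down branch above, bottom out in go-memdb's `Txn.DeleteAll`, which removes every object matching an index and reports how many it deleted; that count is what gates the index-table bump. The following standalone sketch of that primitive is illustrative only and is not Nomad code: the `registrations` table, `node_id` index, and `registration` struct are made up for the example.

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// registration is a stand-in for a service registration row; the real Nomad
// schema lives in nomad/state/schema.go.
type registration struct {
	ID     string
	NodeID string
}

func main() {
	// Minimal schema: a primary "id" index plus a non-unique "node_id" index,
	// mirroring the shape of the lookups used by the deletion helpers.
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"registrations": {
				Name: "registrations",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
					"node_id": {
						Name:    "node_id",
						Indexer: &memdb.StringFieldIndex{Field: "NodeID"},
					},
				},
			},
		},
	}

	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	defer txn.Abort()

	for _, r := range []*registration{
		{ID: "svc-1", NodeID: "node-a"},
		{ID: "svc-2", NodeID: "node-a"},
		{ID: "svc-3", NodeID: "node-b"},
	} {
		if err := txn.Insert("registrations", r); err != nil {
			panic(err)
		}
	}

	// DeleteAll removes everything matching the secondary index and returns
	// the number of deleted objects; zero deletions is not an error, which is
	// why the helpers can safely no-op when a node or alloc has no services.
	num, err := txn.DeleteAll("registrations", "node_id", "node-a")
	if err != nil {
		panic(err)
	}
	fmt.Println("deleted:", num) // deleted: 2

	txn.Commit()
}
```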
diff --git a/nomad/state/state_store_service_registration.go b/nomad/state/state_store_service_registration.go
index f4f7beae6516..72390e420387 100644
--- a/nomad/state/state_store_service_registration.go
+++ b/nomad/state/state_store_service_registration.go
@@ -129,11 +129,25 @@ func (s *StateStore) DeleteServiceRegistrationByNodeID(
 	txn := s.db.WriteTxnMsgT(msgType, index)
 	defer txn.Abort()
 
-	num, err := txn.DeleteAll(TableServiceRegistrations, indexNodeID, nodeID)
+	err := s.deleteServiceRegistrationByNodeIDTxn(txn, index, nodeID)
 	if err != nil {
 		return fmt.Errorf("deleting service registrations failed: %v", err)
 	}
 
+	return txn.Commit()
+}
+
+// deleteServiceRegistrationByNodeIDTxn deletes all service registrations that
+// belong on a single node, in an existing transaction. If there are no
+// registrations tied to the nodeID, the call will noop without an error.
+func (s *StateStore) deleteServiceRegistrationByNodeIDTxn(
+	txn *txn, index uint64, nodeID string) error {
+
+	num, err := txn.DeleteAll(TableServiceRegistrations, indexNodeID, nodeID)
+	if err != nil {
+		return err
+	}
+
 	// If we did not delete any entries, do not update the index table.
 	// Otherwise, update the table with the latest index.
 	switch num {
@@ -144,8 +158,31 @@ func (s *StateStore) DeleteServiceRegistrationByNodeID(
 			return fmt.Errorf("index update failed: %v", err)
 		}
 	}
+	return nil
+}
 
-	return txn.Commit()
+// deleteServiceRegistrationByAllocIDTxn deletes all service registrations that
+// belong to an allocation, in an existing transaction. If there are no
+// registrations tied to the alloc ID, the call will noop without an error.
+func (s *StateStore) deleteServiceRegistrationByAllocIDTxn(
+	txn *txn, index uint64, allocID string) error {
+
+	num, err := txn.DeleteAll(TableServiceRegistrations, indexAllocID, allocID)
+	if err != nil {
+		return err
+	}
+
+	// If we did not delete any entries, do not update the index table.
+	// Otherwise, update the table with the latest index.
+	switch num {
+	case 0:
+		return nil
+	default:
+		if err := txn.Insert(tableIndex, &IndexEntry{TableServiceRegistrations, index}); err != nil {
+			return fmt.Errorf("index update failed: %v", err)
+		}
+	}
+	return nil
 }
 
 // GetServiceRegistrations returns an iterator that contains all service
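A note for reviewers tracing when the new `nestedUpdateAllocFromClient` branch fires: it is keyed on `Allocation.ClientTerminalStatus()`, so deregistration happens in the same state-store transaction that records a client update marking the allocation complete, failed, or lost. The snippet below only illustrates that check with stand-in constants; the real method on `structs.Allocation` is the source of truth.

```go
package example

// Stand-ins for a subset of the structs.AllocClientStatus* constants; listed
// here only to illustrate which client updates trigger service deregistration.
const (
	allocClientStatusPending  = "pending"
	allocClientStatusRunning  = "running"
	allocClientStatusComplete = "complete"
	allocClientStatusFailed   = "failed"
	allocClientStatusLost     = "lost"
)

// isClientTerminal sketches the check behind Allocation.ClientTerminalStatus:
// once the client reports complete, failed, or lost, the status can no longer
// change, so the allocation's service registrations are deleted in the same
// transaction that stores the update.
func isClientTerminal(clientStatus string) bool {
	switch clientStatus {
	case allocClientStatusComplete, allocClientStatusFailed, allocClientStatusLost:
		return true
	default:
		return false
	}
}
```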