From 0f4a0a933e09013bc42634debe5f02517e086cb7 Mon Sep 17 00:00:00 2001 From: Thibault Gilles Date: Wed, 17 Oct 2018 11:06:26 +0200 Subject: [PATCH] Improve blocking queries on services that do not exist When making a blocking query on a missing service (was never registered, or is not registered anymore) the query returns as soon as any service is updated. On clusters with frequent updates these queries virtually do not block, and clients with no protections against this waste resources on the agent and server side. Clients that do protect against this get updates later than they should because of the backoff time they implement between requests. To remedy that we track the last index when a service was last unregistered and instead block on it when a query targets a missing service. This reduces the number of unnecessary updates and still correctly notifies clients when a change does happen. --- agent/consul/state/catalog.go | 70 ++++++++++++++++++++---------- agent/consul/state/catalog_test.go | 60 +++++++++++++++++++++++-- 2 files changed, 104 insertions(+), 26 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 162eed79cf66..02cec7fe0bab 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -13,6 +13,10 @@ import ( const ( servicesTableName = "services" + + // serviceLastExtinctionIndexName keeps track of the last raft index when the last instance + // of any service was unregistered. This is used by blocking queries on missing services. 
+ serviceLastExtinctionIndexName = "service_last_extinction" ) // nodesTableSchema returns a new table schema used for storing node @@ -841,19 +845,30 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) } // maxIndexForService return the maximum Raft Index for a service -// If the index is not set for the service, it will return: -// - maxIndex(nodes, services) if checks is false -// - maxIndex(nodes, services, checks) if checks is true -func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 { - transaction, err := tx.First("index", "id", serviceIndexName(serviceName)) - if err == nil { - if idx, ok := transaction.(*IndexEntry); ok { - return idx.Value - } +// If the index is not set for the service, it will return the missing +// service index. +// The service_last_extinction is set to the last raft index when a service +// was unregistered (or 0 if no services were ever unregistered). This +// allows blocking queries to +// * return when the last instance of a service is removed +// * block until an instance for this service is available, or another +// service is unregistered. +func maxIndexForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) uint64 { + if !serviceExists { + res, err := tx.First("index", "id", serviceLastExtinctionIndexName) + if missingIdx, ok := res.(*IndexEntry); ok && err == nil { + return missingIdx.Value + } + } + + res, err := tx.First("index", "id", serviceIndexName(serviceName)) + if idx, ok := res.(*IndexEntry); ok && err == nil { + return idx.Value } if checks { return maxIndexTxn(tx, "nodes", "services", "checks") } + return maxIndexTxn(tx, "nodes", "services") } @@ -873,9 +888,6 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. 
- idx := maxIndexForService(tx, serviceName, false) - // Function for lookup var f func() (memdb.ResultIterator, error) if !connect { @@ -905,6 +917,10 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } + + // Get the table index. + idx := maxIndexForService(tx, serviceName, len(results) > 0, false) + return idx, results, nil } @@ -914,9 +930,6 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. - idx := maxIndexForService(tx, service, false) - // List all the services. services, err := tx.Get("services", "service", service) if err != nil { @@ -925,9 +938,11 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string ws.Add(services.WatchCh()) // Gather all the services and apply the tag filter. + serviceExists := false var results structs.ServiceNodes for service := services.Next(); service != nil; service = services.Next() { svc := service.(*structs.ServiceNode) + serviceExists = true if !serviceTagsFilter(svc, tags) { results = append(results, svc) } @@ -938,6 +953,9 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string if err != nil { return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) } + // Get the table index. 
+ idx := maxIndexForService(tx, service, serviceExists, false) + return idx, results, nil } @@ -1214,6 +1232,11 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err) } } + + if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil { + return fmt.Errorf("failed updating missing service index: %s", err) + } + } } else { return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err) @@ -1438,7 +1461,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string, defer tx.Abort() // Get the table index. - idx := maxIndexForService(tx, serviceName, true) + idx := maxIndexForService(tx, serviceName, true, true) // Return the checks. iter, err := tx.Get("checks", "service", serviceName) if err != nil { @@ -1627,9 +1650,6 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. - idx := maxIndexForService(tx, serviceName, true) - // Function for lookup var f func() (memdb.ResultIterator, error) if !connect { @@ -1654,6 +1674,10 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect for service := iter.Next(); service != nil; service = iter.Next() { results = append(results, service.(*structs.ServiceNode)) } + + // Get the table index. + idx := maxIndexForService(tx, serviceName, len(results) > 0, true) + return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err) } @@ -1663,9 +1687,6 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags tx := s.db.Txn(false) defer tx.Abort() - // Get the table index. - idx := maxIndexForService(tx, serviceName, true) - // Query the state store for the service. 
iter, err := tx.Get("services", "service", serviceName) if err != nil { @@ -1674,13 +1695,18 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags ws.Add(iter.WatchCh()) // Return the results, filtering by tag. + serviceExists := false var results structs.ServiceNodes for service := iter.Next(); service != nil; service = iter.Next() { svc := service.(*structs.ServiceNode) + serviceExists = true if !serviceTagsFilter(svc, tags) { results = append(results, svc) } } + + // Get the table index. + idx := maxIndexForService(tx, serviceName, serviceExists, true) return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 3b1314f960da..2f932a053651 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -2848,16 +2848,68 @@ func TestIndexIndependence(t *testing.T) { ensureServiceVersion(t, s, ws, "service_shared", 17, 0) testRegisterService(t, s, 18, "node1", "service_new") - // Since service does not exists anymore, its index should be last insert - // The behaviour is the same as all non-existing services, meaning - // we properly did collect garbage - ensureServiceVersion(t, s, ws, "service_shared", 18, 0) + + // Since service does not exists anymore, its index should be that of + // the last deleted service + ensureServiceVersion(t, s, ws, "service_shared", 17, 0) + // No index should exist anymore, it must have been garbage collected ensureIndexForService(t, s, ws, "service_shared", 0) if !watchFired(ws) { t.Fatalf("bad") } +} + +func TestMissingServiceIndex(t *testing.T) { + s := testStateStore(t) + + // Querying with no matches gives an empty response + ws := memdb.NewWatchSet() + idx, res, err := s.CheckServiceNodes(ws, "service1") + require.Nil(t, err) + require.Nil(t, res) + + // index should be 0 for a non existing service at startup + require.Equal(t, uint64(0), idx) + + testRegisterNode(t, s, 
0, "node1") + + // node operations should not affect missing service index + ensureServiceVersion(t, s, ws, "service1", 0, 0) + + testRegisterService(t, s, 10, "node1", "service1") + ensureServiceVersion(t, s, ws, "service1", 10, 1) + s.DeleteService(11, "node1", "service1") + // service1 is now missing, its index is now that of the last index a service was + // deleted at + ensureServiceVersion(t, s, ws, "service1", 11, 0) + + testRegisterService(t, s, 12, "node1", "service2") + ensureServiceVersion(t, s, ws, "service2", 12, 1) + + // missing service index does not change even though another service have been + // registered + ensureServiceVersion(t, s, ws, "service1", 11, 0) + ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0) + + // registering a service on another node does not affect missing service + // index + testRegisterNode(t, s, 13, "node2") + testRegisterService(t, s, 14, "node2", "service3") + ensureServiceVersion(t, s, ws, "service3", 14, 1) + ensureServiceVersion(t, s, ws, "service1", 11, 0) + + // unregistering a service bumps missing service index + s.DeleteService(15, "node2", "service3") + ensureServiceVersion(t, s, ws, "service3", 15, 0) + ensureServiceVersion(t, s, ws, "service2", 12, 1) + ensureServiceVersion(t, s, ws, "service1", 15, 0) + ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0) + + // registering again a missing service correctly updates its index + testRegisterService(t, s, 16, "node1", "service1") + ensureServiceVersion(t, s, ws, "service1", 16, 1) } func TestStateStore_CheckServiceNodes(t *testing.T) {