Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve blocking queries on services that do not exist #4810

Merged
merged 1 commit into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 48 additions & 22 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (

const (
servicesTableName = "services"

// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
serviceLastExtinctionIndexName = "service_last_extinction"
)

// nodesTableSchema returns a new table schema used for storing node
Expand Down Expand Up @@ -841,19 +845,30 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string)
}

// maxIndexForService return the maximum Raft Index for a service
// If the index is not set for the service, it will return:
// - maxIndex(nodes, services) if checks is false
// - maxIndex(nodes, services, checks) if checks is true
func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
transaction, err := tx.First("index", "id", serviceIndexName(serviceName))
if err == nil {
if idx, ok := transaction.(*IndexEntry); ok {
return idx.Value
}
// If the index is not set for the service, it will return the missing
// service index.
// The service_last_extinction is set to the last raft index when a service
// was unregistered (or 0 if no services were ever unregistered). This
// allows blocking queries to
// * return when the last instance of a service is removed
// * block until an instance for this service is available, or another
// service is unregistered.
func maxIndexForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) uint64 {
if !serviceExists {
res, err := tx.First("index", "id", serviceLastExtinctionIndexName)
if missingIdx, ok := res.(*IndexEntry); ok && err == nil {
return missingIdx.Value
}
}

res, err := tx.First("index", "id", serviceIndexName(serviceName))
if idx, ok := res.(*IndexEntry); ok && err == nil {
return idx.Value
}
if checks {
return maxIndexTxn(tx, "nodes", "services", "checks")
}

return maxIndexTxn(tx, "nodes", "services")
}

Expand All @@ -873,9 +888,6 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
tx := s.db.Txn(false)
defer tx.Abort()

// Get the table index.
idx := maxIndexForService(tx, serviceName, false)

// Function for lookup
var f func() (memdb.ResultIterator, error)
if !connect {
Expand Down Expand Up @@ -905,6 +917,10 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}

// Get the table index.
idx := maxIndexForService(tx, serviceName, len(results) > 0, false)

return idx, results, nil
}

Expand All @@ -914,9 +930,6 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
tx := s.db.Txn(false)
defer tx.Abort()

// Get the table index.
idx := maxIndexForService(tx, service, false)

// List all the services.
services, err := tx.Get("services", "service", service)
if err != nil {
Expand All @@ -925,9 +938,11 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
ws.Add(services.WatchCh())

// Gather all the services and apply the tag filter.
serviceExists := false
var results structs.ServiceNodes
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
serviceExists = true
if !serviceTagsFilter(svc, tags) {
results = append(results, svc)
}
Expand All @@ -938,6 +953,9 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tags []string
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
// Get the table index.
idx := maxIndexForService(tx, service, serviceExists, false)

return idx, results, nil
}

Expand Down Expand Up @@ -1214,6 +1232,11 @@ func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID
return fmt.Errorf("[FAILED] deleting serviceIndex %s: %s", svc.ServiceName, err)
}
}

if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
return fmt.Errorf("failed updating missing service index: %s", err)
}

}
} else {
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
Expand Down Expand Up @@ -1438,7 +1461,7 @@ func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
defer tx.Abort()

// Get the table index.
idx := maxIndexForService(tx, serviceName, true)
idx := maxIndexForService(tx, serviceName, true, true)
// Return the checks.
iter, err := tx.Get("checks", "service", serviceName)
if err != nil {
Expand Down Expand Up @@ -1627,9 +1650,6 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
tx := s.db.Txn(false)
defer tx.Abort()

// Get the table index.
idx := maxIndexForService(tx, serviceName, true)

// Function for lookup
var f func() (memdb.ResultIterator, error)
if !connect {
Expand All @@ -1654,6 +1674,10 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
for service := iter.Next(); service != nil; service = iter.Next() {
results = append(results, service.(*structs.ServiceNode))
}

// Get the table index.
idx := maxIndexForService(tx, serviceName, len(results) > 0, true)

return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
}

Expand All @@ -1663,9 +1687,6 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
tx := s.db.Txn(false)
defer tx.Abort()

// Get the table index.
idx := maxIndexForService(tx, serviceName, true)

// Query the state store for the service.
iter, err := tx.Get("services", "service", serviceName)
if err != nil {
Expand All @@ -1674,13 +1695,18 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
ws.Add(iter.WatchCh())

// Return the results, filtering by tag.
serviceExists := false
var results structs.ServiceNodes
for service := iter.Next(); service != nil; service = iter.Next() {
svc := service.(*structs.ServiceNode)
serviceExists = true
if !serviceTagsFilter(svc, tags) {
results = append(results, svc)
}
}

// Get the table index.
idx := maxIndexForService(tx, serviceName, serviceExists, true)
return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err)
}

Expand Down
60 changes: 56 additions & 4 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2848,16 +2848,68 @@ func TestIndexIndependence(t *testing.T) {
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)

testRegisterService(t, s, 18, "node1", "service_new")
// Since service does not exists anymore, its index should be last insert
// The behaviour is the same as all non-existing services, meaning
// we properly did collect garbage
ensureServiceVersion(t, s, ws, "service_shared", 18, 0)

// Since service does not exists anymore, its index should be that of
// the last deleted service
ensureServiceVersion(t, s, ws, "service_shared", 17, 0)

// No index should exist anymore, it must have been garbage collected
ensureIndexForService(t, s, ws, "service_shared", 0)
if !watchFired(ws) {
t.Fatalf("bad")
}
}

func TestMissingServiceIndex(t *testing.T) {
s := testStateStore(t)

// Querying with no matches gives an empty response
ws := memdb.NewWatchSet()
idx, res, err := s.CheckServiceNodes(ws, "service1")
require.Nil(t, err)
require.Nil(t, res)

// index should be 0 for a non existing service at startup
require.Equal(t, uint64(0), idx)

testRegisterNode(t, s, 0, "node1")

// node operations should not affect missing service index
ensureServiceVersion(t, s, ws, "service1", 0, 0)

testRegisterService(t, s, 10, "node1", "service1")
ensureServiceVersion(t, s, ws, "service1", 10, 1)

s.DeleteService(11, "node1", "service1")
// service1 is now missing, its index is now that of the last index a service was
// deleted at
ensureServiceVersion(t, s, ws, "service1", 11, 0)

testRegisterService(t, s, 12, "node1", "service2")
ensureServiceVersion(t, s, ws, "service2", 12, 1)

// missing service index does not change even though another service have been
// registered
ensureServiceVersion(t, s, ws, "service1", 11, 0)
ensureServiceVersion(t, s, ws, "i_do_not_exist", 11, 0)

// registering a service on another node does not affect missing service
// index
testRegisterNode(t, s, 13, "node2")
testRegisterService(t, s, 14, "node2", "service3")
ensureServiceVersion(t, s, ws, "service3", 14, 1)
ensureServiceVersion(t, s, ws, "service1", 11, 0)

// unregistering a service bumps missing service index
s.DeleteService(15, "node2", "service3")
ensureServiceVersion(t, s, ws, "service3", 15, 0)
ensureServiceVersion(t, s, ws, "service2", 12, 1)
ensureServiceVersion(t, s, ws, "service1", 15, 0)
ensureServiceVersion(t, s, ws, "i_do_not_exist", 15, 0)

// registering again a missing service correctly updates its index
testRegisterService(t, s, 16, "node1", "service1")
ensureServiceVersion(t, s, ws, "service1", 16, 1)
}

func TestStateStore_CheckServiceNodes(t *testing.T) {
Expand Down