Skip to content

Commit

Permalink
Make Connect health queries unblock correctly in all cases and use o…
Browse files Browse the repository at this point in the history
…ptimal number of watch chans. Fixes #5506.
  • Loading branch information
banks committed Mar 19, 2019
1 parent 69ac5d0 commit 8bc9759
Show file tree
Hide file tree
Showing 3 changed files with 436 additions and 64 deletions.
91 changes: 73 additions & 18 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1896,33 +1896,88 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect

// Return the results.
var results structs.ServiceNodes

// For connect queries we need a list of any proxy service names in the result
// set. Rather than have different code path for connect and non-connect, we
// use the same one in both cases. For non-empty non-connect results,
// serviceNames will always have exactly one element which is the same as
// serviceName. For Connect there might be multiple different service names -
// one for each service name a proxy is registered under, and the target
// service name IFF there is at least one Connect-native instance of that
// service. Either way there is usually only one distinct name if proxies are
// named consistently but could be multiple.
serviceNames := make(map[string]struct{}, 2)
for service := iter.Next(); service != nil; service = iter.Next() {
results = append(results, service.(*structs.ServiceNode))
sn := service.(*structs.ServiceNode)
results = append(results, sn)
serviceNames[sn.ServiceName] = struct{}{}
}

// watchOptimized tracks if we meet the necessary condition to optimize
// WatchSet size. That is that every service name represented in the result
// set must have a service-specific index we can watch instead of many radix
// nodes for all the actual nodes touched. This saves us watching potentially
// thousands of watch chans for large services which may need many goroutines.
// It also avoids the performance cliff that is hit when watchLimit is hit
// (~682 service instances). See
// https://github.com/hashicorp/consul/issues/4984
watchOptimized := false
idx := uint64(0)
if len(serviceNames) > 0 {
// Assume optimization will work since it really should at this point. For
// safety we'll sanity check this below for each service name.
watchOptimized = true

// Fetch indexes for all named services in the result set.
for svcName := range serviceNames {
// We know service values should exist since the serviceNames map is only
// populated if there is at least one result above, so the serviceExists arg
// below is always true.
svcIdx, svcCh := maxIndexAndWatchChForService(tx, svcName, true, true)
// Take the max index represented
if idx < svcIdx {
idx = svcIdx
}
if svcCh != nil {
// Watch the service-specific index for changes in lieu of all iradix nodes
// for checks etc.
ws.Add(svcCh)
} else {
// No svcCh shouldn't really happen by construction but just in case it
// does due to a bug, fall back to the more expensive old way of
// watching every node we touch.
watchOptimized = false
}
}
} else {
// If we have no results, we should use the index of the last service
// extinction event so we don't go backwards when services de-register. We
// use target serviceName here but it actually doesn't matter. No chan will
// be returned as we can't use the optimization in this case (and don't need
// to as there is only one chan to watch anyway).
idx, _ = maxIndexAndWatchChForService(tx, serviceName, false, true)
}

// Get the table index.
idx, ch := maxIndexAndWatchChForService(tx, serviceName, len(results) > 0, true)

// Create a nil watchset to pass below, we'll only pass the real one if we
// need to. Nil watchers are safe/allowed and saves some allocation too.
var fallbackWS memdb.WatchSet
if ch == nil {
// There was no explicit channel returned that corresponds to the service
// index. That means we need to fallback to watching everything we touch in
// the DB as normal. We plumb the caller's watchset through (note it's a map
// so this is a by-reference assignment.)
if !watchOptimized {
// We weren't able to use the optimization of watching only service indexes
// for some reason. That means we need to fallback to watching everything we
// touch in the DB as normal. We plumb the caller's watchset through (note
// it's a map so this is a by-reference assignment.)
fallbackWS = ws
// We also need to watch the iterator from earlier too.
fallbackWS.Add(iter.WatchCh())
} else {
// There was a valid service index, and non-empty result. In this case it is
// sufficient just to watch the service index's chan since that _must_ be
// written to if the result of this method is going to change. This saves us
// watching potentially thousands of watch chans for large services which
// may need many goroutines. It also avoids the performance cliff that is hit
// when watchLimit is hit (~682 service instances). See
// https://github.com/hashicorp/consul/issues/4984
ws.Add(ch)
} else if connect {
// If this is a connect query then there is a subtlety to watch out for.
// In addition to watching the proxy service indexes for changes above, we
// need to still keep an eye on the connect service index in case a new
// proxy with a new name registers - we are only watching proxy service
// names we know about above so we'd miss that otherwise. Thankfully this
// is only ever one extra chan to watch and will catch any changes to
// proxy registrations for this target service.
ws.Add(iter.WatchCh())
}

return s.parseCheckServiceNodes(tx, fallbackWS, idx, serviceName, results, err)
Expand Down
Loading

0 comments on commit 8bc9759

Please sign in to comment.