Skip to content

Commit

Permalink
[envoyproxy#583][Sotw] Ensure resources are properly sent again if en…
Browse files Browse the repository at this point in the history
…voy unsubscribes then subscribes again to a resource

Fix potential deadlock in sotw-ads related to improper cleanup of watches in Linear cache when using delta in non-wildcard
Fix improper request set on sotw responses in Linear cache
Replaced lastResponse in sotw server by staged resources pending ACK

Signed-off-by: Valerian Roche <valerian.roche@datadoghq.com>
  • Loading branch information
valerian-roche committed Aug 25, 2022
1 parent af7a06d commit 4e00420
Show file tree
Hide file tree
Showing 7 changed files with 480 additions and 116 deletions.
20 changes: 20 additions & 0 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type Response interface {
// Get the version in the Response.
GetVersion() (string, error)

// Get the list of resources part of the response without having to cast resources
GetResourceNames() []string

// Get the context provided during response creation.
GetContext() context.Context
}
Expand Down Expand Up @@ -142,6 +145,9 @@ type RawResponse struct {
// Resources to be included in the response.
Resources []types.ResourceWithTTL

// Names of the resources included in the response
ResourceNames []string

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
// This allows for more lightweight updates that server only to update the TTL timer.
Expand Down Expand Up @@ -191,6 +197,9 @@ type PassthroughResponse struct {
// The discovery response that needs to be sent as is, without any marshaling transformations.
DiscoveryResponse *discovery.DiscoveryResponse

// Names of the resources set in the response
ResourceNames []string

ctx context.Context
}

Expand Down Expand Up @@ -288,6 +297,12 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover
return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil
}

// GetResourceNames returns the list of resources returned within the response
// without having to decode the resources
func (r *RawResponse) GetResourceNames() []string {
return r.ResourceNames
}

// GetRequest returns the original Discovery Request.
func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest {
return r.Request
Expand Down Expand Up @@ -350,6 +365,11 @@ func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryRespon
return r.DiscoveryResponse, nil
}

// GetResourceNames returns the list of resources included within the response
func (r *PassthroughResponse) GetResourceNames() []string {
return r.ResourceNames
}

// GetDeltaDiscoveryResponse returns the final passthrough Delta Discovery Response.
func (r *DeltaPassthroughResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
return r.DeltaDiscoveryResponse, nil
Expand Down
85 changes: 63 additions & 22 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/log"
)

type watches = map[chan Response]struct{}
type watches = map[ResponseWatch]struct{}

// LinearCache supports collections of opaque resources. This cache has a
// single collection indexed by resource names and manages resource versions
Expand Down Expand Up @@ -112,45 +112,57 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
return out
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
func (cache *LinearCache) respond(req *Request, value chan Response, staleResources []string) {
var resources []types.ResourceWithTTL
var resourceNames []string
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for _, resource := range cache.resources {
resourceNames = make([]string, 0, len(cache.resources))
for name, resource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
resourceNames = append(resourceNames, name)
}
} else {
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
resourceNames = make([]string, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
resourceNames = append(resourceNames, name)
}
}
}
value <- &RawResponse{
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
Version: cache.getVersion(),
Ctx: context.Background(),
Request: &Request{TypeUrl: cache.typeURL},
Resources: resources,
ResourceNames: resourceNames,
Version: cache.getVersion(),
Ctx: context.Background(),
}
}

func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
// de-duplicate watches that need to be responded
notifyList := make(map[chan Response][]string)
notifyList := make(map[ResponseWatch][]string)
for name := range modified {
for watch := range cache.watches[name] {
notifyList[watch] = append(notifyList[watch], name)

// Make sure we clean the watch for ALL resources it might be associated with,
// as the channel will no longer be listened to
for _, resource := range watch.Request.ResourceNames {
delete(cache.watches[resource], watch)
}
}
delete(cache.watches, name)
}
for value, stale := range notifyList {
cache.respond(value, stale)
for watch, stale := range notifyList {
cache.respond(watch.Request, watch.Response, stale)
}
for value := range cache.watchAll {
cache.respond(value, nil)
for watch := range cache.watchAll {
cache.respond(watch.Request, watch.Response, nil)
}
cache.watchAll = make(watches)

Expand Down Expand Up @@ -306,7 +318,7 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
// been updated between the last version and the current version. This avoids the problem
// of sending empty updates whenever an irrelevant resource changes.
stale := false
staleResources := []string{} // empty means all
var staleResources []string // empty means all

// strip version prefix if it is present
var lastVersion uint64
Expand All @@ -323,45 +335,74 @@ func (cache *LinearCache) CreateWatch(request *Request, clientState ClientState,
if err != nil {
stale = true
staleResources = request.ResourceNames
} else if len(request.ResourceNames) == 0 {
if cache.log != nil {
cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error())
}
} else if clientState.IsWildcard() {
stale = lastVersion != cache.version
if cache.log != nil {
cache.log.Debugf("Watch is stale as version differs for wildcard watch")
}
} else {
// Non wildcard case, we only reply resources that have effectively changed since the version set in the request
// This is used for instance in EDS
for _, name := range request.ResourceNames {
// When a resource is removed, its version defaults 0 and it is not considered stale.
if lastVersion < cache.versionVector[name] {
// The resource does not exist currently, we won't reply for it
if resourceVersion, ok := cache.versionVector[name]; !ok {
continue
} else if lastVersion < resourceVersion {
// The version of the request is older than the last change for the resource, return it
stale = true
staleResources = append(staleResources, name)
} else if _, ok := clientState.GetKnownResources()[name]; !ok {
// Resource is not currently known by the client (e.g. a resource is added in the resourceNames)
stale = true
staleResources = append(staleResources, name)
}
}
if cache.log != nil && stale {
cache.log.Debugf("Watch is stale with stale resources %v", staleResources)
}
}
if stale {
cache.respond(value, staleResources)
cache.respond(request, value, staleResources)
return nil
}
// Create open watches since versions are up to date.
if len(request.ResourceNames) == 0 {
cache.watchAll[value] = struct{}{}
watch := ResponseWatch{request, value}
if clientState.IsWildcard() {
if cache.log != nil {
cache.log.Infof("[linear cache] open watch for %s all resources, system version %q",
cache.typeURL, cache.getVersion())
}
cache.watchAll[watch] = struct{}{}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, value)
delete(cache.watchAll, watch)
}
}

// Non-wildcard case
if cache.log != nil {
cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q",
cache.typeURL, request.ResourceNames, cache.getVersion())
}
for _, name := range request.ResourceNames {
set, exists := cache.watches[name]
if !exists {
set = make(watches)
cache.watches[name] = set
}
set[value] = struct{}{}
set[watch] = struct{}{}
}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, name := range request.ResourceNames {
set, exists := cache.watches[name]
if exists {
delete(set, value)
delete(set, watch)
}
if len(set) == 0 {
delete(cache.watches, name)
Expand Down
Loading

0 comments on commit 4e00420

Please sign in to comment.