diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 51a03da0ce..69eda9b921 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -66,7 +66,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func()) + CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (delayedResponse bool, cancel func()) } // ConfigFetcher fetches configuration resources from cache diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 23c7ddc282..44d84cfcb7 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -382,7 +382,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va } } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (bool, func()) { cache.mu.Lock() defer cache.mu.Unlock() @@ -412,10 +412,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} - return cache.cancelDeltaWatch(watchID) + return false, cache.cancelDeltaWatch(watchID) } - return nil + return false, nil } func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index 9fdfb090d6..7f1b9813d5 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -47,12 +47,12 @@ func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, val return cache.CreateWatch(request, state, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (bool, func()) { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { value <- nil - return nil + return false, nil } return cache.CreateDeltaWatch(request, state, value) } diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 29fa2d8ab9..2180468fee 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -313,7 +313,7 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string info.mu.Lock() // Respond to delta watches for the node. - err := cache.respondDeltaWatches(ctx, info, snapshot) + err := cache.respondDeltaWatches(ctx, info, s) if err != nil { info.mu.Unlock() continue @@ -904,7 +904,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (bool, func()) { nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() @@ -934,12 +934,12 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } // We don't need to respond. We're handling this in a better way in ads. - // response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) + response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } - delayedResponse = true // response == nil + delayedResponse = response == nil } if delayedResponse { @@ -952,10 +952,10 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream } info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) - return cache.cancelDeltaWatch(nodeID, watchID) + return delayedResponse, cache.cancelDeltaWatch(nodeID, watchID) } - return nil + return false, nil } func GetEnvoyNodeStr(node *core.Node) string {