diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 51a03da0ce..42d53f6b33 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -66,7 +66,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func()) + CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func(), delayedResponse bool) } // ConfigFetcher fetches configuration resources from cache @@ -127,7 +127,7 @@ type RawResponse struct { Version string // Resources to be included in the response. - Resources []types.ResourceWithTTL + Resources []VTMarshaledResource // Whether this is a heartbeat response. For xDS versions that support TTL, this // will be converted into a response that doesn't contain the actual resource protobuf. @@ -151,7 +151,7 @@ type RawDeltaResponse struct { SystemVersionInfo string // Resources to be included in the response. - Resources []types.ResourceWithTTL + Resources []VTMarshaledResource // RemovedResources is a list of resource aliases which should be dropped by the consuming client. RemovedResources []string @@ -212,17 +212,9 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro marshaledResources := make([]*anypb.Any, len(r.Resources)) for i, resource := range r.Resources { - maybeTtldResource, resourceType, err := r.maybeCreateTTLResource(resource) - if err != nil { - return nil, err - } - marshaledResource, err := MarshalResource(maybeTtldResource) - if err != nil { - return nil, err - } marshaledResources[i] = &anypb.Any{ - TypeUrl: resourceType, - Value: marshaledResource, + TypeUrl: r.GetRequest().GetTypeUrl(), + Value: resource.Resource, } } @@ -243,19 +235,14 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro // This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load. func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) { marshaledResponse := r.marshaledResponse.Load() - if marshaledResponse == nil { marshaledResources := make([]*discovery.Resource, 0) for _, resource := range r.Resources { - name := GetResourceName(resource.Resource) + name := resource.Name if name == "" { continue } - marshaledResource, err := MarshalResource(resource.Resource) - if err != nil { - return nil, err - } if resource.Version == "" { return nil, errors.New("failed to get a resource hash") @@ -264,7 +251,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover Name: name, Resource: &anypb.Any{ TypeUrl: r.GetDeltaRequest().GetTypeUrl(), - Value: marshaledResource, + Value: resource.Resource, }, Version: resource.Version, }) @@ -278,7 +265,6 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover } r.marshaledResponse.Store(marshaledResponse) } - return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil } diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index 27a7c6517c..9424938966 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -18,13 +18,12 @@ import ( "context" "strings" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // groups together resource-related arguments for the createDeltaResponse function type resourceContainer struct { - resourceMap map[string]types.ResourceWithTTL + resourceMap map[string]VTMarshaledResource versionMap map[string]string systemVersion string } @@ -66,7 +65,7 @@ func containsPrefixedKey(data map[string]string, keyLike string) ([]string, bool return resNames, len(resNames) > 0 } -func containsPrefixedKeyResources(data map[string]types.ResourceWithTTL, keyLike string) ([]string, bool) { +func containsPrefixedKeyResources(data map[string]VTMarshaledResource, keyLike string) ([]string, bool) { resNames := make([]string, 0) for key := range data { if strings.Contains(key, keyLike) { @@ -79,11 +78,11 @@ func containsPrefixedKeyResources(data map[string]types.ResourceWithTTL, keyLike func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string - var filtered map[string]types.ResourceWithTTL + var filtered map[string]VTMarshaledResource var toRemove []string switch { case state.IsWildcard(): - filtered = make(map[string]types.ResourceWithTTL) + filtered = make(map[string]VTMarshaledResource) nextVersionMap = make(map[string]string, len(resources.resourceMap)) for name, r := range resources.resourceMap { // Since we've already precomputed the version hashes of the new snapshot, @@ -104,7 +103,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St } } default: - filtered = make(map[string]types.ResourceWithTTL) + filtered = make(map[string]VTMarshaledResource) nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map @@ -121,7 +120,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St if r, ok := resources.resourceMap[versionName]; ok { nextVersion := resources.versionMap[versionName] if prevVersion != nextVersion { - filtered[GetResourceName(r.Resource)] = r + filtered[r.Name] = r } nextVersionMap[versionName] = nextVersion } else if found { @@ -133,7 +132,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { - filtered[GetResourceName(r.Resource)] = r + filtered[r.Name] = r } nextVersionMap[name] = nextVersion } else if found { @@ -143,8 +142,8 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St } } - filteredResources := make([]types.ResourceWithTTL, 0) - filteredResourceNames := make([]string, 0) + filteredResources := make([]VTMarshaledResource, len(filtered)) + filteredResourceNames := make([]string, len(filtered)) for name, r := range filtered { filteredResources = append(filteredResources, r) filteredResourceNames = append(filteredResourceNames, name) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 23c7ddc282..ad4e2eecde 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -114,19 +114,27 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { } func (cache *LinearCache) respond(value chan Response, staleResources []string) { - var resources []types.ResourceWithTTL + var resources []VTMarshaledResource // TODO: optimize the resources slice creations across different clients if len(staleResources) == 0 { - resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) + resources = make([]VTMarshaledResource, 0, len(cache.resources)) for _, resource := range cache.resources { - resources = append(resources, types.ResourceWithTTL{Resource: resource}) + out, err := resource.MarshalVTStrict() + if err != nil { + continue + } + resources = append(resources, VTMarshaledResource{Resource: out}) } } else { - resources = make([]types.ResourceWithTTL, 0, len(staleResources)) + resources = make([]VTMarshaledResource, 0, len(staleResources)) for _, name := range staleResources { resource := cache.resources[name] if resource != nil { - resources = append(resources, types.ResourceWithTTL{Resource: resource}) + out, err := resource.MarshalVTStrict() + if err != nil { + continue + } + resources = append(resources, VTMarshaledResource{Resource: out}) } } } @@ -180,10 +188,16 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe // We converted types.Resource to types.ResourceWithTTL in Snapshot cache. // In order to avoid affects in any other types of caches, we loop and convert // back types.Resource to types.ResourceWithTTL - resources := make(map[string]types.ResourceWithTTL) + resources := make(map[string]VTMarshaledResource) for name, resource := range cache.resources { - resources[name] = types.ResourceWithTTL{ - Resource: resource, + out, err := resource.MarshalVTStrict() + if err != nil { + continue + } + resources[name] = VTMarshaledResource{ + Name: name, + Resource: out, + Version: cache.versionMap[name], } } @@ -382,7 +396,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va } } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (func(), bool) { cache.mu.Lock() defer cache.mu.Unlock() @@ -412,10 +426,10 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} - return cache.cancelDeltaWatch(watchID) + return cache.cancelDeltaWatch(watchID), true } - return nil + return nil, false } func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index 9fdfb090d6..56d2addbbf 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -47,12 +47,12 @@ func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, val return cache.CreateWatch(request, state, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (func(), bool) { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { value <- nil - return nil + return nil, true } return cache.CreateDeltaWatch(request, state, value) } diff --git a/pkg/cache/v3/resource.go b/pkg/cache/v3/resource.go index 460b22dc96..087ea47f1c 100644 --- a/pkg/cache/v3/resource.go +++ b/pkg/cache/v3/resource.go @@ -113,10 +113,10 @@ func GetResourceNames(resources []types.Resource) []string { } // GetResourceWithTTLNames returns the resource names for a list of valid xDS response types. -func GetResourceWithTTLNames(resources []types.ResourceWithTTL) []string { +func GetResourceWithTTLNames(resources []VTMarshaledResource) []string { out := make([]string, len(resources)) for i, r := range resources { - out[i] = GetResourceName(r.Resource) + out[i] = r.Name } return out } @@ -139,19 +139,20 @@ func GetResourceReferences(resources map[string]types.ResourceWithTTL) map[resou func GetAllResourceReferences(resourceGroups [types.UnknownType]Resources) map[resource.Type]map[string]bool { ret := map[resource.Type]map[string]bool{} + // Commenting the following as we switch to using VTMarshaledResource // We only check resources that we expect to have references to other resources. - responseTypesWithReferences := map[types.ResponseType]struct{}{ - types.Cluster: {}, - types.Listener: {}, - types.ScopedRoute: {}, - } - - for responseType, resourceGroup := range resourceGroups { - if _, ok := responseTypesWithReferences[types.ResponseType(responseType)]; ok { - items := resourceGroup.Items - getResourceReferences(items, ret) - } - } + //responseTypesWithReferences := map[types.ResponseType]struct{}{ + // types.Cluster: {}, + // types.Listener: {}, + // types.ScopedRoute: {}, + //} + // + //for responseType, resourceGroup := range resourceGroups { + // if _, ok := responseTypesWithReferences[types.ResponseType(responseType)]; ok { + // items := resourceGroup.Items + // getResourceReferences(items, ret) + // } + //} return ret } diff --git a/pkg/cache/v3/resources.go b/pkg/cache/v3/resources.go index 60f0ea09f4..862e29064c 100644 --- a/pkg/cache/v3/resources.go +++ b/pkg/cache/v3/resources.go @@ -1,6 +1,10 @@ package cache -import "github.com/envoyproxy/go-control-plane/pkg/cache/types" +import ( + "fmt" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "time" +) // Resources is a versioned group of resources. type Resources struct { @@ -8,7 +12,32 @@ type Resources struct { Version string // Items in the group indexed by name. - Items map[string]types.ResourceWithTTL + Items map[string]VTMarshaledResource +} + +type VTMarshaledResource struct { + Name string + Version string + Resource []byte + TTL *time.Duration +} + +// IndexAndMarshalResourcesByName creates a map from the resource name to the marshaled resource. +func IndexAndMarshalResourcesByName(items []types.ResourceWithTTL) map[string]VTMarshaledResource { + indexed := make(map[string]VTMarshaledResource, len(items)) + for _, item := range items { + out, err := item.Resource.MarshalVTStrict() + if err != nil { + fmt.Printf("failed to MarshalVTStrict resource %s: %v\n", GetResourceName(item.Resource), err) + continue + } + indexed[GetResourceName(item.Resource)] = VTMarshaledResource{ + Name: GetResourceName(item.Resource), + Version: item.Version, + Resource: out, + } + } + return indexed } // IndexResourcesByName creates a map from the resource name to the resource. @@ -42,6 +71,6 @@ func NewResources(version string, items []types.Resource) Resources { func NewResourcesWithTTL(version string, items []types.ResourceWithTTL) Resources { return Resources{ Version: version, - Items: IndexResourcesByName(items), + Items: IndexAndMarshalResourcesByName(items), } } diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 29fa2d8ab9..07897cc18f 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -18,10 +18,8 @@ import ( "context" "fmt" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -43,12 +41,12 @@ type ResourceSnapshot interface { // GetResourcesAndTTL returns all resources of the type indicted by // typeURL, together with their TTL. - GetResourcesAndTTL(typeURL string) map[string]types.ResourceWithTTL + GetResourcesAndTTL(typeURL string) map[string]VTMarshaledResource // GetResources returns all resources of the type indicted by // typeURL. This is identical to GetResourcesAndTTL, except that // the TTL is omitted. - GetResources(typeURL string) map[string]types.Resource + GetResources(typeURL string) map[string]VTMarshaledResource // ConstructVersionMap is a hint that a delta watch will soon make a // call to GetVersionMap. The snapshot should construct an internal @@ -212,7 +210,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) // TODO(snowp): Construct this once per type instead of once per watch. - resourcesWithTTL := map[string]types.ResourceWithTTL{} + resourcesWithTTL := map[string]VTMarshaledResource{} for k, v := range resources { if v.TTL != nil { resourcesWithTTL[k] = v @@ -256,17 +254,21 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string if currentResources.Items == nil { // Fresh resources - currentResources.Items = make(map[string]types.ResourceWithTTL) + currentResources.Items = make(map[string]VTMarshaledResource) } for name, r := range resourcesUpserted { - //if typ == resource.EndpointType { - // cla := r.Resource.(*endpoint.ClusterLoadAssignment) - // if len(cla.Endpoints) == 0 { - // log2.Info().Msgf("BatchUpsertResources: Writing claname=%s endpoints=%d", cla.ClusterName, len(cla.Endpoints)) - // } - //} - currentResources.Items[name] = *r + out, err := r.Resource.MarshalVTStrict() + if err != nil { + fmt.Printf("failed to MarshalVTStrict resource %s: %v\n", name, err) + continue + } + currentResources.Items[name] = VTMarshaledResource{ + Resource: out, + Name: name, + Version: r.Version, + TTL: r.TTL, + } } // Change in version @@ -337,17 +339,21 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty if currentResources.Items == nil { // Fresh resources - currentResources.Items = make(map[string]types.ResourceWithTTL) + currentResources.Items = make(map[string]VTMarshaledResource) } for name, r := range resourcesUpserted { - //if typ == resource.EndpointType { - // cla := r.Resource.(*endpoint.ClusterLoadAssignment) - // if len(cla.Endpoints) == 0 { - // log2.Info().Msgf("UpsertResources: Writing claname=%s endpoints=%d", cla.ClusterName, len(cla.Endpoints)) - // } - //} - currentResources.Items[name] = *r + out, err := r.Resource.MarshalVTStrict() + if err != nil { + fmt.Printf("failed to MarshalVTStrict resource %s: %v\n", name, err) + continue + } + currentResources.Items[name] = VTMarshaledResource{ + Resource: out, + Name: name, + Version: r.Version, + TTL: r.TTL, + } } // Change in version @@ -411,7 +417,7 @@ func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, ty newResources := false if prevResources.Items == nil { newResources = true - prevResources.Items = make(map[string]types.ResourceWithTTL) + prevResources.Items = make(map[string]VTMarshaledResource) } currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) @@ -426,7 +432,18 @@ func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, ty for k, v := range resourcesUpserted { _, ok := prevResources.Items[k] if !ok { - prevResources.Items[k] = *v + + out, err := v.Resource.MarshalVTStrict() + if err != nil { + fmt.Printf("failed to MarshalVTStrict resource %s: %v\n", k, err) + continue + } + prevResources.Items[k] = VTMarshaledResource{ + Resource: out, + Name: k, + Version: v.Version, + TTL: v.TTL, + } } } currentVersion++ @@ -484,117 +501,117 @@ func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, ty } } else if typ == resource.EndpointType { - resourceToDelete := resourcesToDeleted[0] - resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/") - serviceName := resourceToDeleteParts[4] - zone := resourceToDeleteParts[5] - portString := strings.Split(resourcesToDeleted[0], "_")[1] - claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) - - for node_, snapshot := range cache.snapshots { - currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] - if rsc, found := currentResources.Items[claName]; found { - cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) - for i, _ := range cla.Endpoints { - if cla.Endpoints[i].Locality.Zone == zone { - newEndpoints := make([]*endpoint.LbEndpoint, 0) - for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { - if resourceToDelete == GetResourceName(lbEndpoint) { - continue - } - newEndpoints = append(newEndpoints, lbEndpoint) - } - - cla.Endpoints[i].LbEndpoints = newEndpoints - } - } - - currentResources.Items[claName] = types.ResourceWithTTL{ - Resource: cla, - } - } - - // Update - currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) - currentVersion++ - currentResources.Version = fmt.Sprintf("%d", currentVersion) - - snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources - cache.snapshots[node_] = snapshot - - // Respond deltas - if info, ok := cache.status[node_]; ok { - info.mu.Lock() - _ = cache.respondDeltaWatches(ctx, info, snapshot) - info.mu.Unlock() - } - } + //resourceToDelete := resourcesToDeleted[0] + //resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/") + //serviceName := resourceToDeleteParts[4] + //zone := resourceToDeleteParts[5] + //portString := strings.Split(resourcesToDeleted[0], "_")[1] + //claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) + // + //for node_, snapshot := range cache.snapshots { + // currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] + // if rsc, found := currentResources.Items[claName]; found { + // cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) + // for i, _ := range cla.Endpoints { + // if cla.Endpoints[i].Locality.Zone == zone { + // newEndpoints := make([]*endpoint.LbEndpoint, 0) + // for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { + // if resourceToDelete == GetResourceName(lbEndpoint) { + // continue + // } + // newEndpoints = append(newEndpoints, lbEndpoint) + // } + // + // cla.Endpoints[i].LbEndpoints = newEndpoints + // } + // } + // + // currentResources.Items[claName] = types.ResourceWithTTL{ + // Resource: cla, + // } + // } + // + // // Update + // currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) + // currentVersion++ + // currentResources.Version = fmt.Sprintf("%d", currentVersion) + // + // snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources + // cache.snapshots[node_] = snapshot + // + // // Respond deltas + // if info, ok := cache.status[node_]; ok { + // info.mu.Lock() + // _ = cache.respondDeltaWatches(ctx, info, snapshot) + // info.mu.Unlock() + // } + //} } return nil } func (cache *snapshotCache) DrainResources(ctx context.Context, _ string, typ string, resourcesToDrain []string) error { - cache.mu.Lock() - defer cache.mu.Unlock() - - fmt.Printf("local DrainResources resource %v", resourcesToDrain) - - resourceToDelete := resourcesToDrain[0] - resourceToDeleteParts := strings.Split(resourcesToDrain[0], "/") - serviceName := resourceToDeleteParts[4] - zone := resourceToDeleteParts[5] - portString := strings.Split(resourcesToDrain[0], "_")[1] - claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) - - cache.log.Infof("DeleteResources claName=%s", claName) - - for node_, snapshot := range cache.snapshots { - didModify := false - currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] - if rsc, found := currentResources.Items[claName]; found { - cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) - for i, _ := range cla.Endpoints { - if cla.Endpoints[i].Locality.Zone == zone { - newEndpoints := make([]*endpoint.LbEndpoint, 0) - for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { - if resourceToDelete == GetResourceName(lbEndpoint) { - didModify = true - cache.log.Infof("Drain and remove endpoint %s", resourceToDelete) - - // Set to UNHEALTHY/DRAINING and let Envoy gracefully remove them. - lbEndpoint.HealthStatus = core.HealthStatus_DRAINING - // continue - } - newEndpoints = append(newEndpoints, lbEndpoint) - } - - cla.Endpoints[i].LbEndpoints = newEndpoints - } - } - - currentResources.Items[claName] = types.ResourceWithTTL{ - Resource: cla, - } - } - - if didModify { - // Update - currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) - currentVersion++ - currentResources.Version = fmt.Sprintf("%d", currentVersion) - - snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources - cache.snapshots[node_] = snapshot - - // Respond deltas - if info, ok := cache.status[node_]; ok { - info.mu.Lock() - _ = cache.respondDeltaWatches(ctx, info, snapshot) - info.mu.Unlock() - } - } - } + //cache.mu.Lock() + //defer cache.mu.Unlock() + // + //fmt.Printf("local DrainResources resource %v", resourcesToDrain) + // + //resourceToDelete := resourcesToDrain[0] + //resourceToDeleteParts := strings.Split(resourcesToDrain[0], "/") + //serviceName := resourceToDeleteParts[4] + //zone := resourceToDeleteParts[5] + //portString := strings.Split(resourcesToDrain[0], "_")[1] + //claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) + // + //cache.log.Infof("DeleteResources claName=%s", claName) + // + //for node_, snapshot := range cache.snapshots { + // didModify := false + // currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] + // if rsc, found := currentResources.Items[claName]; found { + // cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) + // for i, _ := range cla.Endpoints { + // if cla.Endpoints[i].Locality.Zone == zone { + // newEndpoints := make([]*endpoint.LbEndpoint, 0) + // for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { + // if resourceToDelete == GetResourceName(lbEndpoint) { + // didModify = true + // cache.log.Infof("Drain and remove endpoint %s", resourceToDelete) + // + // // Set to UNHEALTHY/DRAINING and let Envoy gracefully remove them. + // lbEndpoint.HealthStatus = core.HealthStatus_DRAINING + // // continue + // } + // newEndpoints = append(newEndpoints, lbEndpoint) + // } + // + // cla.Endpoints[i].LbEndpoints = newEndpoints + // } + // } + // + // currentResources.Items[claName] = types.ResourceWithTTL{ + // Resource: cla, + // } + // } + // + // if didModify { + // // Update + // currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) + // currentVersion++ + // currentResources.Version = fmt.Sprintf("%d", currentVersion) + // + // snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources + // cache.snapshots[node_] = snapshot + // + // // Respond deltas + // if info, ok := cache.status[node_]; ok { + // info.mu.Lock() + // _ = cache.respondDeltaWatches(ctx, info, snapshot) + // info.mu.Unlock() + // } + // } + //} return nil } @@ -753,7 +770,7 @@ func nameSet(names []string) map[string]bool { } // superset checks that all resources are listed in the names set. -func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) error { +func superset(names map[string]bool, resources map[string]VTMarshaledResource) error { for resourceName := range resources { if _, exists := names[resourceName]; !exists { return fmt.Errorf("%q not listed", resourceName) @@ -855,7 +872,7 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { // Respond to a watch with the snapshot value. The value channel should have capacity not to block. // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 -func (cache *snapshotCache) respond(ctx context.Context, request *Request, value chan Response, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error { +func (cache *snapshotCache) respond(ctx context.Context, request *Request, value chan Response, resources map[string]VTMarshaledResource, version string, heartbeat bool) error { // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request if len(request.GetResourceNames()) != 0 && cache.ads { @@ -875,8 +892,8 @@ func (cache *snapshotCache) respond(ctx context.Context, request *Request, value } } -func createResponse(ctx context.Context, request *Request, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) Response { - filtered := make([]types.ResourceWithTTL, 0, len(resources)) +func createResponse(ctx context.Context, request *Request, resources map[string]VTMarshaledResource, version string, heartbeat bool) Response { + filtered := make([]VTMarshaledResource, 0, len(resources)) // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. It is ok to reply with the same version @@ -904,7 +921,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (func(), bool) { nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() @@ -928,34 +945,40 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream // - a snapshot exists, but we failed to initialize its version map // - we attempted to issue a response, but the caller is already up to date delayedResponse := !exists + var response *RawDeltaResponse = nil if exists { err := snapshot.ConstructVersionMap() if err != nil { cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } - // We don't need to respond. We're handling this in a better way in ads. - // response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) + response, err = cache.respondDelta(context.Background(), snapshot, request, value, state) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } - delayedResponse = true // response == nil + delayedResponse = (response == nil) || (len(snapshot.GetResourcesAndTTL(request.GetTypeUrl())) == 0) } if delayedResponse { watchID := cache.nextDeltaWatchID() + info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) + + lenResources := 0 if exists { + resources := snapshot.GetResources(request.GetTypeUrl()) + if resources != nil { + lenResources = len(resources) + } cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t)) + return cache.cancelDeltaWatch(nodeID, watchID), lenResources == 0 } else { cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID) + return cache.cancelDeltaWatch(nodeID, watchID), true } - - info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) - return cache.cancelDeltaWatch(nodeID, watchID) } - return nil + return nil, false } func GetEnvoyNodeStr(node *core.Node) string { @@ -985,27 +1008,10 @@ func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceS // Only send a response if there were changes // We want to respond immediately for the first wildcard request in a stream, even if the response is empty // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { - //changedResourceNames := make([]string, 0, len(resp.Resources)) - //for _, rsc := range resp.Resources { - // if resp.GetDeltaRequest().GetTypeUrl() == resource.EndpointType { - // changedResourceNames = append(changedResourceNames, GetResourceName(rsc.Resource)) - // cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) - // claName := GetResourceName(rsc.Resource) - // changedResourceNames = append(changedResourceNames, fmt.Sprintf("%s:%d", GetResourceName(rsc.Resource), len(cla.Endpoints))) - // // HACK - // if (strings.Contains(claName, "tardis-") && len(cla.Endpoints) == 0) || - // (strings.Contains(claName, "sc-sent-post-") && len(cla.Endpoints) == 0) || - // (strings.Contains(claName, "mmoe-v2-1-image") && len(cla.Endpoints) == 0) || - // (strings.Contains(claName, "mmoe-v2-1-video") && len(cla.Endpoints) == 0) { - // log2.Info().Msgf("Skipping because %s has 0 endpoints", claName) - // return nil, nil - // } - // nodeString := GetEnvoyNodeStr(resp.GetDeltaRequest().GetNode()) - // log2.Info().Msgf("createDeltaResponse [changed][%s]: %v", nodeString, changedResourceNames) - // log2.Info().Msgf("createDeltaResponse [removed][%s]: %v", nodeString, resp.RemovedResources) - // } - //} + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsFirst()) { + + fmt.Printf("will respond: %d resources, typeUrl=%s\n", len(resp.Resources)+len(resp.RemovedResources), request.GetTypeUrl()) + if cache.log != nil { cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", request.GetNode().GetId(), request.GetTypeUrl(), GetResourceWithTTLNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) @@ -1016,6 +1022,8 @@ func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceS case <-ctx.Done(): return resp, context.Canceled } + } else { + fmt.Printf("will respond NOT: %d resources, typeUrl=%s\n", len(resp.Resources)+len(resp.RemovedResources), request.GetTypeUrl()) } return nil, nil } diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index eff7bfe3ed..18401c1051 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -121,23 +121,12 @@ func (s *Snapshot) Consistent() error { } // GetResources selects snapshot resources by type, returning the map of resources. -func (s *Snapshot) GetResources(typeURL resource.Type) map[string]types.Resource { - resources := s.GetResourcesAndTTL(typeURL) - if resources == nil { - return nil - } - - withoutTTL := make(map[string]types.Resource, len(resources)) - - for k, v := range resources { - withoutTTL[k] = v.Resource - } - - return withoutTTL +func (s *Snapshot) GetResources(typeURL resource.Type) map[string]VTMarshaledResource { + return s.GetResourcesAndTTL(typeURL) } // GetResourcesAndTTL selects snapshot resources by type, returning the map of resources and the associated TTL. -func (s *Snapshot) GetResourcesAndTTL(typeURL resource.Type) map[string]types.ResourceWithTTL { +func (s *Snapshot) GetResourcesAndTTL(typeURL resource.Type) map[string]VTMarshaledResource { if s == nil { return nil } @@ -193,7 +182,7 @@ func (s *Snapshot) ConstructVersionMap() error { return fmt.Errorf("failed to get resource version: %w", err) } - s.VersionMap[typeURL][GetResourceName(r.Resource)] = r.Version + s.VersionMap[typeURL][r.Name] = r.Version } }