Skip to content

Commit

Permalink
Review comments #2.
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Nov 8, 2019
1 parent 408017c commit f648eba
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 47 deletions.
23 changes: 14 additions & 9 deletions xds/internal/client/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client

import (
"fmt"
"net"
"strings"

"github.com/golang/protobuf/ptypes"
Expand Down Expand Up @@ -55,7 +56,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
target := v2c.watchMap[ldsResource].target[0]

returnCluster := ""
localCache := make(map[string]*xdspb.RouteConfiguration)
localCache := make(map[string]string)
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
Expand All @@ -71,7 +72,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
}

// If we get here, it means that this resource was a good one.
localCache[rc.GetName()] = rc
localCache[rc.GetName()] = cluster
if v2c.isRouteConfigurationInteresting(rc) {
returnCluster = cluster
}
Expand Down Expand Up @@ -116,16 +117,20 @@ func (v2c *v2Client) isRouteConfigurationInteresting(rc *xdspb.RouteConfiguratio
// field must be set. Inside that route message, the cluster field will
// contain the clusterName we are looking for.
func getClusterFromRouteConfiguration(rc *xdspb.RouteConfiguration, target string) string {
host := stripPort(target)
host, _, err := net.SplitHostPort(target)
if err != nil {
return ""
}
for _, vh := range rc.GetVirtualHosts() {
for _, domain := range vh.GetDomains() {
// TODO: Add support for wildcard matching here?
if domain == host {
if len(vh.GetRoutes()) > 0 {
dr := vh.Routes[len(vh.Routes)-1]
if dr.GetMatch() == nil && dr.GetRoute() != nil {
return dr.GetRoute().GetCluster()
}
if domain != host {
continue
}
if len(vh.GetRoutes()) > 0 {
dr := vh.Routes[len(vh.Routes)-1]
if dr.GetMatch() == nil && dr.GetRoute() != nil {
return dr.GetRoute().GetCluster()
}
}
}
Expand Down
31 changes: 19 additions & 12 deletions xds/internal/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (

type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient

const (
listenerURL = "type.googleapis.com/envoy.api.v2.Listener"
routeURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration"
clusterURL = "type.googleapis.com/envoy.api.v2.Cluster"
endpointURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)

type resourceType int

const (
Expand All @@ -35,29 +42,29 @@ const (
edsResource
)

const (
listenerURL = "type.googleapis.com/envoy.api.v2.Listener"
routeURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration"
clusterURL = "type.googleapis.com/envoy.api.v2.Cluster"
endpointURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
type watchState int

const (
// Using raw constants here instead of enums because it seems to hard to
// get that to work with atomic operations which expect int* operands.
watchEnqueued = 0
watchCancelled = 1
watchStarted = 2
watchEnqueued watchState = iota
watchCancelled
watchStarted
)

type watchInfo struct {
wType resourceType
target []string
state int32
state watchState
callback interface{}
timer *time.Timer
}

func (wi *watchInfo) cancel() {
wi.state = watchCancelled
if wi.timer != nil {
wi.timer.Stop()
}
}

type ldsUpdate struct {
routeName string
}
Expand Down
49 changes: 23 additions & 26 deletions xds/internal/client/v2client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/buffer"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
)

const defaultWatchTimer = 5 * time.Second
// The value chosen here is based on the default value of the
// initial_fetch_timeout field in corepb.ConfigSource proto.
const defaultWatchTimer = 15 * time.Second

// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a
// single ADS stream on which the different types of xDS requests and responses
Expand All @@ -54,20 +55,21 @@ type v2Client struct {
// watch API, and it is read and acted upon by the send() goroutine.
watchCh *buffer.Unbounded

// Message specific watch infos, protected by the below mutex. These are
mu sync.Mutex
// Message specific watch infos, protected by the above mutex. These are
// written to, after successfully reading from the update channel, and are
// read from when recovering from a broken stream to resend the xDS
// messages. When the user of this client object cancels a watch call,
// these are set to nil.
mu sync.Mutex
// these are set to nil. All accesses to the map protected and any value
// inside the map should be protected with the above mutex.
watchMap map[resourceType]*watchInfo

// rdsCache maintains a cache of validated route configurations received
// through RDS responses. We cache all valid resources, whether or not we
// are interested in them when we received them (because we could become
// interested in them in the future and the server wont send us those
// resources again).
rdsCache map[string]*xdspb.RouteConfiguration
// rdsCache maintains a mapping of {routeConfigName --> clusterName} from
// validated route configurations received in RDS responses. We cache all
// valid route configurations, whether or not we are interested in them
// when we received them (because we could become interested in them in the
// future and the server wont send us those resources again).
// Protected by the above mutex.
rdsCache map[string]string
}

// newV2Client creates a new v2Client initialized with the passed arguments.
Expand All @@ -78,7 +80,7 @@ func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int)
backoff: backoff,
watchCh: buffer.NewUnbounded(),
watchMap: make(map[resourceType]*watchInfo),
rdsCache: make(map[string]*xdspb.RouteConfiguration),
rdsCache: make(map[string]string),
}
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())

Expand Down Expand Up @@ -180,7 +182,7 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
v2c.mu.Lock()
if wi.state == watchCancelled {
v2c.mu.Unlock()
return
continue
}
wi.state = watchStarted
target := wi.target
Expand Down Expand Up @@ -236,6 +238,8 @@ func (v2c *v2Client) recv(stream adsStream) bool {
// corresponding to received LDS responses will be pushed to the provided
// callback. The caller can cancel the watch by invoking the returned cancel
// function.
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) {
wi := &watchInfo{wType: ldsResource, target: []string{target}, callback: ldsCb}
v2c.watchCh.Put(wi)
Expand All @@ -254,6 +258,8 @@ func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func())
// corresponding to received RDS responses will be pushed to the provided
// callback. The caller can cancel the watch by invoking the returned cancel
// function.
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) {
wi := &watchInfo{wType: rdsResource, target: []string{routeName}, callback: rdsCb}
v2c.watchCh.Put(wi)
Expand All @@ -264,6 +270,7 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func(
wi.state = watchCancelled
return
}
v2c.watchMap[rdsResource].cancel()
v2c.watchMap[rdsResource] = nil
}
}
Expand All @@ -275,9 +282,7 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func(
// Caller should hold v2c.mu
func (v2c *v2Client) updateWatchMap(wi *watchInfo) {
if existing := v2c.watchMap[wi.wType]; existing != nil {
if existing.timer != nil {
existing.timer.Stop()
}
existing.cancel()
}

v2c.watchMap[wi.wType] = wi
Expand All @@ -299,19 +304,11 @@ func (v2c *v2Client) checkWatchTargetInCache(wi *watchInfo) {
switch wi.wType {
case rdsResource:
routeName := wi.target[0]
if rc := v2c.rdsCache[routeName]; rc != nil {
if cluster := v2c.rdsCache[routeName]; cluster != "" {
if v2c.watchMap[ldsResource] == nil {
grpclog.Warningf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName)
return
}
target := v2c.watchMap[ldsResource].target[0]
cluster := getClusterFromRouteConfiguration(rc, target)
if cluster == "" {
// This should ideally never happen because we cache only
// validated resources.
grpclog.Warningf("xds: no cluster found in cached route config: %+v", rc)
return
}
wi.timer.Stop()
wi.callback.(rdsCallback)(rdsUpdate{cluster: cluster}, nil)
}
Expand Down

0 comments on commit f648eba

Please sign in to comment.