Skip to content

Commit

Permalink
Review comments #2.
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Dec 4, 2019
1 parent 068afc4 commit 789f31c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
37 changes: 22 additions & 15 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,13 @@ type closeUpdate struct{}
// is exposed to gRPC and implements the balancer.ClientConn interface which is
// exposed to the edsBalancer.
type cdsBalancer struct {
cc balancer.ClientConn
bOpts balancer.BuildOptions
updateCh *buffer.Unbounded
client xdsClientInterface
cancelWatch func()
edsLB balancer.V2Balancer
cc balancer.ClientConn
bOpts balancer.BuildOptions
updateCh *buffer.Unbounded
client xdsClientInterface
cancelWatch func()
edsLB balancer.V2Balancer
clusterToWatch string

// The only thing protected by this mutex is the closed boolean. This is
// checked by all methods before acting on updates.
Expand All @@ -177,6 +178,20 @@ func (b *cdsBalancer) run() {
b.updateCh.Load()
switch update := u.(type) {
case *ccUpdate:
// We first handle errors, if any, and then proceed with handling
// the update, only if the status quo has changed.
if err := update.err; err != nil {
// TODO: Should we cancel the watch only on specific errors?
if b.cancelWatch != nil {
b.cancelWatch()
}
if b.edsLB != nil {
b.edsLB.ResolverError(err)
}
}
if b.client == update.client && b.clusterToWatch == update.clusterName {
break
}
if update.client != nil {
// Since the cdsBalancer doesn't own the xdsClient object, we
// don't have to bother about closing the old client here, but
Expand All @@ -188,15 +203,7 @@ func (b *cdsBalancer) run() {
}
if update.clusterName != "" {
b.cancelWatch = b.client.WatchCluster(update.clusterName, b.handleClusterUpdate)
}
if err := update.err; err != nil {
// TODO: Should we cancel the watch only on specific errors?
if b.cancelWatch != nil {
b.cancelWatch()
}
if b.edsLB != nil {
b.edsLB.ResolverError(err)
}
b.clusterToWatch = update.clusterName
}
case *scUpdate:
if b.edsLB == nil {
Expand Down
42 changes: 34 additions & 8 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,10 @@ func TestUpdateClientConnState(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cdsB, _, cancel := setup()
defer cancel()
defer cdsB.Close()
defer func() {
cancel()
cdsB.Close()
}()

if err := cdsB.UpdateClientConnState(test.ccs); err != test.wantErr {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
Expand Down Expand Up @@ -378,13 +380,33 @@ func TestUpdateClientConnStateAfterClose(t *testing.T) {
}
}

// TestUpdateClientConnStateWithSameState verifies that a ClientConnState
// update with the same cluster and xdsClient does not cause the cdsBalancer to
// create a new watch.
func TestUpdateClientConnStateWithSameState(t *testing.T) {
xdsC, cdsB, _, cancel := setupWithWatch(t)
defer func() {
cancel()
cdsB.Close()
}()

if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}
if err := xdsC.waitForWatch(clusterName); err == nil {
t.Fatal("Waiting for WatchCluster() should have timed out, but returned with nil error")
}
}

// TestHandleClusterUpdate invokes the registered CDS watch callback with
// different updates and verifies that the expect ClientConnState is propagated
// to the edsBalancer.
func TestHandleClusterUpdate(t *testing.T) {
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
defer cancel()
defer cdsB.Close()
defer func() {
cancel()
cdsB.Close()
}()

tests := []struct {
name string
Expand Down Expand Up @@ -422,8 +444,10 @@ func TestHandleClusterUpdate(t *testing.T) {
// is propagated to the edsBalancer.
func TestResolverError(t *testing.T) {
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
defer cancel()
defer cdsB.Close()
defer func() {
cancel()
cdsB.Close()
}()

cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
wantCCS := edsCCS(serviceName, false, xdsC)
Expand All @@ -445,8 +469,10 @@ func TestResolverError(t *testing.T) {
// verifies that the update is propagated to the edsBalancer.
func TestUpdateSubConnState(t *testing.T) {
xdsC, cdsB, edsB, cancel := setupWithWatch(t)
defer cancel()
defer cdsB.Close()
defer func() {
cancel()
cdsB.Close()
}()

cdsUpdate := xdsclient.CDSUpdate{ServiceName: serviceName}
wantCCS := edsCCS(serviceName, false, xdsC)
Expand Down

0 comments on commit 789f31c

Please sign in to comment.