Skip to content

Commit

Permalink
xds/ringhash: use StateListener instead of UpdateSubConnState (#6522)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 9, 2023
1 parent 3fa17cc commit 175c84c
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 50 deletions.
17 changes: 13 additions & 4 deletions xds/internal/balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,12 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
addrsSet.Set(addr, true)
newWeight := getWeightAttribute(addr)
if val, ok := b.subConns.Get(addr); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
HealthCheckEnabled: true,
StateListener: func(state balancer.SubConnState) { b.updateSubConnState(sc, state) },
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
b.logger.Warningf("Failed to create new SubConn: %v", err)
continue
Expand Down Expand Up @@ -256,7 +261,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
b.subConns.Delete(addr)
addrsUpdated = true
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
// The entry will be deleted in updateSubConnState.
}
}
return addrsUpdated
Expand Down Expand Up @@ -321,7 +326,11 @@ func (b *ringhashBalancer) ResolverError(err error) {
})
}

// UpdateSubConnState updates the per-SubConn state stored in the ring, and also
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// updateSubConnState updates the per-SubConn state stored in the ring, and also
// the aggregated state.
//
// It triggers an update to cc when:
Expand All @@ -332,7 +341,7 @@ func (b *ringhashBalancer) ResolverError(err error) {
// - the aggregated state is changed
// - the same picker will be sent again, but this update may trigger a re-pick
// for some RPCs.
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if logger.V(2) {
b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
Expand Down
96 changes: 50 additions & 46 deletions xds/internal/balancer/ringhash/ringhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) {

func (s) TestOneSubConn(t *testing.T) {
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]}
cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1})
cc, _, p0 := setupTest(t, []resolver.Address{wantAddr1})
ring0 := p0.(*picker).ring

firstHash := ring0.items[0].hash
Expand All @@ -156,16 +156,16 @@ func (s) TestOneSubConn(t *testing.T) {
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc0 := ring0.items[0].sc.sc
sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn)
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// Test pick with one backend.
p1 := <-cc.NewPickerCh
Expand All @@ -186,7 +186,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
cc, _, p0 := setupTest(t, wantAddrs)
// This test doesn't update addresses, so this ring will be used by all the
// pickers.
ring0 := p0.(*picker).ring
Expand All @@ -200,16 +200,16 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
// The picked SubConn should be the second in the ring.
sc0 := ring0.items[1].sc.sc
sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
Expand All @@ -219,7 +219,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
}

// Turn down the subConn in use.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p2 := <-cc.NewPickerCh
// Pick with the same hash should be queued, because the SubConn after the
// first picked is Idle.
Expand All @@ -228,16 +228,16 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
}

// The third SubConn in the ring should connect.
sc1 := ring0.items[2].sc.sc
sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// New picks should all return this SubConn.
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
Expand All @@ -248,19 +248,19 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
}

// Now, after backoff, the first picked SubConn will turn Idle.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
// The picks above should have queued Connect() for the first picked
// SubConn, so this Idle state change will trigger a Connect().
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// After the first picked SubConn turn Ready, new picks should return it
// again (even though the second picked SubConn is also Ready).
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p4 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
Expand All @@ -279,7 +279,7 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
cc, _, p0 := setupTest(t, wantAddrs)
// This test doesn't update addresses, so this ring will be used by all the
// pickers.
ring0 := p0.(*picker).ring
Expand All @@ -292,16 +292,16 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc0 := ring0.items[1].sc.sc
sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// First hash should always pick sc0.
p1 := <-cc.NewPickerCh
Expand All @@ -318,14 +318,14 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc1 := ring0.items[2].sc.sc
sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

// With the new generated picker, hash2 always picks sc1.
p2 := <-cc.NewPickerCh
Expand Down Expand Up @@ -419,58 +419,62 @@ func (s) TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
_, b, p0 := setupTest(t, wantAddrs)
_, _, p0 := setupTest(t, wantAddrs)
ring0 := p0.(*picker).ring

// ringhash won't tell SCs to connect until there is an RPC, so simulate
// one now.
p0.Pick(balancer.PickInfo{Ctx: context.Background()})

// Turn the first subconn to transient failure.
sc0 := ring0.items[0].sc.sc
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn)
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the second subconn to connect (because overall state is
// Connect (when one subconn is TF)).
sc1 := ring0.items[1].sc.sc
sc1 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc1)
}

// Turn the second subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the third subconn to connect.
sc2 := ring0.items[2].sc.sc
sc2 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
select {
case <-sc2.(*testutils.TestSubConn).ConnectCh:
case <-sc2.ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc2)
}

// Turn the third subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the first subconn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc0)
}

// Turn the third subconn to TF again.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})

// This will not trigger any new Connect() on the SubConns, because sc0 is
// still attempting to connect, and we only need one SubConn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-sc0.ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc0)
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-sc1.ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc1)
case <-sc2.(*testutils.TestSubConn).ConnectCh:
case <-sc2.ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc2)
case <-time.After(defaultTestShortTimeout):
}
Expand Down

0 comments on commit 175c84c

Please sign in to comment.