Skip to content

Commit

Permalink
balancer: add SubConn.Shutdown; deprecate Balancer.RemoveSubConn (#6493)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 4, 2023
1 parent 4fe8d3d commit 7aceafc
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 54 deletions.
9 changes: 9 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ type SubConn interface {
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
// Shutdown shuts down the SubConn gracefully. Any started RPCs will be
// allowed to complete. No future calls should be made on the SubConn.
// One final state update will be delivered to the StateListener (or
// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to
// indicate the shutdown operation. This may be delivered before
// in-progress RPCs are complete and the actual connection is closed.
Shutdown()
}

// NewSubConnOptions contains options to create new SubConn.
Expand Down Expand Up @@ -161,6 +168,8 @@ type ClientConn interface {
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
//
// Deprecated: use SubConn.Shutdown instead.
RemoveSubConn(SubConn)
// UpdateAddresses updates the addresses used in the passed in SubConn.
// gRPC checks if the currently connected address is still in the new list.
Expand Down
2 changes: 2 additions & 0 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

func (sc *testSubConn) Connect() {}

func (sc *testSubConn) Shutdown() {}

func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}
Expand Down
13 changes: 12 additions & 1 deletion balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,13 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
return connectivity.TransientFailure
}

// UpdateSubConnState is unused; NewSubConn's options always specifies
// updateSubConnState as the listener.
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
}

func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if logger.V(2) {
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
Expand Down Expand Up @@ -373,8 +379,13 @@ func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop
if forceRegeneratePicker || (lb.state != oldAggrState) {
lb.regeneratePicker(resetDrop)
}
var cc balancer.ClientConn = lb.cc
if lb.usePickFirst {
// Bypass the caching layer that would wrap the picker.
cc = lb.cc.ClientConn
}

lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
}

// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to use
Expand Down
13 changes: 8 additions & 5 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
if oldUsePickFirst {
// If old SubConn were created for pickfirst, bypass cache and
// remove directly.
lb.cc.cc.RemoveSubConn(sc)
lb.cc.ClientConn.RemoveSubConn(sc)
} else {
lb.cc.RemoveSubConn(sc)
}
Expand All @@ -144,16 +144,17 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.cc.RemoveSubConn(sc)
lb.cc.ClientConn.RemoveSubConn(sc)
delete(lb.subConns, scKey)
return
}
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
lb.cc.ClientConn.UpdateAddresses(sc, backendAddrs)
sc.Connect()
return
}
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
// This bypasses the cc wrapper with SubConn cache.
sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
sc, err := lb.cc.ClientConn.NewSubConn(backendAddrs, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
return
Expand All @@ -176,6 +177,8 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback

if _, ok := lb.subConns[addrWithoutAttrs]; !ok {
// Use addrWithMD to create the SubConn.
var sc balancer.SubConn
opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) }
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
Expand Down Expand Up @@ -419,7 +422,7 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
}
}
// Trigger a re-resolve when the stream errors.
ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOptions{})
ccw.lb.cc.ClientConn.ResolveNow(resolver.ResolveNowOptions{})

ccw.lb.mu.Lock()
ccw.lb.remoteBalancerConnected = false
Expand Down
42 changes: 34 additions & 8 deletions balancer/grpclb/grpclb_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ const subConnCacheTime = time.Second * 10
// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
// SubConns will be kept in cache for subConnCacheTime before being removed.
//
// Its new and remove methods are updated to do cache first.
// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first.
type lbCacheClientConn struct {
cc balancer.ClientConn
balancer.ClientConn

timeout time.Duration

mu sync.Mutex
Expand All @@ -113,7 +114,7 @@ type subConnCacheEntry struct {

func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn {
return &lbCacheClientConn{
cc: cc,
ClientConn: cc,
timeout: subConnCacheTime,
subConnCache: make(map[resolver.Address]*subConnCacheEntry),
subConnToAddr: make(map[balancer.SubConn]resolver.Address),
Expand All @@ -137,16 +138,27 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
return entry.sc, nil
}

scNew, err := ccc.cc.NewSubConn(addrs, opts)
scNew, err := ccc.ClientConn.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
scNew = &lbCacheSubConn{SubConn: scNew, ccc: ccc}

ccc.subConnToAddr[scNew] = addrWithoutAttrs
return scNew, nil
}

func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
}

type lbCacheSubConn struct {
balancer.SubConn
ccc *lbCacheClientConn
}

func (sc *lbCacheSubConn) Shutdown() {
ccc := sc.ccc
ccc.mu.Lock()
defer ccc.mu.Unlock()
addr, ok := ccc.subConnToAddr[sc]
Expand All @@ -160,7 +172,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
// same address, and those SubConns are all removed. We remove sc
// immediately here.
delete(ccc.subConnToAddr, sc)
ccc.cc.RemoveSubConn(sc)
sc.SubConn.Shutdown()
}
return
}
Expand All @@ -176,7 +188,7 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
if entry.abortDeleting {
return
}
ccc.cc.RemoveSubConn(sc)
sc.SubConn.Shutdown()
delete(ccc.subConnToAddr, sc)
delete(ccc.subConnCache, addr)
})
Expand All @@ -195,14 +207,28 @@ func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
}

func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
ccc.cc.UpdateState(s)
s.Picker = &lbCachePicker{Picker: s.Picker}
ccc.ClientConn.UpdateState(s)
}

func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
defer ccc.mu.Unlock()
// Only cancel all existing timers. There's no need to remove SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}
ccc.mu.Unlock()
}

type lbCachePicker struct {
balancer.Picker
}

func (cp *lbCachePicker) Pick(i balancer.PickInfo) (balancer.PickResult, error) {
res, err := cp.Picker.Pick(i)
if err != nil {
return res, err
}
res.SubConn = res.SubConn.(*lbCacheSubConn).SubConn
return res, nil
}
13 changes: 9 additions & 4 deletions balancer/grpclb/grpclb_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ import (

type mockSubConn struct {
balancer.SubConn
mcc *mockClientConn
}

func (msc *mockSubConn) Shutdown() {
msc.mcc.mu.Lock()
defer msc.mcc.mu.Unlock()
delete(msc.mcc.subConns, msc)
}

type mockClientConn struct {
Expand All @@ -46,17 +53,15 @@ func newMockClientConn() *mockClientConn {
}

func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &mockSubConn{}
sc := &mockSubConn{mcc: mcc}
mcc.mu.Lock()
defer mcc.mu.Unlock()
mcc.subConns[sc] = addrs[0]
return sc, nil
}

func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
mcc.mu.Lock()
defer mcc.mu.Unlock()
delete(mcc.subConns, sc)
sc.Shutdown()
}

const testCacheTimeout = 100 * time.Millisecond
Expand Down
8 changes: 7 additions & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}
acbw := &acBalancerWrapper{
ccb: ccb,
ac: ac,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: opts.StateListener,
Expand Down Expand Up @@ -370,7 +371,8 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
ac *addrConn // read-only
ac *addrConn // read-only
ccb *ccBalancerWrapper // read-only
stateListener func(balancer.SubConnState)

mu sync.Mutex
Expand All @@ -389,6 +391,10 @@ func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
}

func (acbw *acBalancerWrapper) Shutdown() {
acbw.ccb.RemoveSubConn(acbw)
}

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
Expand Down
8 changes: 1 addition & 7 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,7 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
}

func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
bw.gsb.mu.Lock()
if !bw.gsb.balancerCurrentOrPending(bw) {
bw.gsb.mu.Unlock()
return
}
bw.gsb.mu.Unlock()
bw.gsb.cc.RemoveSubConn(sc)
sc.Shutdown()
}

func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Expand Down
18 changes: 13 additions & 5 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type testingLogger interface {

// TestSubConn implements the SubConn interface, to be used in tests.
type TestSubConn struct {
tcc *TestClientConn // the CC that owns this SubConn
id string
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
Expand Down Expand Up @@ -81,6 +82,16 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
}
}

// Shutdown pushes the SubConn to the RemoveSubConn channel in the parent
// TestClientConn.
func (tsc *TestSubConn) Shutdown() {
tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc)
select {
case tsc.tcc.RemoveSubConnCh <- tsc:
default:
}
}

// String implements stringer to print human friendly error message.
func (tsc *TestSubConn) String() string {
return tsc.id
Expand Down Expand Up @@ -121,6 +132,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
// NewSubConn creates a new SubConn.
func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &TestSubConn{
tcc: tcc,
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
Expand All @@ -143,11 +155,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon

// RemoveSubConn removes the SubConn.
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc)
select {
case tcc.RemoveSubConnCh <- sc.(*TestSubConn):
default:
}
sc.(*TestSubConn).Shutdown()
}

// UpdateAddresses updates the addresses on the SubConn.
Expand Down
56 changes: 56 additions & 0 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,3 +1061,59 @@ func (s) TestBalancerProducerHonorsContext(t *testing.T) {
t.Fatalf("RPC error: %v; want status.Code(err)=%v", err, codes.Canceled)
}
}

// TestSubConnShutdown confirms that the Shutdown method on subconns properly
// initiates their shutdown.
func (s) TestSubConnShutdown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

gotShutdown := grpcsync.NewEvent()

bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
StateListener: func(scs balancer.SubConnState) {
switch scs.ConnectivityState {
case connectivity.Connecting:
// Ignored.
case connectivity.Ready:
sc.Shutdown()
case connectivity.Shutdown:
gotShutdown.Fire()
default:
t.Errorf("got unexpected state %q in listener", scs.ConnectivityState)
}
},
}
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts)
if err != nil {
return err
}
sc.Connect()
// Report the state as READY to unblock ss.Start(), which waits for ready.
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
return nil
},
}

const testBalName = "shutdown-test-balancer"
stub.Register(testBalName, bf)
t.Logf("Registered balancer %s...", testBalName)

ss := &stubserver.StubServer{}
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(
fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName),
)); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

select {
case <-gotShutdown.Done():
// Success
case <-ctx.Done():
t.Fatalf("Timed out waiting for gotShutdown to be fired.")
}
}
Loading

0 comments on commit 7aceafc

Please sign in to comment.