grpclb: fallback after init (#2681)
regenerate picker when switching between fallback/non-fallback, because new SubConn state might not be updated for cached SubConns
menghanl authored Apr 2, 2019
1 parent 955eb8a commit 4745f6a
Showing 4 changed files with 252 additions and 50 deletions.
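In outline, this change lets grpclb enter fallback at any point after initialization rather than only when the initial fallback timer fires: fallback is now also entered when the connection to the remote balancer is lost and no SubConn is Ready, and it is exited when a server list is received from the balancer (refreshSubConns with fromGRPCLBServer=true clears inFallback). The sketch below condenses the entry conditions that the diff checks inline at three call sites; shouldEnterFallback is a hypothetical helper written only for illustration, not a function in the real code.

package main

import "fmt"

// shouldEnterFallback is a hypothetical helper condensing the fallback entry
// conditions added by this commit; the real code checks these flags inline at
// three call sites (fallback timer, SubConn state change, broken balancer
// stream), all while holding lb.mu.
func shouldEnterFallback(inFallback, serverListReceived, remoteBalancerConnected, aggregatedReady, fallbackTimerExpired bool) bool {
	if inFallback {
		return false // already serving from the resolved fallback backends
	}
	// Pre-existing path: the fallback timer fired before any server list arrived.
	if fallbackTimerExpired && !serverListReceived {
		return true
	}
	// New paths: the remote balancer connection is lost and no SubConn is Ready.
	if !remoteBalancerConnected && !aggregatedReady {
		return true
	}
	return false
}

func main() {
	// Timer expired with no server list yet: enter fallback.
	fmt.Println(shouldEnterFallback(false, false, false, false, true)) // true
	// Balancer connection lost after init while nothing is Ready: enter fallback.
	fmt.Println(shouldEnterFallback(false, true, false, false, false)) // true
	// Balancer still connected and a SubConn is Ready: stay out of fallback.
	fmt.Println(shouldEnterFallback(false, true, true, true, false)) // false
}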
95 changes: 70 additions & 25 deletions balancer/grpclb/grpclb.go
@@ -172,7 +172,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
doneCh: make(chan struct{}),

manualResolver: r,
csEvltr: &balancer.ConnectivityStateEvaluator{},
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
@@ -238,15 +237,15 @@ type lbBalancer struct {
// but with only READY SCs will be generated.
backendAddrs []resolver.Address
// Roundrobin functionalities.
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
// Support fallback to resolved backend addresses if there's no response
// from remote balancer within fallbackTimeout.
fallbackTimerExpired bool
serverListReceived bool
remoteBalancerConnected bool
serverListReceived bool
inFallback bool
// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
// when resolved address updates are received, and read in the goroutine
// handling fallback.
@@ -264,13 +263,16 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
return
}

if lb.state == connectivity.Connecting {
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}

var readySCs []balancer.SubConn
if lb.usePickFirst {
if lb.state == connectivity.Ready || lb.state == connectivity.Idle {
for _, sc := range lb.subConns {
readySCs = append(readySCs, sc)
break
}
for _, sc := range lb.subConns {
readySCs = append(readySCs, sc)
break
}
} else {
for _, a := range lb.backendAddrs {
@@ -286,10 +288,13 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
// If there are no ready SubConns, always re-pick. This is to avoid drops
// unless at least one SubConn is ready. Otherwise we may drop more
// often than we want because of drops + re-picks (which become re-drops).
//
// This doesn't seem to be necessary after the connecting check above.
// Kept for safety.
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
return
}
if len(lb.fullServerList) <= 0 {
if lb.inFallback {
lb.picker = newRRPicker(readySCs)
return
}
@@ -305,6 +310,34 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
prevLBPicker.updateReadySCs(readySCs)
}

// aggregateSubConnStates calculates the aggregated state of SubConns in
// lb.subConns. These SubConns are the subconns in use (when switching between
// fallback and grpclb). lb.scStates contains states for all SubConns, including
// those in cache (SubConns are cached for 10 seconds after being removed).
//
// The aggregated state is:
// - If at least one SubConn is Ready, the aggregated state is Ready;
// - Else if at least one SubConn is Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
var numConnecting uint64

for _, sc := range lb.subConns {
if state, ok := lb.scStates[sc]; ok {
switch state {
case connectivity.Ready:
return connectivity.Ready
case connectivity.Connecting:
numConnecting++
}
}
}
if numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}

func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
@@ -328,18 +361,33 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi
// kept the sc's state in scStates. Remove state for this sc here.
delete(lb.scStates, sc)
}
// Force regenerate picker if
// - this sc became ready from not-ready
// - this sc became not-ready from ready
lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)

// Enter fallback when the aggregated state is not Ready and the connection
// to remote balancer is lost.
if lb.state != connectivity.Ready {
if !lb.inFallback && !lb.remoteBalancerConnected {
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
}
}
}

// updateStateAndPicker re-calculates the aggregated state, and regenerates the
// picker if the overall state changed.
//
// If forceRegeneratePicker is true, picker will be regenerated.
func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
oldAggrState := lb.state
lb.state = lb.csEvltr.RecordTransition(oldS, s)

lb.state = lb.aggregateSubConnStates()
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (oldS == connectivity.Ready) != (s == connectivity.Ready) ||
(lb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
lb.regeneratePicker(false)
// - caller wants to regenerate
// - the aggregated state changed
if forceRegeneratePicker || (lb.state != oldAggrState) {
lb.regeneratePicker(resetDrop)
}

lb.cc.UpdateBalancerState(lb.state, lb.picker)
Expand All @@ -357,11 +405,11 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
return
}
lb.mu.Lock()
if lb.serverListReceived {
if lb.inFallback || lb.serverListReceived {
lb.mu.Unlock()
return
}
lb.fallbackTimerExpired = true
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.mu.Unlock()
}
@@ -405,10 +453,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {

lb.mu.Lock()
lb.resolvedBackendAddrs = backendAddrs
// If serverListReceived is true, connection to remote balancer was
// successful and there's no need to do fallback anymore.
// If fallbackTimerExpired is false, fallback hasn't happened yet.
if !lb.serverListReceived && lb.fallbackTimerExpired {
if lb.inFallback {
// This means we received a new list of resolved backends, and we are
// still in fallback mode. Need to update the list of backends we are
// using to the new list of backends.
42 changes: 29 additions & 13 deletions balancer/grpclb/grpclb_remote_balancer.go
@@ -85,24 +85,26 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
backendAddrs = append(backendAddrs, addr)
}

// Call refreshSubConns to create/remove SubConns.
// Call refreshSubConns to create/remove SubConns. If we are in fallback,
// this is also exiting fallback.
lb.refreshSubConns(backendAddrs, true)
// Regenerate and update picker no matter if there's an update on backends (if
// any SubConn will be newed/removed). Because the full serverList was
// different, there might be updates in drops or pick weights (different
// number of duplicates). We need to update the picker with the full list.
//
// Now with cache, even if SubConn was newed/removed, there might be no
// state changes.
lb.regeneratePicker(true)
lb.cc.UpdateBalancerState(lb.state, lb.picker)
}

// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
// indicating whether the backendAddrs are different from the cached
// backendAddrs (whether any SubConn was newed/removed).
// refreshSubConns creates/removes SubConns with backendAddrs, and refreshes
// balancer state and picker.
//
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) {
defer func() {
// Regenerate and update picker after refreshing subconns because with
// cache, even if SubConn was newed/removed, there might be no state
// changes (the subconn will be kept in cache, not actually
// newed/removed).
lb.updateStateAndPicker(true, true)
}()

lb.inFallback = !fromGRPCLBServer

opts := balancer.NewSubConnOptions{}
if fromGRPCLBServer {
opts.CredsBundle = lb.grpclbBackendCreds
@@ -218,6 +220,9 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}
lb.mu.Lock()
lb.remoteBalancerConnected = true
lb.mu.Unlock()

// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
@@ -270,6 +275,17 @@ func (lb *lbBalancer) watchRemoteBalancer() {
// Trigger a re-resolve when the stream errors.
lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})

lb.mu.Lock()
lb.remoteBalancerConnected = false
lb.fullServerList = nil
// Enter fallback when connection to remote balancer is lost, and the
// aggregated state is not Ready.
if !lb.inFallback && lb.state != connectivity.Ready {
// Entering fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
}
lb.mu.Unlock()

if !doBackoff {
retryCount = 0
continue
80 changes: 68 additions & 12 deletions balancer/grpclb/grpclb_test.go
@@ -230,18 +230,21 @@ func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServe
b.stats.merge(req.GetClientStats())
}
}()
for v := range b.sls {
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: v,
},
for {
select {
case v := <-b.sls:
resp = &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: v,
},
}
case <-stream.Context().Done():
return stream.Context().Err()
}
if err := stream.Send(resp); err != nil {
return err
}
}
<-b.done
return nil
}

type testServer struct {
@@ -297,6 +300,9 @@ type testServers struct {
backends []*grpc.Server
beIPs []net.IP
bePorts []int

lbListener net.Listener
beListeners []net.Listener
}

func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), err error) {
@@ -317,7 +323,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)

beListeners = append(beListeners, beLis)
beListeners = append(beListeners, newRestartableListener(beLis))
}
backends := startBackends(beServerName, false, beListeners...)

@@ -327,6 +333,7 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
err = fmt.Errorf("failed to create the listener for the load balancer %v", err)
return
}
lbLis = newRestartableListener(lbLis)
lbCreds := &serverNameCheckCreds{
sn: lbServerName,
}
@@ -344,6 +351,9 @@ func newLoadBalancer(numberOfBackends int) (tss *testServers, cleanup func(), er
backends: backends,
beIPs: beIPs,
bePorts: bePorts,

lbListener: lbLis,
beListeners: beListeners,
}
cleanup = func() {
defer stopBackends(backends)
@@ -712,7 +722,7 @@ func TestFallback(t *testing.T) {
testC := testpb.NewTestServiceClient(cc)

r.UpdateState(resolver.State{Addresses: []resolver.Address{{
Addr: "",
Addr: "invalid.address",
Type: resolver.GRPCLB,
ServerName: lbServerName,
}, {
@@ -723,7 +733,7 @@

var p peer.Peer
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
if p.Addr.String() != beLis.Addr().String() {
t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
@@ -739,16 +749,62 @@
ServerName: beServerName,
}}})

var backendUsed bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
return
backendUsed = true
break
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
if !backendUsed {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}

// Close backend and remote balancer connections, should use fallback.
tss.beListeners[0].(*restartableListener).stopPreviousConns()
tss.lbListener.(*restartableListener).stopPreviousConns()
time.Sleep(time.Second)

var fallbackUsed bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.String() == beLis.Addr().String() {
fallbackUsed = true
break
}
time.Sleep(time.Millisecond)
}
if !fallbackUsed {
t.Fatalf("No RPC sent to fallback after 1 second")
}

// Restart backend and remote balancer; the client should exit fallback and
// use the backends behind the remote balancer again.
tss.beListeners[0].(*restartableListener).restart()
tss.lbListener.(*restartableListener).restart()
tss.ls.sls <- sl

time.Sleep(time.Second)

var backendUsed2 bool
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] {
backendUsed2 = true
break
}
time.Sleep(time.Millisecond)
}
if !backendUsed2 {
t.Fatalf("No RPC sent to backend behind remote balancer after 1 second")
}
}

// The remote balancer sends response with duplicates to grpclb client.
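The test above toggles connectivity through a restartableListener wrapper whose implementation lives in a file not shown in this excerpt (the fourth changed file). As a rough sketch only, assuming it wraps a net.Listener, tracks accepted connections, and can temporarily reject new ones, such a helper might look like the following; everything beyond the newRestartableListener, stopPreviousConns, and restart names used in the test is a guess.

package grpclb

import (
	"net"
	"sync"
)

// restartableListener is a sketch of a listener wrapper that can simulate a
// server going away (close existing connections, refuse new ones) and coming
// back, without releasing the listening port.
type restartableListener struct {
	net.Listener

	mu      sync.Mutex
	stopped bool
	conns   []net.Conn
}

func newRestartableListener(l net.Listener) *restartableListener {
	return &restartableListener{Listener: l}
}

// Accept tracks accepted connections; while stopped, it closes new
// connections immediately so the server appears unreachable.
func (l *restartableListener) Accept() (net.Conn, error) {
	for {
		conn, err := l.Listener.Accept()
		if err != nil {
			return nil, err
		}
		l.mu.Lock()
		if l.stopped {
			l.mu.Unlock()
			conn.Close()
			continue
		}
		l.conns = append(l.conns, conn)
		l.mu.Unlock()
		return conn, nil
	}
}

// stopPreviousConns closes all previously accepted connections and rejects
// new ones until restart is called.
func (l *restartableListener) stopPreviousConns() {
	l.mu.Lock()
	l.stopped = true
	for _, c := range l.conns {
		c.Close()
	}
	l.conns = nil
	l.mu.Unlock()
}

// restart allows the listener to accept connections again.
func (l *restartableListener) restart() {
	l.mu.Lock()
	l.stopped = false
	l.mu.Unlock()
}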