@@ -33,7 +33,6 @@ import (
33
33
"sync"
34
34
"time"
35
35
36
- "google.golang.org/grpc/attributes"
37
36
"google.golang.org/grpc/balancer"
38
37
"google.golang.org/grpc/balancer/pickfirst/internal"
39
38
"google.golang.org/grpc/connectivity"
63
62
// It is changed to "pick_first" in init() if this balancer is to be
64
63
// registered as the default pickfirst.
65
64
Name = "pick_first_leaf"
66
-
67
- // enableHealthListenerValue is the resolver state attribute value used to
68
- // enable pickfirst to listen for health updates when operating under a
69
- // petiole policy.
70
- enableHealthListenerValue = & struct {}{}
71
65
)
72
66
73
67
const (
@@ -118,8 +112,9 @@ func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalan
118
112
119
113
// EnableHealthListener updates the state to configure pickfirst for using a
120
114
// generic health listener.
121
- func EnableHealthListener (attrs * attributes.Attributes ) * attributes.Attributes {
122
- return attrs .WithValue (enableHealthListenerKeyType {}, enableHealthListenerValue )
115
+ func EnableHealthListener (state resolver.State ) resolver.State {
116
+ state .Attributes = state .Attributes .WithValue (enableHealthListenerKeyType {}, true )
117
+ return state
123
118
}
124
119
125
120
type pfConfig struct {
@@ -170,7 +165,10 @@ type pickfirstBalancer struct {
170
165
// The mutex is used to ensure synchronization of updates triggered
171
166
// from the idle picker and the already serialized resolver,
172
167
// SubConn state updates.
173
- mu sync.Mutex
168
+ mu sync.Mutex
169
+ // The raw connectivity state based on SubConn state updates and resolver
170
+ // updates, i.e. independent of SubConn health updates. It is tracked
171
+ // separately to support the sticky TF behaviour described in A62.
174
172
connectivityState connectivity.State
175
173
// State reported to the channel. It will be the health state when being
176
174
// used as a leaf policy and the connectivityState is READY.
@@ -227,7 +225,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
227
225
b .resolverErrorLocked (errors .New ("produced zero addresses" ))
228
226
return balancer .ErrBadResolverState
229
227
}
230
- b .healthCheckingEnabled = state .ResolverState .Attributes .Value (enableHealthListenerKeyType {}) == enableHealthListenerValue
228
+ b .healthCheckingEnabled = state .ResolverState .Attributes .Value (enableHealthListenerKeyType {}).( bool )
231
229
cfg , ok := state .BalancerConfig .(pfConfig )
232
230
if state .BalancerConfig != nil && ! ok {
233
231
return fmt .Errorf ("pickfirst: received illegal BalancerConfig (type %T): %v: %w" , state .BalancerConfig , state .BalancerConfig , balancer .ErrBadResolverState )
@@ -564,7 +562,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
564
562
// To prevent pickers from returning these obsolete SubConns, this logic
565
563
// is included to check if the current list of active SubConns includes this
566
564
// SubConn.
567
- if activeSD , found := b . subConns . Get (sd . addr ); ! found || activeSD != sd {
565
+ if ! b . isActiveSCData (sd ) {
568
566
return
569
567
}
570
568
if newState .ConnectivityState == connectivity .Shutdown {
@@ -706,14 +704,19 @@ func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) {
706
704
}
707
705
}
708
706
707
+ func (b * pickfirstBalancer ) isActiveSCData (sd * scData ) bool {
708
+ activeSD , found := b .subConns .Get (sd .addr )
709
+ return found && activeSD == sd
710
+ }
711
+
709
712
func (b * pickfirstBalancer ) updateSubConnHealthState (sd * scData , state balancer.SubConnState ) {
710
713
b .mu .Lock ()
711
714
defer b .mu .Unlock ()
712
715
// Previously relevant SubConns can still callback with state updates.
713
716
// To prevent pickers from returning these obsolete SubConns, this logic
714
717
// is included to check if the current list of active SubConns includes
715
718
// this SubConn.
716
- if activeSD , found := b . subConns . Get (sd . addr ); ! found || activeSD != sd {
719
+ if ! b . isActiveSCData (sd ) {
717
720
return
718
721
}
719
722
switch state .ConnectivityState {
@@ -725,7 +728,7 @@ func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.
725
728
case connectivity .TransientFailure :
726
729
b .updateConcludedStateLocked (balancer.State {
727
730
ConnectivityState : connectivity .TransientFailure ,
728
- Picker : & picker {err : fmt .Errorf ("health check failure: %v" , state .ConnectionError )},
731
+ Picker : & picker {err : fmt .Errorf ("pickfirst: health check failure: %v" , state .ConnectionError )},
729
732
})
730
733
case connectivity .Connecting :
731
734
b .updateConcludedStateLocked (balancer.State {
@@ -737,18 +740,22 @@ func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.
737
740
}
738
741
}
739
742
743
+ // updateConcludedStateLocked stores the state reported to the channel and calls
744
+ // ClientConn.UpdateState(). As an optimization, it avoid sending duplicate
745
+ // updates to the channel for state CONNECTING.
740
746
func (b * pickfirstBalancer ) updateConcludedStateLocked (newState balancer.State ) {
741
- // Optimization to not send duplicate CONNECTING and IDLE updates.
747
+ // Optimization to not send duplicate CONNECTING updates.
742
748
if newState .ConnectivityState == b .concludedState && b .concludedState == connectivity .Connecting {
743
749
return
744
750
}
745
751
b .forceUpdateConcludedStateLocked (newState )
746
752
}
747
753
748
- // A separate function to force update the ClientConn state is required to send
749
- // the first CONNECTING update since the channel doesn't correctly assume that
750
- // LB policies start in CONNECTING and relies on LB policy to send an initial
751
- // CONNECTING update.
754
+ // forceUpdateConcludedStateLocked stores the state reported to the channel and
755
+ // calls ClientConn.UpdateState().
756
+ // A separate function is defined to force update the ClientConn state since the
757
+ // channel doesn't correctly assume that LB policies start in CONNECTING and
758
+ // relies on LB policy to send an initial CONNECTING update.
752
759
func (b * pickfirstBalancer ) forceUpdateConcludedStateLocked (newState balancer.State ) {
753
760
b .concludedState = newState .ConnectivityState
754
761
b .cc .UpdateState (newState )
0 commit comments