Skip to content

Commit 3a594f6

Browse files
committed
Update roundrobin to use endpointsharding
1 parent 724f450 commit 3a594f6

File tree

14 files changed

+717
-442
lines changed

14 files changed

+717
-442
lines changed

balancer/endpointsharding/endpointsharding_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*
1717
*/
1818

19-
package endpointsharding
19+
package endpointsharding_test
2020

2121
import (
2222
"context"
@@ -28,6 +28,7 @@ import (
2828

2929
"google.golang.org/grpc"
3030
"google.golang.org/grpc/balancer"
31+
"google.golang.org/grpc/balancer/endpointsharding"
3132
"google.golang.org/grpc/credentials/insecure"
3233
"google.golang.org/grpc/grpclog"
3334
"google.golang.org/grpc/internal"
@@ -55,7 +56,7 @@ var logger = grpclog.Component("endpoint-sharding-test")
5556

5657
func init() {
5758
var err error
58-
gracefulSwitchPickFirst, err = ParseConfig(json.RawMessage(PickFirstConfig))
59+
gracefulSwitchPickFirst, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
5960
if err != nil {
6061
logger.Fatal(err)
6162
}
@@ -75,7 +76,7 @@ func (fakePetioleBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptio
7576
ClientConn: cc,
7677
bOpts: opts,
7778
}
78-
fp.Balancer = NewBalancer(fp, opts)
79+
fp.Balancer = endpointsharding.NewBalancer(fp, opts)
7980
return fp
8081
}
8182

@@ -105,7 +106,7 @@ func (fp *fakePetiole) UpdateClientConnState(state balancer.ClientConnState) err
105106
}
106107

107108
func (fp *fakePetiole) UpdateState(state balancer.State) {
108-
childStates := ChildStatesFromPicker(state.Picker)
109+
childStates := endpointsharding.ChildStatesFromPicker(state.Picker)
109110
// Both child states should be present in the child picker. States and
110111
// picker change over the lifecycle of test, but there should always be two.
111112
if len(childStates) != 2 {

balancer/pickfirst/pickfirstleaf/pickfirstleaf.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
618618
// Record a connection attempt when exiting CONNECTING.
619619
if newState.ConnectivityState == connectivity.TransientFailure {
620620
sd.connectionFailedInFirstPass = true
621+
sd.lastErr = newState.ConnectionError
621622
connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target)
622623
}
623624

@@ -702,7 +703,6 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
702703
})
703704
}
704705
case connectivity.TransientFailure:
705-
sd.lastErr = newState.ConnectionError
706706
sd.effectiveState = connectivity.TransientFailure
707707
// Since we're re-using common SubConns while handling resolver
708708
// updates, we could receive an out of turn TRANSIENT_FAILURE from
@@ -728,7 +728,6 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
728728
switch newState.ConnectivityState {
729729
case connectivity.TransientFailure:
730730
b.numTF = (b.numTF + 1) % b.subConns.Len()
731-
sd.lastErr = newState.ConnectionError
732731
if b.numTF%b.subConns.Len() == 0 {
733732
b.updateBalancerState(balancer.State{
734733
ConnectivityState: connectivity.TransientFailure,

balancer/roundrobin/roundrobin.go

+57-36
Original file line numberDiff line numberDiff line change
@@ -22,60 +22,81 @@
2222
package roundrobin
2323

2424
import (
25-
rand "math/rand/v2"
26-
"sync/atomic"
25+
"encoding/json"
26+
"fmt"
2727

2828
"google.golang.org/grpc/balancer"
29-
"google.golang.org/grpc/balancer/base"
29+
"google.golang.org/grpc/balancer/endpointsharding"
30+
"google.golang.org/grpc/balancer/pickfirst/pickfirstleaf"
3031
"google.golang.org/grpc/grpclog"
32+
internalgrpclog "google.golang.org/grpc/internal/grpclog"
33+
"google.golang.org/grpc/serviceconfig"
3134
)
3235

3336
// Name is the name of round_robin balancer.
3437
const Name = "round_robin"
3538

36-
var logger = grpclog.Component("roundrobin")
37-
38-
// newBuilder creates a new roundrobin balancer builder.
39-
func newBuilder() balancer.Builder {
40-
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
41-
}
39+
var (
40+
logger = grpclog.Component("roundrobin")
41+
// endpointSharding which specifies pick first children.
42+
endpointShardingLBConfig serviceconfig.LoadBalancingConfig
43+
)
4244

4345
func init() {
44-
balancer.Register(newBuilder())
46+
var err error
47+
endpointShardingLBConfig, err = endpointsharding.ParseConfig(json.RawMessage(endpointsharding.PickFirstConfig))
48+
if err != nil {
49+
logger.Fatal(err)
50+
}
51+
balancer.Register(builder{})
4552
}
4653

47-
type rrPickerBuilder struct{}
54+
type builder struct{}
4855

49-
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
50-
logger.Infof("roundrobinPicker: Build called with info: %v", info)
51-
if len(info.ReadySCs) == 0 {
52-
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
53-
}
54-
scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
55-
for sc := range info.ReadySCs {
56-
scs = append(scs, sc)
56+
func (bb builder) Name() string {
57+
return Name
58+
}
59+
60+
func (bb builder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
61+
bal := &rrBalancer{
62+
cc: cc,
63+
child: endpointsharding.NewBalancer(cc, opts),
5764
}
58-
return &rrPicker{
59-
subConns: scs,
60-
// Start at a random index, as the same RR balancer rebuilds a new
61-
// picker when SubConn states change, and we don't want to apply excess
62-
// load to the first server in the list.
63-
next: uint32(rand.IntN(len(scs))),
65+
bal.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[%p] ", bal))
66+
bal.logger.Infof("Created")
67+
return bal
68+
}
69+
70+
type rrBalancer struct {
71+
cc balancer.ClientConn
72+
child balancer.Balancer
73+
logger *internalgrpclog.PrefixLogger
74+
}
75+
76+
func (b *rrBalancer) Close() {
77+
b.child.Close()
78+
}
79+
80+
func (b *rrBalancer) ExitIdle() {
81+
// Should always be ok, as child is endpoint sharding.
82+
if ei, ok := b.child.(balancer.ExitIdler); ok {
83+
ei.ExitIdle()
6484
}
6585
}
6686

67-
type rrPicker struct {
68-
// subConns is the snapshot of the roundrobin balancer when this picker was
69-
// created. The slice is immutable. Each Get() will do a round robin
70-
// selection from it and return the selected SubConn.
71-
subConns []balancer.SubConn
72-
next uint32
87+
func (b *rrBalancer) ResolverError(err error) {
88+
// Will cause inline picker update from endpoint sharding.
89+
b.child.ResolverError(err)
7390
}
7491

75-
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
76-
subConnsLen := uint32(len(p.subConns))
77-
nextIndex := atomic.AddUint32(&p.next, 1)
92+
func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
93+
// Enable the health listener in pickfirst children for client side health
94+
// checks and outlier detection, if configured.
95+
ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState)
96+
ccs.BalancerConfig = endpointShardingLBConfig
97+
return b.child.UpdateClientConnState(ccs)
98+
}
7899

79-
sc := p.subConns[nextIndex%subConnsLen]
80-
return balancer.PickResult{SubConn: sc}, nil
100+
func (b *rrBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
101+
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
81102
}

balancer/weightedroundrobin/balancer.go

+1
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ func (b *wrrBalancer) Close() {
405405
ew.stopORCAListener()
406406
}
407407
}
408+
b.child.Close()
408409
}
409410

410411
func (b *wrrBalancer) ExitIdle() {

0 commit comments

Comments
 (0)