@@ -28,11 +28,11 @@ package endpointsharding
28
28
import (
29
29
"encoding/json"
30
30
"errors"
31
- "fmt"
32
- "math/rand"
33
31
"sync"
34
32
"sync/atomic"
35
33
34
+ rand "math/rand/v2"
35
+
36
36
"google.golang.org/grpc/balancer"
37
37
"google.golang.org/grpc/balancer/base"
38
38
"google.golang.org/grpc/connectivity"
@@ -66,7 +66,9 @@ type endpointSharding struct {
66
66
cc balancer.ClientConn
67
67
bOpts balancer.BuildOptions
68
68
69
+ childMu sync.Mutex // syncs balancer.Balancer calls into children
69
70
children atomic.Pointer [resolver.EndpointMap ]
71
+ closed bool
70
72
71
73
// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
72
74
// calls (calls to children will each produce an update, only want one
@@ -80,19 +82,11 @@ type endpointSharding struct {
80
82
// for endpoints that are no longer present. It also updates all the children,
81
83
// and sends a single synchronous update of the childrens' aggregated state at
82
84
// the end of the UpdateClientConnState operation. If any endpoint has no
83
- // addresses, returns error without forwarding any updates . Otherwise returns
84
- // first error found from a child, but fully processes the new update.
85
+ // addresses it will ignore that endpoint . Otherwise, returns first error found
86
+ // from a child, but fully processes the new update.
85
87
func (es * endpointSharding ) UpdateClientConnState (state balancer.ClientConnState ) error {
86
- if len (state .ResolverState .Endpoints ) == 0 {
87
- return errors .New ("endpoints list is empty" )
88
- }
89
- // Check/return early if any endpoints have no addresses.
90
- // TODO: make this configurable if needed.
91
- for i , endpoint := range state .ResolverState .Endpoints {
92
- if len (endpoint .Addresses ) == 0 {
93
- return fmt .Errorf ("endpoint %d has empty addresses" , i )
94
- }
95
- }
88
+ es .childMu .Lock ()
89
+ defer es .childMu .Unlock ()
96
90
97
91
es .inhibitChildUpdates .Store (true )
98
92
defer func () {
@@ -106,6 +100,9 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
106
100
107
101
// Update/Create new children.
108
102
for _ , endpoint := range state .ResolverState .Endpoints {
103
+ if len (endpoint .Addresses ) == 0 {
104
+ continue
105
+ }
109
106
if _ , ok := newChildren .Get (endpoint ); ok {
110
107
// Endpoint child was already created, continue to avoid duplicate
111
108
// update.
@@ -153,6 +150,8 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
153
150
// children and sends a single synchronous update of the childStates at the end
154
151
// of the ResolverError operation.
155
152
func (es * endpointSharding ) ResolverError (err error ) {
153
+ es .childMu .Lock ()
154
+ defer es .childMu .Unlock ()
156
155
es .inhibitChildUpdates .Store (true )
157
156
defer func () {
158
157
es .inhibitChildUpdates .Store (false )
@@ -170,11 +169,14 @@ func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubCon
170
169
}
171
170
172
171
func (es * endpointSharding ) Close () {
172
+ es .childMu .Lock ()
173
+ defer es .childMu .Unlock ()
173
174
children := es .children .Load ()
174
175
for _ , child := range children .Values () {
175
176
bal := child .(balancer.Balancer )
176
177
bal .Close ()
177
178
}
179
+ es .closed = true
178
180
}
179
181
180
182
// updateState updates this component's state. It sends the aggregated state,
@@ -234,7 +236,7 @@ func (es *endpointSharding) updateState() {
234
236
p := & pickerWithChildStates {
235
237
pickers : pickers ,
236
238
childStates : childStates ,
237
- next : uint32 (rand .Intn (len (pickers ))),
239
+ next : uint32 (rand .IntN (len (pickers ))),
238
240
}
239
241
es .cc .UpdateState (balancer.State {
240
242
ConnectivityState : aggState ,
@@ -282,6 +284,17 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
282
284
bw .es .mu .Lock ()
283
285
bw .childState .State = state
284
286
bw .es .mu .Unlock ()
287
+ // When a child balancer says it's IDLE, ping it to exit idle and reconnect.
288
+ // TODO: In the future, perhaps make this a knob in configuration.
289
+ if ei , ok := bw .Balancer .(balancer.ExitIdler ); state .ConnectivityState == connectivity .Idle && ok {
290
+ go func () {
291
+ bw .es .childMu .Lock ()
292
+ if ! bw .es .closed {
293
+ ei .ExitIdle ()
294
+ }
295
+ bw .es .childMu .Unlock ()
296
+ }()
297
+ }
285
298
bw .es .updateState ()
286
299
}
287
300
0 commit comments