Skip to content

Commit d66fc3a

Browse files
authoredOct 29, 2024··
balancer/endpointsharding: Call ExitIdle() on child if child reports IDLE (#7782)
1 parent 2e3f547 commit d66fc3a

File tree

1 file changed

+21
-0
lines changed

1 file changed

+21
-0
lines changed
 

‎balancer/endpointsharding/endpointsharding.go

+21
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ type endpointSharding struct {
6666
cc balancer.ClientConn
6767
bOpts balancer.BuildOptions
6868

69+
childMu sync.Mutex // syncs balancer.Balancer calls into children
6970
children atomic.Pointer[resolver.EndpointMap]
71+
closed bool
7072

7173
// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
7274
// calls (calls to children will each produce an update, only want one
@@ -83,6 +85,9 @@ type endpointSharding struct {
8385
// addresses it will ignore that endpoint. Otherwise, returns first error found
8486
// from a child, but fully processes the new update.
8587
func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
88+
es.childMu.Lock()
89+
defer es.childMu.Unlock()
90+
8691
es.inhibitChildUpdates.Store(true)
8792
defer func() {
8893
es.inhibitChildUpdates.Store(false)
@@ -145,6 +150,8 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
145150
// children and sends a single synchronous update of the childStates at the end
146151
// of the ResolverError operation.
147152
func (es *endpointSharding) ResolverError(err error) {
153+
es.childMu.Lock()
154+
defer es.childMu.Unlock()
148155
es.inhibitChildUpdates.Store(true)
149156
defer func() {
150157
es.inhibitChildUpdates.Store(false)
@@ -162,11 +169,14 @@ func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubCon
162169
}
163170

164171
func (es *endpointSharding) Close() {
172+
es.childMu.Lock()
173+
defer es.childMu.Unlock()
165174
children := es.children.Load()
166175
for _, child := range children.Values() {
167176
bal := child.(balancer.Balancer)
168177
bal.Close()
169178
}
179+
es.closed = true
170180
}
171181

172182
// updateState updates this component's state. It sends the aggregated state,
@@ -274,6 +284,17 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
274284
bw.es.mu.Lock()
275285
bw.childState.State = state
276286
bw.es.mu.Unlock()
287+
// When a child balancer says it's IDLE, ping it to exit idle and reconnect.
288+
// TODO: In the future, perhaps make this a knob in configuration.
289+
if ei, ok := bw.Balancer.(balancer.ExitIdler); state.ConnectivityState == connectivity.Idle && ok {
290+
go func() {
291+
bw.es.childMu.Lock()
292+
if !bw.es.closed {
293+
ei.ExitIdle()
294+
}
295+
bw.es.childMu.Unlock()
296+
}()
297+
}
277298
bw.es.updateState()
278299
}
279300

0 commit comments

Comments
 (0)
Please sign in to comment.