Skip to content

Commit ab9cd38

Browse files
committed
Review comments
1 parent 5d1d853 commit ab9cd38

File tree

5 files changed

+107
-104
lines changed

5 files changed

+107
-104
lines changed

balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1495,7 +1495,7 @@ func subConnAddresses(ctx context.Context, cc *testutils.BalancerClientConn, sub
14951495
return addresses, nil
14961496
}
14971497

1498-
// stateStoringBalancer stores the state of the subconns being created.
1498+
// stateStoringBalancer stores the state of the SubConns being created.
14991499
type stateStoringBalancer struct {
15001500
balancer.Balancer
15011501
mu sync.Mutex

balancer_wrapper.go

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -277,10 +277,17 @@ type healthData struct {
277277
// to the LB policy. This is stored to avoid sending updates when the
278278
// SubConn has already exited connectivity state READY.
279279
connectivityState connectivity.State
280+
// closeHealthProducer stores function to close the ref counted health
281+
// producer. The health producer is automatically closed when the SubConn
282+
// state changes.
283+
closeHealthProducer func()
280284
}
281285

282286
func newHealthData(s connectivity.State) *healthData {
283-
return &healthData{connectivityState: s}
287+
return &healthData{
288+
connectivityState: s,
289+
closeHealthProducer: func() {},
290+
}
284291
}
285292

286293
// updateState is invoked by grpc to push a subConn state update to the
@@ -420,10 +427,11 @@ func (acbw *acBalancerWrapper) closeProducers() {
420427
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
421428
// later release.
422429
type HealthCheckOptions struct {
423-
// Name of the gRPC service running on the server for reporting health state.
424-
// If the service name is empty, client side health checking will be disabled.
425-
HealthServiceName string
426-
Listener func(balancer.SubConnState)
430+
// ServiceName of the gRPC service running on the server for reporting health state.
431+
ServiceName string
432+
// Listener is called when the health update is received from the health
433+
// service running on the server.
434+
Listener func(balancer.SubConnState)
427435
}
428436

429437
// RegisterHealthListener accepts a health listener from the LB policy. It sends
@@ -434,6 +442,7 @@ type HealthCheckOptions struct {
434442
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
435443
acbw.healthMu.Lock()
436444
defer acbw.healthMu.Unlock()
445+
acbw.healthData.closeHealthProducer()
437446
// listeners should not be registered when the connectivity state
438447
// isn't Ready. This may happen when the balancer registers a listener
439448
// after the connectivityState is updated, but before it is notified
@@ -445,14 +454,21 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
445454
// registered health listeners.
446455
hd := newHealthData(connectivity.Ready)
447456
acbw.healthData = hd
457+
if listener == nil {
458+
return
459+
}
448460

449461
// Client side health checking is enabled when all the following
450462
// conditions are satisfied:
451-
// 1. The health check config is present in the service config.
452-
// 2. Health checking is not disabled using the dial option.
453-
// 3. The health package is imported.
463+
// 1. Health checking is not disabled using the dial option.
464+
// 2. The health package is imported.
465+
// 3. The health check config is present in the service config.
466+
healthCheckEnabled := !acbw.ccb.cc.dopts.disableHealthCheck
454467
regHealthLisFn := internal.RegisterClientHealthCheckListener
455-
healthCheckEnabled := !acbw.ccb.cc.dopts.disableHealthCheck && regHealthLisFn != nil
468+
if regHealthLisFn == nil {
469+
// The health package is not imported.
470+
healthCheckEnabled = false
471+
}
456472
var cfg *healthCheckConfig
457473
if healthCheckEnabled {
458474
// Avoid acquiring cc.mu unless necessary.
@@ -467,14 +483,11 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
467483
// Don't send updates if a new listener is registered.
468484
acbw.healthMu.Lock()
469485
defer acbw.healthMu.Unlock()
470-
curHD := acbw.healthData
471-
if curHD != hd {
486+
if acbw.healthData != hd {
472487
return
473488
}
474489
if !healthCheckEnabled {
475-
if listener != nil {
476-
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
477-
}
490+
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
478491
return
479492
}
480493
// Serialize the health updates from the health producer with
@@ -485,23 +498,19 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
485498
return
486499
}
487500
acbw.healthMu.Lock()
488-
curHD := acbw.healthData
489-
acbw.healthMu.Unlock()
490-
if curHD != hd {
501+
defer acbw.healthMu.Unlock()
502+
if acbw.healthData != hd {
491503
return
492504
}
493505
listener(scs)
494506
})
495507
}
496508

497-
if listener == nil {
498-
listenerWrapper = nil
499-
}
500-
501509
healthOpts := HealthCheckOptions{
502-
HealthServiceName: cfg.ServiceName,
503-
Listener: listenerWrapper,
510+
ServiceName: cfg.ServiceName,
511+
Listener: listenerWrapper,
504512
}
505-
regHealthLisFn.(func(context.Context, balancer.SubConn, HealthCheckOptions))(ctx, acbw, healthOpts)
513+
fn := regHealthLisFn.(func(context.Context, balancer.SubConn, HealthCheckOptions) func())
514+
hd.closeHealthProducer = fn(ctx, acbw, healthOpts)
506515
})
507516
}

health/producer.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333

3434
func init() {
3535
producerBuilderSingleton = &producerBuilder{}
36-
internal.RegisterClientHealthCheckListener = RegisterClientSideHealthCheckListener
36+
internal.RegisterClientHealthCheckListener = registerClientSideHealthCheckListener
3737
}
3838

3939
type producerBuilder struct{}
@@ -68,34 +68,29 @@ type healthServiceProducer struct {
6868
cancelDone chan (struct{})
6969
}
7070

71-
// RegisterClientSideHealthCheckListener accepts a listener to provide server
71+
// registerClientSideHealthCheckListener accepts a listener to provide server
7272
// health state via the health service.
73-
//
74-
// # Experimental
75-
//
76-
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
77-
// later release.
78-
func RegisterClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions) {
79-
pr, _ := sc.GetOrBuildProducer(producerBuilderSingleton)
73+
func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions) func() {
74+
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
8075
p := pr.(*healthServiceProducer)
8176
p.mu.Lock()
8277
defer p.mu.Unlock()
8378
p.cancel()
8479
<-p.cancelDone
8580
if opts.Listener == nil {
86-
return
81+
return closeFn
8782
}
8883

8984
p.cancelDone = make(chan struct{})
9085
ctx, cancel := context.WithCancel(ctx)
9186
p.cancel = cancel
9287

9388
go p.startHealthCheck(ctx, sc, opts, p.cancelDone)
89+
return closeFn
9490
}
9591

9692
func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, opts grpc.HealthCheckOptions, closeCh chan struct{}) {
9793
defer close(closeCh)
98-
serviceName := opts.HealthServiceName
9994
newStream := func(method string) (any, error) {
10095
return p.cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, method)
10196
}
@@ -109,7 +104,7 @@ func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balance
109104

110105
// Call the function through the internal variable as tests use it for
111106
// mocking.
112-
err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, serviceName)
107+
err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, opts.ServiceName)
113108
if err == nil {
114109
return
115110
}

internal/internal.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ var (
3232
// HealthCheckFunc is used to provide client-side LB channel health checking
3333
HealthCheckFunc HealthChecker
3434
// RegisterClientHealthCheckListener is used to provide a listener for
35-
// updates from the client-side health checking service.
36-
RegisterClientHealthCheckListener any // func(context.Context, balancer.SubConn, grpc.HealthCheckOptions)
35+
// updates from the client-side health checking service. It returns a
36+
// function that can be called to stop the health producer.
37+
RegisterClientHealthCheckListener any // func(context.Context, balancer.SubConn, grpc.HealthCheckOptions) func()
3738
// BalancerUnregister is exported by package balancer to unregister a balancer.
3839
BalancerUnregister func(name string)
3940
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by

0 commit comments

Comments
 (0)