Skip to content

Commit

Permalink
health, grpc: Deliver health service updates through the health liste…
Browse files Browse the repository at this point in the history
…ner (#7900)
  • Loading branch information
arjan-bal authored Dec 12, 2024
1 parent c1b6b37 commit 38a8b9a
Show file tree
Hide file tree
Showing 4 changed files with 397 additions and 70 deletions.
73 changes: 68 additions & 5 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ import (
"google.golang.org/grpc/status"
)

var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
var (
setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
// noOpRegisterHealthListenerFn is used when client side health checking is
// disabled. It sends a single READY update on the registered listener.
noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
return func() {}
}
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
Expand Down Expand Up @@ -277,10 +285,17 @@ type healthData struct {
// to the LB policy. This is stored to avoid sending updates when the
// SubConn has already exited connectivity state READY.
connectivityState connectivity.State
// closeHealthProducer stores function to close the ref counted health
// producer. The health producer is automatically closed when the SubConn
// state changes.
closeHealthProducer func()
}

func newHealthData(s connectivity.State) *healthData {
return &healthData{connectivityState: s}
return &healthData{
connectivityState: s,
closeHealthProducer: func() {},
}
}

// updateState is invoked by grpc to push a subConn state update to the
Expand Down Expand Up @@ -413,6 +428,37 @@ func (acbw *acBalancerWrapper) closeProducers() {
}
}

// healthProducerRegisterFn is a type alias for the health producer's function
// for registering listeners.
type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()

// healthListenerRegFn returns a function to register a listener for health
// updates. If client side health checks are disabled, the registered listener
// will get a single READY (raw connectivity state) update.
//
// Client side health checking is enabled when all the following
// conditions are satisfied:
// 1. Health checking is not disabled using the dial option.
// 2. The health package is imported.
// 3. The health check config is present in the service config.
func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
if acbw.ccb.cc.dopts.disableHealthCheck {
return noOpRegisterHealthListenerFn
}
regHealthLisFn := internal.RegisterClientHealthCheckListener
if regHealthLisFn == nil {
// The health package is not imported.
return noOpRegisterHealthListenerFn
}
cfg := acbw.ac.cc.healthCheckConfig()
if cfg == nil {
return noOpRegisterHealthListenerFn
}
return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
}
}

// RegisterHealthListener accepts a health listener from the LB policy. It sends
// updates to the health listener as long as the SubConn's connectivity state
// doesn't change and a new health listener is not registered. To invalidate
Expand All @@ -421,6 +467,7 @@ func (acbw *acBalancerWrapper) closeProducers() {
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
acbw.healthData.closeHealthProducer()
// listeners should not be registered when the connectivity state
// isn't Ready. This may happen when the balancer registers a listener
// after the connectivityState is updated, but before it is notified
Expand All @@ -436,17 +483,33 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
return
}

registerFn := acbw.healthListenerRegFn()
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
// Don't send updates if a new listener is registered.
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
curHD := acbw.healthData
if curHD != hd {
if acbw.healthData != hd {
return
}
listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Serialize the health updates from the health producer with
// other calls into the LB policy.
listenerWrapper := func(scs balancer.SubConnState) {
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
if acbw.healthData != hd {
return
}
listener(scs)
})
}

hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
})
}
106 changes: 106 additions & 0 deletions health/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package health

import (
"context"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/status"
)

func init() {
producerBuilderSingleton = &producerBuilder{}
internal.RegisterClientHealthCheckListener = registerClientSideHealthCheckListener
}

type producerBuilder struct{}

var producerBuilderSingleton *producerBuilder

// Build constructs and returns a producer and its cleanup function.
func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
p := &healthServiceProducer{
cc: cci.(grpc.ClientConnInterface),
cancel: func() {},
}
return p, func() {
p.mu.Lock()
defer p.mu.Unlock()
p.cancel()
}
}

type healthServiceProducer struct {
// The following fields are initialized at build time and read-only after
// that and therefore do not need to be guarded by a mutex.
cc grpc.ClientConnInterface

mu sync.Mutex
cancel func()
}

// registerClientSideHealthCheckListener accepts a listener to provide server
// health state via the health service.
func registerClientSideHealthCheckListener(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func() {
pr, closeFn := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*healthServiceProducer)
p.mu.Lock()
defer p.mu.Unlock()
p.cancel()
if listener == nil {
return closeFn
}

ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel

go p.startHealthCheck(ctx, sc, serviceName, listener)
return closeFn
}

func (p *healthServiceProducer) startHealthCheck(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) {
newStream := func(method string) (any, error) {
return p.cc.NewStream(ctx, &grpc.StreamDesc{ServerStreams: true}, method)
}

setConnectivityState := func(state connectivity.State, err error) {
listener(balancer.SubConnState{
ConnectivityState: state,
ConnectionError: err,
})
}

// Call the function through the internal variable as tests use it for
// mocking.
err := internal.HealthCheckFunc(ctx, newStream, setConnectivityState, serviceName)
if err == nil {
return
}
if status.Code(err) == codes.Unimplemented {
logger.Errorf("Subchannel health check is unimplemented at server side, thus health check is disabled for SubConn %p", sc)
} else {
logger.Errorf("Health checking failed for SubConn %p: %v", sc, err)
}
}
4 changes: 4 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
var (
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc HealthChecker
// RegisterClientHealthCheckListener is used to provide a listener for
// updates from the client-side health checking service. It returns a
// function that can be called to stop the health producer.
RegisterClientHealthCheckListener any // func(ctx context.Context, sc balancer.SubConn, serviceName string, listener func(balancer.SubConnState)) func()
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
Expand Down
Loading

0 comments on commit 38a8b9a

Please sign in to comment.