feat(kuma-cp): introduce zone health checks (#7821)
## Explanation

Client:
* Send health check periodically

Server:
* When we receive a health check ping, record its time in the corresponding `ZoneInsight`
* Introduce a new component, `ZoneWatch`, that listens for `ZoneOpenedStream` and marks that `(tenantID, zone)` for watching
* Periodically check all watched `(tenantID, zone)` pairs and, if the last health check recorded in `ZoneInsight` is older than the configured timeout, send `ZoneWentOffline`
* All opened streams also listen for `ZoneWentOffline`; when a stream's handler receives the event, it returns, ending the stream

We store this info in `ZoneInsight` because every global CP instance may need to kill streams for an offline zone, but not every instance receives health checks from the connected zones. A rough sketch of the `ZoneWatch` loop described above follows.
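
The `ZoneWatch` implementation (`mux.NewZoneWatch`, wired up in `pkg/kds/global/components.go` below) is among the 13 changed files but isn't rendered on this page. The sketch below only illustrates the loop described above, assuming an in-memory map of watched zones; `zoneWatchSketch`, `watchedZone`, and `lastHealthCheck` are invented names, not the actual implementation.

```go
// Rough sketch only — NOT the contents of the real ZoneWatch file.
package sketch

import (
	"context"
	"time"

	"github.com/kumahq/kuma/pkg/events"
	"github.com/kumahq/kuma/pkg/kds/service"
)

type watchedZone struct {
	tenantID string
	zone     string
}

type zoneWatchSketch struct {
	pollInterval time.Duration
	timeout      time.Duration
	bus          events.EventBus
	// lastHealthCheck is a hypothetical stand-in for reading the last ping
	// time recorded in the zone's ZoneInsight via the resource manager.
	lastHealthCheck func(ctx context.Context, tenantID, zone string) time.Time
	zones           map[watchedZone]time.Time
}

func (zw *zoneWatchSketch) Start(stop <-chan struct{}) error {
	// Subscribe before any stream can send ZoneOpenedStream (see "Tests" below).
	opened := zw.bus.Subscribe(func(e events.Event) bool {
		_, ok := e.(service.ZoneOpenedStream)
		return ok
	})
	defer opened.Close()

	ticker := time.NewTicker(zw.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case e := <-opened.Recv():
			stream := e.(service.ZoneOpenedStream)
			// Start watching this (tenantID, zone) from "now".
			zw.zones[watchedZone{stream.TenantID, stream.Zone}] = time.Now()
		case <-ticker.C:
			for wz, lastSeen := range zw.zones {
				// Use the newest ping time recorded in ZoneInsight, if any.
				if t := zw.lastHealthCheck(context.Background(), wz.tenantID, wz.zone); t.After(lastSeen) {
					lastSeen = t
					zw.zones[wz] = t
				}
				if time.Since(lastSeen) > zw.timeout {
					// Every open stream for this zone subscribes to this event
					// and shuts down when it arrives.
					zw.bus.Send(service.ZoneWentOffline{TenantID: wz.tenantID, Zone: wz.zone})
					delete(zw.zones, wz)
				}
			}
		case <-stop:
			return nil
		}
	}
}
```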

## Tests

The tests need `time.Sleep` because the following steps happen asynchronously:
1) `ZoneWatch` subscribes to `ZoneOpenedStream` events in `Start`.
  In reality this is guaranteed to happen before any `ZoneOpenedStream` event is sent, because those events are only sent in response to new gRPC streams being opened.
2) `ZoneOpenedStream` is witnessed by `ZoneWatch`.
  The test adds a `time.Sleep` because we want to update the health check time exactly once, and specifically _after_ the zone starts being watched, before checking that the zone is disconnected. In reality a zone keeps sending health check pings, so the recorded time is _eventually_ updated after the initial last-seen time. The sketch after this list shows the shape of that flow.
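
To make the ordering concrete, this is roughly the shape of the test flow; the real test isn't rendered on this page, and `recordHealthCheckPing`, `streamClosed`, and the tenant/zone names are illustrative stand-ins for the actual fixtures.

```go
package kds_test

import (
	"time"

	"github.com/kumahq/kuma/pkg/events"
	"github.com/kumahq/kuma/pkg/kds/service"
)

// Illustrative flow only; the helpers passed in are hypothetical fixtures.
func sketchZoneWatchTestFlow(
	bus events.EventBus,
	recordHealthCheckPing func(zone string, t time.Time),
	streamClosed <-chan struct{},
) {
	// ZoneWatch.Start has already subscribed to ZoneOpenedStream (step 1).
	bus.Send(service.ZoneOpenedStream{TenantID: "default", Zone: "zone-1"})

	// Sleep so ZoneWatch witnesses the event first (step 2); the single ping
	// below must land after the zone has started being watched.
	time.Sleep(100 * time.Millisecond)
	recordHealthCheckPing("zone-1", time.Now())

	// With no further pings the zone exceeds the timeout, ZoneWentOffline is
	// sent, and the open stream is told to shut down.
	<-streamClosed
}
```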

Signed-off-by: Mike Beaumont <mjboamail@gmail.com>
michaelbeaumont authored Oct 10, 2023
1 parent 4c067a8 commit baa72b6
Showing 13 changed files with 720 additions and 168 deletions.
282 changes: 151 additions & 131 deletions api/mesh/v1alpha1/kds.pb.go

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions api/mesh/v1alpha1/kds.proto
@@ -6,6 +6,7 @@ option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1";

import "envoy/service/discovery/v3/discovery.proto";
import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";

service KumaDiscoveryService {
rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest)
@@ -22,10 +23,12 @@ message KumaResource {
google.protobuf.Any spec = 2;
}

message ZoneHealthCheckRequest {}

message ZoneHealthCheckResponse {
// The interval that the global control plane
// expects between health check pings
google.protobuf.Duration interval = 1;
}

service GlobalKDSService {
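
For context, the `HealthCheck` RPC that carries these messages lives on `GlobalKDSService` (the zone CP calls it via `mesh_proto.NewGlobalKDSServiceClient` further down in `pkg/kds/mux/client.go`), but the global-side handler isn't rendered on this page. Below is only a hedged sketch of what that handler does — record the ping in `ZoneInsight` and return the configured interval. `globalKDSServiceSketch` and `recordPing` are invented, and embedding `UnimplementedGlobalKDSServiceServer` is an assumption about the generated code.

```go
package sketch

import (
	"context"
	"time"

	"google.golang.org/protobuf/types/known/durationpb"

	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
	"github.com/kumahq/kuma/pkg/kds/util"
)

type globalKDSServiceSketch struct {
	mesh_proto.UnimplementedGlobalKDSServiceServer
	pollInterval time.Duration
	// recordPing is a hypothetical hook that upserts the ping time into the
	// zone's ZoneInsight.
	recordPing func(ctx context.Context, zone string, t time.Time) error
}

func (s *globalKDSServiceSketch) HealthCheck(
	ctx context.Context,
	_ *mesh_proto.ZoneHealthCheckRequest,
) (*mesh_proto.ZoneHealthCheckResponse, error) {
	zone, err := util.ClientIDFromIncomingCtx(ctx)
	if err != nil {
		return nil, err
	}
	// Mark the ping time in ZoneInsight so every global CP instance, not just
	// the one holding this connection, can see when the zone was last healthy.
	if err := s.recordPing(ctx, zone, time.Now()); err != nil {
		return nil, err
	}
	// Tell the zone CP how often the global CP expects to be pinged.
	return &mesh_proto.ZoneHealthCheckResponse{
		Interval: durationpb.New(s.pollInterval),
	}, nil
}
```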
8 changes: 8 additions & 0 deletions docs/generated/raw/protos/ZoneHealthCheckResponse.json
@@ -3,6 +3,14 @@
"$ref": "#/definitions/ZoneHealthCheckResponse",
"definitions": {
"ZoneHealthCheckResponse": {
"properties": {
"interval": {
"pattern": "^([0-9]+\\.?[0-9]*|\\.[0-9]+)s$",
"type": "string",
"description": "The the interval that the global control plane expects between health check pings",
"format": "regex"
}
},
"additionalProperties": true,
"type": "object",
"title": "Zone Health Check Response"
7 changes: 7 additions & 0 deletions pkg/config/loader_test.go
@@ -253,6 +253,8 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second))
Expect(cfg.Multizone.Global.KDS.DisableSOTW).To(BeTrue())
Expect(cfg.Multizone.Global.KDS.ResponseBackoff.Duration).To(Equal(time.Second))
Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration).To(Equal(11 * time.Second))
Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration).To(Equal(110 * time.Second))
Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685"))
Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1"))
Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa"))
@@ -568,6 +570,9 @@ multizone:
nackBackoff: 11s
responseBackoff: 1s
disableSOTW: true
zoneHealthCheck:
pollInterval: 11s
timeout: 110s
zone:
globalAddress: "grpc://1.1.1.1:5685"
name: "zone-1"
@@ -867,6 +872,8 @@ tracing:
"KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s",
"KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF": "1s",
"KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW": "true",
"KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_POLL_INTERVAL": "11s",
"KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_TIMEOUT": "110s",
"KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685",
"KUMA_MULTIZONE_ZONE_NAME": "zone-1",
"KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa",
24 changes: 24 additions & 0 deletions pkg/config/multizone/kds.go
@@ -40,6 +40,8 @@ type KdsServerConfig struct {
// ResponseBackoff is a time Global CP waits before sending ACK/NACK.
// This is a way to slow down Zone CP from sending resources too often.
ResponseBackoff config_types.Duration `json:"responseBackoff" envconfig:"kuma_multizone_global_kds_response_backoff"`
// ZoneHealthCheck holds config for ensuring zones are online
ZoneHealthCheck ZoneHealthCheckConfig `json:"zoneHealthCheck"`
}

var _ config.Config = &KdsServerConfig{}
@@ -73,6 +75,9 @@ func (c *KdsServerConfig) Validate() error {
if _, err := config_types.TLSCiphers(c.TlsCipherSuites); err != nil {
errs = multierr.Append(errs, errors.New(".TlsCipherSuites"+err.Error()))
}
if err := c.ZoneHealthCheck.Validate(); err != nil {
errs = multierr.Append(errs, errors.Wrap(err, "invalid zoneHealthCheck config"))
}
return errs
}

@@ -104,3 +109,22 @@ func (k KdsClientConfig) Sanitize() {
func (k KdsClientConfig) Validate() error {
return nil
}

type ZoneHealthCheckConfig struct {
// PollInterval is the interval at which the global CP checks ZoneInsight for
// health check pings; it is also the ping interval it tells zone CPs to use
PollInterval config_types.Duration `json:"pollInterval" envconfig:"kuma_multizone_global_kds_zone_health_check_poll_interval"`
// Timeout is the time after the last health check that a zone counts as
// no longer online
Timeout config_types.Duration `json:"timeout" envconfig:"kuma_multizone_global_kds_zone_health_check_timeout"`
}

func (c ZoneHealthCheckConfig) Sanitize() {
}

func (c ZoneHealthCheckConfig) Validate() error {
if (c.Timeout.Duration > 0) != (c.PollInterval.Duration > 0) {
return errors.New("timeout and pollInterval must both be either set or unset")
}
return nil
}
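
As a quick illustration of the rule above (assuming `config_types` aliases `pkg/config/types`, as in this file's imports), setting only one of the two fields fails validation:

```go
package main

import (
	"fmt"
	"time"

	"github.com/kumahq/kuma/pkg/config/multizone"
	config_types "github.com/kumahq/kuma/pkg/config/types"
)

func main() {
	cfg := multizone.ZoneHealthCheckConfig{
		PollInterval: config_types.Duration{Duration: 10 * time.Second},
		// Timeout left unset: Validate returns an error because the two
		// fields must be set (or unset) together.
	}
	fmt.Println(cfg.Validate())
}
```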
13 changes: 13 additions & 0 deletions pkg/events/interfaces.go
@@ -34,6 +34,19 @@ type Listener interface {
Close()
}

func NewNeverListener() Listener {
return &neverRecvListener{}
}

type neverRecvListener struct{}

func (*neverRecvListener) Recv() <-chan Event {
return nil
}

func (*neverRecvListener) Close() {
}
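
A note on why `NewNeverListener` works: its `Recv` returns a nil channel, and a receive from a nil channel blocks forever, so a `select` case on it simply never fires. That lets the KDS sync server further down `select` on `shouldDisconnectStream.Recv()` unconditionally, even for zones that don't send health pings. Standalone illustration (not part of the diff):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var never <-chan struct{} // nil channel: a receive on it blocks forever
	select {
	case <-never:
		fmt.Println("unreachable: the nil-channel case never fires")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("timeout wins every time")
	}
}
```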

type Predicate = func(event Event) bool

type Emitter interface {
6 changes: 6 additions & 0 deletions pkg/kds/features.go
@@ -9,5 +9,11 @@ func (f Features) HasFeature(feature string) bool {
return f[feature]
}

const FeaturesMetadataKey string = "features"

// FeatureZoneToken means that the zone control plane can handle incoming Zone Token from global control plane.
const FeatureZoneToken string = "zone-token"

// FeatureZonePingHealth means that the zone control plane sends pings to the
// global control plane to indicate it's still running.
const FeatureZonePingHealth string = "zone-ping-health"
22 changes: 22 additions & 0 deletions pkg/kds/global/components.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -159,6 +160,24 @@ func Setup(rt runtime.Runtime) error {
for _, filter := range rt.KDSContext().GlobalServerFiltersV2 {
streamInterceptors = append(streamInterceptors, filter)
}

if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) {
zwLog := kdsGlobalLog.WithName("zone-watch")
zw, err := mux.NewZoneWatch(
zwLog,
rt.Config().Multizone.Global.KDS.ZoneHealthCheck,
rt.Metrics(),
rt.EventBus(),
rt.ReadOnlyResourceManager(),
rt.Extensions(),
)
if err != nil {
return errors.Wrap(err, "couldn't create ZoneWatch")
}
if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil {
return err
}
}
return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer(
onSessionStarted,
rt.KDSContext().GlobalServerFilters,
@@ -172,12 +191,15 @@ func Setup(rt runtime.Runtime) error {
streamInterceptors,
rt.Extensions(),
rt.Config().Store.Upsert,
rt.EventBus(),
rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration,
),
mux.NewKDSSyncServiceServer(
onGlobalToZoneSyncConnect,
onZoneToGlobalSyncConnect,
rt.KDSContext().GlobalServerFiltersV2,
rt.Extensions(),
rt.EventBus(),
),
)))
}
45 changes: 45 additions & 0 deletions pkg/kds/mux/client.go
@@ -6,6 +6,7 @@ import (
"crypto/x509"
"net/url"
"os"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -23,6 +24,7 @@ import (
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/registry"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/kds"
"github.com/kumahq/kuma/pkg/kds/service"
"github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/version"
@@ -98,12 +100,15 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
withKDSCtx, cancel := context.WithCancel(metadata.AppendToOutgoingContext(c.ctx,
"client-id", c.clientID,
KDSVersionHeaderKey, KDSVersionV3,
kds.FeaturesMetadataKey, kds.FeatureZonePingHealth,
))
defer cancel()

log := muxClientLog.WithValues("client-id", c.clientID)
errorCh := make(chan error)

c.startHealthCheck(withKDSCtx, log, conn, stop, errorCh)

go c.startXDSConfigs(withKDSCtx, log, conn, stop, errorCh)
go c.startStats(withKDSCtx, log, conn, stop, errorCh)
go c.startClusters(withKDSCtx, log, conn, stop, errorCh)
@@ -282,6 +287,46 @@ func (c *client) startClusters(
c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh)
}

func (c *client) startHealthCheck(
ctx context.Context,
log logr.Logger,
conn *grpc.ClientConn,
stop <-chan struct{},
errorCh chan error,
) {
client := mesh_proto.NewGlobalKDSServiceClient(conn)
log = log.WithValues("rpc", "healthcheck")
log.Info("starting")

go func() {
prevInterval := 5 * time.Minute
ticker := time.NewTicker(prevInterval)
defer ticker.Stop()
for {
log.Info("sending health check")
resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{})
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err, "health check failed")
errorCh <- errors.Wrap(err, "zone health check request failed")
} else if interval := resp.Interval.AsDuration(); interval > 0 {
if prevInterval != interval {
prevInterval = interval
log.Info("Global CP requested new healthcheck interval", "interval", interval)
}
ticker.Reset(interval)
}

select {
case <-ticker.C:
continue
case <-stop:
log.Info("stopping")
return
}
}
}()
}

func (c *client) handleProcessingErrors(
stream grpc.ClientStream,
log logr.Logger,
49 changes: 45 additions & 4 deletions pkg/kds/mux/zone_sync.go
@@ -2,16 +2,22 @@ package mux

import (
"context"
"slices"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/events"
"github.com/kumahq/kuma/pkg/kds"
"github.com/kumahq/kuma/pkg/kds/service"
"github.com/kumahq/kuma/pkg/kds/util"
"github.com/kumahq/kuma/pkg/log"
"github.com/kumahq/kuma/pkg/multitenant"
)

type FilterV2 interface {
@@ -38,6 +44,7 @@ type KDSSyncServiceServer struct {
zoneToGlobalCb OnZoneToGlobalSyncConnectFunc
filters []FilterV2
extensions context.Context
eventBus events.EventBus
mesh_proto.UnimplementedKDSSyncServiceServer
}

@@ -46,32 +53,41 @@ func NewKDSSyncServiceServer(
zoneToGlobalCb OnZoneToGlobalSyncConnectFunc,
filters []FilterV2,
extensions context.Context,
eventBus events.EventBus,
) *KDSSyncServiceServer {
return &KDSSyncServiceServer{
globalToZoneCb: globalToZoneCb,
zoneToGlobalCb: zoneToGlobalCb,
filters: filters,
extensions: extensions,
eventBus: eventBus,
}
}

var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{}

func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions)
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
}
logger = logger.WithValues("clientID", clientID)
logger = logger.WithValues("clientID", zone)
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}

shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone)
defer shouldDisconnectStream.Close()

processingErrorsCh := make(chan error)
go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh)
select {
case <-shouldDisconnectStream.Recv():
logger.Info("ending stream, zone health check failed")
return nil
case <-stream.Context().Done():
logger.Info("GlobalToZoneSync rpc stream stopped")
return nil
@@ -86,19 +102,26 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService

func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error {
logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions)
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
}
logger = logger.WithValues("clientID", clientID)
logger = logger.WithValues("clientID", zone)
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}

shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone)
defer shouldDisconnectStream.Close()

processingErrorsCh := make(chan error)
go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh)
select {
case <-shouldDisconnectStream.Recv():
logger.Info("ending stream, zone health check failed")
return nil
case <-stream.Context().Done():
logger.Info("ZoneToGlobalSync rpc stream stopped")
return nil
Expand All @@ -110,3 +133,21 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService
return status.Error(codes.Internal, "stream failed")
}
}

func (g *KDSSyncServiceServer) watchZoneHealthCheck(streamContext context.Context, zone string) events.Listener {
tenantID, _ := multitenant.TenantFromCtx(streamContext)
md, _ := metadata.FromIncomingContext(streamContext)

shouldDisconnectStream := events.NewNeverListener()

features := md.Get(kds.FeaturesMetadataKey)
if slices.Contains(features, kds.FeatureZonePingHealth) {
shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool {
disconnectEvent, ok := e.(service.ZoneWentOffline)
return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone
})
g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID})
}

return shouldDisconnectStream
}