Commit

chore(revert): revert "feat(kuma-cp): introduce zone health checks" (#8017)

Revert "feat(kuma-cp): introduce zone health checks (#7821)"

This reverts commit baa72b6.

---

Signed-off-by: slonka <slonka@users.noreply.github.com>
slonka authored Oct 11, 2023
1 parent a1d088d commit 386ab53
Showing 13 changed files with 168 additions and 720 deletions.
282 changes: 131 additions & 151 deletions api/mesh/v1alpha1/kds.pb.go

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions api/mesh/v1alpha1/kds.proto
@@ -6,7 +6,6 @@ option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1";

import "envoy/service/discovery/v3/discovery.proto";
import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";

service KumaDiscoveryService {
rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest)
@@ -23,12 +22,10 @@ message KumaResource {
google.protobuf.Any spec = 2;
}

message ZoneHealthCheckRequest {}
message ZoneHealthCheckRequest {
}

message ZoneHealthCheckResponse {
// The interval that the global control plane
// expects between health check pings
google.protobuf.Duration interval = 1;
}

service GlobalKDSService {
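
The removed interval field was a google.protobuf.Duration, which protojson renders as a seconds string such as "300s"; that is exactly the pattern the generated JSON schema below enforces. A standalone sketch against the pre-revert generated type (not part of the diff):

package main

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/types/known/durationpb"

	mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
)

func main() {
	// Before the revert, ZoneHealthCheckResponse carried the expected
	// ping interval as a Duration; protojson encodes it as "<seconds>s".
	resp := &mesh_proto.ZoneHealthCheckResponse{Interval: durationpb.New(5 * time.Minute)}
	out, err := protojson.Marshal(resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"interval":"300s"}
}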
8 changes: 0 additions & 8 deletions docs/generated/raw/protos/ZoneHealthCheckResponse.json
@@ -3,14 +3,6 @@
"$ref": "#/definitions/ZoneHealthCheckResponse",
"definitions": {
"ZoneHealthCheckResponse": {
"properties": {
"interval": {
"pattern": "^([0-9]+\\.?[0-9]*|\\.[0-9]+)s$",
"type": "string",
"description": "The the interval that the global control plane expects between health check pings",
"format": "regex"
}
},
"additionalProperties": true,
"type": "object",
"title": "Zone Health Check Response"
7 changes: 0 additions & 7 deletions pkg/config/loader_test.go
@@ -253,8 +253,6 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second))
Expect(cfg.Multizone.Global.KDS.DisableSOTW).To(BeTrue())
Expect(cfg.Multizone.Global.KDS.ResponseBackoff.Duration).To(Equal(time.Second))
Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration).To(Equal(11 * time.Second))
Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration).To(Equal(110 * time.Second))
Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685"))
Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1"))
Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa"))
@@ -570,9 +568,6 @@ multizone:
nackBackoff: 11s
responseBackoff: 1s
disableSOTW: true
zoneHealthCheck:
pollInterval: 11s
timeout: 110s
zone:
globalAddress: "grpc://1.1.1.1:5685"
name: "zone-1"
@@ -872,8 +867,6 @@ tracing:
"KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s",
"KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF": "1s",
"KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW": "true",
"KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_POLL_INTERVAL": "11s",
"KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_TIMEOUT": "110s",
"KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685",
"KUMA_MULTIZONE_ZONE_NAME": "zone-1",
"KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa",
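
The env var names in the removed lines are not ad hoc: they come from the envconfig tags on the struct deleted from pkg/config/multizone/kds.go below, and both the YAML and env forms parse as Go duration strings. A trivial standalone check of the values the removed test asserted:

package main

import (
	"fmt"
	"time"
)

func main() {
	// The removed test expected "11s" and "110s" to land as these durations.
	poll, err := time.ParseDuration("11s")
	if err != nil {
		panic(err)
	}
	timeout, err := time.ParseDuration("110s")
	if err != nil {
		panic(err)
	}
	fmt.Println(poll == 11*time.Second, timeout == 110*time.Second) // true true
}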
24 changes: 0 additions & 24 deletions pkg/config/multizone/kds.go
@@ -40,8 +40,6 @@ type KdsServerConfig struct {
// ResponseBackoff is a time Global CP waits before sending ACK/NACK.
// This is a way to slow down Zone CP from sending resources too often.
ResponseBackoff config_types.Duration `json:"responseBackoff" envconfig:"kuma_multizone_global_kds_response_backoff"`
// ZoneHealthCheck holds config for ensuring zones are online
ZoneHealthCheck ZoneHealthCheckConfig `json:"zoneHealthCheck"`
}

var _ config.Config = &KdsServerConfig{}
@@ -75,9 +73,6 @@ func (c *KdsServerConfig) Validate() error {
if _, err := config_types.TLSCiphers(c.TlsCipherSuites); err != nil {
errs = multierr.Append(errs, errors.New(".TlsCipherSuites"+err.Error()))
}
if err := c.ZoneHealthCheck.Validate(); err != nil {
errs = multierr.Append(errs, errors.Wrap(err, "invalid zoneHealthCheck config"))
}
return errs
}

@@ -109,22 +104,3 @@ func (k KdsClientConfig) Sanitize() {
func (k KdsClientConfig) Validate() error {
return nil
}

type ZoneHealthCheckConfig struct {
// PollInterval is the interval between the global CP checking ZoneInsight for
// health check pings and interval between zone CP sending health check pings
PollInterval config_types.Duration `json:"pollInterval" envconfig:"kuma_multizone_global_kds_zone_health_check_poll_interval"`
// Timeout is the time after the last health check that a zone counts as
// no longer online
Timeout config_types.Duration `json:"timeout" envconfig:"kuma_multizone_global_kds_zone_health_check_timeout"`
}

func (c ZoneHealthCheckConfig) Sanitize() {
}

func (c ZoneHealthCheckConfig) Validate() error {
if (c.Timeout.Duration > 0) != (c.PollInterval.Duration > 0) {
return errors.New("timeout and pollInterval must both be either set or unset")
}
return nil
}
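
The removed Validate is an all-or-nothing guard: PollInterval and Timeout must both be set or both be zero, so a half-configured health check fails at startup instead of silently never timing out. A standalone sketch of tripping it against the pre-revert tree (not part of the diff):

package main

import (
	"fmt"
	"time"

	"github.com/kumahq/kuma/pkg/config/multizone"
	config_types "github.com/kumahq/kuma/pkg/config/types"
)

func main() {
	cfg := multizone.ZoneHealthCheckConfig{
		PollInterval: config_types.Duration{Duration: 10 * time.Second},
		// Timeout deliberately left zero, so the XOR check fails.
	}
	fmt.Println(cfg.Validate()) // timeout and pollInterval must both be either set or unset
}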
13 changes: 0 additions & 13 deletions pkg/events/interfaces.go
@@ -34,19 +34,6 @@ type Listener interface {
Close()
}

func NewNeverListener() Listener {
return &neverRecvListener{}
}

type neverRecvListener struct{}

func (*neverRecvListener) Recv() <-chan Event {
return nil
}

func (*neverRecvListener) Close() {
}

type Predicate = func(event Event) bool

type Emitter interface {
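
The removed neverRecvListener leans on a Go guarantee: a receive from a nil channel blocks forever, so a select case reading from its Recv() channel simply never fires, which made it a safe default when zone health checking was disabled. A standalone demo of the underlying property:

package main

import (
	"fmt"
	"time"
)

func main() {
	var never <-chan struct{} // nil channel: receiving blocks forever
	select {
	case <-never:
		fmt.Println("unreachable")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("the timeout branch always wins")
	}
}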
6 changes: 0 additions & 6 deletions pkg/kds/features.go
@@ -9,11 +9,5 @@ func (f Features) HasFeature(feature string) bool {
return f[feature]
}

const FeaturesMetadataKey string = "features"

// FeatureZoneToken means that the zone control plane can handle incoming Zone Token from global control plane.
const FeatureZoneToken string = "zone-token"

// FeatureZonePingHealth means that the zone control plane sends pings to the
// global control plane to indicate it's still running.
const FeatureZonePingHealth string = "zone-ping-health"
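
These constants traveled as gRPC metadata: the zone advertised zone-ping-health on its outgoing context, and the global control plane checked for it before arming the disconnect logic. Both sides appear in the removed lines of pkg/kds/mux/client.go and pkg/kds/mux/zone_sync.go below; condensed here as a snippet:

// Zone side (client.go): advertise the feature on the KDS connection.
ctx := metadata.AppendToOutgoingContext(baseCtx, kds.FeaturesMetadataKey, kds.FeatureZonePingHealth)

// Global side (zone_sync.go): only watch zones that advertised it.
md, _ := metadata.FromIncomingContext(streamContext)
pingsHealth := slices.Contains(md.Get(kds.FeaturesMetadataKey), kds.FeatureZonePingHealth)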
22 changes: 0 additions & 22 deletions pkg/kds/global/components.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -160,24 +159,6 @@ func Setup(rt runtime.Runtime) error {
for _, filter := range rt.KDSContext().GlobalServerFiltersV2 {
streamInterceptors = append(streamInterceptors, filter)
}

if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) {
zwLog := kdsGlobalLog.WithName("zone-watch")
zw, err := mux.NewZoneWatch(
zwLog,
rt.Config().Multizone.Global.KDS.ZoneHealthCheck,
rt.Metrics(),
rt.EventBus(),
rt.ReadOnlyResourceManager(),
rt.Extensions(),
)
if err != nil {
return errors.Wrap(err, "couldn't create ZoneWatch")
}
if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil {
return err
}
}
return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer(
onSessionStarted,
rt.KDSContext().GlobalServerFilters,
@@ -191,15 +172,12 @@ func Setup(rt runtime.Runtime) error {
streamInterceptors,
rt.Extensions(),
rt.Config().Store.Upsert,
rt.EventBus(),
rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration,
),
mux.NewKDSSyncServiceServer(
onGlobalToZoneSyncConnect,
onZoneToGlobalSyncConnect,
rt.KDSContext().GlobalServerFiltersV2,
rt.Extensions(),
rt.EventBus(),
),
)))
}
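
mux.NewZoneWatch itself is not part of this diff. Going by the config comments in pkg/config/multizone/kds.go and the wiring above, it presumably polled ZoneInsight every PollInterval and emitted ZoneWentOffline once a zone's last ping was older than Timeout. A sketch under that assumption; every helper inside the loop is hypothetical:

// Hypothetical core loop of ZoneWatch; the real implementation is not shown here.
ticker := time.NewTicker(cfg.PollInterval.Duration)
defer ticker.Stop()
for {
	select {
	case <-ticker.C:
		for _, zone := range zonesWithOpenStreams() { // hypothetical helper
			if time.Since(lastPingOf(zone)) > cfg.Timeout.Duration { // hypothetical helper
				eventBus.Send(service.ZoneWentOffline{Zone: zone.Name, TenantID: zone.TenantID})
			}
		}
	case <-stop:
		return nil
	}
}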
45 changes: 0 additions & 45 deletions pkg/kds/mux/client.go
@@ -6,7 +6,6 @@ import (
"crypto/x509"
"net/url"
"os"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -24,7 +23,6 @@ import (
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/registry"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/kds"
"github.com/kumahq/kuma/pkg/kds/service"
"github.com/kumahq/kuma/pkg/metrics"
"github.com/kumahq/kuma/pkg/version"
@@ -100,15 +98,12 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
withKDSCtx, cancel := context.WithCancel(metadata.AppendToOutgoingContext(c.ctx,
"client-id", c.clientID,
KDSVersionHeaderKey, KDSVersionV3,
kds.FeaturesMetadataKey, kds.FeatureZonePingHealth,
))
defer cancel()

log := muxClientLog.WithValues("client-id", c.clientID)
errorCh := make(chan error)

c.startHealthCheck(withKDSCtx, log, conn, stop, errorCh)

go c.startXDSConfigs(withKDSCtx, log, conn, stop, errorCh)
go c.startStats(withKDSCtx, log, conn, stop, errorCh)
go c.startClusters(withKDSCtx, log, conn, stop, errorCh)
@@ -287,46 +282,6 @@ func (c *client) startClusters(
c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh)
}

func (c *client) startHealthCheck(
ctx context.Context,
log logr.Logger,
conn *grpc.ClientConn,
stop <-chan struct{},
errorCh chan error,
) {
client := mesh_proto.NewGlobalKDSServiceClient(conn)
log = log.WithValues("rpc", "healthcheck")
log.Info("starting")

go func() {
prevInterval := 5 * time.Minute
ticker := time.NewTicker(prevInterval)
defer ticker.Stop()
for {
log.Info("sending health check")
resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{})
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err, "health check failed")
errorCh <- errors.Wrap(err, "zone health check request failed")
} else if interval := resp.Interval.AsDuration(); interval > 0 {
if prevInterval != interval {
prevInterval = interval
log.Info("Global CP requested new healthcheck interval", "interval", interval)
}
ticker.Reset(interval)
}

select {
case <-ticker.C:
continue
case <-stop:
log.Info("stopping")
return
}
}
}()
}
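
Two things worth noting in the loop above: the ping fires immediately on every iteration before the select blocks, and ticker.Reset adopts whatever interval the global CP advertises, so the 5-minute default only covers the first round trip. The server-side handler is not in this excerpt; given the proto messages and the PollInterval handed to mux.NewServer in pkg/kds/global/components.go, it plausibly recorded the ping and echoed back the configured interval. A sketch, not the actual handler:

// Hypothetical global-CP handler; the receiver and its fields are assumptions.
func (s *globalKDSService) HealthCheck(ctx context.Context, _ *mesh_proto.ZoneHealthCheckRequest) (*mesh_proto.ZoneHealthCheckResponse, error) {
	// Presumably: persist the ping time (e.g. in ZoneInsight) for ZoneWatch to age.
	return &mesh_proto.ZoneHealthCheckResponse{
		Interval: durationpb.New(s.pollInterval),
	}, nil
}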

func (c *client) handleProcessingErrors(
stream grpc.ClientStream,
log logr.Logger,
49 changes: 4 additions & 45 deletions pkg/kds/mux/zone_sync.go
@@ -2,22 +2,16 @@ package mux

import (
"context"
"slices"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/events"
"github.com/kumahq/kuma/pkg/kds"
"github.com/kumahq/kuma/pkg/kds/service"
"github.com/kumahq/kuma/pkg/kds/util"
"github.com/kumahq/kuma/pkg/log"
"github.com/kumahq/kuma/pkg/multitenant"
)

type FilterV2 interface {
@@ -44,7 +38,6 @@ type KDSSyncServiceServer struct {
zoneToGlobalCb OnZoneToGlobalSyncConnectFunc
filters []FilterV2
extensions context.Context
eventBus events.EventBus
mesh_proto.UnimplementedKDSSyncServiceServer
}

@@ -53,41 +46,32 @@ func NewKDSSyncServiceServer(
zoneToGlobalCb OnZoneToGlobalSyncConnectFunc,
filters []FilterV2,
extensions context.Context,
eventBus events.EventBus,
) *KDSSyncServiceServer {
return &KDSSyncServiceServer{
globalToZoneCb: globalToZoneCb,
zoneToGlobalCb: zoneToGlobalCb,
filters: filters,
extensions: extensions,
eventBus: eventBus,
}
}

var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{}

func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error {
logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions)
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
}
logger = logger.WithValues("clientID", zone)
logger = logger.WithValues("clientID", clientID)
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}

shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone)
defer shouldDisconnectStream.Close()

processingErrorsCh := make(chan error)
go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh)
select {
case <-shouldDisconnectStream.Recv():
logger.Info("ending stream, zone health check failed")
return nil
case <-stream.Context().Done():
logger.Info("GlobalToZoneSync rpc stream stopped")
return nil
@@ -102,26 +86,19 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService

func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error {
logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions)
zone, err := util.ClientIDFromIncomingCtx(stream.Context())
clientID, err := util.ClientIDFromIncomingCtx(stream.Context())
if err != nil {
return err
}
logger = logger.WithValues("clientID", zone)
logger = logger.WithValues("clientID", clientID)
for _, filter := range g.filters {
if err := filter.InterceptServerStream(stream); err != nil {
return errors.Wrap(err, "closing KDS stream following a callback error")
}
}

shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone)
defer shouldDisconnectStream.Close()

processingErrorsCh := make(chan error)
go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh)
select {
case <-shouldDisconnectStream.Recv():
logger.Info("ending stream, zone health check failed")
return nil
case <-stream.Context().Done():
logger.Info("ZoneToGlobalSync rpc stream stopped")
return nil
Expand All @@ -133,21 +110,3 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService
return status.Error(codes.Internal, "stream failed")
}
}

func (g *KDSSyncServiceServer) watchZoneHealthCheck(streamContext context.Context, zone string) events.Listener {
tenantID, _ := multitenant.TenantFromCtx(streamContext)
md, _ := metadata.FromIncomingContext(streamContext)

shouldDisconnectStream := events.NewNeverListener()

features := md.Get(kds.FeaturesMetadataKey)
if slices.Contains(features, kds.FeatureZonePingHealth) {
shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool {
disconnectEvent, ok := e.(service.ZoneWentOffline)
return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone
})
g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID})
}

return shouldDisconnectStream
}
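
service.ZoneOpenedStream and service.ZoneWentOffline are defined outside this diff; their shape below is inferred from the predicate above, which matches on TenantID and Zone. Treat the sketch as an assumption:

// Inferred shape of the event types used above; not part of this diff.
type ZoneOpenedStream struct {
	Zone     string
	TenantID string
}

type ZoneWentOffline struct {
	Zone     string
	TenantID string
}

End to end, the reverted feature worked like this: the zone pinged HealthCheck at the advertised interval, ZoneWatch on the global CP aged those pings against Timeout, and a ZoneWentOffline event tore down the matching KDS streams through the select branches removed above.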
