feat(kds): experimental event based watchdog #7624

Merged
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -334,6 +334,7 @@ jobs:

if [[ "<< parameters.deltaKDS >>" == true ]]; then
export KUMA_DELTA_KDS=true
export KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_ENABLED=true
fi

if [[ "<< parameters.target >>" == "" ]]; then
8 changes: 8 additions & 0 deletions docs/generated/kuma-cp.md
@@ -710,6 +710,14 @@ experimental:
   # The drawback is that you cannot use filtered out tags for traffic routing.
   # If empty, no filter is applied.
   ingressTagFilters: [] # ENV: KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS
+  # KDS event-based watchdog settings. This is a more efficient way to generate the KDS snapshot config.
+  kdsEventBasedWatchdog:
+    # If true, the experimental event-based watchdog is used to generate the KDS snapshot.
+    enabled: false # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_ENABLED
+    # How often changes are flushed when the experimental event-based watchdog is used.
+    flushInterval: 5s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FLUSH_INTERVAL
+    # How often a full KDS resync is scheduled when the experimental event-based watchdog is used.
+    fullResyncInterval: 60s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL

 proxy:
   gateway:
8 changes: 8 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
@@ -707,6 +707,14 @@ experimental:
   # The drawback is that you cannot use filtered out tags for traffic routing.
   # If empty, no filter is applied.
   ingressTagFilters: [] # ENV: KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS
+  # KDS event-based watchdog settings. This is a more efficient way to generate the KDS snapshot config.
+  kdsEventBasedWatchdog:
+    # If true, the experimental event-based watchdog is used to generate the KDS snapshot.
+    enabled: false # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_ENABLED
+    # How often changes are flushed when the experimental event-based watchdog is used.
+    flushInterval: 5s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FLUSH_INTERVAL
+    # How often a full KDS resync is scheduled when the experimental event-based watchdog is used.
+    fullResyncInterval: 60s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL

 proxy:
   gateway:
16 changes: 16 additions & 0 deletions pkg/config/app/kuma-cp/config.go
@@ -232,6 +232,11 @@ var DefaultConfig = func() Config {
         KDSDeltaEnabled:                 false,
         UseTagFirstVirtualOutboundModel: false,
         IngressTagFilters:               []string{},
+        KDSEventBasedWatchdog: ExperimentalKDSEventBasedWatchdog{
+            Enabled:            false,
+            FlushInterval:      config_types.Duration{Duration: 5 * time.Second},
+            FullResyncInterval: config_types.Duration{Duration: 1 * time.Minute},
+        },
     },
     Proxy:   xds.DefaultProxyConfig(),
     InterCp: intercp.DefaultInterCpConfig(),
@@ -388,6 +393,17 @@ type ExperimentalConfig struct {
     // The drawback is that you cannot use filtered out tags for traffic routing.
     // If empty, no filter is applied.
     IngressTagFilters []string `json:"ingressTagFilters" envconfig:"KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS"`
+    // KDS event-based watchdog settings. This is a more efficient way to generate the KDS snapshot config.
+    KDSEventBasedWatchdog ExperimentalKDSEventBasedWatchdog `json:"kdsEventBasedWatchdog"`
 }

+type ExperimentalKDSEventBasedWatchdog struct {
+    // If true, the experimental event-based watchdog is used to generate the KDS snapshot.
+    Enabled bool `json:"enabled" envconfig:"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_ENABLED"`
+    // How often changes are flushed when the experimental event-based watchdog is used.
+    FlushInterval config_types.Duration `json:"flushInterval" envconfig:"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FLUSH_INTERVAL"`
+    // How often a full KDS resync is scheduled when the experimental event-based watchdog is used.
+    FullResyncInterval config_types.Duration `json:"fullResyncInterval" envconfig:"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL"`
+}

 func (e ExperimentalConfig) Validate() error {
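Note: for illustration, here is a self-contained sketch (not Kuma's actual loader, which resolves the envconfig tags above) of how the three settings surface to an operator: the environment variables named in the tags, with the DefaultConfig values as fallback.

package main

import (
    "fmt"
    "os"
    "time"
)

// Self-contained sketch: resolves the three new settings from the
// environment variables named in the envconfig tags above, falling
// back to the defaults from DefaultConfig (disabled, 5s flush,
// 60s full resync).
func main() {
    enabled := os.Getenv("KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_ENABLED") == "true"

    flushInterval := 5 * time.Second // default from DefaultConfig
    if v := os.Getenv("KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FLUSH_INTERVAL"); v != "" {
        if d, err := time.ParseDuration(v); err == nil {
            flushInterval = d
        }
    }

    fullResyncInterval := time.Minute // default from DefaultConfig
    if v := os.Getenv("KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL"); v != "" {
        if d, err := time.ParseDuration(v); err == nil {
            fullResyncInterval = d
        }
    }

    fmt.Println("enabled:", enabled, "flush:", flushInterval, "fullResync:", fullResyncInterval)
}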
8 changes: 8 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
@@ -707,6 +707,14 @@ experimental:
   # The drawback is that you cannot use filtered out tags for traffic routing.
   # If empty, no filter is applied.
   ingressTagFilters: [] # ENV: KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS
+  # KDS event-based watchdog settings. This is a more efficient way to generate the KDS snapshot config.
+  kdsEventBasedWatchdog:
+    # If true, the experimental event-based watchdog is used to generate the KDS snapshot.
+    enabled: false # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_ENABLED
+    # How often changes are flushed when the experimental event-based watchdog is used.
+    flushInterval: 5s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FLUSH_INTERVAL
+    # How often a full KDS resync is scheduled when the experimental event-based watchdog is used.
+    fullResyncInterval: 60s # ENV: KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL

 proxy:
   gateway:
10 changes: 10 additions & 0 deletions pkg/config/loader_test.go
@@ -338,6 +338,9 @@ var _ = Describe("Config loader", func() {
             Expect(cfg.Experimental.KDSDeltaEnabled).To(BeTrue())
             Expect(cfg.Experimental.UseTagFirstVirtualOutboundModel).To(BeFalse())
             Expect(cfg.Experimental.IngressTagFilters).To(ContainElements("kuma.io/service"))
+            Expect(cfg.Experimental.KDSEventBasedWatchdog.Enabled).To(BeTrue())
+            Expect(cfg.Experimental.KDSEventBasedWatchdog.FlushInterval.Duration).To(Equal(10 * time.Second))
+            Expect(cfg.Experimental.KDSEventBasedWatchdog.FullResyncInterval.Duration).To(Equal(15 * time.Second))

             Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
         },
@@ -664,6 +667,10 @@ experimental:
   kdsDeltaEnabled: true
   useTagFirstVirtualOutboundModel: false
   ingressTagFilters: ["kuma.io/service"]
+  kdsEventBasedWatchdog:
+    enabled: true
+    flushInterval: 10s
+    fullResyncInterval: 15s
 proxy:
   gateway:
     globalDownstreamMaxConnections: 1
@@ -906,6 +913,9 @@ proxy:
"KUMA_EXPERIMENTAL_KUBE_OUTBOUNDS_AS_VIPS": "true",
"KUMA_EXPERIMENTAL_USE_TAG_FIRST_VIRTUAL_OUTBOUND_MODEL": "false",
"KUMA_EXPERIMENTAL_INGRESS_TAG_FILTERS": "kuma.io/service",
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_ENABLED": "true",
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FLUSH_INTERVAL": "10s",
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL": "15s",
"KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1",
"KUMA_TRACING_OPENTELEMETRY_ENDPOINT": "otel-collector:4317",
},
7 changes: 5 additions & 2 deletions pkg/kds/v2/reconcile/interfaces.go
@@ -5,15 +5,18 @@ import (

envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"

"github.com/kumahq/kuma/pkg/core/resources/model"
cache_kds_v2 "github.com/kumahq/kuma/pkg/kds/v2/cache"
)

// Reconciler re-computes configuration for a given node.
type Reconciler interface {
Reconcile(context.Context, *envoy_core.Node) error
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}) error
Clear(context.Context, *envoy_core.Node) error
}

// Generates a snapshot of xDS resources for a given node.
type SnapshotGenerator interface {
GenerateSnapshot(context.Context, *envoy_core.Node) (envoy_cache.ResourceSnapshot, error)
GenerateSnapshot(context.Context, *envoy_core.Node, cache_kds_v2.SnapshotBuilder, map[model.ResourceType]struct{}) (envoy_cache.ResourceSnapshot, error)
}
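Note: to make the new contract concrete, a minimal sketch of the intended calling convention; ResourceType, Node, and loggingReconciler are simplified stand-ins, not the Kuma types. A full resync passes every provided type, while an event-driven pass narrows the map to the types that actually changed.

package main

import (
    "context"
    "fmt"
)

type ResourceType string

type Node struct{ ID string }

// Mirrors the shape of the new Reconcile signature above.
type Reconciler interface {
    Reconcile(ctx context.Context, node *Node, changedTypes map[ResourceType]struct{}) error
}

type loggingReconciler struct{}

func (loggingReconciler) Reconcile(_ context.Context, node *Node, changedTypes map[ResourceType]struct{}) error {
    fmt.Printf("node %s: regenerating %d resource type(s)\n", node.ID, len(changedTypes))
    return nil
}

func main() {
    var r Reconciler = loggingReconciler{}
    node := &Node{ID: "zone-1"}

    // Full resync: mark every type the server provides as changed.
    all := map[ResourceType]struct{}{"Mesh": {}, "Dataplane": {}, "Secret": {}}
    _ = r.Reconcile(context.Background(), node, all)

    // Event-driven pass: only the types seen on the event bus.
    _ = r.Reconcile(context.Background(), node, map[ResourceType]struct{}{"Dataplane": {}})
}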
31 changes: 25 additions & 6 deletions pkg/kds/v2/reconcile/reconciler.go
@@ -8,6 +8,7 @@ import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

config_core "github.com/kumahq/kuma/pkg/config/core"
@@ -64,19 +65,37 @@ func (r *reconciler) Clear(ctx context.Context, node *envoy_core.Node) error {
 	return nil
 }

-func (r *reconciler) Reconcile(ctx context.Context, node *envoy_core.Node) error {
-	new, err := r.generator.GenerateSnapshot(ctx, node)
+func (r *reconciler) Reconcile(ctx context.Context, node *envoy_core.Node, changedTypes map[core_model.ResourceType]struct{}) error {
+	id, err := r.hashId(ctx, node)
 	if err != nil {
 		return err
 	}
-	if new == nil {
-		return errors.New("nil snapshot")
+	old, _ := r.cache.GetSnapshot(id)
+
+	// construct builder with unchanged types from the old snapshot
+	builder := cache_v2.NewSnapshotBuilder()
+	if old != nil {
+		for _, typ := range util_kds_v2.GetSupportedTypes() {
+			resType := core_model.ResourceType(typ)
+			if _, ok := changedTypes[resType]; ok {
+				continue
+			}
+
+			oldRes := old.GetResources(typ)
+			if len(oldRes) > 0 {
+				builder = builder.With(resType, maps.Values(oldRes))
+			}
+		}
 	}
-	id, err := r.hashId(ctx, node)
+
+	new, err := r.generator.GenerateSnapshot(ctx, node, builder, changedTypes)
 	if err != nil {
 		return err
 	}
-	old, _ := r.cache.GetSnapshot(id)
+	if new == nil {
+		return errors.New("nil snapshot")
+	}
+
 	new = r.Version(new, old)
 	r.logChanges(new, old, node)
 	r.meterConfigReadyForDelivery(new, old)
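Note: the builder reuse above, distilled into a runnable sketch with hypothetical names. Resources of unchanged types are carried over from the previous snapshot and only the changed types are regenerated, which is what lets an incremental pass skip most store queries.

package main

import "fmt"

type ResourceType string

// rebuild carries over resources of unchanged types from the previous
// snapshot and calls generate only for the changed types.
func rebuild(
    old map[ResourceType][]string,
    changed map[ResourceType]struct{},
    generate func(ResourceType) []string,
) map[ResourceType][]string {
    next := map[ResourceType][]string{}
    for typ, resources := range old {
        if _, ok := changed[typ]; !ok {
            next[typ] = resources // unchanged: reuse, no store query
        }
    }
    for typ := range changed {
        next[typ] = generate(typ) // changed: regenerate from the store
    }
    return next
}

func main() {
    old := map[ResourceType][]string{
        "Mesh":      {"default"},
        "Dataplane": {"dp-1", "dp-2"},
    }
    changed := map[ResourceType]struct{}{"Dataplane": {}}
    regenerated := rebuild(old, changed, func(ResourceType) []string {
        return []string{"dp-1", "dp-2", "dp-3"} // freshly listed resources
    })
    fmt.Println(regenerated)
}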
14 changes: 8 additions & 6 deletions pkg/kds/v2/reconcile/snapshot_generator.go
@@ -24,25 +24,27 @@ func Any(context.Context, string, kds.Features, model.Resource) bool {
 	return true
 }

-func NewSnapshotGenerator(resourceManager core_manager.ReadOnlyResourceManager, types []model.ResourceType, filter reconcile.ResourceFilter, mapper reconcile.ResourceMapper) SnapshotGenerator {
+func NewSnapshotGenerator(resourceManager core_manager.ReadOnlyResourceManager, filter reconcile.ResourceFilter, mapper reconcile.ResourceMapper) SnapshotGenerator {
 	return &snapshotGenerator{
 		resourceManager: resourceManager,
-		resourceTypes:   types,
 		resourceFilter:  filter,
 		resourceMapper:  mapper,
 	}
 }

 type snapshotGenerator struct {
 	resourceManager core_manager.ReadOnlyResourceManager
-	resourceTypes   []model.ResourceType
 	resourceFilter  reconcile.ResourceFilter
 	resourceMapper  reconcile.ResourceMapper
 }

-func (s *snapshotGenerator) GenerateSnapshot(ctx context.Context, node *envoy_core.Node) (envoy_cache.ResourceSnapshot, error) {
-	builder := cache_kds_v2.NewSnapshotBuilder()
-	for _, typ := range s.resourceTypes {
+func (s *snapshotGenerator) GenerateSnapshot(
+	ctx context.Context,
+	node *envoy_core.Node,
+	builder cache_kds_v2.SnapshotBuilder,
+	resTypes map[model.ResourceType]struct{},
+) (envoy_cache.ResourceSnapshot, error) {
+	for typ := range resTypes {
 		resources, err := s.getResources(ctx, typ, node)
 		if err != nil {
 			return nil, err
36 changes: 32 additions & 4 deletions pkg/kds/v2/server/components.go
@@ -10,9 +10,11 @@ import (
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"

kuma_cp "github.com/kumahq/kuma/pkg/config/app/kuma-cp"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/resources/model"
core_runtime "github.com/kumahq/kuma/pkg/core/runtime"
"github.com/kumahq/kuma/pkg/events"
"github.com/kumahq/kuma/pkg/kds/reconcile"
kds_server "github.com/kumahq/kuma/pkg/kds/server"
reconcile_v2 "github.com/kumahq/kuma/pkg/kds/v2/reconcile"
@@ -35,13 +37,13 @@ func New(
 	nackBackoff time.Duration,
 ) (Server, error) {
 	hasher, cache := newKDSContext(log)
-	generator := reconcile_v2.NewSnapshotGenerator(rt.ReadOnlyResourceManager(), providedTypes, filter, mapper)
+	generator := reconcile_v2.NewSnapshotGenerator(rt.ReadOnlyResourceManager(), filter, mapper)
 	statsCallbacks, err := util_xds.NewStatsCallbacks(rt.Metrics(), "kds_delta")
 	if err != nil {
 		return nil, err
 	}
 	reconciler := reconcile_v2.NewReconciler(hasher, cache, generator, rt.Config().Mode, statsCallbacks, rt.Tenants())
-	syncTracker, err := newSyncTracker(log, reconciler, refresh, rt.Metrics())
+	syncTracker, err := newSyncTracker(log, reconciler, refresh, rt.Metrics(), providedTypes, rt.EventBus(), rt.Config().Experimental.KDSEventBasedWatchdog)
 	if err != nil {
 		return nil, err
 	}
@@ -71,7 +73,15 @@ func DefaultStatusTracker(rt core_runtime.Runtime, log logr.Logger) StatusTracke
 	}, log)
 }

-func newSyncTracker(log logr.Logger, reconciler reconcile_v2.Reconciler, refresh time.Duration, metrics core_metrics.Metrics) (envoy_xds.Callbacks, error) {
+func newSyncTracker(
+	log logr.Logger,
+	reconciler reconcile_v2.Reconciler,
+	refresh time.Duration,
+	metrics core_metrics.Metrics,
+	providedTypes []model.ResourceType,
+	eventBus events.EventBus,
+	experimentalWatchdogCfg kuma_cp.ExperimentalKDSEventBasedWatchdog,
+) (envoy_xds.Callbacks, error) {
 	kdsGenerations := prometheus.NewSummary(prometheus.SummaryOpts{
 		Name: "kds_delta_generation",
 		Help: "Summary of KDS Snapshot generation",
Expand All @@ -87,8 +97,26 @@ func newSyncTracker(log logr.Logger, reconciler reconcile_v2.Reconciler, refresh
 	if err := metrics.Register(kdsGenerationsErrors); err != nil {
 		return nil, err
 	}
+	changedTypes := map[model.ResourceType]struct{}{}
+	for _, typ := range providedTypes {
+		changedTypes[typ] = struct{}{}
+	}
 	return util_xds_v3.NewWatchdogCallbacks(func(ctx context.Context, node *envoy_core.Node, streamID int64) (util_watchdog.Watchdog, error) {
 		log := log.WithValues("streamID", streamID, "node", node)
+		if experimentalWatchdogCfg.Enabled {
+			return &EventBasedWatchdog{
+				Ctx:                  ctx,
+				Node:                 node,
+				Listener:             eventBus.Subscribe(),
+				Reconciler:           reconciler,
+				ProvidedTypes:        changedTypes,
+				KdsGenerations:       kdsGenerations,
+				KdsGenerationsErrors: kdsGenerationsErrors,
+				Log:                  log,
+				FlushInterval:        experimentalWatchdogCfg.FlushInterval.Duration,
+				FullResyncInterval:   experimentalWatchdogCfg.FullResyncInterval.Duration,
+			}, nil
+		}
 		return &util_watchdog.SimpleWatchdog{
 			NewTicker: func() *time.Ticker {
 				return time.NewTicker(refresh)
@@ -99,7 +127,7 @@ func newSyncTracker(log logr.Logger, reconciler reconcile_v2.Reconciler, refresh
 				kdsGenerations.Observe(float64(core.Now().Sub(start).Milliseconds()))
 			}()
 			log.V(1).Info("on tick")
-			return reconciler.Reconcile(ctx, node)
+			return reconciler.Reconcile(ctx, node, changedTypes)
 		},
 		OnError: func(err error) {
 			kdsGenerationsErrors.Inc()
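Note: a condensed sketch of the selection made in the callback above, with simplified stand-ins for the Kuma interfaces. The factory returns the event-based implementation only when the experimental flag is set and otherwise preserves the existing ticker-driven behaviour.

package main

import (
    "fmt"
    "time"
)

type Watchdog interface {
    Start(stop <-chan struct{})
}

type simpleWatchdog struct{ refresh time.Duration }

func (w *simpleWatchdog) Start(<-chan struct{}) {
    fmt.Println("reconcile everything every", w.refresh)
}

type eventBasedWatchdog struct{ flush, fullResync time.Duration }

func (w *eventBasedWatchdog) Start(<-chan struct{}) {
    fmt.Println("flush changed types every", w.flush, "- full resync every", w.fullResync)
}

// newWatchdog mirrors the branch above: the experimental flag picks the
// event-based loop, otherwise the plain ticker-based loop is kept.
func newWatchdog(enabled bool, refresh, flush, fullResync time.Duration) Watchdog {
    if enabled {
        return &eventBasedWatchdog{flush: flush, fullResync: fullResync}
    }
    return &simpleWatchdog{refresh: refresh}
}

func main() {
    newWatchdog(true, time.Second, 5*time.Second, time.Minute).Start(nil)
}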
83 changes: 83 additions & 0 deletions pkg/kds/v2/server/event_based_watchdog.go
@@ -0,0 +1,83 @@
package server

import (
    "context"
    "time"

    envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    "github.com/go-logr/logr"
    "github.com/prometheus/client_golang/prometheus"

    "github.com/kumahq/kuma/pkg/core"
    "github.com/kumahq/kuma/pkg/core/resources/model"
    "github.com/kumahq/kuma/pkg/events"
    "github.com/kumahq/kuma/pkg/kds/v2/reconcile"
    "github.com/kumahq/kuma/pkg/multitenant"
    util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog"
)

type EventBasedWatchdog struct {
    Ctx                  context.Context
    Node                 *envoy_core.Node
    Listener             events.Listener
    Reconciler           reconcile.Reconciler
    ProvidedTypes        map[model.ResourceType]struct{}
    KdsGenerations       prometheus.Summary
    KdsGenerationsErrors prometheus.Counter
    Log                  logr.Logger
    FlushInterval        time.Duration
    FullResyncInterval   time.Duration
}

var _ util_watchdog.Watchdog = &EventBasedWatchdog{}

func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
    tenantID, _ := multitenant.TenantFromCtx(e.Ctx)
    flushTicker := time.NewTicker(e.FlushInterval)
    defer flushTicker.Stop()
    fullResyncTicker := time.NewTicker(e.FullResyncInterval)
    defer fullResyncTicker.Stop()

    // for the first reconcile assign all types
    changedTypes := e.ProvidedTypes

    for {
        select {
        case <-stop:
            if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil {
                e.Log.Error(err, "reconcile clear failed")
            }
            e.Listener.Close()
            return
        case <-flushTicker.C:
            if len(changedTypes) == 0 {
                continue
            }
            e.Log.V(1).Info("reconcile", "changedTypes", changedTypes)
            start := core.Now()
            if err := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes); err != nil {
                e.Log.Error(err, "reconcile failed", "changedTypes", changedTypes)
                e.KdsGenerationsErrors.Inc()
            } else {
                e.KdsGenerations.Observe(float64(core.Now().Sub(start).Milliseconds()))
                changedTypes = map[model.ResourceType]struct{}{}
            }
        case <-fullResyncTicker.C:
            e.Log.V(1).Info("schedule full resync")
            changedTypes = e.ProvidedTypes
        case event := <-e.Listener.Recv():
            resChange, ok := event.(events.ResourceChangedEvent)
            if !ok {
                continue
            }
            if resChange.TenantID != tenantID {
                continue
            }
            if _, ok := e.ProvidedTypes[resChange.Type]; !ok {
                continue
            }
            e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
            changedTypes[resChange.Type] = struct{}{}
        }
    }
}
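Note: the Start loop above is essentially a debounce. Events only mark a resource type as changed; reconciliation happens at most once per FlushInterval, and a slower ticker periodically re-marks every provided type to force a full resync. A self-contained model of that behaviour (intervals shortened so it finishes quickly; maps.Clone needs Go 1.21):

package main

import (
    "fmt"
    "maps"
    "time"
)

func main() {
    provided := map[string]struct{}{"Mesh": {}, "Dataplane": {}, "Secret": {}}
    changed := maps.Clone(provided) // first flush reconciles everything

    events := make(chan string, 16)
    stop := time.After(350 * time.Millisecond)
    flush := time.NewTicker(100 * time.Millisecond)
    defer flush.Stop()
    fullResync := time.NewTicker(300 * time.Millisecond)
    defer fullResync.Stop()

    events <- "Dataplane" // two events for the same type are
    events <- "Dataplane" // coalesced into one map entry

    for {
        select {
        case <-stop:
            return
        case typ := <-events:
            changed[typ] = struct{}{} // mark only; no work yet
        case <-flush.C:
            if len(changed) == 0 {
                continue // nothing changed since the last flush
            }
            fmt.Printf("reconcile %d type(s)\n", len(changed))
            changed = map[string]struct{}{}
        case <-fullResync.C:
            changed = maps.Clone(provided) // force a full regeneration
        }
    }
}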