diff --git a/api/generic/insights.go b/api/generic/insights.go index 5681bf1743a1..d78165a02c70 100644 --- a/api/generic/insights.go +++ b/api/generic/insights.go @@ -26,6 +26,7 @@ func GetSubscription[S Subscription, T interface{ GetSubscriptions() []S }](t T, type Insight interface { proto.Message IsOnline() bool + // GetLastSubscription returns the last subscription, or nil if there are no subscriptions GetLastSubscription() Subscription GetSubscription(id string) Subscription AllSubscriptions() []Subscription diff --git a/pkg/core/resources/apis/mesh/proxy_type.go b/pkg/core/resources/apis/mesh/proxy_type.go index e56fbb52c1c8..f0fd6009ec23 100644 --- a/pkg/core/resources/apis/mesh/proxy_type.go +++ b/pkg/core/resources/apis/mesh/proxy_type.go @@ -15,6 +15,20 @@ func ProxyTypeFromResourceType(t core_model.ResourceType) (mesh_proto.ProxyType, return mesh_proto.IngressProxyType, nil case ZoneEgressType: return mesh_proto.EgressProxyType, nil + default: + return "", errors.Errorf("%s does not have a corresponding proxy type", t) + } +} + +func ResourceTypeDescriptorFromProxyType(proxyType mesh_proto.ProxyType) (core_model.ResourceTypeDescriptor, error) { + switch proxyType { + case mesh_proto.DataplaneProxyType: + return DataplaneResourceTypeDescriptor, nil + case mesh_proto.IngressProxyType: + return ZoneIngressResourceTypeDescriptor, nil + case mesh_proto.EgressProxyType: + return ZoneEgressResourceTypeDescriptor, nil + default: + return core_model.ResourceTypeDescriptor{}, errors.Errorf("%s does not have a corresponding resource type", proxyType) } - return "", errors.Errorf("%s does not have a corresponding proxy type", t) } diff --git a/pkg/core/resources/model/resource.go b/pkg/core/resources/model/resource.go index 4f8392ce3cbc..929ec002d675 100644 --- a/pkg/core/resources/model/resource.go +++ b/pkg/core/resources/model/resource.go @@ -253,28 +253,28 @@ func (d ResourceTypeDescriptor) HasInsights() bool { func (d ResourceTypeDescriptor) NewInsight() Resource { if !d.HasInsights() { - panic("No insight type precondition broken") + panic(fmt.Sprintf("No insight for type %s precondition broken", d.Name)) } return newObject(d.Insight) } func (d ResourceTypeDescriptor) NewInsightList() ResourceList { if !d.HasInsights() { - panic("No insight type precondition broken") + panic(fmt.Sprintf("No insight for type %s precondition broken", d.Name)) } return d.Insight.Descriptor().NewList() } func (d ResourceTypeDescriptor) NewOverview() Resource { if !d.HasInsights() { - panic("No insight type precondition broken") + panic(fmt.Sprintf("No insight for type %s precondition broken", d.Name)) } return newObject(d.Overview) } func (d ResourceTypeDescriptor) NewOverviewList() ResourceList { if !d.HasInsights() { - panic("No insight type precondition broken") + panic(fmt.Sprintf("No insight for type %s precondition broken", d.Name)) } return d.Overview.Descriptor().NewList() } diff --git a/pkg/util/xds/v3/watchdog_callbacks.go b/pkg/util/xds/v3/watchdog_callbacks.go index 748ddff3826a..f4242a55534d 100644 --- a/pkg/util/xds/v3/watchdog_callbacks.go +++ b/pkg/util/xds/v3/watchdog_callbacks.go @@ -14,6 +14,12 @@ type Watchdog interface { Start(ctx context.Context) } +type WatchdogFunc func(ctx context.Context) + +func (f WatchdogFunc) Start(ctx context.Context) { + f(ctx) +} + type NewNodeWatchdogFunc func(ctx context.Context, node *envoy_core.Node, streamId int64) (Watchdog, error) func NewWatchdogCallbacks(newNodeWatchdog NewNodeWatchdogFunc) envoy_xds.Callbacks { diff --git
a/pkg/xds/server/callbacks/dataplane_callbacks.go b/pkg/xds/server/callbacks/dataplane_callbacks.go deleted file mode 100644 index f0c55a686ef1..000000000000 --- a/pkg/xds/server/callbacks/dataplane_callbacks.go +++ /dev/null @@ -1,156 +0,0 @@ -package callbacks - -import ( - "context" - "sync" - - "github.com/pkg/errors" - - core_model "github.com/kumahq/kuma/pkg/core/resources/model" - core_xds "github.com/kumahq/kuma/pkg/core/xds" - util_xds "github.com/kumahq/kuma/pkg/util/xds" -) - -// DataplaneCallbacks are XDS callbacks that keep the context of Kuma Dataplane. -// In the ideal world we could assume that one Dataplane has one xDS stream. -// Due to race network latencies etc. there might be a situation when one Dataplane has many xDS streams for the short period of time. -// Those callbacks helps us to deal with such situation. -// -// Keep in mind that it does not solve many xDS streams across many instances of the Control Plane. -// If there are many instances of the Control Plane and Dataplane reconnects, there might be an old stream -// in one instance of CP and a new stream in a new instance of CP. -type DataplaneCallbacks interface { - // OnProxyConnected is executed when proxy is connected after it was disconnected before. - OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error - // OnProxyReconnected is executed when proxy is already connected, but there is another stream. - // This can happen when there is a delay with closing the old connection from the proxy to the control plane. - OnProxyReconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error - // OnProxyDisconnected is executed only when the last stream of the proxy disconnects. - OnProxyDisconnected(ctx context.Context, streamID core_xds.StreamID, dpKey core_model.ResourceKey) -} - -type xdsCallbacks struct { - callbacks DataplaneCallbacks - util_xds.NoopCallbacks - - sync.RWMutex - dpStreams map[core_xds.StreamID]dpStream - activeStreams map[core_model.ResourceKey]int -} - -func DataplaneCallbacksToXdsCallbacks(callbacks DataplaneCallbacks) util_xds.Callbacks { - return &xdsCallbacks{ - callbacks: callbacks, - dpStreams: map[core_xds.StreamID]dpStream{}, - activeStreams: map[core_model.ResourceKey]int{}, - } -} - -type dpStream struct { - dp *core_model.ResourceKey - ctx context.Context -} - -var _ util_xds.Callbacks = &xdsCallbacks{} - -func (d *xdsCallbacks) OnStreamClosed(streamID core_xds.StreamID) { - var lastStreamDpKey *core_model.ResourceKey - d.Lock() - dpStream := d.dpStreams[streamID] - if dpKey := dpStream.dp; dpKey != nil { - d.activeStreams[*dpKey]-- - if d.activeStreams[*dpKey] == 0 { - lastStreamDpKey = dpKey - delete(d.activeStreams, *dpKey) - } - } - delete(d.dpStreams, streamID) - d.Unlock() - if lastStreamDpKey != nil { - // execute callback after lock is freed, so heavy callback implementation won't block every callback for every DPP. - d.callbacks.OnProxyDisconnected(dpStream.ctx, streamID, *lastStreamDpKey) - } -} - -func (d *xdsCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_xds.DiscoveryRequest) error { - if request.NodeId() == "" { - // from https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-versioning: - // Only the first request on a stream is guaranteed to carry the node identifier. - // The subsequent discovery requests on the same stream may carry an empty node identifier. 
- // This holds true regardless of the acceptance of the discovery responses on the same stream. - // The node identifier should always be identical if present more than once on the stream. - // It is sufficient to only check the first message for the node identifier as a result. - return nil - } - - d.RLock() - alreadyProcessed := d.dpStreams[streamID].dp != nil - d.RUnlock() - if alreadyProcessed { - return nil - } - - proxyId, err := core_xds.ParseProxyIdFromString(request.NodeId()) - if err != nil { - return errors.Wrap(err, "invalid node ID") - } - dpKey := proxyId.ToResourceKey() - metadata := core_xds.DataplaneMetadataFromXdsMetadata(request.Metadata()) - if metadata == nil { - return errors.New("metadata in xDS Node cannot be nil") - } - - d.Lock() - // in case client will open 2 concurrent request for the same streamID then - // we don't to increment the counter twice, so checking once again that stream - // wasn't processed - alreadyProcessed = d.dpStreams[streamID].dp != nil - if alreadyProcessed { - return nil - } - - dpStream := d.dpStreams[streamID] - dpStream.dp = &dpKey - d.dpStreams[streamID] = dpStream - - activeStreams := d.activeStreams[dpKey] - d.activeStreams[dpKey]++ - d.Unlock() - - if activeStreams == 0 { - if err := d.callbacks.OnProxyConnected(streamID, dpKey, dpStream.ctx, *metadata); err != nil { - return err - } - } else { - if err := d.callbacks.OnProxyReconnected(streamID, dpKey, dpStream.ctx, *metadata); err != nil { - return err - } - } - return nil -} - -func (d *xdsCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { - d.Lock() - defer d.Unlock() - dps := dpStream{ - ctx: ctx, - } - d.dpStreams[streamID] = dps - return nil -} - -// NoopDataplaneCallbacks are empty callbacks that helps to implement DataplaneCallbacks without need to implement every function. -type NoopDataplaneCallbacks struct{} - -func (n *NoopDataplaneCallbacks) OnProxyReconnected(core_xds.StreamID, core_model.ResourceKey, context.Context, core_xds.DataplaneMetadata) error { - return nil -} - -func (n *NoopDataplaneCallbacks) OnProxyConnected(core_xds.StreamID, core_model.ResourceKey, context.Context, core_xds.DataplaneMetadata) error { - return nil -} - -func (n *NoopDataplaneCallbacks) OnProxyDisconnected(context.Context, core_xds.StreamID, core_model.ResourceKey) { -} - -var _ DataplaneCallbacks = &NoopDataplaneCallbacks{} diff --git a/pkg/xds/server/callbacks/dataplane_callbacks_test.go b/pkg/xds/server/callbacks/dataplane_callbacks_test.go deleted file mode 100644 index 57ed4fadd50c..000000000000 --- a/pkg/xds/server/callbacks/dataplane_callbacks_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package callbacks_test - -import ( - "context" - - envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "google.golang.org/protobuf/types/known/structpb" - - core_model "github.com/kumahq/kuma/pkg/core/resources/model" - core_xds "github.com/kumahq/kuma/pkg/core/xds" - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" - . 
"github.com/kumahq/kuma/pkg/xds/server/callbacks" -) - -type countingDpCallbacks struct { - OnProxyConnectedCounter int - OnProxyReconnectedCounter int - OnProxyDisconnectedCounter int -} - -func (c *countingDpCallbacks) OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error { - c.OnProxyConnectedCounter++ - return nil -} - -func (c *countingDpCallbacks) OnProxyReconnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, ctx context.Context, metadata core_xds.DataplaneMetadata) error { - c.OnProxyReconnectedCounter++ - return nil -} - -func (c *countingDpCallbacks) OnProxyDisconnected(ctx context.Context, streamID core_xds.StreamID, dpKey core_model.ResourceKey) { - c.OnProxyDisconnectedCounter++ -} - -var _ DataplaneCallbacks = &countingDpCallbacks{} - -var _ = Describe("Dataplane Callbacks", func() { - countingCallbacks := &countingDpCallbacks{} - callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(countingCallbacks)) - - node := &envoy_core.Node{ - Id: "default.example", - Metadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "dataplane.token": { - Kind: &structpb.Value_StringValue{ - StringValue: "token", - }, - }, - }, - }, - } - req := envoy_sd.DiscoveryRequest{ - Node: node, - } - - It("should call DataplaneCallbacks correctly", func() { - // when only OnStreamOpen is called - err := callbacks.OnStreamOpen(context.Background(), 1, "") - Expect(err).ToNot(HaveOccurred()) - - // then OnProxyConnected and OnProxyReconnected is not yet called - Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(0)) - Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(0)) - - // when OnStreamRequest is sent - err = callbacks.OnStreamRequest(1, &req) - Expect(err).ToNot(HaveOccurred()) - - // then only OnProxyConnected should be called - Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(1)) - Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(0)) - - // when next OnStreamRequest on the same stream is sent - err = callbacks.OnStreamRequest(1, &req) - Expect(err).ToNot(HaveOccurred()) - - // then OnProxyReconnected and OnProxyReconnected are not called again, they should be only called on the first DiscoveryRequest - Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(1)) - Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(0)) - - // when next stream for given data plane proxy is connected - err = callbacks.OnStreamOpen(context.Background(), 2, "") - Expect(err).ToNot(HaveOccurred()) - err = callbacks.OnStreamRequest(2, &req) - Expect(err).ToNot(HaveOccurred()) - - // then only OnProxyReconnected should be called - Expect(countingCallbacks.OnProxyConnectedCounter).To(Equal(1)) - Expect(countingCallbacks.OnProxyReconnectedCounter).To(Equal(1)) - - // when first stream is closed - callbacks.OnStreamClosed(1, node) - - // then OnProxyDisconnected should not yet be called - Expect(countingCallbacks.OnProxyDisconnectedCounter).To(Equal(0)) - - // when last stream is closed - callbacks.OnStreamClosed(2, node) - - // then OnProxyDisconnected should be called - Expect(countingCallbacks.OnProxyDisconnectedCounter).To(Equal(1)) - }) -}) diff --git a/pkg/xds/server/callbacks/dataplane_lifecycle.go b/pkg/xds/server/callbacks/dataplane_lifecycle.go index d38fe7b0d51f..ea621daaf050 100644 --- a/pkg/xds/server/callbacks/dataplane_lifecycle.go +++ b/pkg/xds/server/callbacks/dataplane_lifecycle.go @@ -2,7 +2,6 @@ package callbacks import ( 
"context" - "sync" "time" "github.com/go-logr/logr" @@ -10,207 +9,146 @@ import ( "github.com/kumahq/kuma/api/generic" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" - "github.com/kumahq/kuma/pkg/core" core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh" "github.com/kumahq/kuma/pkg/core/resources/manager" core_model "github.com/kumahq/kuma/pkg/core/resources/model" "github.com/kumahq/kuma/pkg/core/resources/store" core_xds "github.com/kumahq/kuma/pkg/core/xds" - "github.com/kumahq/kuma/pkg/util/maps" + "github.com/kumahq/kuma/pkg/util/channels" xds_auth "github.com/kumahq/kuma/pkg/xds/auth" ) -var lifecycleLog = core.Log.WithName("xds").WithName("dp-lifecycle") - -// DataplaneLifecycle is responsible for creating a deleting dataplanes that are passed through metadata -// There are two possible workflows -// 1) apply Dataplane resource before kuma-dp run and run kuma-dp +// There are two possible workflows for managing lifecycles of Dataplane resources +// 1) apply Dataplane resource before kuma-dp run and run kuma-dp (this is the case with Kubernetes for example) // 2) run kuma-dp and pass Dataplane resource as an argument to kuma-dp -// This component support second use case. When user passes Dataplane to kuma-dp it is attached to bootstrap request. + +// dataplaneLifecycle is responsible for creating and deleting dataplanes that are passed through metadata (option 2 above). +// When user passes Dataplane to kuma-dp it is attached to bootstrap request. // Then, bootstrap server generates bootstrap configuration with Dataplane embedded in Envoy metadata. -// Here, we read Dataplane resource from metadata and a create resource on first DiscoveryRequest and remove on StreamClosed. -// -// This flow is optional, you may still want to go with 1. an example of this is Kubernetes deployment. -type DataplaneLifecycle struct { +// Here, we read Dataplane resource from metadata if it exists and then create or delete the Dataplane resource. +// This is not thread safe as it's meant to be called from DataplaneSyncCallback, it only has exceptions for DP reconnecting to a different CP. 
+type dataplaneLifecycle struct { resManager manager.ResourceManager authenticator xds_auth.Authenticator - proxyInfos maps.Sync[core_model.ResourceKey, *proxyInfo] appCtx context.Context deregistrationDelay time.Duration cpInstanceID string cacheExpirationTime time.Duration + proxyKey core_model.ResourceKey + tDesc core_model.ResourceTypeDescriptor + registered bool } -type proxyInfo struct { - mtx sync.Mutex - proxyType mesh_proto.ProxyType - connected bool - deleted bool -} - -var _ DataplaneCallbacks = &DataplaneLifecycle{} - -func NewDataplaneLifecycle( +func DataplaneLifecycleFactory( appCtx context.Context, resManager manager.ResourceManager, authenticator xds_auth.Authenticator, deregistrationDelay time.Duration, cpInstanceID string, cacheExpirationTime time.Duration, -) *DataplaneLifecycle { - return &DataplaneLifecycle{ - resManager: resManager, - authenticator: authenticator, - proxyInfos: maps.Sync[core_model.ResourceKey, *proxyInfo]{}, - appCtx: appCtx, - deregistrationDelay: deregistrationDelay, - cpInstanceID: cpInstanceID, - cacheExpirationTime: cacheExpirationTime, - } -} - -func (d *DataplaneLifecycle) OnProxyConnected(streamID core_xds.StreamID, proxyKey core_model.ResourceKey, ctx context.Context, md core_xds.DataplaneMetadata) error { - if md.Resource == nil { - return nil - } - if err := d.validateProxyKey(proxyKey, md.Resource); err != nil { - return err - } - return d.register(ctx, streamID, proxyKey, md) +) DataplaneLifecycleManagerFactory { + return DataplaneLifecycleManagerFunc(func(proxyKey core_model.ResourceKey) DataplaneLifecycleManager { + return &dataplaneLifecycle{ + proxyKey: proxyKey, + resManager: resManager, + authenticator: authenticator, + appCtx: appCtx, + deregistrationDelay: deregistrationDelay, + cpInstanceID: cpInstanceID, + cacheExpirationTime: cacheExpirationTime, + } + }) } -func (d *DataplaneLifecycle) OnProxyReconnected(streamID core_xds.StreamID, proxyKey core_model.ResourceKey, ctx context.Context, md core_xds.DataplaneMetadata) error { +func (d *dataplaneLifecycle) Register( + log logr.Logger, + ctx context.Context, + md *core_xds.DataplaneMetadata, +) error { if md.Resource == nil { + // Noop when there's no resource in metadata return nil } - if err := d.validateProxyKey(proxyKey, md.Resource); err != nil { - return err - } - return d.register(ctx, streamID, proxyKey, md) -} - -func (d *DataplaneLifecycle) OnProxyDisconnected(ctx context.Context, streamID core_xds.StreamID, proxyKey core_model.ResourceKey) { - // OnStreamClosed method could be called either in case data plane proxy is down or - // Kuma CP is gracefully shutting down. If Kuma CP is gracefully shutting down we - // must not delete Dataplane resource, data plane proxy will be reconnected to another - // instance of Kuma CP. - select { - case <-d.appCtx.Done(): - lifecycleLog.Info("graceful shutdown, don't delete Dataplane resource") - return - default: - } - - d.deregister(ctx, streamID, proxyKey) -} - -func (d *DataplaneLifecycle) register( - ctx context.Context, - streamID core_xds.StreamID, - proxyKey core_model.ResourceKey, - md core_xds.DataplaneMetadata, -) error { - log := lifecycleLog. + d.registered = true + l := log.WithName("lifecycle"). WithValues("proxyType", md.GetProxyType()). - WithValues("proxyKey", proxyKey). - WithValues("streamID", streamID). 
WithValues("resource", md.Resource) - info, loaded := d.proxyInfos.LoadOrStore(proxyKey, &proxyInfo{ - proxyType: md.GetProxyType(), - }) - - info.mtx.Lock() - defer info.mtx.Unlock() - - if info.deleted { - // we took info object that was deleted from proxyInfo map by other goroutine, return err so DPP retry registration - return errors.Errorf("attempt to concurently register deleted DPP resource, needs retry") + tDesc, err := core_mesh.ResourceTypeDescriptorFromProxyType(md.GetProxyType()) + if err != nil { + return err } + d.tDesc = tDesc - log.Info("register proxy") + l.Info("register proxy") - err := manager.Upsert(ctx, d.resManager, core_model.MetaToResourceKey(md.Resource.GetMeta()), proxyResource(md.GetProxyType()), func(existing core_model.Resource) error { + err = manager.Upsert(ctx, d.resManager, core_model.MetaToResourceKey(md.Resource.GetMeta()), d.tDesc.NewObject(), func(existing core_model.Resource) error { if err := d.validateUpsert(ctx, md.Resource); err != nil { return errors.Wrap(err, "you are trying to override existing proxy to which you don't have an access.") } return existing.SetSpec(md.Resource.GetSpec()) }) if err != nil { - log.Info("cannot register proxy", "reason", err.Error()) - if !loaded { - info.deleted = true - d.proxyInfos.Delete(proxyKey) - } + l.Info("cannot register proxy", "reason", err.Error()) return errors.Wrap(err, "could not register proxy passed in kuma-dp run") } - // We should wait for Cache ExpirationTime to let MeshContext sync the latest data - // in which the latter DataplaneSyncTracker callback relies on it to generate the XDS configuration. - // This only happens on Universal Dataplane because the k8s Dataplane object is created by the controller. + // Because in this case we've just created the dataplane, the MeshContext cache is not up to date. + // So we block for the cache ExpirationTime to the MeshContext is refreshed before starting the + // dataplane watchdog which will generate the XDS config. + // See https://github.com/kumahq/kuma/pull/11180 + // This is not ideal but this cache is usually fairly short. time.Sleep(d.cacheExpirationTime) - info.connected = true return nil } -func (d *DataplaneLifecycle) deregister( +func (d *dataplaneLifecycle) Deregister( + log logr.Logger, ctx context.Context, - streamID core_xds.StreamID, - proxyKey core_model.ResourceKey, ) { - info, ok := d.proxyInfos.Load(proxyKey) - if !ok { - // proxy was not registered with this callback + if !d.registered { + // We never registered this so don't care return } - - info.mtx.Lock() - if info.deleted { - info.mtx.Unlock() + l := log.WithName("lifecycle"). + WithValues("proxyType", d.tDesc.Name) + // Deregister() is called in 2 cases: + // 1. connection between the DP to the CP is closed + // 2. Kuma CP is gracefully shutting down. + // This is for (2), in this case we must not delete the Dataplane resource because we assume the data plane proxy + // will reconnect to another instance of Kuma CP. + if channels.IsClosed(d.appCtx.Done()) { + l.Info("graceful shutdown, don't delete Dataplane resource") return } - info.connected = false - proxyType := info.proxyType - info.mtx.Unlock() - - log := lifecycleLog. - WithValues("proxyType", proxyType). - WithValues("proxyKey", proxyKey). 
- WithValues("streamID", streamID) - - // if delete immediately we're more likely to have a race condition - // when DPP is connected to another CP but proxy resource in the store is deleted - log.Info("waiting for deregister proxy", "waitFor", d.deregistrationDelay) + // we avoid deleting the dataplane resource immediately because if the DP is connected to another CP. + // a race could occur and we could end up deleting the resource used by the other CP + l.Info("waiting for deregister proxy", "waitFor", d.deregistrationDelay) <-time.After(d.deregistrationDelay) - info.mtx.Lock() - defer info.mtx.Unlock() - - if info.deleted { - return - } - - if info.connected { - log.Info("no need to deregister proxy. It has already connected to this instance") - return - } - - if connected, err := d.proxyConnectedToAnotherCP(ctx, proxyType, proxyKey, log); err != nil { - log.Error(err, "could not check if proxy connected to another CP") - return - } else if connected { + insight := d.tDesc.NewInsight() + err := d.resManager.Get(ctx, insight, store.GetBy(d.proxyKey)) + switch { + case store.IsResourceNotFound(err): + // If insight is missing it most likely means that it was not yet created, so DP just connected and now leaving the mesh. + l.Info("insight is missing. Safe to deregister the proxy") + case err != nil: + l.Error(err, "could not get insight to determine if we can delete proxy object") return + default: // There is an insight, let's check where it is connected to + sub := insight.GetSpec().(generic.Insight).GetLastSubscription() + if sub != nil && sub.(*mesh_proto.DiscoverySubscription).ControlPlaneInstanceId != d.cpInstanceID { + l.Info("no need to deregister proxy. It has already connected to another instance", "lastSub", sub) + return + } } - log.Info("deregister proxy") - if err := d.resManager.Delete(ctx, proxyResource(proxyType), store.DeleteBy(proxyKey)); err != nil { - log.Error(err, "could not unregister proxy") + l.Info("deregister proxy") + if err := d.resManager.Delete(ctx, d.tDesc.NewObject(), store.DeleteBy(d.proxyKey)); err != nil { + l.Error(err, "could not deregister proxy") } - - d.proxyInfos.Delete(proxyKey) - info.deleted = true } // validateUpsert checks if a new data plane proxy can replace the old one. @@ -222,7 +160,7 @@ func (d *DataplaneLifecycle) deregister( // What we do instead is that we use current data plane proxy credential to check if we can manage already registered Dataplane. // The assumption is that if you have a token that can manage the old Dataplane. // You can delete it and create a new one, so we can simplify this manual process by just replacing it. 
-func (d *DataplaneLifecycle) validateUpsert(ctx context.Context, existing core_model.Resource) error { +func (d *dataplaneLifecycle) validateUpsert(ctx context.Context, existing core_model.Resource) error { if core_model.IsEmpty(existing.GetSpec()) { // existing DP is empty, resource does not exist return nil } @@ -232,67 +170,3 @@ func (d *DataplaneLifecycle) validateUpsert(ctx context.Context, existing core_m } return d.authenticator.Authenticate(ctx, existing, credential) } - -func (d *DataplaneLifecycle) validateProxyKey(proxyKey core_model.ResourceKey, proxyResource core_model.Resource) error { - if core_model.MetaToResourceKey(proxyResource.GetMeta()) != proxyKey { - return errors.Errorf("proxyId %s does not match proxy resource %s", proxyKey, proxyResource.GetMeta()) - } - return nil -} - -func (d *DataplaneLifecycle) proxyConnectedToAnotherCP( - ctx context.Context, - pt mesh_proto.ProxyType, - key core_model.ResourceKey, - log logr.Logger, -) (bool, error) { - insight := proxyInsight(pt) - - err := d.resManager.Get(ctx, insight, store.GetBy(key)) - switch { - case store.IsResourceNotFound(err): - // If insight is missing it most likely means that it was not yet created, so DP just connected and now leaving the mesh. - log.Info("insight is missing. Safe to deregister the proxy") - return false, nil - case err != nil: - return false, errors.Wrap(err, "could not get insight to determine if we can delete proxy object") - } - - subs := insight.GetSpec().(generic.Insight).AllSubscriptions() - if len(subs) == 0 { - return false, nil - } - - if sub := subs[len(subs)-1].(*mesh_proto.DiscoverySubscription); sub.ControlPlaneInstanceId != d.cpInstanceID { - log.Info("no need to deregister proxy. It has already connected to another instance", "newCPInstanceID", sub.ControlPlaneInstanceId) - return true, nil - } - - return false, nil -} - -func proxyResource(pt mesh_proto.ProxyType) core_model.Resource { - switch pt { - case mesh_proto.DataplaneProxyType: - return core_mesh.NewDataplaneResource() - case mesh_proto.IngressProxyType: - return core_mesh.NewZoneIngressResource() - case mesh_proto.EgressProxyType: - return core_mesh.NewZoneEgressResource() - default: - return nil - } -} - -func proxyInsight(pt mesh_proto.ProxyType) core_model.Resource { - switch pt { - case mesh_proto.DataplaneProxyType: - return core_mesh.NewDataplaneInsightResource() - case mesh_proto.IngressProxyType: - return core_mesh.NewZoneIngressInsightResource() - case mesh_proto.EgressProxyType: - return core_mesh.NewZoneEgressInsightResource() - default: - return nil - } -} diff --git a/pkg/xds/server/callbacks/dataplane_lifecycle_test.go b/pkg/xds/server/callbacks/dataplane_lifecycle_test.go index bedcac622b79..bb5dca1e5ab1 100644 --- a/pkg/xds/server/callbacks/dataplane_lifecycle_test.go +++ b/pkg/xds/server/callbacks/dataplane_lifecycle_test.go @@ -53,8 +53,8 @@ var _ = Describe("Dataplane Lifecycle", func() { resManager = core_manager.NewResourceManager(store) ctx, cancel = context.WithCancel(context.Background()) - dpLifecycle := NewDataplaneLifecycle(ctx, resManager, authenticator, 0*time.Second, cpInstanceID, 0*time.Second) - callbacks = util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(dpLifecycle)) + dpLifecycle := DataplaneLifecycleFactory(ctx, resManager, authenticator, 0*time.Second, cpInstanceID, 0*time.Second) + callbacks = util_xds_v3.AdaptCallbacks(NewDataplaneSyncCallbacks(nil, dpLifecycle)) err := resManager.Create(context.Background(), core_mesh.NewMeshResource(), 
core_store.CreateByKey(core_model.DefaultMesh, core_model.NoMesh)) Expect(err).ToNot(HaveOccurred()) @@ -117,7 +117,7 @@ var _ = Describe("Dataplane Lifecycle", func() { Expect(core_store.IsResourceNotFound(err)).To(BeTrue()) }) - It("should not override extisting DP with different service", func() { + It("should not override existing DP with different service", func() { // given already created DP dp := &core_mesh.DataplaneResource{ Meta: &model.ResourceMeta{ @@ -221,11 +221,10 @@ var _ = Describe("Dataplane Lifecycle", func() { } const streamId = 123 - // when - err = callbacks.OnStreamRequest(streamId, &req) + Expect(callbacks.OnStreamOpen(ctx, streamId, "")).To(Succeed()) - // then - Expect(err).ToNot(HaveOccurred()) + // when + Expect(callbacks.OnStreamRequest(streamId, &req)).To(Succeed()) // when callbacks.OnStreamClosed(streamId, node) @@ -271,29 +270,6 @@ var _ = Describe("Dataplane Lifecycle", func() { Node: node, } - dp := &core_mesh.DataplaneResource{ - Meta: &model.ResourceMeta{ - Mesh: "default", - Name: "dp-01", - }, - Spec: &mesh_proto.Dataplane{ - Networking: &mesh_proto.Dataplane_Networking{ - Address: "192.168.0.1", - Inbound: []*mesh_proto.Dataplane_Networking_Inbound{ - { - Port: 8080, - ServicePort: 8081, - Tags: map[string]string{ - "kuma.io/service": "backend", - }, - }, - }, - }, - }, - } - err := resManager.Create(context.Background(), dp, core_store.CreateByKey("dp-01", "default")) - Expect(err).ToNot(HaveOccurred()) - const streamId = 123 // when @@ -302,7 +278,11 @@ var _ = Describe("Dataplane Lifecycle", func() { }) Expect(callbacks.OnStreamOpen(ctx, streamId, "")).To(Succeed()) - err = callbacks.OnStreamRequest(streamId, &req) + err := callbacks.OnStreamRequest(streamId, &req) + Expect(err).ToNot(HaveOccurred()) + + // dp should be created by the lifecycle manager + err = resManager.Get(context.Background(), core_mesh.NewDataplaneResource(), core_store.GetByKey("backend-01", "default")) Expect(err).ToNot(HaveOccurred()) cancel() diff --git a/pkg/xds/server/callbacks/dataplane_metadata_tracker.go b/pkg/xds/server/callbacks/dataplane_metadata_tracker.go deleted file mode 100644 index 87f11fd286d1..000000000000 --- a/pkg/xds/server/callbacks/dataplane_metadata_tracker.go +++ /dev/null @@ -1,50 +0,0 @@ -package callbacks - -import ( - "context" - "sync" - - core_model "github.com/kumahq/kuma/pkg/core/resources/model" - core_xds "github.com/kumahq/kuma/pkg/core/xds" -) - -type DataplaneMetadataTracker struct { - sync.RWMutex - metadataForDp map[core_model.ResourceKey]*core_xds.DataplaneMetadata -} - -var _ DataplaneCallbacks = &DataplaneMetadataTracker{} - -func NewDataplaneMetadataTracker() *DataplaneMetadataTracker { - return &DataplaneMetadataTracker{ - metadataForDp: map[core_model.ResourceKey]*core_xds.DataplaneMetadata{}, - } -} - -func (d *DataplaneMetadataTracker) Metadata(dpKey core_model.ResourceKey) *core_xds.DataplaneMetadata { - d.RLock() - defer d.RUnlock() - return d.metadataForDp[dpKey] -} - -func (d *DataplaneMetadataTracker) OnProxyReconnected(_ core_xds.StreamID, dpKey core_model.ResourceKey, _ context.Context, metadata core_xds.DataplaneMetadata) error { - d.storeMetadata(dpKey, metadata) - return nil -} - -func (d *DataplaneMetadataTracker) OnProxyConnected(_ core_xds.StreamID, dpKey core_model.ResourceKey, _ context.Context, metadata core_xds.DataplaneMetadata) error { - d.storeMetadata(dpKey, metadata) - return nil -} - -func (d *DataplaneMetadataTracker) storeMetadata(dpKey core_model.ResourceKey, metadata core_xds.DataplaneMetadata) { - 
d.Lock() - defer d.Unlock() - d.metadataForDp[dpKey] = &metadata -} - -func (d *DataplaneMetadataTracker) OnProxyDisconnected(_ context.Context, _ core_xds.StreamID, dpKey core_model.ResourceKey) { - d.Lock() - defer d.Unlock() - delete(d.metadataForDp, dpKey) -} diff --git a/pkg/xds/server/callbacks/dataplane_metadata_tracker_test.go b/pkg/xds/server/callbacks/dataplane_metadata_tracker_test.go deleted file mode 100644 index 0ebd02c7eb8e..000000000000 --- a/pkg/xds/server/callbacks/dataplane_metadata_tracker_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package callbacks_test - -import ( - envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "google.golang.org/protobuf/types/known/structpb" - - core_model "github.com/kumahq/kuma/pkg/core/resources/model" - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" - . "github.com/kumahq/kuma/pkg/xds/server/callbacks" -) - -var _ = Describe("Dataplane Metadata Tracker", func() { - tracker := NewDataplaneMetadataTracker() - callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(tracker)) - - dpKey := core_model.ResourceKey{ - Mesh: "default", - Name: "example", - } - node := &envoy_core.Node{ - Id: "default.example", - Metadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "dataplane.dns.port": { - Kind: &structpb.Value_StringValue{ - StringValue: "9090", - }, - }, - }, - }, - } - req := envoy_sd.DiscoveryRequest{ - Node: node, - } - const streamId = 123 - - It("should track metadata", func() { - // when - err := callbacks.OnStreamRequest(streamId, &req) - - // then - Expect(err).ToNot(HaveOccurred()) - - // when - metadata := tracker.Metadata(dpKey) - - // then - Expect(metadata.GetDNSPort()).To(Equal(uint32(9090))) - - // when - callbacks.OnStreamClosed(streamId, node) - - // then metadata should be deleted - metadata = tracker.Metadata(dpKey) - Expect(metadata).To(BeNil()) - }) - - It("should track metadata with empty Node in consecutive DiscoveryRequests", func() { - // when - err := callbacks.OnStreamRequest(streamId, &req) - - // then - Expect(err).ToNot(HaveOccurred()) - - // when - err = callbacks.OnStreamRequest(streamId, &envoy_sd.DiscoveryRequest{}) - - // then - Expect(err).ToNot(HaveOccurred()) - - // when - metadata := tracker.Metadata(dpKey) - - // then - Expect(metadata.GetDNSPort()).To(Equal(uint32(9090))) - }) -}) diff --git a/pkg/xds/server/callbacks/dataplane_sync_callbacks.go b/pkg/xds/server/callbacks/dataplane_sync_callbacks.go new file mode 100644 index 000000000000..dfddfd8dab9b --- /dev/null +++ b/pkg/xds/server/callbacks/dataplane_sync_callbacks.go @@ -0,0 +1,230 @@ +package callbacks + +import ( + "context" + "fmt" + "slices" + stdsync "sync" + "sync/atomic" + + "github.com/go-logr/logr" + "github.com/pkg/errors" + + "github.com/kumahq/kuma/pkg/core" + core_model "github.com/kumahq/kuma/pkg/core/resources/model" + core_xds "github.com/kumahq/kuma/pkg/core/xds" + util_xds "github.com/kumahq/kuma/pkg/util/xds" + "github.com/kumahq/kuma/pkg/xds/sync" +) + +var dataplaneSyncLog = core.Log.WithName("xds").WithName("dataplane-sync") + +// NewDataplaneSyncCallbacks creates callbacks for XDS that will follow the lifecycle of a Dataplane. +// It will ensure that one and only one watchdog and lifecycle manager is created for a given Dataplane.
+func NewDataplaneSyncCallbacks( + dpWatchdogFactory sync.DataplaneWatchdogFactory, + lifecycleManagerFactory DataplaneLifecycleManagerFactory, +) util_xds.Callbacks { + return &dataplaneSyncCallbacks{ + dpWatchdogFactory: dpWatchdogFactory, + dpLifecycleManagerFactory: lifecycleManagerFactory, + proxyInfos: map[core_model.ResourceKey]*dpInfo{}, + activeStreams: map[core_xds.StreamID]*streamInfo{}, + } +} + +// DataplaneLifecycleManagerFactory is a factory for DataplaneLifecycleManager +type DataplaneLifecycleManagerFactory interface { + New(key core_model.ResourceKey) DataplaneLifecycleManager +} + +type DataplaneLifecycleManagerFunc func(key core_model.ResourceKey) DataplaneLifecycleManager + +func (f DataplaneLifecycleManagerFunc) New(key core_model.ResourceKey) DataplaneLifecycleManager { + return f(key) +} + +// DataplaneLifecycleManager is responsible for managing the lifecycle of a Dataplane. +type DataplaneLifecycleManager interface { + Register(logr.Logger, context.Context, *core_xds.DataplaneMetadata) error + Deregister(logr.Logger, context.Context) +} + +// dpInfo holds all the state for a specific dataplane +type dpInfo struct { + stdsync.RWMutex + dpKey core_model.ResourceKey + // ctx is the context of the callbacks for a specific DP. + ctx context.Context + // cancelFunc cancels the ctx of any subtasks for the dataplane + cancelFunc context.CancelFunc + // done is closed when all subtasks are completed + done chan struct{} + // meta is the metadata of the dataplane + meta atomic.Pointer[core_xds.DataplaneMetadata] + lifecycleManager DataplaneLifecycleManager + // streams is a list of streamIDs that are associated with this dataplane; they are ordered in the order they were opened + streams []core_xds.StreamID +} + +// dataplaneSyncCallbacks tracks XDS streams that are connected to the CP and fires up a watchdog. +// The watchdog should be run only once for a given DP regardless of the number of streams. +// For ADS there is only one stream per DP. +// +// We keep a map of active streams and a map of per-dataplane state (proxyInfos) so that all streams of a given dataplane share a single watchdog and lifecycle manager. +type dataplaneSyncCallbacks struct { + stdsync.RWMutex + + dpWatchdogFactory sync.DataplaneWatchdogFactory + dpLifecycleManagerFactory DataplaneLifecycleManagerFactory + + activeStreams map[core_xds.StreamID]*streamInfo + proxyInfos map[core_model.ResourceKey]*dpInfo +} + +type streamInfo struct { + ctx context.Context + proxyInfo *dpInfo +} + +func (t *dataplaneSyncCallbacks) OnStreamResponse(i int64, request util_xds.DiscoveryRequest, response util_xds.DiscoveryResponse) { + // Noop +} + +func (t *dataplaneSyncCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error { + t.Lock() + defer t.Unlock() + dataplaneSyncLog.V(1).Info("stream is open", "streamID", streamID) + if _, found := t.activeStreams[streamID]; found { + return errors.Errorf("streamID %d is already tracked, we should never reopen a stream", streamID) + } + t.activeStreams[streamID] = &streamInfo{ctx: ctx, proxyInfo: nil} + return nil +} + +func (t *dataplaneSyncCallbacks) OnStreamRequest(streamID core_xds.StreamID, request util_xds.DiscoveryRequest) error { + if request.NodeId() == "" { + // from https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-versioning: + // Only the first request on a stream is guaranteed to carry the node identifier. + // The subsequent discovery requests on the same stream may carry an empty node identifier. + // This holds true regardless of the acceptance of the discovery responses on the same stream.
+ // The node identifier should always be identical if present more than once on the stream. + // It is sufficient to only check the first message for the node identifier as a result. + return nil + } + + t.RLock() + dataplaneSyncLog.V(1).Info("stream request", "streamID", streamID) + alreadyProcessed := t.activeStreams[streamID].proxyInfo != nil + t.RUnlock() + if alreadyProcessed { // We fast return if we already know that this streamID is tracking a specific dataplane + return nil + } + + proxyId, err := core_xds.ParseProxyIdFromString(request.NodeId()) + if err != nil { + return errors.Wrap(err, "invalid node ID") + } + dpKey := proxyId.ToResourceKey() + metadata := core_xds.DataplaneMetadataFromXdsMetadata(request.Metadata()) + if metadata == nil { + return errors.New("metadata in xDS Node cannot be nil") + } + if metadata.Resource != nil { + if core_model.MetaToResourceKey(metadata.Resource.GetMeta()) != dpKey { + return fmt.Errorf("proxyId %s does not match proxy resource %s", dpKey, metadata.Resource.GetMeta()) + } + } + l := dataplaneSyncLog.WithValues("dpKey", dpKey, "streamID", streamID) + t.Lock() + if t.activeStreams[streamID].proxyInfo != nil { + t.Unlock() // release the lock before the early return + return nil // We fast return if we already know that this streamID is tracking a specific dataplane + } + pInfo := t.proxyInfos[dpKey] + if pInfo == nil { + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + pInfo = &dpInfo{ + dpKey: dpKey, + lifecycleManager: t.dpLifecycleManagerFactory.New(dpKey), + ctx: ctx, + done: done, + meta: atomic.Pointer[core_xds.DataplaneMetadata]{}, + cancelFunc: cancel, + } + t.proxyInfos[dpKey] = pInfo + } + t.activeStreams[streamID].proxyInfo = pInfo + ctx := t.activeStreams[streamID].ctx + pInfo.Lock() + defer pInfo.Unlock() + t.Unlock() + pInfo.streams = append(pInfo.streams, streamID) + pInfo.meta.Store(metadata) + if len(pInfo.streams) == 1 { + err := pInfo.lifecycleManager.Register(l, ctx, metadata) + if err != nil { + return err + } + + l.V(1).Info("starting watchdog") + go func() { + defer close(pInfo.done) + l.V(1).Info("watchdog started") + if t.dpWatchdogFactory != nil { + t.dpWatchdogFactory.New(dpKey, pInfo.meta.Load).Start(pInfo.ctx) + } + }() + } else { + l.V(1).Info("watchdog was started by a previous stream, not starting a new one", "activeStreams", pInfo.streams) + } + + return nil +} + +func (t *dataplaneSyncCallbacks) OnStreamClosed(streamID core_xds.StreamID) { + t.Lock() + l := dataplaneSyncLog.WithValues("streamID", streamID) + l.V(1).Info("stream closed") + sInfo := t.activeStreams[streamID] + if sInfo == nil { // Should never happen as it means a stream was closed without being opened + l.Error(errors.New("stream closed without being opened"), "stream closed without being opened") + t.Unlock() + return + } + if sInfo.proxyInfo == nil { // Never associated with a dataplane, no need to care + delete(t.activeStreams, streamID) + t.Unlock() + return + } + pInfo := sInfo.proxyInfo + l = dataplaneSyncLog.WithValues("dpKey", pInfo.dpKey) + pInfo.Lock() + t.Unlock() + streamIdx := slices.Index(pInfo.streams, streamID) + if streamIdx == -1 { + l.Info("streamID not found in the list of streams", "streams", pInfo.streams) + pInfo.Unlock() + return + } + if streamIdx != len(pInfo.streams)-1 { + l.V(1).Info("dpKey is associated with a newer stream", "streams", pInfo.streams) + pInfo.streams = append(pInfo.streams[:streamIdx], pInfo.streams[streamIdx+1:]...)
// Remove streamID from the list + pInfo.Unlock() + return // We are not the last stream, we don't care + } + if sInfo.proxyInfo.cancelFunc != nil { + l.V(1).Info("stopping watchdog") + sInfo.proxyInfo.cancelFunc() + } + sInfo.proxyInfo.lifecycleManager.Deregister(l, sInfo.ctx) + if sInfo.proxyInfo.cancelFunc != nil { + <-sInfo.proxyInfo.done + l.V(1).Info("watchdog terminated") + } + pInfo.Unlock() + t.Lock() + delete(t.activeStreams, streamID) + delete(t.proxyInfos, pInfo.dpKey) + t.Unlock() +} diff --git a/pkg/xds/server/callbacks/dataplane_sync_callbacks_test.go b/pkg/xds/server/callbacks/dataplane_sync_callbacks_test.go new file mode 100644 index 000000000000..64bcfbe0b9ce --- /dev/null +++ b/pkg/xds/server/callbacks/dataplane_sync_callbacks_test.go @@ -0,0 +1,310 @@ +package callbacks_test + +import ( + "context" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + core_model "github.com/kumahq/kuma/pkg/core/resources/model" + core_xds "github.com/kumahq/kuma/pkg/core/xds" + "github.com/kumahq/kuma/pkg/test" + util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" + . "github.com/kumahq/kuma/pkg/xds/server/callbacks" + xds_sync "github.com/kumahq/kuma/pkg/xds/sync" +) + +type dummyLifecycleManager struct{} + +func (c *dummyLifecycleManager) Register(logr.Logger, context.Context, *core_xds.DataplaneMetadata) error { + return nil +} + +func (c *dummyLifecycleManager) Deregister(logr.Logger, context.Context) { +} + +var _ = Describe("Sync", func() { + Describe("dataplaneSyncCallbacks", func() { + DescribeTable("when concurrent calls", MustPassRepeatedly(3), + func(run func(callbacks envoy_xds.Callbacks, req *envoy_sd.DiscoveryRequest)) { + wdCount := atomic.Int32{} + events := make(chan string, 1000) + stateFactory := xds_sync.DataplaneWatchdogFactoryFunc(func(key core_model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog { + wdNum := wdCount.Add(1) + events <- fmt.Sprintf("create:%d", wdNum) + return util_xds_v3.WatchdogFunc(func(ctx context.Context) { + r := rand.New(rand.NewSource(GinkgoRandomSeed())) // #nosec G404 - math rand is enough + events <- fmt.Sprintf("start:%d", wdNum) + time.Sleep(time.Duration(r.Intn(200)) * time.Millisecond) + <-ctx.Done() + time.Sleep(time.Duration(r.Intn(100)) * time.Millisecond) + events <- fmt.Sprintf("stop:%d", wdNum) + }) + }) + tracker := NewDataplaneSyncCallbacks(stateFactory, DataplaneLifecycleManagerFunc(func(key core_model.ResourceKey) DataplaneLifecycleManager { + return &dummyLifecycleManager{} + })) + + n := &envoy_core.Node{Id: "demo.example"} + req := &envoy_sd.DiscoveryRequest{Node: n} + callbacks := util_xds_v3.AdaptCallbacks(tracker) + run(callbacks, req) + + time.Sleep(time.Second) + // Let's check events: + close(events) + var all []string + for v := range events { + all = append(all, v) + } + GinkgoWriter.Printf("Events: %v", strings.Join(all, ",")) + for i := range all { + if i == 0 { + Expect(all[0]).To(HavePrefix("create:")) + continue + } + switch { + case strings.HasPrefix(all[i], "create:"): + Expect(all[i-1]).To(HavePrefix("stop:")) + case strings.HasPrefix(all[i], "start:"): + Expect(all[i-1]).To(HavePrefix("create:")) + case strings.HasPrefix(all[i], "stop:"): + 
Expect(all[i-1]).To(HavePrefix("start:")) + } + } + Expect(all[len(all)-1]).To(HavePrefix("stop:")) + }, + Entry("simple same thread", func(callbacks envoy_xds.Callbacks, req *envoy_sd.DiscoveryRequest) { + ctx := context.Background() + streamA := int64(1) + streamB := int64(2) + Expect(callbacks.OnStreamOpen(ctx, streamA, "")).To(Succeed()) + Expect(callbacks.OnStreamOpen(ctx, streamB, "")).To(Succeed()) + + Expect(callbacks.OnStreamRequest(streamA, req)).To(Succeed()) + Expect(callbacks.OnStreamRequest(streamB, req)).To(Succeed()) + + callbacks.OnStreamClosed(streamB, nil) + callbacks.OnStreamClosed(streamA, nil) + }), + Entry("concurrent clients", func(callbacks envoy_xds.Callbacks, req *envoy_sd.DiscoveryRequest) { + wg := sync.WaitGroup{} + wg.Add(2) + for i := range 2 { + go func() { + r := rand.New(rand.NewSource(GinkgoRandomSeed())) // #nosec G404 - math rand is enough + defer GinkgoRecover() + defer wg.Done() + ctx := context.Background() + streamID := int64(i) + time.Sleep(time.Duration(r.Intn(200)) * time.Millisecond) + Expect(callbacks.OnStreamOpen(ctx, streamID, "")).To(Succeed()) + time.Sleep(time.Duration(r.Intn(200)) * time.Millisecond) + Expect(callbacks.OnStreamRequest(streamID, req)).To(Succeed()) + time.Sleep(time.Duration(r.Intn(200)) * time.Millisecond) + callbacks.OnStreamClosed(streamID, nil) + time.Sleep(time.Duration(r.Intn(200)) * time.Millisecond) + }() + } + wg.Wait() + }), + Entry("start/stop and then restart", func(callbacks envoy_xds.Callbacks, req *envoy_sd.DiscoveryRequest) { + r := rand.New(rand.NewSource(GinkgoRandomSeed())) // #nosec G404 - math rand is enough + ctx := context.Background() + streamA := int64(1) + Expect(callbacks.OnStreamOpen(ctx, streamA, "")).To(Succeed()) + Expect(callbacks.OnStreamRequest(streamA, req)).To(Succeed()) + time.Sleep(time.Duration(r.Intn(200)) * time.Millisecond) + streamB := int64(2) + Expect(callbacks.OnStreamOpen(ctx, streamB, "")).To(Succeed()) + go func() { // We do this in another goroutine so that the new stream happens concurrently with the old one closing. 
+ defer GinkgoRecover() + callbacks.OnStreamClosed(streamA, nil) + }() + Expect(callbacks.OnStreamRequest(streamB, req)).To(Succeed()) + time.Sleep(time.Duration(r.Intn(200)) * time.Millisecond) + callbacks.OnStreamClosed(streamB, nil) + }), + ) + It("should not fail when ADS stream is closed before Watchdog is even created", func() { + // setup + tracker := NewDataplaneSyncCallbacks(nil, nil) + + // given + ctx := context.Background() + streamID := int64(1) + typ := "" + + By("simulating Envoy connecting to the Control Plane") + // when + err := tracker.OnStreamOpen(ctx, streamID, typ) + // then + Expect(err).ToNot(HaveOccurred()) + + By("simulating Envoy disconnecting from the Control Plane prior to any DiscoveryRequest") + // and + tracker.OnStreamClosed(streamID) + + // then + // expect no panic + }) + + It("should not fail when Envoy presents invalid Node ID", func() { + // setup + callbacks := util_xds_v3.AdaptCallbacks(NewDataplaneSyncCallbacks(nil, nil)) + + // given + ctx := context.Background() + streamID := int64(1) + typ := "" + req := &envoy_sd.DiscoveryRequest{Node: nil} + + By("simulating Envoy connecting to the Control Plane") + // when + err := callbacks.OnStreamOpen(ctx, streamID, typ) + // then + Expect(err).ToNot(HaveOccurred()) + + By("simulating DiscoveryRequest") + // when + err = callbacks.OnStreamRequest(streamID, req) + // then + Expect(err).ToNot(HaveOccurred()) + + By("simulating Envoy disconnecting from the Control Plane") + // and + callbacks.OnStreamClosed(streamID, nil) + + // then + // expect no panic + }) + + It("should create a Watchdog when Envoy presents a valid Node ID", test.Within(5*time.Second, func() { + watchdogCh := make(chan core_model.ResourceKey) + + // setup + tracker := NewDataplaneSyncCallbacks( + xds_sync.DataplaneWatchdogFactoryFunc(func(key core_model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog { + return util_xds_v3.WatchdogFunc(func(ctx context.Context) { + watchdogCh <- key + <-ctx.Done() + close(watchdogCh) + }) + }), + DataplaneLifecycleManagerFunc(func(key core_model.ResourceKey) DataplaneLifecycleManager { + return &dummyLifecycleManager{} + }), + ) + callbacks := util_xds_v3.AdaptCallbacks(tracker) + + // given + ctx := context.Background() + streamID := int64(1) + typ := "" + n := &envoy_core.Node{Id: "demo.example"} + req := &envoy_sd.DiscoveryRequest{Node: n} + + By("simulating Envoy connecting to the Control Plane") + // when + err := callbacks.OnStreamOpen(ctx, streamID, typ) + // then + Expect(err).ToNot(HaveOccurred()) + + By("simulating DiscoveryRequest") + // when + err = callbacks.OnStreamRequest(streamID, req) + // then + Expect(err).ToNot(HaveOccurred()) + + By("waiting for Watchdog to get started") + // when + dataplaneID := <-watchdogCh + // then + Expect(dataplaneID).To(Equal(core_model.ResourceKey{Mesh: "demo", Name: "example"})) + + By("simulating another DiscoveryRequest") + // when + err = callbacks.OnStreamRequest(streamID, req) + // then + Expect(err).ToNot(HaveOccurred()) + + By("simulating Envoy disconnecting from the Control Plane") + // and + callbacks.OnStreamClosed(streamID, n) + + By("waiting for Watchdog to get stopped") + // when + _, watchdogIsRunning := <-watchdogCh + // then + Expect(watchdogIsRunning).To(BeFalse()) + })) + + It("should start only one watchdog per dataplane", func() { + // setup + var activeWatchdogs int32 + tracker := NewDataplaneSyncCallbacks( + xds_sync.DataplaneWatchdogFactoryFunc(func(key core_model.ResourceKey, _ func() 
*core_xds.DataplaneMetadata) util_xds_v3.Watchdog { + return util_xds_v3.WatchdogFunc(func(ctx context.Context) { + atomic.AddInt32(&activeWatchdogs, 1) + <-ctx.Done() + atomic.AddInt32(&activeWatchdogs, -1) + }) + }), + DataplaneLifecycleManagerFunc(func(key core_model.ResourceKey) DataplaneLifecycleManager { + return &dummyLifecycleManager{} + }), + ) + callbacks := util_xds_v3.AdaptCallbacks(tracker) + + // when one stream for backend-01 is connected and a request is sent + streamID := int64(1) + err := callbacks.OnStreamOpen(context.Background(), streamID, "") + Expect(err).ToNot(HaveOccurred()) + n := &envoy_core.Node{Id: "default.backend-01"} + err = callbacks.OnStreamRequest(streamID, &envoy_sd.DiscoveryRequest{Node: n}) + Expect(err).ToNot(HaveOccurred()) + + // and when a new stream from backend-01 is connected and a request is sent + streamID = 2 + err = callbacks.OnStreamOpen(context.Background(), streamID, "") + Expect(err).ToNot(HaveOccurred()) + err = callbacks.OnStreamRequest(streamID, &envoy_sd.DiscoveryRequest{ + Node: &envoy_core.Node{ + Id: "default.backend-01", + }, + }) + Expect(err).ToNot(HaveOccurred()) + + // then only one watchdog is active + Eventually(func() int32 { + return atomic.LoadInt32(&activeWatchdogs) + }, "5s", "10ms").Should(Equal(int32(1))) + + // when the first stream is closed + callbacks.OnStreamClosed(1, n) + + // then the watchdog is still active because the other stream is still open + Eventually(func() int32 { + return atomic.LoadInt32(&activeWatchdogs) + }, "5s", "10ms").Should(Equal(int32(1))) + + // when the other stream is closed + callbacks.OnStreamClosed(2, n) + + // then no watchdog is active anymore + Eventually(func() int32 { + return atomic.LoadInt32(&activeWatchdogs) + }, "5s", "10ms").Should(Equal(int32(0))) + }) + }) +}) diff --git a/pkg/xds/server/callbacks/dataplane_sync_tracker.go b/pkg/xds/server/callbacks/dataplane_sync_tracker.go deleted file mode 100644 index 5a885a85f2a4..000000000000 --- a/pkg/xds/server/callbacks/dataplane_sync_tracker.go +++ /dev/null @@ -1,64 +0,0 @@ -package callbacks - -import ( - "context" - stdsync "sync" - - "github.com/kumahq/kuma/pkg/core" - core_model "github.com/kumahq/kuma/pkg/core/resources/model" - core_xds "github.com/kumahq/kuma/pkg/core/xds" - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" -) - -var dataplaneSyncTrackerLog = core.Log.WithName("xds").WithName("dataplane-sync-tracker") - -type NewDataplaneWatchdogFunc func(key core_model.ResourceKey) util_xds_v3.Watchdog - -func NewDataplaneSyncTracker(factoryFunc NewDataplaneWatchdogFunc) DataplaneCallbacks { - return &dataplaneSyncTracker{ - newDataplaneWatchdog: factoryFunc, - watchdogs: map[core_model.ResourceKey]context.CancelFunc{}, - } -} - -var _ DataplaneCallbacks = &dataplaneSyncTracker{} - -// dataplaneSyncTracker tracks XDS streams that are connected to the CP and fire up a watchdog. -// Watchdog should be run only once for given dataplane regardless of the number of streams. -// For ADS there is only one stream for DP. -// -// Node info can be (but does not have to be) carried only on the first XDS request.
That's why need streamsAssociation map -// that indicates that the stream was already associated -type dataplaneSyncTracker struct { - NoopDataplaneCallbacks - - newDataplaneWatchdog NewDataplaneWatchdogFunc - - stdsync.RWMutex // protects access to the fields below - watchdogs map[core_model.ResourceKey]context.CancelFunc -} - -func (t *dataplaneSyncTracker) OnProxyConnected(streamID core_xds.StreamID, dpKey core_model.ResourceKey, _ context.Context, _ core_xds.DataplaneMetadata) error { - // We use OnProxyConnected because there should be only one watchdog for given dataplane. - t.Lock() - defer t.Unlock() - - ctx, cancel := context.WithCancel(context.Background()) - t.watchdogs[dpKey] = func() { - dataplaneSyncTrackerLog.V(1).Info("stopping Watchdog for a Dataplane", "dpKey", dpKey, "streamID", streamID) - cancel() - } - dataplaneSyncTrackerLog.V(1).Info("starting Watchdog for a Dataplane", "dpKey", dpKey, "streamID", streamID) - //nolint:contextcheck // it's not clear how the parent go-control-plane context lives - go t.newDataplaneWatchdog(dpKey).Start(ctx) - return nil -} - -func (t *dataplaneSyncTracker) OnProxyDisconnected(_ context.Context, _ core_xds.StreamID, dpKey core_model.ResourceKey) { - t.Lock() - defer t.Unlock() - if cancelFn := t.watchdogs[dpKey]; cancelFn != nil { - cancelFn() - } - delete(t.watchdogs, dpKey) -} diff --git a/pkg/xds/server/callbacks/dataplane_sync_tracker_test.go b/pkg/xds/server/callbacks/dataplane_sync_tracker_test.go deleted file mode 100644 index fb456f232ef3..000000000000 --- a/pkg/xds/server/callbacks/dataplane_sync_tracker_test.go +++ /dev/null @@ -1,189 +0,0 @@ -package callbacks_test - -import ( - "context" - "sync/atomic" - "time" - - envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - core_model "github.com/kumahq/kuma/pkg/core/resources/model" - "github.com/kumahq/kuma/pkg/test" - util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" - . 
"github.com/kumahq/kuma/pkg/xds/server/callbacks" -) - -var _ = Describe("Sync", func() { - Describe("dataplaneSyncTracker", func() { - It("should not fail when ADS stream is closed before Watchdog is even created", func() { - // setup - tracker := DataplaneCallbacksToXdsCallbacks(NewDataplaneSyncTracker(nil)) - - // given - ctx := context.Background() - streamID := int64(1) - typ := "" - - By("simulating Envoy connecting to the Control Plane") - // when - err := tracker.OnStreamOpen(ctx, streamID, typ) - // then - Expect(err).ToNot(HaveOccurred()) - - By("simulating Envoy disconnecting from the Control Plane prior to any DiscoveryRequest") - // and - tracker.OnStreamClosed(streamID) - - // then - // expect no panic - }) - - It("should not fail when Envoy presents invalid Node ID", func() { - // setup - tracker := NewDataplaneSyncTracker(nil) - callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(tracker)) - - // given - ctx := context.Background() - streamID := int64(1) - typ := "" - req := &envoy_sd.DiscoveryRequest{Node: nil} - - By("simulating Envoy connecting to the Control Plane") - // when - err := callbacks.OnStreamOpen(ctx, streamID, typ) - // then - Expect(err).ToNot(HaveOccurred()) - - By("simulating DiscoveryRequest") - // when - err = callbacks.OnStreamRequest(streamID, req) - // then - Expect(err).ToNot(HaveOccurred()) - - By("simulating Envoy disconnecting from the Control Plane") - // and - callbacks.OnStreamClosed(streamID, nil) - - // then - // expect no panic - }) - - It("should create a Watchdog when Envoy presents a valid Node ID", test.Within(5*time.Second, func() { - watchdogCh := make(chan core_model.ResourceKey) - - // setup - tracker := NewDataplaneSyncTracker(func(key core_model.ResourceKey) util_xds_v3.Watchdog { - return WatchdogFunc(func(ctx context.Context) { - watchdogCh <- key - <-ctx.Done() - close(watchdogCh) - }) - }) - callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(tracker)) - - // given - ctx := context.Background() - streamID := int64(1) - typ := "" - n := &envoy_core.Node{Id: "demo.example"} - req := &envoy_sd.DiscoveryRequest{Node: n} - - By("simulating Envoy connecting to the Control Plane") - // when - err := callbacks.OnStreamOpen(ctx, streamID, typ) - // then - Expect(err).ToNot(HaveOccurred()) - - By("simulating DiscoveryRequest") - // when - err = callbacks.OnStreamRequest(streamID, req) - // then - Expect(err).ToNot(HaveOccurred()) - - By("waiting for Watchdog to get started") - // when - dataplaneID := <-watchdogCh - // then - Expect(dataplaneID).To(Equal(core_model.ResourceKey{Mesh: "demo", Name: "example"})) - - By("simulating another DiscoveryRequest") - // when - err = callbacks.OnStreamRequest(streamID, req) - // then - Expect(err).ToNot(HaveOccurred()) - - By("simulating Envoy disconnecting from the Control Plane") - // and - callbacks.OnStreamClosed(streamID, n) - - By("waiting for Watchdog to get stopped") - // when - _, watchdogIsRunning := <-watchdogCh - // then - Expect(watchdogIsRunning).To(BeFalse()) - })) - - It("should start only one watchdog per dataplane", func() { - // setup - var activeWatchdogs int32 - tracker := NewDataplaneSyncTracker(func(key core_model.ResourceKey) util_xds_v3.Watchdog { - return WatchdogFunc(func(ctx context.Context) { - atomic.AddInt32(&activeWatchdogs, 1) - <-ctx.Done() - atomic.AddInt32(&activeWatchdogs, -1) - }) - }) - callbacks := util_xds_v3.AdaptCallbacks(DataplaneCallbacksToXdsCallbacks(tracker)) - - // when one stream for backend-01 is connected 
and request is sent - streamID := int64(1) - err := callbacks.OnStreamOpen(context.Background(), streamID, "") - Expect(err).ToNot(HaveOccurred()) - n := &envoy_core.Node{Id: "default.backend-01"} - err = callbacks.OnStreamRequest(streamID, &envoy_sd.DiscoveryRequest{Node: n}) - Expect(err).ToNot(HaveOccurred()) - - // and when new stream from backend-01 is connected and request is sent - streamID = 2 - err = callbacks.OnStreamOpen(context.Background(), streamID, "") - Expect(err).ToNot(HaveOccurred()) - err = callbacks.OnStreamRequest(streamID, &envoy_sd.DiscoveryRequest{ - Node: &envoy_core.Node{ - Id: "default.backend-01", - }, - }) - Expect(err).ToNot(HaveOccurred()) - - // then only one watchdog is active - Eventually(func() int32 { - return atomic.LoadInt32(&activeWatchdogs) - }, "5s", "10ms").Should(Equal(int32(1))) - - // when first stream is closed - callbacks.OnStreamClosed(1, n) - - // then watchdog is still active because other stream is opened - Eventually(func() int32 { - return atomic.LoadInt32(&activeWatchdogs) - }, "5s", "10ms").Should(Equal(int32(1))) - - // when other stream is closed - callbacks.OnStreamClosed(2, n) - - // then no watchdog is stopped - Eventually(func() int32 { - return atomic.LoadInt32(&activeWatchdogs) - }, "5s", "10ms").Should(Equal(int32(0))) - }) - }) -}) - -type WatchdogFunc func(ctx context.Context) - -func (f WatchdogFunc) Start(ctx context.Context) { - f(ctx) -} diff --git a/pkg/xds/server/v3/components.go b/pkg/xds/server/v3/components.go index d3c674f5e7b9..752ffcfe99c7 100644 --- a/pkg/xds/server/v3/components.go +++ b/pkg/xds/server/v3/components.go @@ -39,24 +39,18 @@ func RegisterXDS( authenticator := rt.XDS().PerProxyTypeAuthenticator() authCallbacks := auth.NewCallbacks(rt.ReadOnlyResourceManager(), authenticator, auth.DPNotFoundRetry{}) // no need to retry on DP Not Found because we are creating DP in DataplaneLifecycle callback - metadataTracker := xds_callbacks.NewDataplaneMetadataTracker() reconciler := DefaultReconciler(rt, xdsContext, statsCallbacks) ingressReconciler := DefaultIngressReconciler(rt, xdsContext, statsCallbacks) egressReconciler := DefaultEgressReconciler(rt, xdsContext, statsCallbacks) - watchdogFactory, err := xds_sync.DefaultDataplaneWatchdogFactory(rt, metadataTracker, reconciler, ingressReconciler, egressReconciler, xdsMetrics, envoyCpCtx, envoy_common.APIV3) - if err != nil { - return err - } callbacks := util_xds_v3.CallbacksChain{ util_xds_v3.NewControlPlaneIdCallbacks(rt.GetInstanceId()), util_xds_v3.AdaptCallbacks(statsCallbacks), util_xds_v3.AdaptCallbacks(authCallbacks), - util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(metadataTracker)), - util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks( - xds_callbacks.NewDataplaneLifecycle(rt.AppContext(), rt.ResourceManager(), authenticator, rt.Config().XdsServer.DataplaneDeregistrationDelay.Duration, rt.GetInstanceId(), rt.Config().Store.Cache.ExpirationTime.Duration)), - ), - util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(watchdogFactory.New))), + util_xds_v3.AdaptCallbacks(xds_callbacks.NewDataplaneSyncCallbacks( + xds_sync.DefaultDataplaneWatchdogFactory(rt, reconciler, ingressReconciler, egressReconciler, xdsMetrics, envoyCpCtx, envoy_common.APIV3), + xds_callbacks.DataplaneLifecycleFactory(rt.AppContext(), rt.ResourceManager(), authenticator, rt.Config().XdsServer.DataplaneDeregistrationDelay.Duration, rt.GetInstanceId(), 
rt.Config().Store.Cache.ExpirationTime.Duration), + )), util_xds_v3.AdaptCallbacks(DefaultDataplaneStatusTracker(rt, envoyCpCtx.Secrets)), util_xds_v3.AdaptCallbacks(xds_callbacks.NewNackBackoff(rt.Config().XdsServer.NACKBackoff.Duration)), } diff --git a/pkg/xds/sync/components.go b/pkg/xds/sync/components.go index b503c904da99..211d3e7a1b1c 100644 --- a/pkg/xds/sync/components.go +++ b/pkg/xds/sync/components.go @@ -41,16 +41,16 @@ func DefaultEgressProxyBuilder(rt core_runtime.Runtime, apiVersion core_xds.APIV } } +// DataplaneWatchdogFactory returns a Watchdog that creates a new XdsContext and Proxy and executes SnapshotReconciler if there is any change func DefaultDataplaneWatchdogFactory( rt core_runtime.Runtime, - metadataTracker DataplaneMetadataTracker, dataplaneReconciler SnapshotReconciler, ingressReconciler SnapshotReconciler, egressReconciler SnapshotReconciler, xdsMetrics *xds_metrics.Metrics, envoyCpCtx *xds_context.ControlPlaneContext, apiVersion core_xds.APIVersion, -) (DataplaneWatchdogFactory, error) { +) DataplaneWatchdogFactory { config := rt.Config() dataplaneProxyBuilder := DefaultDataplaneProxyBuilder( @@ -74,12 +74,11 @@ func DefaultDataplaneWatchdogFactory( EgressReconciler: egressReconciler, EnvoyCpCtx: envoyCpCtx, MeshCache: rt.MeshCache(), - MetadataTracker: metadataTracker, ResManager: rt.ReadOnlyResourceManager(), } return NewDataplaneWatchdogFactory( - xdsMetrics, - config.XdsServer.DataplaneConfigurationRefreshInterval.Duration, deps, + config.XdsServer.DataplaneConfigurationRefreshInterval.Duration, + xdsMetrics, ) } diff --git a/pkg/xds/sync/dataplane_watchdog.go b/pkg/xds/sync/dataplane_watchdog.go index 4fdcb22467e0..160619ab343e 100644 --- a/pkg/xds/sync/dataplane_watchdog.go +++ b/pkg/xds/sync/dataplane_watchdog.go @@ -8,7 +8,6 @@ import ( "github.com/pkg/errors" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" - "github.com/kumahq/kuma/pkg/core" core_manager "github.com/kumahq/kuma/pkg/core/resources/manager" core_model "github.com/kumahq/kuma/pkg/core/resources/model" core_xds "github.com/kumahq/kuma/pkg/core/xds" @@ -27,7 +26,6 @@ type DataplaneWatchdogDependencies struct { EgressReconciler SnapshotReconciler EnvoyCpCtx *xds_context.ControlPlaneContext MeshCache *mesh.Cache - MetadataTracker DataplaneMetadataTracker ResManager core_manager.ReadOnlyResourceManager } @@ -57,21 +55,16 @@ type DataplaneWatchdog struct { dpAddress string } -func NewDataplaneWatchdog(deps DataplaneWatchdogDependencies, dpKey core_model.ResourceKey) *DataplaneWatchdog { +func NewDataplaneWatchdog(l logr.Logger, deps DataplaneWatchdogDependencies, dpKey core_model.ResourceKey) *DataplaneWatchdog { return &DataplaneWatchdog{ DataplaneWatchdogDependencies: deps, key: dpKey, - log: core.Log.WithName("xds").WithValues("key", dpKey), + log: l.WithName("xds"), proxyTypeSettled: false, } } -func (d *DataplaneWatchdog) Sync(ctx context.Context) (SyncResult, error) { - metadata := d.MetadataTracker.Metadata(d.key) - if metadata == nil { - return SyncResult{}, errors.New("metadata cannot be nil") - } - +func (d *DataplaneWatchdog) Sync(ctx context.Context, metadata *core_xds.DataplaneMetadata) (SyncResult, error) { if d.dpType == "" { d.dpType = metadata.GetProxyType() } diff --git a/pkg/xds/sync/dataplane_watchdog_factory.go b/pkg/xds/sync/dataplane_watchdog_factory.go index 5f5706cae677..c5d6300261f6 100644 --- a/pkg/xds/sync/dataplane_watchdog_factory.go +++ b/pkg/xds/sync/dataplane_watchdog_factory.go @@ -2,62 +2,51 @@ package sync import ( "context" + "errors" "time" 
"github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/model" "github.com/kumahq/kuma/pkg/core/user" + core_xds "github.com/kumahq/kuma/pkg/core/xds" util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog" util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3" xds_metrics "github.com/kumahq/kuma/pkg/xds/metrics" ) -type dataplaneWatchdogFactory struct { - xdsMetrics *xds_metrics.Metrics - refreshInterval time.Duration - - deps DataplaneWatchdogDependencies -} - -func NewDataplaneWatchdogFactory( - xdsSyncMetrics *xds_metrics.Metrics, - refreshInterval time.Duration, - deps DataplaneWatchdogDependencies, -) (DataplaneWatchdogFactory, error) { - return &dataplaneWatchdogFactory{ - deps: deps, - refreshInterval: refreshInterval, - xdsMetrics: xdsSyncMetrics, - }, nil -} - -func (d *dataplaneWatchdogFactory) New(dpKey model.ResourceKey) util_xds_v3.Watchdog { - log := xdsServerLog.WithName("dataplane-sync-watchdog").WithValues("dataplaneKey", dpKey) - dataplaneWatchdog := NewDataplaneWatchdog(d.deps, dpKey) - return &util_watchdog.SimpleWatchdog{ - NewTicker: func() *time.Ticker { - return time.NewTicker(d.refreshInterval) - }, - OnTick: func(ctx context.Context) error { - ctx = user.Ctx(ctx, user.ControlPlane) - start := core.Now() - result, err := dataplaneWatchdog.Sync(ctx) - if err != nil { - return err - } - d.xdsMetrics.XdsGenerations. - WithLabelValues(string(result.ProxyType), string(result.Status)). - Observe(float64(core.Now().Sub(start).Milliseconds())) - return nil - }, - OnError: func(err error) { - d.xdsMetrics.XdsGenerationsErrors.Inc() - log.Error(err, "OnTick() failed") - }, - OnStop: func() { - if err := dataplaneWatchdog.Cleanup(); err != nil { - log.Error(err, "OnTick() failed") - } - }, - } +func NewDataplaneWatchdogFactory(deps DataplaneWatchdogDependencies, refreshInterval time.Duration, xdsMetrics *xds_metrics.Metrics) DataplaneWatchdogFactory { + return DataplaneWatchdogFactoryFunc(func(dpKey model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog { + l := xdsServerLog.WithName("dataplane-sync-watchdog").WithValues("key", dpKey) + dataplaneWatchdog := NewDataplaneWatchdog(l, deps, dpKey) + return &util_watchdog.SimpleWatchdog{ + NewTicker: func() *time.Ticker { + return time.NewTicker(refreshInterval) + }, + OnTick: func(ctx context.Context) error { + ctx = user.Ctx(ctx, user.ControlPlane) + start := core.Now() + meta := fetchMeta() + if meta == nil { + return errors.New("metadata cannot be nil") + } + result, err := dataplaneWatchdog.Sync(ctx, meta) + if err != nil { + return err + } + xdsMetrics.XdsGenerations. + WithLabelValues(string(result.ProxyType), string(result.Status)). + Observe(float64(core.Now().Sub(start).Milliseconds())) + return nil + }, + OnError: func(err error) { + xdsMetrics.XdsGenerationsErrors.Inc() + l.Error(err, "OnTick() failed") + }, + OnStop: func() { + if err := dataplaneWatchdog.Cleanup(); err != nil { + l.Error(err, "OnTick() failed") + } + }, + } + }) } diff --git a/pkg/xds/sync/dataplane_watchdog_test.go b/pkg/xds/sync/dataplane_watchdog_test.go index f3ae9ef059ae..ed5027d8448a 100644 --- a/pkg/xds/sync/dataplane_watchdog_test.go +++ b/pkg/xds/sync/dataplane_watchdog_test.go @@ -5,6 +5,7 @@ import ( "net" "time" + "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "k8s.io/client-go/util/cert" @@ -28,16 +29,6 @@ import ( "github.com/kumahq/kuma/pkg/xds/sync" ) -type staticMetadataTracker struct { - metadata *core_xds.DataplaneMetadata -} - -var _ sync.DataplaneMetadataTracker = &staticMetadataTracker{} - -func (s *staticMetadataTracker) Metadata(dpKey core_model.ResourceKey) *core_xds.DataplaneMetadata { - return s.metadata -} - type staticSnapshotReconciler struct { proxy *core_xds.Proxy } @@ -59,12 +50,10 @@ var _ = Describe("Dataplane Watchdog", func() { var resManager manager.ResourceManager var snapshotReconciler *staticSnapshotReconciler - var metadataTracker *staticMetadataTracker var deps sync.DataplaneWatchdogDependencies BeforeEach(func() { snapshotReconciler = &staticSnapshotReconciler{} - metadataTracker = &staticMetadataTracker{} store := memory.NewStore() resManager = manager.NewResourceManager(store) @@ -96,9 +85,8 @@ var _ = Describe("Dataplane Watchdog", func() { Secrets: secrets, Zone: zone, }, - MeshCache: cache, - MetadataTracker: metadataTracker, - ResManager: resManager, + MeshCache: cache, + ResManager: resManager, } pair, err := envoy_admin_tls.GenerateCA() @@ -110,22 +98,23 @@ var _ = Describe("Dataplane Watchdog", func() { var resKey core_model.ResourceKey var watchdog *sync.DataplaneWatchdog var ctx context.Context + var metadata *core_xds.DataplaneMetadata BeforeEach(func() { Expect(samples.MeshDefaultBuilder().Create(resManager)).To(Succeed()) Expect(samples.DataplaneBackendBuilder().Create(resManager)).To(Succeed()) resKey = core_model.MetaToResourceKey(samples.DataplaneBackend().GetMeta()) - metadataTracker.metadata = &core_xds.DataplaneMetadata{ + metadata = &core_xds.DataplaneMetadata{ ProxyType: mesh_proto.DataplaneProxyType, } - watchdog = sync.NewDataplaneWatchdog(deps, resKey) + watchdog = sync.NewDataplaneWatchdog(logr.Discard(), deps, resKey) ctx = context.Background() }) It("should reissue admin tls certificate when address has changed", func() { // when - _, err := watchdog.Sync(ctx) + _, err := watchdog.Sync(ctx, metadata) // then Expect(err).ToNot(HaveOccurred()) @@ -145,7 +134,7 @@ var _ = Describe("Dataplane Watchdog", func() { // and time.Sleep(cacheExpirationTime) - _, err = watchdog.Sync(ctx) + _, err = watchdog.Sync(ctx, metadata) // then cert is reissued with a new address Expect(err).ToNot(HaveOccurred()) @@ -158,7 +147,7 @@ var _ = Describe("Dataplane Watchdog", func() { It("should not reconcile if mesh hash is the same", func() { // when - _, err := watchdog.Sync(ctx) + _, err := watchdog.Sync(ctx, metadata) // then Expect(err).ToNot(HaveOccurred()) @@ -167,7 +156,7 @@ var _ = Describe("Dataplane Watchdog", func() { // when snapshotReconciler.proxy = nil // set to nil so we can check if it was not called again time.Sleep(cacheExpirationTime) - _, err = watchdog.Sync(ctx) + _, err = watchdog.Sync(ctx, metadata) // then Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/xds/sync/interfaces.go b/pkg/xds/sync/interfaces.go index 28878ee608e5..ed02226a1346 100644 --- a/pkg/xds/sync/interfaces.go +++ b/pkg/xds/sync/interfaces.go @@ -25,5 +25,11 @@ type SnapshotReconciler interface { // DataplaneWatchdogFactory returns a Watchdog that creates a new XdsContext and Proxy and executes SnapshotReconciler if there is any change type DataplaneWatchdogFactory interface { - New(dpKey core_model.ResourceKey) util_xds_v3.Watchdog + New(dpKey core_model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog +} + +type DataplaneWatchdogFactoryFunc func(dpKey 
core_model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog + +func (f DataplaneWatchdogFactoryFunc) New(dpKey core_model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog { + return f(dpKey, fetchMeta) }
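
Illustration (not part of the diff): with this change a watchdog factory receives a fetchMeta closure instead of consulting a shared DataplaneMetadataTracker, and the returned Watchdog runs once per proxy, with its context cancelled when the proxy's last stream closes. Below is a minimal sketch of a custom factory built from the types used above; the package name "example", the function name NewNoopWatchdogFactory, and the watchdog body are assumptions made for the sketch, not code from this change.

package example

import (
	"context"

	core_model "github.com/kumahq/kuma/pkg/core/resources/model"
	core_xds "github.com/kumahq/kuma/pkg/core/xds"
	util_xds_v3 "github.com/kumahq/kuma/pkg/util/xds/v3"
	xds_sync "github.com/kumahq/kuma/pkg/xds/sync"
)

// NewNoopWatchdogFactory is a hypothetical factory: it builds watchdogs that
// read the proxy metadata lazily through fetchMeta and then simply wait for
// their context to be cancelled, which happens once the proxy disconnects.
func NewNoopWatchdogFactory() xds_sync.DataplaneWatchdogFactory {
	return xds_sync.DataplaneWatchdogFactoryFunc(func(dpKey core_model.ResourceKey, fetchMeta func() *core_xds.DataplaneMetadata) util_xds_v3.Watchdog {
		return util_xds_v3.WatchdogFunc(func(ctx context.Context) {
			// fetchMeta may return nil until a DiscoveryRequest carrying the
			// node identifier has been observed on one of the proxy's streams.
			if meta := fetchMeta(); meta != nil {
				_ = meta.GetProxyType() // e.g. branch on proxy type here
			}
			<-ctx.Done()
		})
	})
}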