diff --git a/go.mod b/go.mod index ea9080c6271..d5d22971069 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( k8s.io/client-go v0.30.0 k8s.io/kubectl v0.30.0 k8s.io/utils v0.0.0-20230726121419-3b25d923346b - sigs.k8s.io/controller-runtime v0.17.3 + sigs.k8s.io/controller-runtime v0.18.0 sigs.k8s.io/gateway-api v1.0.0 sigs.k8s.io/mcs-api v0.1.0 sigs.k8s.io/yaml v1.4.0 diff --git a/go.sum b/go.sum index b3ca35fd35c..f74d33f537e 100644 --- a/go.sum +++ b/go.sum @@ -992,6 +992,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.7/go.mod h1:PHgbrJT sigs.k8s.io/controller-runtime v0.6.1/go.mod h1:XRYBPdbf5XJu9kpS84VJiZ7h/u1hF3gEORz0efEja7A= sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk= sigs.k8s.io/controller-runtime v0.17.3/go.mod h1:N0jpP5Lo7lMTF9aL56Z/B2oWBJjey6StQM0jRbKQXtY= +sigs.k8s.io/controller-runtime v0.18.0 h1:Z7jKuX784TQSUL1TIyeuF7j8KXZ4RtSX0YgtjKcSTME= +sigs.k8s.io/controller-runtime v0.18.0/go.mod h1:tuAt1+wbVsXIT8lPtk5RURxqAnq7xkpv2Mhttslg7Hw= sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI= sigs.k8s.io/gateway-api v1.0.0 h1:iPTStSv41+d9p0xFydll6d7f7MOBGuqXM6p2/zVYMAs= sigs.k8s.io/gateway-api v1.0.0/go.mod h1:4cUgr0Lnp5FZ0Cdq8FdRwCvpiWws7LVhLHGIudLlf4c= diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index 571a45c5764..43e2e382f20 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -909,51 +909,57 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M // Upon leader election, we retrigger the reconciliation process to allow the elected leader to // process status updates and infrastructure changes. This step is crucial for synchronizing resources // that may have been altered or introduced while there was no elected leader. - if err := c.Watch( - NewWatchAndReconcileSource(mgr.Elected(), &gwapiv1.GatewayClass{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass)); err != nil { + if err := c.Watch(NewWatchAndReconcileSource(mgr.Elected(), &gwapiv1.GatewayClass{}, handler.EnqueueRequestsFromMapFunc(r.enqueueClass))); err != nil { return err } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1.GatewayClass{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.GenerationChangedPredicate{}, - predicate.NewPredicateFuncs(r.hasMatchingController), - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1.GatewayClass{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, gc *gwapiv1.GatewayClass) []reconcile.Request { + return r.enqueueClass(ctx, gc) + }), + &predicate.TypedGenerationChangedPredicate[*gwapiv1.GatewayClass]{}, + predicate.NewTypedPredicateFuncs[*gwapiv1.GatewayClass](r.hasMatchingController))); err != nil { return err } // Only enqueue EnvoyProxy objects that match this Envoy Gateway's GatewayClass. - epPredicates := []predicate.Predicate{ - predicate.GenerationChangedPredicate{}, - predicate.ResourceVersionChangedPredicate{}, - predicate.NewPredicateFuncs(r.hasManagedClass), + epPredicates := []predicate.TypedPredicate[*egv1a1.EnvoyProxy]{ + &predicate.TypedGenerationChangedPredicate[*egv1a1.EnvoyProxy]{}, + predicate.NewTypedPredicateFuncs[*egv1a1.EnvoyProxy](r.hasManagedClass), } if r.namespaceLabel != nil { - epPredicates = append(epPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + epPredicates = append(epPredicates, predicate.NewTypedPredicateFuncs(func(ep *egv1a1.EnvoyProxy) bool { + return r.hasMatchingNamespaceLabels(ep) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.EnvoyProxy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - epPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &egv1a1.EnvoyProxy{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, t *egv1a1.EnvoyProxy) []reconcile.Request { + return r.enqueueClass(ctx, t) + }), + epPredicates...)); err != nil { return err } // Watch Gateway CRUDs and reconcile affected GatewayClass. - gPredicates := []predicate.Predicate{ - predicate.GenerationChangedPredicate{}, - predicate.NewPredicateFuncs(r.validateGatewayForReconcile), + gPredicates := []predicate.TypedPredicate[*gwapiv1.Gateway]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1.Gateway]{}, + predicate.NewTypedPredicateFuncs(func(gtw *gwapiv1.Gateway) bool { + return r.validateGatewayForReconcile(gtw) + }), } if r.namespaceLabel != nil { - gPredicates = append(gPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + gPredicates = append(gPredicates, predicate.NewTypedPredicateFuncs(func(gtw *gwapiv1.Gateway) bool { + return r.hasMatchingNamespaceLabels(gtw) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1.Gateway{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - gPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1.Gateway{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, gtw *gwapiv1.Gateway) []reconcile.Request { + return r.enqueueClass(ctx, gtw) + }), + gPredicates...)); err != nil { return err } if err := addGatewayIndexers(ctx, mgr); err != nil { @@ -961,15 +967,20 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch HTTPRoute CRUDs and process affected Gateways. - httprPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + httprPredicates := []predicate.TypedPredicate[*gwapiv1.HTTPRoute]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1.HTTPRoute]{}, + } if r.namespaceLabel != nil { - httprPredicates = append(httprPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + httprPredicates = append(httprPredicates, predicate.NewTypedPredicateFuncs(func(hr *gwapiv1.HTTPRoute) bool { + return r.hasMatchingNamespaceLabels(hr) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1.HTTPRoute{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - httprPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1.HTTPRoute{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, t *gwapiv1.HTTPRoute) []reconcile.Request { + return r.enqueueClass(ctx, t) + }), + httprPredicates...)); err != nil { return err } if err := addHTTPRouteIndexers(ctx, mgr); err != nil { @@ -977,15 +988,20 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch GRPCRoute CRUDs and process affected Gateways. - grpcrPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + grpcrPredicates := []predicate.TypedPredicate[*gwapiv1a2.GRPCRoute]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1a2.GRPCRoute]{}, + } if r.namespaceLabel != nil { - grpcrPredicates = append(grpcrPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + grpcrPredicates = append(grpcrPredicates, predicate.NewTypedPredicateFuncs[*gwapiv1a2.GRPCRoute](func(grpc *gwapiv1a2.GRPCRoute) bool { + return r.hasMatchingNamespaceLabels(grpc) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1a2.GRPCRoute{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - grpcrPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1a2.GRPCRoute{}, + handler.TypedEnqueueRequestsFromMapFunc[*gwapiv1a2.GRPCRoute](func(ctx context.Context, route *gwapiv1a2.GRPCRoute) []reconcile.Request { + return r.enqueueClass(ctx, route) + }), + grpcrPredicates...)); err != nil { return err } if err := addGRPCRouteIndexers(ctx, mgr); err != nil { @@ -993,15 +1009,20 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch TLSRoute CRUDs and process affected Gateways. - tlsrPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + tlsrPredicates := []predicate.TypedPredicate[*gwapiv1a2.TLSRoute]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1a2.TLSRoute]{}, + } if r.namespaceLabel != nil { - tlsrPredicates = append(tlsrPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + tlsrPredicates = append(tlsrPredicates, predicate.NewTypedPredicateFuncs[*gwapiv1a2.TLSRoute](func(route *gwapiv1a2.TLSRoute) bool { + return r.hasMatchingNamespaceLabels(route) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1a2.TLSRoute{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - tlsrPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1a2.TLSRoute{}, + handler.TypedEnqueueRequestsFromMapFunc[*gwapiv1a2.TLSRoute](func(ctx context.Context, route *gwapiv1a2.TLSRoute) []reconcile.Request { + return r.enqueueClass(ctx, route) + }), + tlsrPredicates...)); err != nil { return err } if err := addTLSRouteIndexers(ctx, mgr); err != nil { @@ -1009,15 +1030,20 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch UDPRoute CRUDs and process affected Gateways. - udprPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + udprPredicates := []predicate.TypedPredicate[*gwapiv1a2.UDPRoute]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1a2.UDPRoute]{}, + } if r.namespaceLabel != nil { - udprPredicates = append(udprPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + udprPredicates = append(udprPredicates, predicate.NewTypedPredicateFuncs[*gwapiv1a2.UDPRoute](func(route *gwapiv1a2.UDPRoute) bool { + return r.hasMatchingNamespaceLabels(route) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1a2.UDPRoute{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - udprPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1a2.UDPRoute{}, + handler.TypedEnqueueRequestsFromMapFunc[*gwapiv1a2.UDPRoute](func(ctx context.Context, route *gwapiv1a2.UDPRoute) []reconcile.Request { + return r.enqueueClass(ctx, route) + }), + udprPredicates...)); err != nil { return err } if err := addUDPRouteIndexers(ctx, mgr); err != nil { @@ -1025,15 +1051,20 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch TCPRoute CRUDs and process affected Gateways. - tcprPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + tcprPredicates := []predicate.TypedPredicate[*gwapiv1a2.TCPRoute]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1a2.TCPRoute]{}, + } if r.namespaceLabel != nil { - tcprPredicates = append(tcprPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + tcprPredicates = append(tcprPredicates, predicate.NewTypedPredicateFuncs[*gwapiv1a2.TCPRoute](func(route *gwapiv1a2.TCPRoute) bool { + return r.hasMatchingNamespaceLabels(route) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1a2.TCPRoute{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - tcprPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1a2.TCPRoute{}, + handler.TypedEnqueueRequestsFromMapFunc[*gwapiv1a2.TCPRoute](func(ctx context.Context, route *gwapiv1a2.TCPRoute) []reconcile.Request { + return r.enqueueClass(ctx, route) + }), + tcprPredicates...)); err != nil { return err } if err := addTCPRouteIndexers(ctx, mgr); err != nil { @@ -1041,15 +1072,22 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch Service CRUDs and process affected *Route objects. - servicePredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateServiceForReconcile)} + servicePredicates := []predicate.TypedPredicate[*corev1.Service]{ + predicate.NewTypedPredicateFuncs[*corev1.Service](func(svc *corev1.Service) bool { + return r.validateServiceForReconcile(svc) + }), + } if r.namespaceLabel != nil { - servicePredicates = append(servicePredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + servicePredicates = append(servicePredicates, predicate.NewTypedPredicateFuncs[*corev1.Service](func(svc *corev1.Service) bool { + return r.hasMatchingNamespaceLabels(svc) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &corev1.Service{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - servicePredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &corev1.Service{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, svc *corev1.Service) []reconcile.Request { + return r.enqueueClass(ctx, svc) + }), + servicePredicates...)); err != nil { return err } @@ -1061,91 +1099,120 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M // Watch ServiceImport CRUDs and process affected *Route objects. if serviceImportCRDExists { if err := c.Watch( - source.Kind(mgr.GetCache(), &mcsapi.ServiceImport{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - predicate.GenerationChangedPredicate{}, - predicate.NewPredicateFuncs(r.validateServiceImportForReconcile)); err != nil { + source.Kind(mgr.GetCache(), &mcsapi.ServiceImport{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, si *mcsapi.ServiceImport) []reconcile.Request { + return r.enqueueClass(ctx, si) + }), + predicate.TypedGenerationChangedPredicate[*mcsapi.ServiceImport]{}, + predicate.NewTypedPredicateFuncs[*mcsapi.ServiceImport](func(si *mcsapi.ServiceImport) bool { + return r.validateServiceImportForReconcile(si) + }))); err != nil { // ServiceImport is not available in the cluster, skip the watch and not throw error. r.log.Info("unable to watch ServiceImport: %s", err.Error()) } } // Watch EndpointSlice CRUDs and process affected *Route objects. - esPredicates := []predicate.Predicate{ - predicate.GenerationChangedPredicate{}, - predicate.NewPredicateFuncs(r.validateEndpointSliceForReconcile), + esPredicates := []predicate.TypedPredicate[*discoveryv1.EndpointSlice]{ + predicate.TypedGenerationChangedPredicate[*discoveryv1.EndpointSlice]{}, + predicate.NewTypedPredicateFuncs[*discoveryv1.EndpointSlice](func(eps *discoveryv1.EndpointSlice) bool { + return r.validateEndpointSliceForReconcile(eps) + }), } if r.namespaceLabel != nil { - esPredicates = append(esPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + esPredicates = append(esPredicates, predicate.NewTypedPredicateFuncs[*discoveryv1.EndpointSlice](func(eps *discoveryv1.EndpointSlice) bool { + return r.hasMatchingNamespaceLabels(eps) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &discoveryv1.EndpointSlice{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - esPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &discoveryv1.EndpointSlice{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, si *discoveryv1.EndpointSlice) []reconcile.Request { + return r.enqueueClass(ctx, si) + }), + esPredicates...)); err != nil { return err } // Watch Node CRUDs to update Gateway Address exposed by Service of type NodePort. // Node creation/deletion and ExternalIP updates would require update in the Gateway - nPredicates := []predicate.Predicate{ - predicate.GenerationChangedPredicate{}, - predicate.NewPredicateFuncs(r.handleNode), + nPredicates := []predicate.TypedPredicate[*corev1.Node]{ + predicate.TypedGenerationChangedPredicate[*corev1.Node]{}, + predicate.NewTypedPredicateFuncs[*corev1.Node](func(node *corev1.Node) bool { + return r.handleNode(node) + }), } if r.namespaceLabel != nil { - nPredicates = append(nPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + nPredicates = append(nPredicates, predicate.NewTypedPredicateFuncs[*corev1.Node](func(node *corev1.Node) bool { + return r.hasMatchingNamespaceLabels(node) + })) } // resource address. if err := c.Watch( - source.Kind(mgr.GetCache(), &corev1.Node{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - nPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &corev1.Node{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, si *corev1.Node) []reconcile.Request { + return r.enqueueClass(ctx, si) + }), + nPredicates...)); err != nil { return err } // Watch Secret CRUDs and process affected EG CRs (Gateway, SecurityPolicy, more in the future). - secretPredicates := []predicate.Predicate{ - predicate.GenerationChangedPredicate{}, - predicate.NewPredicateFuncs(r.validateSecretForReconcile), + secretPredicates := []predicate.TypedPredicate[*corev1.Secret]{ + predicate.TypedGenerationChangedPredicate[*corev1.Secret]{}, + predicate.NewTypedPredicateFuncs(func(s *corev1.Secret) bool { + return r.validateSecretForReconcile(s) + }), } if r.namespaceLabel != nil { - secretPredicates = append(secretPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + secretPredicates = append(secretPredicates, predicate.NewTypedPredicateFuncs(func(s *corev1.Secret) bool { + return r.hasMatchingNamespaceLabels(s) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &corev1.Secret{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - secretPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &corev1.Secret{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []reconcile.Request { + return r.enqueueClass(ctx, s) + }), + secretPredicates...)); err != nil { return err } // Watch ConfigMap CRUDs and process affected ClienTraffiPolicies and BackendTLSPolicies. - configMapPredicates := []predicate.Predicate{ - predicate.GenerationChangedPredicate{}, - predicate.NewPredicateFuncs(r.validateConfigMapForReconcile), + configMapPredicates := []predicate.TypedPredicate[*corev1.ConfigMap]{ + predicate.TypedGenerationChangedPredicate[*corev1.ConfigMap]{}, + predicate.NewTypedPredicateFuncs[*corev1.ConfigMap](func(cm *corev1.ConfigMap) bool { + return r.validateConfigMapForReconcile(cm) + }), } if r.namespaceLabel != nil { - configMapPredicates = append(configMapPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + configMapPredicates = append(configMapPredicates, predicate.NewTypedPredicateFuncs[*corev1.ConfigMap](func(cm *corev1.ConfigMap) bool { + return r.hasMatchingNamespaceLabels(cm) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - configMapPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &corev1.ConfigMap{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, cm *corev1.ConfigMap) []reconcile.Request { + return r.enqueueClass(ctx, cm) + }), + configMapPredicates...)); err != nil { return err } // Watch ReferenceGrant CRUDs and process affected Gateways. - rgPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + rgPredicates := []predicate.TypedPredicate[*gwapiv1b1.ReferenceGrant]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1b1.ReferenceGrant]{}, + } if r.namespaceLabel != nil { - rgPredicates = append(rgPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + rgPredicates = append(rgPredicates, predicate.NewTypedPredicateFuncs[*gwapiv1b1.ReferenceGrant](func(rg *gwapiv1b1.ReferenceGrant) bool { + return r.hasMatchingNamespaceLabels(rg) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1b1.ReferenceGrant{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - rgPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1b1.ReferenceGrant{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, rg *gwapiv1b1.ReferenceGrant) []reconcile.Request { + return r.enqueueClass(ctx, rg) + }), + rgPredicates...)); err != nil { return err } if err := addReferenceGrantIndexers(ctx, mgr); err != nil { @@ -1153,45 +1220,62 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch Deployment CRUDs and process affected Gateways. - dPredicates := []predicate.Predicate{predicate.NewPredicateFuncs(r.validateDeploymentForReconcile)} + dPredicates := []predicate.TypedPredicate[*appsv1.Deployment]{ + predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool { + return r.validateDeploymentForReconcile(deploy) + }), + } if r.namespaceLabel != nil { - dPredicates = append(dPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + dPredicates = append(dPredicates, predicate.NewTypedPredicateFuncs[*appsv1.Deployment](func(deploy *appsv1.Deployment) bool { + return r.hasMatchingNamespaceLabels(deploy) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &appsv1.Deployment{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - dPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &appsv1.Deployment{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, deploy *appsv1.Deployment) []reconcile.Request { + return r.enqueueClass(ctx, deploy) + }), + dPredicates...)); err != nil { return err } - // Watch EnvoyPatchPolicy if enabled in config - eppPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} - if r.namespaceLabel != nil { - eppPredicates = append(eppPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) - } if r.envoyGateway.ExtensionAPIs != nil && r.envoyGateway.ExtensionAPIs.EnableEnvoyPatchPolicy { + // Watch EnvoyPatchPolicy if enabled in config + eppPredicates := []predicate.TypedPredicate[*egv1a1.EnvoyPatchPolicy]{ + predicate.TypedGenerationChangedPredicate[*egv1a1.EnvoyPatchPolicy]{}, + } + if r.namespaceLabel != nil { + eppPredicates = append(eppPredicates, predicate.NewTypedPredicateFuncs[*egv1a1.EnvoyPatchPolicy](func(epp *egv1a1.EnvoyPatchPolicy) bool { + return r.hasMatchingNamespaceLabels(epp) + })) + } // Watch EnvoyPatchPolicy CRUDs if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.EnvoyPatchPolicy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - eppPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &egv1a1.EnvoyPatchPolicy{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, epp *egv1a1.EnvoyPatchPolicy) []reconcile.Request { + return r.enqueueClass(ctx, epp) + }), + eppPredicates...)); err != nil { return err } } // Watch ClientTrafficPolicy - ctpPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + ctpPredicates := []predicate.TypedPredicate[*egv1a1.ClientTrafficPolicy]{ + predicate.TypedGenerationChangedPredicate[*egv1a1.ClientTrafficPolicy]{}, + } if r.namespaceLabel != nil { - ctpPredicates = append(ctpPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + ctpPredicates = append(ctpPredicates, predicate.NewTypedPredicateFuncs[*egv1a1.ClientTrafficPolicy](func(ctp *egv1a1.ClientTrafficPolicy) bool { + return r.hasMatchingNamespaceLabels(ctp) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.ClientTrafficPolicy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - ctpPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &egv1a1.ClientTrafficPolicy{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, ctp *egv1a1.ClientTrafficPolicy) []reconcile.Request { + return r.enqueueClass(ctx, ctp) + }), + ctpPredicates...)); err != nil { return err } @@ -1200,30 +1284,40 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch BackendTrafficPolicy - btpPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + btpPredicates := []predicate.TypedPredicate[*egv1a1.BackendTrafficPolicy]{ + predicate.TypedGenerationChangedPredicate[*egv1a1.BackendTrafficPolicy]{}, + } if r.namespaceLabel != nil { - btpPredicates = append(btpPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + btpPredicates = append(btpPredicates, predicate.NewTypedPredicateFuncs[*egv1a1.BackendTrafficPolicy](func(btp *egv1a1.BackendTrafficPolicy) bool { + return r.hasMatchingNamespaceLabels(btp) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.BackendTrafficPolicy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - btpPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &egv1a1.BackendTrafficPolicy{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, btp *egv1a1.BackendTrafficPolicy) []reconcile.Request { + return r.enqueueClass(ctx, btp) + }), + btpPredicates...)); err != nil { return err } // Watch SecurityPolicy - spPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + spPredicates := []predicate.TypedPredicate[*egv1a1.SecurityPolicy]{ + predicate.TypedGenerationChangedPredicate[*egv1a1.SecurityPolicy]{}, + } if r.namespaceLabel != nil { - spPredicates = append(spPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + spPredicates = append(spPredicates, predicate.NewTypedPredicateFuncs[*egv1a1.SecurityPolicy](func(sp *egv1a1.SecurityPolicy) bool { + return r.hasMatchingNamespaceLabels(sp) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.SecurityPolicy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - spPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &egv1a1.SecurityPolicy{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, sp *egv1a1.SecurityPolicy) []reconcile.Request { + return r.enqueueClass(ctx, sp) + }), + spPredicates...)); err != nil { return err } if err := addSecurityPolicyIndexers(ctx, mgr); err != nil { @@ -1231,16 +1325,21 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch BackendTLSPolicy - btlsPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + btlsPredicates := []predicate.TypedPredicate[*gwapiv1a2.BackendTLSPolicy]{ + predicate.TypedGenerationChangedPredicate[*gwapiv1a2.BackendTLSPolicy]{}, + } if r.namespaceLabel != nil { - btlsPredicates = append(btlsPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + btlsPredicates = append(btlsPredicates, predicate.NewTypedPredicateFuncs[*gwapiv1a2.BackendTLSPolicy](func(btp *gwapiv1a2.BackendTLSPolicy) bool { + return r.hasMatchingNamespaceLabels(btp) + })) } if err := c.Watch( - source.Kind(mgr.GetCache(), &gwapiv1a2.BackendTLSPolicy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - btlsPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &gwapiv1a2.BackendTLSPolicy{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, btp *gwapiv1a2.BackendTLSPolicy) []reconcile.Request { + return r.enqueueClass(ctx, btp) + }), + btlsPredicates...)); err != nil { return err } @@ -1249,17 +1348,22 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M } // Watch EnvoyExtensionPolicy - eepPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + eepPredicates := []predicate.TypedPredicate[*egv1a1.EnvoyExtensionPolicy]{ + predicate.TypedGenerationChangedPredicate[*egv1a1.EnvoyExtensionPolicy]{}, + } if r.namespaceLabel != nil { - eepPredicates = append(eepPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + eepPredicates = append(eepPredicates, predicate.NewTypedPredicateFuncs[*egv1a1.EnvoyExtensionPolicy](func(eep *egv1a1.EnvoyExtensionPolicy) bool { + return r.hasMatchingNamespaceLabels(eep) + })) } // Watch EnvoyExtensionPolicy CRUDs if err := c.Watch( - source.Kind(mgr.GetCache(), &egv1a1.EnvoyExtensionPolicy{}), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - eepPredicates..., - ); err != nil { + source.Kind(mgr.GetCache(), &egv1a1.EnvoyExtensionPolicy{}, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, eep *egv1a1.EnvoyExtensionPolicy) []reconcile.Request { + return r.enqueueClass(ctx, eep) + }), + eepPredicates...)); err != nil { return err } if err := addEnvoyExtensionPolicyIndexers(ctx, mgr); err != nil { @@ -1269,17 +1373,22 @@ func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.M r.log.Info("Watching gatewayAPI related objects") // Watch any additional GVKs from the registered extension. - uPredicates := []predicate.Predicate{predicate.GenerationChangedPredicate{}} + uPredicates := []predicate.TypedPredicate[*unstructured.Unstructured]{ + predicate.TypedGenerationChangedPredicate[*unstructured.Unstructured]{}, + } if r.namespaceLabel != nil { - uPredicates = append(uPredicates, predicate.NewPredicateFuncs(r.hasMatchingNamespaceLabels)) + uPredicates = append(uPredicates, predicate.NewTypedPredicateFuncs[*unstructured.Unstructured](func(obj *unstructured.Unstructured) bool { + return r.hasMatchingNamespaceLabels(obj) + })) } for _, gvk := range r.extGVKs { u := &unstructured.Unstructured{} u.SetGroupVersionKind(gvk) - if err := c.Watch(source.Kind(mgr.GetCache(), u), - handler.EnqueueRequestsFromMapFunc(r.enqueueClass), - uPredicates..., - ); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), u, + handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, si *unstructured.Unstructured) []reconcile.Request { + return r.enqueueClass(ctx, si) + }), + uPredicates...)); err != nil { return err } r.log.Info("Watching additional resource", "resource", gvk.String()) @@ -1293,12 +1402,7 @@ func (r *gatewayAPIReconciler) enqueueClass(_ context.Context, _ client.Object) }}} } -func (r *gatewayAPIReconciler) hasManagedClass(obj client.Object) bool { - ep, ok := obj.(*egv1a1.EnvoyProxy) - if !ok { - panic(fmt.Sprintf("unsupported object type %T", obj)) - } - +func (r *gatewayAPIReconciler) hasManagedClass(ep *egv1a1.EnvoyProxy) bool { // The EnvoyProxy must be in the same namespace as EG. if ep.Namespace != r.namespace { r.log.Info("envoyproxy namespace does not match Envoy Gateway's namespace", diff --git a/internal/provider/kubernetes/predicates.go b/internal/provider/kubernetes/predicates.go index f25c0092326..3cf200a8796 100644 --- a/internal/provider/kubernetes/predicates.go +++ b/internal/provider/kubernetes/predicates.go @@ -33,13 +33,7 @@ const oidcHMACSecretName = "envoy-oidc-hmac" // hasMatchingController returns true if the provided object is a GatewayClass // with a Spec.Controller string matching this Envoy Gateway's controller string, // or false otherwise. -func (r *gatewayAPIReconciler) hasMatchingController(obj client.Object) bool { - gc, ok := obj.(*gwapiv1.GatewayClass) - if !ok { - r.log.Info("bypassing reconciliation due to unexpected object type", "type", obj) - return false - } - +func (r *gatewayAPIReconciler) hasMatchingController(gc *gwapiv1.GatewayClass) bool { if gc.Spec.ControllerName == r.classController { r.log.Info("gatewayclass has matching controller name, processing", "name", gc.Name) return true diff --git a/internal/provider/kubernetes/sources.go b/internal/provider/kubernetes/sources.go index 66d93acb0d5..e19259f77ca 100644 --- a/internal/provider/kubernetes/sources.go +++ b/internal/provider/kubernetes/sources.go @@ -13,22 +13,22 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" ) // watchAndReconcileSource is a concrete implementation of the Source interface. type watchAndReconcileSource struct { - condition <-chan struct{} - object client.Object + condition <-chan struct{} + object client.Object + eventHandler handler.EventHandler } -func NewWatchAndReconcileSource(cond <-chan struct{}, obj client.Object) source.Source { - return &watchAndReconcileSource{condition: cond, object: obj} +func NewWatchAndReconcileSource(cond <-chan struct{}, obj client.Object, eh handler.EventHandler) source.Source { + return &watchAndReconcileSource{condition: cond, object: obj, eventHandler: eh} } // Start implements the Source interface. It registers the EventHandler with the Informer. -func (s *watchAndReconcileSource) Start(ctx context.Context, eh handler.EventHandler, queue workqueue.RateLimitingInterface, _ ...predicate.Predicate) error { +func (s *watchAndReconcileSource) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { if s.object == nil { return errors.New("object to queue is required") } @@ -39,7 +39,7 @@ func (s *watchAndReconcileSource) Start(ctx context.Context, eh handler.EventHan return case <-s.condition: // Triggers a reconcile - eh.Generic(ctx, event.GenericEvent{Object: s.object}, queue) + s.eventHandler.Generic(ctx, event.GenericEvent{Object: s.object}, queue) } }() return nil