diff --git a/pkg/ansible/proxy/proxy.go b/pkg/ansible/proxy/proxy.go index ecd4f37436..299caf4650 100644 --- a/pkg/ansible/proxy/proxy.go +++ b/pkg/ansible/proxy/proxy.go @@ -55,7 +55,7 @@ type marshaler interface { // resource exists in our cache. If it does then there is no need to bombard // the APIserver with our request and we should write the response from the // proxy. -func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler { +func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}, cMap *controllermap.ControllerMap, injectOwnerRef bool) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodGet: @@ -138,17 +138,39 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper } m = &un } else { - un := unstructured.Unstructured{} + un := &unstructured.Unstructured{} un.SetGroupVersionKind(k) obj := client.ObjectKey{Namespace: r.Namespace, Name: r.Name} - err = informerCache.Get(context.Background(), obj, &un) + err = informerCache.Get(context.Background(), obj, un) if err != nil { // break here in case resource doesn't exist in cache but exists on APIserver // This is very unlikely but provides user with expected 404 log.Info(fmt.Sprintf("Cache miss: %v, %v", k, obj)) break } - m = &un + m = un + // Once we get the resource, we are going to attempt to recover the dependent watches here, + // This will happen in the background, and log errors. + if injectOwnerRef { + go func() { + ownerRef, err := getRequestOwnerRef(req) + if err != nil { + log.Error(err, "Could not get ownerRef from proxy") + return + } + + for _, oRef := range un.GetOwnerReferences() { + if oRef.APIVersion == ownerRef.APIVersion && oRef.Kind == ownerRef.Kind { + err := addWatchToController(ownerRef, cMap, un, restMapper) + if err != nil { + log.Error(err, "Could not recover dependent resource watch", "owner", ownerRef) + return + } + } + } + }() + } + } i := bytes.Buffer{} @@ -184,7 +206,7 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper // InjectOwnerReferenceHandler will handle proxied requests and inject the // owner refernece found in the authorization header. The Authorization is // then deleted so that the proxy can re-set with the correct authorization. -func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper) http.Handler { +func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { switch req.Method { case http.MethodPost: @@ -203,29 +225,9 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM break } log.Info("Injecting owner reference") - - user, _, ok := req.BasicAuth() - if !ok { - log.Error(errors.New("basic auth header not found"), "") - w.Header().Set("WWW-Authenticate", "Basic realm=\"Operator Proxy\"") - http.Error(w, "", http.StatusUnauthorized) - return - } - authString, err := base64.StdEncoding.DecodeString(user) + owner, err := getRequestOwnerRef(req) if err != nil { - m := "Could not base64 decode username" - log.Error(err, m) - http.Error(w, m, http.StatusBadRequest) - return - } - // Set owner to NamespacedOwnerReference, which has metav1.OwnerReference - // as a subset along with the Namespace of the owner. Please see the - // kubeconfig.NamespacedOwnerReference type for more information. The - // namespace is required when creating the reconcile requests. - owner := kubeconfig.NamespacedOwnerReference{} - json.Unmarshal(authString, &owner) - if err := json.Unmarshal(authString, &owner); err != nil { - m := "Could not unmarshal auth string" + m := "Could not get owner reference" log.Error(err, m) http.Error(w, m, http.StatusInternalServerError) return @@ -257,35 +259,23 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM log.V(1).Info("Serialized body", "Body", string(newBody)) req.Body = ioutil.NopCloser(bytes.NewBuffer(newBody)) req.ContentLength = int64(len(newBody)) - dataMapping, err := restMapper.RESTMapping(data.GroupVersionKind().GroupKind(), data.GroupVersionKind().Version) - if err != nil { - m := fmt.Sprintf("Could not get rest mapping for: %v", data.GroupVersionKind()) - log.Error(err, m) - http.Error(w, m, http.StatusInternalServerError) - return - } - // We need to determine whether or not the owner is a cluster-scoped - // resource because enqueue based on an owner reference does not work if - // a namespaced resource owns a cluster-scoped resource - ownerGV, err := schema.ParseGroupVersion(owner.APIVersion) - ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version) - if err != nil { - m := fmt.Sprintf("could not get rest mapping for: %v", owner) - log.Error(err, m) - http.Error(w, m, http.StatusInternalServerError) - return - } - dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot - ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot - useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped // add watch for resource - err = addWatchToController(owner, cMap, data, useOwnerReference) - if err != nil { - m := "could not add watch to controller" - log.Error(err, m) - http.Error(w, m, http.StatusInternalServerError) - return + // check if resource doesn't exist in watched namespaces + // if watchedNamespaces[""] exists then we are watching all namespaces + // and want to continue + // This is making sure we are not attempting to watch a resource outside of the + // namespaces that the cache can watch. + _, allNsPresent := watchedNamespaces[metav1.NamespaceAll] + _, reqNsPresent := watchedNamespaces[r.Namespace] + if allNsPresent || reqNsPresent { + err = addWatchToController(owner, cMap, data, restMapper) + if err != nil { + m := "could not add watch to controller" + log.Error(err, m) + http.Error(w, m, http.StatusInternalServerError) + return + } } } // Removing the authorization so that the proxy can set the correct authorization. @@ -294,6 +284,7 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM }) } +// RequestLogHandler - log the requests that come through the proxy. func RequestLogHandler(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { // read body @@ -379,13 +370,13 @@ func Run(done chan error, o Options) error { } if !o.NoOwnerInjection { - server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper) + server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper, watchedNamespaceMap) } if o.LogRequests { server.Handler = RequestLogHandler(server.Handler) } if !o.DisableCache { - server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap) + server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap, o.ControllerMap, !o.NoOwnerInjection) } l, err := server.Listen(o.Address, o.Port) @@ -399,57 +390,71 @@ func Run(done chan error, o Options) error { return nil } -func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, useOwnerReference bool) error { - gv, err := schema.ParseGroupVersion(owner.APIVersion) +func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, restMapper meta.RESTMapper) error { + dataMapping, err := restMapper.RESTMapping(resource.GroupVersionKind().GroupKind(), resource.GroupVersionKind().Version) + if err != nil { + m := fmt.Sprintf("Could not get rest mapping for: %v", resource.GroupVersionKind()) + log.Error(err, m) + return err + + } + // We need to determine whether or not the owner is a cluster-scoped + // resource because enqueue based on an owner reference does not work if + // a namespaced resource owns a cluster-scoped resource + ownerGV, err := schema.ParseGroupVersion(owner.APIVersion) if err != nil { + m := fmt.Sprintf("could not get broup version for: %v", owner) + log.Error(err, m) return err } - gvk := schema.GroupVersionKind{ - Group: gv.Group, - Version: gv.Version, - Kind: owner.Kind, + ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version) + if err != nil { + m := fmt.Sprintf("could not get rest mapping for: %v", owner) + log.Error(err, m) + return err } - contents, ok := cMap.Get(gvk) + + dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot + ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot + useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped + contents, ok := cMap.Get(ownerMapping.GroupVersionKind) if !ok { return errors.New("failed to find controller in map") } wMap := contents.WatchMap uMap := contents.UIDMap - // Store UID - uMap.Store(owner.UID, types.NamespacedName{ - Name: owner.Name, - Namespace: owner.Namespace, - }) u := &unstructured.Unstructured{} - u.SetGroupVersionKind(gvk) + u.SetGroupVersionKind(ownerMapping.GroupVersionKind) // Add a watch to controller if contents.WatchDependentResources { - // Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources - if useOwnerReference && !contents.WatchClusterScopedResources { - _, exists := wMap.Get(resource.GroupVersionKind()) - // If already watching resource no need to add a new watch - if exists { - return nil - } + // Store UID + uMap.Store(owner.UID, types.NamespacedName{ + Name: owner.Name, + Namespace: owner.Namespace, + }) + _, exists := wMap.Get(resource.GroupVersionKind()) + // If already watching resource no need to add a new watch + if exists { + return nil + } + // Store watch in map + wMap.Store(resource.GroupVersionKind()) + // Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources and we have to + if useOwnerReference { log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind()) // Store watch in map - wMap.Store(resource.GroupVersionKind()) err = contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u}) - } else if contents.WatchClusterScopedResources { - // Use Map func since EnqueuRequestForOwner won't work - // Check if resource is already watched - _, exists := wMap.Get(resource.GroupVersionKind()) - if exists { - return nil + if err != nil { + return err } + } else if contents.WatchClusterScopedResources { log.Info("Watching child resource which can be cluster-scoped", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind()) - // Store watch in map - wMap.Store(resource.GroupVersionKind()) // Add watch err = contents.Controller.Watch( &source.Kind{Type: resource}, + // Use Map func since EnqueuRequestForOwner won't work &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request { - log.V(2).Info("Creating reconcile request from object", "gvk", gvk, "name", a.Meta.GetName()) + log.V(2).Info("Creating reconcile request from object", "gvk", ownerMapping.GroupVersionKind, "name", a.Meta.GetName()) ownRefs := a.Meta.GetOwnerReferences() for _, ref := range ownRefs { nn, exists := uMap.Get(ref.UID) @@ -470,3 +475,28 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr } return nil } + +func getRequestOwnerRef(req *http.Request) (kubeconfig.NamespacedOwnerReference, error) { + owner := kubeconfig.NamespacedOwnerReference{} + user, _, ok := req.BasicAuth() + if !ok { + return owner, errors.New("basic auth header not found") + } + authString, err := base64.StdEncoding.DecodeString(user) + if err != nil { + m := "Could not base64 decode username" + log.Error(err, m) + return owner, err + } + // Set owner to NamespacedOwnerReference, which has metav1.OwnerReference + // as a subset along with the Namespace of the owner. Please see the + // kubeconfig.NamespacedOwnerReference type for more information. The + // namespace is required when creating the reconcile requests. + json.Unmarshal(authString, &owner) + if err := json.Unmarshal(authString, &owner); err != nil { + m := "Could not unmarshal auth string" + log.Error(err, m) + return owner, err + } + return owner, err +} diff --git a/test/ansible-memcached/asserts.yml b/test/ansible-memcached/asserts.yml index 102022f597..8f6af29e6c 100644 --- a/test/ansible-memcached/asserts.yml +++ b/test/ansible-memcached/asserts.yml @@ -60,6 +60,59 @@ assert: that: cr.resources[0].status.get("test") == "hello world" when: cr is defined + - when: molecule_yml.scenario.name == "test-local" + block: + - name: Restart the operator by killing the pod + k8s: + state: absent + definition: + api_version: v1 + kind: Pod + metadata: + namespace: '{{ namespace }}' + name: '{{ pod.metadata.name }}' + vars: + pod: '{{ q("k8s", api_version="v1", kind="Pod", namespace=namespace, label_selector="name=memcached-operator").0 }}' + + - name: Wait 2 minutes for operator deployment + debug: + var: deploy + until: deploy and deploy.status and deploy.status.replicas == deploy.status.get("availableReplicas", 0) + retries: 12 + delay: 10 + vars: + deploy: '{{ lookup("k8s", + kind="Deployment", + api_version="apps/v1", + namespace=namespace, + resource_name="memcached-operator" + )}}' + + - name: Wait for reconcilation to have a chance at finishing + pause: + seconds: 15 + + - name: Delete the service that is created. + k8s: + kind: Service + api_version: v1 + namespace: '{{ namespace }}' + name: test-service + state: absent + + - name: Verify that test-service was re-created + debug: + var: service + until: service + retries: 12 + delay: 10 + vars: + service: '{{ lookup("k8s", + kind="Service", + api_version="v1", + namespace=namespace, + resource_name="test-service", + )}}' - name: Delete the custom resource k8s: