pkg/ansible/proxy: adding ability for dependent watches to be recovered
Shawn Hurley committed Feb 7, 2019
1 parent 09d0d70 commit d663bd0
Showing 2 changed files with 169 additions and 86 deletions.
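In the proxy.go changes below, addWatchToController now derives resource scope from the RESTMapper and only enqueues via owner references when the scopes allow it: owner-reference enqueueing breaks down when a namespaced owner would have to own a cluster-scoped dependent, in which case a map-function watch is used instead (when watching cluster-scoped resources is enabled). A minimal, standalone restatement of that rule, with a hypothetical function name that is not part of the diff:

package main

import "fmt"

// canUseOwnerReference mirrors the useOwnerReference decision in
// addWatchToController: enqueueing via owner references works unless a
// namespaced owner points at a cluster-scoped dependent resource.
func canUseOwnerReference(ownerNamespaced, dependentNamespaced bool) bool {
	return !ownerNamespaced || dependentNamespaced
}

func main() {
	fmt.Println(canUseOwnerReference(true, true))   // true: namespaced owner, namespaced dependent
	fmt.Println(canUseOwnerReference(false, false)) // true: cluster-scoped owner
	fmt.Println(canUseOwnerReference(true, false))  // false: fall back to a map-function watch
}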
202 changes: 116 additions & 86 deletions pkg/ansible/proxy/proxy.go
@@ -55,7 +55,7 @@ type marshaler interface {
// resource exists in our cache. If it does then there is no need to bombard
// the APIserver with our request and we should write the response from the
// proxy.
func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler {
func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}, cMap *controllermap.ControllerMap, injectOwnerRef bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodGet:
@@ -138,17 +138,39 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
}
m = &un
} else {
un := unstructured.Unstructured{}
un := &unstructured.Unstructured{}
un.SetGroupVersionKind(k)
obj := client.ObjectKey{Namespace: r.Namespace, Name: r.Name}
err = informerCache.Get(context.Background(), obj, &un)
err = informerCache.Get(context.Background(), obj, un)
if err != nil {
// break here in case resource doesn't exist in cache but exists on APIserver
// This is very unlikely but provides user with expected 404
log.Info(fmt.Sprintf("Cache miss: %v, %v", k, obj))
break
}
m = &un
m = un
// Once we get the resource, attempt to recover the dependent watches here.
// This happens in the background and logs any errors.
if injectOwnerRef {
go func() {
ownerRef, err := getRequestOwnerRef(req)
if err != nil {
log.Error(err, "Could not get ownerRef from proxy")
return
}

for _, oRef := range un.GetOwnerReferences() {
if oRef.APIVersion == ownerRef.APIVersion && oRef.Kind == ownerRef.Kind {
err := addWatchToController(ownerRef, cMap, un, restMapper)
if err != nil {
log.Error(err, "Could not recover dependent resource watch", "owner", ownerRef)
return
}
}
}
}()
}

}

i := bytes.Buffer{}
@@ -184,7 +206,7 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
// InjectOwnerReferenceHandler will handle proxied requests and inject the
// owner reference found in the authorization header. The Authorization is
// then deleted so that the proxy can re-set with the correct authorization.
func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper) http.Handler {
func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerMap, restMapper meta.RESTMapper, watchedNamespaces map[string]interface{}) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodPost:
@@ -203,29 +225,9 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
break
}
log.Info("Injecting owner reference")

user, _, ok := req.BasicAuth()
if !ok {
log.Error(errors.New("basic auth header not found"), "")
w.Header().Set("WWW-Authenticate", "Basic realm=\"Operator Proxy\"")
http.Error(w, "", http.StatusUnauthorized)
return
}
authString, err := base64.StdEncoding.DecodeString(user)
owner, err := getRequestOwnerRef(req)
if err != nil {
m := "Could not base64 decode username"
log.Error(err, m)
http.Error(w, m, http.StatusBadRequest)
return
}
// Set owner to NamespacedOwnerReference, which has metav1.OwnerReference
// as a subset along with the Namespace of the owner. Please see the
// kubeconfig.NamespacedOwnerReference type for more information. The
// namespace is required when creating the reconcile requests.
owner := kubeconfig.NamespacedOwnerReference{}
json.Unmarshal(authString, &owner)
if err := json.Unmarshal(authString, &owner); err != nil {
m := "Could not unmarshal auth string"
m := "Could not get owner reference"
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
@@ -257,35 +259,23 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
log.V(1).Info("Serialized body", "Body", string(newBody))
req.Body = ioutil.NopCloser(bytes.NewBuffer(newBody))
req.ContentLength = int64(len(newBody))
dataMapping, err := restMapper.RESTMapping(data.GroupVersionKind().GroupKind(), data.GroupVersionKind().Version)
if err != nil {
m := fmt.Sprintf("Could not get rest mapping for: %v", data.GroupVersionKind())
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
}
// We need to determine whether or not the owner is a cluster-scoped
// resource because enqueue based on an owner reference does not work if
// a namespaced resource owns a cluster-scoped resource
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version)
if err != nil {
m := fmt.Sprintf("could not get rest mapping for: %v", owner)
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
}

dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot
useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped
// add watch for resource
err = addWatchToController(owner, cMap, data, useOwnerReference)
if err != nil {
m := "could not add watch to controller"
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
// Check that the resource's namespace is one we watch: if watchedNamespaces[""]
// is present we are watching all namespaces and should continue.
// This makes sure we do not attempt to watch a resource outside of the
// namespaces that the cache can watch.
_, allNsPresent := watchedNamespaces[metav1.NamespaceAll]
_, reqNsPresent := watchedNamespaces[r.Namespace]
if allNsPresent || reqNsPresent {
err = addWatchToController(owner, cMap, data, restMapper)
if err != nil {
m := "could not add watch to controller"
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
return
}
}
}
// Removing the authorization so that the proxy can set the correct authorization.
@@ -294,6 +284,7 @@ func InjectOwnerReferenceHandler(h http.Handler, cMap *controllermap.ControllerM
})
}

// RequestLogHandler - log the requests that come through the proxy.
func RequestLogHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// read body
@@ -379,13 +370,13 @@ func Run(done chan error, o Options) error {
}

if !o.NoOwnerInjection {
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper)
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap, o.RESTMapper, watchedNamespaceMap)
}
if o.LogRequests {
server.Handler = RequestLogHandler(server.Handler)
}
if !o.DisableCache {
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap)
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper, watchedNamespaceMap, o.ControllerMap, !o.NoOwnerInjection)
}

l, err := server.Listen(o.Address, o.Port)
@@ -399,57 +390,71 @@ func Run(done chan error, o Options) error {
return nil
}

func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, useOwnerReference bool) error {
gv, err := schema.ParseGroupVersion(owner.APIVersion)
func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *controllermap.ControllerMap, resource *unstructured.Unstructured, restMapper meta.RESTMapper) error {
dataMapping, err := restMapper.RESTMapping(resource.GroupVersionKind().GroupKind(), resource.GroupVersionKind().Version)
if err != nil {
m := fmt.Sprintf("Could not get rest mapping for: %v", resource.GroupVersionKind())
log.Error(err, m)
return err

}
// We need to determine whether or not the owner is a cluster-scoped
// resource because enqueue based on an owner reference does not work if
// a namespaced resource owns a cluster-scoped resource
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
if err != nil {
m := fmt.Sprintf("could not get broup version for: %v", owner)
log.Error(err, m)
return err
}
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: owner.Kind,
ownerMapping, err := restMapper.RESTMapping(schema.GroupKind{Kind: owner.Kind, Group: ownerGV.Group}, ownerGV.Version)
if err != nil {
m := fmt.Sprintf("could not get rest mapping for: %v", owner)
log.Error(err, m)
return err
}
contents, ok := cMap.Get(gvk)

dataNamespaceScoped := dataMapping.Scope.Name() != meta.RESTScopeNameRoot
ownerNamespaceScoped := ownerMapping.Scope.Name() != meta.RESTScopeNameRoot
useOwnerReference := !ownerNamespaceScoped || dataNamespaceScoped
contents, ok := cMap.Get(ownerMapping.GroupVersionKind)
if !ok {
return errors.New("failed to find controller in map")
}
wMap := contents.WatchMap
uMap := contents.UIDMap
// Store UID
uMap.Store(owner.UID, types.NamespacedName{
Name: owner.Name,
Namespace: owner.Namespace,
})
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(gvk)
u.SetGroupVersionKind(ownerMapping.GroupVersionKind)
// Add a watch to controller
if contents.WatchDependentResources {
// Use EnqueueRequestForOwner unless user has configured watching cluster scoped resources
if useOwnerReference && !contents.WatchClusterScopedResources {
_, exists := wMap.Get(resource.GroupVersionKind())
// If already watching resource no need to add a new watch
if exists {
return nil
}
// Store UID
uMap.Store(owner.UID, types.NamespacedName{
Name: owner.Name,
Namespace: owner.Namespace,
})
_, exists := wMap.Get(resource.GroupVersionKind())
// If already watching resource no need to add a new watch
if exists {
return nil
}
// Store watch in map
wMap.Store(resource.GroupVersionKind())
// Use EnqueueRequestForOwner unless the user has configured watching cluster-scoped resources and we are forced to fall back to a map function
if useOwnerReference {
log.Info("Watching child resource", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
// Store watch in map
wMap.Store(resource.GroupVersionKind())
err = contents.Controller.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: u})
} else if contents.WatchClusterScopedResources {
// Use Map func since EnqueueRequestForOwner won't work
// Check if resource is already watched
_, exists := wMap.Get(resource.GroupVersionKind())
if exists {
return nil
if err != nil {
return err
}
} else if contents.WatchClusterScopedResources {
log.Info("Watching child resource which can be cluster-scoped", "kind", resource.GroupVersionKind(), "enqueue_kind", u.GroupVersionKind())
// Store watch in map
wMap.Store(resource.GroupVersionKind())
// Add watch
err = contents.Controller.Watch(
&source.Kind{Type: resource},
// Use Map func since EnqueueRequestForOwner won't work
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(func(a handler.MapObject) []reconcile.Request {
log.V(2).Info("Creating reconcile request from object", "gvk", gvk, "name", a.Meta.GetName())
log.V(2).Info("Creating reconcile request from object", "gvk", ownerMapping.GroupVersionKind, "name", a.Meta.GetName())
ownRefs := a.Meta.GetOwnerReferences()
for _, ref := range ownRefs {
nn, exists := uMap.Get(ref.UID)
@@ -470,3 +475,28 @@ func addWatchToController(owner kubeconfig.NamespacedOwnerReference, cMap *contr
}
return nil
}

func getRequestOwnerRef(req *http.Request) (kubeconfig.NamespacedOwnerReference, error) {
owner := kubeconfig.NamespacedOwnerReference{}
user, _, ok := req.BasicAuth()
if !ok {
return owner, errors.New("basic auth header not found")
}
authString, err := base64.StdEncoding.DecodeString(user)
if err != nil {
m := "Could not base64 decode username"
log.Error(err, m)
return owner, err
}
// Set owner to NamespacedOwnerReference, which has metav1.OwnerReference
// as a subset along with the Namespace of the owner. Please see the
// kubeconfig.NamespacedOwnerReference type for more information. The
// namespace is required when creating the reconcile requests.
if err := json.Unmarshal(authString, &owner); err != nil {
m := "Could not unmarshal auth string"
log.Error(err, m)
return owner, err
}
return owner, err
}
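The owner reference reaches the proxy base64-encoded in the basic-auth username, which is what getRequestOwnerRef above decodes. A minimal, self-contained round-trip sketch under that assumption, using a hypothetical ownerRef struct in place of kubeconfig.NamespacedOwnerReference:

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
)

// ownerRef is a hypothetical stand-in for kubeconfig.NamespacedOwnerReference.
type ownerRef struct {
	APIVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
	Name       string `json:"name"`
	Namespace  string `json:"namespace"`
	UID        string `json:"uid"`
}

// ownerFromRequest mirrors getRequestOwnerRef: read the basic-auth username,
// base64-decode it, and unmarshal the JSON owner reference.
func ownerFromRequest(req *http.Request) (ownerRef, error) {
	var owner ownerRef
	user, _, ok := req.BasicAuth()
	if !ok {
		return owner, fmt.Errorf("basic auth header not found")
	}
	raw, err := base64.StdEncoding.DecodeString(user)
	if err != nil {
		return owner, err
	}
	err = json.Unmarshal(raw, &owner)
	return owner, err
}

func main() {
	// Simulate the encoding side so the decode path can be exercised end to end.
	in := ownerRef{APIVersion: "cache.example.com/v1alpha1", Kind: "Memcached",
		Name: "example-memcached", Namespace: "default", UID: "1234"}
	payload, _ := json.Marshal(in)
	req, _ := http.NewRequest(http.MethodGet, "http://localhost:8888/api/v1/services", nil)
	req.SetBasicAuth(base64.StdEncoding.EncodeToString(payload), "unused")

	out, err := ownerFromRequest(req)
	fmt.Println(out.Kind, out.Namespace+"/"+out.Name, err)
}

This covers only the transport mechanics; the handler in the diff additionally matches the decoded reference against the resource's ownerReferences before re-adding a watch.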
53 changes: 53 additions & 0 deletions test/ansible-memcached/asserts.yml
@@ -60,6 +60,59 @@
assert:
that: cr.resources[0].status.get("test") == "hello world"
when: cr is defined
- when: molecule_yml.scenario.name == "test-local"
block:
- name: Restart the operator by killing the pod
k8s:
state: absent
definition:
api_version: v1
kind: Pod
metadata:
namespace: '{{ namespace }}'
name: '{{ pod.metadata.name }}'
vars:
pod: '{{ q("k8s", api_version="v1", kind="Pod", namespace=namespace, label_selector="name=memcached-operator").0 }}'

- name: Wait 2 minutes for operator deployment
debug:
var: deploy
until: deploy and deploy.status and deploy.status.replicas == deploy.status.get("availableReplicas", 0)
retries: 12
delay: 10
vars:
deploy: '{{ lookup("k8s",
kind="Deployment",
api_version="apps/v1",
namespace=namespace,
resource_name="memcached-operator"
)}}'

- name: Wait for reconciliation to have a chance at finishing
pause:
seconds: 15

- name: Delete the service that is created.
k8s:
kind: Service
api_version: v1
namespace: '{{ namespace }}'
name: test-service
state: absent

- name: Verify that test-service was re-created
debug:
var: service
until: service
retries: 12
delay: 10
vars:
service: '{{ lookup("k8s",
kind="Service",
api_version="v1",
namespace=namespace,
resource_name="test-service",
)}}'

- name: Delete the custom resource
k8s: