Skip to content
Merged
14 changes: 8 additions & 6 deletions commands/operator-sdk/cmd/up/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,23 @@ func upLocalAnsible() {
printVersion()
log.Infof("watching namespace: %s", namespace)
done := make(chan error)
cMap := proxy.NewControllerMap()

// start the proxy
err = proxy.Run(done, proxy.Options{
Address: "localhost",
Port: 8888,
KubeConfig: mgr.GetConfig(),
Cache: mgr.GetCache(),
RESTMapper: mgr.GetRESTMapper(),
Address: "localhost",
Port: 8888,
KubeConfig: mgr.GetConfig(),
Cache: mgr.GetCache(),
RESTMapper: mgr.GetRESTMapper(),
ControllerMap: cMap,
})
if err != nil {
log.Fatalf("error starting proxy: (%v)", err)
}

// start the operator
go ansibleOperator.Run(done, mgr, ansibleOperatorFlags)
go ansibleOperator.Run(done, mgr, ansibleOperatorFlags, cMap)

// wait for either to finish
err = <-done
Expand Down
23 changes: 16 additions & 7 deletions pkg/ansible/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ var log = logf.Log.WithName("ansible-controller")

// Options - options for your controller
type Options struct {
EventHandlers []events.EventHandler
LoggingLevel events.LogLevel
Runner runner.Runner
GVK schema.GroupVersionKind
ReconcilePeriod time.Duration
ManageStatus bool
EventHandlers []events.EventHandler
LoggingLevel events.LogLevel
Runner runner.Runner
GVK schema.GroupVersionKind
ReconcilePeriod time.Duration
ManageStatus bool
WatchDependentResources bool
}

// Add - Creates a new ansible operator controller and adds it to the manager
func Add(mgr manager.Manager, options Options) {
func Add(mgr manager.Manager, options Options) *controller.Controller {
log.Info("Watching resource", "Options.Group", options.GVK.Group, "Options.Version", options.GVK.Version, "Options.Kind", options.GVK.Kind)
if options.EventHandlers == nil {
options.EventHandlers = []events.EventHandler{}
Expand All @@ -63,6 +64,13 @@ func Add(mgr manager.Manager, options Options) {
ManageStatus: options.ManageStatus,
}

if mgr.GetScheme().IsVersionRegistered(schema.GroupVersion{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Group: options.GVK.Group,
Version: options.GVK.Version,
}) {
log.Info("Version already registered... skipping")
return nil
}
// Register the GVK with the schema
mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{})
metav1.AddToGroupVersion(mgr.GetScheme(), schema.GroupVersion{
Expand All @@ -84,4 +92,5 @@ func Add(mgr manager.Manager, options Options) {
log.Error(err, "")
os.Exit(1)
}
return &c
}
11 changes: 9 additions & 2 deletions pkg/ansible/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package operator

import (
"errors"
"math/rand"
"time"

"github.com/operator-framework/operator-sdk/pkg/ansible/controller"
"github.com/operator-framework/operator-sdk/pkg/ansible/flags"
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy"
"github.com/operator-framework/operator-sdk/pkg/ansible/runner"

"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -30,7 +32,7 @@ import (
// Run - A blocking function which starts a controller-runtime manager
// It starts an Operator by reading in the values in `./watches.yaml`, adds a controller
// to the manager, and finally running the manager.
func Run(done chan error, mgr manager.Manager, f *flags.AnsibleOperatorFlags) {
func Run(done chan error, mgr manager.Manager, f *flags.AnsibleOperatorFlags, cMap *proxy.ControllerMap) {
watches, err := runner.NewFromWatches(f.WatchesFile)
if err != nil {
logf.Log.WithName("manager").Error(err, "failed to get watches")
Expand All @@ -47,7 +49,12 @@ func Run(done chan error, mgr manager.Manager, f *flags.AnsibleOperatorFlags) {
ManageStatus: runner.GetManageStatus(),
}
applyFlagsToControllerOptions(f, &o)
controller.Add(mgr, o)
ctr := controller.Add(mgr, o)
if ctr == nil {
done <- errors.New("failed to add controller")
return
}
cMap.Store(o.GVK, *ctr, runner.GetWatchDependentResources())
}
done <- mgr.Start(c)
}
Expand Down
91 changes: 89 additions & 2 deletions pkg/ansible/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/http"
"net/http/httputil"
"strings"
"sync"

k8sRequest "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/requestfactory"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -38,8 +39,18 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// ControllerMap - map of GVK to controller
type ControllerMap struct {
sync.RWMutex
internal map[schema.GroupVersionKind]controller.Controller
watch map[schema.GroupVersionKind]bool
}

// CacheResponseHandler will handle proxied requests and check if the requested
// resource exists in our cache. If it does then there is no need to bombard
// the APIserver with our request and we should write the response from the
Expand Down Expand Up @@ -126,7 +137,7 @@ func CacheResponseHandler(h http.Handler, informerCache cache.Cache, restMapper
// InjectOwnerReferenceHandler will handle proxied requests and inject the
// owner refernece found in the authorization header. The Authorization is
// then deleted so that the proxy can re-set with the correct authorization.
func InjectOwnerReferenceHandler(h http.Handler) http.Handler {
func InjectOwnerReferenceHandler(h http.Handler, cMap *ControllerMap) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
log.Info("injecting owner reference")
Expand Down Expand Up @@ -178,6 +189,13 @@ func InjectOwnerReferenceHandler(h http.Handler) http.Handler {
log.V(1).Info("serialized body", "Body", string(newBody))
req.Body = ioutil.NopCloser(bytes.NewBuffer(newBody))
req.ContentLength = int64(len(newBody))
// add watch for resource
err = addWatchToController(owner, cMap, data)
if err != nil {
m := "could not add watch to controller"
log.Error(err, m)
http.Error(w, m, http.StatusInternalServerError)
}
}
// Removing the authorization so that the proxy can set the correct authorization.
req.Header.Del("Authorization")
Expand All @@ -200,6 +218,7 @@ type Options struct {
KubeConfig *rest.Config
Cache cache.Cache
RESTMapper meta.RESTMapper
ControllerMap *ControllerMap
}

// Run will start a proxy server in a go routine that returns on the error
Expand All @@ -213,6 +232,9 @@ func Run(done chan error, o Options) error {
if o.Handler != nil {
server.Handler = o.Handler(server.Handler)
}
if o.ControllerMap == nil {
return fmt.Errorf("failed to get controller map from options")
}

if o.Cache == nil {
// Need to initialize cache since we don't have one
Expand All @@ -236,7 +258,7 @@ func Run(done chan error, o Options) error {
}

if !o.NoOwnerInjection {
server.Handler = InjectOwnerReferenceHandler(server.Handler)
server.Handler = InjectOwnerReferenceHandler(server.Handler, o.ControllerMap)
}
// Always add cache handler
server.Handler = CacheResponseHandler(server.Handler, o.Cache, o.RESTMapper)
Expand All @@ -251,3 +273,68 @@ func Run(done chan error, o Options) error {
}()
return nil
}

func addWatchToController(owner metav1.OwnerReference, cMap *ControllerMap, resource *unstructured.Unstructured) error {
gv, err := schema.ParseGroupVersion(owner.APIVersion)
if err != nil {
return err
}
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: owner.Kind,
}
c, watch, ok := cMap.Get(gvk)
if !ok {
return errors.New("failed to find controller in map")
}
// Add a watch to controller
if watch {
err = c.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{OwnerType: resource})
if err != nil {
return err
}
}
return nil
}

// NewControllerMap returns a new object that contains a mapping between GVK
// and controller
func NewControllerMap() *ControllerMap {
return &ControllerMap{
internal: make(map[schema.GroupVersionKind]controller.Controller),
watch: make(map[schema.GroupVersionKind]bool),
}
}

// Get - Returns a controller given a GVK as the key. `watch` in the return
// specifies whether or not the operator will watch dependent resources for
// this controller. `ok` returns whether the query was successful. `controller`
// is the associated controller-runtime controller object.
func (cm *ControllerMap) Get(key schema.GroupVersionKind) (controller controller.Controller, watch, ok bool) {
cm.RLock()
defer cm.RUnlock()
result, ok := cm.internal[key]
if !ok {
return result, false, ok
}
watch, ok = cm.watch[key]
return result, watch, ok
}

// Delete - Deletes associated GVK to controller mapping from the ControllerMap
func (cm *ControllerMap) Delete(key schema.GroupVersionKind) {
cm.Lock()
defer cm.Unlock()
delete(cm.internal, key)
}

// Store - Adds a new GVK to controller mapping. Also creates a mapping between
// GVK and a boolean `watch` that specifies whether this controller is watching
// dependent resources.
func (cm *ControllerMap) Store(key schema.GroupVersionKind, value controller.Controller, watch bool) {
cm.Lock()
defer cm.Unlock()
cm.internal[key] = value
cm.watch[key] = watch
}
12 changes: 7 additions & 5 deletions pkg/ansible/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ func TestHandler(t *testing.T) {
t.Fatalf("failed to instantiate manager: %v", err)
}
done := make(chan error)
cMap := NewControllerMap()
err = Run(done, Options{
Address: "localhost",
Port: 8888,
KubeConfig: mgr.GetConfig(),
Cache: nil,
RESTMapper: mgr.GetRESTMapper(),
Address: "localhost",
Port: 8888,
KubeConfig: mgr.GetConfig(),
Cache: nil,
RESTMapper: mgr.GetRESTMapper(),
ControllerMap: cMap,
})
if err != nil {
t.Fatalf("error starting proxy: %v", err)
Expand Down
12 changes: 9 additions & 3 deletions pkg/ansible/runner/fake/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (

// Runner - implements the Runner interface for a GVK that's being watched.
type Runner struct {
Finalizer string
ReconcilePeriod time.Duration
ManageStatus bool
Finalizer string
ReconcilePeriod time.Duration
ManageStatus bool
WatchDependentResources bool
// Used to send error if Run should fail.
Error error
// Job Events that will be sent back from the runs channel
Expand Down Expand Up @@ -77,6 +78,11 @@ func (r *Runner) GetManageStatus() bool {
return r.ManageStatus
}

// GetWatchDependentResources - get watchDependentResources.
func (r *Runner) GetWatchDependentResources() bool {
return r.WatchDependentResources
}

// GetFinalizer - gets the fake finalizer.
func (r *Runner) GetFinalizer() (string, bool) {
return r.Finalizer, r.Finalizer != ""
Expand Down
41 changes: 25 additions & 16 deletions pkg/ansible/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@ type Runner interface {
GetFinalizer() (string, bool)
GetReconcilePeriod() (time.Duration, bool)
GetManageStatus() bool
GetWatchDependentResources() bool
}

// watch holds data used to create a mapping of GVK to ansible playbook or role.
// The mapping is used to compose an ansible operator.
type watch struct {
Version string `yaml:"version"`
Group string `yaml:"group"`
Kind string `yaml:"kind"`
Playbook string `yaml:"playbook"`
Role string `yaml:"role"`
ReconcilePeriod string `yaml:"reconcilePeriod"`
ManageStatus bool `yaml:"manageStatus"`
Finalizer *Finalizer `yaml:"finalizer"`
Version string `yaml:"version"`
Group string `yaml:"group"`
Kind string `yaml:"kind"`
Playbook string `yaml:"playbook"`
Role string `yaml:"role"`
ReconcilePeriod string `yaml:"reconcilePeriod"`
ManageStatus bool `yaml:"manageStatus"`
WatchDependentResources bool `yaml:"watchDependentResources"`
Finalizer *Finalizer `yaml:"finalizer"`
}

// Finalizer - Expose finalizer to be used by a user.
Expand All @@ -69,8 +71,9 @@ type Finalizer struct {

// UnmarshalYaml - implements the yaml.Unmarshaler interface
func (w *watch) UnmarshalYAML(unmarshal func(interface{}) error) error {
// by default, the operator will manage status
// by default, the operator will manage status and watch dependent resources
w.ManageStatus = true
w.WatchDependentResources = true

// hide watch data in plain struct to prevent unmarshal from calling
// UnmarshalYAML again
Expand Down Expand Up @@ -185,13 +188,14 @@ func NewForRole(path string, gvk schema.GroupVersionKind, finalizer *Finalizer,

// runner - implements the Runner interface for a GVK that's being watched.
type runner struct {
Path string // path on disk to a playbook or role depending on what cmdFunc expects
GVK schema.GroupVersionKind // GVK being watched that corresponds to the Path
Finalizer *Finalizer
cmdFunc func(ident, inputDirPath string) *exec.Cmd // returns a Cmd that runs ansible-runner
finalizerCmdFunc func(ident, inputDirPath string) *exec.Cmd
reconcilePeriod *time.Duration
manageStatus bool
Path string // path on disk to a playbook or role depending on what cmdFunc expects
GVK schema.GroupVersionKind // GVK being watched that corresponds to the Path
Finalizer *Finalizer
cmdFunc func(ident, inputDirPath string) *exec.Cmd // returns a Cmd that runs ansible-runner
finalizerCmdFunc func(ident, inputDirPath string) *exec.Cmd
reconcilePeriod *time.Duration
manageStatus bool
watchDependentResources bool
}

func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig string) (RunResult, error) {
Expand Down Expand Up @@ -281,6 +285,11 @@ func (r *runner) GetManageStatus() bool {
return r.manageStatus
}

// GetWatchDependentResources - get the watch dependent resources value
func (r *runner) GetWatchDependentResources() bool {
return r.watchDependentResources
}

func (r *runner) GetFinalizer() (string, bool) {
if r.Finalizer != nil {
return r.Finalizer.Name, true
Expand Down
Loading