Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Honor allowed namespaces in all cluster/git operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Alfonso Acosta committed Mar 6, 2019
1 parent 0ad1b91 commit cb4ad60
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 46 deletions.
3 changes: 2 additions & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"errors"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/policy"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/ssh"
"github.com/weaveworks/flux/policy"
)

// Constants for workload ready status. These are defined here so that
Expand All @@ -27,6 +27,7 @@ type Cluster interface {
// Get all of the services (optionally, from a specific namespace), excluding those
AllControllers(maybeNamespace string) ([]Controller, error)
SomeControllers([]flux.ResourceID) ([]Controller, error)
IsAllowedResource(flux.ResourceID) bool
Ping() error
Export() ([]byte, error)
Sync(SyncSet) error
Expand Down
40 changes: 35 additions & 5 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/cluster/kubernetes/resource"
fhrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned"
"github.com/weaveworks/flux/ssh"
)
Expand Down Expand Up @@ -120,11 +121,14 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,
// --- cluster.Cluster

// SomeControllers returns the controllers named, missing out any that don't
// exist in the cluster. They do not necessarily have to be returned
// in the order requested.
// exist in the clusteror aren't in an allowed namespace.
// They do not necessarily have to be returned in the order requested.
func (c *Cluster) SomeControllers(ids []flux.ResourceID) (res []cluster.Controller, err error) {
var controllers []cluster.Controller
for _, id := range ids {
if !c.IsAllowedResource(id) {
continue
}
ns, kind, name := id.Components()

resourceKind, ok := resourceKinds[kind]
Expand All @@ -147,8 +151,8 @@ func (c *Cluster) SomeControllers(ids []flux.ResourceID) (res []cluster.Controll
return controllers, nil
}

// AllControllers returns all controllers matching the criteria; that is, in
// the namespace (or any namespace if that argument is empty)
// AllControllers returns all controllers in allowed namespaces matching the criteria;
// that is, in the namespace (or any namespace if that argument is empty)
func (c *Cluster) AllControllers(namespace string) (res []cluster.Controller, err error) {
namespaces, err := c.getAllowedNamespaces()
if err != nil {
Expand Down Expand Up @@ -282,7 +286,7 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
nsList = append(nsList, *ns)
case apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) || apierrors.IsNotFound(err):
if !c.loggedAllowedNS[name] {
c.logger.Log("warning", "cannot access namespace set as allowed",
c.logger.Log("warning", "cannot access allowed namespace",
"namespace", name, "err", err)
c.loggedAllowedNS[name] = true
}
Expand All @@ -300,6 +304,32 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
return namespaces.Items, nil
}

func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool {
if len(c.allowedNamespaces) == 0 {
// All resources are allowed when all namespaces are allowed
return true
}

namespace, kind, name := id.Components()
namespaceToCheck := namespace

if namespace == resource.ClusterScope {
// All cluster-scoped resources (not namespaced) are allowed ...
if kind != "namespace" {
return true
}
// ... except namespaces themselves, whose name needs to be explicitly allowed
namespaceToCheck = name
}

for _, allowedNS := range c.allowedNamespaces {
if namespaceToCheck == allowedNS {
return true
}
}
return false
}

// kind & apiVersion must be passed separately as the object's TypeMeta is not populated
func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error {
yamlBytes, err := k8syaml.Marshal(object)
Expand Down
50 changes: 38 additions & 12 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,19 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {

// NB we get all resources, since we care about leaving unsynced,
// _ignored_ resources alone.
clusterResources, err := c.getResourcesBySelector("")
clusterResources, err := c.getAllowedResourcesBySelector("")
if err != nil {
return errors.Wrap(err, "collating resources in cluster for sync")
}

cs := makeChangeSet()
var errs cluster.SyncError
for _, res := range spec.Resources {
id := res.ResourceID().String()
resID := res.ResourceID()
if !c.IsAllowedResource(resID) {
continue
}
id := resID.String()
// make a record of the checksum, whether we stage it to
// be applied or not, so that we don't delete it later.
csum := sha1.Sum(res.Bytes())
Expand Down Expand Up @@ -114,7 +118,7 @@ func (c *Cluster) collectGarbage(

orphanedResources := makeChangeSet()

clusterResources, err := c.getResourcesInSyncSet(spec.Name)
clusterResources, err := c.getAllowedResourcesInSyncSet(spec.Name)
if err != nil {
return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection")
}
Expand Down Expand Up @@ -178,7 +182,7 @@ func (r *kuberesource) GetChecksum() string {
return r.obj.GetAnnotations()[checksumAnnotation]
}

func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) {
func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*kuberesource, error) {
listOptions := meta_v1.ListOptions{}
if selector != "" {
listOptions.LabelSelector = selector
Expand Down Expand Up @@ -206,19 +210,17 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
if !contains(verbs, "list") {
continue
}

groupVersion, err := schema.ParseGroupVersion(resource.GroupVersion)
if err != nil {
return nil, err
}

resourceClient := c.client.dynamicClient.Resource(groupVersion.WithResource(apiResource.Name))
data, err := resourceClient.List(listOptions)
gvr := groupVersion.WithResource(apiResource.Name)
list, err := c.listAllowedResources(apiResource.Namespaced, gvr, listOptions)
if err != nil {
return nil, err
}

for i, item := range data.Items {
for i, item := range list {
apiVersion := item.GetAPIVersion()
kind := item.GetKind()

Expand All @@ -229,7 +231,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
}
// TODO(michael) also exclude anything that has an ownerReference (that isn't "standard"?)

res := &kuberesource{obj: &data.Items[i], namespaced: apiResource.Namespaced}
res := &kuberesource{obj: &list[i], namespaced: apiResource.Namespaced}
result[res.ResourceID().String()] = res
}
}
Expand All @@ -238,10 +240,34 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
return result, nil
}

func (c *Cluster) listAllowedResources(
namespaced bool, gvr schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) {
if !namespaced || len(c.allowedNamespaces) == 0 {
// The resource is not namespaced or all the namespaces are allowed
resourceClient := c.client.dynamicClient.Resource(gvr)
data, err := resourceClient.List(options)
if err != nil {
return nil, err
}
return data.Items, nil
}

// List resources only from the allowed namespaces
var result []unstructured.Unstructured
for _, ns := range c.allowedNamespaces {
data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns).List(options)
if err != nil {
return result, err
}
result = append(result, data.Items...)
}
return result, nil
}

// exportResourcesInStack collates all the resources that belong to a
// stack, i.e., were applied by flux.
func (c *Cluster) getResourcesInSyncSet(name string) (map[string]*kuberesource, error) {
return c.getResourcesBySelector(fmt.Sprintf("%s=%s", syncSetLabel, name)) // means "has label <<stackLabel>>"
func (c *Cluster) getAllowedResourcesInSyncSet(name string) (map[string]*kuberesource, error) {
return c.getAllowedResourcesBySelector(fmt.Sprintf("%s=%s", syncSetLabel, name)) // means "has label <<stackLabel>>"
}

func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error) {
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ metadata:
}

// Now check that the resources were created
actual, err := kube.getResourcesInSyncSet("testset")
actual, err := kube.getAllowedResourcesInSyncSet("testset")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -506,7 +506,7 @@ spec:
assert.NoError(t, err)

// Check that our resource-getting also sees the pre-existing resource
resources, err := kube.getResourcesBySelector("")
resources, err := kube.getAllowedResourcesBySelector("")
assert.NoError(t, err)
assert.Contains(t, resources, "foobar:deployment/dep1")

Expand Down
23 changes: 14 additions & 9 deletions cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (

// Doubles as a cluster.Cluster and cluster.Manifests implementation
type Mock struct {
AllServicesFunc func(maybeNamespace string) ([]Controller, error)
SomeServicesFunc func([]flux.ResourceID) ([]Controller, error)
PingFunc func() error
ExportFunc func() ([]byte, error)
SyncFunc func(SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error)
UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
AllServicesFunc func(maybeNamespace string) ([]Controller, error)
SomeServicesFunc func([]flux.ResourceID) ([]Controller, error)
IsAllowedResourceFunc func(flux.ResourceID) bool
PingFunc func() error
ExportFunc func() ([]byte, error)
SyncFunc func(SyncSet) error
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
UpdateImageFunc func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(base string, paths []string) (map[string]resource.Resource, error)
UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
}

func (m *Mock) AllControllers(maybeNamespace string) ([]Controller, error) {
Expand All @@ -29,6 +30,10 @@ func (m *Mock) SomeControllers(s []flux.ResourceID) ([]Controller, error) {
return m.SomeServicesFunc(s)
}

func (m *Mock) IsAllowedResource(id flux.ResourceID) bool {
return m.IsAllowedResourceFunc(id)
}

func (m *Mock) Ping() error {
return m.PingFunc()
}
Expand Down
5 changes: 5 additions & 0 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFu
var anythingAutomated bool

for serviceID, u := range updates {
if d.Cluster.IsAllowedResource(serviceID) {
result.Result[serviceID] = update.ControllerResult{
Status: update.ReleaseStatusSkipped,
}
}
if policy.Set(u.Add).Has(policy.Automated) {
anythingAutomated = true
}
Expand Down
1 change: 1 addition & 0 deletions daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven
}
return []cluster.Controller{}, nil
}
k8s.IsAllowedResourceFunc = func(flux.ResourceID) bool { return true }
k8s.ExportFunc = func() ([]byte, error) { return testBytes, nil }
k8s.PingFunc = func() error { return nil }
k8s.SomeServicesFunc = func([]flux.ResourceID) ([]cluster.Controller, error) {
Expand Down
4 changes: 2 additions & 2 deletions release/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (rc *ReleaseContext) SelectServices(results update.Result, prefilters, post
for _, s := range allDefined {
res := s.Filter(prefilters...)
if res.Error == "" {
// Give these a default value, in case we don't find them
// Give these a default value, in case we cannot access them
// in the cluster.
results[s.ResourceID] = update.ControllerResult{
Status: update.ReleaseStatusSkipped,
Error: update.NotInCluster,
Error: update.NotAccessibleInCluster,
}
toAskClusterAbout = append(toAskClusterAbout, s.ResourceID)
} else {
Expand Down
4 changes: 2 additions & 2 deletions release/releaser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ var ignoredNotInRepo = update.ControllerResult{

var ignoredNotInCluster = update.ControllerResult{
Status: update.ReleaseStatusIgnored,
Error: update.NotInCluster,
Error: update.NotAccessibleInCluster,
}

var skippedLocked = update.ControllerResult{
Expand All @@ -190,7 +190,7 @@ var skippedLocked = update.ControllerResult{

var skippedNotInCluster = update.ControllerResult{
Status: update.ReleaseStatusSkipped,
Error: update.NotInCluster,
Error: update.NotAccessibleInCluster,
}

var skippedNotInRepo = update.ControllerResult{
Expand Down
26 changes: 13 additions & 13 deletions update/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (
)

const (
Locked = "locked"
Ignore = "ignore"
NotIncluded = "not included"
Excluded = "excluded"
DifferentImage = "a different image"
NotInCluster = "not running in cluster"
NotInRepo = "not found in repository"
ImageNotFound = "cannot find one or more images"
ImageUpToDate = "image(s) up to date"
DoesNotUseImage = "does not use image(s)"
ContainerNotFound = "container(s) not found: %s"
ContainerTagMismatch = "container(s) tag mismatch: %s"
Locked = "locked"
Ignore = "ignore"
NotIncluded = "not included"
Excluded = "excluded"
DifferentImage = "a different image"
NotAccessibleInCluster = "not accessible in cluster"
NotInRepo = "not found in repository"
ImageNotFound = "cannot find one or more images"
ImageUpToDate = "image(s) up to date"
DoesNotUseImage = "does not use image(s)"
ContainerNotFound = "container(s) not found: %s"
ContainerTagMismatch = "container(s) tag mismatch: %s"
)

type SpecificImageFilter struct {
Expand All @@ -30,7 +30,7 @@ func (f *SpecificImageFilter) Filter(u ControllerUpdate) ControllerResult {
if len(u.Controller.Containers.Containers) == 0 {
return ControllerResult{
Status: ReleaseStatusIgnored,
Error: NotInCluster,
Error: NotAccessibleInCluster,
}
}
// For each container in update
Expand Down

0 comments on commit cb4ad60

Please sign in to comment.