Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Exclude resources if we cannot find their scope
Browse files Browse the repository at this point in the history
This is aiming at custom resources whose scope is unknown because
its CRD is included in a helm chart Flux won't inspect (since it's
the helm operator's responsibility to do it).

This allows Flux to move forward and create the `HelmRelease` supplying the
CRD. In a subsequent sync, once we know its scope, the custom resources will be
created.
  • Loading branch information
Alfonso Acosta committed Apr 16, 2019
1 parent 686acaa commit 17cee20
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cluster/kubernetes/doc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
Package kubernetes provides implementations of `Cluster` and
`Manifests` that interact with the Kubernetes API (using kubectl or
`manifests` that interact with the Kubernetes API (using kubectl or
the k8s API client).
*/
package kubernetes
55 changes: 43 additions & 12 deletions cluster/kubernetes/manifests.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package kubernetes

import (
"fmt"
"strings"

"github.com/go-kit/kit/log"
"gopkg.in/yaml.v2"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -24,13 +28,23 @@ type namespacer interface {
EffectiveNamespace(manifest kresource.KubeManifest, knownScopes ResourceScopes) (string, error)
}

// Manifests is an implementation of cluster.Manifests, particular to
// manifests is an implementation of cluster.Manifests, particular to
// Kubernetes. Aside from loading manifests from files, it does some
// "post-processsing" to make sure the view of the manifests is what
// would be applied; in particular, it fills in the namespace of
// manifests that would be given a default namespace when applied.
type Manifests struct {
Namespacer namespacer
type manifests struct {
namespacer namespacer
logger log.Logger
resourceWarnings map[string]struct{}
}

func NewManifests(ns namespacer, logger log.Logger) *manifests {
return &manifests{
namespacer: ns,
logger: logger,
resourceWarnings: map[string]struct{}{},
}
}

func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
Expand Down Expand Up @@ -60,31 +74,48 @@ func getCRDScopes(manifests map[string]kresource.KubeManifest) ResourceScopes {
return result
}

func setEffectiveNamespaces(manifests map[string]kresource.KubeManifest, nser namespacer) (map[string]resource.Resource, error) {
func (m *manifests) setEffectiveNamespaces(manifests map[string]kresource.KubeManifest) (map[string]resource.Resource, error) {
knownScopes := getCRDScopes(manifests)
result := map[string]resource.Resource{}
for _, km := range manifests {
if nser != nil {
ns, err := nser.EffectiveNamespace(km, knownScopes)
if err != nil {
return nil, err
resID := km.ResourceID()
resIDStr := resID.String()
ns, err := m.namespacer.EffectiveNamespace(km, knownScopes)
if err != nil {
if strings.Contains(err.Error(), "not found") {
// discard the resource and keep going after making sure we logged about it
if _, warningLogged := m.resourceWarnings[resIDStr]; !warningLogged {
_, kind, name := resID.Components()
partialResIDStr := kind + "/" + name
m.logger.Log(
"warn", fmt.Sprintf("cannot find scope of resource %s: %s", partialResIDStr, err),
"impact", fmt.Sprintf("resource %s will be excluded until its scope is available", partialResIDStr))
m.resourceWarnings[resIDStr] = struct{}{}
}
continue
}
km.SetNamespace(ns)
return nil, err
}
km.SetNamespace(ns)
if _, warningLogged := m.resourceWarnings[resIDStr]; warningLogged {
// indicate that we found the resource's scope and allow logging a warning again
m.logger.Log("info", fmt.Sprintf("found scope of resource %s, back in bussiness!", km.ResourceID().String()))
delete(m.resourceWarnings, resIDStr)
}
result[km.ResourceID().String()] = km
}
return result, nil
}

func (m *Manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
func (m *manifests) LoadManifests(base string, paths []string) (map[string]resource.Resource, error) {
manifests, err := kresource.Load(base, paths)
if err != nil {
return nil, err
}
return setEffectiveNamespaces(manifests, m.Namespacer)
return m.setEffectiveNamespaces(manifests)
}

func (m *Manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
func (m *manifests) UpdateImage(def []byte, id flux.ResourceID, container string, image image.Ref) ([]byte, error) {
return updateWorkload(def, id, container, image)
}

Expand Down
88 changes: 78 additions & 10 deletions cluster/kubernetes/manifests_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package kubernetes

import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/weaveworks/flux/cluster/kubernetes/testfiles"
)

func TestKnownCRDScope(t *testing.T) {
func TestLocalCRDScope(t *testing.T) {
coreClient := makeFakeClient()

nser, err := NewNamespacer(coreClient.Discovery())
if err != nil {
t.Fatal(err)
}
manifests := Manifests{nser}
assert.NoError(t, err)
manifests := NewManifests(nser, log.NewLogfmtLogger(os.Stdout))

dir, cleanup := testfiles.TempDir(t)
defer cleanup()
Expand Down Expand Up @@ -46,17 +50,81 @@ metadata:
namespace: bar
`

if err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600); err != nil {
t.Fatal(err)
}
err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600)
assert.NoError(t, err)

resources, err := manifests.LoadManifests(dir, []string{dir})
if err != nil {
t.Fatal(err)
}

if _, ok := resources["bar:foo/fooinstance"]; !ok {
t.Fatal("couldn't find crd instance")
assert.Contains(t, resources, "bar:foo/fooinstance")
}

func TestUnKnownCRDScope(t *testing.T) {
coreClient := makeFakeClient()

nser, err := NewNamespacer(coreClient.Discovery())
assert.NoError(t, err)
logBuffer := bytes.NewBuffer(nil)
manifests := NewManifests(nser, log.NewLogfmtLogger(logBuffer))

dir, cleanup := testfiles.TempDir(t)
defer cleanup()
const defs = `---
apiVersion: v1
kind: Namespace
metadata:
name: mynamespace
---
apiVersion: foo.example.com/v1beta1
kind: Foo
metadata:
name: fooinstance
namespace: bar
`

err = ioutil.WriteFile(filepath.Join(dir, "test.yaml"), []byte(defs), 0600)
assert.NoError(t, err)

resources, err := manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)

// can't contain the CRD since we cannot figure out its scope
assert.NotContains(t, resources, "bar:foo/fooinstance")

// however, it should contain the namespace
assert.Contains(t, resources, "<cluster>:namespace/mynamespace")

savedLog := logBuffer.String()
// and we should had logged a warning about it
assert.Contains(t, savedLog, "cannot find scope of resource foo/fooinstance")

// loading again shouldn't result in more warnings
resources, err = manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)
assert.Equal(t, logBuffer.String(), savedLog)

// But getting the scope of the CRD should result in a log saying we found the scope
savedAPIResources := coreClient.Resources
apiResource := &metav1.APIResourceList{
GroupVersion: "foo.example.com/v1beta1",
APIResources: []metav1.APIResource{
{Name: "foos", SingularName: "foo", Namespaced: true, Kind: "Foo"},
},
}
coreClient.Resources = append(coreClient.Resources, apiResource)

logBuffer.Reset()
resources, err = manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)
assert.Len(t, resources, 2)
assert.Contains(t, logBuffer.String(), "found scope of resource bar:foo/fooinstance")
coreClient.Resources = savedAPIResources

// and missing the scope information again should result in another warning
logBuffer.Reset()
resources, err = manifests.LoadManifests(dir, []string{dir})
assert.NoError(t, err)
assert.Contains(t, savedLog, "cannot find scope of resource foo/fooinstance")
}
9 changes: 9 additions & 0 deletions cluster/kubernetes/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package kubernetes

import kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"

type ConstNamespacer string

func (ns ConstNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) {
return string(ns), nil
}
1 change: 1 addition & 0 deletions cluster/kubernetes/namespacer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,6 @@ func (n *namespaceViaDiscovery) lookupNamespacedInCluster(groupVersion, kind str
return resource.Namespaced, nil
}
}

return false, fmt.Errorf("resource not found for API %s, kind %s", groupVersion, kind)
}
6 changes: 3 additions & 3 deletions cluster/kubernetes/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/weaveworks/flux/resource"
)

func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
func (m *manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy.Update) ([]byte, error) {
ns, kind, name := id.Components()
add, del := update.Add, update.Remove

Expand Down Expand Up @@ -48,7 +48,7 @@ func (m *Manifests) UpdatePolicies(def []byte, id flux.ResourceID, update policy
return (KubeYAML{}).Annotate(def, ns, kind, name, args...)
}

func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
func (m *manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([]resource.Container, error) {
kresources, err := kresource.ParseMultidoc(def, "stdin")
if err != nil {
return nil, err
Expand All @@ -58,7 +58,7 @@ func (m *Manifests) extractWorkloadContainers(def []byte, id flux.ResourceID) ([
// We could get out of our way to fix this (or give a better error) but:
// 1. With the exception of HelmReleases CRD instances are not workloads anyways.
// 2. The problem is eventually fixed by the first successful sync.
resources, err := setEffectiveNamespaces(kresources, m.Namespacer)
resources, err := m.setEffectiveNamespaces(kresources)
if err != nil {
return nil, err
}
Expand Down
14 changes: 5 additions & 9 deletions cluster/kubernetes/policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,17 @@ package kubernetes

import (
"bytes"
"os"
"testing"
"text/template"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"

"github.com/weaveworks/flux"
kresource "github.com/weaveworks/flux/cluster/kubernetes/resource"
"github.com/weaveworks/flux/policy"
)

type constNamespacer string

func (ns constNamespacer) EffectiveNamespace(manifest kresource.KubeManifest, _ ResourceScopes) (string, error) {
return string(ns), nil
}

func TestUpdatePolicies(t *testing.T) {
for _, c := range []struct {
name string
Expand Down Expand Up @@ -186,7 +181,8 @@ func TestUpdatePolicies(t *testing.T) {
caseIn := templToString(t, annotationsTemplate, c.in)
caseOut := templToString(t, annotationsTemplate, c.out)
resourceID := flux.MustParseResourceID("default:deployment/nginx")
out, err := (&Manifests{constNamespacer("default")}).UpdatePolicies([]byte(caseIn), resourceID, c.update)
manifests := NewManifests(ConstNamespacer("default"), log.NewLogfmtLogger(os.Stdout))
out, err := manifests.UpdatePolicies([]byte(caseIn), resourceID, c.update)
assert.Equal(t, c.wantErr, err != nil, "unexpected error value: %s", err)
if !c.wantErr {
assert.Equal(t, string(out), caseOut)
Expand All @@ -200,7 +196,7 @@ func TestUpdatePolicies_invalidTagPattern(t *testing.T) {
update := policy.Update{
Add: policy.Set{policy.TagPrefix("nginx"): "semver:invalid"},
}
_, err := (&Manifests{}).UpdatePolicies(nil, resourceID, update)
_, err := (&manifests{}).UpdatePolicies(nil, resourceID, update)
assert.Error(t, err)
}

Expand Down
4 changes: 3 additions & 1 deletion cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"os"
"sort"
"strings"
"testing"
Expand Down Expand Up @@ -313,14 +314,15 @@ metadata:
if err != nil {
t.Fatal(err)
}
manifests := NewManifests(namespacer, log.NewLogfmtLogger(os.Stdout))

resources0, err := kresource.ParseMultidoc([]byte(defs), "before")
if err != nil {
t.Fatal(err)
}

// Needed to get from KubeManifest to resource.Resource
resources, err := setEffectiveNamespaces(resources0, namespacer)
resources, err := manifests.setEffectiveNamespaces(resources0)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func main() {
var clusterVersion string
var sshKeyRing ssh.KeyRing
var k8s cluster.Cluster
var k8sManifests *kubernetes.Manifests
var k8sManifests cluster.Manifests
var imageCreds func() registry.ImageCreds
{
restClientConfig, err := rest.InClusterConfig()
Expand Down Expand Up @@ -369,13 +369,12 @@ func main() {
imageCreds = k8sInst.ImagesToFetch
// There is only one way we currently interpret a repo of
// files as manifests, and that's as Kubernetes yamels.
k8sManifests = &kubernetes.Manifests{}
k8sManifests.Namespacer, err = kubernetes.NewNamespacer(discoClientset)

namespacer, err := kubernetes.NewNamespacer(discoClientset)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
k8sManifests = kubernetes.NewManifests(namespacer, logger)
}

// Wrap the procedure for collecting images to scan
Expand Down
4 changes: 3 additions & 1 deletion daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,14 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven
// Jobs queue (starts itself)
jobs := job.NewQueue(jshutdown, jwg)

manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout))

// Finally, the daemon
d := &Daemon{
Repo: repo,
GitConfig: params,
Cluster: k8s,
Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault},
Manifests: manifests,
Registry: imageRegistry,
V: testVersion,
Jobs: jobs,
Expand Down
4 changes: 3 additions & 1 deletion daemon/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ func daemon(t *testing.T) (*Daemon, func()) {
UserEmail: gitEmail,
}

manifests := kubernetes.NewManifests(alwaysDefault, log.NewLogfmtLogger(os.Stdout))

jobs := job.NewQueue(shutdown, wg)
d := &Daemon{
Cluster: k8s,
Manifests: &kubernetes.Manifests{Namespacer: alwaysDefault},
Manifests: manifests,
Registry: &registryMock.Registry{},
Repo: repo,
GitConfig: gitConfig,
Expand Down
Loading

0 comments on commit 17cee20

Please sign in to comment.