Skip to content

Commit

Permalink
Merge pull request #170 from fluxcd/ssa-wait
Browse files Browse the repository at this point in the history
ssa: Implement WaitForSet
  • Loading branch information
stefanprodan authored Oct 18, 2021
2 parents 776c8eb + 131659f commit ecd2e49
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 4 deletions.
18 changes: 18 additions & 0 deletions ssa/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package ssa
import (
"fmt"
"strings"

"sigs.k8s.io/cli-utils/pkg/object"
)

// Action represents the action type performed by the reconciliation process.
Expand Down Expand Up @@ -69,12 +71,28 @@ func (c *ChangeSet) ToMap() map[string]string {
return res
}

func (c *ChangeSet) ToObjMetadataSet() object.ObjMetadataSet {
var res []object.ObjMetadata
for _, entry := range c.Entries {
res = append(res, entry.ObjMetadata)
}
return res
}

// ChangeSetEntry defines the result of an action performed on an object.
type ChangeSetEntry struct {
// ObjMetadata holds the unique identifier of this entry.
ObjMetadata object.ObjMetadata

// GroupVersion holds the API group version of this entry.
GroupVersion string

// Subject represents the Object ID in the format 'kind/namespace/name'.
Subject string

// Action represents the action type taken by the reconciler for this object.
Action string

// Diff contains the YAML diff resulting from server-side apply dry-run.
Diff string
}
Expand Down
10 changes: 8 additions & 2 deletions ssa/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package ssa
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -70,6 +71,11 @@ func (m *ResourceManager) GetOwnerLabels(name, namespace string) map[string]stri
}
}

func (m *ResourceManager) changeSetEntry(object *unstructured.Unstructured, action Action) *ChangeSetEntry {
return &ChangeSetEntry{Subject: FmtUnstructured(object), Action: string(action)}
func (m *ResourceManager) changeSetEntry(o *unstructured.Unstructured, action Action) *ChangeSetEntry {
return &ChangeSetEntry{
ObjMetadata: object.UnstructuredToObjMetaOrDie(o),
GroupVersion: o.GroupVersionKind().Version,
Subject: FmtUnstructured(o),
Action: string(action),
}
}
14 changes: 12 additions & 2 deletions ssa/manager_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,16 @@ func (m *ResourceManager) Wait(objects []*unstructured.Unstructured, interval, t
return err
}

statusCollector := collector.NewResourceStatusCollector(objectsMeta)
if len(objectsMeta) == 0 {
return nil
}

return m.WaitForSet(objectsMeta, interval, timeout)
}

// WaitForSet checks if the given set of ObjMetadata has been fully reconciled.
func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, interval, timeout time.Duration) error {
statusCollector := collector.NewResourceStatusCollector(set)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand All @@ -51,7 +60,7 @@ func (m *ResourceManager) Wait(objects []*unstructured.Unstructured, interval, t
PollInterval: interval,
UseCache: true,
}
eventsChan := m.poller.Poll(ctx, objectsMeta, opts)
eventsChan := m.poller.Poll(ctx, set, opts)

lastStatus := make(map[object.ObjMetadata]*event.ResourceStatus)

Expand All @@ -70,6 +79,7 @@ func (m *ResourceManager) Wait(objects []*unstructured.Unstructured, interval, t
}
rss = append(rss, rs)
}

desired := status.CurrentStatus
aggStatus := aggregator.AggregateStatus(rss, desired)
if aggStatus == desired {
Expand Down
96 changes: 96 additions & 0 deletions ssa/manager_wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2021 Stefan Prodan
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ssa

import (
"context"
"testing"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestWaitForSet(t *testing.T) {
timeout := 10 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

id := generateName("wait")
objects, err := readManifest("testdata/test5.yaml", id)
if err != nil {
t.Fatal(err)
}

manager.SetOwnerLabels(objects, "infra", "default")

_, crd := getFirstObject(objects, "CustomResourceDefinition", "clustertests.testing.fluxcd.io")
_, cr := getFirstObject(objects, "ClusterTest", id)

t.Run("waits for CRD and CR", func(t *testing.T) {
cs, err := manager.Apply(ctx, crd, false)
if err != nil {
t.Fatal(err)
}

if err := manager.WaitForSet([]object.ObjMetadata{cs.ObjMetadata}, time.Second, 3*time.Second); err != nil {
t.Errorf("wait failed for CRD: %v", err)
}

changeSet, err := manager.ApplyAll(ctx, objects, false)
if err != nil {
t.Fatal(err)
}

if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), time.Second, 3*time.Second); err == nil {
t.Error("wanted wait error due to observedGeneration < generation")
}

clusterCR := &unstructured.Unstructured{}
clusterCR.SetGroupVersionKind(schema.GroupVersionKind{
Group: "testing.fluxcd.io",
Kind: "ClusterTest",
Version: "v1",
})
if err := manager.client.Get(ctx, client.ObjectKeyFromObject(cr), clusterCR); err != nil {
t.Fatal(err)
}

var observedGeneration int64
observedGeneration = 1
clusterCR.SetManagedFields(nil)
err = unstructured.SetNestedField(clusterCR.Object, observedGeneration, "status", "observedGeneration")
if err != nil {
t.Fatal(err)
}

opts := []client.PatchOption{
client.ForceOwnership,
client.FieldOwner(manager.owner.Field),
}
if err := manager.client.Status().Patch(ctx, clusterCR, client.Apply, opts...); err != nil {
t.Fatal(err)
}

if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), time.Second, 3*time.Second); err != nil {
t.Errorf("wait error: %v", err)
}
})
}
72 changes: 72 additions & 0 deletions ssa/testdata/test5.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: clustertests.testing.fluxcd.io
spec:
group: testing.fluxcd.io
names:
kind: ClusterTest
listKind: ClusterTestList
plural: clustertests
singular: clustertest
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .spec.type
name: TYPE
type: string
name: v1
schema:
openAPIV3Schema:
description: Test is the Schema for the testing API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
metadata:
type: object
spec:
description: TestSpec defines the desired state of a test run
properties:
type:
description: Type of test
type: string
enum:
- unit
- integration
valuesFrom:
description: config reference
type: string
type: object
status:
default:
observedGeneration: -1
properties:
observedGeneration:
description: ObservedGeneration is the last observed generation.
format: int64
type: integer
type: object
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
---
apiVersion: testing.fluxcd.io/v1
kind: ClusterTest
metadata:
name: "%[1]s"
namespace: test
spec:
type: integration
valuesFrom: test-config

0 comments on commit ecd2e49

Please sign in to comment.