diff --git a/CHANGELOG.md b/CHANGELOG.md index 58f3b02bf1..952a233126 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ # Unreleased +- [#1107](https://github.com/kubernetes-sigs/kubefed/pull/1107) + `status.observedGeneration` is now recorded by the sync controller + for federated resources to provide an indication of whether the + current state of a resource has been processed. - [#1121](https://github.com/kubernetes-sigs/kubefed/pull/1121) Update `kubefedctl federate` shorthand option for `--enable-type` to `-t` instead of `-e` to avoid confusing error message when only one dash is accidentally diff --git a/charts/kubefed/templates/crds.yaml b/charts/kubefed/templates/crds.yaml index cdd4ae6527..10f71e9c3a 100644 --- a/charts/kubefed/templates/crds.yaml +++ b/charts/kubefed/templates/crds.yaml @@ -122,6 +122,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -251,6 +254,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -382,6 +388,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -511,6 +520,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -638,6 +650,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -767,6 +782,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -898,6 +916,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -1025,6 +1046,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -1154,6 +1178,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec @@ -1283,6 +1310,9 @@ spec: - status type: object type: array + observedGeneration: + format: int64 + type: integer type: object required: - spec diff --git a/pkg/controller/sync/controller.go b/pkg/controller/sync/controller.go index c778b77169..1f4bd1bd61 100644 --- a/pkg/controller/sync/controller.go +++ b/pkg/controller/sync/controller.go @@ -289,13 +289,13 @@ func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) ut clusters, err := s.informer.GetClusters() if err != nil { fedResource.RecordError(string(status.ClusterRetrievalFailed), errors.Wrap(err, "Failed to retrieve list of clusters")) - return s.setPropagationStatus(fedResource, status.ClusterRetrievalFailed, nil) + return s.setFederatedStatus(fedResource, status.ClusterRetrievalFailed, nil) } selectedClusterNames, err := fedResource.ComputePlacement(clusters) if err != nil { fedResource.RecordError(string(status.ComputePlacementFailed), errors.Wrap(err, "Failed to compute placement")) - return s.setPropagationStatus(fedResource, status.ComputePlacementFailed, nil) + return s.setFederatedStatus(fedResource, status.ComputePlacementFailed, nil) } kind := fedResource.TargetKind() @@ -379,10 +379,10 @@ func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) ut } collectedStatus := dispatcher.CollectedStatus() - return s.setPropagationStatus(fedResource, status.AggregateSuccess, &collectedStatus) + return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus) } -func (s *KubeFedSyncController) setPropagationStatus(fedResource FederatedResource, +func (s *KubeFedSyncController) setFederatedStatus(fedResource FederatedResource, reason status.AggregateReason, collectedStatus *status.CollectedPropagationStatus) util.ReconciliationStatus { if collectedStatus == nil { @@ -407,10 +407,10 @@ func (s *KubeFedSyncController) setPropagationStatus(fedResource FederatedResour // If the underlying resource has changed, attempt to retrieve and // update it repeatedly. err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { - if updateRequired, err := status.SetPropagationStatus(obj, reason, *collectedStatus); err != nil { + if updateRequired, err := status.SetFederatedStatus(obj, reason, *collectedStatus); err != nil { return false, errors.Wrapf(err, "failed to set the status") } else if !updateRequired { - klog.V(4).Infof("No update necessary for %s %q propagation status", kind, name) + klog.V(4).Infof("No status update necessary for %s %q", kind, name) return true, nil } diff --git a/pkg/controller/sync/status/status.go b/pkg/controller/sync/status/status.go index 9e456ee0da..00c2c97323 100644 --- a/pkg/controller/sync/status/status.go +++ b/pkg/controller/sync/status/status.go @@ -91,16 +91,17 @@ type GenericCondition struct { Reason AggregateReason `json:"reason,omitempty"` } -type GenericPropagationStatus struct { - Conditions []*GenericCondition `json:"conditions,omitempty"` - Clusters []GenericClusterStatus `json:"clusters,omitempty"` +type GenericFederatedStatus struct { + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + Conditions []*GenericCondition `json:"conditions,omitempty"` + Clusters []GenericClusterStatus `json:"clusters,omitempty"` } -type GenericFederatedStatus struct { +type GenericFederatedResource struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Status *GenericPropagationStatus `json:"status,omitempty"` + Status *GenericFederatedStatus `json:"status,omitempty"` } type PropagationStatusMap map[string]PropagationStatus @@ -110,43 +111,49 @@ type CollectedPropagationStatus struct { ResourcesUpdated bool } -// SetPropagationStatus sets the conditions and clusters fields of the -// federated resource's object map from the provided reason and collected -// propagation status. Returns a boolean indication of whether status -// should be written to the API. -func SetPropagationStatus(fedObject *unstructured.Unstructured, reason AggregateReason, collectedStatus CollectedPropagationStatus) (bool, error) { - status := &GenericFederatedStatus{} - err := util.UnstructuredToInterface(fedObject, status) +// SetFederatedStatus sets the conditions and clusters fields of the +// federated resource's object map. Returns a boolean indication of +// whether status should be written to the API. +func SetFederatedStatus(fedObject *unstructured.Unstructured, reason AggregateReason, collectedStatus CollectedPropagationStatus) (bool, error) { + resource := &GenericFederatedResource{} + err := util.UnstructuredToInterface(fedObject, resource) if err != nil { - return false, errors.Wrapf(err, "Failed to unmarshall to generic status") + return false, errors.Wrapf(err, "Failed to unmarshall to generic resource") } - if status.Status == nil { - status.Status = &GenericPropagationStatus{} + if resource.Status == nil { + resource.Status = &GenericFederatedStatus{} } - changed := status.Status.update(reason, collectedStatus) + changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus) if !changed { return false, nil } - statusJSON, err := json.Marshal(status) + resourceJSON, err := json.Marshal(resource) if err != nil { return false, errors.Wrapf(err, "Failed to marshall generic status to json") } - statusObj := &unstructured.Unstructured{} - err = statusObj.UnmarshalJSON(statusJSON) + resourceObj := &unstructured.Unstructured{} + err = resourceObj.UnmarshalJSON(resourceJSON) if err != nil { - return false, errors.Wrapf(err, "Failed to marshall generic status json to unstructured") + return false, errors.Wrapf(err, "Failed to marshall generic resource json to unstructured") } - fedObject.Object[util.StatusField] = statusObj.Object[util.StatusField] + fedObject.Object[util.StatusField] = resourceObj.Object[util.StatusField] return true, nil } -// update ensures that the status reflects the given reason and collected -// status. Returns a boolean indication of whether the status has been -// changed. -func (s *GenericPropagationStatus) update(reason AggregateReason, collectedStatus CollectedPropagationStatus) bool { +// update ensures that the status reflects the given generation, reason +// and collected status. Returns a boolean indication of whether the +// status has been changed. +func (s *GenericFederatedStatus) update(generation int64, reason AggregateReason, + collectedStatus CollectedPropagationStatus) bool { + + generationUpdated := s.ObservedGeneration != generation + if generationUpdated { + s.ObservedGeneration = generation + } + // Identify whether one or more clusters could not be reconciled // successfully. if reason == AggregateSuccess { @@ -165,13 +172,16 @@ func (s *GenericPropagationStatus) update(reason AggregateReason, collectedStatu // occur even if status.clusters was unchanged). changesPropagated := clustersChanged || len(collectedStatus.StatusMap) > 0 && collectedStatus.ResourcesUpdated - return s.setPropagationCondition(reason, changesPropagated) + propStatusUpdated := s.setPropagationCondition(reason, changesPropagated) + + statusUpdated := generationUpdated || propStatusUpdated + return statusUpdated } // setClusters sets the status.clusters slice from a propagation status // map. Returns a boolean indication of whether the status.clusters was // modified. -func (s *GenericPropagationStatus) setClusters(statusMap PropagationStatusMap) bool { +func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap) bool { if !s.clustersDiffers(statusMap) { return false } @@ -187,7 +197,7 @@ func (s *GenericPropagationStatus) setClusters(statusMap PropagationStatusMap) b // clustersDiffers checks whether `status.clusters` differs from the // given status map. -func (s *GenericPropagationStatus) clustersDiffers(statusMap PropagationStatusMap) bool { +func (s *GenericFederatedStatus) clustersDiffers(statusMap PropagationStatusMap) bool { if len(s.Clusters) != len(statusMap) { return true } @@ -202,7 +212,7 @@ func (s *GenericPropagationStatus) clustersDiffers(statusMap PropagationStatusMa // setPropagationCondition ensures that the Propagation condition is // updated to reflect the given reason. The type of the condition is // derived from the reason (empty -> True, not empty -> False). -func (s *GenericPropagationStatus) setPropagationCondition(reason AggregateReason, changesPropagated bool) bool { +func (s *GenericFederatedStatus) setPropagationCondition(reason AggregateReason, changesPropagated bool) bool { // Determine the appropriate status from the reason. var newStatus apiv1.ConditionStatus if reason == AggregateSuccess { diff --git a/pkg/controller/sync/status/status_test.go b/pkg/controller/sync/status/status_test.go index 06d3384982..8f8b313655 100644 --- a/pkg/controller/sync/status/status_test.go +++ b/pkg/controller/sync/status/status_test.go @@ -24,6 +24,7 @@ import ( func TestGenericPropagationStatusUpdateChanged(t *testing.T) { testCases := map[string]struct { + generation int64 reason AggregateReason statusMap PropagationStatusMap resourcesUpdated bool @@ -48,10 +49,14 @@ func TestGenericPropagationStatusUpdateChanged(t *testing.T) { reason: NamespaceNotFederated, expectedChanged: true, }, + "Changed generation indicates changed": { + generation: 1, + expectedChanged: true, + }, } for testName, tc := range testCases { t.Run(testName, func(t *testing.T) { - propStatus := &GenericPropagationStatus{ + propStatus := &GenericFederatedStatus{ Clusters: []GenericClusterStatus{ { Name: "cluster1", @@ -68,7 +73,7 @@ func TestGenericPropagationStatusUpdateChanged(t *testing.T) { StatusMap: tc.statusMap, ResourcesUpdated: tc.resourcesUpdated, } - changed := propStatus.update(tc.reason, collectedStatus) + changed := propStatus.update(tc.generation, tc.reason, collectedStatus) if tc.expectedChanged != changed { t.Fatalf("Expected changed to be %v, got %v", tc.expectedChanged, changed) } diff --git a/pkg/kubefedctl/enable/validation.go b/pkg/kubefedctl/enable/validation.go index 0450a81eb0..7314904cba 100644 --- a/pkg/kubefedctl/enable/validation.go +++ b/pkg/kubefedctl/enable/validation.go @@ -240,6 +240,10 @@ func ValidationSchema(specProps v1beta1.JSONSchemaProps) *v1beta1.CustomResource }, }, }, + "observedGeneration": { + Format: "int64", + Type: "integer", + }, }, }, }, diff --git a/test/common/crudtester.go b/test/common/crudtester.go index 1ace8e533e..02c9548cf2 100644 --- a/test/common/crudtester.go +++ b/test/common/crudtester.go @@ -433,7 +433,7 @@ func (c *FederatedTypeCrudTester) CheckPropagation(fedObject *unstructured.Unstr waitInterval := 1 * time.Second var waitingForError error err := wait.PollImmediate(waitInterval, c.clusterWaitTimeout, func() (bool, error) { - ok, err := c.checkPropagationStatus(fedObject, clusterName, objExpected) + ok, err := c.checkFederatedStatus(fedObject, clusterName, objExpected) if err != nil { // Logging lots of waiting messages would clutter the // logs. Instead, track the most recent message @@ -448,34 +448,38 @@ func (c *FederatedTypeCrudTester) CheckPropagation(fedObject *unstructured.Unstr }) if err != nil { if waitingForError != nil { - c.tl.Fatalf("Failed to check propagation status for %s %q: %v", federatedKind, qualifiedName, waitingForError) + c.tl.Fatalf("Failed to check status for %s %q: %v", federatedKind, qualifiedName, waitingForError) } - c.tl.Fatalf("Failed to check propagation status for %s %q: %v", federatedKind, qualifiedName, err) + c.tl.Fatalf("Failed to check status for %s %q: %v", federatedKind, qualifiedName, err) } } } -// checkPropagationStatus ensures that the federated resource status +// checkFederatedStatus ensures that the federated resource status // reflects the expected propagation state. -func (c *FederatedTypeCrudTester) checkPropagationStatus(fedObject *unstructured.Unstructured, clusterName string, objExpected bool) (bool, error) { +func (c *FederatedTypeCrudTester) checkFederatedStatus(fedObject *unstructured.Unstructured, clusterName string, objExpected bool) (bool, error) { federatedKind := fedObject.GetKind() qualifiedName := util.NewQualifiedName(fedObject) // Retrieve the resource from the API to ensure the latest status // is considered. - genericStatus, err := GetGenericStatus(c.client, fedObject.GroupVersionKind(), qualifiedName) + resource, err := GetGenericResource(c.client, fedObject.GroupVersionKind(), qualifiedName) if err != nil { return false, err } - if genericStatus.Status == nil { - c.tl.Logf("Propagation status is not yet available for %s %q", federatedKind, qualifiedName) + if resource.Status == nil { + c.tl.Logf("Status is not yet available for %s %q", federatedKind, qualifiedName) return false, nil } - propStatus := genericStatus.Status + fedStatus := resource.Status + + if fedStatus.ObservedGeneration != fedObject.GetGeneration() { + return false, errors.Errorf("Waiting for status.observedGeneration to match metadata.generation for %s %q", federatedKind, qualifiedName) + } // Check that aggregate status is ok conditionTrue := false - for _, condition := range propStatus.Conditions { + for _, condition := range fedStatus.Conditions { if condition.Type == status.PropagationConditionType { if condition.Status == apiv1.ConditionTrue { conditionTrue = true @@ -490,7 +494,7 @@ func (c *FederatedTypeCrudTester) checkPropagationStatus(fedObject *unstructured // Check that the cluster status is correct if objExpected { clusterStatusOK := false - for _, cluster := range propStatus.Clusters { + for _, cluster := range fedStatus.Clusters { if cluster.Name == clusterName && cluster.Status == status.ClusterPropagationOK { clusterStatusOK = true break @@ -501,7 +505,7 @@ func (c *FederatedTypeCrudTester) checkPropagationStatus(fedObject *unstructured } } else { clusterRemoved := true - for _, cluster := range propStatus.Clusters { + for _, cluster := range fedStatus.Clusters { if cluster.Name == clusterName && cluster.Status != status.WaitingForRemoval { clusterRemoved = false break @@ -608,10 +612,12 @@ func (c *FederatedTypeCrudTester) waitForResourceDeletion(client util.ResourceCl func (c *FederatedTypeCrudTester) updateObject(apiResource metav1.APIResource, obj *unstructured.Unstructured, mutateResourceFunc func(*unstructured.Unstructured)) (*unstructured.Unstructured, error) { client := c.resourceClient(apiResource) + var updatedObj *unstructured.Unstructured err := wait.PollImmediate(c.waitInterval, wait.ForeverTestTimeout, func() (bool, error) { mutateResourceFunc(obj) - _, err := client.Resources(obj.GetNamespace()).Update(obj, metav1.UpdateOptions{}) + var err error + updatedObj, err = client.Resources(obj.GetNamespace()).Update(obj, metav1.UpdateOptions{}) if apierrors.IsConflict(err) { // The resource was updated by the KubeFed controller. // Get the latest version and retry. @@ -624,7 +630,7 @@ func (c *FederatedTypeCrudTester) updateObject(apiResource metav1.APIResource, o } return (err == nil), err }) - return obj, err + return updatedObj, err } // expectedVersion retrieves the version of the resource expected in the named cluster @@ -728,10 +734,10 @@ func (c *FederatedTypeCrudTester) CheckStatusCreated(qualifiedName util.Qualifie } } -// GetGenericStatus retrieves a federated resource and converts it to -// the generic status interface. -func GetGenericStatus(client genericclient.Client, gvk schema.GroupVersionKind, - qualifiedName util.QualifiedName) (*status.GenericFederatedStatus, error) { +// GetGenericResource retrieves a federated resource and converts it to +// the generic resource struct. +func GetGenericResource(client genericclient.Client, gvk schema.GroupVersionKind, + qualifiedName util.QualifiedName) (*status.GenericFederatedResource, error) { fedObject := &unstructured.Unstructured{} fedObject.SetGroupVersionKind(gvk) @@ -740,12 +746,11 @@ func GetGenericStatus(client genericclient.Client, gvk schema.GroupVersionKind, return nil, errors.Wrapf(err, "Failed to retrieve federated resource from the API") } - // Convert the resource to the status struct - genericStatus := &status.GenericFederatedStatus{} - err = util.UnstructuredToInterface(fedObject, genericStatus) + resource := &status.GenericFederatedResource{} + err = util.UnstructuredToInterface(fedObject, resource) if err != nil { - return nil, errors.Wrapf(err, "Failed to unmarshall federated resource to generic status") + return nil, errors.Wrapf(err, "Failed to unmarshall federated resource to generic resource struct") } - return genericStatus, nil + return resource, nil } diff --git a/test/e2e/crud.go b/test/e2e/crud.go index 7157f26ad8..c093b1a944 100644 --- a/test/e2e/crud.go +++ b/test/e2e/crud.go @@ -87,15 +87,15 @@ var _ = Describe("Federated", func() { By(fmt.Sprintf("Waiting until the status of the %s %q indicates NamespaceNotFederated", kind, qualifiedName)) client := genericclient.NewForConfigOrDie(f.KubeConfig()) err := wait.PollImmediate(framework.PollInterval, wait.ForeverTestTimeout, func() (bool, error) { - genericStatus, err := common.GetGenericStatus(client, fedObject.GroupVersionKind(), qualifiedName) + genericResource, err := common.GetGenericResource(client, fedObject.GroupVersionKind(), qualifiedName) if err != nil { tl.Fatalf("An error occurred retrieving the status of the %s %q: %v", kind, qualifiedName, err) } - if genericStatus.Status == nil { + if genericResource.Status == nil { return false, nil } var propCondition *status.GenericCondition - for _, condition := range genericStatus.Status.Conditions { + for _, condition := range genericResource.Status.Conditions { if condition.Type == status.PropagationConditionType { propCondition = condition break