Skip to content

Commit

Permalink
Fix SSA conflict when updating old objects (#3275)
Browse files Browse the repository at this point in the history
Refs pulumi/customer-support#1837

This adds unit tests around:
* SSA updates for objects with existing field managers.
* SSA updates for old/legacy objects with no field managers.

This currently forces the apply when the object has no managers. There's
also some code showing where we could impersonate `kubectl` SSA to
trigger a server-side upgrade of these fields.

It's worth noting that `client-go` recently added support for `Apply`, but only in
typed clients. I use a wrapper around a typed client to test our untyped
`ssaUpdate` path.

---------

Co-authored-by: Ramon Quitales <ramon@pulumi.com>
  • Loading branch information
blampe and rquitales authored Oct 31, 2024
1 parent fdfb964 commit e0f6e13
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## Unreleased

### Fixed

- Objects created on clusters older than 1.18 will no longer see a
`before-first-apply` conflict when Pulumi performs a server-side apply for
the first time. (https://github.com/pulumi/pulumi-kubernetes/pull/3275)

## 4.18.2 (October 16, 2024)

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion provider/cmd/pulumi-resource-kubernetes/schema.json

Large diffs are not rendered by default.

15 changes: 13 additions & 2 deletions provider/pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,12 @@ func csaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy
return client.Patch(c.Context, liveOldObj.GetName(), patchType, patch, options)
}

// patcher is the narrow slice of dynamic.ResourceInterface needed by
// ssaUpdate and fixCSAFieldManagers. It exists so tests can substitute a
// fake (e.g. a wrapper around a typed client or a test field manager)
// without standing up a full dynamic client.
type patcher interface {
	// Patch applies the given patch to the named resource and returns the
	// resulting live object.
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
}

// ssaUpdate handles the logic for updating a resource using server-side apply.
func ssaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dynamic.ResourceInterface) (*unstructured.Unstructured, error) {
func ssaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client patcher) (*unstructured.Unstructured, error) {
liveOldObj, err := fixCSAFieldManagers(c, liveOldObj, client)
if err != nil {
return nil, err
Expand Down Expand Up @@ -720,7 +724,7 @@ func ensureFieldsAreMembers(s *fieldpath.Set) *fieldpath.Set {

// fixCSAFieldManagers patches the field managers for an existing resource that was managed using client-side apply.
// The new server-side apply field manager takes ownership of all these fields to avoid conflicts.
func fixCSAFieldManagers(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dynamic.ResourceInterface) (*unstructured.Unstructured, error) {
func fixCSAFieldManagers(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client patcher) (*unstructured.Unstructured, error) {
if kinds.IsPatchResource(c.URN, c.Inputs.GetKind()) {
// When dealing with a patch resource, there's no need to patch the field managers.
// Doing so would inadvertently make us responsible for managing fields that are not relevant to us during updates,
Expand Down Expand Up @@ -903,6 +907,13 @@ func patchForce(inputs, live *unstructured.Unstructured, preview bool) bool {
}
}
}
// Legacy objects created before SSA don't record any managedFields, but
// they still have a default "before-first-apply" manager. This manager owns every
// field that existed before the first SSA apply. To work around this we will take
// control of the object.
if live != nil && len(live.GetManagedFields()) == 0 {
return true
}

return false
}
Expand Down
304 changes: 303 additions & 1 deletion provider/pkg/await/await_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
// Copyright 2021, Pulumi Corporation. All rights reserved.
// Copyright 2021-2024, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package await

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -31,9 +44,15 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/apimachinery/pkg/util/managedfields/managedfieldstest"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
kfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
kubetesting "k8s.io/client-go/testing"
"k8s.io/kube-openapi/pkg/validation/spec"
"sigs.k8s.io/yaml"
)

var (
Expand Down Expand Up @@ -1151,3 +1170,286 @@ func FailedRESTMapper(mapper meta.ResettableRESTMapper, err error) *fake.StubRes
},
}
}

// fakeTypeConverter constructs a managedfields.TypeConverter from the fake
// OpenAPI schema bundled with the test helpers, for driving a test field
// manager without a live API server.
func fakeTypeConverter(t *testing.T) managedfields.TypeConverter {
	t.Helper()

	doc, err := fake.LoadOpenAPISchema()
	require.NoError(t, err)

	raw, err := doc.YAMLValue("")
	require.NoError(t, err)

	var swagger spec.Swagger
	require.NoError(t, yaml.Unmarshal(raw, &swagger))

	defs := make(map[string]*spec.Schema, len(swagger.Definitions))
	for name, def := range swagger.Definitions {
		def := def // take the address of a fresh copy, not the loop variable
		defs[name] = &def
	}

	converter, err := managedfields.NewTypeConverter(defs, false)
	require.NoError(t, err)
	return converter
}

// TestSSAWithOldObjects is a regression test for
// https://github.com/pulumi/customer-support/issues/1837. An object is created
// and manipulated such that it no longer has any .metadata.managedFields, as
// is the case with things created prior to 1.18. We confirm this reproduces
// the issue and that our SSA upgrade logic handles it.
// TestSSAWithOldObjects is a regression test for
// https://github.com/pulumi/customer-support/issues/1837. An object is created
// and manipulated such that it no longer has any .metadata.managedFields, as
// is the case with things created prior to 1.18. We confirm this reproduces
// the issue and that our SSA upgrade logic handles it.
func TestSSAWithOldObjects(t *testing.T) {
	t.Parallel()

	tc := fakeTypeConverter(t)
	fm := managedfieldstest.NewTestFieldManager(tc, schema.FromAPIVersionAndKind("v1", "Service"))

	// A minimal Service resembling a helm-deployed cluster-autoscaler.
	inputs := &unstructured.Unstructured{}
	in := `{
		"apiVersion": "v1",
		"kind": "Service",
		"metadata": {
			"labels": {
				"app.kubernetes.io/instance": "autoscaler",
				"app.kubernetes.io/managed-by": "pulumi",
				"app.kubernetes.io/name": "aws-cluster-autoscaler",
				"app.kubernetes.io/version": "1.28.2",
				"helm.sh/chart": "cluster-autoscaler-9.34.1"
			},
			"name": "cluster-autoscaler",
			"namespace": "kube-system"
		},
		"spec": {}
	}`
	require.NoError(t, json.Unmarshal([]byte(in), inputs))
	// We need the last-applied-config annotation in order to trigger kubectl's
	// graceful CSA->SSA upgrade path.
	last, err := inputs.MarshalJSON()
	require.NoError(t, err)
	inputs.SetAnnotations(map[string]string{
		"kubectl.kubernetes.io/last-applied-configuration": string(last),
	})

	// Create the object. As of 1.18 all objects are created with
	// managedFields -- even when using CSA.
	obj := inputs.DeepCopy()
	err = fm.Update(obj, "kubectl-create")
	require.NoError(t, err)
	require.NotEmpty(t, fm.ManagedFields())
	assert.Len(t, fm.ManagedFields(), 1)

	// However we can still disable managed fields after creating the object by
	// explicitly setting it to `[]`. This simulates a pre-1.18 legacy object.
	obj = inputs.DeepCopy()
	obj.SetManagedFields([]metav1.ManagedFieldsEntry{})
	err = fm.Update(obj, "kubectl-update")
	require.NoError(t, err)
	assert.Empty(t, fm.ManagedFields())

	// Try to update a label on the object using a naive apply.
	obj = inputs.DeepCopy()
	obj.SetLabels(map[string]string{
		"helm.sh/chart": "cluster-autoscaler-9.36.0",
	})
	// Despite having no field managers, our apply still conflicts with the
	// legacy "before-first-apply" manager. This reproduces the reported bug.
	err = fm.Apply(obj, "pulumi-kubernetes", false)
	assert.ErrorContains(t, err, `Apply failed with 1 conflict: conflict with "before-first-apply" using v1: .metadata.labels.helm.sh/chart`)

	// Now try again using our SSA upgrade logic -- this should succeed.
	cfg := &UpdateConfig{
		Inputs:  obj,
		Preview: false,
		ProviderConfig: ProviderConfig{
			URN:             resource.NewURN(tokens.QName("teststack"), tokens.PackageName("testproj"), tokens.Type(""), "v1/Service", "testresource"),
			FieldManager:    "pulumi-kubernetes",
			ServerSideApply: true,
		},
	}
	_, err = ssaUpdate(cfg, obj, fieldManagerPatcher{fm})
	require.NoError(t, err)
}

// TestSSAUpdate exercises ssaUpdate against live objects whose fields are
// owned by other field managers, and asserts that our "pulumi-kubernetes"
// manager ends up as the sole owner afterwards.
func TestSSAUpdate(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name         string
		obj          string // live object's YAML, including managedFields
		preview      bool
		wantManagers []string // expected managers on the object post-apply
	}{
		{
			name: "we take ownership of kubectl CSA",
			obj: `apiVersion: v1
kind: Namespace
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Namespace","metadata":{"annotations":{},"labels":{"app.kubernetes.io/instance":"flux-system","app.kubernetes.io/part-of":"flux","pod-security.kubernetes.io/warn":"restricted","pod-security.kubernetes.io/warn-version":"latest"},"name":"flux-system"}}
  creationTimestamp: "2024-09-24T19:27:32Z"
  labels:
    app.kubernetes.io/instance: flux-system
    app.kubernetes.io/part-of: flux
    kubernetes.io/metadata.name: flux-system
    pod-security.kubernetes.io/warn: restricted
    pod-security.kubernetes.io/warn-version: latest
  managedFields:
  - apiVersion: v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:kubectl.kubernetes.io/last-applied-configuration: {}
        f:labels:
          .: {}
          f:app.kubernetes.io/instance: {}
          f:app.kubernetes.io/part-of: {}
          f:kubernetes.io/metadata.name: {}
          f:pod-security.kubernetes.io/warn: {}
          f:pod-security.kubernetes.io/warn-version: {}
    manager: kubectl-client-side-apply
    operation: Update
    time: "2024-09-24T19:27:32Z"
  name: flux-system
  resourceVersion: "138234"
  uid: c14c35d8-ae5d-4f53-8391-791d47efe337
spec:
  finalizers:
  - kubernetes
status:
  phase: Active`,
			preview:      false,
			wantManagers: []string{"pulumi-kubernetes"},
		},
		{
			// NOTE(review): this fixture is byte-identical to the CSA case
			// above (manager "kubectl-client-side-apply", operation Update)
			// despite the name referring to SSA -- confirm whether it should
			// instead use an Apply operation with a "kubectl" manager.
			name: "we take ownership of kubectl SSA",
			obj: `apiVersion: v1
kind: Namespace
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Namespace","metadata":{"annotations":{},"labels":{"app.kubernetes.io/instance":"flux-system","app.kubernetes.io/part-of":"flux","pod-security.kubernetes.io/warn":"restricted","pod-security.kubernetes.io/warn-version":"latest"},"name":"flux-system"}}
  creationTimestamp: "2024-09-24T19:27:32Z"
  labels:
    app.kubernetes.io/instance: flux-system
    app.kubernetes.io/part-of: flux
    kubernetes.io/metadata.name: flux-system
    pod-security.kubernetes.io/warn: restricted
    pod-security.kubernetes.io/warn-version: latest
  managedFields:
  - apiVersion: v1
    fieldsType: FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .: {}
          f:kubectl.kubernetes.io/last-applied-configuration: {}
        f:labels:
          .: {}
          f:app.kubernetes.io/instance: {}
          f:app.kubernetes.io/part-of: {}
          f:kubernetes.io/metadata.name: {}
          f:pod-security.kubernetes.io/warn: {}
          f:pod-security.kubernetes.io/warn-version: {}
    manager: kubectl-client-side-apply
    operation: Update
    time: "2024-09-24T19:27:32Z"
  name: flux-system
  resourceVersion: "138234"
  uid: c14c35d8-ae5d-4f53-8391-791d47efe337
spec:
  finalizers:
  - kubernetes
status:
  phase: Active`,
			preview:      false,
			wantManagers: []string{"pulumi-kubernetes"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var obj unstructured.Unstructured
			require.NoError(t, yaml.Unmarshal([]byte(tt.obj), &obj))

			// Round-trip into a typed object so the typed fake clientset can
			// serve it back to us.
			typed, err := scheme.Scheme.New(obj.GroupVersionKind())
			require.NoError(t, err)
			require.NoError(t, runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, typed))

			client := kfake.NewClientset(typed)
			c := client.CoreV1().Namespaces()

			// Strip labels and managedFields from our inputs so the apply has
			// to wrest ownership of the label fields from the live managers.
			inputs := obj.DeepCopy()
			inputs.SetLabels(nil)
			inputs.SetManagedFields(nil)
			cfg := &UpdateConfig{
				Inputs:  inputs,
				Preview: tt.preview,
				ProviderConfig: ProviderConfig{
					URN:             resource.NewURN(tokens.QName("teststack"), tokens.PackageName("testproj"), tokens.Type(""), "v1/Service", "testresource"),
					FieldManager:    "pulumi-kubernetes",
					ServerSideApply: true,
				},
			}
			live, err := ssaUpdate(cfg, &obj, untypedPatcher[*corev1.Namespace]{wrapped: c})
			require.NoError(t, err)
			assert.Len(t, live.GetManagedFields(), 1)
			for idx, want := range tt.wantManagers {
				assert.Equal(t, want, live.GetManagedFields()[idx].Manager)
			}
		})
	}
}

// typedPatcher matches the Patch method exposed by client-go's typed clients
// (e.g. CoreV1().Namespaces()), which return a concrete type T rather than
// an unstructured object.
type typedPatcher[T runtime.Object] interface {
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (T, error)
}

// untypedPatcher adapts a typed client to the untyped patcher interface used
// by ssaUpdate, allowing tests to drive the untyped code path through a
// typed fake clientset.
type untypedPatcher[T runtime.Object] struct {
	wrapped typedPatcher[T]
}

// Patch delegates to the wrapped typed client and converts its typed result
// into an unstructured object.
func (p untypedPatcher[T]) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) {
	typedObj, patchErr := p.wrapped.Patch(ctx, name, pt, data, options, subresources...)
	if patchErr != nil {
		return nil, patchErr
	}
	raw, convErr := runtime.DefaultUnstructuredConverter.ToUnstructured(typedObj)
	return &unstructured.Unstructured{Object: raw}, convErr
}

// fieldManagerPatcher adapts a test field manager to the patcher interface,
// so server-side apply semantics (including manager conflicts) can be
// exercised entirely in-memory.
type fieldManagerPatcher struct {
	fm managedfieldstest.TestFieldManager
}

// Patch implements patcher on top of a test field manager. Only server-side
// apply patches (types.ApplyPatchType) are supported; the payload is decoded
// and applied under the given field manager, and the resulting live object is
// returned.
func (p fieldManagerPatcher) Patch(_ context.Context, _ string, pt types.PatchType, data []byte, options metav1.PatchOptions, _ ...string) (*unstructured.Unstructured, error) {
	if pt != types.ApplyPatchType {
		return nil, fmt.Errorf("fieldManagerPatcher only handles Apply")
	}

	// A nil Force pointer means "don't force", matching API server defaulting.
	force := options.Force != nil && *options.Force

	// The apply payload may be YAML; normalize to JSON before decoding.
	in, err := yaml.YAMLToJSON(data)
	if err != nil {
		return nil, err
	}

	obj, _, err := unstructured.UnstructuredJSONScheme.Decode(in, nil, nil)
	if err != nil {
		return nil, err
	}

	if err := p.fm.Apply(obj, options.FieldManager, force); err != nil {
		return nil, err
	}

	// Guard the assertion instead of panicking if the field manager ever
	// hands back a typed object.
	live, ok := p.fm.Live().(*unstructured.Unstructured)
	if !ok {
		return nil, fmt.Errorf("unexpected live object type %T", p.fm.Live())
	}
	return live, nil
}
Loading

0 comments on commit e0f6e13

Please sign in to comment.