Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: calculate SSA diffs with smd.merge.Updater #467

Merged
merged 8 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ jobs:
with:
version: v1.38.0
args: --timeout 5m
skip-go-installation: true
- uses: codecov/codecov-action@v3.1.0
with:
token: ${{ secrets.CODECOV_TOKEN }} #required
Expand Down
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
run:
skip-files:
- "pkg/diff/internal/fieldmanager/borrowed_.+\\.go$"
125 changes: 85 additions & 40 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import (

jsonpatch "github.com/evanphx/json-patch"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
"sigs.k8s.io/structured-merge-diff/v4/merge"
"sigs.k8s.io/structured-merge-diff/v4/typed"

"github.com/argoproj/gitops-engine/internal/kubernetes_vendor/pkg/api/v1/endpoints"
"github.com/argoproj/gitops-engine/pkg/diff/internal/fieldmanager"
"github.com/argoproj/gitops-engine/pkg/sync/resource"
jsonutil "github.com/argoproj/gitops-engine/pkg/utils/json"
gescheme "github.com/argoproj/gitops-engine/pkg/utils/kube/scheme"
Expand Down Expand Up @@ -121,67 +124,66 @@ func Diff(config, live *unstructured.Unstructured, opts ...Option) (*DiffResult,
// k8s library (https://github.com/kubernetes-sigs/structured-merge-diff).
func StructuredMergeDiff(config, live *unstructured.Unstructured, gvkParser *managedfields.GvkParser, manager string) (*DiffResult, error) {
if live != nil && config != nil {
gvk := config.GetObjectKind().GroupVersionKind()
pt := gescheme.ResolveParseableType(gvk, gvkParser)
return structuredMergeDiff(config, live, pt, manager)
params := &SMDParams{
config: config,
live: live,
gvkParser: gvkParser,
manager: manager,
}
return structuredMergeDiff(params)
}
return handleResourceCreateOrDeleteDiff(config, live)
}

func structuredMergeDiff(config, live *unstructured.Unstructured, pt *typed.ParseableType, manager string) (*DiffResult, error) {
// 1) Build typed value from live and config unstructures
tvLive, err := pt.FromUnstructured(live.Object)
// SMDParams defines the parameters required by the structuredMergeDiff
// function
type SMDParams struct {
config *unstructured.Unstructured
live *unstructured.Unstructured
gvkParser *managedfields.GvkParser
manager string
}

func structuredMergeDiff(p *SMDParams) (*DiffResult, error) {

gvk := p.config.GetObjectKind().GroupVersionKind()
pt := gescheme.ResolveParseableType(gvk, p.gvkParser)

// Build typed value from live and config unstructures
tvLive, err := pt.FromUnstructured(p.live.Object)
if err != nil {
return nil, fmt.Errorf("error building typed value from live resource: %w", err)
}
tvConfig, err := pt.FromUnstructured(config.Object)
tvConfig, err := pt.FromUnstructured(p.config.Object)
if err != nil {
return nil, fmt.Errorf("error building typed value from config resource: %w", err)
}

previousFieldSet := &fieldpath.Set{}
managerFound := false
// 2) Search for manager to find all fields managed by it
// so it can be removed from live state before merging desired
// state (config).
if manager != "" {
for _, m := range live.GetManagedFields() {
if m.Manager == manager {
err := previousFieldSet.FromJSON(bytes.NewReader(m.FieldsV1.Raw))
if err != nil {
return nil, fmt.Errorf("error parsing manager fields from JSON: %w", err)
}
managerFound = true
}
}
// Invoke the apply function to calculate the diff using
// the structured-merge-diff library
mergedLive, err := apply(tvConfig, tvLive, p)
if err != nil {
return nil, fmt.Errorf("error calculating diff: %w", err)
}

// 3) When manager is not found, it means that the resource
// wasn't being synced with the given manager up to this point.
// In this case config fields will be used to clean live state.
if !managerFound {
previousFieldSet, err = tvConfig.ToFieldSet()
// When mergedLive is nil it means that there is no change
if mergedLive == nil {
liveBytes, err := json.Marshal(p.live)
if err != nil {
return nil, fmt.Errorf("error converting config to fieldset: %w", err)
return nil, fmt.Errorf("error marshaling live resource: %w", err)
}
// In this case diff result will have live state for both,
// predicted and live.
return buildDiffResult(liveBytes, liveBytes), nil
}

// 4) Remove previous fields from live
cleanLive := tvLive.RemoveItems(previousFieldSet)

// 5) Merge desired state in clean live
mergedCleanLive, err := cleanLive.Merge(tvConfig)
if err != nil {
return nil, fmt.Errorf("error merging config into clean live: %w", err)
}

// 6) Apply default values in predicted live
predictedLive, err := normalizeTypedValue(mergedCleanLive)
// Normalize merged live
predictedLive, err := normalizeTypedValue(mergedLive)
if err != nil {
return nil, fmt.Errorf("error applying default values in predicted live: %w", err)
}

// 7) Apply default values in live
// Normalize live
taintedLive, err := normalizeTypedValue(tvLive)
if err != nil {
return nil, fmt.Errorf("error applying default values in live: %w", err)
Expand All @@ -190,6 +192,49 @@ func structuredMergeDiff(config, live *unstructured.Unstructured, pt *typed.Pars
return buildDiffResult(predictedLive, taintedLive), nil
}

// apply will build all the dependency required to invoke the smd.merge.updater.Apply
// to correctly calculate the diff with the same logic used in k8s with server-side
// apply.
func apply(tvConfig, tvLive *typed.TypedValue, p *SMDParams) (*typed.TypedValue, error) {

// Build the structured-merge-diff Updater
updater := merge.Updater{
Converter: fieldmanager.NewVersionConverter(p.gvkParser, scheme.Scheme, p.config.GroupVersionKind().GroupVersion()),
}

// Build a list of managers and which API version they own
managed, err := fieldmanager.DecodeManagedFields(p.live.GetManagedFields())
if err != nil {
return nil, fmt.Errorf("error decoding managed fields: %w", err)
}

// Use the desired manifest to extract the target resource version
version := fieldpath.APIVersion(p.config.GetAPIVersion())

// The manager string needs to be converted to the internal manager
// key used inside structured-merge-diff apply logic
managerKey, err := buildManagerInfoForApply(p.manager)
if err != nil {
return nil, fmt.Errorf("error building manager info: %w", err)
}

// Finally invoke Apply to execute the same function used in k8s
// server-side applies
mergedLive, _, err := updater.Apply(tvLive, tvConfig, version, managed.Fields(), managerKey, true)
if err != nil {
return nil, fmt.Errorf("error while running updater.Apply: %w", err)
}
return mergedLive, err
}

func buildManagerInfoForApply(manager string) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: manager,
Operation: metav1.ManagedFieldsOperationApply,
}
return fieldmanager.BuildManagerIdentifier(&managerInfo)
}

// normalizeTypedValue will prepare the given tv so it can be used in diffs by:
// - removing last-applied-configuration annotation
// - applying default values
Expand Down
99 changes: 88 additions & 11 deletions pkg/diff/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@ import (
"testing"

"github.com/argoproj/gitops-engine/pkg/diff/testdata"
"github.com/argoproj/gitops-engine/pkg/utils/kube/scheme"
openapi_v2 "github.com/google/gnostic/openapiv2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/klog/v2/klogr"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -747,39 +751,68 @@ func TestUnsortedEndpoints(t *testing.T) {
}
}

func buildGVKParser(t *testing.T) *managedfields.GvkParser {
document := &openapi_v2.Document{}
err := proto.Unmarshal(testdata.OpenAPIV2Doc, document)
if err != nil {
t.Fatalf("error unmarshaling openapi doc: %s", err)
}
models, err := openapiproto.NewOpenAPIData(document)
if err != nil {
t.Fatalf("error building openapi data: %s", err)
}

gvkParser, err := managedfields.NewGVKParser(models, false)
if err != nil {
t.Fatalf("error building gvkParser: %s", err)
}
return gvkParser
}

func TestStructuredMergeDiff(t *testing.T) {
parser := scheme.StaticParser()
svcParseType := parser.Type("io.k8s.api.core.v1.Service")
manager := "argocd-controller"
buildParams := func(live, config *unstructured.Unstructured) *SMDParams {
gvkParser := buildGVKParser(t)
manager := "argocd-controller"
return &SMDParams{
config: config,
live: live,
gvkParser: gvkParser,
manager: manager,
}
}

t.Run("will apply default values", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
assert.NotNil(t, result)
assert.False(t, result.Modified)
assert.True(t, result.Modified)
predictedSVC := YamlToSvc(t, result.PredictedLive)
liveSVC := YamlToSvc(t, result.NormalizedLive)
assert.NotNil(t, predictedSVC.Spec.InternalTrafficPolicy)
assert.NotNil(t, liveSVC.Spec.InternalTrafficPolicy)
require.NotNil(t, predictedSVC.Spec.InternalTrafficPolicy)
require.NotNil(t, liveSVC.Spec.InternalTrafficPolicy)
assert.Equal(t, "Cluster", string(*predictedSVC.Spec.InternalTrafficPolicy))
assert.Equal(t, "Cluster", string(*liveSVC.Spec.InternalTrafficPolicy))
assert.Empty(t, predictedSVC.Annotations[AnnotationLastAppliedConfig])
assert.Empty(t, liveSVC.Annotations[AnnotationLastAppliedConfig])
})
t.Run("will remove entries in list", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigWith2Ports)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
Expand All @@ -790,11 +823,13 @@ func TestStructuredMergeDiff(t *testing.T) {
})
t.Run("will remove previously added fields not present in desired state", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.LiveServiceWithTypeYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
Expand All @@ -805,11 +840,13 @@ func TestStructuredMergeDiff(t *testing.T) {
})
t.Run("will apply service with multiple ports", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigWithSamePortsYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
Expand All @@ -818,6 +855,36 @@ func TestStructuredMergeDiff(t *testing.T) {
svc := YamlToSvc(t, result.PredictedLive)
assert.Len(t, svc.Spec.Ports, 5)
})
t.Run("will apply deployment defaults correctly", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.DeploymentLiveYAML)
desiredState := StrToUnstructured(testdata.DeploymentConfigYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
assert.NotNil(t, result)
assert.False(t, result.Modified)
deploy := YamlToDeploy(t, result.PredictedLive)
assert.Len(t, deploy.Spec.Template.Spec.Containers, 1)
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Requests.Cpu().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Requests.Memory().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Requests.Storage().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Limits.Storage().String())
require.NotNil(t, deploy.Spec.Strategy.RollingUpdate)
expectedMaxSurge := &intstr.IntOrString{
Type: intstr.String,
StrVal: "25%",
}
assert.Equal(t, expectedMaxSurge, deploy.Spec.Strategy.RollingUpdate.MaxSurge)
assert.Equal(t, "ClusterFirst", string(deploy.Spec.Template.Spec.DNSPolicy))
})
}

func createSecret(data map[string]string) *unstructured.Unstructured {
Expand Down Expand Up @@ -1078,6 +1145,16 @@ func YamlToSvc(t *testing.T, y []byte) *corev1.Service {
return &svc
}

func YamlToDeploy(t *testing.T, y []byte) *appsv1.Deployment {
t.Helper()
deploy := appsv1.Deployment{}
err := yaml.Unmarshal(y, &deploy)
if err != nil {
t.Fatalf("error unmarshaling deployment bytes: %s", err)
}
return &deploy
}

func StrToUnstructured(yamlStr string) *unstructured.Unstructured {
obj := make(map[string]interface{})
err := yaml.Unmarshal([]byte(yamlStr), &obj)
Expand Down
2 changes: 2 additions & 0 deletions pkg/diff/internal/fieldmanager/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Please check the doc.go file for more details about
how to use and maintain the code in this package.
Loading