Skip to content

Commit

Permalink
Prevent generation only updates (#3915)
Browse files Browse the repository at this point in the history
* Prevent generation only contract updates

This guards contract ConfigMap updates when only `generation`
changes.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Refactor to simplify and remove reconciler level change detection

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored May 31, 2024
1 parent 3bec54f commit 483904b
Show file tree
Hide file tree
Showing 14 changed files with 295 additions and 199 deletions.
9 changes: 9 additions & 0 deletions control-plane/pkg/contract/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@

package contract

import (
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
)

// IncrementGeneration increments Generation.
func (x *Contract) IncrementGeneration() {
x.Generation++
}

func SemanticEqual(ct1 *Contract, ct2 *Contract) bool {
return cmp.Equal(ct1, ct2, protocmp.Transform(), protocmp.IgnoreFields(ct1, "generation"))
}
11 changes: 2 additions & 9 deletions control-plane/pkg/core/config/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,17 @@ const (
)

// AddOrUpdateEgressConfig adds or updates the given egress to the given contract at the specified indexes.
func AddOrUpdateEgressConfig(ct *contract.Contract, resourceIndex int, egress *contract.Egress, egressIndex int) int {
func AddOrUpdateEgressConfig(ct *contract.Contract, resourceIndex int, egress *contract.Egress, egressIndex int) {

if egressIndex != NoEgress {
prev := ct.Resources[resourceIndex].Egresses[egressIndex]
ct.Resources[resourceIndex].Egresses[egressIndex] = egress

if proto.Equal(prev, egress) {
return EgressUnchanged
}
return EgressChanged
return
}

ct.Resources[resourceIndex].Egresses = append(
ct.Resources[resourceIndex].Egresses,
egress,
)

return EgressChanged
}

// AddOrUpdateEgressConfigForResource adds or updates the given egress to the given contract at the specified indexes.
Expand Down
11 changes: 8 additions & 3 deletions control-plane/pkg/core/config/egress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -210,13 +211,17 @@ func TestAddOrUpdateEgressConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := AddOrUpdateEgressConfig(tt.givenCt, tt.brokerIndex, tt.egress, tt.egressIndex); got != tt.changed {
t.Errorf("AddOrUpdateEgressConfig() = %v, want %v", got, tt.changed)
}
before := proto.Clone(tt.givenCt).(*contract.Contract)
AddOrUpdateEgressConfig(tt.givenCt, tt.brokerIndex, tt.egress, tt.egressIndex)

if diff := cmp.Diff(tt.wantCt, tt.givenCt, protocmp.Transform()); diff != "" {
t.Errorf("(-want, +got) %s", diff)
}

gotEqual := contract.SemanticEqual(before, tt.wantCt)
if expectedEqual := contract.SemanticEqual(before, tt.wantCt); expectedEqual != gotEqual {
t.Errorf("expectEqual want %v got %v", expectedEqual, gotEqual)
}
})
}
}
Expand Down
20 changes: 2 additions & 18 deletions control-plane/pkg/core/config/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package config

import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/types"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
Expand All @@ -41,37 +40,22 @@ func FindResource(contract *contract.Contract, resource types.UID) int {
return resourceIndex
}

const (
ResourceChanged = iota
ResourceUnchanged
)

func SetResourceEgressesFromContract(contract *contract.Contract, resource *contract.Resource, index int) {
if index != NoResource {
resource.Egresses = contract.Resources[index].Egresses
}
}

// AddOrUpdateResourceConfig adds or updates the given resourceConfig to the given resources at the specified index.
func AddOrUpdateResourceConfig(contract *contract.Contract, resource *contract.Resource, index int, logger *zap.Logger) int {

func AddOrUpdateResourceConfig(contract *contract.Contract, resource *contract.Resource, index int, logger *zap.Logger) {
if index != NoResource {
logger.Debug("Resource exists", zap.Int("index", index))

prev := contract.Resources[index]
contract.Resources[index] = resource

if proto.Equal(prev, resource) {
return ResourceUnchanged
}
return ResourceChanged
return
}

logger.Debug("Resource doesn't exist")

contract.Resources = append(contract.Resources, resource)

return ResourceChanged
}

// DeleteResource deletes the resource at the given index from Resources.
Expand Down
14 changes: 8 additions & 6 deletions control-plane/pkg/core/config/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
Expand Down Expand Up @@ -86,7 +87,6 @@ func TestAddOrUpdateResourcesConfig(t *testing.T) {
newResource *contract.Resource
index int
wantContract *contract.Contract
changed int
}{
{
name: "resource not found - add resource",
Expand Down Expand Up @@ -314,8 +314,7 @@ func TestAddOrUpdateResourcesConfig(t *testing.T) {
ContentMode: contract.ContentMode_STRUCTURED,
},
},
index: 0,
changed: ResourceUnchanged,
index: 0,
wantContract: &contract.Contract{
Resources: []*contract.Resource{
{
Expand Down Expand Up @@ -346,13 +345,16 @@ func TestAddOrUpdateResourcesConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
changed := AddOrUpdateResourceConfig(tt.haveContract, tt.newResource, tt.index, zap.NewNop())
before := proto.Clone(tt.haveContract).(*contract.Contract)
AddOrUpdateResourceConfig(tt.haveContract, tt.newResource, tt.index, zap.NewNop())

if diff := cmp.Diff(tt.wantContract, tt.haveContract, protocmp.Transform()); diff != "" {
t.Errorf("(-want, +got) %s", diff)
}
if changed != tt.changed {
t.Errorf("Changed want %d got %d", tt.changed, changed)

gotEqual := contract.SemanticEqual(before, tt.wantContract)
if expectedEqual := contract.SemanticEqual(before, tt.wantContract); expectedEqual != gotEqual {
t.Errorf("expectEqual want %v got %v", expectedEqual, gotEqual)
}
})
}
Expand Down
18 changes: 15 additions & 3 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/utils/pointer"
"knative.dev/pkg/logging"
"knative.dev/pkg/tracker"

"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
Expand Down Expand Up @@ -235,7 +236,21 @@ func GetDataPlaneConfigMapData(logger *zap.Logger, dataPlaneConfigMap *corev1.Co
return ct, nil
}

func CompareSemanticEqual(ctx context.Context, ct *contract.Contract, existing *corev1.ConfigMap, format string) bool {
existingCt, err := GetDataPlaneConfigMapData(logging.FromContext(ctx).Desugar(), existing, format)
if existingCt != nil && err == nil {
return contract.SemanticEqual(existingCt, ct)
}
return false
}

func (r *Reconciler) UpdateDataPlaneConfigMap(ctx context.Context, contract *contract.Contract, configMap *corev1.ConfigMap) error {
if CompareSemanticEqual(ctx, contract, configMap, r.ContractConfigMapFormat) {
return nil
}

// Resource changed, increment contract generation.
coreconfig.IncrementContractGeneration(contract)

var data []byte
var err error
Expand Down Expand Up @@ -378,9 +393,6 @@ func (r *Reconciler) DeleteResource(ctx context.Context, logger *zap.Logger, uui

logger.Debug("Resource deleted", zap.Int("index", resourceIndex))

// Resource changed, increment contract generation.
coreconfig.IncrementContractGeneration(ct)

// Update the configuration map with the new contract data.
if err := r.UpdateDataPlaneConfigMap(ctx, ct, contractConfigMap); err != nil {
return err
Expand Down
159 changes: 159 additions & 0 deletions control-plane/pkg/reconciler/base/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,162 @@ const dataPlaneContractExtraData = `{
]
}
`

func TestCompareSemanticEqual(t *testing.T) {
type args struct {
ctx context.Context
contract *contract.Contract
existing *corev1.ConfigMap
format string
}

ctx, _ := reconcilertesting.SetupFakeContext(t)

tests := []struct {
name string
args args
want bool
}{
{
name: "only generation changes",
args: args{
ctx: ctx,
contract: &contract.Contract{
Generation: 0,
Resources: nil,
TrustBundles: nil,
},
existing: &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Immutable: nil,
Data: nil,
BinaryData: map[string][]byte{
base.ConfigMapDataKey: mustMarshal(&contract.Contract{
Generation: 1,
Resources: nil,
TrustBundles: nil,
}),
},
},
format: string(contract.Json),
},
want: true,
},
{
name: "only generation changes with resources",
args: args{
ctx: ctx,
contract: &contract.Contract{
Generation: 0,
Resources: []*contract.Resource{
{
Uid: "aaa",
},
},
TrustBundles: []string{"---"},
},
existing: &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Immutable: nil,
Data: nil,
BinaryData: map[string][]byte{
base.ConfigMapDataKey: mustMarshal(&contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: "aaa",
},
},
TrustBundles: []string{"---"},
}),
},
},
format: string(contract.Json),
},
want: true,
},
{
name: "resources changes",
args: args{
ctx: ctx,
contract: &contract.Contract{
Generation: 0,
Resources: []*contract.Resource{
{
Uid: "aaa",
},
},
TrustBundles: []string{"---"},
},
existing: &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Immutable: nil,
Data: nil,
BinaryData: map[string][]byte{
base.ConfigMapDataKey: mustMarshal(&contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: "aab",
},
},
TrustBundles: []string{"---"},
}),
},
},
format: string(contract.Json),
},
want: false,
},
{
name: "trust bundle changes",
args: args{
ctx: ctx,
contract: &contract.Contract{
Generation: 0,
Resources: []*contract.Resource{
{
Uid: "aaa",
},
},
TrustBundles: []string{"--"},
},
existing: &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Immutable: nil,
Data: nil,
BinaryData: map[string][]byte{
base.ConfigMapDataKey: mustMarshal(&contract.Contract{
Generation: 1,
Resources: []*contract.Resource{
{
Uid: "aaa",
},
},
TrustBundles: []string{"xyz"},
}),
},
},
format: string(contract.Json),
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, base.CompareSemanticEqual(tt.args.ctx, tt.args.contract, tt.args.existing, tt.args.format), "CompareSemanticEqual(%v, %v, %v, %v)", tt.args.ctx, tt.args.contract, tt.args.existing, tt.args.format)
})
}
}

func mustMarshal(c *contract.Contract) []byte {
b, err := protojson.Marshal(c)
if err != nil {
panic(err)
}
return b
}
Loading

0 comments on commit 483904b

Please sign in to comment.