diff --git a/cmd/upgrade/v0.15.0/main.go b/cmd/upgrade/v0.15.0/main.go
new file mode 100644
index 00000000000..90b91c731a5
--- /dev/null
+++ b/cmd/upgrade/v0.15.0/main.go
@@ -0,0 +1,46 @@
+/*
+Copyright 2020 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
+	// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
+
+	"context"
+	"fmt"
+	"os"
+
+	"k8s.io/client-go/kubernetes"
+	kubeclient "knative.dev/pkg/client/injection/kube/client"
+	"knative.dev/pkg/injection/sharedmain"
+	"knative.dev/pkg/signals"
+
+	versioned "knative.dev/eventing/pkg/client/clientset/versioned"
+	eventingclient "knative.dev/eventing/pkg/client/injection/client"
+	broker "knative.dev/eventing/pkg/upgrader/broker/v0.15.0"
+)
+
+func main() {
+	ctx := signals.NewContext()
+	cfg := sharedmain.ParseAndGetConfigOrDie()
+	ctx = context.WithValue(ctx, kubeclient.Key{}, kubernetes.NewForConfigOrDie(cfg))
+	ctx = context.WithValue(ctx, eventingclient.Key{}, versioned.NewForConfigOrDie(cfg))
+	if err := broker.Upgrade(ctx); err != nil {
+		fmt.Printf("Broker upgrade failed: %v\n", err)
+		os.Exit(1)
+	}
+}
diff --git a/config/upgrade/v0.15.0/README.md b/config/upgrade/v0.15.0/README.md
new file mode 100644
index 00000000000..227fa8f4104
--- /dev/null
+++ b/config/upgrade/v0.15.0/README.md
@@ -0,0 +1,116 @@
+# Upgrade script (optional) for Eventing v0.15.0
+
+This directory contains a job that updates all v1alpha1 Brokers that use
+Spec.ChannelTemplate to specify which channel they use. You can no longer
+create new Brokers that specify a ChannelTemplate, so you can either update
+the existing ones manually or run the tool provided here, which does the
+following for every Broker that uses Spec.ChannelTemplate:
+
+1. Creates a ConfigMap in the same namespace as the Broker named
+   `broker-upgrade-auto-gen-config-<broker-name>` with the content of
+   Spec.ChannelTemplate.
+1. Sets Broker Spec.Config to point to this ConfigMap.
+1. Sets Broker Spec.ChannelTemplate to nil.
+
+To run the upgrade script:
+
+```shell
+kubectl apply -f https://github.com/knative/eventing/releases/download/v0.15.0/upgrade-to-v0.15.0.yaml
+```
+
+It will create a job called v0.15.0-upgrade in the knative-eventing namespace.
+If you installed to a different namespace, modify the upgrade.yaml
+accordingly. By default the job runs as the `eventing-controller` service
+account; you can change that too, but the service account needs permissions
+to list `Namespace`s, list and patch `Broker`s, and create `ConfigMap`s.
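+
+Once applied, you can check on the job and read its logs with standard
+kubectl commands, for example:
+
+```shell
+kubectl -n knative-eventing get job v0.15.0-upgrade
+kubectl -n knative-eventing logs job/v0.15.0-upgrade
+```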
+
+# Examples
+
+If, for example, you have an existing v1alpha1 Broker with the following Spec:
+
+```yaml
+spec:
+  channelTemplateSpec:
+    apiVersion: messaging.knative.dev/v1alpha1
+    kind: InMemoryChannel
+```
+
+The tool will create a ConfigMap that looks like this:
+
+```yaml
+apiVersion: v1
+data:
+  channelTemplateSpec: |2
+
+    apiVersion: "messaging.knative.dev/v1alpha1"
+    kind: "InMemoryChannel"
+kind: ConfigMap
+metadata:
+  name: broker-upgrade-auto-gen-config-newbroker
+  namespace: test-broker-6
+  ownerReferences:
+  - apiVersion: eventing.knative.dev/v1alpha1
+    blockOwnerDeletion: true
+    controller: true
+    kind: Broker
+    name: newbroker
+```
+
+And your Broker will then look like this after the upgrade:
+
+```yaml
+spec:
+  config:
+    apiVersion: v1
+    kind: ConfigMap
+    name: broker-upgrade-auto-gen-config-newbroker
+    namespace: test-broker-6
+```
+
+For KafkaChannels it might look something like this:
+
+```yaml
+spec:
+  channelTemplateSpec:
+    apiVersion: messaging.knative.dev/v1alpha1
+    kind: KafkaChannel
+    spec:
+      numPartitions: 1
+      replicationFactor: 1
+```
+
+The resulting ConfigMap will be:
+
+```yaml
+apiVersion: v1
+data:
+  channelTemplateSpec: |2
+
+    apiVersion: "messaging.knative.dev/v1alpha1"
+    kind: "KafkaChannel"
+    spec:
+      numPartitions: 1
+      replicationFactor: 1
+kind: ConfigMap
+metadata:
+  name: broker-upgrade-auto-gen-config-newbroker-kafka
+  namespace: test-broker-6
+  ownerReferences:
+  - apiVersion: eventing.knative.dev/v1alpha1
+    blockOwnerDeletion: true
+    controller: true
+    kind: Broker
+    name: newbroker-kafka
+```
+
+And the Broker will look like this:
+
+```yaml
+spec:
+  config:
+    apiVersion: v1
+    kind: ConfigMap
+    name: broker-upgrade-auto-gen-config-newbroker-kafka
+    namespace: test-broker-6
+```
diff --git a/config/upgrade/v0.15.0/upgrade.yaml b/config/upgrade/v0.15.0/upgrade.yaml
new file mode 100644
index 00000000000..15a4e13fb84
--- /dev/null
+++ b/config/upgrade/v0.15.0/upgrade.yaml
@@ -0,0 +1,18 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: v0.15.0-upgrade
+  namespace: knative-eventing
+  labels:
+    eventing.knative.dev/release: devel
+spec:
+  template:
+    metadata:
+      annotations:
+        sidecar.istio.io/inject: "false"
+    spec:
+      serviceAccountName: eventing-controller
+      restartPolicy: Never
+      containers:
+        - name: upgrade-brokers
+          image: ko://knative.dev/eventing/cmd/upgrade/v0.15.0
diff --git a/hack/release.sh b/hack/release.sh
index 5f957792d7a..ce91acb3aab 100755
--- a/hack/release.sh
+++ b/hack/release.sh
@@ -25,6 +25,7 @@ readonly CHANNEL_BROKER_YAML="deprecated-channel-broker.yaml"
 readonly MT_CHANNEL_BROKER_YAML="mt-channel-broker.yaml"
 readonly IN_MEMORY_CHANNEL="in-memory-channel.yaml"
 readonly UPGRADE_JOB="upgrade-to-v0.14.0.yaml"
+readonly UPGRADE_JOB_V_0_15="upgrade-to-v0.15.0.yaml"
 
 declare -A RELEASES
 RELEASES=(
@@ -59,10 +60,13 @@ function build_release() {
   # Create in memory channel yaml
   ko resolve ${KO_FLAGS} -f config/channels/in-memory-channel/ | "${LABEL_YAML_CMD[@]}" > "${IN_MEMORY_CHANNEL}"
 
-  # Create upgrade job yaml
+  # Create v0.14.0 upgrade job yaml
   ko resolve ${KO_FLAGS} -f config/upgrade/v0.14.0/ | "${LABEL_YAML_CMD[@]}" > "${UPGRADE_JOB}"
 
-  local all_yamls=(${EVENTING_CORE_YAML} ${EVENTING_CRDS_YAML} ${CHANNEL_BROKER_YAML} ${MT_CHANNEL_BROKER_YAML} ${IN_MEMORY_CHANNEL} ${UPGRADE_JOB})
+  # Create v0.15.0 upgrade job yaml
+  ko resolve ${KO_FLAGS} -f config/upgrade/v0.15.0/ | "${LABEL_YAML_CMD[@]}" > "${UPGRADE_JOB_V_0_15}"
+
+  local all_yamls=(${EVENTING_CORE_YAML} ${EVENTING_CRDS_YAML} ${CHANNEL_BROKER_YAML} ${MT_CHANNEL_BROKER_YAML} ${IN_MEMORY_CHANNEL} ${UPGRADE_JOB} ${UPGRADE_JOB_V_0_15})
 
   # Assemble the release
   for yaml in "${!RELEASES[@]}"; do
     echo "Assembling Knative Eventing - ${yaml}"
diff --git a/pkg/upgrader/broker/v0.15.0/upgrader.go b/pkg/upgrader/broker/v0.15.0/upgrader.go
new file mode 100644
index 00000000000..421598fe567
--- /dev/null
+++ b/pkg/upgrader/broker/v0.15.0/upgrader.go
@@ -0,0 +1,182 @@
+/*
+Copyright 2020 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package broker
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/ghodss/yaml"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+
+	"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
+	eventingclient "knative.dev/eventing/pkg/client/injection/client"
+	"knative.dev/pkg/apis/duck"
+	duckv1 "knative.dev/pkg/apis/duck/v1"
+	kubeclient "knative.dev/pkg/client/injection/kube/client"
+	"knative.dev/pkg/kmeta"
+	"knative.dev/pkg/logging"
+)
+
+// Upgrade upgrades all Brokers that use Spec.ChannelTemplate: for each such
+// Broker it creates a ConfigMap, owned by the Broker, in the Broker's
+// namespace with the contents of Spec.ChannelTemplate, then updates the
+// Broker so that Spec.Config points to the newly created ConfigMap and
+// Spec.ChannelTemplate is cleared.
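+//
+// The resulting merge patch looks roughly like this (a sketch; the name and
+// namespace come from the Broker being upgraded):
+//
+//	{"spec":{"channelTemplateSpec":null,"config":{"apiVersion":"v1","kind":"ConfigMap","name":"broker-upgrade-auto-gen-config-<name>","namespace":"<ns>"}}}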
+func Upgrade(ctx context.Context) error {
+	logger := logging.FromContext(ctx)
+
+	nsClient := kubeclient.Get(ctx).CoreV1().Namespaces()
+	namespaces, err := nsClient.List(metav1.ListOptions{})
+	if err != nil {
+		logger.Warnf("Failed to list namespaces: %v", err)
+		return err
+	}
+	for _, ns := range namespaces.Items {
+		if err := processNamespace(ctx, ns.Name); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func processNamespace(ctx context.Context, ns string) error {
+	logger := logging.FromContext(ctx)
+	logger.Infof("Processing Brokers in namespace: %q", ns)
+
+	eventingClient := eventingclient.Get(ctx)
+	brokerClient := eventingClient.EventingV1alpha1().Brokers(ns)
+	brokers, err := brokerClient.List(metav1.ListOptions{})
+	if err != nil {
+		logger.Warnf("Failed to list Brokers in namespace %q: %v", ns, err)
+		return err
+	}
+	for _, broker := range brokers.Items {
+		patch, err := processBroker(ctx, broker)
+		if err != nil {
+			logger.Warnf("Failed to process Broker \"%s/%s\": %v", broker.Namespace, broker.Name, err)
+			return err
+		}
+		if len(patch) == 0 {
+			logger.Infof("Broker \"%s/%s\" does not require updating", broker.Namespace, broker.Name)
+			continue
+		}
+
+		// There are differences, apply the patch.
+		logger.Infof("Patching Broker \"%s/%s\" with %q", broker.Namespace, broker.Name, string(patch))
+		patched, err := brokerClient.Patch(broker.Name, types.MergePatchType, patch)
+		if err != nil {
+			logger.Warnf("Failed to patch Broker \"%s/%s\": %v", broker.Namespace, broker.Name, err)
+			return err
+		}
+		logger.Infof("Patched Broker \"%s/%s\" successfully; new Spec: %+v", broker.Namespace, broker.Name, patched.Spec)
+	}
+	return nil
+}
+
+// processBroker processes a single Broker: if it uses a ChannelTemplate, a
+// ConfigMap representing the ChannelTemplate is created and Spec.Config is
+// pointed at it.
+// Returns non-empty patch bytes if a patch is necessary.
+func processBroker(ctx context.Context, broker v1alpha1.Broker) ([]byte, error) {
+	logger := logging.FromContext(ctx)
+	if broker.Spec.ChannelTemplate == nil || broker.Spec.ChannelTemplate.Kind == "" {
+		logger.Infof("Broker \"%s/%s\" is not using a ChannelTemplate, skipping...", broker.Namespace, broker.Name)
+		return []byte{}, nil
+	}
+
+	modified := broker.DeepCopy()
+
+	// Only create a ConfigMap if the Broker does not already have a Config;
+	// in either case the ChannelTemplate is nilled out below.
+	if broker.Spec.Config == nil || broker.Spec.Config.Name == "" {
+		cm, err := createConfigMap(ctx, broker)
+		if err != nil {
+			return []byte{}, err
+		}
+		modified.Spec.Config = cm
+	}
+	modified.Spec.ChannelTemplate = nil
+
+	patch, err := duck.CreateMergePatch(broker, modified)
+	if err != nil {
+		logger.Warnf("Failed to create patch for \"%s/%s\": %v", broker.Namespace, broker.Name, err)
+		return []byte{}, err
+	}
+	logger.Infof("Patch for \"%s/%s\": %q", broker.Namespace, broker.Name, string(patch))
+	// If there is nothing to patch, we are done. An empty patch is "{}",
+	// hence the check for length <= 2.
+	if len(patch) <= 2 {
+		return []byte{}, nil
+	}
+	return patch, nil
+}
+
+func createConfigMap(ctx context.Context, broker v1alpha1.Broker) (*duckv1.KReference, error) {
+	// Generating the spec portion is a bit fiddly because we have to turn the
+	// runtime.RawExtension into a YAML blob that we stick into the ConfigMap
+	// (with proper indentation).
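+	// For example, a template spec of
+	//   {"numPartitions": 3, "replicationFactor": 1}
+	// becomes the YAML
+	//   numPartitions: 3
+	//   replicationFactor: 1
+	// which is then indented two extra spaces so it nests under "spec:".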
+ data := "" + if broker.Spec.ChannelTemplate.Spec != nil { + bytes, err := yaml.JSONToYAML(broker.Spec.ChannelTemplate.Spec.Raw) + if err != nil { + return nil, err + } + logging.FromContext(ctx).Infof("BYTES: %q", string(bytes)) + data = fmt.Sprintf(` +apiVersion: %q +kind: %q +spec: + %s +`, broker.Spec.ChannelTemplate.APIVersion, broker.Spec.ChannelTemplate.Kind, strings.ReplaceAll(strings.TrimSpace(string(bytes)), "\n", "\n ")) + } else { + data = fmt.Sprintf(` +apiVersion: %q +kind: %q +`, broker.Spec.ChannelTemplate.APIVersion, broker.Spec.ChannelTemplate.Kind) + } + + cm := &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "broker-upgrade-auto-gen-config-" + broker.Name, + Namespace: broker.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(&broker), + }, + }, + Data: map[string]string{"channelTemplateSpec": data}, + } + + logging.FromContext(ctx).Infof("Creating configmap: %+v", cm) + _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(broker.Namespace).Create(cm) + if err != nil { + logging.FromContext(ctx).Errorf("Failed to create broker config map \"%s/%s\": %v", broker.Namespace, broker.Name, err) + return nil, err + } + return &duckv1.KReference{ + APIVersion: "v1", + Kind: "ConfigMap", + Namespace: broker.Namespace, + Name: "broker-upgrade-auto-gen-config-" + broker.Name, + }, nil +} diff --git a/pkg/upgrader/broker/v0.15.0/upgrader_test.go b/pkg/upgrader/broker/v0.15.0/upgrader_test.go new file mode 100644 index 00000000000..3cbe1fd582a --- /dev/null +++ b/pkg/upgrader/broker/v0.15.0/upgrader_test.go @@ -0,0 +1,422 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package broker
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"strings"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes/fake"
+	clientgotesting "k8s.io/client-go/testing"
+
+	"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
+	messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
+	"knative.dev/eventing/pkg/client/clientset/versioned/scheme"
+	fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
+	duckv1 "knative.dev/pkg/apis/duck/v1"
+	kubeclient "knative.dev/pkg/client/injection/kube/client"
+	"knative.dev/pkg/kmeta"
+
+	_ "knative.dev/eventing/pkg/client/clientset/versioned"
+	_ "knative.dev/pkg/reconciler/testing"
+)
+
+const (
+	testns     = "testnamespace"
+	testns2    = "testnamespace2"
+	testbroker = "testbroker"
+	imcSpec    = `
+apiVersion: "messaging.knative.dev/v1alpha1"
+kind: "InMemoryChannel"
+`
+	kafkaSpec = `
+apiVersion: "messaging.knative.dev/v1alpha1"
+kind: "KafkaChannel"
+spec:
+  numPartitions: 3
+  replicationFactor: 1
+`
+
+	patchbytesFmt = `{"spec":{"channelTemplateSpec":null,"config":{"apiVersion":"v1","kind":"ConfigMap","name":"broker-upgrade-auto-gen-config-%s","namespace":"%s"}}}`
+)
+
+var (
+	noPatch = []byte{}
+
+	imc = &messagingv1beta1.ChannelTemplateSpec{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "InMemoryChannel",
+			APIVersion: "messaging.knative.dev/v1alpha1",
+		},
+	}
+	kafka = &messagingv1beta1.ChannelTemplateSpec{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "KafkaChannel",
+			APIVersion: "messaging.knative.dev/v1alpha1",
+		},
+		Spec: &runtime.RawExtension{
+			Raw: []byte(`{"numPartitions": 3, "replicationFactor": 1}`),
+		},
+	}
+	ignoreLastTransitionTime = cmp.FilterPath(func(p cmp.Path) bool {
+		return strings.HasSuffix(p.String(), "LastTransitionTime.Inner.Time")
+	}, cmp.Ignore())
+
+	safeDeployDiff = cmpopts.IgnoreUnexported(resource.Quantity{})
+)
+
+func TestUpgrade(t *testing.T) {
+	brokers := []runtime.Object{
+		broker("b1", testns2, imc, nil),
+		broker("b2", testns, nil, config("b2", testns)),
+		broker("b3", testns, kafka, nil),
+		broker("b4", testns, nil, config("b4", testns)),
+	}
+
+	ctx, cs := fakeeventingclient.With(context.Background(), brokers...)
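+	// Inject a fake kube client that knows about both namespaces so that
+	// Upgrade iterates over each of them.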
+	kc := fake.NewSimpleClientset(
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns}},
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns2}},
+	)
+	ctx = context.WithValue(ctx, kubeclient.Key{}, kc)
+
+	var creates []*corev1.ConfigMap
+	kc.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		creates = append(creates, action.(clientgotesting.CreateAction).GetObject().(*corev1.ConfigMap))
+		// The returned object is unimportant; it is only logged.
+		return true, &corev1.ConfigMap{}, nil
+	})
+
+	var patches []clientgotesting.PatchAction
+	cs.PrependReactor("patch", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		patches = append(patches, action.(clientgotesting.PatchAction))
+		// The returned object is unimportant; it is only logged.
+		return true, broker("b1", testns, nil, nil), nil
+	})
+	if err := Upgrade(ctx); err != nil {
+		t.Errorf("Upgrade failed: %v", err)
+	}
+	checkPatches(t, []string{"b3", "b1"}, patches, [][]byte{patchbytes("b3", testns), patchbytes("b1", testns2)})
+	checkCreates(t, creates, []*corev1.ConfigMap{configMap("b3", testns, kafkaSpec), configMap("b1", testns2, imcSpec)})
+}
+
+func TestUpgradeListFails(t *testing.T) {
+	brokers := []runtime.Object{
+		broker("b1", testns2, imc, nil),
+	}
+
+	ctx, _ := fakeeventingclient.With(context.Background(), brokers...)
+	kc := fake.NewSimpleClientset(
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns}},
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns2}},
+	)
+	ctx = context.WithValue(ctx, kubeclient.Key{}, kc)
+
+	kc.PrependReactor("list", "namespaces", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		return true, nil, fmt.Errorf("inducing failure for %s %s", action.GetVerb(), action.GetResource().Resource)
+	})
+	if err := Upgrade(ctx); err == nil {
+		t.Errorf("Upgrade did not fail")
+	}
+}
+
+func TestProcessNamespace(t *testing.T) {
+	brokers := []runtime.Object{
+		broker("b1", testns, imc, nil),
+		broker("b2", testns, nil, config("b2", testns)),
+		broker("b3", testns, kafka, nil),
+		broker("b4", testns, nil, config("b4", testns)),
+	}
+
+	ctx, cs := fakeeventingclient.With(context.Background(), brokers...)
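+	// Set up fakes: the kube client records ConfigMap creates and the
+	// eventing client records Broker patches via the reactors below.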
+	kc := fake.NewSimpleClientset(
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns}},
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns2}},
+	)
+	ctx = context.WithValue(ctx, kubeclient.Key{}, kc)
+
+	var creates []*corev1.ConfigMap
+	kc.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		creates = append(creates, action.(clientgotesting.CreateAction).GetObject().(*corev1.ConfigMap))
+		// The returned object is unimportant; it is only logged.
+		return true, &corev1.ConfigMap{}, nil
+	})
+
+	var patches []clientgotesting.PatchAction
+	cs.PrependReactor("patch", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		patches = append(patches, action.(clientgotesting.PatchAction))
+		// The returned object is unimportant; it is only logged.
+		return true, broker("b1", testns, nil, nil), nil
+	})
+	if err := processNamespace(ctx, testns); err != nil {
+		t.Errorf("processNamespace failed: %s", err)
+	}
+	checkPatches(t, []string{"b1", "b3"}, patches, [][]byte{patchbytes("b1", testns), patchbytes("b3", testns)})
+	checkCreates(t, creates, []*corev1.ConfigMap{configMap("b1", testns, imcSpec), configMap("b3", testns, kafkaSpec)})
+}
+
+func TestProcessNamespaceListFails(t *testing.T) {
+	brokers := []runtime.Object{
+		broker("b1", testns2, imc, nil),
+		broker("b2", testns, nil, config("b2", testns)),
+		broker("b3", testns, kafka, nil),
+		broker("b4", testns, nil, config("b4", testns)),
+	}
+
+	ctx, cs := fakeeventingclient.With(context.Background(), brokers...)
+	kc := fake.NewSimpleClientset(
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns}},
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns2}},
+	)
+	ctx = context.WithValue(ctx, kubeclient.Key{}, kc)
+
+	cs.PrependReactor("list", "brokers", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		return true, nil, fmt.Errorf("inducing failure for %s %s", action.GetVerb(), action.GetResource().Resource)
+	})
+	if err := processNamespace(ctx, testns); err == nil {
+		t.Errorf("processNamespace did not fail")
+	}
+}
+
+func TestProcessNamespacePatchFails(t *testing.T) {
+	brokers := []runtime.Object{
+		broker("b1", testns2, imc, nil),
+		broker("b2", testns, nil, config("b2", testns)),
+		broker("b3", testns, kafka, nil),
+		broker("b4", testns, nil, config("b4", testns)),
+	}
+
+	ctx, cs := fakeeventingclient.With(context.Background(), brokers...)
+	kc := fake.NewSimpleClientset(
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns}},
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns2}},
+	)
+	ctx = context.WithValue(ctx, kubeclient.Key{}, kc)
+
+	cs.PrependReactor("patch", "brokers", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		return true, nil, fmt.Errorf("inducing failure for %s %s", action.GetVerb(), action.GetResource().Resource)
+	})
+	if err := processNamespace(ctx, testns); err == nil {
+		t.Errorf("processNamespace did not fail")
+	}
+}
+
+func TestUpgradeCreateConfigMapFails(t *testing.T) {
+	brokers := []runtime.Object{
+		broker("b1", testns2, imc, nil),
+		broker("b2", testns, nil, config("b2", testns)),
+		broker("b3", testns, kafka, nil),
+		broker("b4", testns, nil, config("b4", testns)),
+	}
+
+	ctx, cs := fakeeventingclient.With(context.Background(), brokers...)
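+	// The reactors below make ConfigMap creation fail in testns2 only, so
+	// the upgrade succeeds for testns and then errors out.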
+	kc := fake.NewSimpleClientset(
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns}},
+		&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns2}},
+	)
+	ctx = context.WithValue(ctx, kubeclient.Key{}, kc)
+
+	var creates []*corev1.ConfigMap
+	kc.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		creates = append(creates, action.(clientgotesting.CreateAction).GetObject().(*corev1.ConfigMap))
+		// The returned object is unimportant; it is only logged.
+		return true, &corev1.ConfigMap{}, nil
+	})
+	// Prepended after the recorder above, so this reactor runs first and
+	// fails creates in testns2 before they are recorded.
+	kc.PrependReactor("create", "configmaps", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		if action.(clientgotesting.CreateAction).GetNamespace() == testns2 {
+			return true, nil, fmt.Errorf("inducing failure for %s %s", action.GetVerb(), action.GetResource().Resource)
+		}
+		return false, nil, nil
+	})
+	var patches []clientgotesting.PatchAction
+	cs.PrependReactor("patch", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+		patches = append(patches, action.(clientgotesting.PatchAction))
+		// The returned object is unimportant; it is only logged.
+		return true, broker("b1", testns, nil, nil), nil
+	})
+	if err := Upgrade(ctx); err == nil {
+		t.Errorf("Upgrade did not fail")
+	}
+	checkPatches(t, []string{"b3"}, patches, [][]byte{patchbytes("b3", testns)})
+	checkCreates(t, creates, []*corev1.ConfigMap{configMap("b3", testns, kafkaSpec)})
+}
+
+func TestProcessBroker(t *testing.T) {
+	testcases := map[string]struct {
+		in          *v1alpha1.Broker
+		expectPatch []byte
+		wantCreate  []*corev1.ConfigMap
+	}{
+		"nil channel template": {
+			&v1alpha1.Broker{ObjectMeta: metav1.ObjectMeta{Name: testbroker}},
+			noPatch,
+			nil,
+		},
+		"empty channel template": {
+			broker(testbroker, testns, &messagingv1beta1.ChannelTemplateSpec{}, nil),
+			noPatch,
+			nil,
+		},
+		"imc, create and nil out channel template": {
+			broker(testbroker, testns, imc, nil),
+			patchbytes(testbroker, testns),
+			[]*corev1.ConfigMap{configMap(testbroker, testns, imcSpec)},
+		},
+		"kafka, create and nil out channel template": {
+			broker(testbroker, testns, kafka, nil),
+			patchbytes(testbroker, testns),
+			[]*corev1.ConfigMap{configMap(testbroker, testns, kafkaSpec)},
+		},
+		"imc + existing config, only nil out channel template": {
+			broker(testbroker, testns, imc, config(testbroker, testns)),
+			[]byte(`{"spec":{"channelTemplateSpec":null}}`),
+			nil,
+		},
+		"config only, nothing to do": {
+			broker(testbroker, testns, nil, config(testbroker, testns)),
+			noPatch,
+			nil,
+		},
+	}
+
+	for name, tc := range testcases {
+		ctx, _ := fakeeventingclient.With(context.Background(), tc.in)
+		kc := fake.NewSimpleClientset(
+			&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns}},
+			&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testns2}},
+		)
+		ctx = context.WithValue(ctx, kubeclient.Key{}, kc)
+		var creates []*corev1.ConfigMap
+		kc.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
+			creates = append(creates, action.(clientgotesting.CreateAction).GetObject().(*corev1.ConfigMap))
+			// The returned object is unimportant; it is only logged.
+			return true, &corev1.ConfigMap{}, nil
+		})
+
+		patch, err := processBroker(ctx, *tc.in)
+		if err != nil {
+			t.Errorf("%s: failed to process broker: %v", name, err)
+		}
+		if !reflect.DeepEqual(patch, tc.expectPatch) {
+			t.Errorf("%s: patches differ: want: %q got: %q", name, tc.expectPatch, patch)
+		}
+		checkCreates(t, creates, tc.wantCreate)
+	}
+}
+
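+// broker constructs a minimal v1alpha1 Broker with the given ChannelTemplate
+// and Config for use in the tests above.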
+func broker(name, namespace string, template *messagingv1beta1.ChannelTemplateSpec, config *duckv1.KReference) *v1alpha1.Broker {
+	return &v1alpha1.Broker{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+		Spec: v1alpha1.BrokerSpec{
+			ChannelTemplate: template,
+			Config:          config,
+		},
+	}
+}
+
+func config(name, namespace string) *duckv1.KReference {
+	return &duckv1.KReference{
+		Kind:       "ConfigMap",
+		APIVersion: "v1",
+		Name:       "broker-upgrade-auto-gen-config-" + name,
+		Namespace:  namespace,
+	}
+}
+
+func configMap(name, namespace string, spec string) *corev1.ConfigMap {
+	return &corev1.ConfigMap{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "ConfigMap",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "broker-upgrade-auto-gen-config-" + name,
+			Namespace: namespace,
+			OwnerReferences: []metav1.OwnerReference{
+				*kmeta.NewControllerRef(&v1alpha1.Broker{
+					TypeMeta: metav1.TypeMeta{
+						Kind:       "Broker",
+						APIVersion: "eventing.knative.dev/v1alpha1",
+					},
+					ObjectMeta: metav1.ObjectMeta{
+						Name:      name,
+						Namespace: namespace,
+					},
+				}),
+			},
+		},
+		Data: map[string]string{"channelTemplateSpec": spec},
+	}
+}
+
+func patchbytes(name, namespace string) []byte {
+	return []byte(fmt.Sprintf(patchbytesFmt, name, namespace))
+}
+
+func init() {
+	scheme.AddToScheme(scheme.Scheme)
+}
+
+func checkCreates(t *testing.T, creates []*corev1.ConfigMap, want []*corev1.ConfigMap) {
+	if diff := cmp.Diff(want, creates, safeDeployDiff, cmpopts.EquateEmpty()); diff != "" {
+		t.Errorf("Unexpected ConfigMap differences: %s", diff)
+	}
+}
+
+func checkPatches(t *testing.T, names []string, patches []clientgotesting.PatchAction, want [][]byte) {
+	if len(want) != len(patches) {
+		t.Errorf("Mismatch in number of patches: want %d, got %d", len(want), len(patches))
+		return
+	}
+	for i, p := range patches {
+		if p.GetPatchType() != types.MergePatchType {
+			t.Errorf("Expected patch type %+v, got %+v", types.MergePatchType, p.GetPatchType())
+		}
+		if p.GetName() != names[i] {
+			t.Errorf("Expected patch to %q, got %q", names[i], p.GetName())
+		}
+		if diff := cmp.Diff(want[i], p.GetPatch(), safeDeployDiff, cmpopts.EquateEmpty()); diff != "" {
+			t.Errorf("Unexpected patch differences: %s", diff)
+		}
+	}
+}