[v0.22.0] Backport PR 563 (knative-extensions#233)

* [v0.22.0] Backport of PR 563 (use of knative-extensions#215)

* Backport of (knative-sandbox#476)

Dispatcher readiness e2e test

* Parameterize Partitions created via MustCreateTopic
* Add kafkaChannel creation/get obj helper functions
* Add subscription scale readiness test
* Add explicit t.Run statement
* Move test to helper function, use channelTestRunner
* Change from subscriptionv1beta1 to subscriptionv1 (abstraction prep)
* Support subscription v1beta1 and v1
* Fix copyright header
* Change channel creation depending on typemeta
  Remove subscription v1beta1 testing; running multiple tests at once would change the dispatcher replica count in parallel
* Drop unneeded parameter
* Add a blank line
* Fix linter complaints
* Add build constraint tag to mark testing consolidated channel only
* run hack/update-codegen.sh
* run hack/update-codegen.sh
* Modify build constraints for test
* Remove v1alpha1 references as requested
* Fix old reference
* Change subscription creation function reference

* Backport of (knative-sandbox#563)

Continual tests for KafkaSource for upgrade testing

* Using knative/eventing#4815 to override wathola sender transport
* Actual way of replacing an image used for wathola sender
* Reformatting
* Wathola Kafka Sender utilizing plain connection
* Implementing SUT and custom sender
* KafkaSource upgrade test
* Update knative-eventing to 8270497041f2
* Using source config template
* Export kafkaTopicEndpoint to be able to read on TOML
* wathola config reader can read both string and map[string]interface{} for sender.address (see the sketch after this list)
* KafkaSource upgrade test works
* Allow influencing Kafka cluster connection details
* Channel tests working
* Removing unused variable name
* Review knative/eventing#5321 changes
* Updates after review of knative/eventing#4815
* Use latest integration/eventing-sandbox/eventing-kafka/pr-563 commit
* Update deps
* Update deps
* Refactor after review
* Shorten the TypeMeta field
* Tweak the merge slices func after review
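
The sender.address handling described above is not part of the files rendered on this page. As a minimal sketch only, assuming a hypothetical helper name (resolveSenderAddress is not the actual wathola code), a TOML-backed config reader could accept either form like this:

package config

import "fmt"

// resolveSenderAddress normalizes the decoded sender.address value: a plain
// URL string for the default HTTP transport, or a map of Kafka connection
// details (for example bootstrapServers and topicName) for a Kafka sender.
func resolveSenderAddress(raw interface{}) (url string, kafka map[string]interface{}, err error) {
	switch v := raw.(type) {
	case string:
		return v, nil, nil
	case map[string]interface{}:
		return "", v, nil
	default:
		return "", nil, fmt.Errorf("unsupported sender.address type %T", raw)
	}
}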

* Adding generated test images

Co-authored-by: Lukas Berk <lberk@redhat.com>

* Restore manually changed vendor files

* Skipping subscription readiness when scaling dispatcher test (knative-sandbox#573)

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

Co-authored-by: Lukas Berk <lberk@redhat.com>
Co-authored-by: Ahmed Abdalla Abdelrehim <aabdelre@redhat.com>
3 people authored Jun 10, 2021
1 parent b6f1e2a commit 8ce9b2e
Showing 104 changed files with 7,200 additions and 5,798 deletions.
2 changes: 2 additions & 0 deletions go.mod
@@ -38,3 +38,5 @@ require (
knative.dev/networking v0.0.0-20210331064822-999a7708876c
knative.dev/pkg v0.0.0-20210331065221-952fdd90dbb0
)

replace knative.dev/eventing => github.com/openshift/knative-eventing v0.99.1-0.20210610110214-7043d7e4f943
9 changes: 5 additions & 4 deletions go.sum
@@ -628,6 +628,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/onsi/gomega v1.10.2 h1:aY/nuoWlKJud2J6U0E3NWsjlg+0GtwXxgEqthRdzlcs=
github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/openshift/knative-eventing v0.99.1-0.20210610110214-7043d7e4f943 h1:qrs+/c8zUnP399NixAUlqmAQ4PH1HQPOKGwa8/811lQ=
github.com/openshift/knative-eventing v0.99.1-0.20210610110214-7043d7e4f943/go.mod h1:EmyNMt16keS1pusIL5GxxueP06nxRtl+fiZIy1Il5Ws=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
@@ -641,9 +643,10 @@ github.com/openzipkin/zipkin-go v0.2.5/go.mod h1:KpXfKdgRDnnhsxw4pNIH9Md5lyFqKUa
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.8.0 h1:Keo9qb7iRJs2voHvunFtuuYFsbWeOBh8/P9v/kVMFtw=
github.com/pelletier/go-toml v1.8.0/go.mod h1:D6yutnOGMveHEPV7VQOuvI/gXY61bv+9bAOTRnLElKs=
github.com/pelletier/go-toml/v2 v2.0.0-beta.2 h1:f/g66OWmYXmVnYL3UAhqpM9YuWKFR2vjYfFNSDQcHPQ=
github.com/pelletier/go-toml/v2 v2.0.0-beta.2/go.mod h1:+X+aW6gUj6Hda43TeYHVCIvYNG/jqY/8ZFXAeXXHl+Q=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
@@ -1276,8 +1279,6 @@ k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6 h1:+WnxoVtG8TMiudHBSEtrVL
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73 h1:uJmqzgNWG7XyClnU/mLPBWwfKKF1K8Hf8whTseBgJcg=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/eventing v0.22.0 h1:esDaddfSmiVhLKDnUCVVunSdNeHklVEz0fsqs3NupuQ=
knative.dev/eventing v0.22.0/go.mod h1:LOG7bh0eZQkbYANcnORwke6Yy6aUu62o8GeByaOFfRQ=
knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 h1:km0Rrh0T9/wA2pivQm1hqSPVwgNgGCHC2WNn3GakZmE=
knative.dev/hack v0.0.0-20210325223819-b6ab329907d3/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
5 changes: 5 additions & 0 deletions (new Dockerfile for the wathola-kafka-sender test image)
@@ -0,0 +1,5 @@
# Do not edit! This file was generated via Makefile
FROM openshift/origin-base

ADD wathola-kafka-sender /usr/bin/wathola-kafka-sender
ENTRYPOINT ["/usr/bin/wathola-kafka-sender"]
8 changes: 4 additions & 4 deletions test/e2e-common.sh
@@ -489,8 +489,8 @@ function test_consolidated_channel_plain() {
install_consolidated_channel_crds || return 1
install_consolidated_sources_crds || return 1

go_test_e2e -tags=e2e,source -timeout=40m -test.parallel=${TEST_PARALLEL} ./test/e2e -channels=messaging.knative.dev/v1beta1:KafkaChannel || fail_test
go_test_e2e -tags=e2e,source -timeout=5m -test.parallel=${TEST_PARALLEL} ./test/conformance -channels=messaging.knative.dev/v1beta1:KafkaChannel -sources=sources.knative.dev/v1beta1:KafkaSource || fail_test
go_test_e2e -tags=e2e,consolidated,source -timeout=40m -test.parallel=${TEST_PARALLEL} ./test/e2e -channels=messaging.knative.dev/v1beta1:KafkaChannel || fail_test
go_test_e2e -tags=e2e,consolidated,source -timeout=5m -test.parallel=${TEST_PARALLEL} ./test/conformance -channels=messaging.knative.dev/v1beta1:KafkaChannel -sources=sources.knative.dev/v1beta1:KafkaSource || fail_test

uninstall_sources_crds || return 1
uninstall_channel_crds || return 1
@@ -506,7 +506,7 @@ function test_consolidated_channel_tls() {

install_consolidated_channel_crds || return 1

go_test_e2e -tags=e2e -timeout=40m -test.parallel=${TEST_PARALLEL} ./test/e2e -channels=messaging.knative.dev/v1beta1:KafkaChannel || fail_test
go_test_e2e -tags=e2e,consolidated -timeout=40m -test.parallel=${TEST_PARALLEL} ./test/e2e -channels=messaging.knative.dev/v1beta1:KafkaChannel || fail_test

uninstall_channel_crds || return 1
}
@@ -521,7 +521,7 @@ function test_consolidated_channel_sasl() {

install_consolidated_channel_crds || return 1

go_test_e2e -tags=e2e -timeout=40m -test.parallel=${TEST_PARALLEL} ./test/e2e -channels=messaging.knative.dev/v1beta1:KafkaChannel || fail_test
go_test_e2e -tags=e2e,consolidated -timeout=40m -test.parallel=${TEST_PARALLEL} ./test/e2e -channels=messaging.knative.dev/v1beta1:KafkaChannel || fail_test

uninstall_channel_crds || return 1
}
32 changes: 32 additions & 0 deletions test/e2e/channel_subscription_ready_test.go
@@ -0,0 +1,32 @@
// +build e2e
// +build consolidated

/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"context"
"testing"

"knative.dev/eventing-kafka/test/e2e/helpers"
)

func TestChannelSubscriptionScaleReadyV1(t *testing.T) {
t.Skipf("Skipping test due to flakiness.")
helpers.ChannelSubscriptionScaleReadyHelper(context.Background(), t, channelTestRunner)
}
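
The +build lines at the top of this file restrict it to runs that enable both tags, which test/e2e-common.sh now does via -tags=e2e,consolidated,source. For reference only (this commit targets older Go tooling and does not use it), the equivalent constraint in the newer //go:build syntax would be:

//go:build e2e && consolidated
// +build e2e
// +build consolidated

package e2e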
141 changes: 141 additions & 0 deletions test/e2e/helpers/channel_subscription_ready_test_helper.go
@@ -0,0 +1,141 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helpers

import (
"context"
"fmt"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
channelsv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
contribtestlib "knative.dev/eventing-kafka/test/lib"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
)

const (
kafkaChannelName = "kafka-sub-ready-channel"
kafkaSub0 = "kafka-sub-0"
kafkaSub1 = "kafka-sub-1"
recordEventsPodName = "e2e-channel-sub-ready-recordevents-pod"
eventSenderName = "e2e-channel-event-sender-pod"
)

func scaleDispatcherDeployment(ctx context.Context, t *testing.T, desiredReplicas int32, client *testlib.Client) {
dispatcherDeployment, err := client.Kube.AppsV1().Deployments("knative-eventing").Get(ctx, "kafka-ch-dispatcher", metav1.GetOptions{})
if err != nil {
t.Fatalf("Unable to get kafka-ch-dispatcher deployment: %v", err)
}
if *dispatcherDeployment.Spec.Replicas != desiredReplicas {
desired := dispatcherDeployment.DeepCopy()
*desired.Spec.Replicas = desiredReplicas
_, err := client.Kube.AppsV1().Deployments("knative-eventing").Update(ctx, desired, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Unable to update kafka-ch-dispatcher deployment to %d replica: %v", desiredReplicas, err)
}
// we actively do NOT wait for deployments to be ready so we can check state below
}
}

func readyDispatcherPodsCheck(ctx context.Context, t *testing.T, client *testlib.Client) int32 {
dispatcherDeployment, err := client.Kube.AppsV1().Deployments("knative-eventing").Get(ctx, "kafka-ch-dispatcher", metav1.GetOptions{})
if err != nil {
t.Fatalf("Unable to get kafka-ch-dispatcher deployment: %v", err)
}
return dispatcherDeployment.Status.ReadyReplicas
}

func createKafkaChannel(client *testlib.Client, kafkaChannelMeta metav1.TypeMeta, kafkaChannelName string) {
kafkaChannelV1Beta1 := &channelsv1beta1.KafkaChannel{
ObjectMeta: metav1.ObjectMeta{
Name: kafkaChannelName,
},
Spec: channelsv1beta1.KafkaChannelSpec{
NumPartitions: 3,
},
}
contribtestlib.CreateKafkaChannelV1Beta1OrFail(client, kafkaChannelV1Beta1)
}

func ChannelSubscriptionScaleReadyHelper(
ctx context.Context,
t *testing.T,
channelTestRunner testlib.ComponentsTestRunner,
options ...testlib.SetupClientOption) {

eventSource := fmt.Sprintf("http://%s.svc/", eventSenderName)

channelTestRunner.RunTests(t, testlib.FeatureBasic, func(st *testing.T, kafkaChannelMeta metav1.TypeMeta) {
client := testlib.Setup(st, true, options...)
defer testlib.TearDown(client)

scaleDispatcherDeployment(ctx, st, 1, client)
createKafkaChannel(client, kafkaChannelMeta, kafkaChannelName)
client.WaitForResourceReadyOrFail(kafkaChannelName, &kafkaChannelMeta)

eventTracker, _ := recordevents.StartEventRecordOrFail(ctx, client, recordEventsPodName)
client.CreateSubscriptionOrFail(
kafkaSub0,
kafkaChannelName,
&kafkaChannelMeta,
resources.WithSubscriberForSubscription(recordEventsPodName),
)
client.WaitForAllTestResourcesReadyOrFail(ctx)

scaleDispatcherDeployment(ctx, st, 4, client)
client.WaitForResourceReadyOrFail(kafkaSub0, testlib.SubscriptionTypeMeta) //this should still be ready

client.CreateSubscriptionOrFail(
kafkaSub1,
kafkaChannelName,
&kafkaChannelMeta,
resources.WithSubscriberForSubscription(recordEventsPodName),
)
for readyDispatcherPodsCheck(ctx, st, client) < 3 {
subObj, err := client.Eventing.MessagingV1().Subscriptions(client.Namespace).Get(ctx, kafkaSub1, metav1.GetOptions{})
if err != nil {
st.Fatalf("Could not get v1 subscription object %q: %v", subObj.Name, err)
}
if subObj.Status.IsReady() {
st.Fatalf("Subscription: %s, marked ready before dispatcher pods ready", subObj.Name)
}
}
client.WaitForResourceReadyOrFail(kafkaSub1, testlib.SubscriptionTypeMeta)
// send CloudEvent to the first channel
event := cloudevents.NewEvent()
event.SetID("test")
event.SetSource(eventSource)
event.SetType(testlib.DefaultEventType)

body := fmt.Sprintf(`{"msg":"TestSingleEvent %s"}`, uuid.New().String())
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
st.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

client.SendEventToAddressable(ctx, eventSenderName, kafkaChannelName, &kafkaChannelMeta, event)
// verify the logger service receives the event
eventTracker.AssertAtLeast(1, recordevents.MatchEvent(
HasSource(eventSource),
HasData([]byte(body)),
))
})
}
4 changes: 2 additions & 2 deletions test/e2e/helpers/kafka_helper.go
@@ -215,7 +215,7 @@ func MustPublishKafkaMessageViaBinding(client *testlib.Client, selector map[stri
}
}

func MustCreateTopic(client *testlib.Client, clusterName string, clusterNamespace string, topicName string) {
func MustCreateTopic(client *testlib.Client, clusterName, clusterNamespace, topicName string, partitions int) {
obj := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": topicGVR.GroupVersion().String(),
@@ -227,7 +227,7 @@ func MustCreateTopic(client *testlib.Client, clusterName string, clusterNamespac
},
},
"spec": map[string]interface{}{
"partitions": 10,
"partitions": partitions,
"replicas": 1,
},
},
2 changes: 1 addition & 1 deletion test/e2e/kafka_binding_test.go
@@ -44,7 +44,7 @@ func testKafkaBinding(t *testing.T, version string, messageKey string, messageHe

defer testlib.TearDown(client)

helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, kafkaTopicName)
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, kafkaTopicName, 10)

t.Logf("Creating EventRecord")
eventTracker, _ := recordevents.StartEventRecordOrFail(context.Background(), client, loggerPodName)
2 changes: 1 addition & 1 deletion test/e2e/kafka_source.go
@@ -379,7 +379,7 @@ func testKafkaSource(t *testing.T, name string, version string, messageKey strin
t.Fatalf("could not copy secret(%s): %v", kafkaTLSSecret, err)
}
}
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, kafkaTopicName)
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, kafkaTopicName, 10)
if len(recordEventPodName) > 63 {
recordEventPodName = recordEventPodName[:63]
}
2 changes: 1 addition & 1 deletion test/e2e/kafka_source_reconciler_test.go
@@ -109,7 +109,7 @@ func testKafkaSourceReconciler(c *testlib.Client, name string, doAction func(c *
}

func createKafkaSourceWithSinkMissing(c *testlib.Client) {
helpers.MustCreateTopic(c, kafkaClusterName, kafkaClusterNamespace, rtKafkaTopicName)
helpers.MustCreateTopic(c, kafkaClusterName, kafkaClusterNamespace, rtKafkaTopicName, 10)

contribtestlib.CreateKafkaSourceV1Beta1OrFail(c, contribresources.KafkaSourceV1Beta1(
kafkaBootstrapUrlPlain,
4 changes: 2 additions & 2 deletions test/e2e/kafka_source_test.go
@@ -103,7 +103,7 @@ func testKafkaSourceUpdate(t *testing.T, name string, test updateTest) {
defer testlib.TearDown(client)

t.Logf("Creating topic: %s\n", defaultKafkaSource.topicName+name)
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, defaultKafkaSource.topicName+name)
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, defaultKafkaSource.topicName+name, 10)

t.Logf("Copying secrets: %s\n", defaultKafkaSource.topicName+name)
_, err := utils.CopySecret(client.Kube.CoreV1(), "knative-eventing", kafkaTLSSecret, client.Namespace, "default")
@@ -149,7 +149,7 @@ func testKafkaSourceUpdate(t *testing.T, name string, test updateTest) {
t.Fatalf("Unabled to Get kafkasource: %s/%s\n", client.Namespace, kafkaSourceName)
}
if test.topicName != defaultKafkaSource.topicName {
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, test.topicName+name)
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, test.topicName+name, 10)
ksObj.Spec.Topics = []string{test.topicName + name}
eventSourceName = sourcesv1beta1.KafkaEventSource(client.Namespace, kafkaSourceName, test.topicName+name)
}
30 changes: 30 additions & 0 deletions test/lib/creation.go
@@ -25,11 +25,41 @@ import (

bindingsv1alpha1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1alpha1"
bindingsv1beta1 "knative.dev/eventing-kafka/pkg/apis/bindings/v1beta1"
channelsv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
sourcesv1alpha1 "knative.dev/eventing-kafka/pkg/apis/sources/v1alpha1"
sourcesv1beta1 "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned"
)

func CreateKafkaChannelV1Beta1OrFail(c *testlib.Client, kafkaChannel *channelsv1beta1.KafkaChannel) {
kafkaChannelClientSet, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaChannel client: %v", err)
}

kChannels := kafkaChannelClientSet.MessagingV1beta1().KafkaChannels(c.Namespace)
if createdKafkaChannel, err := kChannels.Create(context.Background(), kafkaChannel, metav1.CreateOptions{}); err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaChannel %q: %v", kafkaChannel.Name, err)
} else {
c.Tracker.AddObj(createdKafkaChannel)
}
}

func GetKafkaChannelV1Beta1OrFail(c *testlib.Client, kafkaChannel string) *channelsv1beta1.KafkaChannel {
kafkaChannelClientSet, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
c.T.Fatalf("Failed to create v1beta1 KafkaChannel client: %v", err)
}

kChannels := kafkaChannelClientSet.MessagingV1beta1().KafkaChannels(c.Namespace)
if kcObj, err := kChannels.Get(context.Background(), kafkaChannel, metav1.GetOptions{}); err != nil {
c.T.Fatalf("Failed to get v1beta1 KafkaChannel %q: %v", kafkaChannel, err)
} else {
return kcObj
}
return nil
}

func CreateKafkaSourceV1Alpha1OrFail(c *testlib.Client, kafkaSource *sourcesv1alpha1.KafkaSource) {
kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
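
The new Get helper is not exercised by the files rendered above. As a hypothetical usage sketch only (assertKafkaChannelPartitions is an assumed name, not part of this commit), a test in the same package could combine it with the KafkaChannelSpec used earlier:

// assertKafkaChannelPartitions fetches a KafkaChannel via the new helper and
// fails the test if its partition count does not match the expectation.
func assertKafkaChannelPartitions(c *testlib.Client, name string, want int32) {
	kc := GetKafkaChannelV1Beta1OrFail(c, name)
	if kc.Spec.NumPartitions != want {
		c.T.Fatalf("KafkaChannel %q has %d partitions, want %d", name, kc.Spec.NumPartitions, want)
	}
}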
2 changes: 1 addition & 1 deletion test/lib/setupclientoptions/sources.go
@@ -42,7 +42,7 @@ func KafkaSourceV1B1ClientSetupOption(name string, kafkaClusterName string, kafk
consumerGroup = uuid.New().String()
)

helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, kafkaTopicName)
helpers.MustCreateTopic(client, kafkaClusterName, kafkaClusterNamespace, kafkaTopicName, 10)

recordevents.StartEventRecordOrFail(context.Background(), client, recordEventsPodName)

35 changes: 35 additions & 0 deletions test/test_images/wathola-kafka-sender/main.go
@@ -0,0 +1,35 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"go.uber.org/zap"
"knative.dev/eventing-kafka/test/upgrade/continual"
"knative.dev/eventing/test/upgrade/prober/wathola/sender"
"knative.dev/pkg/signals"
)

func main() {
ctx := signals.NewContext()
log, err := zap.NewDevelopment()
if err != nil {
panic(err)
}
es := continual.CreateKafkaSender(ctx, log.Sugar())
sender.RegisterEventSender(es)
sender.New().SendContinually()
}
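
The continual package that provides CreateKafkaSender is among the added files not rendered on this page. As an assumption-laden sketch only (sendEventOverPlainKafka is a hypothetical name, and the real sender must also satisfy the EventSender contract registered above), publishing a wathola event to Kafka over a plain, unauthenticated connection could look roughly like this:

package continual

import (
	"encoding/json"
	"fmt"

	"github.com/Shopify/sarama"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

// sendEventOverPlainKafka publishes a CloudEvent in structured (JSON) form to
// a Kafka topic using a plain connection, i.e. no TLS and no SASL configured
// on the Sarama client.
func sendEventOverPlainKafka(brokers []string, topic string, ce cloudevents.Event) error {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		return fmt.Errorf("creating Kafka producer: %w", err)
	}
	defer producer.Close()

	payload, err := json.Marshal(ce)
	if err != nil {
		return fmt.Errorf("encoding CloudEvent: %w", err)
	}
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(payload),
	})
	return err
}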
(The remaining changed files are not rendered on this page.)
