From 3886b9c311cdadaa9479e9621e09ca08cb045482 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Fri, 24 Jul 2020 14:39:12 +0300 Subject: [PATCH 1/7] KafkaChannel reconciler to use v1beta1 api shape --- kafka/channel/README.md | 4 +- kafka/channel/pkg/dispatcher/dispatcher.go | 2 +- .../pkg/dispatcher/dispatcher_it_test.go | 2 +- .../channel/pkg/dispatcher/dispatcher_test.go | 2 +- .../pkg/reconciler/controller/controller.go | 4 +- .../pkg/reconciler/controller/kafkachannel.go | 26 ++++----- .../controller/kafkachannel_test.go | 12 ++-- .../controller/resources/service.go | 4 +- .../controller/resources/service_test.go | 8 +-- .../pkg/reconciler/dispatcher/kafkachannel.go | 55 ++++++++----------- .../pkg/reconciler/testing/kafkachannel.go | 38 ++++++------- .../channel/pkg/reconciler/testing/listers.go | 6 +- 12 files changed, 78 insertions(+), 85 deletions(-) diff --git a/kafka/channel/README.md b/kafka/channel/README.md index 7257ea5ce9..073c6954a8 100644 --- a/kafka/channel/README.md +++ b/kafka/channel/README.md @@ -42,7 +42,7 @@ topics. 1. Create the `KafkaChannel` custom objects: ```yaml - apiVersion: messaging.knative.dev/v1alpha1 + apiVersion: messaging.knative.dev/v1beta1 kind: KafkaChannel metadata: name: my-kafka-channel @@ -119,7 +119,7 @@ data: Then create a KafkaChannel: ```yaml -apiVersion: messaging.knative.dev/v1alpha1 +apiVersion: messaging.knative.dev/v1beta1 kind: KafkaChannel metadata: name: my-kafka-channel diff --git a/kafka/channel/pkg/dispatcher/dispatcher.go b/kafka/channel/pkg/dispatcher/dispatcher.go index 1a4684a1ef..8318f86989 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher.go +++ b/kafka/channel/pkg/dispatcher/dispatcher.go @@ -30,7 +30,7 @@ import ( "go.opencensus.io/trace" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" eventingchannels "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/kncloudevents" diff --git a/kafka/channel/pkg/dispatcher/dispatcher_it_test.go b/kafka/channel/pkg/dispatcher/dispatcher_it_test.go index 336a82523f..4898eecbad 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher_it_test.go +++ b/kafka/channel/pkg/dispatcher/dispatcher_it_test.go @@ -31,7 +31,7 @@ import ( protocolhttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/cloudevents/sdk-go/v2/test" "go.uber.org/zap" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/apis" diff --git a/kafka/channel/pkg/dispatcher/dispatcher_test.go b/kafka/channel/pkg/dispatcher/dispatcher_test.go index 045713e9d3..3784775173 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher_test.go +++ b/kafka/channel/pkg/dispatcher/dispatcher_test.go @@ -33,7 +33,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" eventingchannels "knative.dev/eventing/pkg/channel" _ "knative.dev/pkg/system/testing" diff --git a/kafka/channel/pkg/reconciler/controller/controller.go b/kafka/channel/pkg/reconciler/controller/controller.go index ba89415d54..dee26676a3 100644 --- a/kafka/channel/pkg/reconciler/controller/controller.go +++ b/kafka/channel/pkg/reconciler/controller/controller.go @@ -40,8 +40,8 @@ import ( "knative.dev/pkg/system" kafkaChannelClient 
"knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/client" - "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/informers/messaging/v1alpha1/kafkachannel" - kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel" + "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" + kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" eventingClient "knative.dev/eventing/pkg/client/injection/client" ) diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel.go b/kafka/channel/pkg/reconciler/controller/kafkachannel.go index 2f7b815997..db30765b19 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel.go @@ -48,11 +48,11 @@ import ( "knative.dev/pkg/controller" pkgreconciler "knative.dev/pkg/reconciler" - "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" + "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" kafkaclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned" kafkaScheme "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/scheme" - kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel" - listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1" + kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" + listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1beta1" "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources" "knative.dev/eventing-contrib/kafka/channel/pkg/utils" ) @@ -139,7 +139,7 @@ type envConfig struct { var _ kafkaChannelReconciler.Interface = (*Reconciler)(nil) var _ kafkaChannelReconciler.Finalizer = (*Reconciler)(nil) -func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event { +func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { kc.Status.InitializeConditions() logger := logging.FromContext(ctx) @@ -247,7 +247,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChanne return newReconciledNormal(kc.Namespace, kc.Name) } -func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, dispatcherNamespace string, kc *v1alpha1.KafkaChannel) (*appsv1.Deployment, error) { +func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, dispatcherNamespace string, kc *v1beta1.KafkaChannel) (*appsv1.Deployment, error) { if scope == scopeNamespace { // Configure RBAC in namespace to access the configmaps sa, err := r.reconcileServiceAccount(ctx, dispatcherNamespace, kc) @@ -311,7 +311,7 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp return d, nil } -func (r *Reconciler) reconcileServiceAccount(ctx context.Context, dispatcherNamespace string, kc *v1alpha1.KafkaChannel) (*corev1.ServiceAccount, error) { +func (r *Reconciler) reconcileServiceAccount(ctx context.Context, dispatcherNamespace string, kc *v1beta1.KafkaChannel) (*corev1.ServiceAccount, error) { sa, err := r.serviceAccountLister.ServiceAccounts(dispatcherNamespace).Get(dispatcherName) if err != nil { if 
apierrs.IsNotFound(err) { @@ -332,7 +332,7 @@ func (r *Reconciler) reconcileServiceAccount(ctx context.Context, dispatcherName return sa, err } -func (r *Reconciler) reconcileRoleBinding(ctx context.Context, name string, ns string, kc *v1alpha1.KafkaChannel, clusterRoleName string, sa *corev1.ServiceAccount) (*rbacv1.RoleBinding, error) { +func (r *Reconciler) reconcileRoleBinding(ctx context.Context, name string, ns string, kc *v1beta1.KafkaChannel, clusterRoleName string, sa *corev1.ServiceAccount) (*rbacv1.RoleBinding, error) { rb, err := r.roleBindingLister.RoleBindings(ns).Get(name) if err != nil { if apierrs.IsNotFound(err) { @@ -352,7 +352,7 @@ func (r *Reconciler) reconcileRoleBinding(ctx context.Context, name string, ns s return rb, err } -func (r *Reconciler) reconcileDispatcherService(ctx context.Context, dispatcherNamespace string, kc *v1alpha1.KafkaChannel) (*corev1.Service, error) { +func (r *Reconciler) reconcileDispatcherService(ctx context.Context, dispatcherNamespace string, kc *v1beta1.KafkaChannel) (*corev1.Service, error) { svc, err := r.serviceLister.Services(dispatcherNamespace).Get(dispatcherName) if err != nil { if apierrs.IsNotFound(err) { @@ -380,7 +380,7 @@ func (r *Reconciler) reconcileDispatcherService(ctx context.Context, dispatcherN return svc, nil } -func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherNamespace string, channel *v1alpha1.KafkaChannel) (*corev1.Service, error) { +func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherNamespace string, channel *v1beta1.KafkaChannel) (*corev1.Service, error) { logger := logging.FromContext(ctx) // Get the Service and propagate the status to the Channel in case it does not exist. // We don't do anything with the service because it's status contains nothing useful, so just do @@ -425,7 +425,7 @@ func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherName return svc, nil } -func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel) (sarama.ClusterAdmin, error) { +func (r *Reconciler) createClient(ctx context.Context, kc *v1beta1.KafkaChannel) (sarama.ClusterAdmin, error) { // We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time. // This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162. // Once the issue is fixed we should use a shared cluster admin client. 
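The workaround described in this comment is worth seeing in isolation. A minimal, hedged sketch of the create-and-close-per-reconcile pattern (the broker address is a placeholder; the patch itself pins sarama.V2_0_0_0 elsewhere):

```go
package example

import "github.com/Shopify/sarama"

// newAdmin builds a fresh ClusterAdmin for a single reconcile and relies on
// the caller to Close it, since sarama's admin client cannot be shared
// safely yet (Shopify/sarama#1162).
func newAdmin() (sarama.ClusterAdmin, error) {
	conf := sarama.NewConfig()
	conf.Version = sarama.V2_0_0_0
	return sarama.NewClusterAdmin([]string{"my-kafka:9092"}, conf)
}
```
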
Also, r.kafkaClusterAdmin is currently @@ -441,7 +441,7 @@ func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel return kafkaClusterAdmin, nil } -func (r *Reconciler) createTopic(ctx context.Context, channel *v1alpha1.KafkaChannel, kafkaClusterAdmin sarama.ClusterAdmin) error { +func (r *Reconciler) createTopic(ctx context.Context, channel *v1beta1.KafkaChannel, kafkaClusterAdmin sarama.ClusterAdmin) error { logger := logging.FromContext(ctx) topicName := utils.TopicName(utils.KafkaChannelSeparator, channel.Namespace, channel.Name) @@ -460,7 +460,7 @@ func (r *Reconciler) createTopic(ctx context.Context, channel *v1alpha1.KafkaCha return err } -func (r *Reconciler) deleteTopic(ctx context.Context, channel *v1alpha1.KafkaChannel, kafkaClusterAdmin sarama.ClusterAdmin) error { +func (r *Reconciler) deleteTopic(ctx context.Context, channel *v1beta1.KafkaChannel, kafkaClusterAdmin sarama.ClusterAdmin) error { logger := logging.FromContext(ctx) topicName := utils.TopicName(utils.KafkaChannelSeparator, channel.Namespace, channel.Name) @@ -488,7 +488,7 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co r.kafkaConfigError = err } -func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event { +func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { // Do not attempt retrying creating the client because it might be a permanent error // in which case the finalizer will never get removed. if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.kafkaConfig != nil { diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go index 33b625a185..0630d89cc9 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go @@ -35,7 +35,7 @@ import ( eventingClient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/pkg/utils" - duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" + duckv1 "knative.dev/pkg/apis/duck/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -44,9 +44,9 @@ import ( logtesting "knative.dev/pkg/logging/testing" . 
"knative.dev/pkg/reconciler/testing" - "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" + "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" fakekafkaclient "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/client/fake" - "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel" + "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources" reconcilekafkatesting "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/testing" reconcilertesting "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/testing" @@ -68,8 +68,8 @@ var ( func init() { // Add types to scheme - _ = v1alpha1.AddToScheme(scheme.Scheme) - _ = duckv1alpha1.AddToScheme(scheme.Scheme) + _ = v1beta1.AddToScheme(scheme.Scheme) + _ = duckv1.AddToScheme(scheme.Scheme) } func TestAllCases(t *testing.T) { @@ -595,7 +595,7 @@ func makeService() *corev1.Service { return resources.MakeDispatcherService(testNS) } -func makeChannelService(nc *v1alpha1.KafkaChannel) *corev1.Service { +func makeChannelService(nc *v1beta1.KafkaChannel) *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", diff --git a/kafka/channel/pkg/reconciler/controller/resources/service.go b/kafka/channel/pkg/reconciler/controller/resources/service.go index d267f5a647..96c6f9ffae 100644 --- a/kafka/channel/pkg/reconciler/controller/resources/service.go +++ b/kafka/channel/pkg/reconciler/controller/resources/service.go @@ -21,7 +21,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" + "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" "knative.dev/eventing/pkg/utils" "knative.dev/pkg/kmeta" ) @@ -60,7 +60,7 @@ func ExternalService(namespace, service string) ServiceOption { // MakeK8sService creates a new K8s Service for a Channel resource. It also sets the appropriate // OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it. // As well as being garbage collected when the Channel is deleted. 
-func MakeK8sService(kc *v1alpha1.KafkaChannel, opts ...ServiceOption) (*corev1.Service, error) { +func MakeK8sService(kc *v1beta1.KafkaChannel, opts ...ServiceOption) (*corev1.Service, error) { // Add annotations svc := &corev1.Service{ TypeMeta: metav1.TypeMeta{ diff --git a/kafka/channel/pkg/reconciler/controller/resources/service_test.go b/kafka/channel/pkg/reconciler/controller/resources/service_test.go index 9db94d7268..3b90e2c2fc 100644 --- a/kafka/channel/pkg/reconciler/controller/resources/service_test.go +++ b/kafka/channel/pkg/reconciler/controller/resources/service_test.go @@ -24,7 +24,7 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" + "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" "knative.dev/pkg/kmeta" ) @@ -49,7 +49,7 @@ func TestMakeChannelServiceAddress(t *testing.T) { } func TestMakeService(t *testing.T) { - imc := &v1alpha1.KafkaChannel{ + imc := &v1beta1.KafkaChannel{ ObjectMeta: metav1.ObjectMeta{ Name: kcName, Namespace: testNS, @@ -92,7 +92,7 @@ func TestMakeService(t *testing.T) { } func TestMakeServiceWithExternal(t *testing.T) { - imc := &v1alpha1.KafkaChannel{ + imc := &v1beta1.KafkaChannel{ ObjectMeta: metav1.ObjectMeta{ Name: kcName, Namespace: testNS, @@ -130,7 +130,7 @@ func TestMakeServiceWithExternal(t *testing.T) { } func TestMakeServiceWithFailingOption(t *testing.T) { - imc := &v1alpha1.KafkaChannel{ + imc := &v1beta1.KafkaChannel{ ObjectMeta: metav1.ObjectMeta{ Name: kcName, Namespace: testNS, diff --git a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go index c36f1da1e6..145c756bf1 100644 --- a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go @@ -26,9 +26,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" - eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" - eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/logging" @@ -38,13 +37,13 @@ import ( "knative.dev/pkg/injection" pkgreconciler "knative.dev/pkg/reconciler" - "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" + "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" kafkaclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned" kafkaScheme "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/scheme" kafkaclientsetinjection "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/client" - "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/informers/messaging/v1alpha1/kafkachannel" - kafkachannelreconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel" - listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1" + "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" + kafkachannelreconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" + listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1beta1" 
"knative.dev/eventing-contrib/kafka/channel/pkg/dispatcher" "knative.dev/eventing-contrib/kafka/channel/pkg/utils" ) @@ -142,7 +141,7 @@ func filterWithAnnotation(namespaced bool) func(obj interface{}) bool { return pkgreconciler.AnnotationFilterFunc(eventing.ScopeAnnotationKey, "cluster", true) } -func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event { +func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { channels, err := r.kafkachannelLister.List(labels.Everything()) if err != nil { logging.FromContext(ctx).Error("Error listing kafka channels") @@ -152,7 +151,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChanne // TODO: revisit this code. Instead of reading all channels and updating consumers and hostToChannel map for all // why not just reconcile the current channel. With this the UpdateKafkaConsumers can now return SubscribableStatus // for the subscriptions on the channel that is being reconciled. - kafkaChannels := make([]*v1alpha1.KafkaChannel, 0) + kafkaChannels := make([]*v1beta1.KafkaChannel, 0) for _, channel := range channels { if channel.Status.IsReady() { kafkaChannels = append(kafkaChannels, channel) @@ -164,32 +163,26 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChanne return err } - failedSubscriptionsv1beta1, err := r.kafkaDispatcher.UpdateKafkaConsumers(config) + failedSubscriptions, err := r.kafkaDispatcher.UpdateKafkaConsumers(config) if err != nil { logging.FromContext(ctx).Error("Error updating kafka consumers in dispatcher") return err } - failedSubscriptionsv1alpha1 := make(map[eventingduckv1alpha1.SubscriberSpec]error, len(failedSubscriptionsv1beta1)) - for k, v := range failedSubscriptionsv1beta1 { - newSub := eventingduckv1alpha1.SubscriberSpec{} - newSub.ConvertFrom(context.TODO(), &k) - failedSubscriptionsv1alpha1[newSub] = v - } - kc.Status.SubscribableTypeStatus.SubscribableStatus = r.createSubscribableStatus(kc.Spec.Subscribable, failedSubscriptionsv1alpha1) - if len(failedSubscriptionsv1alpha1) > 0 { + kc.Status.SubscribableStatus = r.createSubscribableStatus(&kc.Spec.SubscribableSpec, failedSubscriptions) + if len(failedSubscriptions) > 0 { logging.FromContext(ctx).Error("Some kafka subscriptions failed to subscribe") return fmt.Errorf("Some kafka subscriptions failed to subscribe") } return nil } -func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1alpha1.Subscribable, failedSubscriptions map[eventingduckv1alpha1.SubscriberSpec]error) *eventingduckv1alpha1.SubscribableStatus { +func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1.SubscribableSpec, failedSubscriptions map[eventingduckv1.SubscriberSpec]error) eventingduckv1.SubscribableStatus { if subscribable == nil { - return nil + return eventingduckv1.SubscribableStatus{} } - subscriberStatus := make([]eventingduckv1beta1.SubscriberStatus, 0) + subscriberStatus := make([]eventingduckv1.SubscriberStatus, 0) for _, sub := range subscribable.Subscribers { - status := eventingduckv1beta1.SubscriberStatus{ + status := eventingduckv1.SubscriberStatus{ UID: sub.UID, ObservedGeneration: sub.Generation, Ready: corev1.ConditionTrue, @@ -200,21 +193,21 @@ func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1alpha1 } subscriberStatus = append(subscriberStatus, status) } - return &eventingduckv1alpha1.SubscribableStatus{ + return eventingduckv1.SubscribableStatus{ Subscribers: 
subscriberStatus, } } // newConfigFromKafkaChannels creates a new Config from the list of kafka channels. -func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1alpha1.KafkaChannel) *dispatcher.ChannelConfig { +func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1beta1.KafkaChannel) *dispatcher.ChannelConfig { channelConfig := dispatcher.ChannelConfig{ Namespace: c.Namespace, Name: c.Name, - HostName: c.Status.Address.Hostname, + HostName: c.Status.Address.URL.Host, } - if c.Spec.Subscribable != nil { - newSubs := make([]dispatcher.Subscription, 0, len(c.Spec.Subscribable.Subscribers)) - for _, source := range c.Spec.Subscribable.Subscribers { + if c.Spec.SubscribableSpec.Subscribers != nil { + newSubs := make([]dispatcher.Subscription, 0, len(c.Spec.SubscribableSpec.Subscribers)) + for _, source := range c.Spec.SubscribableSpec.Subscribers { // Extract retry configuration var retryConfig *kncloudevents.RetryConfig @@ -228,7 +221,7 @@ func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1alpha1.KafkaChannel) } } - subSpec := &eventingduckv1beta1.SubscriberSpec{} + subSpec := &eventingduckv1.SubscriberSpec{} _ = source.ConvertTo(context.TODO(), subSpec) newSubs = append(newSubs, dispatcher.Subscription{ @@ -243,7 +236,7 @@ func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1alpha1.KafkaChannel) } // newConfigFromKafkaChannels creates a new Config from the list of kafka channels. -func (r *Reconciler) newConfigFromKafkaChannels(channels []*v1alpha1.KafkaChannel) *dispatcher.Config { +func (r *Reconciler) newConfigFromKafkaChannels(channels []*v1beta1.KafkaChannel) *dispatcher.Config { cc := make([]dispatcher.ChannelConfig, 0) for _, c := range channels { channelConfig := r.newChannelConfigFromKafkaChannel(c) @@ -253,7 +246,7 @@ func (r *Reconciler) newConfigFromKafkaChannels(channels []*v1alpha1.KafkaChanne ChannelConfigs: cc, } } -func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.KafkaChannel) (*v1alpha1.KafkaChannel, error) { +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1beta1.KafkaChannel) (*v1beta1.KafkaChannel, error) { kc, err := r.kafkachannelLister.KafkaChannels(desired.Namespace).Get(desired.Name) if err != nil { return nil, err @@ -267,6 +260,6 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.KafkaCh existing := kc.DeepCopy() existing.Status = desired.Status - new, err := r.kafkaClientSet.MessagingV1alpha1().KafkaChannels(desired.Namespace).UpdateStatus(existing) + new, err := r.kafkaClientSet.MessagingV1beta1().KafkaChannels(desired.Namespace).UpdateStatus(existing) return new, err } diff --git a/kafka/channel/pkg/reconciler/testing/kafkachannel.go b/kafka/channel/pkg/reconciler/testing/kafkachannel.go index 42bc742ffb..24b449b6db 100644 --- a/kafka/channel/pkg/reconciler/testing/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/testing/kafkachannel.go @@ -25,21 +25,21 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" + "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" "knative.dev/pkg/apis" ) // KafkaChannelOption enables further configuration of a KafkaChannel. -type KafkaChannelOption func(*v1alpha1.KafkaChannel) +type KafkaChannelOption func(*v1beta1.KafkaChannel) // NewKafkaChannel creates an KafkaChannel with KafkaChannelOptions. 
-func NewKafkaChannel(name, namespace string, ncopt ...KafkaChannelOption) *v1alpha1.KafkaChannel { - nc := &v1alpha1.KafkaChannel{ +func NewKafkaChannel(name, namespace string, ncopt ...KafkaChannelOption) *v1beta1.KafkaChannel { + nc := &v1beta1.KafkaChannel{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, }, - Spec: v1alpha1.KafkaChannelSpec{}, + Spec: v1beta1.KafkaChannelSpec{}, } for _, opt := range ncopt { opt(nc) @@ -48,77 +48,77 @@ func NewKafkaChannel(name, namespace string, ncopt ...KafkaChannelOption) *v1alp return nc } -func WithInitKafkaChannelConditions(nc *v1alpha1.KafkaChannel) { +func WithInitKafkaChannelConditions(nc *v1beta1.KafkaChannel) { nc.Status.InitializeConditions() } -func WithKafkaChannelDeleted(nc *v1alpha1.KafkaChannel) { +func WithKafkaChannelDeleted(nc *v1beta1.KafkaChannel) { deleteTime := metav1.NewTime(time.Unix(1e9, 0)) nc.ObjectMeta.SetDeletionTimestamp(&deleteTime) } func WithKafkaChannelTopicReady() KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkTopicTrue() } } func WithKafkaChannelConfigReady() KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkConfigTrue() } } func WithKafkaChannelDeploymentNotReady(reason, message string) KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkDispatcherFailed(reason, message) } } func WithKafkaChannelDeploymentReady() KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.PropagateDispatcherStatus(&appsv1.DeploymentStatus{Conditions: []appsv1.DeploymentCondition{{Type: appsv1.DeploymentAvailable, Status: corev1.ConditionTrue}}}) } } func WithKafkaChannelServicetNotReady(reason, message string) KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkServiceFailed(reason, message) } } func WithKafkaChannelServiceReady() KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkServiceTrue() } } func WithKafkaChannelChannelServicetNotReady(reason, message string) KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkChannelServiceFailed(reason, message) } } func WithKafkaChannelChannelServiceReady() KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkChannelServiceTrue() } } func WithKafkaChannelEndpointsNotReady(reason, message string) KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkEndpointsFailed(reason, message) } } func WithKafkaChannelEndpointsReady() KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.MarkEndpointsTrue() } } func WithKafkaChannelAddress(a string) KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { nc.Status.SetAddress(&apis.URL{ Scheme: "http", Host: a, @@ -127,7 +127,7 @@ func WithKafkaChannelAddress(a string) KafkaChannelOption { } func WithKafkaFinalizer(finalizerName string) KafkaChannelOption { - return func(nc *v1alpha1.KafkaChannel) { + return func(nc *v1beta1.KafkaChannel) { finalizers := sets.NewString(nc.Finalizers...) 
finalizers.Insert(finalizerName) nc.SetFinalizers(finalizers.List()) diff --git a/kafka/channel/pkg/reconciler/testing/listers.go b/kafka/channel/pkg/reconciler/testing/listers.go index e28541be2f..55d641256f 100644 --- a/kafka/channel/pkg/reconciler/testing/listers.go +++ b/kafka/channel/pkg/reconciler/testing/listers.go @@ -28,9 +28,9 @@ import ( fakeeventsclientset "knative.dev/eventing/pkg/client/clientset/versioned/fake" "knative.dev/pkg/reconciler/testing" - messagingv1alpha1 "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" + messagingv1beta1 "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" fakemessagingclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/fake" - messaginglisters "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1" + messaginglisters "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1beta1" ) var clientSetSchemes = []func(*runtime.Scheme) error{ @@ -97,7 +97,7 @@ func (l *Listers) GetEndpointsLister() corev1listers.EndpointsLister { } func (l *Listers) GetKafkaChannelLister() messaginglisters.KafkaChannelLister { - return messaginglisters.NewKafkaChannelLister(l.indexerFor(&messagingv1alpha1.KafkaChannel{})) + return messaginglisters.NewKafkaChannelLister(l.indexerFor(&messagingv1beta1.KafkaChannel{})) } func (l *Listers) GetDeploymentLister() appsv1listers.DeploymentLister { From cdaaa07682450afcf2f8430cb28c8058dcb2e38b Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Thu, 6 Aug 2020 16:27:12 +0300 Subject: [PATCH 2/7] KafkaChannel v1alpha1 should use v1alpha1 subscribable duck explicitly --- .../messaging/v1alpha1/kafka_channel_defaults.go | 13 +++++++++++++ .../v1alpha1/kafka_channel_defaults_test.go | 11 +++++++++++ 2 files changed, 24 insertions(+) diff --git a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go index dfbb96a368..dae4468f23 100644 --- a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go +++ b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go @@ -18,11 +18,24 @@ package v1alpha1 import ( "context" + "knative.dev/eventing/pkg/apis/messaging" "knative.dev/eventing-contrib/kafka/channel/pkg/utils" ) func (c *KafkaChannel) SetDefaults(ctx context.Context) { + // Set the duck subscription to the stored version of the duck + // we support. Reason for this is that the stored version will + // not get a chance to get modified, but for newer versions + // conversion webhook will be able to take a crack at it and + // can modify it to match the duck shape. 
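The hunk continues just below; to make the effect of this default concrete, a hedged sketch using the annotation key the updated tests assert on:

```go
package example

import (
	"context"
	"fmt"

	"knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1"
)

func demoDefaults() {
	kc := &v1alpha1.KafkaChannel{}
	kc.SetDefaults(context.Background())
	// The stored (v1alpha1) version pins its own subscribable duck version;
	// newer API versions get reshaped by the conversion webhook instead.
	fmt.Println(kc.Annotations["messaging.knative.dev/subscribable"]) // v1alpha1
}
```
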
+ if c.Annotations == nil { + c.Annotations = make(map[string]string) + } + if _, ok := c.Annotations[messaging.SubscribableDuckVersionAnnotation]; !ok { + c.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1alpha1" + } + c.Spec.SetDefaults(ctx) } diff --git a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults_test.go b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults_test.go index d91cfbad10..01b2b3fa56 100644 --- a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults_test.go +++ b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults_test.go @@ -17,6 +17,8 @@ import ( "context" "testing" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing-contrib/kafka/channel/pkg/utils" "github.com/google/go-cmp/cmp" @@ -35,6 +37,9 @@ func TestKafkaChannelDefaults(t *testing.T) { "nil spec": { initial: KafkaChannel{}, expected: KafkaChannel{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1alpha1"}, + }, Spec: KafkaChannelSpec{ NumPartitions: utils.DefaultNumPartitions, ReplicationFactor: utils.DefaultReplicationFactor, @@ -48,6 +53,9 @@ func TestKafkaChannelDefaults(t *testing.T) { }, }, expected: KafkaChannel{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1alpha1"}, + }, Spec: KafkaChannelSpec{ NumPartitions: utils.DefaultNumPartitions, ReplicationFactor: testReplicationFactor, @@ -61,6 +69,9 @@ func TestKafkaChannelDefaults(t *testing.T) { }, }, expected: KafkaChannel{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{"messaging.knative.dev/subscribable": "v1alpha1"}, + }, Spec: KafkaChannelSpec{ NumPartitions: testNumPartitions, ReplicationFactor: utils.DefaultReplicationFactor, From a8befa127bcb01aac63c27ae30bde5c9b1885cc9 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 7 Aug 2020 09:45:05 +0200 Subject: [PATCH 3/7] Reenable Test single event Signed-off-by: Francesco Guardiani --- test/e2e/channel_single_event_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index 3f14f0d56d..b34d60bff6 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -26,11 +26,9 @@ import ( ) func TestSingleBinaryEventForChannel(t *testing.T) { - t.SkipNow() helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingBinary, "v1alpha1", "", channelTestRunner) } func TestSingleStructuredEventForChannel(t *testing.T) { - t.SkipNow() helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingStructured, "v1alpha1", "", channelTestRunner) } From ec6af2b3010142c07aa8f4ea5ea239f6ba91a111 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 7 Aug 2020 11:06:58 +0200 Subject: [PATCH 4/7] Fixed stuff Signed-off-by: Francesco Guardiani --- kafka/channel/pkg/dispatcher/dispatcher.go | 91 ++++---- .../pkg/dispatcher/dispatcher_it_test.go | 38 ++-- .../pkg/reconciler/dispatcher/kafkachannel.go | 26 +-- .../github.com/valyala/bytebufferpool/LICENSE | 22 ++ .../buffering/acks_before_finish_message.go | 42 ++++ .../buffering/binary_buffer_message.go | 113 ++++++++++ .../v2/binding/buffering/copy_message.go | 53 +++++ .../sdk-go/v2/binding/buffering/doc.go | 2 + .../buffering/struct_buffer_message.go | 53 +++++ .../valyala/bytebufferpool/.travis.yml | 15 ++ .../github.com/valyala/bytebufferpool/LICENSE | 22 ++ .../valyala/bytebufferpool/README.md | 21 ++ 
.../valyala/bytebufferpool/bytebuffer.go | 111 +++++++++ .../github.com/valyala/bytebufferpool/doc.go | 7 + .../github.com/valyala/bytebufferpool/pool.go | 151 +++++++++++++ .../channel/fanout/fanout_message_handler.go | 213 ++++++++++++++++++ vendor/modules.txt | 4 + 17 files changed, 896 insertions(+), 88 deletions(-) create mode 100644 third_party/VENDOR-LICENSE/github.com/valyala/bytebufferpool/LICENSE create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/acks_before_finish_message.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/binary_buffer_message.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/copy_message.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/doc.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/struct_buffer_message.go create mode 100644 vendor/github.com/valyala/bytebufferpool/.travis.yml create mode 100644 vendor/github.com/valyala/bytebufferpool/LICENSE create mode 100644 vendor/github.com/valyala/bytebufferpool/README.md create mode 100644 vendor/github.com/valyala/bytebufferpool/bytebuffer.go create mode 100644 vendor/github.com/valyala/bytebufferpool/doc.go create mode 100644 vendor/github.com/valyala/bytebufferpool/pool.go create mode 100644 vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go diff --git a/kafka/channel/pkg/dispatcher/dispatcher.go b/kafka/channel/pkg/dispatcher/dispatcher.go index 8318f86989..ad2b1a826d 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher.go +++ b/kafka/channel/pkg/dispatcher/dispatcher.go @@ -20,7 +20,7 @@ import ( "errors" "fmt" nethttp "net/http" - "net/url" + "strings" "sync" "sync/atomic" @@ -30,8 +30,8 @@ import ( "go.opencensus.io/trace" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1" eventingchannels "knative.dev/eventing/pkg/channel" + "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing-contrib/kafka/channel/pkg/utils" @@ -49,7 +49,7 @@ type KafkaDispatcher struct { kafkaAsyncProducer sarama.AsyncProducer channelSubscriptions map[eventingchannels.ChannelReference][]types.UID subsConsumerGroups map[types.UID]sarama.ConsumerGroup - subscriptions map[types.UID]subscription + subscriptions map[types.UID]Subscription // consumerUpdateLock must be used to update kafkaConsumers consumerUpdateLock sync.Mutex kafkaConsumerFactory kafka.KafkaConsumerGroupFactory @@ -58,6 +58,30 @@ type KafkaDispatcher struct { logger *zap.Logger } +type Subscription struct { + UID types.UID + fanout.Subscription +} + +func (sub Subscription) String() string { + var s strings.Builder + s.WriteString("UID: " + string(sub.UID)) + s.WriteRune('\n') + if sub.Subscriber != nil { + s.WriteString("Subscriber: " + sub.Subscriber.String()) + s.WriteRune('\n') + } + if sub.Reply != nil { + s.WriteString("Reply: " + sub.Reply.String()) + s.WriteRune('\n') + } + if sub.DeadLetter != nil { + s.WriteString("DeadLetter: " + sub.DeadLetter.String()) + s.WriteRune('\n') + } + return s.String() +} + func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) { conf := sarama.NewConfig() conf.Version = sarama.V2_0_0_0 @@ -74,7 +98,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat kafkaConsumerFactory: kafka.NewConsumerGroupFactory(args.Brokers, conf), channelSubscriptions: 
make(map[eventingchannels.ChannelReference][]types.UID), subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), - subscriptions: make(map[types.UID]subscription), + subscriptions: make(map[types.UID]Subscription), kafkaAsyncProducer: producer, logger: args.Logger, topicFunc: args.TopicFunc, @@ -119,7 +143,7 @@ type KafkaDispatcherArgs struct { type consumerMessageHandler struct { logger *zap.Logger - sub subscription + sub Subscription dispatcher *eventingchannels.MessageDispatcherImpl } @@ -139,26 +163,19 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar c.logger.Debug("Going to dispatch the message", zap.String("topic", consumerMessage.Topic), - zap.Any("destination", c.sub.SubscriberURI), - zap.Any("reply", c.sub.ReplyURI), - zap.Any("delivery", c.sub.Delivery), + zap.String("subscription", c.sub.String()), ) ctx, span := startTraceFromMessage(c.logger, ctx, message, consumerMessage.Topic) defer span.End() - var DLS *url.URL - if c.sub.Delivery != nil && c.sub.Delivery.DeadLetterSink != nil && c.sub.Delivery.DeadLetterSink.URI != nil { - DLS = (*url.URL)(c.sub.Delivery.DeadLetterSink.URI) - } - err := c.dispatcher.DispatchMessageWithRetries( ctx, message, nil, - (*url.URL)(c.sub.SubscriberURI), - (*url.URL)(c.sub.ReplyURI), - DLS, + c.sub.Subscriber, + c.sub.Reply, + c.sub.DeadLetter, c.sub.RetryConfig, ) @@ -180,19 +197,8 @@ type ChannelConfig struct { Subscriptions []Subscription } -type Subscription struct { - eventingduck.SubscriberSpec - RetryConfig *kncloudevents.RetryConfig -} - -type subscription struct { - Subscription - Namespace string - Name string -} - // UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller. -func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[eventingduck.SubscriberSpec]error, error) { +func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]error, error) { if config == nil { return nil, fmt.Errorf("nil config") } @@ -201,20 +207,19 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[eventingduck defer d.consumerUpdateLock.Unlock() var newSubs []types.UID - failedToSubscribe := make(map[eventingduck.SubscriberSpec]error) + failedToSubscribe := make(map[types.UID]error) for _, cc := range config.ChannelConfigs { channelRef := eventingchannels.ChannelReference{ Name: cc.Name, Namespace: cc.Namespace, } for _, subSpec := range cc.Subscriptions { - sub := newSubscription(subSpec, string(subSpec.UID), cc.Namespace) - newSubs = append(newSubs, sub.UID) + newSubs = append(newSubs, subSpec.UID) // Check if sub already exists exists := false for _, s := range d.channelSubscriptions[channelRef] { - if s == sub.UID { + if s == subSpec.UID { exists = true } } @@ -222,8 +227,8 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[eventingduck if !exists { // only subscribe when not exists in channel-subscriptions map // do not need to resubscribe every time channel fanout config is updated - if err := d.subscribe(channelRef, sub); err != nil { - failedToSubscribe[subSpec.SubscriberSpec] = err + if err := d.subscribe(channelRef, subSpec); err != nil { + failedToSubscribe[subSpec.UID] = err } } } @@ -316,11 +321,11 @@ func (d *KafkaDispatcher) Start(ctx context.Context) error { // subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. // subscribe must be called under updateLock. 
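With UpdateKafkaConsumers now keyed by UID, a hedged sketch of the reworked Subscription shape and of the consumer-group naming used by the subscribe hunk below (all values illustrative):

```go
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"knative.dev/eventing/pkg/channel/fanout"

	"knative.dev/eventing-contrib/kafka/channel/pkg/dispatcher"
)

func demoSubscription() {
	sub := dispatcher.Subscription{
		UID:          types.UID("aaaa"),
		Subscription: fanout.Subscription{}, // Subscriber/Reply/DeadLetter are plain *url.URL values
	}
	// Consumer groups are now named from the channel namespace/name plus the
	// subscription UID, instead of the old per-subscription namespace/name.
	fmt.Println(fmt.Sprintf("kafka.%s.%s.%s", "default", "my-channel", string(sub.UID)))
}
```
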
-func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub subscription) error { - d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub)) +func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub Subscription) error { + d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID)) topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name) - groupID := fmt.Sprintf("kafka.%s.%s.%s", sub.Namespace, channelRef.Name, sub.Name) + groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID)) handler := &consumerMessageHandler{d.logger, sub, d.dispatcher} @@ -349,8 +354,8 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference // unsubscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. // unsubscribe must be called under updateLock. -func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub subscription) error { - d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub)) +func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error { + d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String())) delete(d.subscriptions, sub.UID) if subsSlice, ok := d.channelSubscriptions[channel]; ok { var newSlice []types.UID @@ -385,14 +390,6 @@ func (d *KafkaDispatcher) getChannelReferenceFromHost(host string) (eventingchan return cr, nil } -func newSubscription(sub Subscription, name string, namespace string) subscription { - return subscription{ - Subscription: sub, - Name: name, - Namespace: namespace, - } -} - func startTraceFromMessage(logger *zap.Logger, inCtx context.Context, message *protocolkafka.Message, topic string) (context.Context, *trace.Span) { sc, ok := parseSpanContext(message.Headers) if !ok { diff --git a/kafka/channel/pkg/dispatcher/dispatcher_it_test.go b/kafka/channel/pkg/dispatcher/dispatcher_it_test.go index 4898eecbad..2dcb9aec7f 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher_it_test.go +++ b/kafka/channel/pkg/dispatcher/dispatcher_it_test.go @@ -21,6 +21,7 @@ import ( "net/http" "net/http/httptest" "net/http/httputil" + "net/url" "os" "sync" "testing" @@ -31,12 +32,10 @@ import ( protocolhttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/cloudevents/sdk-go/v2/test" "go.uber.org/zap" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/apis" - duckv1 "knative.dev/pkg/apis/duck/v1" - tracingconfig "knative.dev/pkg/tracing/config" "knative.dev/eventing-contrib/kafka/channel/pkg/utils" @@ -158,22 +157,18 @@ func TestDispatcher(t *testing.T) { HostName: "channela.svc", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "aaaa", - Generation: 1, - SubscriberURI: mustParseUrl(t, transformationsServer.URL), - ReplyURI: mustParseUrl(t, channelBProxy.URL), + UID: "aaaa", + Subscription: fanout.Subscription{ + Subscriber: mustParseUrl(t, transformationsServer.URL), + Reply: mustParseUrl(t, channelBProxy.URL), }, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "cccc", - Generation: 1, - SubscriberURI: mustParseUrl(t, transformationsFailureServer.URL), - ReplyURI: 
mustParseUrl(t, channelBProxy.URL), - Delivery: &eventingduck.DeliverySpec{ - DeadLetterSink: &duckv1.Destination{URI: mustParseUrl(t, deadLetterServer.URL)}, - }, + UID: "cccc", + Subscription: fanout.Subscription{ + Subscriber: mustParseUrl(t, transformationsFailureServer.URL), + Reply: mustParseUrl(t, channelBProxy.URL), + DeadLetter: mustParseUrl(t, deadLetterServer.URL), }, }, }, @@ -184,10 +179,9 @@ func TestDispatcher(t *testing.T) { HostName: "channelb.svc", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "bbbb", - Generation: 1, - SubscriberURI: mustParseUrl(t, receiverServer.URL), + UID: "bbbb", + Subscription: fanout.Subscription{ + Subscriber: mustParseUrl(t, receiverServer.URL), }, }, }, @@ -264,10 +258,10 @@ func createReverseProxy(t *testing.T, host string) *httputil.ReverseProxy { return &httputil.ReverseProxy{Director: director} } -func mustParseUrl(t *testing.T, str string) *apis.URL { +func mustParseUrl(t *testing.T, str string) *url.URL { url, err := apis.ParseURL(str) if err != nil { t.Fatal(err) } - return url + return url.URL() } diff --git a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go index 145c756bf1..40509570ef 100644 --- a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go @@ -24,8 +24,10 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/channel/fanout" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" @@ -176,7 +178,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel return nil } -func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1.SubscribableSpec, failedSubscriptions map[eventingduckv1.SubscriberSpec]error) eventingduckv1.SubscribableStatus { +func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1.SubscribableSpec, failedSubscriptions map[types.UID]error) eventingduckv1.SubscribableStatus { if subscribable == nil { return eventingduckv1.SubscribableStatus{} } @@ -187,7 +189,7 @@ func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1.Subsc ObservedGeneration: sub.Generation, Ready: corev1.ConditionTrue, } - if err, ok := failedSubscriptions[sub]; ok { + if err, ok := failedSubscriptions[sub.UID]; ok { status.Ready = corev1.ConditionFalse status.Message = err.Error() } @@ -208,25 +210,11 @@ func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1beta1.KafkaChannel) * if c.Spec.SubscribableSpec.Subscribers != nil { newSubs := make([]dispatcher.Subscription, 0, len(c.Spec.SubscribableSpec.Subscribers)) for _, source := range c.Spec.SubscribableSpec.Subscribers { - - // Extract retry configuration - var retryConfig *kncloudevents.RetryConfig - if source.Delivery != nil { - delivery := &eventingduckv1.DeliverySpec{} - _ = source.Delivery.ConvertTo(context.TODO(), delivery) - - _retryConfig, err := kncloudevents.RetryConfigFromDeliverySpec(*delivery) - if err == nil { - retryConfig = &_retryConfig - } - } - - subSpec := &eventingduckv1.SubscriberSpec{} - _ = source.ConvertTo(context.TODO(), subSpec) + innerSub, _ := fanout.SubscriberSpecToFanoutConfig(source) newSubs = append(newSubs, dispatcher.Subscription{ - SubscriberSpec: *subSpec, - RetryConfig: retryConfig, + Subscription: 
*innerSub, + UID: source.UID, }) } channelConfig.Subscriptions = newSubs diff --git a/third_party/VENDOR-LICENSE/github.com/valyala/bytebufferpool/LICENSE b/third_party/VENDOR-LICENSE/github.com/valyala/bytebufferpool/LICENSE new file mode 100644 index 0000000000..f7c935c201 --- /dev/null +++ b/third_party/VENDOR-LICENSE/github.com/valyala/bytebufferpool/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2016 Aliaksandr Valialkin, VertaMedia + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/acks_before_finish_message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/acks_before_finish_message.go new file mode 100644 index 0000000000..4e7a33f0be --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/acks_before_finish_message.go @@ -0,0 +1,42 @@ +package buffering + +import ( + "sync/atomic" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/spec" +) + +type acksMessage struct { + binding.Message + requiredAcks int32 +} + +func (m *acksMessage) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) { + return m.Message.(binding.MessageMetadataReader).GetAttribute(k) +} + +func (m *acksMessage) GetExtension(s string) interface{} { + return m.Message.(binding.MessageMetadataReader).GetExtension(s) +} + +func (m *acksMessage) GetWrappedMessage() binding.Message { + return m.Message +} + +func (m *acksMessage) Finish(err error) error { + remainingAcks := atomic.AddInt32(&m.requiredAcks, -1) + if remainingAcks == 0 { + return m.Message.Finish(err) + } + return nil +} + +var _ binding.MessageWrapper = (*acksMessage)(nil) + +// WithAcksBeforeFinish returns a wrapper for m that calls m.Finish() +// only after the specified number of acks are received. 
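(The wrapper's doc comment concludes just below.) A hedged sketch of fanning one message out to several senders; BufferMessage makes the message re-readable, and the names here are illustrative:

```go
package example

import (
	"context"

	"github.com/cloudevents/sdk-go/v2/binding"
	"github.com/cloudevents/sdk-go/v2/binding/buffering"
	"github.com/cloudevents/sdk-go/v2/protocol"
)

// fanOut dispatches m to every sender; the original Finish fires only after
// all senders have finished their copy.
func fanOut(ctx context.Context, m binding.Message, senders []protocol.Sender) error {
	buffered, err := buffering.BufferMessage(ctx, m)
	if err != nil {
		return err
	}
	wrapped := buffering.WithAcksBeforeFinish(buffered, len(senders))
	for _, s := range senders {
		if err := s.Send(ctx, wrapped); err != nil {
			return err
		}
	}
	return nil
}
```
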
+// Use it when you need to dispatch a Message using several Sender instances +func WithAcksBeforeFinish(m binding.Message, requiredAcks int) binding.Message { + return &acksMessage{Message: m, requiredAcks: int32(requiredAcks)} +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/binary_buffer_message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/binary_buffer_message.go new file mode 100644 index 0000000000..356b0975e2 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/binary_buffer_message.go @@ -0,0 +1,113 @@ +package buffering + +import ( + "bytes" + "context" + "io" + + "github.com/valyala/bytebufferpool" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/spec" +) + +var binaryMessagePool bytebufferpool.Pool + +// binaryBufferedMessage implements a binary-mode message as a simple struct. +// This message implementation is used by CopyMessage and BufferMessage +type binaryBufferedMessage struct { + version spec.Version + metadata map[spec.Attribute]interface{} + extensions map[string]interface{} + body *bytebufferpool.ByteBuffer +} + +func (m *binaryBufferedMessage) Start(ctx context.Context) error { + m.metadata = make(map[spec.Attribute]interface{}, 4) + m.extensions = make(map[string]interface{}) + return nil +} + +func (m *binaryBufferedMessage) ReadEncoding() binding.Encoding { + return binding.EncodingBinary +} + +func (m *binaryBufferedMessage) ReadStructured(context.Context, binding.StructuredWriter) error { + return binding.ErrNotStructured +} + +func (m *binaryBufferedMessage) ReadBinary(ctx context.Context, b binding.BinaryWriter) (err error) { + for k, v := range m.metadata { + err = b.SetAttribute(k, v) + if err != nil { + return + } + } + for k, v := range m.extensions { + err = b.SetExtension(k, v) + if err != nil { + return + } + } + if m.body != nil { + err = b.SetData(bytes.NewReader(m.body.Bytes())) + if err != nil { + return + } + } + return nil +} + +func (m *binaryBufferedMessage) Finish(error) error { + if m.body != nil { + binaryMessagePool.Put(m.body) + } + return nil +} + +// Binary Encoder +func (m *binaryBufferedMessage) SetData(data io.Reader) error { + buf := binaryMessagePool.Get() + w, err := io.Copy(buf, data) + if err != nil { + return err + } + if w == 0 { + binaryMessagePool.Put(buf) + return nil + } + m.body = buf + return nil +} + +func (m *binaryBufferedMessage) SetAttribute(attribute spec.Attribute, value interface{}) error { + // If spec version we need to change to right context struct + m.version = attribute.Version() + m.metadata[attribute] = value + return nil +} + +func (m *binaryBufferedMessage) SetExtension(name string, value interface{}) error { + m.extensions[name] = value + return nil +} + +func (m *binaryBufferedMessage) End(ctx context.Context) error { + return nil +} + +func (m *binaryBufferedMessage) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) { + a := m.version.AttributeFromKind(k) + if a != nil { + return a, m.metadata[a] + } + return nil, nil +} + +func (m *binaryBufferedMessage) GetExtension(name string) interface{} { + return m.extensions[name] +} + +var _ binding.Message = (*binaryBufferedMessage)(nil) // Test it conforms to the interface +var _ binding.MessageMetadataReader = (*binaryBufferedMessage)(nil) +var _ binding.BinaryWriter = (*binaryBufferedMessage)(nil) diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/copy_message.go 
b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/copy_message.go new file mode 100644 index 0000000000..6740839ac3 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/copy_message.go @@ -0,0 +1,53 @@ +package buffering + +import ( + "context" + + "github.com/cloudevents/sdk-go/v2/binding" +) + +// BufferMessage works the same as CopyMessage and it also bounds the original Message +// lifecycle to the newly created message: calling Finish() on the returned message calls m.Finish(). +// transformers can be nil and this function guarantees that they are invoked only once during the encoding process. +func BufferMessage(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + result, err := CopyMessage(ctx, m, transformers...) + if err != nil { + return nil, err + } + return binding.WithFinish(result, func(err error) { _ = m.Finish(err) }), nil +} + +// CopyMessage reads m once and creates an in-memory copy depending on the encoding of m. +// The returned copy is not dependent on any transport and can be visited many times. +// When the copy can be forgot, the copied message must be finished with Finish() message to release the memory. +// transformers can be nil and this function guarantees that they are invoked only once during the encoding process. +func CopyMessage(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + originalMessageEncoding := m.ReadEncoding() + + if originalMessageEncoding == binding.EncodingUnknown { + return nil, binding.ErrUnknownEncoding + } + if originalMessageEncoding == binding.EncodingEvent { + e, err := binding.ToEvent(ctx, m, transformers...) + if err != nil { + return nil, err + } + return (*binding.EventMessage)(e), nil + } + + sm := structBufferedMessage{} + bm := binaryBufferedMessage{} + + encoding, err := binding.DirectWrite(ctx, m, &sm, &bm, transformers...) + if encoding == binding.EncodingStructured { + return &sm, err + } else if encoding == binding.EncodingBinary { + return &bm, err + } else { + e, err := binding.ToEvent(ctx, m, transformers...) + if err != nil { + return nil, err + } + return (*binding.EventMessage)(e), nil + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/doc.go new file mode 100644 index 0000000000..fefa83c3b0 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/doc.go @@ -0,0 +1,2 @@ +// Package buffering provides APIs for buffered messages. +package buffering diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/struct_buffer_message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/struct_buffer_message.go new file mode 100644 index 0000000000..90adb75702 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/buffering/struct_buffer_message.go @@ -0,0 +1,53 @@ +package buffering + +import ( + "bytes" + "context" + "io" + + "github.com/valyala/bytebufferpool" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" +) + +var structMessagePool bytebufferpool.Pool + +// structBufferedMessage implements a structured-mode message as a simple struct. 
+// This message implementation is used by CopyMessage and BufferMessage.
+type structBufferedMessage struct {
+	Format format.Format
+	Bytes  *bytebufferpool.ByteBuffer
+}
+
+func (m *structBufferedMessage) ReadEncoding() binding.Encoding {
+	return binding.EncodingStructured
+}
+
+// ReadStructured copies the structured event data to a StructuredWriter.
+func (m *structBufferedMessage) ReadStructured(ctx context.Context, enc binding.StructuredWriter) error {
+	return enc.SetStructuredEvent(ctx, m.Format, bytes.NewReader(m.Bytes.B))
+}
+
+// ReadBinary returns ErrNotBinary.
+func (m structBufferedMessage) ReadBinary(context.Context, binding.BinaryWriter) error {
+	return binding.ErrNotBinary
+}
+
+func (m *structBufferedMessage) Finish(error) error {
+	structMessagePool.Put(m.Bytes)
+	return nil
+}
+
+func (m *structBufferedMessage) SetStructuredEvent(ctx context.Context, format format.Format, event io.Reader) error {
+	m.Bytes = structMessagePool.Get()
+	_, err := io.Copy(m.Bytes, event)
+	if err != nil {
+		return err
+	}
+	m.Format = format
+	return nil
+}
+
+var _ binding.Message = (*structBufferedMessage)(nil)          // Test it conforms to the interface
+var _ binding.StructuredWriter = (*structBufferedMessage)(nil) // Test it conforms to the interface
diff --git a/vendor/github.com/valyala/bytebufferpool/.travis.yml b/vendor/github.com/valyala/bytebufferpool/.travis.yml
new file mode 100644
index 0000000000..6a6ec2eb06
--- /dev/null
+++ b/vendor/github.com/valyala/bytebufferpool/.travis.yml
@@ -0,0 +1,15 @@
+language: go
+
+go:
+  - 1.6
+
+script:
+  # build test for supported platforms
+  - GOOS=linux go build
+  - GOOS=darwin go build
+  - GOOS=freebsd go build
+  - GOOS=windows go build
+  - GOARCH=386 go build
+
+  # run tests on a standard platform
+  - go test -v ./...
diff --git a/vendor/github.com/valyala/bytebufferpool/LICENSE b/vendor/github.com/valyala/bytebufferpool/LICENSE
new file mode 100644
index 0000000000..f7c935c201
--- /dev/null
+++ b/vendor/github.com/valyala/bytebufferpool/LICENSE
@@ -0,0 +1,22 @@
+The MIT License (MIT)
+
+Copyright (c) 2016 Aliaksandr Valialkin, VertaMedia
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
diff --git a/vendor/github.com/valyala/bytebufferpool/README.md b/vendor/github.com/valyala/bytebufferpool/README.md
new file mode 100644
index 0000000000..061357e833
--- /dev/null
+++ b/vendor/github.com/valyala/bytebufferpool/README.md
@@ -0,0 +1,21 @@
+[![Build Status](https://travis-ci.org/valyala/bytebufferpool.svg)](https://travis-ci.org/valyala/bytebufferpool)
+[![GoDoc](https://godoc.org/github.com/valyala/bytebufferpool?status.svg)](http://godoc.org/github.com/valyala/bytebufferpool)
+[![Go Report](http://goreportcard.com/badge/valyala/bytebufferpool)](http://goreportcard.com/report/valyala/bytebufferpool)
+
+# bytebufferpool
+
+An implementation of a pool of byte buffers with anti-memory-waste protection.
+
+The pool may waste a limited amount of memory due to fragmentation.
+This amount equals the maximum total size of the byte buffers
+in concurrent use.
+
+# Benchmark results
+Currently bytebufferpool is the fastest and most effective buffer pool written in Go.
+
+You can find results [here](https://omgnull.github.io/go-benchmark/buffer/).
+
+# bytebufferpool users
+
+* [fasthttp](https://github.com/valyala/fasthttp)
+* [quicktemplate](https://github.com/valyala/quicktemplate)
diff --git a/vendor/github.com/valyala/bytebufferpool/bytebuffer.go b/vendor/github.com/valyala/bytebufferpool/bytebuffer.go
new file mode 100644
index 0000000000..07a055a2df
--- /dev/null
+++ b/vendor/github.com/valyala/bytebufferpool/bytebuffer.go
@@ -0,0 +1,111 @@
+package bytebufferpool
+
+import "io"
+
+// ByteBuffer provides a byte buffer, which can be used for minimizing
+// memory allocations.
+//
+// ByteBuffer may be used with functions appending data to the given []byte
+// slice. See example code for details.
+//
+// Use Get for obtaining an empty byte buffer.
+type ByteBuffer struct {
+
+	// B is a byte buffer to use in append-like workloads.
+	// See example code for details.
+	B []byte
+}
+
+// Len returns the size of the byte buffer.
+func (b *ByteBuffer) Len() int {
+	return len(b.B)
+}
+
+// ReadFrom implements io.ReaderFrom.
+//
+// The function appends all the data read from r to b.
+func (b *ByteBuffer) ReadFrom(r io.Reader) (int64, error) {
+	p := b.B
+	nStart := int64(len(p))
+	nMax := int64(cap(p))
+	n := nStart
+	if nMax == 0 {
+		nMax = 64
+		p = make([]byte, nMax)
+	} else {
+		p = p[:nMax]
+	}
+	for {
+		if n == nMax {
+			nMax *= 2
+			bNew := make([]byte, nMax)
+			copy(bNew, p)
+			p = bNew
+		}
+		nn, err := r.Read(p[n:])
+		n += int64(nn)
+		if err != nil {
+			b.B = p[:n]
+			n -= nStart
+			if err == io.EOF {
+				return n, nil
+			}
+			return n, err
+		}
+	}
+}
+
+// WriteTo implements io.WriterTo.
+func (b *ByteBuffer) WriteTo(w io.Writer) (int64, error) {
+	n, err := w.Write(b.B)
+	return int64(n), err
+}
+
+// Bytes returns b.B, i.e. all the bytes accumulated in the buffer.
+//
+// The purpose of this function is bytes.Buffer compatibility.
+func (b *ByteBuffer) Bytes() []byte {
+	return b.B
+}
+
+// Write implements io.Writer; it appends p to ByteBuffer.B.
+func (b *ByteBuffer) Write(p []byte) (int, error) {
+	b.B = append(b.B, p...)
+	return len(p), nil
+}
+
+// WriteByte appends the byte c to the buffer.
+//
+// The purpose of this function is bytes.Buffer compatibility.
+//
+// The function always returns nil.
+func (b *ByteBuffer) WriteByte(c byte) error {
+	b.B = append(b.B, c)
+	return nil
+}
+
+// WriteString appends s to ByteBuffer.B.
+func (b *ByteBuffer) WriteString(s string) (int, error) {
+	b.B = append(b.B, s...)
+	return len(s), nil
+}
+
+// Set sets ByteBuffer.B to p.
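+// Any previous contents are discarded; the existing capacity is reused when
+// it is large enough.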
+func (b *ByteBuffer) Set(p []byte) {
+	b.B = append(b.B[:0], p...)
+}
+
+// SetString sets ByteBuffer.B to s.
+func (b *ByteBuffer) SetString(s string) {
+	b.B = append(b.B[:0], s...)
+}
+
+// String returns a string representation of ByteBuffer.B.
+func (b *ByteBuffer) String() string {
+	return string(b.B)
+}
+
+// Reset makes ByteBuffer.B empty.
+func (b *ByteBuffer) Reset() {
+	b.B = b.B[:0]
+}
diff --git a/vendor/github.com/valyala/bytebufferpool/doc.go b/vendor/github.com/valyala/bytebufferpool/doc.go
new file mode 100644
index 0000000000..e511b7c593
--- /dev/null
+++ b/vendor/github.com/valyala/bytebufferpool/doc.go
@@ -0,0 +1,7 @@
+// Package bytebufferpool implements a pool of byte buffers
+// with anti-fragmentation protection.
+//
+// The pool may waste a limited amount of memory due to fragmentation.
+// This amount equals the maximum total size of the byte buffers
+// in concurrent use.
+package bytebufferpool
diff --git a/vendor/github.com/valyala/bytebufferpool/pool.go b/vendor/github.com/valyala/bytebufferpool/pool.go
new file mode 100644
index 0000000000..8bb4134dd0
--- /dev/null
+++ b/vendor/github.com/valyala/bytebufferpool/pool.go
@@ -0,0 +1,151 @@
+package bytebufferpool
+
+import (
+	"sort"
+	"sync"
+	"sync/atomic"
+)
+
+const (
+	minBitSize = 6 // 2**6=64 is a CPU cache line size
+	steps      = 20
+
+	minSize = 1 << minBitSize
+	maxSize = 1 << (minBitSize + steps - 1)
+
+	calibrateCallsThreshold = 42000
+	maxPercentile           = 0.95
+)
+
+// Pool represents a byte buffer pool.
+//
+// Distinct pools may be used for distinct types of byte buffers.
+// Properly determined byte buffer types with their own pools may help reduce
+// memory waste.
+type Pool struct {
+	calls       [steps]uint64
+	calibrating uint64
+
+	defaultSize uint64
+	maxSize     uint64
+
+	pool sync.Pool
+}
+
+var defaultPool Pool
+
+// Get returns an empty byte buffer from the pool.
+//
+// The returned byte buffer may be handed back to the pool via a Put call.
+// This reduces the number of memory allocations required for byte buffer
+// management.
+func Get() *ByteBuffer { return defaultPool.Get() }
+
+// Get returns a new byte buffer with zero length.
+//
+// The byte buffer may be returned to the pool via Put after use
+// in order to minimize GC overhead.
+func (p *Pool) Get() *ByteBuffer {
+	v := p.pool.Get()
+	if v != nil {
+		return v.(*ByteBuffer)
+	}
+	return &ByteBuffer{
+		B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
+	}
+}
+
+// Put returns the byte buffer to the pool.
+//
+// ByteBuffer.B mustn't be touched after returning it to the pool.
+// Otherwise data races will occur.
+func Put(b *ByteBuffer) { defaultPool.Put(b) }
+
+// Put releases the byte buffer obtained via Get back to the pool.
+//
+// The buffer mustn't be accessed after returning to the pool.
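+//
+// A typical pairing with Get, given a Pool value named pool, looks like
+// this (illustrative sketch only):
+//
+//	buf := pool.Get()
+//	buf.WriteString("hello")
+//	process(buf.Bytes()) // process stands in for any consumer of the bytes
+//	pool.Put(buf)        // buf must not be touched afterwards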
+func (p *Pool) Put(b *ByteBuffer) { + idx := index(len(b.B)) + + if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold { + p.calibrate() + } + + maxSize := int(atomic.LoadUint64(&p.maxSize)) + if maxSize == 0 || cap(b.B) <= maxSize { + b.Reset() + p.pool.Put(b) + } +} + +func (p *Pool) calibrate() { + if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) { + return + } + + a := make(callSizes, 0, steps) + var callsSum uint64 + for i := uint64(0); i < steps; i++ { + calls := atomic.SwapUint64(&p.calls[i], 0) + callsSum += calls + a = append(a, callSize{ + calls: calls, + size: minSize << i, + }) + } + sort.Sort(a) + + defaultSize := a[0].size + maxSize := defaultSize + + maxSum := uint64(float64(callsSum) * maxPercentile) + callsSum = 0 + for i := 0; i < steps; i++ { + if callsSum > maxSum { + break + } + callsSum += a[i].calls + size := a[i].size + if size > maxSize { + maxSize = size + } + } + + atomic.StoreUint64(&p.defaultSize, defaultSize) + atomic.StoreUint64(&p.maxSize, maxSize) + + atomic.StoreUint64(&p.calibrating, 0) +} + +type callSize struct { + calls uint64 + size uint64 +} + +type callSizes []callSize + +func (ci callSizes) Len() int { + return len(ci) +} + +func (ci callSizes) Less(i, j int) bool { + return ci[i].calls > ci[j].calls +} + +func (ci callSizes) Swap(i, j int) { + ci[i], ci[j] = ci[j], ci[i] +} + +func index(n int) int { + n-- + n >>= minBitSize + idx := 0 + for n > 0 { + n >>= 1 + idx++ + } + if idx >= steps { + idx = steps - 1 + } + return idx +} diff --git a/vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go b/vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go new file mode 100644 index 0000000000..3b79fe6a8e --- /dev/null +++ b/vendor/knative.dev/eventing/pkg/channel/fanout/fanout_message_handler.go @@ -0,0 +1,213 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fanout provides an http.Handler that takes in one request and fans it out to N other +// requests, based on a list of Subscriptions. Logically, it represents all the Subscriptions to a +// single Knative Channel. +// It will normally be used in conjunction with multichannelfanout.MessageHandler, which contains multiple +// fanout.MessageHandler, each corresponding to a single Knative Channel. +package fanout + +import ( + "context" + "errors" + nethttp "net/http" + "net/url" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/buffering" + "go.opencensus.io/trace" + "go.uber.org/zap" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/channel" + "knative.dev/eventing/pkg/kncloudevents" +) + +const ( + defaultTimeout = 15 * time.Minute +) + +type Subscription struct { + Subscriber *url.URL + Reply *url.URL + DeadLetter *url.URL + RetryConfig *kncloudevents.RetryConfig +} + +// Config for a fanout.MessageHandler. 
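+//
+// Built by hand, a Config might look like the sketch below (the subscriber
+// URL is a placeholder):
+//
+//	subscriber, _ := url.Parse("http://example.com/subscriber")
+//	cfg := fanout.Config{
+//		Subscriptions: []fanout.Subscription{{Subscriber: subscriber}},
+//		AsyncHandler:  true,
+//	}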
+type Config struct {
+	Subscriptions []Subscription `json:"subscriptions"`
+	// AsyncHandler controls whether the Subscriptions are called synchronously or asynchronously.
+	// It is expected to be false when used as a sidecar.
+	AsyncHandler bool `json:"asyncHandler,omitempty"`
+}
+
+// MessageHandler is an http.Handler that takes a single request in and fans it out to N other servers.
+type MessageHandler struct {
+	config Config
+
+	receiver   *channel.MessageReceiver
+	dispatcher channel.MessageDispatcher
+
+	// TODO: Plumb context through the receiver and dispatcher and use that to store the timeout,
+	// rather than a member variable.
+	timeout time.Duration
+
+	logger *zap.Logger
+}
+
+// NewMessageHandler creates a new fanout.MessageHandler.
+func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDispatcher, config Config) (*MessageHandler, error) {
+	handler := &MessageHandler{
+		logger:     logger,
+		config:     config,
+		dispatcher: messageDispatcher,
+		timeout:    defaultTimeout,
+	}
+	// The receiver function needs to point back at the handler itself, so set it up after
+	// initialization.
+	receiver, err := channel.NewMessageReceiver(createMessageReceiverFunction(handler), logger)
+	if err != nil {
+		return nil, err
+	}
+	handler.receiver = receiver
+
+	return handler, nil
+}
+
+func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscription, error) {
+	var destination *url.URL
+	if sub.SubscriberURI != nil {
+		destination = sub.SubscriberURI.URL()
+	}
+
+	var reply *url.URL
+	if sub.ReplyURI != nil {
+		reply = sub.ReplyURI.URL()
+	}
+
+	var deadLetter *url.URL
+	if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
+		deadLetter = sub.Delivery.DeadLetterSink.URI.URL()
+	}
+
+	var retryConfig *kncloudevents.RetryConfig
+	if sub.Delivery != nil {
+		if rc, err := kncloudevents.RetryConfigFromDeliverySpec(*sub.Delivery); err != nil {
+			return nil, err
+		} else {
+			retryConfig = &rc
+		}
+	}
+
+	return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil
+}
+
+func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
+	if f.config.AsyncHandler {
+		return func(ctx context.Context, _ channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
+			if len(f.config.Subscriptions) == 0 {
+				// Nothing to do here, finish the message and return
+				_ = message.Finish(nil)
+				return nil
+			}
+
+			parentSpan := trace.FromContext(ctx)
+			// Message buffering here is done before starting the dispatch goroutine,
+			// because the message could be closed before the buffering happens.
+			bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...)
+			if err != nil {
+				return err
+			}
+			// We don't need the original message anymore
+			_ = message.Finish(nil)
+			go func(m binding.Message, h nethttp.Header, s *trace.Span) {
+				// Run async dispatch with background context.
+				ctx = trace.NewContext(context.Background(), s)
+				// Any returned error is already logged in f.dispatch().
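+				// m is the buffered copy and the original message has already been
+				// finished, so this goroutine owns m's lifecycle from here on.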
+				_ = f.dispatch(ctx, m, h)
+			}(bufferedMessage, additionalHeaders, parentSpan)
+			return nil
+		}
+	}
+	return func(ctx context.Context, _ channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
+		if len(f.config.Subscriptions) == 0 {
+			// Nothing to do here, finish the message and return
+			_ = message.Finish(nil)
+			return nil
+		}
+
+		// We buffer the message to send it several times
+		bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...)
+		if err != nil {
+			return err
+		}
+		// We don't need the original message anymore
+		_ = message.Finish(nil)
+		return f.dispatch(ctx, bufferedMessage, additionalHeaders)
+	}
+}
+
+func (f *MessageHandler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
+	f.receiver.ServeHTTP(response, request)
+}
+
+// dispatch takes the message, fans it out to each subscription in f.config. If all the fanned-out
+// requests succeed, it returns nil; otherwise it returns an error.
+func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.Message, additionalHeaders nethttp.Header) error {
+	subs := len(f.config.Subscriptions)
+
+	// Bind the lifecycle of the buffered message to the number of subs
+	bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, subs)
+
+	errorCh := make(chan error, subs)
+	for _, sub := range f.config.Subscriptions {
+		go func(s Subscription) {
+			errorCh <- f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
+		}(sub)
+	}
+
+	for range f.config.Subscriptions {
+		select {
+		case err := <-errorCh:
+			if err != nil {
+				f.logger.Error("Fanout had an error", zap.Error(err))
+				return err
+			}
+		case <-time.After(f.timeout):
+			f.logger.Error("Fanout timed out")
+			return errors.New("fanout timed out")
+		}
+	}
+	// All Subscriptions returned err = nil.
+	return nil
+}
+
+// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
+// the `sink` portions of the subscription.
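+// Concretely, Subscriber is the `call` destination, Reply is the `sink`, and
+// DeadLetter together with RetryConfig govern what happens when delivery fails.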
+func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) error { + return f.dispatcher.DispatchMessageWithRetries( + ctx, + message, + additionalHeaders, + sub.Subscriber, + sub.Reply, + sub.DeadLetter, + sub.RetryConfig, + ) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index fed2400002..7576d2cd30 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -107,6 +107,7 @@ github.com/cloudevents/sdk-go/protocol/stan/v2 ## explicit github.com/cloudevents/sdk-go/v2 github.com/cloudevents/sdk-go/v2/binding +github.com/cloudevents/sdk-go/v2/binding/buffering github.com/cloudevents/sdk-go/v2/binding/format github.com/cloudevents/sdk-go/v2/binding/spec github.com/cloudevents/sdk-go/v2/binding/transformer @@ -366,6 +367,8 @@ github.com/stretchr/testify/assert github.com/stretchr/testify/require # github.com/tsenart/vegeta v12.7.1-0.20190725001342-b5f4fca92137+incompatible github.com/tsenart/vegeta/lib +# github.com/valyala/bytebufferpool v1.0.0 +github.com/valyala/bytebufferpool # github.com/xanzy/go-gitlab v0.32.0 ## explicit github.com/xanzy/go-gitlab @@ -1118,6 +1121,7 @@ knative.dev/eventing/pkg/apis/sources/v1alpha1 knative.dev/eventing/pkg/apis/sources/v1alpha2 knative.dev/eventing/pkg/apis/sources/v1beta1 knative.dev/eventing/pkg/channel +knative.dev/eventing/pkg/channel/fanout knative.dev/eventing/pkg/client/clientset/versioned knative.dev/eventing/pkg/client/clientset/versioned/fake knative.dev/eventing/pkg/client/clientset/versioned/scheme From cc27d129e9cf6979116dd0af6bedfff5c9acc438 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Fri, 7 Aug 2020 12:11:25 +0300 Subject: [PATCH 5/7] Fix test single event --- test/e2e/channel_single_event_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index b34d60bff6..60d0b7eadf 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -25,10 +25,18 @@ import ( "knative.dev/eventing/test/e2e/helpers" ) -func TestSingleBinaryEventForChannel(t *testing.T) { - helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingBinary, "v1alpha1", "", channelTestRunner) +func TestSingleBinaryEventForChannelV1Beta1(t *testing.T) { + helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingBinary, helpers.SubscriptionV1beta1, "", channelTestRunner) } -func TestSingleStructuredEventForChannel(t *testing.T) { - helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingStructured, "v1alpha1", "", channelTestRunner) +func TestSingleStructuredEventForChannelV1Beta1(t *testing.T) { + helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingStructured, helpers.SubscriptionV1beta1, "", channelTestRunner) +} + +func TestSingleBinaryEventForChannelV1(t *testing.T) { + helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingBinary, helpers.SubscriptionV1, "", channelTestRunner) +} + +func TestSingleStructuredEventForChannelV1(t *testing.T) { + helpers.SingleEventForChannelTestHelper(t, cloudevents.EncodingStructured, helpers.SubscriptionV1, "", channelTestRunner) } From 9c04c4d9bcaa4075b773a9febb44fc61f4ceb511 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Fri, 7 Aug 2020 12:37:09 +0300 Subject: [PATCH 6/7] Fix dispatcher unit test --- .../channel/pkg/dispatcher/dispatcher_test.go | 99 +++++++++---------- 1 file changed, 46 insertions(+), 53 deletions(-) diff --git 
a/kafka/channel/pkg/dispatcher/dispatcher_test.go b/kafka/channel/pkg/dispatcher/dispatcher_test.go index 3784775173..571396feee 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher_test.go +++ b/kafka/channel/pkg/dispatcher/dispatcher_test.go @@ -19,21 +19,20 @@ package dispatcher import ( "context" "errors" + "knative.dev/eventing/pkg/channel/fanout" "net/http" + "net/url" "testing" + "github.com/Shopify/sarama" "github.com/cloudevents/sdk-go/v2/binding" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "knative.dev/pkg/apis" - "github.com/Shopify/sarama" - "github.com/google/go-cmp/cmp/cmpopts" - "go.uber.org/zap" - - eventingduck "knative.dev/eventing/pkg/apis/duck/v1" eventingchannels "knative.dev/eventing/pkg/channel" _ "knative.dev/pkg/system/testing" @@ -94,7 +93,7 @@ func (d *KafkaDispatcher) checkConfigAndUpdate(config *Config) error { } func TestDispatcher_UpdateConfig(t *testing.T) { - subscriber, _ := apis.ParseURL("http://test/subscriber") + subscriber, _ := url.Parse("http://test/subscriber") testCases := []struct { name string @@ -149,15 +148,15 @@ func TestDispatcher_UpdateConfig(t *testing.T) { HostName: "a.b.c.d", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-1", - SubscriberURI: subscriber, + UID: "subscription-1", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-2", - SubscriberURI: subscriber, + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, @@ -180,15 +179,15 @@ func TestDispatcher_UpdateConfig(t *testing.T) { HostName: "a.b.c.d", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-1", - SubscriberURI: subscriber, + UID: "subscription-1", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-2", - SubscriberURI: subscriber, + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, @@ -203,15 +202,15 @@ func TestDispatcher_UpdateConfig(t *testing.T) { HostName: "a.b.c.d", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-2", - SubscriberURI: subscriber, + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-3", - SubscriberURI: subscriber, + UID: "subscription-3", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, @@ -237,15 +236,15 @@ func TestDispatcher_UpdateConfig(t *testing.T) { HostName: "a.b.c.d", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-1", - SubscriberURI: subscriber, + UID: "subscription-1", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-2", - SubscriberURI: subscriber, + UID: "subscription-2", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, @@ -260,9 +259,9 @@ func TestDispatcher_UpdateConfig(t *testing.T) { HostName: "a.b.c.d", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-1", - SubscriberURI: subscriber, + UID: "subscription-1", + Subscription: 
fanout.Subscription{ + Subscriber: subscriber, }, }, }, @@ -273,15 +272,15 @@ func TestDispatcher_UpdateConfig(t *testing.T) { HostName: "e.f.g.h", Subscriptions: []Subscription{ { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-3", - SubscriberURI: subscriber, + UID: "subscription-3", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, { - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "subscription-4", - SubscriberURI: subscriber, + UID: "subscription-4", + Subscription: fanout.Subscription{ + Subscriber: subscriber, }, }, }, @@ -327,7 +326,7 @@ func TestDispatcher_UpdateConfig(t *testing.T) { kafkaConsumerFactory: &mockKafkaConsumerFactory{}, channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID), subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup), - subscriptions: make(map[types.UID]subscription), + subscriptions: make(map[types.UID]Subscription), topicFunc: utils.TopicName, logger: zaptest.NewLogger(t), } @@ -392,12 +391,9 @@ func TestSubscribeError(t *testing.T) { Namespace: "test-ns", } - subRef := subscription{ - Subscription: Subscription{ - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "test-sub", - }, - }, + subRef := Subscription{ + UID: "test-sub", + Subscription: fanout.Subscription{}, } err := d.subscribe(channelRef, subRef) if err == nil { @@ -417,12 +413,9 @@ func TestUnsubscribeUnknownSub(t *testing.T) { Namespace: "test-ns", } - subRef := subscription{ - Subscription: Subscription{ - SubscriberSpec: eventingduck.SubscriberSpec{ - UID: "test-sub", - }, - }, + subRef := Subscription{ + UID: "test-sub", + Subscription: fanout.Subscription{}, } if err := d.unsubscribe(channelRef, subRef); err != nil { t.Errorf("Unsubscribe error: %v", err) From a0f18a26c60f77d115cd31a2fc5e732638b7fb63 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Fri, 7 Aug 2020 15:29:56 +0200 Subject: [PATCH 7/7] Disabled flaky test Signed-off-by: Francesco Guardiani --- .../pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go | 1 + test/e2e/broker_redelivery_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go index dae4468f23..0e70753487 100644 --- a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go +++ b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_defaults.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "context" + "knative.dev/eventing/pkg/apis/messaging" "knative.dev/eventing-contrib/kafka/channel/pkg/utils" diff --git a/test/e2e/broker_redelivery_test.go b/test/e2e/broker_redelivery_test.go index 0f6db6ec09..7c527a71bf 100644 --- a/test/e2e/broker_redelivery_test.go +++ b/test/e2e/broker_redelivery_test.go @@ -60,6 +60,7 @@ func ChannelBasedBrokerCreator(channel metav1.TypeMeta, brokerClass string) help } func TestBrokerRedelivery(t *testing.T) { + t.Skip("Skipping because flaky https://github.com/knative/eventing-contrib/issues/1433") channelTestRunner.RunTests(t, testlib.FeatureRedelivery, func(t *testing.T, component metav1.TypeMeta) {