diff --git a/Gopkg.lock b/Gopkg.lock index 707f5553..19418013 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1530,10 +1530,12 @@ input-imports = [ "github.com/Azure/azure-event-hubs-go", "github.com/cloudevents/sdk-go/v1", - "github.com/cloudevents/sdk-go/v1/cloudevents", "github.com/cloudevents/sdk-go/v1/cloudevents/client", "github.com/cloudevents/sdk-go/v1/cloudevents/transport/http", - "github.com/cloudevents/sdk-go/v1/cloudevents/types", + "github.com/cloudevents/sdk-go/v2", + "github.com/cloudevents/sdk-go/v2/binding", + "github.com/cloudevents/sdk-go/v2/event", + "github.com/cloudevents/sdk-go/v2/types", "github.com/confluentinc/confluent-kafka-go/kafka", "github.com/google/go-cmp/cmp", "github.com/pkg/errors", diff --git a/build/controller/Dockerfile b/build/controller/Dockerfile index 5185a880..470aad3f 100644 --- a/build/controller/Dockerfile +++ b/build/controller/Dockerfile @@ -27,10 +27,6 @@ else \ make build-native-controller; \ fi" -# Get The Microsoft Cert For SSL -WORKDIR /tmp-certs -RUN curl -sSL -f -k http://www.microsoft.com/pki/mscorp/Microsoft%20IT%20TLS%20CA%202.crt -o /tmp-certs/microsoft.crt - # Determine Dependencies of knative-kafka-controller that will be needed for the final image and package them into one dir WORKDIR /controller-deps RUN ldd /go/src/github.com/kyma-incubator/knative-kafka/build/controller/kafka-channel-controller \ @@ -59,8 +55,6 @@ COPY --from=builder /go/src/github.com/kyma-incubator/knative-kafka/build/contro # Copy over the dependencies COPY --from=builder /controller-deps/ / -COPY --from=builder /tmp-certs/microsoft.crt /etc/ssl/certs/microsoft.crt # Provides The SSL Cert For The Base Image To Properly Add It To The Cert Store -ENV SSL_CERT_FILE /etc/ssl/certs/microsoft.crt CMD [ "/kafka-channel-controller" ] diff --git a/cmd/channel/main.go b/cmd/channel/main.go index 5669a3f9..c007e463 100644 --- a/cmd/channel/main.go +++ b/cmd/channel/main.go @@ -3,7 +3,7 @@ package main import ( "context" "flag" - "github.com/cloudevents/sdk-go/v1/cloudevents" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/kyma-incubator/knative-kafka/pkg/channel/channel" "github.com/kyma-incubator/knative-kafka/pkg/channel/constants" "github.com/kyma-incubator/knative-kafka/pkg/channel/env" @@ -13,9 +13,9 @@ import ( kafkautil "github.com/kyma-incubator/knative-kafka/pkg/common/kafka/util" "github.com/kyma-incubator/knative-kafka/pkg/common/prometheus" "go.uber.org/zap" - eventingChannel "knative.dev/eventing/pkg/channel" - "knative.dev/eventing/pkg/kncloudevents" + eventingchannel "knative.dev/eventing/pkg/channel" "knative.dev/pkg/logging" + nethttp "net/http" ) // Variables @@ -67,31 +67,19 @@ func main() { } defer kafkaProducer.Close() - // The EventReceiver is responsible for processing the context (headers and binary/json content) of each request, - // and then passing the context, channel details, and the constructed CloudEvent event to our handleEvent() function. - eventReceiver, err := eventingChannel.NewEventReceiver(handleEvent, logger) + // Create A New Knative Eventing MessageReceiver (Parses The Channel From The Host Header) + messageReceiver, err := eventingchannel.NewMessageReceiver(handleMessage, logger, eventingchannel.ResolveMessageChannelFromHostHeader(eventingchannel.ParseChannel)) if err != nil { - logger.Fatal("Failed To Create Knative EventReceiver", zap.Error(err)) - } - - // The Knative CloudEvent Client handles the mux http server setup (middlewares and transport options) and invokes - // the eventReceiver. 
Although the NewEventReceiver method above will also invoke kncloudevents.NewDefaultClient - // internally, that client goes unused when using the ServeHTTP on the eventReceiver. - // - // IMPORTANT: Because the kncloudevents package does not allow injecting modified configuration, - // we can't override the default port being used (8080). - knCloudEventClient, err := kncloudevents.NewDefaultClient() - if err != nil { - logger.Fatal("Failed To Create Knative CloudEvent Client", zap.Error(err)) + logger.Fatal("Failed To Create MessageReceiver", zap.Error(err)) } // Set The Liveness Flag - Readiness Is Set By Individual Components healthServer.SetAlive(true) - // Start Receiving Events (Blocking Call :) - err = knCloudEventClient.StartReceiver(ctx, eventReceiver.ServeHTTP) + // Start The Message Receiver (Blocking) + err = messageReceiver.Start(ctx) if err != nil { - logger.Error("Failed To Start Event Receiver", zap.Error(err)) + logger.Error("Failed To Start MessageReceiver", zap.Error(err)) } // Reset The Liveness and Readiness Flags In Preparation For Shutdown @@ -104,19 +92,33 @@ func main() { healthServer.Stop(logger) } -// Handler For Receiving Cloud Events And Sending The Event To Kafka -func handleEvent(_ context.Context, channelReference eventingChannel.ChannelReference, cloudEvent cloudevents.Event) error { +// CloudEvent Message Handler - Converts To KafkaMessage And Produces To Channel's Kafka Topic +func handleMessage(ctx context.Context, channelReference eventingchannel.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error { // Note - The context provided here is a different context from the one created in main() and does not have our logger instance. - logger.Debug("~~~~~~~~~~~~~~~~~~~~ Processing Request ~~~~~~~~~~~~~~~~~~~~") - logger.Debug("Received Cloud Event", zap.Any("CloudEvent", cloudEvent), zap.Any("ChannelReference", channelReference)) + logger.Debug("Received Message", zap.Any("Message", message), zap.Any("ChannelReference", channelReference)) + + // + // Convert The CloudEvents Binding Message To A CloudEvent + // + // TODO - It is potentially inefficient to take the CloudEvent binding/Message and convert it into a CloudEvent, + // just so that it can then be further transformed into a Confluent KafkaMessage. The current implementation + // is based on CloudEvent Events, however, and until a "protocol" implementation for Confluent Kafka exists + // this is the simplest path forward. Once such a protocol implementation exists, it would be more efficient + // to convert directly from the binding/Message to the protocol/Message. + // + cloudEvent, err := binding.ToEvent(ctx, message, transformers...) 
+ if err != nil { + logger.Error("Failed To Convert Message To CloudEvent", zap.Error(err)) + return err + } // Trim The "-kafkachannel" Suffix From The Service Name (Added Only To Workaround Kyma Naming Conflict) channelReference.Name = kafkautil.TrimKafkaChannelServiceNameSuffix(channelReference.Name) // Validate The KafkaChannel Prior To Producing Kafka Message - err := channel.ValidateKafkaChannel(channelReference) + err = channel.ValidateKafkaChannel(channelReference) if err != nil { logger.Warn("Unable To Validate ChannelReference", zap.Any("ChannelReference", channelReference), zap.Error(err)) return err diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index adb901ba..2957daa5 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -5,7 +5,6 @@ import ( "flag" commonk8s "github.com/kyma-incubator/knative-kafka/pkg/common/k8s" "github.com/kyma-incubator/knative-kafka/pkg/common/prometheus" - "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/client" "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/controller" dispatch "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/dispatcher" dispatcherhealth "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/health" @@ -109,9 +108,6 @@ func main() { metricsServer := prometheus.NewMetricsServer(logger, metricsPort, "/metrics") metricsServer.Start() - // Create HTTP Client With Retry Settings - ceClient := client.NewRetriableCloudEventClient(logger, exponentialBackoff, initialRetryInterval, maxRetryTime) - // Create The Dispatcher With Specified Configuration dispatcherConfig := dispatch.DispatcherConfig{ Logger: logger, @@ -124,9 +120,11 @@ func main() { OffsetCommitDurationMinimum: MinimumKafkaConsumerOffsetCommitDurationMillis * time.Millisecond, Username: kafkaUsername, Password: kafkaPassword, - Client: ceClient, ChannelKey: channelKey, Metrics: metricsServer, + ExponentialBackoff: exponentialBackoff, + InitialRetryInterval: initialRetryInterval, + MaxRetryTime: maxRetryTime, } dispatcher = dispatch.NewDispatcher(dispatcherConfig) diff --git a/pkg/channel/message/message.go b/pkg/channel/message/message.go index d7ae35df..7c91f32a 100644 --- a/pkg/channel/message/message.go +++ b/pkg/channel/message/message.go @@ -1,8 +1,9 @@ package message import ( - "github.com/cloudevents/sdk-go/v1/cloudevents" - "github.com/cloudevents/sdk-go/v1/cloudevents/types" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/types" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/kyma-incubator/knative-kafka/pkg/channel/constants" "go.uber.org/zap" @@ -11,14 +12,7 @@ import ( ) // Create A Kafka Message From The Specified CloudEvent / Topic -func CreateKafkaMessage(logger *zap.Logger, event cloudevents.Event, kafkaTopic string) (*kafka.Message, error) { - - // Get The Event's Data Bytes - eventBytes, err := event.DataBytes() - if err != nil { - logger.Error("Failed To Get CloudEvent's DataBytes", zap.Error(err)) - return nil, err - } +func CreateKafkaMessage(logger *zap.Logger, event *event.Event, kafkaTopic string) (*kafka.Message, error) { // Get Kafka Message Headers From The Specified CloudEvent Context kafkaHeaders := getKafkaHeaders(logger, event.Context) @@ -33,7 +27,7 @@ func CreateKafkaMessage(logger *zap.Logger, event cloudevents.Event, kafkaTopic Partition: kafka.PartitionAny, // Required For Producer Level Partitioner! 
(see KafkaProducerConfigPropertyPartitioner) }, Key: partitionKey, - Value: eventBytes, + Value: event.Data(), Headers: kafkaHeaders, } @@ -82,7 +76,7 @@ func getKafkaHeaders(logger *zap.Logger, context cloudevents.EventContext) []kaf } // Precedence For Partitioning Is The CloudEvent PartitionKey Extension Followed By The CloudEvent Subject -func getPartitionKey(event cloudevents.Event) []byte { +func getPartitionKey(event *event.Event) []byte { // Use The CloudEvent Extensions PartitionKey If It Exists pkExtension, err := types.ToString(event.Extensions()[constants.ExtensionKeyPartitionKey]) diff --git a/pkg/channel/producer/producer.go b/pkg/channel/producer/producer.go index ce2d4749..2a017a9f 100644 --- a/pkg/channel/producer/producer.go +++ b/pkg/channel/producer/producer.go @@ -2,7 +2,7 @@ package producer import ( "errors" - "github.com/cloudevents/sdk-go/v1/cloudevents" + "github.com/cloudevents/sdk-go/v2/event" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/kyma-incubator/knative-kafka/pkg/channel/health" "github.com/kyma-incubator/knative-kafka/pkg/channel/message" @@ -62,7 +62,7 @@ var createProducerFunctionWrapper = func(brokers string, username string, passwo } // Produce A KafkaMessage From The Specified CloudEvent To The Specified Topic And Wait For The Delivery Report -func (p *Producer) ProduceKafkaMessage(event cloudevents.Event, channelReference eventingChannel.ChannelReference) error { +func (p *Producer) ProduceKafkaMessage(event *event.Event, channelReference eventingChannel.ChannelReference) error { // Validate The Kafka Producer (Must Be Pre-Initialized) if p.kafkaProducer == nil { diff --git a/pkg/channel/producer/producer_test.go b/pkg/channel/producer/producer_test.go index 639ba919..7fb374fc 100644 --- a/pkg/channel/producer/producer_test.go +++ b/pkg/channel/producer/producer_test.go @@ -1,7 +1,7 @@ package producer import ( - "github.com/cloudevents/sdk-go/v1/cloudevents" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/kyma-incubator/knative-kafka/pkg/channel/constants" channelhealth "github.com/kyma-incubator/knative-kafka/pkg/channel/health" @@ -47,7 +47,7 @@ func TestNewProducer(t *testing.T) { func TestProduceKafkaMessage(t *testing.T) { // Test Data - event := test.CreateCloudEvent(cloudevents.CloudEventsVersionV1) + cloudEvent := test.CreateCloudEvent(cloudevents.VersionV1) channelReference := test.CreateChannelReference(test.ChannelName, test.ChannelNamespace) mockProducer := test.NewMockProducer(test.TopicName) @@ -55,7 +55,7 @@ func TestProduceKafkaMessage(t *testing.T) { producer := createTestProducer(t, mockProducer) // Perform The Test & Verify Results - err := producer.ProduceKafkaMessage(event, channelReference) + err := producer.ProduceKafkaMessage(cloudEvent, channelReference) assert.Nil(t, err) // Block On The MockProducer's Channel & Verify Event Was Produced @@ -67,7 +67,7 @@ func TestProduceKafkaMessage(t *testing.T) { assert.Equal(t, kafka.PartitionAny, kafkaMessage.TopicPartition.Partition) assert.Equal(t, kafka.Offset(0), kafkaMessage.TopicPartition.Offset) assert.Nil(t, kafkaMessage.TopicPartition.Error) - test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeySpecVersion, cloudevents.CloudEventsVersionV1) + test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeySpecVersion, cloudevents.VersionV1) test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeyType, test.EventType) 
test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeyId, test.EventId) test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeySource, test.EventSource) diff --git a/pkg/channel/test/constants.go b/pkg/channel/test/constants.go index 680c9334..1169b789 100644 --- a/pkg/channel/test/constants.go +++ b/pkg/channel/test/constants.go @@ -1,6 +1,6 @@ package test -import cloudevents "github.com/cloudevents/sdk-go/v1" +import cloudevents "github.com/cloudevents/sdk-go/v2" // Test Data const ( diff --git a/pkg/channel/test/event.go b/pkg/channel/test/event.go index cea012b6..92fb1905 100644 --- a/pkg/channel/test/event.go +++ b/pkg/channel/test/event.go @@ -2,7 +2,7 @@ package test import ( "encoding/json" - cloudevents "github.com/cloudevents/sdk-go/v1" + "github.com/cloudevents/sdk-go/v2/event" "github.com/kyma-incubator/knative-kafka/pkg/channel/constants" ) @@ -10,15 +10,15 @@ import ( var EventDataJson, _ = json.Marshal(map[string]string{EventDataKey: EventDataValue}) // Utility Function For Creating A Test CloudEvent -func CreateCloudEvent(cloudEventVersion string) cloudevents.Event { - event := cloudevents.NewEvent(cloudEventVersion) - event.SetID(EventId) - event.SetType(EventType) - event.SetSource(EventSource) - event.SetDataContentType(EventDataContentType) - event.SetSubject(EventSubject) - event.SetDataSchema(EventDataSchema) - event.SetExtension(constants.ExtensionKeyPartitionKey, PartitionKey) - event.SetData(EventDataJson) - return event +func CreateCloudEvent(cloudEventVersion string) *event.Event { + cloudEvent := event.New(cloudEventVersion) + cloudEvent.SetID(EventId) + cloudEvent.SetType(EventType) + cloudEvent.SetSource(EventSource) + cloudEvent.SetDataContentType(EventDataContentType) + cloudEvent.SetSubject(EventSubject) + cloudEvent.SetDataSchema(EventDataSchema) + cloudEvent.SetExtension(constants.ExtensionKeyPartitionKey, PartitionKey) + _ = cloudEvent.SetData(EventDataContentType, EventDataJson) + return &cloudEvent } diff --git a/pkg/dispatcher/client/client.go b/pkg/dispatcher/client/client.go deleted file mode 100644 index 545616b6..00000000 --- a/pkg/dispatcher/client/client.go +++ /dev/null @@ -1,136 +0,0 @@ -package client - -import ( - "context" - cloudevents "github.com/cloudevents/sdk-go/v1" - cloudeventhttp "github.com/cloudevents/sdk-go/v1/cloudevents/transport/http" - "github.com/pkg/errors" - "github.com/slok/goresilience/retry" - "go.uber.org/zap" - "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/pkg/tracing" - "math" - "net/http" - "strconv" - "time" -) - -// Create a shared go http client with a timeout -var httpClient = &http.Client{ - Timeout: 30 * time.Second, -} - -// Client represents anything that can dispatch an event -// to a downstream service -type RetriableClient interface { - Dispatch(message cloudevents.Event, uri string) error -} - -// retriableCloudEventClient is a client implementation that interprets -// kafka messages as cloud events and utilizes the cloud event library -// and supports retries with exponential backoff -type retriableCloudEventClient struct { - logger *zap.Logger - exponentialBackoff bool - initialRetryInterval int64 - maxRetryTime int64 - cloudEventClient cloudevents.Client -} - -var _ RetriableClient = &retriableCloudEventClient{} - -func NewRetriableCloudEventClient(logger *zap.Logger, exponentialBackoff bool, initialRetryInterval int64, maxRetryTime int64) retriableCloudEventClient { - - // - // TODO - Previously we we're adding cloud events 
middleware for tracing. This implementation is still - // based on CloudEvents V1, despite being based on knative eventing release-0.14 which has moved - // on to CloudEvents V2. This will all need to be re-done once we migrate to CloudEvents V2 and - // re-evaluate their support for event tracing. - // - - tOpts := []cloudeventhttp.Option{ - cloudevents.WithBinaryEncoding(), - cloudevents.WithMiddleware(tracing.HTTPSpanMiddleware), - } - - // Make an http transport for the CloudEvents client. - transport, err := cloudevents.NewHTTPTransport(tOpts...) - if err != nil { - panic("Failed To Create Transport, " + err.Error()) - } - transport.Client = httpClient - - ceClient, err := kncloudevents.NewDefaultHTTPClient(transport) - if err != nil { - panic("Unable To Create KnativeCloudEvent Client: " + err.Error()) - } - - return retriableCloudEventClient{ - logger: logger, - exponentialBackoff: exponentialBackoff, - initialRetryInterval: initialRetryInterval, - maxRetryTime: maxRetryTime, - cloudEventClient: ceClient, - } -} - -func (rcec retriableCloudEventClient) Dispatch(event cloudevents.Event, uri string) error { - - // Configure The Logger - var logger *zap.Logger - if rcec.logger.Core().Enabled(zap.DebugLevel) { - logger = rcec.logger.With(zap.String("Event", event.String()), zap.String("uri", uri)) - } else { - logger = rcec.logger.With(zap.String("uri", uri)) - } - - // Build the runner for retry capabilities - runner := retry.New(retry.Config{DisableBackoff: rcec.exponentialBackoff, Times: rcec.calculateNumberOfRetries(), WaitBase: time.Millisecond * time.Duration(rcec.initialRetryInterval)}) - - // Build the sending context for the event - sendingCtx := cloudevents.ContextWithTarget(context.Background(), uri) - - // - // TODO - Previously we were manually adding tracing information to the HTTP conext from the event. - // It is hoped that when we convert this implementation to CloudEvents V2 that this will be - // handled for us. The AddSpanFromTraceparentAttribute() function was removed from knative - // after the release-0.13 branch. 
- // - //sendingCtx, err := knativeeventingtracing.AddSpanFromTraceparentAttribute(sendingCtx, uri, event) - //if err != nil { - // logger.Error("Unable to connect outgoing span", zap.Error(err)) - //} - - err := runner.Run(sendingCtx, func(ctx context.Context) error { - responseContext, _, err := rcec.cloudEventClient.Send(sendingCtx, event) - transportContext := cloudevents.HTTPTransportContextFrom(responseContext) - return logResponse(logger, transportContext.StatusCode, err) - }) - - // Retries failed - if err != nil { - logger.Error("Failed to send after configured number of retries", zap.Error(err)) - return err - } - return nil -} - -func logResponse(logger *zap.Logger, statusCode int, err error) error { - if statusCode >= 500 || statusCode == 404 || statusCode == 429 { - logger.Warn("Failed to send message to subscriber service, retrying", zap.Int("statusCode", statusCode)) - return errors.New("Server returned a bad response code: " + strconv.Itoa(statusCode)) - } else if statusCode > 299 { - logger.Warn("Failed to send message to subscriber service, not retrying", zap.Int("statusCode", statusCode)) - } else if statusCode == 0 { - return errors.Wrap(err, "Validation Error") - } else { - logger.Debug("Successfully sent message to subscriber service", zap.Int("statusCode", statusCode)) - } - return nil -} - -// Convert defined max retry time to the approximate number -// of retries, taking into account the exponential backoff algorithm -func (rcec retriableCloudEventClient) calculateNumberOfRetries() int { - return int(math.Round(math.Log2(float64(rcec.maxRetryTime)/float64(rcec.initialRetryInterval))) + 1) -} diff --git a/pkg/dispatcher/client/client_test.go b/pkg/dispatcher/client/client_test.go deleted file mode 100644 index eb657e18..00000000 --- a/pkg/dispatcher/client/client_test.go +++ /dev/null @@ -1,225 +0,0 @@ -package client - -import ( - "errors" - "fmt" - cloudevents "github.com/cloudevents/sdk-go/v1" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - logtesting "knative.dev/pkg/logging/testing" - "net/http" - "net/http/httptest" - "testing" -) - -func TestHttpClient_Dispatch(t *testing.T) { - t.Parallel() - testCases := []struct { - description string - expectedCallCount int - expectedSuccess bool - handler func(w http.ResponseWriter, r *http.Request, callCount int) - }{ - { - "Basic successful Request", - 1, - true, - func(w http.ResponseWriter, r *http.Request, callCount int) { - w.WriteHeader(http.StatusCreated) - }, - }, - { - "Test first 2 calls fail, 3rd succeeds", - 3, - true, - func(w http.ResponseWriter, r *http.Request, callCount int) { - if callCount < 3 { - w.WriteHeader(http.StatusBadGateway) - } else { - w.WriteHeader(http.StatusCreated) - } - }, - }, - { - "Test all retries fail", - 5, - false, - func(w http.ResponseWriter, r *http.Request, callCount int) { - w.WriteHeader(http.StatusNotFound) - }, - }, - { - "Test don't retry on 400", - 1, - true, - func(w http.ResponseWriter, r *http.Request, callCount int) { - w.WriteHeader(http.StatusBadRequest) - }, - }, - { - "Test do retry on 429", - 2, - true, - func(w http.ResponseWriter, r *http.Request, callCount int) { - if callCount == 1 { - w.WriteHeader(http.StatusTooManyRequests) - } else { - w.WriteHeader(http.StatusCreated) - } - }, - }, - { - "Test do retry on 404", - 2, - true, - func(w http.ResponseWriter, r *http.Request, callCount int) { - if callCount == 1 { - w.WriteHeader(http.StatusNotFound) - } else { - w.WriteHeader(http.StatusCreated) - } - }, - }, - } - - for _, tc := range 
testCases { - tc := tc // capture range variable - t.Run(tc.description, func(t *testing.T) { - t.Parallel() - - client, server, mux := setup(t) - defer teardown(server) - - callCount := 0 - mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { - callCount++ - tc.handler(writer, request, callCount) - }) - - testCloudEvent := cloudevents.NewEvent(cloudevents.VersionV03) - testCloudEvent.SetID("ABC-123") - testCloudEvent.SetType("com.cloudevents.readme.sent") - testCloudEvent.SetSource("http://localhost:8080/") - testCloudEvent.SetDataContentType("application/json") - err := testCloudEvent.SetData(map[string]string{"test": "value"}) - assert.Nil(t, err) - - err = client.Dispatch(testCloudEvent, server.URL) - - if tc.expectedSuccess && err != nil { - t.Error("Message failed to dispatch:", err) - } else if !tc.expectedSuccess && err == nil { - t.Error("Message should have failed to dispatch") - } - - if callCount != tc.expectedCallCount { - t.Errorf("Expected to call server %d time, was actually %d times", tc.expectedCallCount, callCount) - } - }) - } -} - -func setup(t *testing.T) (*retriableCloudEventClient, *httptest.Server, *http.ServeMux) { - // test server - mux := http.NewServeMux() - server := httptest.NewServer(mux) - client := NewRetriableCloudEventClient(logtesting.TestLogger(t).Desugar(), true, 1000, 10000) - - return &client, server, mux -} - -func teardown(server *httptest.Server) { - server.Close() -} - -func TestHttpClient_calculateNumberOfRetries(t *testing.T) { - type fields struct { - uri string - exponentialBackoff bool - initialRetryInterval int64 - maxNumberRetries int - maxRetryTime int64 - logger *zap.Logger - } - tests := []struct { - fields fields - want int - }{ - {fields{maxRetryTime: 10000, initialRetryInterval: 1000}, 4}, - {fields{maxRetryTime: 10000, initialRetryInterval: 5000}, 2}, - {fields{maxRetryTime: 17000, initialRetryInterval: 1000}, 5}, - {fields{maxRetryTime: 60000, initialRetryInterval: 5000}, 5}, - } - for _, tt := range tests { - t.Run(fmt.Sprintf("%d max retry, initial interval %d", tt.fields.maxRetryTime, tt.fields.initialRetryInterval), func(t *testing.T) { - hc := retriableCloudEventClient{ - exponentialBackoff: tt.fields.exponentialBackoff, - initialRetryInterval: tt.fields.initialRetryInterval, - maxRetryTime: tt.fields.maxRetryTime, - } - if got := hc.calculateNumberOfRetries(); got != tt.want { - t.Errorf("httpClient.calculateNumberOfRetries() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_logResponse(t *testing.T) { - - logger := logtesting.TestLogger(t).Desugar() - - type args struct { - logger *zap.Logger - statusCode int - err error - } - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "200", - args: args{ - logger: logger, - statusCode: 200, - err: nil, - }, - wantErr: false, - }, - { - name: "429", - args: args{ - logger: logger, - statusCode: 429, - err: nil, - }, - wantErr: true, - }, - { - name: "503", - args: args{ - logger: logger, - statusCode: 503, - err: nil, - }, - wantErr: true, - }, - { - name: "Validation Error", - args: args{ - logger: logger, - statusCode: 0, - err: errors.New("validation error"), - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if err := logResponse(tt.args.logger, tt.args.statusCode, tt.args.err); (err != nil) != tt.wantErr { - t.Errorf("logResponse() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/pkg/dispatcher/controller/kafkachannel.go 
b/pkg/dispatcher/controller/kafkachannel.go index 61a40b21..4b37e760 100644 --- a/pkg/dispatcher/controller/kafkachannel.go +++ b/pkg/dispatcher/controller/kafkachannel.go @@ -149,8 +149,9 @@ func (r Reconciler) reconcile(channel *kafkav1alpha1.KafkaChannel) error { subscriptions := make([]dispatcher.Subscription, 0) for _, subscriber := range channel.Spec.Subscribable.Subscribers { groupId := fmt.Sprintf("kafka.%s", subscriber.UID) - subscriptions = append(subscriptions, dispatcher.Subscription{URI: subscriber.SubscriberURI.String(), GroupId: groupId}) - r.Logger.Debug("Adding Subscriber, Consumer Group", zap.String("groupId", groupId), zap.Any("URI", subscriber.SubscriberURI)) + subscription := dispatcher.Subscription{SubscriberSpec: subscriber, GroupId: groupId} + subscriptions = append(subscriptions, subscription) + r.Logger.Debug("Adding New Subscriber / Consumer Group", zap.Any("Subscription", subscription)) } failedSubscriptions := r.dispatcher.UpdateSubscriptions(subscriptions) @@ -176,7 +177,7 @@ func (r *Reconciler) createSubscribableStatus(subscribable *eventingduck.Subscri Ready: corev1.ConditionTrue, } groupId := fmt.Sprintf("kafka.%s", sub.UID) - subscription := dispatcher.Subscription{URI: sub.SubscriberURI.String(), GroupId: groupId} + subscription := dispatcher.Subscription{SubscriberSpec: sub, GroupId: groupId} if err, ok := failedSubscriptions[subscription]; ok { status.Ready = corev1.ConditionFalse status.Message = err.Error() diff --git a/pkg/dispatcher/controller/kafkachannel_test.go b/pkg/dispatcher/controller/kafkachannel_test.go index 237e231b..67e42274 100644 --- a/pkg/dispatcher/controller/kafkachannel_test.go +++ b/pkg/dispatcher/controller/kafkachannel_test.go @@ -3,7 +3,6 @@ package controller import ( "github.com/confluentinc/confluent-kafka-go/kafka" kafkaconsumer "github.com/kyma-incubator/knative-kafka/pkg/common/kafka/consumer" - "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/client" "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/dispatcher" dispatchertesting "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/testing" reconciletesting "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/testing" @@ -36,6 +35,9 @@ const ( testOffsetCommitDurationMin = 50 * time.Millisecond // Small Durations For Testing! 
testUsername = "TestUsername" testPassword = "TestPassword" + testExponentialBackoff = false + testInitialRetryInterval = 500 + testMaxRetryTime = 5000 ) func init() { @@ -154,8 +156,6 @@ func NewTestDispatcher(t *testing.T, channelKey string) *dispatcher.Dispatcher { } defer func() { kafkaconsumer.NewConsumerWrapper = newConsumerWrapperPlaceholder }() - cloudEventClient := client.NewRetriableCloudEventClient(logger, false, 500, 5000) - // Create A New Dispatcher dispatcherConfig := dispatcher.DispatcherConfig{ Logger: logger, @@ -168,8 +168,10 @@ func NewTestDispatcher(t *testing.T, channelKey string) *dispatcher.Dispatcher { OffsetCommitDurationMinimum: testOffsetCommitDurationMin, Username: testUsername, Password: testPassword, - Client: cloudEventClient, ChannelKey: channelKey, + ExponentialBackoff: testExponentialBackoff, + InitialRetryInterval: testInitialRetryInterval, + MaxRetryTime: testMaxRetryTime, } testDispatcher := dispatcher.NewDispatcher(dispatcherConfig) return testDispatcher diff --git a/pkg/dispatcher/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher/dispatcher.go index 98cfc91d..779f409c 100644 --- a/pkg/dispatcher/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher/dispatcher.go @@ -2,12 +2,13 @@ package dispatcher import ( "errors" - cloudevents "github.com/cloudevents/sdk-go/v1" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/confluentinc/confluent-kafka-go/kafka" kafkaconsumer "github.com/kyma-incubator/knative-kafka/pkg/common/kafka/consumer" "github.com/kyma-incubator/knative-kafka/pkg/common/prometheus" - "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/client" "go.uber.org/zap" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/eventing/pkg/channel" "strings" "sync" "time" @@ -25,14 +26,11 @@ type DispatcherConfig struct { OffsetCommitDurationMinimum time.Duration Username string Password string - Client client.RetriableClient ChannelKey string Metrics *prometheus.MetricsServer -} - -type Subscription struct { - URI string - GroupId string + ExponentialBackoff bool + InitialRetryInterval int64 + MaxRetryTime int64 } type ConsumerOffset struct { @@ -43,11 +41,17 @@ type ConsumerOffset struct { stoppedCh chan bool } +type Subscription struct { + eventingduck.SubscriberSpec + GroupId string +} + // Define a Dispatcher Struct to hold Dispatcher Config and dispatcher implementation details type Dispatcher struct { DispatcherConfig consumers map[Subscription]*ConsumerOffset consumerUpdateLock sync.Mutex + messageDispatcher channel.MessageDispatcher } // Create A New Dispatcher Of Specified Configuration @@ -55,8 +59,9 @@ func NewDispatcher(dispatcherConfig DispatcherConfig) *Dispatcher { // Create The Dispatcher With Specified Configuration dispatcher := &Dispatcher{ - DispatcherConfig: dispatcherConfig, - consumers: make(map[Subscription]*ConsumerOffset), + DispatcherConfig: dispatcherConfig, + consumers: make(map[Subscription]*ConsumerOffset), + messageDispatcher: channel.NewMessageDispatcher(dispatcherConfig.Logger), } // Return The Dispatcher @@ -72,7 +77,7 @@ func (d *Dispatcher) StopConsumers() { // Stop An Individual Consumer func (d *Dispatcher) stopConsumer(subscription Subscription) { - d.Logger.Info("Stopping Consumer", zap.String("GroupId", subscription.GroupId), zap.String("topic", d.Topic), zap.String("URI", subscription.URI)) + d.Logger.Info("Stopping Consumer", zap.String("GroupId", subscription.GroupId), zap.String("topic", d.Topic), zap.String("URI", subscription.SubscriberURI.String())) 
consumerOffset := d.consumers[subscription] consumerOffset.stopCh <- true // Send Stop Signal <-consumerOffset.stoppedCh // Wait Until Stop Completes @@ -83,7 +88,7 @@ func (d *Dispatcher) stopConsumer(subscription Subscription) { func (d *Dispatcher) initConsumer(subscription Subscription) (*ConsumerOffset, error) { // Create Consumer - d.Logger.Info("Creating Consumer", zap.String("GroupId", subscription.GroupId), zap.String("topic", d.Topic), zap.String("URI", subscription.URI)) + d.Logger.Info("Creating Consumer", zap.String("GroupId", subscription.GroupId), zap.String("topic", d.Topic), zap.String("URI", subscription.SubscriberURI.String())) consumer, err := kafkaconsumer.CreateConsumer(d.Brokers, subscription.GroupId, d.Offset, d.Username, d.Password) if err != nil { d.Logger.Error("Failed To Create New Consumer", zap.Error(err)) @@ -151,20 +156,23 @@ func (d *Dispatcher) handleKafkaMessages(consumerOffset ConsumerOffset, subscrip switch e := event.(type) { case *kafka.Message: - // Dispatch The Message - Send To Target URL + + // Debug Log Kafka Message Dispatching logger.Debug("Received Kafka Message - Dispatching", zap.String("Message", string(e.Value)), zap.String("Topic", *e.TopicPartition.Topic), zap.Int32("Partition", e.TopicPartition.Partition), zap.Any("Offset", e.TopicPartition.Offset)) - cloudEvent, err := convertToCloudEvent(e) + // Convert The Kafka Message To A Cloud Event + cloudEvent, err := d.convertToCloudEvent(e) if err != nil { logger.Error("Unable To Convert Kafka Message To CloudEvent, Skipping", zap.Any("message", e)) continue } - _ = d.Client.Dispatch(*cloudEvent, subscription.URI) // Ignore Errors - Dispatcher Will Retry And We're Moving On! + // Dispatch (Send!) The CloudEvent To The Subscription URL (Ignore Errors - Dispatcher Will Retry And We're Moving On!) 
+ _ = d.Dispatch(cloudEvent, subscription) // Update Stored Offsets Based On The Processed Message d.updateOffsets(consumerOffset.consumer, e) @@ -256,7 +264,7 @@ func (d *Dispatcher) UpdateSubscriptions(subscriptions []Subscription) map[Subsc } // Stop Consumers For Removed Subscriptions - for runningSubscription, _ := range d.consumers { + for runningSubscription := range d.consumers { if !activeSubscriptions[runningSubscription] { d.stopConsumer(runningSubscription) } @@ -266,7 +274,7 @@ func (d *Dispatcher) UpdateSubscriptions(subscriptions []Subscription) map[Subsc } // Convert Kafka Message To A Cloud Event, Eventually We Should Consider Writing A Cloud Event SDK Codec For This -func convertToCloudEvent(message *kafka.Message) (*cloudevents.Event, error) { +func (d *Dispatcher) convertToCloudEvent(message *kafka.Message) (*cloudevents.Event, error) { var specVersion string // Determine CloudEvent Version @@ -283,13 +291,16 @@ func convertToCloudEvent(message *kafka.Message) (*cloudevents.Event, error) { // Generate CloudEvent event := cloudevents.NewEvent(specVersion) - event.SetData(message.Value) for _, header := range message.Headers { h := header.Key var v = string(header.Value) switch h { case "ce_datacontenttype": - event.SetDataContentType(v) + err := event.SetData(v, message.Value) + if err != nil { + d.Logger.Error("Failed To Set CloudEvent Data From Kafka Message", zap.Error(err)) + return nil, err + } case "ce_type": event.SetType(v) case "ce_source": diff --git a/pkg/dispatcher/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher/dispatcher_test.go index 59f8537a..dcd6ad46 100644 --- a/pkg/dispatcher/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher/dispatcher_test.go @@ -2,13 +2,14 @@ package dispatcher import ( "encoding/json" - cloudevents "github.com/cloudevents/sdk-go/v1" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/google/go-cmp/cmp" kafkaconsumer "github.com/kyma-incubator/knative-kafka/pkg/common/kafka/consumer" - "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/client" "github.com/stretchr/testify/assert" "io/ioutil" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/pkg/apis" logtesting "knative.dev/pkg/logging/testing" "net/http" "net/http/httptest" @@ -36,6 +37,9 @@ const ( testUsername = "TestUsername" testPassword = "TestPassword" testChannelKey = "TestChannel" + testExponentialBackoff = false + testInitialRetryInterval = 500 + testMaxRetryTime = 5000 ) // Test Data (Non-Constants) @@ -54,18 +58,18 @@ func TestDispatcher(t *testing.T) { // Initialize The Test HTTP Server Instance & URL callCount1 := 0 - var httpServer = getHttpServer(t, &callCount1) - testSubscriberUri1 := httpServer.URL - defer httpServer.Close() + var httpServer1 = getHttpServer(t, &callCount1) + testSubscriberUrl1, _ := apis.ParseURL(httpServer1.URL) + defer httpServer1.Close() callCount2 := 0 var httpServer2 = getHttpServer(t, &callCount2) - testSubscriberUri2 := httpServer2.URL + testSubscriberUrl2, _ := apis.ParseURL(httpServer2.URL) defer httpServer2.Close() callCount3 := 0 var httpServer3 = getHttpServer(t, &callCount3) - testSubscriberUri3 := httpServer3.URL + testSubscriberUrl3, _ := apis.ParseURL(httpServer3.URL) defer httpServer3.Close() // Replace The NewClient Wrapper To Provide Mock Consumer & Defer Reset @@ -78,8 +82,6 @@ func TestDispatcher(t *testing.T) { } defer func() { kafkaconsumer.NewConsumerWrapper = newConsumerWrapperPlaceholder }() - cloudEventClient 
:= client.NewRetriableCloudEventClient(logtesting.TestLogger(t).Desugar(), false, 500, 5000) - // Create A New Dispatcher dispatcherConfig := DispatcherConfig{ Logger: logger, @@ -92,8 +94,10 @@ func TestDispatcher(t *testing.T) { OffsetCommitDurationMinimum: testOffsetCommitDurationMin, Username: testUsername, Password: testPassword, - Client: cloudEventClient, ChannelKey: testChannelKey, + ExponentialBackoff: testExponentialBackoff, + InitialRetryInterval: testInitialRetryInterval, + MaxRetryTime: testMaxRetryTime, } testDispatcher := NewDispatcher(dispatcherConfig) @@ -112,8 +116,8 @@ func TestDispatcher(t *testing.T) { // Start 1 Consumer subscriptionResults := testDispatcher.UpdateSubscriptions([]Subscription{ { - URI: testSubscriberUri1, - GroupId: testGroupId1, + SubscriberSpec: eventingduck.SubscriberSpec{SubscriberURI: testSubscriberUrl1}, + GroupId: testGroupId1, }, }) @@ -130,16 +134,16 @@ func TestDispatcher(t *testing.T) { // Start 3 Consumers subscriptionResults = testDispatcher.UpdateSubscriptions([]Subscription{ { - URI: testSubscriberUri1, - GroupId: testGroupId1, + SubscriberSpec: eventingduck.SubscriberSpec{SubscriberURI: testSubscriberUrl1}, + GroupId: testGroupId1, }, { - URI: testSubscriberUri2, - GroupId: testGroupId2, + SubscriberSpec: eventingduck.SubscriberSpec{SubscriberURI: testSubscriberUrl2}, + GroupId: testGroupId2, }, { - URI: testSubscriberUri3, - GroupId: testGroupId3, + SubscriberSpec: eventingduck.SubscriberSpec{SubscriberURI: testSubscriberUrl3}, + GroupId: testGroupId3, }, }) @@ -161,8 +165,8 @@ func TestDispatcher(t *testing.T) { // Remove 2 Consumers subscriptionResults = testDispatcher.UpdateSubscriptions([]Subscription{ { - URI: testSubscriberUri1, - GroupId: testGroupId1, + SubscriberSpec: eventingduck.SubscriberSpec{SubscriberURI: testSubscriberUrl1}, + GroupId: testGroupId1, }, }) verifyConsumersCount(t, testDispatcher.consumers, 1) @@ -219,7 +223,7 @@ func getHttpServer(t *testing.T, callCount *int) *httptest.Server { t.Errorf("expected to be able to read HTTP request Body without error: %+v", err) } else { bodyMap := make(map[string]string) - json.Unmarshal(bodyBytes, &bodyMap) + _ = json.Unmarshal(bodyBytes, &bodyMap) // Ignore Unexpected Errors - Should Not Happen Due To Controlled Test Data if !reflect.DeepEqual(bodyMap, testValue) { t.Errorf("expected HTTP request Body: %+v got: %+v", testValue, bodyMap) } @@ -332,12 +336,11 @@ func sendMessagesToConsumers(t *testing.T, consumers map[Subscription]*ConsumerO func createKafkaMessage(offset kafka.Offset, cloudEventVersion string) *kafka.Message { testCloudEvent := createCloudEvent(cloudEventVersion) - eventBytes, _ := testCloudEvent.DataBytes() return &kafka.Message{ Key: []byte(testKey), Headers: createMessageHeaders(testCloudEvent.Context), - Value: eventBytes, + Value: testCloudEvent.Data(), Timestamp: time.Now(), TopicPartition: kafka.TopicPartition{ Topic: &topic, @@ -352,10 +355,9 @@ func createCloudEvent(cloudEventVersion string) cloudevents.Event { testCloudEvent.SetID("ABC-123") testCloudEvent.SetType("com.cloudevents.readme.sent") testCloudEvent.SetSource("http://localhost:8080/") - testCloudEvent.SetDataContentType("application/json") data, _ := json.Marshal(testValue) - testCloudEvent.SetData(data) + _ = testCloudEvent.SetData("application/json", data) // Ignore Unexpected Errors - Should Not Happen Due To Controlled Test Data return testCloudEvent } @@ -526,6 +528,28 @@ func (mc *MockConsumer) StoreOffsets(offsets []kafka.TopicPartition) (storedOffs func 
Test_convertToCloudEvent(t *testing.T) { + // Create A Test Logger + logger := logtesting.TestLogger(t).Desugar() + + // Create A New Dispatcher + dispatcherConfig := DispatcherConfig{ + Logger: logger, + Brokers: testBrokers, + Topic: testTopic, + Offset: testOffset, + PollTimeoutMillis: testPollTimeoutMillis, + OffsetCommitCount: testOffsetCommitCount, + OffsetCommitDuration: testOffsetCommitDuration, + OffsetCommitDurationMinimum: testOffsetCommitDurationMin, + Username: testUsername, + Password: testPassword, + ChannelKey: testChannelKey, + ExponentialBackoff: testExponentialBackoff, + InitialRetryInterval: testInitialRetryInterval, + MaxRetryTime: testMaxRetryTime, + } + testDispatcher := NewDispatcher(dispatcherConfig) + // Valid v0.3 Message validMessageV03 := createKafkaMessage(1, cloudevents.VersionV03) expectedCloudEventV03 := createCloudEvent(cloudevents.VersionV03) @@ -584,7 +608,7 @@ func Test_convertToCloudEvent(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := convertToCloudEvent(tt.args.message) + got, err := testDispatcher.convertToCloudEvent(tt.args.message) if (err != nil) != tt.wantErr { t.Errorf("convertToCloudEvent() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/dispatcher/dispatcher/retry.go b/pkg/dispatcher/dispatcher/retry.go new file mode 100644 index 00000000..721653c1 --- /dev/null +++ b/pkg/dispatcher/dispatcher/retry.go @@ -0,0 +1,170 @@ +package dispatcher + +import ( + "context" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/pkg/errors" + "github.com/slok/goresilience/retry" + "go.uber.org/zap" + "math" + "net/url" + "regexp" + "strconv" + "time" +) + +// 3 Digit Word Boundary HTTP Status Code Regular Expression +var HttpStatusCodeRegExp = regexp.MustCompile("(^|\\s)([12345]\\d{2})(\\s|$)") + +// Dispatch A Cloud Event To The Specified Destination URL +func (d *Dispatcher) Dispatch(event *cloudevents.Event, subscription Subscription) error { + + // Configure The Logger + logger := d.Logger.With(zap.String("Subscriber URI", subscription.SubscriberURI.String())) + if d.Logger.Core().Enabled(zap.DebugLevel) { + logger = d.Logger.With(zap.String("Event", event.String())) + } + + // Create A New Retry Runner With Configured Backoff Behavior + retryRunner := retry.New( + retry.Config{ + DisableBackoff: !d.ExponentialBackoff, // Note Negation: Env Var Is Whether To Enable & Retry Config Is Whether To Disable ; ) + Times: d.calculateNumberOfRetries(), + WaitBase: time.Millisecond * time.Duration(d.InitialRetryInterval), + }, + ) + + // + // Convert The CloudEvent To A binding/Message + // + // TODO - It is potentially inefficient to take the KafkaMessage which we've already turned into a Cloud Event, only + // to re-convert it into a binding/Message. The current implementation is based on CloudEvent Events, however, + // and until a "protocol" implementation for Confluent Kafka exists this is the simplest path forward. Once + // such a protocol implementation exists, it would be more efficient to convert directly from the KafkaMessage + // to a binding/Message. 
+ // + message := binding.ToMessage(event) + + // Extract The Relevant Knative Subscription Event URLs + var destinationURL *url.URL + if !subscription.SubscriberURI.IsEmpty() { + destinationURL = subscription.SubscriberURI.URL() + } + var replyURL *url.URL + if !subscription.ReplyURI.IsEmpty() { + replyURL = subscription.ReplyURI.URL() + } + var deadLetterURL *url.URL + if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() { + // TODO - Currently ignoring dead-letter configuration due to wrapping retry implementation - would send one deadletter for every retry :( + // deadLetterURL = subscription.Delivery.DeadLetterSink.URI.URL() + logger.Warn("Subscription Delivery DeadLetterSink Not Currently Supported!") + } + + // Attempt To Dispatch The CloudEvent Message Via Knative Message Dispatcher With Retry Wrapper + err := retryRunner.Run(context.Background(), func(ctx context.Context) error { + err := d.messageDispatcher.DispatchMessage(ctx, message, nil, destinationURL, replyURL, deadLetterURL) + return d.logResponse(err) + }) + + // Retries failed + if err != nil { + logger.Error("Failed to send after configured number of retries", zap.Error(err)) + return err + } + + // Return Success + return nil +} + +// Utility Function For Logging The Response From A Dispatch Request +func (d *Dispatcher) logResponse(err error) error { + + if err == nil { + d.Logger.Debug("Successfully Sent Cloud Event To Subscription") + return nil + } else { + statusCode := d.parseHttpStatusCodeFromError(err) + logger := d.Logger.With(zap.Error(err), zap.Int("StatusCode", statusCode)) + + // + // Note - Normally we would NOT want to retry 400 responses, BUT the knative-eventing + // filter handler (due to CloudEvents SDK V1 usage) is swallowing the actual + // status codes from the subscriber and returning 400s instead. Once this has, + // been resolved we can remove 400 from the list of codes to retry. + // + if statusCode >= 500 || statusCode == 400 || statusCode == 404 || statusCode == 429 { + logger.Warn("Failed to send message to subscriber service, retrying") + return errors.New("Server returned a bad response code") + } else if statusCode >= 300 { + logger.Warn("Failed to send message to subscriber service, not retrying") + } else if statusCode == -1 { + logger.Warn("No response code detected in error, retrying") + return errors.New("No response code detected in error, retrying") + } + return nil + } +} + +// +// Parse The HTTP Status Code From The Specified Error - HACK ALERT! +// +// This is necessary due to the Knative Eventing Channel's MessageDispatcher implementation +// NOT returning the HTTP Status Code other than as part of the error string, combined with +// the DispatchMessage() functionality not supporting retry. We do, though, want to use their +// implementation in order to align with standards and take advantage of tracing logic. +// Therefore, since we are providing our own wrapping retry logic, which depends on the +// resulting HTTP Status Code, we're forced to parse it from the error text here. It is +// hoped that we can provide a better solution directly in the knative channel implementation +// in the near future. 
+// +func (d *Dispatcher) parseHttpStatusCodeFromError(err error) int { + + // Default Value Indicates The HTTP Status Code Was Not Found In Error + statusCode := -1 + + // Validate The Error + if err != nil && len(err.Error()) >= 3 { + + // Get The Error String Value + errString := err.Error() + + // Match/Group Any 3 Digit Status Code SubString From The Error String + statusCodeStringSubMatch := HttpStatusCodeRegExp.FindAllStringSubmatch(errString, -1) + + // If A Match Was Found + if len(statusCodeStringSubMatch) >= 1 { + + // Log Warning If Multiple Potential HTTP StatusCodes Detected + if len(statusCodeStringSubMatch) > 1 { + d.Logger.Warn("Encountered Multiple Possible HTTP Status Codes In Error String - Using First One") + } + + // Take The First Potential HTTP StatusCode (For lack anything more sophisticated ;) + statusCodeString := statusCodeStringSubMatch[0][2] + + // And Convert To An Int + code, conversionErr := strconv.Atoi(statusCodeString) + if conversionErr != nil { + // Conversion Error - Should Be Impossible Due To RegExp Match - But Log A Warning Just To Be Safe ; ) + d.Logger.Warn("Failed To Convert Parsed HTTP Status Code String To Int", zap.String("HTTP Status Code String", statusCodeString), zap.Error(conversionErr)) + } else { + statusCode = code + } + } + } + + // Return The HTTP Status Code + return statusCode +} + +// Determine the approximate number of retries that will take around maxRetryTime, +// depending on whether exponential backoff is enabled +func (d *Dispatcher) calculateNumberOfRetries() int { + if d.ExponentialBackoff { + return int(math.Round(math.Log2(float64(d.MaxRetryTime) / float64(d.InitialRetryInterval)))) + } else { + return int(d.MaxRetryTime / d.InitialRetryInterval) + } +} diff --git a/pkg/dispatcher/dispatcher/retry_test.go b/pkg/dispatcher/dispatcher/retry_test.go new file mode 100644 index 00000000..a56f74f2 --- /dev/null +++ b/pkg/dispatcher/dispatcher/retry_test.go @@ -0,0 +1,359 @@ +package dispatcher + +import ( + "fmt" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/pkg/apis" + logtesting "knative.dev/pkg/logging/testing" + "net/http" + "net/http/httptest" + "testing" +) + +func TestHttpClient_Dispatch(t *testing.T) { + t.Parallel() + testCases := []struct { + description string + expectedCallCount int + expectedSuccess bool + handler func(w http.ResponseWriter, r *http.Request, callCount int) + }{ + { + "Basic successful Request", + 1, + true, + func(w http.ResponseWriter, r *http.Request, callCount int) { + w.WriteHeader(http.StatusCreated) + }, + }, + { + "Test first 2 calls fail, 3rd succeeds", + 3, + true, + func(w http.ResponseWriter, r *http.Request, callCount int) { + if callCount < 3 { + w.WriteHeader(http.StatusBadGateway) + } else { + w.WriteHeader(http.StatusCreated) + } + }, + }, + { + "Test all retries fail", + 11, + false, + func(w http.ResponseWriter, r *http.Request, callCount int) { + w.WriteHeader(http.StatusNotFound) + }, + }, + { + "Test don't retry on 401", + 1, + true, + func(w http.ResponseWriter, r *http.Request, callCount int) { + w.WriteHeader(http.StatusUnauthorized) + }, + }, + { + // NOTE: We had to retry on 400 to workaround a knative-eventing bug + // where the filter service does not pass-through the correct status code + + "Test do retry on 400", + 2, + true, + func(w http.ResponseWriter, r *http.Request, callCount int) { + if callCount == 
1 { + w.WriteHeader(http.StatusBadRequest) + } else { + w.WriteHeader(http.StatusCreated) + } + }, + }, + { + "Test do retry on 429", + 2, + true, + func(w http.ResponseWriter, r *http.Request, callCount int) { + if callCount == 1 { + w.WriteHeader(http.StatusTooManyRequests) + } else { + w.WriteHeader(http.StatusCreated) + } + }, + }, + { + "Test do retry on 404", + 2, + true, + func(w http.ResponseWriter, r *http.Request, callCount int) { + if callCount == 1 { + w.WriteHeader(http.StatusNotFound) + } else { + w.WriteHeader(http.StatusCreated) + } + }, + }, + } + + for _, tc := range testCases { + tc := tc // capture range variable + t.Run(tc.description, func(t *testing.T) { + t.Parallel() + + dispatcher, server, mux := setup(t) + defer teardown(server) + + callCount := 0 + mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { + callCount++ + tc.handler(writer, request, callCount) + }) + + testCloudEvent := cloudevents.NewEvent(cloudevents.VersionV03) + testCloudEvent.SetID("ABC-123") + testCloudEvent.SetType("com.cloudevents.readme.sent") + testCloudEvent.SetSource("http://localhost:8080/") + err := testCloudEvent.SetData("application/json", map[string]string{"test": "value"}) + assert.Nil(t, err) + + subscriberURI, _ := apis.ParseURL(server.URL) + + err = dispatcher.Dispatch(&testCloudEvent, Subscription{SubscriberSpec: eventingduck.SubscriberSpec{SubscriberURI: subscriberURI}}) + + if tc.expectedSuccess && err != nil { + t.Error("Message failed to dispatch:", err) + } else if !tc.expectedSuccess && err == nil { + t.Error("Message should have failed to dispatch") + } + + if callCount != tc.expectedCallCount { + t.Errorf("Expected to call server %d time, was actually %d times", tc.expectedCallCount, callCount) + } + }) + } +} + +func setup(t *testing.T) (*Dispatcher, *httptest.Server, *http.ServeMux) { + // test server + mux := http.NewServeMux() + server := httptest.NewServer(mux) + + // Create A New Dispatcher + dispatcherConfig := DispatcherConfig{ + Logger: logtesting.TestLogger(t).Desugar(), + Brokers: testBrokers, + Topic: testTopic, + Offset: testOffset, + PollTimeoutMillis: testPollTimeoutMillis, + OffsetCommitCount: testOffsetCommitCount, + OffsetCommitDuration: testOffsetCommitDuration, + OffsetCommitDurationMinimum: testOffsetCommitDurationMin, + Username: testUsername, + Password: testPassword, + ChannelKey: testChannelKey, + ExponentialBackoff: false, + InitialRetryInterval: 1000, + MaxRetryTime: 10000, + } + dispatcher := NewDispatcher(dispatcherConfig) + + return dispatcher, server, mux +} + +func teardown(server *httptest.Server) { + server.Close() +} + +func TestHttpClient_calculateNumberOfRetries(t *testing.T) { + + type fields struct { + uri string + exponentialBackoff bool + initialRetryInterval int64 + maxNumberRetries int + maxRetryTime int64 + logger *zap.Logger + } + tests := []struct { + fields fields + want int + }{ + {fields{maxRetryTime: 10000, initialRetryInterval: 1000, exponentialBackoff: true}, 3}, + {fields{maxRetryTime: 10000, initialRetryInterval: 5000, exponentialBackoff: true}, 1}, + {fields{maxRetryTime: 17000, initialRetryInterval: 1000, exponentialBackoff: true}, 4}, + {fields{maxRetryTime: 60000, initialRetryInterval: 5000, exponentialBackoff: true}, 4}, + {fields{maxRetryTime: 300000, initialRetryInterval: 500, exponentialBackoff: true}, 9}, + {fields{maxRetryTime: 300000, initialRetryInterval: 500, exponentialBackoff: false}, 600}, + {fields{maxRetryTime: 10000, initialRetryInterval: 5000, exponentialBackoff: false}, 2}, 
+ } + + // Create A Test Logger + logger := logtesting.TestLogger(t).Desugar() + + // Loop Over All The Tests + for _, test := range tests { + t.Run(fmt.Sprintf("%d max retry, initial interval %d, exponential backoff %t", test.fields.maxRetryTime, test.fields.initialRetryInterval, test.fields.exponentialBackoff), func(t *testing.T) { + + // Create A New Dispatcher + dispatcherConfig := DispatcherConfig{ + Logger: logger, + Brokers: testBrokers, + Topic: testTopic, + Offset: testOffset, + PollTimeoutMillis: testPollTimeoutMillis, + OffsetCommitCount: testOffsetCommitCount, + OffsetCommitDuration: testOffsetCommitDuration, + OffsetCommitDurationMinimum: testOffsetCommitDurationMin, + Username: testUsername, + Password: testPassword, + ChannelKey: testChannelKey, + ExponentialBackoff: test.fields.exponentialBackoff, + InitialRetryInterval: test.fields.initialRetryInterval, + MaxRetryTime: test.fields.maxRetryTime, + } + testDispatcher := NewDispatcher(dispatcherConfig) + + // Perform The Test + if got := testDispatcher.calculateNumberOfRetries(); got != test.want { + t.Errorf("httpClient.calculateNumberOfRetries() = %v, want %v", got, test.want) + } + }) + } +} + +func TestLogResponse(t *testing.T) { + + // Test Data + noStatusCodeError := errors.New("No response code detected in error, retrying") + badResponseError := errors.New("Server returned a bad response code") + + // Define TestCase Type + type testCase struct { + errIn error + errOut error + } + + // Create The Set Of TestCases + tests := []testCase{ + + {errIn: nil, errOut: nil}, + + {errIn: errors.New(""), errOut: noStatusCodeError}, + {errIn: errors.New("no status code"), errOut: noStatusCodeError}, + + {errIn: errors.New("100"), errOut: nil}, + {errIn: errors.New("200"), errOut: nil}, + + {errIn: errors.New("300"), errOut: nil}, + {errIn: errors.New("301"), errOut: nil}, + {errIn: errors.New("399"), errOut: nil}, + + {errIn: errors.New("400"), errOut: badResponseError}, + {errIn: errors.New("401"), errOut: nil}, + {errIn: errors.New("499"), errOut: nil}, + + {errIn: errors.New("404"), errOut: badResponseError}, + {errIn: errors.New("429"), errOut: badResponseError}, + + {errIn: errors.New("500"), errOut: badResponseError}, + {errIn: errors.New("503"), errOut: badResponseError}, + {errIn: errors.New("599"), errOut: badResponseError}, + } + + // Create A Test Logger + logger := logtesting.TestLogger(t).Desugar() + + // Create A New Dispatcher + dispatcherConfig := DispatcherConfig{ + Logger: logger, + Brokers: testBrokers, + Topic: testTopic, + Offset: testOffset, + PollTimeoutMillis: testPollTimeoutMillis, + OffsetCommitCount: testOffsetCommitCount, + OffsetCommitDuration: testOffsetCommitDuration, + OffsetCommitDurationMinimum: testOffsetCommitDurationMin, + Username: testUsername, + Password: testPassword, + ChannelKey: testChannelKey, + ExponentialBackoff: testExponentialBackoff, + InitialRetryInterval: testInitialRetryInterval, + MaxRetryTime: testMaxRetryTime, + } + testDispatcher := NewDispatcher(dispatcherConfig) + + // Loop Over The TestCases + for _, test := range tests { + + // Perform The Specific TestCase + actualErrOut := testDispatcher.logResponse(test.errIn) + + // Verify Results + if test.errOut == nil { + assert.Nil(t, actualErrOut) + } else { + assert.NotNil(t, actualErrOut) + assert.Equal(t, test.errOut.Error(), actualErrOut.Error()) + } + } +} + +func TestParseHttpStatusCodeFromError(t *testing.T) { + + // Define TestCase Type + type testCase struct { + error error + code int + } + + // Create The Set Of 
TestCases + tests := []testCase{ + {error: nil, code: -1}, + {error: errors.New(""), code: -1}, + {error: errors.New("no status codes to see here"), code: -1}, + {error: errors.New("status code without leading200 word boundary"), code: -1}, + {error: errors.New("status code without both200word boundary"), code: -1}, + {error: errors.New("status code without 200trailing word boundary"), code: -1}, + {error: errors.New("200 status code at start"), code: 200}, + {error: errors.New("status code 200 in middle"), code: 200}, + {error: errors.New("status code at end 200"), code: 200}, + {error: errors.New("multiple 200 status codes 300 selects first"), code: 200}, + {error: errors.New("unable to complete request to http://sample-event-proxy-300-stage.svc.cluster.local/: unexpected HTTP response, expected 2xx, got 500"), code: 500}, + } + + // Create A Test Logger + logger := logtesting.TestLogger(t).Desugar() + + // Create A New Dispatcher + dispatcherConfig := DispatcherConfig{ + Logger: logger, + Brokers: testBrokers, + Topic: testTopic, + Offset: testOffset, + PollTimeoutMillis: testPollTimeoutMillis, + OffsetCommitCount: testOffsetCommitCount, + OffsetCommitDuration: testOffsetCommitDuration, + OffsetCommitDurationMinimum: testOffsetCommitDurationMin, + Username: testUsername, + Password: testPassword, + ChannelKey: testChannelKey, + ExponentialBackoff: testExponentialBackoff, + InitialRetryInterval: testInitialRetryInterval, + MaxRetryTime: testMaxRetryTime, + } + testDispatcher := NewDispatcher(dispatcherConfig) + + // Loop Over The TestCases + for _, test := range tests { + + // Perform The Specific TestCase + actualStatusCode := testDispatcher.parseHttpStatusCodeFromError(test.error) + + // Verify Results + assert.Equal(t, test.code, actualStatusCode) + } +}
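
For reference — and not part of the patch itself — a standalone sketch of the retry-count math introduced in retry.go above. The helper name approxRetries is local to this sketch; the sample values mirror cases exercised in TestHttpClient_calculateNumberOfRetries.

package main

import (
	"fmt"
	"math"
)

// approxRetries mirrors Dispatcher.calculateNumberOfRetries(): with exponential
// backoff the waits are initial, 2*initial, 4*initial, ... so roughly
// log2(maxRetryTime/initialRetryInterval) attempts fit within maxRetryTime;
// with constant waits it is simply maxRetryTime/initialRetryInterval.
func approxRetries(exponentialBackoff bool, initialRetryIntervalMillis, maxRetryTimeMillis int64) int {
	if exponentialBackoff {
		return int(math.Round(math.Log2(float64(maxRetryTimeMillis) / float64(initialRetryIntervalMillis))))
	}
	return int(maxRetryTimeMillis / initialRetryIntervalMillis)
}

func main() {
	fmt.Println(approxRetries(true, 1000, 10000))  // 3   (waits of ~1s + 2s + 4s ≈ 7s)
	fmt.Println(approxRetries(false, 500, 300000)) // 600 (constant 500ms waits)
}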