
Cloudevent v2 #126

Merged
6 changes: 4 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

6 changes: 0 additions & 6 deletions build/controller/Dockerfile
@@ -27,10 +27,6 @@ else \
make build-native-controller; \
fi"

# Get The Microsoft Cert For SSL
WORKDIR /tmp-certs
RUN curl -sSL -f -k http://www.microsoft.com/pki/mscorp/Microsoft%20IT%20TLS%20CA%202.crt -o /tmp-certs/microsoft.crt

# Determine Dependencies of knative-kafka-controller that will be needed for the final image and package them into one dir
WORKDIR /controller-deps
RUN ldd /go/src/github.com/kyma-incubator/knative-kafka/build/controller/kafka-channel-controller \
@@ -59,8 +55,6 @@ COPY --from=builder /go/src/github.com/kyma-incubator/knative-kafka/build/contro

# Copy over the dependencies
COPY --from=builder /controller-deps/ /
COPY --from=builder /tmp-certs/microsoft.crt /etc/ssl/certs/microsoft.crt

# Provides The SSL Cert For The Base Image To Properly Add It To The Cert Store
ENV SSL_CERT_FILE /etc/ssl/certs/microsoft.crt
CMD [ "/kafka-channel-controller" ]
54 changes: 28 additions & 26 deletions cmd/channel/main.go
@@ -3,7 +3,7 @@ package main
import (
"context"
"flag"
"github.com/cloudevents/sdk-go/v1/cloudevents"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/kyma-incubator/knative-kafka/pkg/channel/channel"
"github.com/kyma-incubator/knative-kafka/pkg/channel/constants"
"github.com/kyma-incubator/knative-kafka/pkg/channel/env"
@@ -13,9 +13,9 @@ import (
kafkautil "github.com/kyma-incubator/knative-kafka/pkg/common/kafka/util"
"github.com/kyma-incubator/knative-kafka/pkg/common/prometheus"
"go.uber.org/zap"
eventingChannel "knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
eventingchannel "knative.dev/eventing/pkg/channel"
"knative.dev/pkg/logging"
nethttp "net/http"
)

// Variables
@@ -67,31 +67,19 @@ func main() {
}
defer kafkaProducer.Close()

// The EventReceiver is responsible for processing the context (headers and binary/json content) of each request,
// and then passing the context, channel details, and the constructed CloudEvent event to our handleEvent() function.
eventReceiver, err := eventingChannel.NewEventReceiver(handleEvent, logger)
// Create A New Knative Eventing MessageReceiver (Parses The Channel From The Host Header)
messageReceiver, err := eventingchannel.NewMessageReceiver(handleMessage, logger, eventingchannel.ResolveMessageChannelFromHostHeader(eventingchannel.ParseChannel))
if err != nil {
logger.Fatal("Failed To Create Knative EventReceiver", zap.Error(err))
}

// The Knative CloudEvent Client handles the mux http server setup (middlewares and transport options) and invokes
// the eventReceiver. Although the NewEventReceiver method above will also invoke kncloudevents.NewDefaultClient
// internally, that client goes unused when using the ServeHTTP on the eventReceiver.
//
// IMPORTANT: Because the kncloudevents package does not allow injecting modified configuration,
// we can't override the default port being used (8080).
knCloudEventClient, err := kncloudevents.NewDefaultClient()
if err != nil {
logger.Fatal("Failed To Create Knative CloudEvent Client", zap.Error(err))
logger.Fatal("Failed To Create MessageReceiver", zap.Error(err))
}

// Set The Liveness Flag - Readiness Is Set By Individual Components
healthServer.SetAlive(true)

// Start Receiving Events (Blocking Call :)
err = knCloudEventClient.StartReceiver(ctx, eventReceiver.ServeHTTP)
// Start The Message Receiver (Blocking)
err = messageReceiver.Start(ctx)
if err != nil {
logger.Error("Failed To Start Event Receiver", zap.Error(err))
logger.Error("Failed To Start MessageReceiver", zap.Error(err))
}

// Reset The Liveness and Readiness Flags In Preparation For Shutdown
@@ -104,19 +92,33 @@ func main() {
healthServer.Stop(logger)
}

// Handler For Receiving Cloud Events And Sending The Event To Kafka
func handleEvent(_ context.Context, channelReference eventingChannel.ChannelReference, cloudEvent cloudevents.Event) error {
// CloudEvent Message Handler - Converts To KafkaMessage And Produces To Channel's Kafka Topic
func handleMessage(ctx context.Context, channelReference eventingchannel.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error {

// Note - The context provided here is a different context from the one created in main() and does not have our logger instance.

logger.Debug("~~~~~~~~~~~~~~~~~~~~ Processing Request ~~~~~~~~~~~~~~~~~~~~")
logger.Debug("Received Cloud Event", zap.Any("CloudEvent", cloudEvent), zap.Any("ChannelReference", channelReference))
logger.Debug("Received Message", zap.Any("Message", message), zap.Any("ChannelReference", channelReference))

//
// Convert The CloudEvents Binding Message To A CloudEvent
//
// TODO - It is potentially inefficient to take the CloudEvents binding/Message and convert it into a CloudEvent,
// only for it to then be transformed further into a Confluent KafkaMessage. The current implementation is
// based on CloudEvent Events, however, and until a "protocol" implementation for Confluent Kafka exists,
// this is the simplest path forward. Once such a protocol implementation exists, it would be more efficient
// to convert directly from the binding/Message to the protocol/Message.
//
cloudEvent, err := binding.ToEvent(ctx, message, transformers...)
if err != nil {
logger.Error("Failed To Convert Message To CloudEvent", zap.Error(err))
return err
}

// Trim The "-kafkachannel" Suffix From The Service Name (Added Only To Workaround Kyma Naming Conflict)
channelReference.Name = kafkautil.TrimKafkaChannelServiceNameSuffix(channelReference.Name)

// Validate The KafkaChannel Prior To Producing Kafka Message
err := channel.ValidateKafkaChannel(channelReference)
err = channel.ValidateKafkaChannel(channelReference)
if err != nil {
logger.Warn("Unable To Validate ChannelReference", zap.Any("ChannelReference", channelReference), zap.Error(err))
return err
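To see the shape of the migration in one place, here is the new receive path from this file condensed into a minimal, self-contained sketch. It uses only the APIs visible in the diff above; the health server, metrics, and the actual Kafka produce step are omitted:

```go
package main

import (
	"context"
	nethttp "net/http"

	"github.com/cloudevents/sdk-go/v2/binding"
	"go.uber.org/zap"
	eventingchannel "knative.dev/eventing/pkg/channel"
)

func runReceiver(ctx context.Context, logger *zap.Logger) error {

	// The handler now receives a transport-agnostic binding.Message plus the
	// ChannelReference already resolved from the request's Host header.
	handler := func(ctx context.Context, ref eventingchannel.ChannelReference, message binding.Message, transformers []binding.Transformer, _ nethttp.Header) error {
		cloudEvent, err := binding.ToEvent(ctx, message, transformers...)
		if err != nil {
			return err
		}
		logger.Debug("Received CloudEvent", zap.String("id", cloudEvent.ID()), zap.Any("ChannelReference", ref))
		return nil // the real handler converts to a KafkaMessage and produces it here
	}

	receiver, err := eventingchannel.NewMessageReceiver(handler, logger,
		eventingchannel.ResolveMessageChannelFromHostHeader(eventingchannel.ParseChannel))
	if err != nil {
		return err
	}

	// Blocks until ctx is cancelled; replaces v1's kncloudevents.StartReceiver flow.
	return receiver.Start(ctx)
}

func main() {
	logger, _ := zap.NewDevelopment()
	if err := runReceiver(context.Background(), logger); err != nil {
		logger.Fatal("Receiver exited", zap.Error(err))
	}
}
```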
8 changes: 3 additions & 5 deletions cmd/dispatcher/main.go
@@ -5,7 +5,6 @@ import (
"flag"
commonk8s "github.com/kyma-incubator/knative-kafka/pkg/common/k8s"
"github.com/kyma-incubator/knative-kafka/pkg/common/prometheus"
"github.com/kyma-incubator/knative-kafka/pkg/dispatcher/client"
"github.com/kyma-incubator/knative-kafka/pkg/dispatcher/controller"
dispatch "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/dispatcher"
dispatcherhealth "github.com/kyma-incubator/knative-kafka/pkg/dispatcher/health"
@@ -109,9 +108,6 @@ func main() {
metricsServer := prometheus.NewMetricsServer(logger, metricsPort, "/metrics")
metricsServer.Start()

// Create HTTP Client With Retry Settings
ceClient := client.NewRetriableCloudEventClient(logger, exponentialBackoff, initialRetryInterval, maxRetryTime)

// Create The Dispatcher With Specified Configuration
dispatcherConfig := dispatch.DispatcherConfig{
Logger: logger,
@@ -124,9 +120,11 @@
OffsetCommitDurationMinimum: MinimumKafkaConsumerOffsetCommitDurationMillis * time.Millisecond,
Username: kafkaUsername,
Password: kafkaPassword,
Client: ceClient,
ChannelKey: channelKey,
Metrics: metricsServer,
ExponentialBackoff: exponentialBackoff,
InitialRetryInterval: initialRetryInterval,
MaxRetryTime: maxRetryTime,
}
dispatcher = dispatch.NewDispatcher(dispatcherConfig)

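With the RetriableCloudEventClient gone, the retry policy now rides along on DispatcherConfig as plain data (ExponentialBackoff, InitialRetryInterval, MaxRetryTime) for the dispatcher to apply internally. The sketch below is a hypothetical illustration of such a policy, not the dispatcher's actual retry code:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff is a hypothetical sketch of a retry loop driven by the three
// fields added to DispatcherConfig above; the real dispatcher logic may differ.
func retryWithBackoff(exponential bool, initialInterval, maxRetryTime time.Duration, send func() error) error {
	interval := initialInterval
	deadline := time.Now().Add(maxRetryTime)
	for {
		err := send()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return err // total retry budget exhausted; surface the last error
		}
		time.Sleep(interval)
		if exponential {
			interval *= 2 // double the delay between attempts
		}
	}
}

func main() {
	attempts := 0
	err := retryWithBackoff(true, 100*time.Millisecond, 2*time.Second, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```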
18 changes: 6 additions & 12 deletions pkg/channel/message/message.go
@@ -1,8 +1,9 @@
package message

import (
"github.com/cloudevents/sdk-go/v1/cloudevents"
"github.com/cloudevents/sdk-go/v1/cloudevents/types"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/types"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/kyma-incubator/knative-kafka/pkg/channel/constants"
"go.uber.org/zap"
@@ -11,14 +12,7 @@
)

// Create A Kafka Message From The Specified CloudEvent / Topic
func CreateKafkaMessage(logger *zap.Logger, event cloudevents.Event, kafkaTopic string) (*kafka.Message, error) {

// Get The Event's Data Bytes
eventBytes, err := event.DataBytes()
if err != nil {
logger.Error("Failed To Get CloudEvent's DataBytes", zap.Error(err))
return nil, err
}
func CreateKafkaMessage(logger *zap.Logger, event *event.Event, kafkaTopic string) (*kafka.Message, error) {

// Get Kafka Message Headers From The Specified CloudEvent Context
kafkaHeaders := getKafkaHeaders(logger, event.Context)
@@ -33,7 +27,7 @@ func CreateKafkaMessage(logger *zap.Logger, event cloudevents.Event, kafkaTopic
Partition: kafka.PartitionAny, // Required For Producer Level Partitioner! (see KafkaProducerConfigPropertyPartitioner)
},
Key: partitionKey,
Value: eventBytes,
Value: event.Data(),
Headers: kafkaHeaders,
}

@@ -82,7 +76,7 @@ func getKafkaHeaders(logger *zap.Logger, context cloudevents.EventContext) []kaf
}

// Precedence For Partitioning Is The CloudEvent PartitionKey Extension Followed By The CloudEvent Subject
func getPartitionKey(event cloudevents.Event) []byte {
func getPartitionKey(event *event.Event) []byte {

// Use The CloudEvent Extensions PartitionKey If It Exists
pkExtension, err := types.ToString(event.Extensions()[constants.ExtensionKeyPartitionKey])
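Two changes meet in this hunk: sdk-go v2's event.Data() returns the already-encoded payload directly (v1's event.DataBytes() could fail during encoding, hence the removed error path), and those bytes drop straight into a confluent-kafka-go message. A runnable sketch of that message assembly, with illustrative values and a hypothetical header key:

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	topic := "example-topic" // illustrative; the real code derives the topic from the channel

	message := &kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny, // defer to the producer-level partitioner
		},
		Key:   []byte("example-partition-key"), // from the partitionkey extension or the subject
		Value: []byte(`{"key":"value"}`),       // in the real code: event.Data(), already encoded
		Headers: []kafka.Header{
			{Key: "ce_specversion", Value: []byte("1.0")}, // hypothetical CloudEvent header key
		},
	}

	fmt.Printf("%+v\n", message)
}
```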
4 changes: 2 additions & 2 deletions pkg/channel/producer/producer.go
@@ -2,7 +2,7 @@ package producer

import (
"errors"
"github.com/cloudevents/sdk-go/v1/cloudevents"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/kyma-incubator/knative-kafka/pkg/channel/health"
"github.com/kyma-incubator/knative-kafka/pkg/channel/message"
@@ -62,7 +62,7 @@ var createProducerFunctionWrapper = func(brokers string, username string, passwo
}

// Produce A KafkaMessage From The Specified CloudEvent To The Specified Topic And Wait For The Delivery Report
func (p *Producer) ProduceKafkaMessage(event cloudevents.Event, channelReference eventingChannel.ChannelReference) error {
func (p *Producer) ProduceKafkaMessage(event *event.Event, channelReference eventingChannel.ChannelReference) error {

// Validate The Kafka Producer (Must Be Pre-Initialized)
if p.kafkaProducer == nil {
8 changes: 4 additions & 4 deletions pkg/channel/producer/producer_test.go
@@ -1,7 +1,7 @@
package producer

import (
"github.com/cloudevents/sdk-go/v1/cloudevents"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/kyma-incubator/knative-kafka/pkg/channel/constants"
channelhealth "github.com/kyma-incubator/knative-kafka/pkg/channel/health"
@@ -47,15 +47,15 @@ func TestNewProducer(t *testing.T) {
func TestProduceKafkaMessage(t *testing.T) {

// Test Data
event := test.CreateCloudEvent(cloudevents.CloudEventsVersionV1)
cloudEvent := test.CreateCloudEvent(cloudevents.VersionV1)
channelReference := test.CreateChannelReference(test.ChannelName, test.ChannelNamespace)
mockProducer := test.NewMockProducer(test.TopicName)

// Create A Test Producer
producer := createTestProducer(t, mockProducer)

// Perform The Test & Verify Results
err := producer.ProduceKafkaMessage(event, channelReference)
err := producer.ProduceKafkaMessage(cloudEvent, channelReference)
assert.Nil(t, err)

// Block On The MockProducer's Channel & Verify Event Was Produced
@@ -67,7 +67,7 @@ func TestProduceKafkaMessage(t *testing.T) {
assert.Equal(t, kafka.PartitionAny, kafkaMessage.TopicPartition.Partition)
assert.Equal(t, kafka.Offset(0), kafkaMessage.TopicPartition.Offset)
assert.Nil(t, kafkaMessage.TopicPartition.Error)
test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeySpecVersion, cloudevents.CloudEventsVersionV1)
test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeySpecVersion, cloudevents.VersionV1)
test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeyType, test.EventType)
test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeyId, test.EventId)
test.ValidateKafkaMessageHeader(t, kafkaMessage.Headers, constants.CeKafkaHeaderKeySource, test.EventSource)
2 changes: 1 addition & 1 deletion pkg/channel/test/constants.go
@@ -1,6 +1,6 @@
package test

import cloudevents "github.com/cloudevents/sdk-go/v1"
import cloudevents "github.com/cloudevents/sdk-go/v2"

// Test Data
const (
24 changes: 12 additions & 12 deletions pkg/channel/test/event.go
@@ -2,23 +2,23 @@ package test

import (
"encoding/json"
cloudevents "github.com/cloudevents/sdk-go/v1"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/kyma-incubator/knative-kafka/pkg/channel/constants"
)

// Test Data
var EventDataJson, _ = json.Marshal(map[string]string{EventDataKey: EventDataValue})

// Utility Function For Creating A Test CloudEvent
func CreateCloudEvent(cloudEventVersion string) cloudevents.Event {
event := cloudevents.NewEvent(cloudEventVersion)
event.SetID(EventId)
event.SetType(EventType)
event.SetSource(EventSource)
event.SetDataContentType(EventDataContentType)
event.SetSubject(EventSubject)
event.SetDataSchema(EventDataSchema)
event.SetExtension(constants.ExtensionKeyPartitionKey, PartitionKey)
event.SetData(EventDataJson)
return event
func CreateCloudEvent(cloudEventVersion string) *event.Event {
cloudEvent := event.New(cloudEventVersion)
cloudEvent.SetID(EventId)
cloudEvent.SetType(EventType)
cloudEvent.SetSource(EventSource)
cloudEvent.SetDataContentType(EventDataContentType)
cloudEvent.SetSubject(EventSubject)
cloudEvent.SetDataSchema(EventDataSchema)
cloudEvent.SetExtension(constants.ExtensionKeyPartitionKey, PartitionKey)
_ = cloudEvent.SetData(EventDataContentType, EventDataJson)
return &cloudEvent
}
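The v2 differences in this helper are small but typical of the whole migration: event.New returns a value (so the helper now returns &cloudEvent), and SetData takes an explicit content type with the payload and returns an error. A miniature equivalent, with illustrative stand-ins for the package's test constants:

```go
package main

import (
	"fmt"

	"github.com/cloudevents/sdk-go/v2/event"
)

func newTestEvent() (*event.Event, error) {
	cloudEvent := event.New(event.CloudEventsVersionV1)
	cloudEvent.SetID("test-id") // illustrative stand-ins for the test constants
	cloudEvent.SetType("test.type")
	cloudEvent.SetSource("test/source")

	// v2: content type and payload travel together, and the call can fail.
	if err := cloudEvent.SetData(event.ApplicationJSON, map[string]string{"key": "value"}); err != nil {
		return nil, err
	}
	return &cloudEvent, nil
}

func main() {
	cloudEvent, err := newTestEvent()
	if err != nil {
		panic(err)
	}
	fmt.Println(cloudEvent.ID(), string(cloudEvent.Data()))
}
```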