This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Kafka sink #1406

Closed
wants to merge 4 commits into from
151 changes: 151 additions & 0 deletions kafka/sink/README.md
@@ -0,0 +1,151 @@
# Kafka Sink: Knative Sink for Apache Kafka

The Apache Kafka Event sink enables Knative Eventing integration with Apache
Kafka. When a CloudEvent is received over HTTP by the Knative Sink, it is
sent to the configured topic in the Apache Kafka broker.

## Deploying Kafka Sink CRDs and controller

TODO: coming soon

## Deploying Kafka Sink to Kubernetes
Contributor

I wonder, can we show an example with a sinkbinding?

Member Author

Do you mean an example that shows how to use kafkasink instead of event-display? https://knative.dev/docs/eventing/samples/sinkbinding/


Edit the deployment YAML in samples/ to point to your Kafka broker and topic.

Then deploy it:

```
ko apply -f samples/
```

Check that it is running - expected output:

```
k get pods
NAME READY STATUS RESTARTS AGE
kafka-sink-694fd8f689-smwq4 1/1 Running 0 8s

k logs kafka-sink-694fd8f689-smwq4
2020/07/22 22:13:03 Using HTTP PORT=8080
2020/07/22 22:13:03 Sinking to KAFKA_SERVER=my-cluster-kafka-bootstrap.kafka:9092 KAFKA_TOPIC=knative-sink-topic
2020/07/22 22:13:03 Ready to receive
```

To test, follow the same approach as for sending events to the broker: https://knative.dev/docs/eventing/getting-started/#sending-events-to-the-broker

Create a CLI pod:

```
kubectl apply --filename - << END
apiVersion: v1
kind: Pod
metadata:
  labels:
    run: curl
  name: curl
spec:
  containers:
  # This could be any image that we can attach into and has curl.
  - image: radial/busyboxplus:curl
    imagePullPolicy: IfNotPresent
    name: curl
    resources: {}
    stdin: true
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    tty: true
END
```

Get into it:

```
kubectl attach curl -it
Defaulting container name to curl.
Use 'kubectl describe pod/curl -n default' to see all of the containers in this pod.
If you don't see a command prompt, try pressing enter.
[ root@curl:/ ]$
```

Send an event (modify the URL to replace default with your current namespace):

```
curl -v "http://kafka-sink.default.svc.cluster.local" \
-X POST \
-H "Ce-Id: kafka-sink-event-1" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: greeting" \
-H "Ce-Source: not-sendoff" \
-H "Content-Type: application/json" \
-d '{"msg":"Hello Knative Kafka Sink!"}'
```
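
The same request can also be sent from Go with the CloudEvents SDK that this sink already vendors. This is a minimal sketch, not part of this PR; the target URL matches the curl command above and assumes the sink runs in the default namespace:

```
package main

import (
	"context"
	"log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
	// HTTP CloudEvents client; errors are fatal in this sketch.
	c, err := cloudevents.NewClientHTTP()
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	// Build the same event the curl command sends.
	e := cloudevents.NewEvent()
	e.SetID("kafka-sink-event-1")
	e.SetType("greeting")
	e.SetSource("not-sendoff")
	if err := e.SetData(cloudevents.ApplicationJSON, map[string]string{"msg": "Hello Knative Kafka Sink!"}); err != nil {
		log.Fatalf("failed to set data: %v", err)
	}

	// Target the in-cluster Service of the sink (assumed namespace: default).
	ctx := cloudevents.ContextWithTarget(context.Background(), "http://kafka-sink.default.svc.cluster.local")
	if result := c.Send(ctx, e); cloudevents.IsUndelivered(result) {
		log.Fatalf("failed to send: %v", result)
	}
}
```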

Check Kafka sink logs:

```
k logs -f kafka-sink-694fd8f689-smwq4
{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:110","msg":"Successfully created the logger."}
{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:111","msg":"Logging level set to info"}
{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:78","msg":"Fetch GitHub commit ID from kodata failed","error":"\"KO_DATA_PATH\" does not exist or is empty"}
{"level":"error","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/main.go:71","msg":"failed to process metrics options{error 25 0 json options string is empty}","stacktrace":"knative.dev/eventing/pkg/adapter/v2.MainWithContext\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:71\nknative.dev/eventing/pkg/adapter/v2.Main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:46\nmain.main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/kafka/sink/cmd/receive_adapter/main.go:26\nruntime.main\n\t/usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203"}
{"level":"warn","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/config.go:163","msg":"Tracing configuration is invalid, using the no-op default{error 25 0 empty json tracing config}"}
{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:84","msg":"Using HTTP PORT=%d8080"}
{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:95","msg":"Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%slocalhost:9092test-topic"}
{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:131","msg":"Server is ready to handle requests."}
{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"}
{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:109","msg":"Received message"}
{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:116","msg":"Sending message to Kafka"}
{"level":"info","ts":"2020-07-23T16:25:16.291-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"}
```

And verify that the event was received by Kafka, for example:

```
kubectl -n kafka run kafka-consumer -ti --image=strimzi/kafka:0.17.0-kafka-2.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic knative-sink-topic --from-beginning
If you don't see a command prompt, try pressing enter.
{"msg":"Hello Knative Kafka Sink!"}
```


## Quick testing without Kubernetes

Run the Kafka Sink receive adapter from the command line:

```
export KAFKA_SERVER=localhost:9092
export KAFKA_TOPIC=test-topic
export NAMESPACE=test
export K_LOGGING_CONFIG=""
export K_METRICS_CONFIG=""
go run cmd/receive_adapter/main.go
```
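
These variables are read through the envconfig tags on the adapter's envConfig struct in kafka/sink/pkg/adapter/adapter.go (shown later in this PR). The following standalone sketch illustrates that mechanism, assuming the github.com/kelseyhightower/envconfig package used by the Knative adapter machinery:

```
package main

import (
	"log"

	"github.com/kelseyhightower/envconfig"
)

// Standalone mirror of the env fields the sink adapter declares, shown
// here only to illustrate how the exports above are picked up.
type envConfig struct {
	Port        string `envconfig:"PORT" default:"8080"`
	KafkaServer string `envconfig:"KAFKA_SERVER" required:"true"`
	Topic       string `envconfig:"KAFKA_TOPIC" required:"true"`
}

func main() {
	var env envConfig
	// Fails if KAFKA_SERVER or KAFKA_TOPIC is not set, matching the required tags.
	if err := envconfig.Process("", &env); err != nil {
		log.Fatal(err)
	}
	log.Printf("PORT=%s KAFKA_SERVER=%s KAFKA_TOPIC=%s", env.Port, env.KafkaServer, env.Topic)
}
```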

And send an event to the Kafka Sink:

```
curl -v "http://localhost:8080" \
-X POST \
-H "Ce-Id: test-kafka-sink" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: greeting" \
-H "Ce-Source: not-sendoff" \
-H "Content-Type: application/json" \
-d '{"msg":"Hello Kafka Sink!"}'
```

Expected output:

```
{"level":"info","ts":"2020-07-23T17:22:40.126-0400","caller":"logging/config.go:110","msg":"Successfully created the logger."}
{"level":"info","ts":"2020-07-23T17:22:40.126-0400","caller":"logging/config.go:111","msg":"Logging level set to info"}
{"level":"info","ts":"2020-07-23T17:22:40.126-0400","caller":"logging/config.go:78","msg":"Fetch GitHub commit ID from kodata failed","error":"\"KO_DATA_PATH\" does not exist or is empty"}
{"level":"error","ts":"2020-07-23T17:22:40.126-0400","logger":"kafkasink","caller":"v2/main.go:71","msg":"failed to process metrics options{error 25 0 json options string is empty}","stacktrace":"knative.dev/eventing/pkg/adapter/v2.MainWithContext\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:71\nknative.dev/eventing/pkg/adapter/v2.Main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:46\nmain.main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/kafka/sink/cmd/receive_adapter/main.go:26\nruntime.main\n\t/usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203"}
{"level":"warn","ts":"2020-07-23T17:22:40.127-0400","logger":"kafkasink","caller":"v2/config.go:163","msg":"Tracing configuration is invalid, using the no-op default{error 25 0 empty json tracing config}"}
{"level":"info","ts":"2020-07-23T17:22:40.127-0400","logger":"kafkasink","caller":"adapter/adapter.go:84","msg":"Using HTTP PORT=%d8080"}
{"level":"info","ts":"2020-07-23T17:22:40.127-0400","logger":"kafkasink","caller":"adapter/adapter.go:95","msg":"Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%slocalhost:9092test-topic"}
{"level":"info","ts":"2020-07-23T17:22:40.132-0400","logger":"kafkasink","caller":"adapter/adapter.go:131","msg":"Server is ready to handle requests."}
{"level":"info","ts":"2020-07-23T17:22:40.132-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"}
{"level":"info","ts":"2020-07-23T17:22:46.093-0400","logger":"kafkasink","caller":"adapter/adapter.go:109","msg":"Received message"}
{"level":"info","ts":"2020-07-23T17:22:46.093-0400","logger":"kafkasink","caller":"adapter/adapter.go:116","msg":"Sending message to Kafka"}
{"level":"info","ts":"2020-07-23T17:22:46.100-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"}
```
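
To check locally that the event actually landed in Kafka (without the in-cluster console consumer used earlier), a small Sarama consumer can be run against the same broker and topic. This is a sketch and assumes a single-partition test-topic on localhost:9092:

```
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalf("failed to create consumer: %v", err)
	}
	defer consumer.Close()

	// Read partition 0 of the test topic from the beginning.
	pc, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("failed to consume partition: %v", err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// The CloudEvents kafka_sarama sender writes binary-mode records, so
		// the CloudEvents attributes arrive as "ce_*" record headers.
		for _, h := range msg.Headers {
			fmt.Printf("%s=%s ", h.Key, h.Value)
		}
		fmt.Printf("value=%s\n", msg.Value)
	}
}
```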
27 changes: 27 additions & 0 deletions kafka/sink/cmd/receive_adapter/main.go
@@ -0,0 +1,27 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"knative.dev/eventing/pkg/adapter/v2"

	kafkasinkadapter "knative.dev/eventing-contrib/kafka/sink/pkg/adapter"
)

func main() {
	adapter.Main("kafkasink", kafkasinkadapter.NewEnvConfig, kafkasinkadapter.NewAdapter)
Contributor

Does it work on kube? adapter.Main requires a service account that can watch config maps and such, and from the yamls below it doesn't seem like you're using any

Member Author

It worked in k8s v1.18 - see example output in README.md

}
134 changes: 134 additions & 0 deletions kafka/sink/pkg/adapter/adapter.go
@@ -0,0 +1,134 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package adapter

import (
	"context"
	"fmt"
	"io"
	"strconv"

	"github.com/Shopify/sarama"
	cloudeventskafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
	"go.uber.org/zap"
	"knative.dev/eventing/pkg/adapter/v2"
	"knative.dev/pkg/logging"
)

type envConfig struct {
	adapter.EnvConfig

	// Port on which to listen for cloudevents
	Port string `envconfig:"PORT" default:"8080"`

	// KafkaServer URL to connect to the Kafka server.
	KafkaServer string `envconfig:"KAFKA_SERVER" required:"true"`

	// Topic to publish cloudevents to.
	Topic string `envconfig:"KAFKA_TOPIC" required:"true"`
}

// NewEnvConfig function reads env variables defined in envConfig structure and
// returns accessor interface
func NewEnvConfig() adapter.EnvConfigAccessor {
	return &envConfig{}
}

// kafkaSinkAdapter acts as Sink that sends incoming CloudEvents to Kafka topic
type kafkaSinkAdapter struct {
	logger *zap.SugaredLogger
	client cloudevents.Client
	//source string

	port        string
	kafkaServer string
	topic       string
}

// NewAdapter returns the instance of kafkaSinkAdapter that implements adapter.Adapter interface
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
Contributor

Why do you create a source-like adapter if you don't use the provided client?

Member Author

That is to structure it as a receive adapter for a future PR that adds controller support and a KafkaSink CR

	logger := logging.FromContext(ctx)
	env := processed.(*envConfig)

	return &kafkaSinkAdapter{
		logger:      logger,
		client:      ceClient,
		port:        env.Port,
		kafkaServer: env.KafkaServer,
		topic:       env.Topic,
	}
}

func (a *kafkaSinkAdapter) Start(ctx context.Context) error {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	a.logger.Infof("Using HTTP PORT=%s", a.port)
	port, err := strconv.Atoi(a.port)
	if err != nil {
		return fmt.Errorf("failed to parse HTTP port %s: %v", a.port, err)
	}

	httpProtocol, err := cloudeventshttp.New(cloudeventshttp.WithPort(port))
	if err != nil {
		return fmt.Errorf("failed to create HTTP protocol: %v", err)
	}

	a.logger.Infof("Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%s", a.kafkaServer, a.topic)
	kafkaProtocol, err := cloudeventskafka.NewSender([]string{a.kafkaServer}, saramaConfig, a.topic)
	if err != nil {
		return fmt.Errorf("failed to create Kafka protocol: %v", err)
	}

	defer kafkaProtocol.Close(ctx)

	// Pipe all messages incoming from httpProtocol to kafkaProtocol
	go func() {
		for {
			a.logger.Info("Ready to receive")
			// Blocking call to wait for new messages from httpProtocol
			message, err := httpProtocol.Receive(ctx)
			if err != nil {
				if err == io.EOF {
					return // Context closed and/or receiver closed
				}
				a.logger.Infof("Error while receiving a message: %s", err.Error())
				continue
			}
			a.logger.Info("Received message")
			a.logger.Info("Sending message to Kafka")
			err = kafkaProtocol.Send(ctx, message)
			if err != nil {
				a.logger.Infof("Error while forwarding the message: %s", err.Error())
			}
		}
	}()

	// Start the HTTP Server invoking OpenInbound()
	go func() {
		if err := httpProtocol.OpenInbound(ctx); err != nil {
			a.logger.Infof("failed to StartHTTPReceiver, %v", err)
		}
	}()

	a.logger.Infof("Server is ready to handle requests.")

	<-ctx.Done()
	a.logger.Infof("Server stopped")
	return nil
}