From 2d13f3277e39eda46db39b968480a6be34ee0410 Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Wed, 22 Jul 2020 18:23:47 -0400 Subject: [PATCH 1/4] initial Kafka Sink RA impl --- kafka/sink/README.md | 126 ++++++++++++++++++ kafka/sink/cmd/receive_adapter/main.go | 85 ++++++++++++ kafka/sink/samples/kafka-sink-deployment.yaml | 24 ++++ kafka/sink/samples/kafka-sink-service.yaml | 12 ++ 4 files changed, 247 insertions(+) create mode 100644 kafka/sink/README.md create mode 100644 kafka/sink/cmd/receive_adapter/main.go create mode 100644 kafka/sink/samples/kafka-sink-deployment.yaml create mode 100644 kafka/sink/samples/kafka-sink-service.yaml diff --git a/kafka/sink/README.md b/kafka/sink/README.md new file mode 100644 index 0000000000..9f3d77cadd --- /dev/null +++ b/kafka/sink/README.md @@ -0,0 +1,126 @@ +# Kafka Sink: Knative Sink for Apache Kafka + +The Apache Kafka Event sink enables Knative Eventing integration with Apache +Kafka. When a CloudEvent is received over HTTP by the Knative Sink it is +sent to the topic in Apache Kafka broker. + +## Deploying Kafka Sink CRDs and controller + +TODO: coming soon + +## Deploying Kafka Sink to Kubernetes + +Edit deployment YAML in samples to point to your Kafka broker and topic. + +Then deploy it + +``` +ko apply -f samples/ +``` + +Chekc that it is running - expected output: + +``` +k get pods +NAME READY STATUS RESTARTS AGE +kafka-sink-694fd8f689-smwq4 1/1 Running 0 8s + +k logs kafka-sink-694fd8f689-smwq4 +2020/07/22 22:13:03 Using HTTP PORT=8080 +2020/07/22 22:13:03 Sinking to KAFKA_SERVER=my-cluster-kafka-bootstrap.kafka:9092 KAFKA_TOPIC=knative-sink-topic +2020/07/22 22:13:03 Ready to receive +``` + +To test follow the same approach as sending events to broker: https://knative.dev/docs/eventing/getting-started/#sending-events-to-the-broker + +Create CLI pod: + +``` +kubectl apply --filename - << END +apiVersion: v1 +kind: Pod +metadata: + labels: + run: curl + name: curl +spec: + containers: + # This could be any image that we can attach into and has curl. + - image: radial/busyboxplus:curl + imagePullPolicy: IfNotPresent + name: curl + resources: {} + stdin: true + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + tty: true +END +``` + +Get into it: + +``` +kubectl attach curl -it +Defaulting container name to curl. +Use 'kubectl describe pod/curl -n default' to see all of the containers in this pod. +If you don't see a command prompt, try pressing enter. +[ root@curl:/ ]$ +``` + +Send (modify URL to replace default with current namespace) + +``` +curl -v "http://kafka-sink.default.svc.cluster.local" \ + -X POST \ + -H "Ce-Id: kafka-sink-event-1" \ + -H "Ce-Specversion: 1.0" \ + -H "Ce-Type: greeting" \ + -H "Ce-Source: not-sendoff" \ + -H "Content-Type: application/json" \ + -d '{"msg":"Hello Knative Kafka Sink!"}' +``` + +Check Kafka sink logs: + +``` +k logs -f kafka-sink-694fd8f689-smwq4 +2020/07/22 22:13:03 Using HTTP PORT=8080 +2020/07/22 22:13:03 Sinking to KAFKA_SERVER=my-cluster-kafka-bootstrap.kafka:9092 KAFKA_TOPIC=knative-sink-topic +2020/07/22 22:13:03 Ready to receive +2020/07/22 22:19:46 Received message +2020/07/22 22:19:46 Sending message to Kafka +2020/07/22 22:19:47 Ready to receive +``` + +And verify that event was received by Kafka, for example + +``` +kubectl -n kafka run kafka-consumer -ti --image=strimzi/kafka:0.17.0-kafka-2.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic knative-sink-topic --from-beginning +If you don't see a command prompt, try pressing enter. +{"msg":"Hello Knative Kafka Sink!"} +``` + + +## Quick testing without Kubernetes + +RUn Kafka Sink receive adapter from command line: + +``` +export KAFKA_SERVER=localhost:9092 +export KAFKA_TOPIC= test-topic +go run cmd/receive_adapter/main.go +``` + +Send cloud event to Kafka Sink: + +``` +curl -v "http://localhost:8080" \ + -X POST \ + -H "Ce-Id: test-kafka-sink" \ + -H "Ce-Specversion: 1.0" \ + -H "Ce-Type: greeting" \ + -H "Ce-Source: not-sendoff" \ + -H "Content-Type: application/json" \ + -d '{"msg":"Hello Kafka Sink!"}' +``` + diff --git a/kafka/sink/cmd/receive_adapter/main.go b/kafka/sink/cmd/receive_adapter/main.go new file mode 100644 index 0000000000..70dd34c537 --- /dev/null +++ b/kafka/sink/cmd/receive_adapter/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "io" + "log" + "os" + + "github.com/Shopify/sarama" + cloudeventskafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" + cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/kelseyhightower/envconfig" +) + +type envConfig struct { + // Port on which to listen for cloudevents + Port int `envconfig:"PORT" default:"8080"` + + // KafkaServer URL to connect to the Kafka server. + KafkaServer string `envconfig:"KAFKA_SERVER" required:"true"` + + // Subject is the nats subject to publish cloudevents on. + Topic string `envconfig:"KAFKA_TOPIC" required:"true"` +} + +const ( + count = 10 +) + +func main() { + saramaConfig := sarama.NewConfig() + saramaConfig.Version = sarama.V2_0_0_0 + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Printf("[ERROR] Failed to process envirnment variables: %s", err) + os.Exit(1) + } + + ctx := context.Background() + + log.Printf("Using HTTP PORT=%d", env.Port) + httpProtocol, err := cloudeventshttp.New(cloudeventshttp.WithPort(env.Port)) + if err != nil { + log.Fatalf("failed to create http protocol: %s", err.Error()) + } + + log.Printf("Sinking to KAFKA_SERVER=%s KAFKA_TOPIC=%s", env.KafkaServer, env.Topic) + kafkaProtocol, err := cloudeventskafka.NewSender([]string{env.KafkaServer}, saramaConfig, env.Topic) + if err != nil { + log.Fatalf("failed to create Kafka protcol, %s", err.Error()) + } + + defer kafkaProtocol.Close(ctx) + + // Pipe all messages incoming from httpProtocol to kafkaProtocol + go func() { + for { + log.Printf("Ready to receive") + // Blocking call to wait for new messages from httpProtocol + message, err := httpProtocol.Receive(ctx) + log.Printf("Received message") + if err != nil { + if err == io.EOF { + return // Context closed and/or receiver closed + } + log.Printf("Error while receiving a message: %s", err.Error()) + } + log.Printf("Sending message to Kafka") + err = kafkaProtocol.Send(ctx, message) + if err != nil { + log.Printf("Error while forwarding the message: %s", err.Error()) + } + } + }() + + // Start the HTTP Server invoking OpenInbound() + go func() { + if err := httpProtocol.OpenInbound(ctx); err != nil { + log.Printf("failed to StartHTTPReceiver, %v", err) + } + }() + + <-ctx.Done() +} diff --git a/kafka/sink/samples/kafka-sink-deployment.yaml b/kafka/sink/samples/kafka-sink-deployment.yaml new file mode 100644 index 0000000000..9e1e6e5530 --- /dev/null +++ b/kafka/sink/samples/kafka-sink-deployment.yaml @@ -0,0 +1,24 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka-sink +spec: + replicas: 1 + selector: + matchLabels: &labels + app: kafka-sink + template: + metadata: + labels: *labels + spec: + containers: + - name: kafka-sink + # Source code: https://github.com/knative/eventing-contrib/tree/master/kafka/sink/cms/receive_adapter + #image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/kafka/sink/cms/receive_adapter + #image: aslom/receive_adapter-e1b9f0da1df39ff0b2252942b7d14bee:latest + image: ko://knative.dev/eventing-contrib/kafka/sink/cmd/receive_adapter + env: + - name: KAFKA_SERVER + value: my-cluster-kafka-bootstrap.kafka:9092 + - name: KAFKA_TOPIC + value: knative-sink-topic diff --git a/kafka/sink/samples/kafka-sink-service.yaml b/kafka/sink/samples/kafka-sink-service.yaml new file mode 100644 index 0000000000..ad823a42eb --- /dev/null +++ b/kafka/sink/samples/kafka-sink-service.yaml @@ -0,0 +1,12 @@ +kind: Service +apiVersion: v1 +metadata: + name: kafka-sink +spec: + selector: + app: kafka-sink + ports: + - protocol: TCP + port: 80 + targetPort: 8080 + From aafa478b108797e699e578c014852ce0c587745e Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Thu, 23 Jul 2020 16:54:57 -0400 Subject: [PATCH 2/4] documented and tested running local and in k8s --- kafka/sink/README.md | 21 ++- kafka/sink/cmd/receive_adapter/main.go | 92 +++--------- kafka/sink/pkg/adapter/adapter.go | 136 +++++++++++++++++ kafka/sink/pkg/adapter/adapter_test.go | 139 ++++++++++++++++++ kafka/sink/samples/kafka-sink-deployment.yaml | 6 + 5 files changed, 318 insertions(+), 76 deletions(-) create mode 100644 kafka/sink/pkg/adapter/adapter.go create mode 100644 kafka/sink/pkg/adapter/adapter_test.go diff --git a/kafka/sink/README.md b/kafka/sink/README.md index 9f3d77cadd..bda471e402 100644 --- a/kafka/sink/README.md +++ b/kafka/sink/README.md @@ -108,10 +108,13 @@ RUn Kafka Sink receive adapter from command line: ``` export KAFKA_SERVER=localhost:9092 export KAFKA_TOPIC= test-topic +export NAMESPACE=test +export K_LOGGING_CONFIG="" +export K_METRICS_CONFIG="" go run cmd/receive_adapter/main.go ``` -Send cloud event to Kafka Sink: +And send an event to Kafka Sink: ``` curl -v "http://localhost:8080" \ @@ -124,3 +127,19 @@ curl -v "http://localhost:8080" \ -d '{"msg":"Hello Kafka Sink!"}' ``` +Expected output: + +``` +{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:110","msg":"Successfully created the logger."} +{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:111","msg":"Logging level set to info"} +{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:78","msg":"Fetch GitHub commit ID from kodata failed","error":"\"KO_DATA_PATH\" does not exist or is empty"} +{"level":"error","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/main.go:71","msg":"failed to process metrics options{error 25 0 json options string is empty}","stacktrace":"knative.dev/eventing/pkg/adapter/v2.MainWithContext\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:71\nknative.dev/eventing/pkg/adapter/v2.Main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:46\nmain.main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/kafka/sink/cmd/receive_adapter/main.go:26\nruntime.main\n\t/usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203"} +{"level":"warn","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/config.go:163","msg":"Tracing configuration is invalid, using the no-op default{error 25 0 empty json tracing config}"} +{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:84","msg":"Using HTTP PORT=%d8080"} +{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:95","msg":"Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%slocalhost:9092test-topic"} +{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:131","msg":"Server is ready to handle requests."} +{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} +{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:109","msg":"Received message"} +{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:116","msg":"Sending message to Kafka"} +{"level":"info","ts":"2020-07-23T16:25:16.291-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} +``` \ No newline at end of file diff --git a/kafka/sink/cmd/receive_adapter/main.go b/kafka/sink/cmd/receive_adapter/main.go index 70dd34c537..45b8bd66bb 100644 --- a/kafka/sink/cmd/receive_adapter/main.go +++ b/kafka/sink/cmd/receive_adapter/main.go @@ -1,85 +1,27 @@ -package main +/* +Copyright 2018 The Knative Authors -import ( - "context" - "io" - "log" - "os" +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - "github.com/Shopify/sarama" - cloudeventskafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" - cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" - "github.com/kelseyhightower/envconfig" -) + https://www.apache.org/licenses/LICENSE-2.0 -type envConfig struct { - // Port on which to listen for cloudevents - Port int `envconfig:"PORT" default:"8080"` +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ - // KafkaServer URL to connect to the Kafka server. - KafkaServer string `envconfig:"KAFKA_SERVER" required:"true"` +package main - // Subject is the nats subject to publish cloudevents on. - Topic string `envconfig:"KAFKA_TOPIC" required:"true"` -} +import ( + "knative.dev/eventing/pkg/adapter/v2" -const ( - count = 10 + kafkasinkadapter "knative.dev/eventing-contrib/kafka/sink/pkg/adapter" ) func main() { - saramaConfig := sarama.NewConfig() - saramaConfig.Version = sarama.V2_0_0_0 - - var env envConfig - if err := envconfig.Process("", &env); err != nil { - log.Printf("[ERROR] Failed to process envirnment variables: %s", err) - os.Exit(1) - } - - ctx := context.Background() - - log.Printf("Using HTTP PORT=%d", env.Port) - httpProtocol, err := cloudeventshttp.New(cloudeventshttp.WithPort(env.Port)) - if err != nil { - log.Fatalf("failed to create http protocol: %s", err.Error()) - } - - log.Printf("Sinking to KAFKA_SERVER=%s KAFKA_TOPIC=%s", env.KafkaServer, env.Topic) - kafkaProtocol, err := cloudeventskafka.NewSender([]string{env.KafkaServer}, saramaConfig, env.Topic) - if err != nil { - log.Fatalf("failed to create Kafka protcol, %s", err.Error()) - } - - defer kafkaProtocol.Close(ctx) - - // Pipe all messages incoming from httpProtocol to kafkaProtocol - go func() { - for { - log.Printf("Ready to receive") - // Blocking call to wait for new messages from httpProtocol - message, err := httpProtocol.Receive(ctx) - log.Printf("Received message") - if err != nil { - if err == io.EOF { - return // Context closed and/or receiver closed - } - log.Printf("Error while receiving a message: %s", err.Error()) - } - log.Printf("Sending message to Kafka") - err = kafkaProtocol.Send(ctx, message) - if err != nil { - log.Printf("Error while forwarding the message: %s", err.Error()) - } - } - }() - - // Start the HTTP Server invoking OpenInbound() - go func() { - if err := httpProtocol.OpenInbound(ctx); err != nil { - log.Printf("failed to StartHTTPReceiver, %v", err) - } - }() - - <-ctx.Done() + adapter.Main("kafkasink", kafkasinkadapter.NewEnvConfig, kafkasinkadapter.NewAdapter) } diff --git a/kafka/sink/pkg/adapter/adapter.go b/kafka/sink/pkg/adapter/adapter.go new file mode 100644 index 0000000000..6e22210664 --- /dev/null +++ b/kafka/sink/pkg/adapter/adapter.go @@ -0,0 +1,136 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package adapter + +import ( + "context" + "fmt" + "io" + "strconv" + + "github.com/Shopify/sarama" + cloudeventskafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" + cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "go.uber.org/zap" + "knative.dev/eventing/pkg/adapter/v2" + "knative.dev/pkg/logging" +) + +type envConfig struct { + adapter.EnvConfig + + // Port on which to listen for cloudevents + Port string `envconfig:"PORT" default:"8080"` + + // KafkaServer URL to connect to the Kafka server. + KafkaServer string `envconfig:"KAFKA_SERVER" required:"true"` + + // Topic to publish cloudevents to. + Topic string `envconfig:"KAFKA_TOPIC" required:"true"` +} + +// NewEnvConfig function reads env variables defined in envConfig structure and +// returns accessor interface +func NewEnvConfig() adapter.EnvConfigAccessor { + return &envConfig{} +} + +// kafkaSinkAdapter acts as Sink that sends incoming CloudEvents to Kafka topic +type kafkaSinkAdapter struct { + logger *zap.SugaredLogger + client cloudevents.Client + //source string + + port string + kafkaServer string + topic string +} + +// NewAdapter returns the instance of gitHubReceiveAdapter that implements adapter.Adapter interface +func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter { + logger := logging.FromContext(ctx) + env := processed.(*envConfig) + + return &kafkaSinkAdapter{ + logger: logger, + client: ceClient, + port: env.Port, + kafkaServer: env.KafkaServer, + topic: env.Topic, + } +} + +func (a *kafkaSinkAdapter) Start(ctx context.Context) error { + saramaConfig := sarama.NewConfig() + saramaConfig.Version = sarama.V2_0_0_0 + + //done := make(chan bool, 1) + + a.logger.Info("Using HTTP PORT=%d", a.port) + port, err := strconv.Atoi(a.port) + if err != nil { + return fmt.Errorf("failed to parse HTTP port %s: %v", a.port, err) + } + + httpProtocol, err := cloudeventshttp.New(cloudeventshttp.WithPort(port)) + if err != nil { + return fmt.Errorf("failed to create HTTP protocol: %v", err) + } + + a.logger.Info("Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%s", a.kafkaServer, a.topic) + kafkaProtocol, err := cloudeventskafka.NewSender([]string{a.kafkaServer}, saramaConfig, a.topic) + if err != nil { + return fmt.Errorf("failed to create Kafka protcol: %v", err) + } + + defer kafkaProtocol.Close(ctx) + + // Pipe all messages incoming from httpProtocol to kafkaProtocol + go func() { + for { + a.logger.Info("Ready to receive") + // Blocking call to wait for new messages from httpProtocol + message, err := httpProtocol.Receive(ctx) + a.logger.Info("Received message") + if err != nil { + if err == io.EOF { + return // Context closed and/or receiver closed + } + a.logger.Info("Error while receiving a message: %s", err.Error()) + } + a.logger.Info("Sending message to Kafka") + err = kafkaProtocol.Send(ctx, message) + if err != nil { + a.logger.Info("Error while forwarding the message: %s", err.Error()) + } + } + }() + + // Start the HTTP Server invoking OpenInbound() + go func() { + if err := httpProtocol.OpenInbound(ctx); err != nil { + a.logger.Info("failed to StartHTTPReceiver, %v", err) + } + }() + + a.logger.Infof("Server is ready to handle requests.") + + <-ctx.Done() + a.logger.Infof("Server stopped") + return nil +} diff --git a/kafka/sink/pkg/adapter/adapter_test.go b/kafka/sink/pkg/adapter/adapter_test.go new file mode 100644 index 0000000000..b21a3298c0 --- /dev/null +++ b/kafka/sink/pkg/adapter/adapter_test.go @@ -0,0 +1,139 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Veroute.on 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package adapter + +import ( + "bytes" + "encoding/json" + "net/http" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" + + "knative.dev/eventing/pkg/adapter/v2" + adaptertest "knative.dev/eventing/pkg/adapter/v2/test" + "knative.dev/pkg/logging" + pkgtesting "knative.dev/pkg/reconciler/testing" +) + +const ( + eventID = "12345" +) + +// testCase holds a single row of our GitHubSource table tests +type testCase struct { + // name is a descriptive name for this test suitable as a first argument to t.Run() + name string + + // payload contains the CloudEvents event payload + payload interface{} +} + +// type TestPayload struct { +// Id string `json:"id"` +// } + +var testCases = []testCase{ + // { + // name: "valid cloud event", + // payload: func() interface{} { + // pl := TestPayload{} + // pl.Id = eventID + // retun pl + // }(), + // }, +} + +func newTestAdapter(t *testing.T, ce cloudevents.Client) *kafkaSinkAdapter { + env := envConfig{ + EnvConfig: adapter.EnvConfig{ + Namespace: "default", + }, + Port: "8023", + KafkaServer: "test.kafka", + Topic: "test-topic", + } + ctx, _ := pkgtesting.SetupFakeContext(t) + logger := zap.NewExample().Sugar() + ctx = logging.WithLogger(ctx, logger) + + return NewAdapter(ctx, &env, ce).(*kafkaSinkAdapter) +} + +func TestGracefulShutdown(t *testing.T) { + // ce := adaptertest.NewTestClient() + // ra := newTestAdapter(t, ce) + // ctx, cancel := context.WithCancel(context.Background()) + + // go func(cancel context.CancelFunc) { + // defer cancel() + // time.Sleep(time.Second) + + // }(cancel) + + // t.Logf("starting Kafka Sink server") + + // err := ra.Start(ctx) + // if err != nil { + // t.Error(err) + // } +} + +func TestServer(t *testing.T) { + // for _, tc := range testCases { + // ce := adaptertest.NewTestClient() + // adapter := newTestAdapter(t, ce) + + // //router := adapter.newRouter() + // //server := httptest.NewServer(router) + // //defer server.Close() + + // //t.Run(tc.name, tc.runner(t, server.URL, ce)) + // } +} + +// runner returns a testing func that can be passed to t.Run. +func (tc *testCase) runner(t *testing.T, url string, ceClient *adaptertest.TestCloudEventsClient) func(t *testing.T) { + return func(t *testing.T) { + // if tc.eventType == "" { + // t.Fatal("eventType is required for table tests") + // } + body, _ := json.Marshal(tc.payload) + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + + //req.Header.Set(common.GHHeaderEvent, tc.eventType) + //req.Header.Set(common.GHHeaderDelivery, eventID) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Error(err) + } + defer resp.Body.Close() + + tc.validateAcceptedPayload(t, ceClient) + } +} + +func (tc *testCase) validateAcceptedPayload(t *testing.T, ce *adaptertest.TestCloudEventsClient) { + t.Helper() + if len(ce.Sent()) != 1 { + return + } +} diff --git a/kafka/sink/samples/kafka-sink-deployment.yaml b/kafka/sink/samples/kafka-sink-deployment.yaml index 9e1e6e5530..5fcc71d3e2 100644 --- a/kafka/sink/samples/kafka-sink-deployment.yaml +++ b/kafka/sink/samples/kafka-sink-deployment.yaml @@ -22,3 +22,9 @@ spec: value: my-cluster-kafka-bootstrap.kafka:9092 - name: KAFKA_TOPIC value: knative-sink-topic + - name: NAMESPACE + value: ignore + - name: K_LOGGING_CONFIG + value: "" + - name: K_METRICS_CONFIG + value: "" From 7bf32ae8b3c0bea75918fa50a2fcf11c2f3fa54a Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Thu, 23 Jul 2020 17:23:28 -0400 Subject: [PATCH 3/4] add example output --- kafka/sink/README.md | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/kafka/sink/README.md b/kafka/sink/README.md index bda471e402..e3895597fc 100644 --- a/kafka/sink/README.md +++ b/kafka/sink/README.md @@ -84,13 +84,18 @@ Check Kafka sink logs: ``` k logs -f kafka-sink-694fd8f689-smwq4 -2020/07/22 22:13:03 Using HTTP PORT=8080 -2020/07/22 22:13:03 Sinking to KAFKA_SERVER=my-cluster-kafka-bootstrap.kafka:9092 KAFKA_TOPIC=knative-sink-topic -2020/07/22 22:13:03 Ready to receive -2020/07/22 22:19:46 Received message -2020/07/22 22:19:46 Sending message to Kafka -2020/07/22 22:19:47 Ready to receive -``` +{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:110","msg":"Successfully created the logger."} +{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:111","msg":"Logging level set to info"} +{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:78","msg":"Fetch GitHub commit ID from kodata failed","error":"\"KO_DATA_PATH\" does not exist or is empty"} +{"level":"error","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/main.go:71","msg":"failed to process metrics options{error 25 0 json options string is empty}","stacktrace":"knative.dev/eventing/pkg/adapter/v2.MainWithContext\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:71\nknative.dev/eventing/pkg/adapter/v2.Main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:46\nmain.main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/kafka/sink/cmd/receive_adapter/main.go:26\nruntime.main\n\t/usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203"} +{"level":"warn","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/config.go:163","msg":"Tracing configuration is invalid, using the no-op default{error 25 0 empty json tracing config}"} +{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:84","msg":"Using HTTP PORT=%d8080"} +{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:95","msg":"Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%slocalhost:9092test-topic"} +{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:131","msg":"Server is ready to handle requests."} +{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} +{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:109","msg":"Received message"} +{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:116","msg":"Sending message to Kafka"} +{"level":"info","ts":"2020-07-23T16:25:16.291-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"}``` And verify that event was received by Kafka, for example @@ -130,16 +135,16 @@ curl -v "http://localhost:8080" \ Expected output: ``` -{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:110","msg":"Successfully created the logger."} -{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:111","msg":"Logging level set to info"} -{"level":"info","ts":"2020-07-23T16:25:09.138-0400","caller":"logging/config.go:78","msg":"Fetch GitHub commit ID from kodata failed","error":"\"KO_DATA_PATH\" does not exist or is empty"} -{"level":"error","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/main.go:71","msg":"failed to process metrics options{error 25 0 json options string is empty}","stacktrace":"knative.dev/eventing/pkg/adapter/v2.MainWithContext\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:71\nknative.dev/eventing/pkg/adapter/v2.Main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:46\nmain.main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/kafka/sink/cmd/receive_adapter/main.go:26\nruntime.main\n\t/usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203"} -{"level":"warn","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"v2/config.go:163","msg":"Tracing configuration is invalid, using the no-op default{error 25 0 empty json tracing config}"} -{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:84","msg":"Using HTTP PORT=%d8080"} -{"level":"info","ts":"2020-07-23T16:25:09.139-0400","logger":"kafkasink","caller":"adapter/adapter.go:95","msg":"Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%slocalhost:9092test-topic"} -{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:131","msg":"Server is ready to handle requests."} -{"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} -{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:109","msg":"Received message"} -{"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:116","msg":"Sending message to Kafka"} -{"level":"info","ts":"2020-07-23T16:25:16.291-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} +{"level":"info","ts":"2020-07-23T17:22:40.126-0400","caller":"logging/config.go:110","msg":"Successfully created the logger."} +{"level":"info","ts":"2020-07-23T17:22:40.126-0400","caller":"logging/config.go:111","msg":"Logging level set to info"} +{"level":"info","ts":"2020-07-23T17:22:40.126-0400","caller":"logging/config.go:78","msg":"Fetch GitHub commit ID from kodata failed","error":"\"KO_DATA_PATH\" does not exist or is empty"} +{"level":"error","ts":"2020-07-23T17:22:40.126-0400","logger":"kafkasink","caller":"v2/main.go:71","msg":"failed to process metrics options{error 25 0 json options string is empty}","stacktrace":"knative.dev/eventing/pkg/adapter/v2.MainWithContext\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:71\nknative.dev/eventing/pkg/adapter/v2.Main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/vendor/knative.dev/eventing/pkg/adapter/v2/main.go:46\nmain.main\n\t/Users/aslom/Documents/awsm/go/src/knative.dev/eventing-contrib/kafka/sink/cmd/receive_adapter/main.go:26\nruntime.main\n\t/usr/local/Cellar/go/1.14/libexec/src/runtime/proc.go:203"} +{"level":"warn","ts":"2020-07-23T17:22:40.127-0400","logger":"kafkasink","caller":"v2/config.go:163","msg":"Tracing configuration is invalid, using the no-op default{error 25 0 empty json tracing config}"} +{"level":"info","ts":"2020-07-23T17:22:40.127-0400","logger":"kafkasink","caller":"adapter/adapter.go:84","msg":"Using HTTP PORT=%d8080"} +{"level":"info","ts":"2020-07-23T17:22:40.127-0400","logger":"kafkasink","caller":"adapter/adapter.go:95","msg":"Sinking from HTTP to KAFKA_SERVER=%s KAFKA_TOPIC=%slocalhost:9092test-topic"} +{"level":"info","ts":"2020-07-23T17:22:40.132-0400","logger":"kafkasink","caller":"adapter/adapter.go:131","msg":"Server is ready to handle requests."} +{"level":"info","ts":"2020-07-23T17:22:40.132-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} +{"level":"info","ts":"2020-07-23T17:22:46.093-0400","logger":"kafkasink","caller":"adapter/adapter.go:109","msg":"Received message"} +{"level":"info","ts":"2020-07-23T17:22:46.093-0400","logger":"kafkasink","caller":"adapter/adapter.go:116","msg":"Sending message to Kafka"} +{"level":"info","ts":"2020-07-23T17:22:46.100-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} ``` \ No newline at end of file From afacb213923dd08b85c8ba0ac46bef883b926cbe Mon Sep 17 00:00:00 2001 From: Aleksander Slominski Date: Fri, 24 Jul 2020 14:47:43 -0400 Subject: [PATCH 4/4] update README, remove not needed comments --- kafka/sink/README.md | 3 +- kafka/sink/pkg/adapter/adapter.go | 4 +-- kafka/sink/pkg/adapter/adapter_test.go | 46 +------------------------- 3 files changed, 4 insertions(+), 49 deletions(-) diff --git a/kafka/sink/README.md b/kafka/sink/README.md index e3895597fc..4a7733b90e 100644 --- a/kafka/sink/README.md +++ b/kafka/sink/README.md @@ -95,7 +95,8 @@ k logs -f kafka-sink-694fd8f689-smwq4 {"level":"info","ts":"2020-07-23T16:25:09.144-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} {"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:109","msg":"Received message"} {"level":"info","ts":"2020-07-23T16:25:16.286-0400","logger":"kafkasink","caller":"adapter/adapter.go:116","msg":"Sending message to Kafka"} -{"level":"info","ts":"2020-07-23T16:25:16.291-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"}``` +{"level":"info","ts":"2020-07-23T16:25:16.291-0400","logger":"kafkasink","caller":"adapter/adapter.go:106","msg":"Ready to receive"} +``` And verify that event was received by Kafka, for example diff --git a/kafka/sink/pkg/adapter/adapter.go b/kafka/sink/pkg/adapter/adapter.go index 6e22210664..abafa5bd06 100644 --- a/kafka/sink/pkg/adapter/adapter.go +++ b/kafka/sink/pkg/adapter/adapter.go @@ -61,7 +61,7 @@ type kafkaSinkAdapter struct { topic string } -// NewAdapter returns the instance of gitHubReceiveAdapter that implements adapter.Adapter interface +// NewAdapter returns the instance of kafkaSinkAdapter that implements adapter.Adapter interface func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter { logger := logging.FromContext(ctx) env := processed.(*envConfig) @@ -79,8 +79,6 @@ func (a *kafkaSinkAdapter) Start(ctx context.Context) error { saramaConfig := sarama.NewConfig() saramaConfig.Version = sarama.V2_0_0_0 - //done := make(chan bool, 1) - a.logger.Info("Using HTTP PORT=%d", a.port) port, err := strconv.Atoi(a.port) if err != nil { diff --git a/kafka/sink/pkg/adapter/adapter_test.go b/kafka/sink/pkg/adapter/adapter_test.go index b21a3298c0..cc02d7ddec 100644 --- a/kafka/sink/pkg/adapter/adapter_test.go +++ b/kafka/sink/pkg/adapter/adapter_test.go @@ -44,20 +44,7 @@ type testCase struct { payload interface{} } -// type TestPayload struct { -// Id string `json:"id"` -// } - -var testCases = []testCase{ - // { - // name: "valid cloud event", - // payload: func() interface{} { - // pl := TestPayload{} - // pl.Id = eventID - // retun pl - // }(), - // }, -} +var testCases = []testCase{} func newTestAdapter(t *testing.T, ce cloudevents.Client) *kafkaSinkAdapter { env := envConfig{ @@ -76,51 +63,20 @@ func newTestAdapter(t *testing.T, ce cloudevents.Client) *kafkaSinkAdapter { } func TestGracefulShutdown(t *testing.T) { - // ce := adaptertest.NewTestClient() - // ra := newTestAdapter(t, ce) - // ctx, cancel := context.WithCancel(context.Background()) - - // go func(cancel context.CancelFunc) { - // defer cancel() - // time.Sleep(time.Second) - - // }(cancel) - - // t.Logf("starting Kafka Sink server") - - // err := ra.Start(ctx) - // if err != nil { - // t.Error(err) - // } } func TestServer(t *testing.T) { - // for _, tc := range testCases { - // ce := adaptertest.NewTestClient() - // adapter := newTestAdapter(t, ce) - - // //router := adapter.newRouter() - // //server := httptest.NewServer(router) - // //defer server.Close() - - // //t.Run(tc.name, tc.runner(t, server.URL, ce)) - // } } // runner returns a testing func that can be passed to t.Run. func (tc *testCase) runner(t *testing.T, url string, ceClient *adaptertest.TestCloudEventsClient) func(t *testing.T) { return func(t *testing.T) { - // if tc.eventType == "" { - // t.Fatal("eventType is required for table tests") - // } body, _ := json.Marshal(tc.payload) req, err := http.NewRequest("POST", url, bytes.NewReader(body)) if err != nil { t.Fatal(err) } - //req.Header.Set(common.GHHeaderEvent, tc.eventType) - //req.Header.Set(common.GHHeaderDelivery, eventID) resp, err := http.DefaultClient.Do(req) if err != nil { t.Error(err)