This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Kafka sink #1406

Closed
wants to merge 4 commits into from

Conversation

aslom
Member

@aslom aslom commented Jul 23, 2020

Fixes #

Proposed Changes

Add Kafka Sink, a simple receive adapter that can be run standalone (described in README)

Ref: #682

Immediately useful for users who need to get events from Knative to an external Kafka broker.

Current version is limited: no security support, no unit tests, no controller

Docs etc. will be addressed in the future, once these limitations are removed and the Kafka Sink has been tested for some time.

@googlebot googlebot added the cla: yes Indicates the PR's author has signed the CLA. label Jul 23, 2020
@knative-prow-robot knative-prow-robot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Jul 23, 2020
@aslom
Member Author

aslom commented Jul 23, 2020

I followed the code example provided by @slinkydeveloper, and I think the current version can be useful for our users, so I hope we can get it merged. I listed the limitations that will take longer to implement, such as creating a CR and a controller for Kafka Sink.

@lionelvillard was mentioning that Kafka Sink may be useful for an async PoC implementation?

/assign @slinkydeveloper @lionelvillard

```go
}

// NewAdapter returns the instance of gitHubReceiveAdapter that implements adapter.Adapter interface
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
```
Contributor

Why do you create a source-like adapter if you then don't use the provided client?

Member Author

That is to structure it as a receive adapter for a future PR that adds controller support and a KafkaSink CR.

```go
)

func main() {
	adapter.Main("kafkasink", kafkasinkadapter.NewEnvConfig, kafkasinkadapter.NewAdapter)
```
Contributor

Does it work on kube? adapter.Main requires a service account that can watch config maps and such, and from the YAMLs below it doesn't seem like you're using any.

Member Author

It worked in k8s v1.18 - see example output in README.md
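For context on the RBAC question above: if adapter.Main watches ConfigMaps (e.g. for logging/observability configuration), the service account it runs under would typically need a Role along these lines. This is a hypothetical sketch; none of these names come from this PR.

```yaml
# Hypothetical RBAC for the kafkasink adapter; all names are illustrative.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: kafkasink-adapter
rules:
  # adapter.Main reads ConfigMaps for logging/observability configuration
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kafkasink-adapter
subjects:
  - kind: ServiceAccount
    name: kafkasink
roleRef:
  kind: Role
  name: kafkasink-adapter
  apiGroup: rbac.authorization.k8s.io
```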


```md
TODO: coming soon

## Deploying Kafka Sink to Kubernetes
```
Contributor

I wonder, can we show an example with a sinkbinding?

Member Author

Do you mean an example that shows how to use kafkasink instead of event-display? https://knative.dev/docs/eventing/samples/sinkbinding/
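As a sketch, the linked sample with event-display swapped for a kafkasink Service might look like the following. The apiVersion and all names are assumptions based on the linked docs, not files from this PR.

```yaml
# Hypothetical adaptation of the knative.dev SinkBinding sample: events from
# the heartbeats Deployment are routed to a kafkasink Service instead of
# event-display. Names are illustrative only.
apiVersion: sources.knative.dev/v1alpha2
kind: SinkBinding
metadata:
  name: bind-heartbeat
spec:
  subject:
    apiVersion: apps/v1
    kind: Deployment
    name: heartbeats
  sink:
    ref:
      apiVersion: v1
      kind: Service
      name: kafkasink
```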

```
@@ -0,0 +1,30 @@
apiVersion: apps/v1
kind: Deployment
```
Member

Would it make sense to have this thing reflected by a CRD?

Member Author

Yes, in the future.

@matzew
Member

matzew commented Jul 24, 2020

@aslom how about adding this to the examples? I do not really see this being "library" code.

And it would be nice if we just went with a Deployment, to take the binding into account here.

@slinkydeveloper
Contributor

I was wondering, how much work would it take to go from this code to a CRD like:

```yaml
type: KafkaSink
...
spec:
  topics:
    - aaa
    - bbbb
```

and then this KafkaSink implements the addressable duck type?

@aslom
Member Author

aslom commented Jul 24, 2020

Thank you for your comments @matzew @slinkydeveloper

About the Kafka Sink CR - yes, that is the goal. However, I would like to get there in smaller incremental steps, reaching full functionality through small-to-medium-sized PRs instead of one large XXL PR (which is hard for me to write and for others to test).

So this is the first step - an MVP of Kafka sink functionality as a receive adapter. Do you think this approach could work?

I started looking into a Kafka Sink CR, but it may require more discussion - for example, could Kafka Source and Kafka Sink share the same controller?

@knative-prow-robot
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: aslom
To complete the pull request process, please assign lionelvillard
You can assign the PR to them by writing /assign @lionelvillard in a comment when ready.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-metrics-robot

The following is the coverage report on the affected files.
Say /test pull-knative-eventing-contrib-go-coverage to re-run this coverage report

| File | Old Coverage | New Coverage | Delta |
| --- | --- | --- | --- |
| kafka/sink/pkg/adapter/adapter.go | Do not exist | 0.0% | |

@knative-prow-robot
Contributor

@aslom: The following test failed, say /retest to rerun all failed tests:

| Test name | Commit | Details | Rerun command |
| --- | --- | --- | --- |
| pull-knative-eventing-contrib-go-coverage | afacb21 | link | /test pull-knative-eventing-contrib-go-coverage |

Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.

@aslom aslom mentioned this pull request Jul 24, 2020
@aslom
Member Author

aslom commented Jul 27, 2020

@slinkydeveloper do you have any links (or ideas) on how I could write a unit test for the Kafka protocol (and get the pull-knative-eventing-contrib-go-coverage code coverage check to pass)? Is there some way I can run unit tests without requiring a Kafka broker - by intercepting messages?

@slinkydeveloper
Contributor

@aslom I think you should mock the sync producer (here you create a sender: https://github.com/knative/eventing-contrib/pull/1406/files#diff-985279ee1eab4df1be848de7a44a119aR94, but you can create the same sender directly from the built sync producer). So you start the adapter, send events to it, and then assert that you received something on the other side of the producer.

@pierDipi
Member

FYI, with some changes (that I will do anyway) to the receiver component from this project, https://github.com/knative-sandbox/eventing-kafka-broker, we get a Kafka Sink data plane. It already has tests (unit / integration / e2e).
We can use it in either a multi-tenant or a single-tenant setup.

@aslom
Member Author

aslom commented Jul 28, 2020

@pierDipi could you point me to the Kafka sink code and to how you run unit tests (and do the Kafka sender mock)? I could not find it: https://github.com/knative-sandbox/eventing-kafka-broker/search?p=2&q=kafka+sink&unscoped_q=kafka+sink

@pierDipi
Member

It's written in Java: https://github.com/knative-sandbox/eventing-kafka-broker/tree/master/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver
Basically, the idea is to mount a ConfigMap with all the configurations, where each configuration binds an HTTP request path (Addressable) to a Kafka topic.
Is this the goal of a KafkaSink?

For testing, we use MockProducer for unit tests; for integration tests we create an embedded Kafka cluster so that we can debug things in an IDE outside of k8s; and for e2e tests, the same as in this repo.

@pierDipi
Member

All you need to do to implement a KafkaSink is write a control plane that updates the ConfigMap(s), depending on the single-tenant vs multi-tenant setup.

@pierDipi
Member

Let me know if you find it interesting so that I can prioritize the changes I mentioned before.

@aslom
Member Author

aslom commented Jul 28, 2020

@pierDipi the goal of Kafka Sink is to be a very simple-to-use sink that sends events to a Kafka topic: #682

I would like to make it very easy to use from the CLI - the same way that event-display can be deployed as a sink to show events. That is why it is just one file that listens over HTTP for CloudEvents and sends them to a Kafka topic.
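As a rough sketch, that single file could look like the following, using the CloudEvents Go SDK's kafka_sarama protocol. The broker address, topic, and option choices here are illustrative assumptions, not the exact code in this PR.

```go
// Sketch of a single-file HTTP-to-Kafka forwarder (assumed config values).
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	// Hypothetical broker address and topic; in the PR these come from env config.
	sender, err := kafka_sarama.NewSender(
		[]string{"my-cluster-kafka-bootstrap:9092"}, saramaConfig, "my-topic")
	if err != nil {
		log.Fatalf("failed to create Kafka sender: %v", err)
	}
	defer sender.Close(context.Background())

	kafkaClient, err := cloudevents.NewClient(sender,
		cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
	if err != nil {
		log.Fatalf("failed to create Kafka client: %v", err)
	}

	// Listen for CloudEvents over HTTP and forward each one to the Kafka topic.
	httpClient, err := cloudevents.NewClientHTTP()
	if err != nil {
		log.Fatalf("failed to create HTTP client: %v", err)
	}
	log.Fatal(httpClient.StartReceiver(context.Background(),
		func(ctx context.Context, e cloudevents.Event) cloudevents.Result {
			return kafkaClient.Send(ctx, e)
		}))
}
```

Running this requires a reachable Kafka broker, so it is a deployment sketch rather than something verifiable in isolation.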

I would like to keep using golang to have it symmetrical to Kafka Source.

I think mapping config maps looks more complex than the CLI use case I described in the GitHub issue? However, I do not know the details of how the alternative Kafka broker is implemented, so it is hard for me to compare.

@pierDipi
Member

> I would like to make it very easy to use from the CLI - the same way that event-display can be deployed as a sink to show events. That is why it is just one file that listens over HTTP for CloudEvents and sends them to a Kafka topic.
>
> I think mapping config maps looks more complex than the CLI use case I described in the GitHub issue?

I don't see many differences from a usability perspective between putting configurations in a ConfigMap vs environment variables.

> I would like to keep using golang to have it symmetrical to Kafka Source.

So, in this case, re-using that component is not an option.

@slinkydeveloper
Contributor

> I would like to keep using golang to have it symmetrical to Kafka Source.

Is there any reason for that?

@aslom
Member Author

aslom commented Jul 29, 2020

@slinkydeveloper I could not figure out how to get at the HTTP server started by the SDK so I can send events to it for unit testing - I need to know the HTTP server URL to send CloudEvents to.

The closest I could find in the SDK was https://github.com/cloudevents/sdk-go/blob/master/v2/client/http_receiver_test.go#L15

@aslom
Member Author

aslom commented Jul 30, 2020

@slinkydeveloper I am not able to access syncProducer in kafka_sarama.Sender, as the field is private in protocol/kafka_sarama/v2/sender.go:

```go
type Sender struct {
	topic        string
	syncProducer sarama.SyncProducer
}
```

so this code will not work, even though I have a syncProducerMock:

```go
sender := &cloudeventskafka.Sender{topic: topic, syncProducer: syncProducerMock}
```

I created a PR to make it possible to run:

```go
sender, err := cloudeventskafka.NewSenderFromSyncProducer(topic, syncProducerMock)
```

PR: cloudevents/sdk-go#564

Any other/better way to run the unit test, then?
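For what it's worth, with that constructor available, a unit test could be sketched along these lines, pairing it with sarama's mocks package. The call shapes mirror the snippets above and cloudevents/sdk-go#564; treat this as an untested sketch, since the merged signature may differ.

```go
// Sketch of a unit test that exercises the Kafka sender without a real broker.
package adapter

import (
	"context"
	"testing"

	"github.com/Shopify/sarama/mocks"
	cloudeventskafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func TestAdapterSendsToKafka(t *testing.T) {
	// mocks.SyncProducer satisfies sarama.SyncProducer in memory;
	// each expectation consumes one produced message.
	syncProducerMock := mocks.NewSyncProducer(t, nil)
	syncProducerMock.ExpectSendMessageAndSucceed()

	// Constructor proposed in cloudevents/sdk-go#564 (call shape mirrors the
	// snippet above; the exact merged signature may differ).
	sender, err := cloudeventskafka.NewSenderFromSyncProducer("test-topic", syncProducerMock)
	if err != nil {
		t.Fatal(err)
	}

	client, err := cloudevents.NewClient(sender,
		cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
	if err != nil {
		t.Fatal(err)
	}

	event := cloudevents.NewEvent()
	event.SetType("dev.knative.kafkasink.test")
	event.SetSource("unit-test")

	if result := client.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
		t.Fatalf("send was not delivered: %v", result)
	}

	// Close verifies that every expectation set on the mock was met.
	if err := syncProducerMock.Close(); err != nil {
		t.Fatal(err)
	}
}
```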

@matzew
Member

matzew commented Aug 31, 2020

@aslom I'd suggest we close this, in favor of: knative-extensions/eventing-kafka-broker#134

@matzew
Member

matzew commented Sep 9, 2020

@aslom here is a quick sample of the code:
https://gist.github.com/matzew/e2c2fcd2696a346f25b8bc9e64bfd0fa

Let's close this?

@aslom
Member Author

aslom commented Sep 9, 2020

@matzew if I understand correctly, the Kafka Sink from knative-extensions/eventing-kafka-broker#134 requires the new broker and cannot run independently? Maybe there can be more than one implementation of Kafka Sink, depending on how users want to use it? In particular, a Kafka Sink that runs in a user namespace as a ksvc? @pierDipi

@slinkydeveloper
Contributor

> requires new broker and can not run independently?

Not really; the data plane is abstracted from the broker concept, and this sink is "multi-tenant".

@matzew
Member

matzew commented Sep 10, 2020

Just read this text: https://gist.github.com/matzew/e2c2fcd2696a346f25b8bc9e64bfd0fa

I do not even have the broker deployed on my cluster 😄

/close

Closing this now, since we have a decent type, instead of more example work, and we can improve things over there

@knative-prow-robot
Contributor

@matzew: Closed this PR.

In response to this:

> Just read this text: https://gist.github.com/matzew/e2c2fcd2696a346f25b8bc9e64bfd0fa
>
> I do not even have the broker deployed on my cluster 😄
>
> /close
>
> Closing this now, since we have a decent type, instead of more example work, and we can improve things over there

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
