Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka transport binding v2 #337

Merged
merged 2 commits into from
Jun 27, 2019
Merged

Kafka transport binding v2 #337

merged 2 commits into from
Jun 27, 2019

Conversation

bluemonk3y
Copy link

@bluemonk3y bluemonk3y commented Nov 1, 2018

Kafka transport binding for CloudEvents, similar to the HTTP binding and proposed NATS, MQTT, AMQP bindings.

Follow-on PR from #300

Signed-off-by: Neil Avery neil@confluent.io

@duglin duglin mentioned this pull request Nov 1, 2018
@duglin
Copy link
Collaborator

duglin commented Nov 1, 2018

CI error is ok since it'll be automagically fixed once merged. But the DCO issue is real.

kafka-transport-binding.md Outdated Show resolved Hide resolved
kafka-transport-binding.md Outdated Show resolved Hide resolved

The receiver of the event can distinguish between the two content modes by
inspecting the `cloudEvents_contentType` property of the Kafka message. If the
value is prefixed with the CloudEvents media type `application/cloudevents`, indicating the use of a known [event
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap at 80


#### 3.1.3. Metadata Headers

All [CloudEvents][CE] attributes with exception of `data` MUST be individually
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we include extensions in this too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

@bluemonk3y
Copy link
Author

CI error is ok since it'll be automagically fixed once merged. But the DCO issue is real.

@duglin - yep I will fix that tomorrow

kafka-transport-binding.md Outdated Show resolved Hide resolved

##### 3.1.3.1 Property Names

Cloud Event attributes are prefixed with "cloudEvents_" for use in the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cloudEvents_ ? instead of "cloudEvents_" ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should use some sort of fqn: io.cloudevents. to avoid potential clashes ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using cloudEvents_ as it has been generally adopted by other transport bindings. I believe we also need to avoid '.' characters. We should be consistent across all bindings.
@duglin - your thoughts on this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like consistency, but I think we need to be consistent with that users of the transport expect. I did a quick search and came across this: https://streamsets.com/documentation/controlhub/3.5.0/help/pdesigner/datacollector/UserGuide/Origins/KConsumer.html
and it talks about things like ssl.truststore.location and com.sun.security.auth.module.Krb5LoginModule. So Kafka does seem to be ok with '.'.
I don't know enough about Kafka to know if FQNs would be more "expected" or not, I'll leave that to others - I'd be ok with either.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clemensv - what is your opinion on this (above)? Do we adopt language packaging/namespace semantics like io.cloudevents or cloudEvents_ It does raise the question about event propagation between transports.

Copy link

@gwenshap gwenshap Nov 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for the record, Apache Kafka never managed to decide on whether "." or "_" are preferred as separators. Configuration parameters are always "." separated (as the Streamset doc shows), but topics are often namespaced with "_" (for example, the internal "__consumer_offsets" topic). I've definitely seen both used in "the wild" (a fact that was sometimes inconvenient as the project had to deal with converting topic names to metric names...)

Either separator will be acceptable by the Apache Kafka community (or rather, either will be objected to, by different sets of people).

(duglin: I edited the comment to escape the underscore since it wasn't showing up and was being treated as the "turn on italics" symbol)


* `eventTime` maps to `cloudEvents_eventTime`
* `eventID` maps to `cloudEvents_eventID`
* `cloudEventsVersion` maps to `cloudEvents_cloudEventsVersion`
Copy link
Member

@matzew matzew Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on the general concept of this.

Our group has been doing exactly the same:
https://github.com/rh-event-flow/jcloudevents/blob/master/kafka/src/main/java/io/streamzi/cloudevents/kafka/util/KafkaHeaderUtil.java#L42

Ok, biggest diff is ours is on internal RecordHeader(s) API, and no prefix.

Perhaps it should be based on the Header(s) interface API

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/cloudEventsVersion/specversion/

kafka-transport-binding.md Outdated Show resolved Hide resolved
kafka-transport-binding.md Outdated Show resolved Hide resolved
@matzew
Copy link
Member

matzew commented Nov 7, 2018

@bluemonk3y we have some POC code for this. and it would be nice to get this into the Cloudevents-sdk for java.

See my proposal here:
cloudevents/sdk-java#5

looking forward working with you on that


------------------- key ----------------------

Key: mykey
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Kafka message key is where things are still a bit blurry to me. How will its value be obtained from the CloudEvents event? Have you foreseen some extractor function or similar?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more about that one, perhaps a header key should be defined whose value, if present, will be propagated as key of the Kafka message, e.g. cloudEvents_messageKey. Alternatively, the binding, wherever it's running, could be configured with some kind of path expression which gets applied to the data value to extract the value, e.g. /customer/id.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, there is a PR for this: #218

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, thanks. I've commented over there, too. Seems that'd need resolution first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conflict we have here is that the partition key is a transport specific concern (and the requirements may vary across different transport options) and that the publisher ought not to care about what transport the event route gets bound to. We may also have the situation that an event gets first published via MQTT from a little device and then gets put on Kafka by a device gateway.

I think this binding needs to define a rule by which a key is constructed from the event rather than expecting that the event brings it along.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that the natural way to get a key for a Kafka message is to use the source.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if source is the right place - I'd expect all events produced by same application to have identical source, why partitioning typically applies to events produced by same source.

I like the proposal from @clemensv in #218 :
" instead of putting the burden of producing an event key on the client and then only having one, any transport that requires particular constructs such as keys such define a mechanism by which you can harvest/synthesize a key from an incoming CloudEvent as some sort of transform. The spec doesn't need to prescribe how -- it just needs to say that that's how the key materializes. A transform that just cooks up a random key might also be valid if that's what you want."

KafkaConnect already has "key extraction" transformation, exactly because external records require mapping to Kafka keys, and the logic for doing so varies between use-cases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gwenshap @clemensv that would mean that the kafka transport spec would leave the determination of the "key" as an exercise for the implementer (or admin configuring the transport) ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would hope a reasonable implementation of transport would allow plugging in the "key selection" logic, since we can't know in advance what it will be. (Although I'm still quite new to CloudEvents, so take my opinions with a bit of salt)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added a comment on this over at #218. The idea being that instead of having fully pluggable logic for retrieving the key instead there could be a mechanism to just select the key from a given event field when sending a cloud event to a Kafka Topic.

@clemensv
Copy link
Contributor

clemensv commented Nov 8, 2018

The header prefixes might be a bit generous. I think we'll end up collapsing them to "ce" across the board.

@matzew
Copy link
Member

matzew commented Dec 12, 2018

I think we'll end up collapsing them to "ce" across the board.

+1

@JemDay
Copy link
Contributor

JemDay commented Mar 19, 2019

It seems this PR is a bit behind the 0.2 spec version ...

@duglin
Copy link
Collaborator

duglin commented May 10, 2019

ping @hschaffner - please review and add any comments w.r.t. why the partitionkey extension should not be needed.

ping @clemensv - please add comments about the call-back mechanism you mentioned in PR #429

@bluemonk3y now that #429 is merged, can you rebase this one to deal with the merge conflicts? Also, you may need to edit some of the text to deal with recent spec changes - such as the names of the attributes being changed (or lower-cased).

thanks everyone for your patience.

@bluemonk3y
Copy link
Author

@duglin - yep as soon as I get a chance.

@duglin
Copy link
Collaborator

duglin commented May 28, 2019

@bluemonk3y sorry to pester :-) but any chance of getting an update of this one? I get the sense we're nearing a pretty big milestone and this is one of the bigger outstanding items for us.

@bluemonk3y
Copy link
Author

@duglin - sure, I'm currently away but will pick it up on Monday.

@gunnarmorling
Copy link
Contributor

now that #429 is merged

@duglin, I reckon you mean #218?

Could you (or someone else) perhaps give a summary of the latest state of the discussion? The closed PR #218 adds the partitionkey attribute, but it actually might be removed again as per #430?

What @clemensv describes in #218 (comment) makes sense to me: make this a concern of the transport binding which would allow for a callback function for retrieving the partition/message key on an per-event basis. The spec-defined partitionkey attribute would still remain in place as basis for the default behavior in case no custom callback has been configured for the transport and a value can be defined by the producer.

One open question to me is how the callback would be defined. What exactly is it? E.g. a (Java) class implementing a certain interface for extracting the key based on an incoming event? Or would at the spec level just be defined that there is a callback mechanism but it'd then be up to specific implementations of this to define the concrete mechanism?

Thanks!

@duglin
Copy link
Collaborator

duglin commented Jun 4, 2019

@bluemonk3y the travis errors seem real. Also, can you sign your commits?

@duglin duglin removed the needs work label Jun 8, 2019
@duglin
Copy link
Collaborator

duglin commented Jun 10, 2019

Even though I didn't tag this as "v1.0" (it's "try-for-v1.0"), it's been lingering for a while so it'll be first on the PR review list this week - please look it over when you get a chance.


This example shows the *binary* mode mapping of an event into the
Kafka message. All other CloudEvents attributes
are mapped to Kafka Header fields with prefix `cloudEvents_`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ce_ might just be sufficient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


Examples:

* `eventTime` maps to `cloudEvents_eventTime`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs update

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/eventTime/time/

Example for the [JSON format][JSON-format]:

``` text
content-type: application/cloudevents+json; charset=UTF-8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows no prefix

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this missing a ce_ or is the sentence in the first paragraph of this section incorrect?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the intro paragraph is wrong

placed into the Kafka message value section
using an [event format](#14-event-formats).

In the *binary* content mode, the value of the event `data` attribute MUST be
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to move this paragraph up so it's next to the one on line 52, that also talks about "binary" mode?

Examples:

* `eventTime` maps to `cloudEvents_eventTime`
* `eventID` maps to `cloudEvents_eventID`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/eventID/id/

@duglin
Copy link
Collaborator

duglin commented Jun 13, 2019

@hschaffner did you have any comments on this one? In particular around the key stuff

@duglin
Copy link
Collaborator

duglin commented Jun 17, 2019

@bluemonk3y rebase needed - and there are some outstanding comments

kafka-transport-binding.md Outdated Show resolved Hide resolved
cloudEvents_eventTime: "2018-04-05T03:56:24Z"
cloudEvents_eventID: "1234-1234-1234"
cloudEvents_source: "/mycontext/subcontext"
ce_specVersion: "0.1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/specVersion/specversion/

Copy link
Collaborator

@duglin duglin Jun 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and "0.4-wip"

README.md Outdated Show resolved Hide resolved
json-format.md Outdated Show resolved Hide resolved
kafka-transport-binding.md Outdated Show resolved Hide resolved
------------------- value --------------------

{
"specVersion" : "0.1",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/specVersion/specversion/
s/0.1/0.4-wip/

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I fixed the wonky rebase!

@duglin
Copy link
Collaborator

duglin commented Jun 17, 2019

I think the rebase got a little wonky

Kafka transport binding for CloudEvents, similar to the HTTP binding and proposed NATS, MQTT, AMQP bindings.

Signed-off-by: Neil Avery <neil@confluent.io>
### 2.1. data Attribute

The `data` attribute is assumed to contain opaque application data that is
encoded as declared by the `contentType` attribute.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

contentType or "datacontenttype" ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I think I might be wrong, but I wanted to verify.

here.

The receiver of the event can distinguish between the two content modes by
inspecting the `ce_contentType` [Header][Kafka-Message-Header] of the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ce_contentType/ce_datacontenttype/ I think (but it's the "T" that jumped out at me)

kafka-transport-binding.md Outdated Show resolved Hide resolved

#### 3.3.1. Kafka Content-Type

The [Kafka `ce_contentType`] property field MUST be set to the media
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"T" -> "t"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think the "ce_" here is incorrect since this isn't a CE property. Right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree


------------------ headers -------------------

ce_contentType: application/cloudevents+json; charset=UTF-8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thin the "ce_" here is incorrect.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree - removed

@@ -1,4 +1,4 @@
# CloudEvent Specs for Proprietary Protocols and Encodings
### CloudEvent Specs for Proprietary Protocols and Encodings
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change is invalid

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

spec.md Outdated
@@ -58,6 +58,14 @@ that does not support that feature will then silently ignore that part of the
message. The sender needs to be prepared for the situation where a receiver
ignores that feature.

For clarity, when a feature is marked as "OPTIONAL" this means that it is
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the edits in this doc are invalid

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@duglin - I believe I have applied the changes correctly ;) - would you mind reviewing?

Kafka transport binding for CloudEvents, similar to the HTTP binding and proposed NATS, MQTT, AMQP bindings.

Signed-off-by: Neil Avery <neil@confluent.io>
@duglin
Copy link
Collaborator

duglin commented Jun 21, 2019

@bluemonk3y looks good to me.

Can I get one more LGTM and then I'll merge!!

@duglin
Copy link
Collaborator

duglin commented Jun 27, 2019

Approved on the 6/20 call with the minor rebase fixes

@duglin
Copy link
Collaborator

duglin commented Jun 27, 2019

Approved, again :-) , on the 6/28 call - but this time with the rebase fixes

@duglin duglin merged commit 7c3bdb1 into cloudevents:master Jun 27, 2019
@duglin
Copy link
Collaborator

duglin commented Jun 27, 2019

@bluemonk3y et al... thanks a ton for your patience and hard work on this one!!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants