
Proposal for an event key field as an extension #218

Merged 1 commit into cloudevents:master on May 10, 2019

Conversation

@jroper (Contributor) commented May 25, 2018

Closes #209.

This adds an eventKey field to the spec, for the purposes of establishing a causal relationship between events.

It would be worth discussing and getting feedback from multiple sources on the type of this field. Originally, I was going to make it Object, but if it's an object, it may need additional information around it such as a content type.

Instead I've made it String for simplicity. This works well with existing implementations such as AWS Kinesis and Azure Event Hubs, but may present an issue for Kafka, which treats its keys as binary and provides out-of-the-box support for, for example, 64-bit long encoding and double-precision floating-point encoding. That said, it's not impossible for Kafka to support this; it just means any automated support for events in Kafka might require base64 encoding the keys, or potentially including additional metadata to indicate how they've been encoded. Generally, this shouldn't be a problem anyway, since the value of the key is typically opaque and primarily useful for deciding on a partition, so as long as it is stable and can be hashed, it's fine.
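
To illustrate the base64 option mentioned above, here is a minimal sketch (purely illustrative, not proposed spec text) of carrying a binary Kafka key, such as a 64-bit long, as a String-typed event key; the class and variable names are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public final class KeyEncodingSketch {
    public static void main(String[] args) {
        // A Kafka-style binary key: a 64-bit long encoded as 8 bytes.
        byte[] binaryKey = ByteBuffer.allocate(Long.BYTES).putLong(1234567890L).array();

        // Encode it so it can travel in a String-typed event key field...
        String eventKey = Base64.getEncoder().encodeToString(binaryKey);

        // ...and decode it back to the original bytes on the Kafka side.
        long recovered = ByteBuffer.wrap(Base64.getDecoder().decode(eventKey)).getLong();

        System.out.println(eventKey + " -> " + recovered);
    }
}
```

Because the key is only hashed to pick a partition, the round trip only needs to preserve the bytes themselves, not any Kafka-specific semantics.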

@Vlaaaaaaad

This seems like a bit of a duplicate of #128, doesn't it? This has a narrower scope, but still.

Some context: in the May 24th call, the PR that sparked this (#191) was discussed, and because the idea of event and function correlation (be it simple chaining, more complicated workflows, or anything else) keeps coming up, there will be a separate, longer meeting to discuss tackling it (see the Doodle at https://doodle.com/poll/kqkmdedrznpwgcq6). There are also two relevant workstreams for this: cncf/wg-serverless#56 for events and cncf/wg-serverless#55 for functions.

@duglin added the v1.0 label on May 25, 2018
@jroper (Contributor, Author) commented May 29, 2018

I should be at the meeting later this week (as long as I remember to set my alarm, it's at 4am my time).

I did look at #128 first. The thing that made me think it's different is that it says the key is for an "application instance". The event key field definitely has nothing to do with applications; it is all about entities. Indeed, multiple applications could emit events for the same entity. Though perhaps the wording there is just poor, written specifically with an IoT world in mind, where application and entity are often the same thing.

If #128 were to fulfill this, then I think the following things would need to be addressed:

@BenBeattieHood

I agree. A single cause (correlation id) can have multiple events: e.g. the command 'PromoteUser' might emit the events 'UserMovedTeams', 'UserChangedTitle', 'UserPayGradeChanged', etc.

@jroper (Contributor, Author) commented May 30, 2018

Interestingly, I read #128 as being about correlation to an application; it does say in the description:

A path string pointing to a field in the event data that can be used to correlate all the events associated with the same application instance.

And then as an example it lists headers.body.cathy-house-number. A correlation id for the purposes of tracing a single interaction with the system, or correlating a request with a response, which I think is what you are referring to, is a completely different thing to what is being described in #128. But I think that's a major problem with #128: it's misleading. When people think "correlation-id" in the context of messaging, they think of tracing and request/response correlation, as you did. So that definitely has the wrong name, I think. Existing implementations that have this concept use the word key (in some cases, partition key), so the name here should be key, so as not to be confused with correlation ids used for tracing.

@Vlaaaaaaad

At the correlation meeting held on the 30th of May, it was made clear by @jroper and @clemensv that this is a completely different thing that also needs to be addressed. Apologies for my misunderstanding.

In this context the correlation id may also be referred to as a partition key. This partition key may be inserted into the bag of properties that will likely be defined by #128, it may be added as part of each transport binding or integration, or it may be a top-level field as defined in this PR, but that is something for people more familiar with the matter to decide.

Still, my initial comment saying that this is a duplicate of #128 was wrong and is to be disregarded.

@BenBeattieHood commented May 30, 2018

Thanks for clarifying @Vlaaaaaaad

I'd disagree that a correlation id can be used as a partition key. The correlation id refers to the command or incident that caused a batch of events.

In contrast, an aggregate instance is considered the boundary of concurrency. Each aggregate instance is asynchronous to the others, but all events within an aggregate instance's stream are produced synchronously and need to be consumed in order.

Partition keys are therefore better leveraged per synchronous stream, so partitions are usually determined by an equation like aggregate_id mod partition_count = partition_id. (FWIW, scaling/autoscaling therefore often needs to take account of draining/reallocating in-flight messages based on the same equation.)
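
As a minimal sketch of that equation (the class and method names are illustrative, and real brokers use their own hash functions):

```java
// Maps an aggregate id (the partition key) to a partition id in [0, partitionCount).
public final class PartitionSelector {

    static int partitionFor(String aggregateId, int partitionCount) {
        // Math.floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(aggregateId.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // Every event keyed by "order-12345" maps to the same partition,
        // so its stream is processed serially by a single consumer.
        System.out.println(partitionFor("order-12345", 100));
    }
}
```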

@jroper (Contributor, Author) commented Jun 6, 2018

As discussed in the meeting today, one of the main reasons why this would be useful is that there are multiple transports that support this as a first-class concept, including Kafka, Azure Event Hubs and AWS Kinesis (and those are just the ones I know off the top of my head; any cloud-based messaging service that intends to handle events at scale is going to need to support some form of partitioning, and hence have a concept of a partition key).

Personally, I'm moving towards calling it a partitionKey, as this makes it absolutely clear what its intended purpose is. This could limit the scope to the extent of excluding other useful ways it could be used, but I do think the most important reason for its existence is partitioning, and by naming it partition key we make it exactly clear what it is and isn't, which I think is important in a spec that is going to abstract across many different transports. There are many other types of keys out there when considering messaging, and there's a lot of overlap between keys and labels. In the labels discussion we decided to explicitly not mandate a particular name for them; rather, they're just generic properties. Introducing a generic name like eventKey for partition keys is, I think, in conflict with the decision made for labels; I think it needs to be partitionKey to make it distinct and give it a reason to exist as a first-class concept. Any other uses of keys can be covered by using the properties field defined for labels.

@duglin (Collaborator) commented Jun 18, 2018

From the 6/15 f2f notes:

  • Because this is about getting this working with Kafka, we can solve this in transport concerns.
  • AI Clemens and Doug: Add this non-goal of defining transport

@bluemonk3y

I would agree with @jroper, and also prefer 'partitionKey'; it maps onto Kafka, Kinesis, and Event Hubs concerns and removes ambiguity about its purpose. The constraint on the key is that it needs to ensure correctness in the stream representation. So while it will be pushed into transport concerns, it is a field that the user may need to control; it is unlikely that a correlationId will meet the ordering/correctness requirements where there is fan-out and fan-in of events (or other patterns).

@duglin (Collaborator) commented Jun 22, 2018

More clarity from @clemensv about the f2f resolution: "This means that the transport binding ought to define how to extract information like this from the event metadata. The partitioning key is a transport artifact."

@duglin (Collaborator) commented Aug 16, 2018

@jroper if the platform decides how the string is to be interpreted, then that implies some out-of-band communication between the two, and not something that a random implementer of the spec can really use, correct?

If so, I wonder if:

  1. in order for this to be a useful property for the spec, we need to actually be more prescriptive about its value and how consumers are to interpret it, or
  2. make it a multi-property thing where there's another property (e.g. "eventKeyType") that allows for a URL to indicate how the "eventKey" is meant to be interpreted. Then our spec offers the common place for information like this to go, but provides the extensibility for people to define their own interpretation rules. And we can even include some well-defined URLs for the common usecases. Or,
  3. leave this as an extension because each provider will have their own way they want this to appear. Which means we don't add anything to our spec.

@duglin (Collaborator) commented Sep 1, 2018

@jroper any update on this?

@duglin (Collaborator) commented Sep 7, 2018

@jroper ping

@jroper (Contributor, Author) commented Sep 7, 2018

Sorry @duglin, I'm on paternity leave at the moment, is it ok if I get to this in a week?

@duglin (Collaborator) commented Sep 7, 2018 via email

@duglin (Collaborator) commented Sep 18, 2018

@jroper hope all is well with you and the little one... any update on this?

@clemensv mentioned this pull request on Sep 18, 2018
@jroper (Contributor, Author) commented Sep 20, 2018

Personally I prefer the first one, being very prescriptive about what it is, given that the multiple transports that intend to use it all agree exactly about what it is and how it's meant to be used. I'll update the PR.

@jroper force-pushed the event-key branch 2 times, most recently from c4ce4d1 to a71353b on September 20, 2018 at 00:30
@duglin (Collaborator) commented Sep 20, 2018

Couple of questions:

  • this attribute will link events together (presumably because they're in different queues) and the consumer is supposed to process events with the same partitionKey in the proper order, right? If so, what defines the order?
  • does this relate to the 'sequence' extension being proposed?

@bluemonk3y

@duglin - I agree; the event key forms the partition key to ensure the logical stream is maintained and hence can be processed correctly. There should also be an event-time, which is further used to determine when the event was created and can be used by processors to ensure correct ordering. The downside to this approach is that it presumes a WAL/clock sync, so it could be used in conjunction with a sequence that is incremented as subsequent events within the stream are generated. The benefit of the event-time is that it helps capture latency within the stream.

@duglin (Collaborator) commented Sep 20, 2018

If we expect the eventTime to be the ordering mechanism then it might be good to say that in there, and say that if partitionKey is present then eventTime MUST also be present.

@jroper (Contributor, Author) commented Sep 21, 2018

No, it doesn't link events in different queues; it ensures that when a logical queue is partitioned into multiple physical queues, linked events go into the same queue.

So let's talk about Kafka. Let's say I have an e-commerce app, and I have a topic called "order-events", where all events pertaining to orders are published. Now, this is a large e-commerce app, processing a million sales per second at peak times, and there is no way a single topic (or the consumers of such a topic) can handle that throughput. So the topic is partitioned, let's say into 100 queues, so now the throughput on each partition is a manageable 10000 events per second. But how does the partitioning occur? How do you decide which partition to put a particular event on? The answer is the partition key. All events for the same order need to be put on the same partition so that the events for that order can be processed in order. So, for the partition key, I will use the order id. The order id then gets hashed to a partition id, and the event gets placed on the corresponding partition.

The whole point of this is to ensure that consumers don't need to put events back together in order to process them in order; it's to ensure no timestamp or sequence number mechanism is needed.

Event Hubs, Kinesis, and many other pub/sub brokers work in exactly the same way. And in fact, it's how distributed databases work too: you partition on a particular key so that all related data ends up in the same physical place, so that processing can occur independently without coordination.
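
As a concrete sketch of the producer side of this example (assuming a plain Kafka producer; the broker address, topic name and order id are illustrative), the order id is simply passed as the record key and Kafka's partitioner does the hashing:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class OrderEventPublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String orderId = "order-12345"; // the partition key for this order's events
            String event = "{\"type\":\"order.submitted\",\"orderId\":\"order-12345\"}";

            // Kafka hashes the record key to choose the partition, so every event
            // published with this order id lands on the same partition, in order.
            producer.send(new ProducerRecord<>("order-events", orderId, event));
        }
    }
}
```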

@duglin (Collaborator) commented Sep 21, 2018

@jroper thank you very much for that example - that really clears it up for me.
The text "...the order of delivery of related events, that is events with the same partition key, is maintained." read to me like "ordering across queues".

I wonder if something like this might be ok:

A partition key for the event, for the purposes of defining a
causal relationship between multiple events, such that if a
transport partitions events into multiple queues then all events
with the same partition key will be placed into the same
queue to ensure their delivery order will be maintained.

@bluemonk3y

Personally, I struggle with the use of 'queue', as 'stream' is a more natural fit: queue can imply functionality, whereas stream implies a series of related events that use the same partition key for a logical association.

@cathyhongzhang

I would like this to be done in this PR so that everyone can be on the same page as to what this partition key means and how to use it.

@clemensv (Contributor) commented Apr 11, 2019

@cathyhongzhang This mechanism is for forming partitions; it's not for fine-grained event correlation. What it serves is the case where you're getting a million events per second and you need to split that up into 100 partitions so that each processor only needs to handle 10000 per second. And you're doing that with a key, so that all events with the same key land at the same processor; but you're not making the concrete partition choice with the key, some hashing algorithm makes that choice for you. And if you need to break down the 10000 events/sec again, you need to pick a different key so that there's again an even distribution across the 100 upstream partitions, which then each ought to get 100 events/sec to handle.

Your notion of event correlation is a very different one. You want to be able to identify, for instance, whether a set of events was raised from different devices that all exist in the same room. That's a completely different motivation. It might compose with the partitioning model (i.e. you might want to keep events from the same place together), but you will also want that correlation even without partitioning being in play.

@duglin (Collaborator) commented Apr 11, 2019

I like that example; I think it does a good job of explaining why and when this type of field is needed in a very easy to follow use case. And it includes the need to change it between hops! Which was a big sticking point for finding a resolution.

@cathyhongzhang, if we added that to the text of this PR, would that address your concerns?

@cathyhongzhang

Thanks @clemensv for the explanation! So this is not for the purpose of defining a generic correlation key. @duglin, yes, it will be good to add the explanation, which addresses my concern.

@jroper (Contributor, Author) commented Apr 12, 2019

Just to be clear, it is the event source that knows the event key. Generally, the consumer doesn't care what the key is. The producer sets the event key so that the transport can then partition it correctly.

A more concrete use case might be an e-commerce system. Through the lifetime of a given order, the following events might be produced:

  1. Order submitted.
  2. Payment received.
  3. Order dispatched.

These events must be handled by consumers in order, since it doesn't make sense to handle an order dispatched event before the order submitted event for that order. But in a sufficiently large system, say where there are 100,000 orders per second at peak times (e.g. during Black Friday sales), the events need to be partitioned for the system to be able to handle the load. If they were partitioned randomly, then the above messages could all end up on different partitions, and this could result in them being consumed out of order. So instead, when the producer publishes the event, it specifies a key for the event, which would be the ID of the order, and this allows the transport to ensure events with the same key end up on the same partition, and so are guaranteed to be consumed in the correct order. The consumer doesn't usually care about the key (usually, it will already be somewhere in the event anyway), since the transport has already ensured for the consumer that the events are in the correct order; the key has served its purpose by that point.
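
On the consumer side, a minimal sketch (assuming a plain Kafka consumer; the group id and topic name are illustrative) shows why the key is no longer needed: records arrive from each partition already in publication order, so the handler just processes them as they come:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class OrderEventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "order-processors");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("order-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Within a partition, "order submitted" is always seen before
                    // "order dispatched" for the same order; the key itself is not
                    // needed to reconstruct that ordering.
                    System.out.println("handling " + record.value());
                }
            }
        }
    }
}
```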

I'm not sure how much of this should go in the spec. Is it the responsibility of the spec to teach people concepts about partitioning events? I would have thought not; this is a specification, not documentation, and it shouldn't be teaching concepts. If the reader is unfamiliar with event partition keys, they can search "event partition keys" in Google, and literally every result on the first page of that search is a page that explains the concept (as it happens, most of the results are for Azure Event Hubs).

@duglin (Collaborator) commented Apr 12, 2019

@jroper thanks for the comment. I agree that our spec isn't the place to "teach" people stuff like that, but there are times when it helps to provide a bit of background on how we expect fields to be used, especially when the terseness of a spec can be misleading to folks who are not experts on that aspect. And that's one of the reasons behind the "primer": it's for useful info/guidance we want to share that isn't really appropriate for a formal spec. Now, having said that, since this attribute isn't part of the spec (it's an extension), for now I think it would make sense to add a short bit of descriptive text like you and @clemensv wrote to this extension's md file. If/when this attribute gets put into the spec itself, then that text can go into the primer.

Does this make sense?

@cathyhongzhang

@jroper Your comment emphasizes that the partition key is to ensure that the events are handled by the consumers in order. But in general, the "guarantee order" part is usually not an attribute associated with the partition key concept; a sequence number is usually used to guarantee the order. A partition key is used a lot in data partitioning, when a big chunk of data is broken up into pieces/segments (e.g. during streaming or for performance improvement of a data store), and the key is used to group those segments together. Here is a link to the AWS partition key concept: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html

Your concept of a partition key serves two purposes: to ensure that events with the same key end up on the same partition, and to guarantee that the events are consumed in the correct order.

So what is the concept of this partition key in our spec?

How could the consumer use it? For example, is it only used for grouping events from the same source or could it be used for grouping events from different sources?

We have had several discussions on event correlation before. My understanding is that this is not positioned for generic event correlation. It will be helpful to the event consumers if we can define/explain the concept, scope, and usage of this key.

@duglin (Collaborator) commented Apr 13, 2019

@cathyhongzhang, I'm sure @jroper or @clemensv will correct me if I'm wrong, but I believe that the difference between the partition key and the sequence number is that the partition key is set by the sender of the event for that particular hop, while the sequence number is more of an implementation detail of the receiving side, which will put the message into the appropriate bucket/shard and will add/use the sequence number to ensure ordering. While some implementations might choose to expose the sequence number to the applications (e.g. perhaps add it as an extension CE attribute), that is really an implementation choice and not really needed by the receiving application, since the infrastructure managing the events to/from the bucket/shard is responsible for maintaining the order.

But, you said:

Your concept of partition key serves two purposes: to ensure that events with the same key end up on the same partition, and to guarantee the events to be consumed in the correct order.

and I don't believe that was @jroper's intent. Yes, using the partition key implies there will be some ordering within each bucket, but the partition key itself doesn't do the ordering; it's whatever mechanism the receiver uses that ensures it. Some may use a sequence number, but others might use something else, like an ordered queue. But that's an implementation detail that we don't need to expose via this extension. And I think the AWS documentation you referenced is consistent with this.

Net: I actually think you're both in agreement and correct and I think what you're asking for is just some more clarification text. What if the introductory paragraph was modified to say this:

This extension defines an attribute for use by message brokers and their
clients that support partitioning of events, typically for the purpose of
scaling.

Often in large scale systems, during heavy load times, events being received need to be
partitioned into multiple buckets so that each bucket can be separately processed in order
for the system to manage the incoming load. A partitioning key can be used to determine
which bucket each event goes into. The entity sending the events can ensure that events
that need to be placed into the same bucket are done so by using the same partition key on
those events. This is often done to ensure they are processed in the correct order since
events within the same bucket will typically be processed serially. How the receiving entity
ensures this ordering is an implementation detail and out of scope of this specification.

@cathyhongzhang Does this help any?
@jroper @clemensv Did I get this right?

@duglin (Collaborator) commented Apr 17, 2019

ping @cathyhongzhang @jroper @clemensv ^^^

@cathyhongzhang

@duglin Thanks for writing the clarification text. The text definitely helps. As to the sequence or ordering, I don't think it is all the receiving entity's job, and it is not just an implementation detail. In many cases, because messages/events might be routed over different network paths, which add different latencies, the events could arrive at the receiving entity out of the original order. The sending entity has to put a sequence number in the event so that the receiving entity knows the order. As long as we do not put "are guaranteed to be consumed in the correct order" (taken from @jroper's text) or any "ensure order" text in the partition key description, I am good.

@duglin (Collaborator) commented Apr 17, 2019

Thanks @cathyhongzhang, what if I changed it to be this:

This extension defines an attribute for use by message brokers and their
clients that support partitioning of events, typically for the purpose of
scaling.

Often in large scale systems, during heavy load times, events being received need to be
partitioned into multiple buckets so that each bucket can be separately processed in order
for the system to manage the incoming load. A partitioning key can be used to determine
which bucket each event goes into. The entity sending the events can ensure that events
that need to be placed into the same bucket are done so by using the same partition key on
those events. This is often done to ensure the events are processed in the order in which
they are received since events within the same bucket will typically be processed serially.
How the receiving entity ensures this ordering is an implementation detail and out of scope
of this specification.

@cathyhongzhang

@duglin Most of it looks good to me. I feel we can remove this last part: "This is often done to ensure the events are processed in the order in which they are received since events within the same bucket will typically be processed serially. How the receiving entity ensures this ordering is an implementation detail and out of scope of this specification." Adding this part seems to cause more confusion than clarification. My 2 cents.

@duglin (Collaborator) commented Apr 17, 2019

I'm ok with that. @jroper you ok? If so, wanna update the PR? Maybe we can get it in tomorrow????

See cloudevents#209.

Defines a partitionKey extension attribute for use with message brokers
that support partitioning.

Signed-off-by: James Roper <james@jazzy.id.au>
@jroper (Contributor, Author) commented Apr 18, 2019

Updated.

@duglin (Collaborator) commented Apr 18, 2019

@jroper thanks!

@duglin (Collaborator) commented Apr 18, 2019

@jroper we discussed this on today's call - and it was exciting :-)
A couple of things...
1 - could you watch the recording of it once it's available? It should be available here: https://www.youtube.com/playlist?list=PLj6h78yzYM2Ph7YoBIgsZNW_RGJvNlFOt in a few days. And then comment on some of the issues/concerns that Heinz raised.
2 - I put the zoom chat transcript at the end of today's meeting minutes - could you read those? It might give you some insight into some of the concerns even before the video is ready.
3 - could you add some examples (use cases) to the PR? There seemed to be some confusion as to how this will be used, even in the Kafka case. In particular, some folks seemed to think the info wasn't necessary and was redundant. I think having a crisp explanation (as part of the PR text itself) could help this group as well as any future readers/users of the extension.

@duglin (Collaborator) commented Apr 23, 2019

@clemensv (Contributor) commented May 8, 2019

Listening to the recording of the call was time well spent.

We're somewhat going in circles, unfortunately, but I don't think we're that far off from a resolution.

Further up in this thread, @duglin quotes me from the F2F: "This means that the transport binding ought to define how to extract information like this from the event metadata. The partitioning key is a transport artifact."

I agree with Heinz that "partitionKey" as an artifact is completely superfluous if the Kafka SDK client for CloudEvents provides a callback hook into which an app can hook a function that can have full visibility into the event, and then generate a partitionkey as its return value based on any combination of the event metadata and data. A variation of the callback could even determine the partition number outright by some algorithm.

At the same time, it doesn't hurt for there to be a "partitionKey" extension which the Kafka SDK client for CloudEvents can look for and use when there's no such callback registered. Creating the key while putting together the event is often pretty easy and keeps the app code straightforward in simpler scenarios. And such keys are not exclusive to Kafka - we have partitioned queues and sessions/groups in Service Bus and use partition/session keys to keep related data together in order, and those keys have a very similar function.

  • The Kafka binding should mandate a callback mechanism for the implementation that asks the application to provide a key based on a given event. That resulting key never materializes in the CloudEvent and is only used for the Kafka client to pick the partition.
  • We should also go forward with this extension that allows for a partitionKey to be specified by the app as it constructs the event, because there are plenty of scenarios where the publisher is aware of the architecture detail and wants to influence partitioning.
  • The Kafka binding should first rely on the callback, and if not present rely on the extension, and if not present pick a random partition (a rough sketch of this fallback order is included below).
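
To make the fallback order concrete, a hypothetical sketch follows; the callback type, the attribute lookup, and the attribute name "partitionkey" are assumptions made for illustration and do not come from any actual CloudEvents or Kafka SDK:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class PartitionKeyResolution {

    // Resolves the Kafka record key for an outgoing event:
    //   1. use an application-supplied callback, if one is registered;
    //   2. otherwise use the "partitionkey" extension attribute, if present;
    //   3. otherwise return null, letting the Kafka client pick the partition itself.
    static String resolveKey(Map<String, Object> eventAttributes,
                             Optional<Function<Map<String, Object>, String>> callback) {
        if (callback.isPresent()) {
            return callback.get().apply(eventAttributes);
        }
        Object extension = eventAttributes.get("partitionkey");
        return extension != null ? extension.toString() : null;
    }
}
```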

@duglin (Collaborator) commented May 8, 2019

@clemensv thanks for that comment/compromise proposal - it also helped me understand some of the concerns that were being expressed.

What do people think? @jroper @hschaffner @Vlaaaaaaad @bluemonk3y @Tapppi

@bluemonk3y

@clemensv and @duglin - LGTM. One minor point for the transport binding: where is the callback specified, i.e. is it part of the configuration or an anonymous function injected at event-creation time? I see pros/cons for both.

@clemensv (Contributor) commented May 8, 2019

@bluemonk3y The transport binding spec should mandate that the implementation provides a hook for an application-supplied function that generates a key or selects the partition on a per-event basis. The exact shape of that mechanism might vary between implementations; you could make this some extraction rule in a clever framework or just a free form function.

@duglin changed the title from "Proposal for an event key field to the spec" to "Proposal for an event key field as an extension" on May 9, 2019
@duglin (Collaborator) commented May 10, 2019

This PR was approved on today's call with the following conditions/actions:

Merging!!!! thanks everyone!

@duglin merged commit 608c561 into cloudevents:master on May 10, 2019