Add KafkaPartition and KafkaPartitionKey annotations #387
Conversation
Andre Prata seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
if (partitionKey != null) {
    byte[] partitionKeyBytes = Optional.ofNullable(serdeRegistry.pickSerializer(argument))
        .orElseGet(ByteArraySerializer::new)
        .serialize(finalTopic, parameterValues[i]);
I'm not sure how expensive it might be to pick a serializer from the registry on every single call.
If it's deemed expensive then we can/should keep it in a map, but I'm not entirely sure what it should be keyed by. I suppose for the current implementation it would make the most sense to key by Class, but in the end that's more or less what I'd expect the registry implementations to do.
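The caching idea discussed here can be sketched as follows. This is a hedged illustration only: `SerdeRegistry` below is a hypothetical stand-in for the real registry interface, and the point is simply that `computeIfAbsent` keyed by `Class` makes the (possibly expensive) lookup happen once per type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: memoize the registry lookup per argument type so the pick
// happens once per Class rather than on every single send call.
public class SerializerCache {

    // Hypothetical registry abstraction, not the project's real interface:
    // resolves a type to a serializing function.
    interface SerdeRegistry {
        Function<Object, byte[]> pickSerializer(Class<?> type);
    }

    private final SerdeRegistry registry;
    private final Map<Class<?>, Function<Object, byte[]>> cache = new ConcurrentHashMap<>();

    SerializerCache(SerdeRegistry registry) {
        this.registry = registry;
    }

    Function<Object, byte[]> serializerFor(Class<?> type) {
        // computeIfAbsent consults the registry only on the first miss for this type
        return cache.computeIfAbsent(type, registry::pickSerializer);
    }
}
```

As noted in the review, a registry implementation may already do this internally, in which case the extra map is redundant.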
 * <p>When used in consumers, it is populated with the partition that the record was received from.</p>
 *
 * <p>Note that while using {@link KafkaPartition} in the same method as {@link KafkaPartitionKey}
 * will not throw an exception, the outcome of doing so is left unspecified.</p>
I'm not sure if this is good behaviour. I'm happy to implement a different one with some advice/guidance.
Thanks for the contribution, this looks good.
Could you address the compilation errors?
Resolve compilation errors please.
I believe I fixed the Java 8 compilation issue. I'm not entirely sure whether GraalVM's build will be fixed though; I'm not sure why it failed.
Looks like one test is flaky, perhaps some concurrency issue and/or misuse of index-based access to queues, but I'm not sure since I don't have a strong grasp of the language. I'll look more into it, but any input would be appreciated.
Yeah, getting async tests right can be challenging. It might be best to wait first until the size of the collection is what you expect, and then do any index-based queries.
Obviously the messages aren't necessarily received in the same order that they are sent, since in these tests they travel in different partitions
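The wait-then-index advice above can be sketched in plain Java. This is a hedged, self-contained helper rather than the project's actual test code: poll until the asynchronously filled collection reaches the expected size (with a timeout), and only then do index-based reads.

```java
import java.util.function.IntSupplier;

// Sketch: block until a collection populated by an async consumer reaches
// the expected size, instead of indexing into it immediately.
public final class AwaitSize {

    private AwaitSize() { }

    /** Polls until sizeSupplier reports at least expected, or the timeout elapses. */
    public static boolean awaitAtLeast(IntSupplier sizeSupplier, int expected, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (sizeSupplier.getAsInt() < expected) {
            if (System.currentTimeMillis() > deadline) {
                return false; // timed out before enough records arrived
            }
            Thread.sleep(10); // back off briefly between size checks
        }
        return true;
    }
}
```

Since records travelling through different partitions may arrive out of order, assertions after the wait are safer on set membership than on element position.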
It was fairly obvious, actually. Should be good now. I appreciate the patience :-)
Thanks for the contribution!
Hi all,
Although I haven't faced this need in Micronaut projects yet, I have found it potentially useful to control partitioning separately from record keys.
This is useful when keys have specific functional requirements (deduplication, log compaction, etc.), but the partitioning of a topic needs to be done on a separate field (often having a one-to-many relationship with record keys, although that's not a requirement).
This PR brings in the ability to do just that with two new annotations: @KafkaPartition and @KafkaPartitionKey.
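The decoupling described above can be illustrated with a small sketch. Note this is illustrative only: Kafka's actual default partitioner hashes the serialized record key with murmur2, whereas the simplified hash below just shows the idea of routing by a field other than the record key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration: the record key drives compaction/deduplication, while a
// separate partition key decides which partition the record lands on.
public final class PartitionRouting {

    private PartitionRouting() { }

    /** Derives a partition from an arbitrary routing key, not the record key. */
    public static int partitionFor(String partitionKey, int numPartitions) {
        byte[] bytes = partitionKey.getBytes(StandardCharsets.UTF_8);
        // mask the sign bit so the modulo result is always non-negative
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because many distinct record keys can share the same partition key, related records are co-located on one partition (preserving relative ordering among them) while each record still keeps its own key for compaction.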