Add http endpoint to communicate dispatcher readySubscriber status #344
Conversation
left a few comments, great work Lukas!
}
}
} else { // ensure the pointer is populated or things go boom
	d.channelSubscriptions[channelRef] = &KafkaSubscription{
I've a feeling this needs to be protected with a lock, wdyt?
You mean like the outer lock d.consumerUpdateLock.Lock()
on L236?
/assign
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	consumer.logger.Info(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset()))
	consumer.handler.SetReady(true)
I think this logic is not quite right: there is no such "ready" state for a consumer group, there is only the concept of claims, i.e. consuming x partitions of y topics. I personally interpret the readiness of a KafkaChannel (but that's a personal opinion and subject to change) as:

> A KafkaChannel is ready whenever all of its consumer groups are consuming all partitions of the topic.

Hence I suppose the dispatcher should expose the topics/partitions map it is consuming, and the control plane should collect those maps, check them against the topic information from Kafka, and figure out whether all the partitions are being consumed or not. Look at https://github.com/knative-sandbox/eventing-kafka/pull/328/files#diff-318b63710b5768647bfeb5b9bca7a04b80f3a8388439a6b8bc50eddfb8fed8e7R33 for more details.
The code here assumes that whenever we start the consumer group, it will get a claim for all the partitions of the subscribed topic. This is eventually true as long as there is only one consumer in the consumer group, i.e. only one dispatcher is actually consuming the topic. That might not be the case if you increase the replicas of the dispatcher pod without HA enabled (i.e. everybody consumes everything).
Also, as a side note, you should set this readiness state in the Setup function of the consumer group, IMO.
Two points:
- I first tried in my PoC setting it up in the Setup function, and there's still a gap between events received and the actual consumer dispatching messages. So the bug is ONLY solved when the subscription is added to ready subscribers as a result of the first call of ConsumeClaim.
- About the "ready" state: this is definitely our own concept. It signifies that the "dispatching routine" is ready. Naming is irrelevant. We're marking that this dispatcher has a goroutine that's ready to dispatch messages from that "claim".
- IMHO, from a "subscriber" PoV, readiness of the subscription means that there's at least one consumer that has joined that consumer group, regardless of partition assignment, because IMHO that's the concern of the Kafka broker: eventually a rebalance will kick in, unassigned partitions will be reassigned, and messages from the last commit will be dispatched. So the Kafka controller should only worry whether there's at least one "consumer" dispatching for that "consumer group". It shouldn't even worry later if that sole consumer dies, as long as the consumer group exists: the sender should keep sending, and the HA of the "consumers" is a Kafka broker concern. Once they're back and a rebalance kicks in, all messages sent since the last commit will be delivered, since the consumer group already existed.
> I first tried in my PoC setting it up in the Setup function, and there's still a gap between events received and the actual consumer dispatching messages. So the bug is ONLY solved when the subscription is added to ready subscribers as a result of the first call of ConsumeClaim.

Ok, I guess that's a wrong understanding of Sarama on my side.

> readiness of the subscription means that there's at least one consumer that joined in that consumer group, regardless of partitions assignment

I'm not talking about listening for the partition assignment; I'm talking about flagging ready when all the partitions of a topic are effectively consumed, i.e. there's a consumer consuming them. Otherwise, status ready just means "the consumer group has been created", because when you create the consumer group the broker sets up the offset for you.
I think we need to distinguish Channel readiness from Subscription readiness.
IMO, a channel is ready when it is ready to receive events, independently of the status of the subscribers. The reason is that subscribers come and go; when a subscriber is created or deleted, it does not mean the channel cannot receive events.
Regarding subscription readiness, IMHO it must provide the following guarantee:

> After a subscription is marked ready, all events sent to the channel are guaranteed to be forwarded to its subscriber. The subscriber MAY receive events before being marked as ready.

So from a technical PoV I agree with @slinkydeveloper:

> A subscription is ready whenever its consumer group is consuming all partitions of the topic.

A partition is considered to be consumed when its initial offset has been resolved, which happens during the first fetch call. And in order not to miss an event, all partition offsets must be resolved.
Note that if the dispatcher crashes between the first fetch call and committing the offset, then the guarantee expressed above is broken. If we want to provide a strong guarantee, we should mark the subscription ready when all partition offsets have been committed at least once.
> IMO, a channel is ready when it is ready to receive events, independently of the status of the subscribers. The reason is subscribers come and go. When a subscriber is created or deleted, it does not mean the channel cannot receive events.

👍

> Note that if the dispatcher crashes in-between the first fetch call and before committing the offset then the guarantee expressed above is broken. If we want to provide a strong guarantee, we should mark the subscription ready when all partition offsets have been committed at least once.

I believe we don't need that (you can't commit if you're never ready 😄) but in general 👍 with your readiness definition.
Thanks for the feedback. I've updated where we call SetReady: it is now set to true after we handle a message from the channel, and to false if there was an error.
@lberk Do we also need some changes to the dispatcher yaml to expose the port?
/assign
Force-pushed from 55231cb to 1159051 (compare)
Force-pushed from caf4240 to b183732 (compare)
Codecov Report
@@ Coverage Diff @@
## master #344 +/- ##
==========================================
+ Coverage 75.17% 75.26% +0.08%
==========================================
Files 124 124
Lines 4705 4782 +77
==========================================
+ Hits 3537 3599 +62
- Misses 948 961 +13
- Partials 220 222 +2
Continue to review full report at Codecov.
/unhold
The following is the coverage report on the affected files.
/lgtm I think we're good to go; when @devguyio glues the code together with the control plane bits, we'll figure out the remaining details.
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: lberk, slinkydeveloper. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing `/approve` in a comment.
@@ -83,13 +85,15 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
 		if err != nil {
 			consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))
 			consumer.errors <- err
+			consumer.handler.SetReady(false)
If we go with the proposed definition for when a subscriber is ready, this is not needed.
}

if mustMark {
	session.MarkMessage(message, "") // Mark kafka message as processed
	if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil {
		consumer.logger.Debugw("Message marked", zap.String("topic", message.Topic), zap.Binary("value", message.Value))
	}
	consumer.handler.SetReady(true)
Same here. This should move out of the condition.
…native-extensions#344)

* Groundwork for dispatcher subscriber status w/ consumers
* Add functionality to serve http of subscribers
* Drop callback functions
* Fix failing unit test, add unsub check for chanref
* Rework http handler to be dispatcher local (not kafkasubscription)
* Variable typo fix
* Fix copyright years
* Change header name to constant
* Move subscription handler to its own ServeHTTP func
* Remove channelRef in KafkaSubscription
* Change bad channelref request to http.StatusNotFound
* Add namespace to subscriptions http output
* Add Unit tests for servehttp & setready
* Split uriSplit into channelRefName{,space} vars
* Expose dispatcher http-sub-status port in disatcher svc
* Add servehttp diagnostic messages
* One more uriSplit -> channelRefName variable rename
* Change how we write the http response
* Add empty SetReady() method to source RA
* Fix consumer_handler_test
* more linting
* Add back ObserveKind until controller implements substatus scraper
* Add more ServeHTTP unit tests
* slightly alter where we mark a handler as ready or not
* Add http endpoint to communicate dispatcher readySubscriber status (#344)
  * Groundwork for dispatcher subscriber status w/ consumers
  * Add functionality to serve http of subscribers
  * Drop callback functions
  * Fix failing unit test, add unsub check for chanref
  * Rework http handler to be dispatcher local (not kafkasubscription)
  * Variable typo fix
  * Fix copyright years
  * Change header name to constant
  * Move subscription handler to its own ServeHTTP func
  * Remove channelRef in KafkaSubscription
  * Change bad channelref request to http.StatusNotFound
  * Add namespace to subscriptions http output
  * Add Unit tests for servehttp & setready
  * Split uriSplit into channelRefName{,space} vars
  * Expose dispatcher http-sub-status port in disatcher svc
  * Add servehttp diagnostic messages
  * One more uriSplit -> channelRefName variable rename
  * Change how we write the http response
  * Add empty SetReady() method to source RA
  * Fix consumer_handler_test
  * more linting
  * Add back ObserveKind until controller implements substatus scraper
  * Add more ServeHTTP unit tests
  * slightly alter where we mark a handler as ready or not
* Add Subscription prober (#433)
  * Add Subscription prober
  * Fix endpoints informer in cons. KafkaChannel controller
  * Fix unittests after adding status prober
  * Format and order go imports in cons. channel controller
  * Rename import alias and remove unused variable
  * Add dispatcher prober test for tesitng a single pod
  * Support probing dispatchers for multiple partitions kafka channels
  * Update deps
  * Fix conumer handler test
  * remove unused hashes from status probing test
  * Apply review comments and add a prober test
  * Remove old comment
  * Fix fake status manager
  * Return error if IsReady returns an error
  * Change probing to be partition based and fix some corner cases of channel deletion
  * Change cleanup logic to clean ready subscriptions only
  * Remove cleanup to avaid consumers race
* Do not test controller name from generated source which can change. (#320)
* Update codegen

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>
Co-authored-by: Lukas Berk <lberk@redhat.com>
Co-authored-by: Travis Minke <travis.minke@sap.com>
…ons#344) Co-authored-by: Ali Ok <aliok@redhat.com>
this is a WIP/PoC for one half of a more immediate fix for the
'readySubscriber' issues in the kafkachannel. It will need some of the
work @devguyio is doing to lift the network prober to knative/pkg.
However, I'm about as far as I can get until I know more about that
implementation.
/hold