Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Upon closed dead subscription, Resubscribe #261

Closed
steve-gray opened this issue Aug 30, 2018 · 1 comment
Closed

Upon closed dead subscription, Resubscribe #261

steve-gray opened this issue Aug 30, 2018 · 1 comment

Comments

@steve-gray
Copy link

It appears that when a partition suffers from the conditions that cause it to die, potentially as a result of either slow consumption or something else - there's conditions that lead to this code in Sarama being hit:


func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			break
		}
	}
}

(From consumer.go in Sarama)

Upon this dead subscription, there seems to be no consumer-group level hook to periodically check for this condition - so as long as the process remains alive it'll hold it's own lease on the partition and stop another process claiming it.

@dim
Copy link
Member

dim commented Sep 4, 2018

@steve-gray I am currently working on a PR to integrate consumer groups into sarama itself IBM/sarama#1099. As part of the new API, you should be able to address these issues in your implementation, i.e. trigger a rebalance by existing a handler when a partition is stuck/slow.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants