This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Adding partitions to a topic during runtime does not trigger a rebalance #235

Closed

toefel18 opened this issue Apr 28, 2018 · 8 comments

@toefel18

Situation:

  1. there exists a topic my-test-topic
  2. an instance of cluster.Consumer is consuming from that topic
  3. I add 2 partitions to my-test-topic during runtime (for now assume that message-order does not matter)
  4. No rebalance happens and all messages on the 2 new partitions are missed.
  5. Restarting the application results in all partitions being assigned correctly again.
    • However, if InitialOffset = OffsetNewest(the default), the already produced messages will never be seen.

The official Java client library does trigger a rebalance in this situation, so this appears to be a bug.
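
For context, reproducing step 3 programmatically might look like the sketch below, using sarama's ClusterAdmin (the broker address and partition counts are my own assumptions, not from the report; requires a sarama version that includes the admin client):

package main

import "github.com/Shopify/sarama"

func main() {
  cfg := sarama.NewConfig()
  cfg.Version = sarama.V1_1_0_0 // CreatePartitions needs a broker version that supports it

  admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
  if err != nil {
    panic(err)
  }
  defer admin.Close()

  // Raise the total partition count of my-test-topic to 3 (i.e. add 2).
  // nil lets the broker assign replicas; false means apply for real.
  if err := admin.CreatePartitions("my-test-topic", 3, nil, false); err != nil {
    panic(err)
  }
}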

@dim
Member

dim commented Apr 30, 2018

@toefel18 I remember this has been raised in the past, but I am not sure it is actually a bug. To me it would make more sense if the server triggered a rebalance when partitions are added. The only way for a client to identify new partitions is to constantly poll the brokers and track metadata changes. I assume this is what the Java client must be doing, but even that will not really solve your problem, as added partitions will not be identified immediately: new partitions are only registered with the client at the next metadata refresh. By that point, messages may already have been produced to those partitions, and as long as you configure InitialOffset = OffsetNewest, those messages will be missed. Why don't you simply change to InitialOffset = OffsetOldest?
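
To make the polling described above concrete, here is a minimal sketch against sarama's Client API (RefreshMetadata and Partitions are real sarama calls; the 5-second interval, function name, and channel shape are assumptions):

// assumes imports: "context", "time", "github.com/Shopify/sarama"

// pollPartitions watches topic and sends the new partition count
// whenever it changes. It stops when ctx is cancelled.
func pollPartitions(ctx context.Context, client sarama.Client, topic string) <-chan int {
  changes := make(chan int, 1)
  go func() {
    defer close(changes)
    last, _ := client.Partitions(topic)
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
      select {
      case <-ctx.Done():
        return
      case <-ticker.C:
        if err := client.RefreshMetadata(topic); err != nil {
          continue // transient error; try again next tick
        }
        current, err := client.Partitions(topic)
        if err == nil && len(current) != len(last) {
          changes <- len(current)
          last = current
        }
      }
    }
  }()
  return changes
}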

@toefel18
Author

toefel18 commented Apr 30, 2018

@dim I'm not sure how the Kafka protocol works underneath, so it could be that the Java client uses polling; I should look at the timestamps in our log files to check.

Your suggestion of InitialOffset = OffsetOldest is indeed a fix we are planning to apply, but it doesn't change the fact that bsm/sarama-cluster never eventually rebalances. I also feel that the server should push this rebalance instead of having clients poll for changes.
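
For reference, that workaround is a one-line config change in sarama-cluster (group and topic names below are just examples, and brokers is a placeholder []string):

config := cluster.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest // default is sarama.OffsetNewest

consumer, err := cluster.NewConsumer(brokers, "my-group", []string{"my-test-topic"}, config)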

However, I don't want to have to manually restart my Go services because somebody decided to add a partition. In my opinion, if the Java client does polling, this client should do it as well. Automatic rebalancing when using topic subscriptions instead of manual partition assignments is expected behavior, and it is aligned with the behavior of the official client.

I'm not sure what the behavior of https://github.com/edenhill/librdkafka is, but that client has an official Go wrapper by Confluent; maybe we should check the behavior of that client to see what is considered expected.

@dim
Member

dim commented May 2, 2018

@toefel18 thanks for the clarification, I will take this into account for the upcoming V3 - #234. In fact, I should probably add it to the low-level consumer IBM/sarama#1083

@ShaneSaww

@dim do you know if this ended up getting patched in IBM/sarama#1099?

@dim
Member

dim commented Oct 12, 2018

@ShaneSaww no, it didn't, but the idea was to keep the ClusterConsumer reasonably lightweight while allowing for customisation. I think this functionality could easily be added as part of the Handler.Setup callback:

// assumes imports: "time" and "github.com/Shopify/sarama"

type myHandler struct {
  topicChange chan struct{}
}

func (h *myHandler) Setup(sess sarama.ConsumerGroupSession) error {
  h.topicChange = make(chan struct{})

  // Watch for topic changes in the background. Closing topicChange
  // broadcasts to every ConsumeClaim loop, which then returns and
  // lets the session end, triggering a rebalance.
  go func() {
    for {
      select {
      case <-sess.Context().Done():
        return
      case <-time.After(5 * time.Second):
        if h.topicsHaveChanged() {
          close(h.topicChange)
          return // stop polling; the channel must only be closed once
        }
      }
    }
  }()
  return nil
}

func (h *myHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *myHandler) ConsumeClaim(_ sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
  for msg := range c.Messages() {
    select {
    case <-h.topicChange:
      return nil // exit so the consumer group can rebalance
    default:
    }
    _ = msg // ... process msg ...
  }
  return nil
}
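
topicsHaveChanged is left undefined above; one possible implementation, assuming the handler also holds a sarama.Client, the topic name, and the partition count seen at startup (all hypothetical fields, not part of the library):

// Hypothetical fields on myHandler: client sarama.Client, topic string,
// lastCount int (partition count captured when the handler was built).
func (h *myHandler) topicsHaveChanged() bool {
  if err := h.client.RefreshMetadata(h.topic); err != nil {
    return false // treat metadata errors as "no change"
  }
  parts, err := h.client.Partitions(h.topic)
  if err != nil {
    return false
  }
  return len(parts) != h.lastCount
}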

dim closed this as completed Oct 12, 2018
@dim
Member

dim commented Oct 12, 2018

I was toying with the idea of handler "middlewares" too, and this could be just one of them.
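
As a sketch of that idea (the wrapper type and names here are hypothetical; nothing like this shipped):

// withTopicWatch wraps any sarama.ConsumerGroupHandler and starts a
// watcher goroutine on Setup, delegating everything else unchanged.
type withTopicWatch struct {
  sarama.ConsumerGroupHandler
  watch func(sarama.ConsumerGroupSession)
}

func (w withTopicWatch) Setup(sess sarama.ConsumerGroupSession) error {
  go w.watch(sess) // start the watcher before delegating
  return w.ConsumerGroupHandler.Setup(sess)
}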

@ShaneSaww

@dim That's awesome! I was starting down the path of doing this setup myself.
Thanks!
Do you think this is worth adding to the base sarama library? Most consumers will need this ability.

@PoacherBro

I tested with the latest version and the sarama logger enabled. When a new partition is added, the producer starts sending to the new partition, but the consumer is not refreshed.
Both the producer and the consumer log their topic-metadata fetches, but only the producer reflects the change.
I'm not sure whether this is a sarama-cluster bug or a Kafka bug. Kafka version = 1.1.0.
