From 5c5373a05bbef91eef802ea8107158fbbd62dce5 Mon Sep 17 00:00:00 2001
From: Dmitry Sergeev
Date: Wed, 30 Dec 2015 13:13:58 +0200
Subject: [PATCH 1/6] STORM-822 Implement Kafka 0.9 consumer API

---
 external/storm-kafka/pom.xml                   |   2 +-
 .../src/jvm/storm/kafka/KafkaConfig.java       |  14 +++
 .../jvm/storm/kafka/KafkaJavaApiSpout.java     | 107 ++++++++++++++++++
 storm-core/src/jvm/backtype/storm/Config.java  |  15 +++
 4 files changed, 137 insertions(+), 1 deletion(-)
 create mode 100644 external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java

diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 8f2867b0578..9f423532590 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -136,7 +136,7 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.8.2.1</version>
+            <version>0.9.0.0</version>
             <scope>provided</scope>

diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 49c7526b999..5a0014bc13a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -21,12 +21,14 @@
 import backtype.storm.spout.RawMultiScheme;
 
 import java.io.Serializable;
+import java.util.List;
 
 public class KafkaConfig implements Serializable {
     private static final long serialVersionUID = 5276718734571623855L;
 
     public final BrokerHosts hosts;
     public final String topic;
+    public final List<String> topics;
     public final String clientId;
 
     public int fetchSizeBytes = 1024 * 1024;
@@ -44,9 +46,21 @@ public KafkaConfig(BrokerHosts hosts, String topic) {
         this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
     }
 
+    public KafkaConfig(BrokerHosts hosts, List<String> topics) {
+        this(hosts, topics, kafka.api.OffsetRequest.DefaultClientId());
+    }
+
     public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
         this.hosts = hosts;
         this.topic = topic;
+        this.topics = null;
         this.clientId = clientId;
     }
 
+    public KafkaConfig(BrokerHosts hosts, List<String> topics, String clientId) {
+        this.hosts = hosts;
+        this.topic = null;
+        this.topics = topics;
+        this.clientId = clientId;
+    }
+

diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
new file mode 100644
index 00000000000..ebb38d4f24b
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
@@ -0,0 +1,107 @@
+package storm.kafka;
+
+import backtype.storm.Config;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class KafkaJavaApiSpout extends BaseRichSpout {
+
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaJavaApiSpout.class);
+
+    SpoutConfig _spoutConfig;
+    SpoutOutputCollector _collector;
+    KafkaConsumer<String, byte[]> consumer;
+    ConcurrentMap<Long, Values> messages;
+    long pollTimeout;
+    long maxFailCount;
+
+    public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
+        this._spoutConfig = spoutConfig;
+    }
+
+    @Override
+    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        _collector = collector;
+
+        LOG.debug("Opening KafkaJavaApiSpout...");
+
+        if (conf.containsKey(Config.KAFKA_POLL_TIMEOUT)) {
+            pollTimeout = (Long) conf.get(Config.KAFKA_POLL_TIMEOUT);
+        } else {
+            pollTimeout = 100;
+        }
+        if (conf.containsKey(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT)) {
+            maxFailCount = (Long) conf.get(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT);
+        } else {
+            maxFailCount = 5;
+        }
+        if (consumer == null) {
+            consumer = new KafkaConsumer<String, byte[]>(conf);
+        }
+        messages = new ConcurrentHashMap<Long, Values>();
+
+        // Check whether this is a multi-topic subscription.
+        if (_spoutConfig.topic == null) {
+            consumer.subscribe(_spoutConfig.topics);
+        } else {
+            consumer.subscribe(Collections.singletonList(_spoutConfig.topic));
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        LOG.debug("Polling next tuple...");
+        ConsumerRecords<String, byte[]> records = consumer.poll(pollTimeout);
+        for (ConsumerRecord<String, byte[]> record : records) {
+            Values message = new Values(record.key(), record.value(), record.topic(), maxFailCount);
+            messages.putIfAbsent(record.offset(), message);
+            _collector.emit(message, record.offset());
+        }
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        LOG.debug("Message with offset {} failed", msgId);
+        Values message = messages.get(msgId);
+        Long currentAttempt = (Long) message.get(3); // remaining attempts (field index 3)
+        if (currentAttempt < 1) {
+            LOG.debug("Message with offset {} reached maximum fail attempts. Skipping...", msgId);
+        } else {
+            message.set(3, currentAttempt - 1);
+            _collector.emit(message, msgId);
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        LOG.debug("Message with offset {} processed successfully", msgId);
+        messages.remove(msgId);
+    }
+
+    @Override
+    public void close() {
+        consumer.close();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("key", "message", "topic", "attempt"));
+    }
+
+}

diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 9db99773f82..9071ca0f8fd 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -2066,6 +2066,21 @@ public class Config extends HashMap<String, Object> {
     @isInteger
     public static final String NIMBUS_CODE_SYNC_FREQ_SECS = "nimbus.code.sync.freq.secs";
 
+    /**
+     * Kafka spout config.
+     * The time, in milliseconds, spent waiting in poll if data is not available. If 0, the poll
+     * returns immediately with any records that are available now. Must not be negative.
+     */
+    @isInteger
+    public static final String KAFKA_POLL_TIMEOUT = "kafka.poll.timeout";
+
+    /**
+     * Kafka spout config.
+     * Maximum number of re-emit attempts per tuple; decremented each time fail() is called for the tuple.
+     */
+    @isInteger
+    public static final String KAFKA_MAX_FAIL_ATTEMPTS_COUNT = "kafka.max.fails";
+
     public static void setClasspath(Map conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }
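
A minimal usage sketch for the settings this patch introduces (not part of the patch itself): the ZooKeeper address, topic names, and client id are placeholders, and both topology settings are written as longs because the spout reads them back with a cast to Long.

    import java.util.Arrays;

    import backtype.storm.Config;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaConfig;
    import storm.kafka.ZkHosts;

    public class KafkaSpoutConfigExample {
        public static void main(String[] args) {
            // Multi-topic configuration via the new List<String> constructor.
            BrokerHosts hosts = new ZkHosts("localhost:2181");
            KafkaConfig kafkaConfig = new KafkaConfig(hosts, Arrays.asList("events", "metrics"), "storm-client");

            // Topology-level settings added to Config.java above.
            Config conf = new Config();
            conf.put(Config.KAFKA_POLL_TIMEOUT, 200L);          // wait up to 200 ms per poll()
            conf.put(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT, 3L); // re-emit a failed tuple at most 3 times
        }
    }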
From cd6a2f934efa13060600cd6a27f828ac47976b27 Mon Sep 17 00:00:00 2001
From: Dmitry Sergeev
Date: Wed, 30 Dec 2015 14:19:51 +0200
Subject: [PATCH 2/6] STORM-822 add license header

---
 .../src/jvm/storm/kafka/KafkaJavaApiSpout.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
index ebb38d4f24b..369021fe1c2 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package storm.kafka;

From dabe61e4698f696456e852b77c184ee414a99346 Mon Sep 17 00:00:00 2001
From: Dmitry Sergeev
Date: Mon, 11 Jan 2016 12:11:45 +0200
Subject: [PATCH 3/6] Implement non-autocommit Kafka Spout

---
 .../jvm/storm/kafka/KafkaJavaApiSpout.java | 224 ++++++++++++++----
 1 file changed, 184 insertions(+), 40 deletions(-)

diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
index 369021fe1c2..fe545a4e4df 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
@@ -1,20 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package storm.kafka;
 
 import backtype.storm.Config;
@@ -24,29 +7,55 @@
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class KafkaJavaApiSpout extends BaseRichSpout {
 
     public static final Logger LOG = LoggerFactory.getLogger(KafkaJavaApiSpout.class);
 
+    private final List<Values> messagesList = new ArrayList<Values>();
+    private KafkaConsumer<String, byte[]> consumer;
+    private Iterator<ConsumerRecord<String, byte[]>> it;
+    private Map<TopicPartition, OffsetAndMetadata> toBeCommitted;
+    private AtomicBoolean rebalanceFlag;
+
+    private int batchUpperLimit;
+    private int maxBatchDurationMillis;
+
+    private List<String> topicList;
+
+    /**
+     * Lock guarding critical sections so that only one thread executes
+     * them at a time; this prevents a corrupted state when the Kafka
+     * spout is being stopped.
+     */
+    private Lock lock;
+
     SpoutConfig _spoutConfig;
     SpoutOutputCollector _collector;
-    KafkaConsumer<String, byte[]> consumer;
     ConcurrentMap<Long, Values> messages;
     long pollTimeout;
     long maxFailCount;
 
+
+    protected KafkaJavaApiSpout() {
+    }
+
     public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
+        lock = new ReentrantLock();
         this._spoutConfig = spoutConfig;
     }
@@ -59,34 +68,124 @@ public void open(Map conf, final TopologyContext context, final SpoutOutputColle
         _collector = collector;
 
         LOG.debug("Opening KafkaJavaApiSpout...");
 
         if (conf.containsKey(Config.KAFKA_POLL_TIMEOUT)) {
             pollTimeout = (Long) conf.get(Config.KAFKA_POLL_TIMEOUT);
         } else {
-            pollTimeout = 100;
+            pollTimeout = 1000;
         }
         if (conf.containsKey(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT)) {
             maxFailCount = (Long) conf.get(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT);
         } else {
             maxFailCount = 5;
         }
-        if (consumer == null) {
-            consumer = new KafkaConsumer<String, byte[]>(conf);
+
+        toBeCommitted = new HashMap<TopicPartition, OffsetAndMetadata>();
+        lock = new ReentrantLock();
+        rebalanceFlag = new AtomicBoolean(false);
+
+        // Subscribe to one topic or to several; check whether this is a multi-topic config.
+        topicList = _spoutConfig.topic == null ? _spoutConfig.topics : Collections.singletonList(_spoutConfig.topic);
+
+        if (topicList == null || topicList.isEmpty()) {
+            throw new KafkaException("At least one Kafka topic must be specified.");
         }
-        messages = new ConcurrentHashMap<Long, Values>();
 
-        // Check whether this is a multi-topic subscription.
-        if (_spoutConfig.topic == null) {
-            consumer.subscribe(_spoutConfig.topics);
-        } else {
-            consumer.subscribe(Collections.singletonList(_spoutConfig.topic));
+        batchUpperLimit = conf.containsKey(KafkaSpoutConstants.BATCH_SIZE) ? (int) conf.get(KafkaSpoutConstants.BATCH_SIZE)
+                : KafkaSpoutConstants.DEFAULT_BATCH_SIZE;
+        maxBatchDurationMillis = conf.containsKey(KafkaSpoutConstants.BATCH_DURATION_MS) ? (int) conf.get(KafkaSpoutConstants.BATCH_DURATION_MS)
+                : KafkaSpoutConstants.DEFAULT_BATCH_DURATION;
+
+        try {
+            // Initialize the consumer.
+            consumer = new KafkaConsumer<String, byte[]>(conf);
+        } catch (Exception e) {
+            throw new KafkaException("Unable to create consumer. "
+                    + "Check whether the bootstrap server is up and that "
+                    + "Storm can connect to it.", e);
         }
+
+        // Kafka offers topic subscription or manual partition assignment; this spout subscribes.
+        consumer.subscribe(topicList, new RebalanceListener(rebalanceFlag));
+
+        it = consumer.poll(pollTimeout).iterator();
+
+        LOG.info("Kafka spout started.");
     }
 
     @Override
     public void nextTuple() {
         LOG.debug("Polling next tuple...");
-        ConsumerRecords<String, byte[]> records = consumer.poll(pollTimeout);
-        for (ConsumerRecord<String, byte[]> record : records) {
-            Values message = new Values(record.key(), record.value(), record.topic(), maxFailCount);
-            messages.putIfAbsent(record.offset(), message);
-            _collector.emit(message, record.offset());
+        final String batchUUID = UUID.randomUUID().toString();
+        byte[] kafkaMessage;
+        String kafkaKey;
+
+        try {
+            // Prepare the time window for the new batch.
+            final long batchStartTime = System.currentTimeMillis();
+            final long batchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+
+            while (messagesList.size() < batchUpperLimit &&
+                    System.currentTimeMillis() < batchEndTime) {
+
+                if (it == null || !it.hasNext()) {
+                    // Obtain new records; the poll timeout is the time remaining in the current batch.
+                    ConsumerRecords<String, byte[]> records = consumer.poll(
+                            Math.max(0, batchEndTime - System.currentTimeMillis()));
+                    it = records.iterator();
+
+                    // The flag is set to true in a callback when partitions are revoked;
+                    // any records read so far are committed below.
+                    if (rebalanceFlag.get()) {
+                        rebalanceFlag.set(false);
+                        break;
+                    }
+                    // Check for records after the poll.
+                    if (!it.hasNext()) {
+                        LOG.debug("Returning with backoff. No more data to read");
+                        // Batch time exceeded.
+                        break;
+                    }
+                }
+
+                // Get the next message.
+                ConsumerRecord<String, byte[]> message = it.next();
+                kafkaMessage = message.value();
+                kafkaKey = message.key();
+
+                LOG.debug("Message: {}", new String(kafkaMessage));
+                LOG.debug("Topic: {} Partition: {}", message.topic(), message.partition());
+
+                Values value = new Values(kafkaKey, kafkaMessage, message.topic(), maxFailCount);
+                messagesList.add(value);
+                messages.putIfAbsent(message.offset(), value);
+
+                LOG.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
+                LOG.debug("Messages #: {}", messagesList.size());
+
+                // Commit the offset following the record, i.e. the next record to read.
+                long offset = message.offset() + 1;
+                toBeCommitted.put(new TopicPartition(message.topic(), message.partition()),
+                        new OffsetAndMetadata(offset, batchUUID));
+            }
+
+            if (messagesList.size() > 0) {
+
+                for (Values v : messagesList) {
+                    _collector.emit(v);
+                }
+
+                LOG.debug("Emitted {}", messagesList.size());
+
+                messagesList.clear();
+                // The commit must not be interrupted while the spout is stopping.
+                try {
+                    lock.lock();
+                    consumer.commitSync(toBeCommitted);
+                    toBeCommitted.clear();
+                } finally {
+                    lock.unlock();
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("KafkaJavaApiSpout exception", e);
         }
     }
@@ -94,7 +193,7 @@
     public void fail(Object msgId) {
         LOG.debug("Message with offset {} failed", msgId);
         Values message = messages.get(msgId);
-        Long currentAttempt = (Long) message.get(3); // remaining attempts (field index 3)
+        Long currentAttempt = (Long) message.get(3);
         if (currentAttempt < 1) {
             LOG.debug("Message with offset {} reached maximum fail attempts. Skipping...", msgId);
         } else {
@@ -111,7 +210,16 @@ public void ack(Object msgId) {
         LOG.debug("Message with offset {} processed successfully", msgId);
         messages.remove(msgId);
     }
 
     @Override
     public void close() {
-        consumer.close();
+        if (consumer != null) {
+            try {
+                lock.lock();
+                consumer.wakeup();
+                consumer.close();
+            } finally {
+                lock.unlock();
+            }
+        }
+        LOG.info("Kafka Spout stopped.");
     }
@@ -119,6 +227,42 @@
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("key", "message", "topic", "attempt"));
     }
 
+    public static class KafkaSpoutConstants {
+        public static final String KAFKA_PREFIX = "kafka.";
+        public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
+        public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+        public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+        public static final String BOOTSTRAP_SERVERS = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+        public static final String TOPICS = KAFKA_PREFIX + "topics";
+        public static final String DEFAULT_AUTO_COMMIT = "false";
+        public static final String BATCH_SIZE = "batchSize";
+        public static final String BATCH_DURATION_MS = "batchDurationMillis";
+        public static final int DEFAULT_BATCH_SIZE = 1000;
+        public static final int DEFAULT_BATCH_DURATION = 1000;
+    }
+}
+
+class RebalanceListener implements ConsumerRebalanceListener {
+    private static final Logger log = LoggerFactory.getLogger(RebalanceListener.class);
+    private AtomicBoolean rebalanceFlag;
+
+    public RebalanceListener(AtomicBoolean rebalanceFlag) {
+        this.rebalanceFlag = rebalanceFlag;
+    }
+
+    // Flag the rebalance so that records already read are committed
+    // before the partitions move to another consumer.
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        for (TopicPartition partition : partitions) {
+            log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
+            rebalanceFlag.set(true);
+        }
+    }
+
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        for (TopicPartition partition : partitions) {
+            log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
+        }
+    }
 }
From e796648b3e73fe6dcfae8fbfa746d12fd6a607eb Mon Sep 17 00:00:00 2001
From: Dmitry Sergeev
Date: Mon, 11 Jan 2016 12:16:57 +0200
Subject: [PATCH 4/6] Add license header to Kafka spout

---
 .../src/jvm/storm/kafka/KafkaJavaApiSpout.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
index fe545a4e4df..c582e23f8df 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package storm.kafka;

From 97499abdc5fefaafb20748472f97d2766e275583 Mon Sep 17 00:00:00 2001
From: Dmitry Sergeev
Date: Mon, 11 Jan 2016 17:06:36 +0200
Subject: [PATCH 5/6] Fix NPE

---
 external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
index c582e23f8df..b53c7ecb810 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
@@ -32,6 +32,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
@@ -73,6 +74,7 @@ protected KafkaJavaApiSpout() {
 
     public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
         lock = new ReentrantLock();
+        messages = new ConcurrentHashMap<>();
         this._spoutConfig = spoutConfig;
     }

From d64ba5c578e38693516e5a5d9907f2b238ccb52e Mon Sep 17 00:00:00 2001
From: Dmitry Sergeev
Date: Tue, 12 Jan 2016 11:51:33 +0200
Subject: [PATCH 6/6] Refactor KafkaJavaApiSpout

---
 .../storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
index b53c7ecb810..fc5cbecbe84 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaJavaApiSpout.java
@@ -68,10 +68,6 @@ public class KafkaJavaApiSpout extends BaseRichSpout {
     long pollTimeout;
     long maxFailCount;
 
-
-    protected KafkaJavaApiSpout() {
-    }
-
     public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
         lock = new ReentrantLock();
         messages = new ConcurrentHashMap<>();
@@ -96,7 +92,6 @@ public void open(Map conf, final TopologyContext context, final SpoutOutputColle
         }
 
         toBeCommitted = new HashMap<TopicPartition, OffsetAndMetadata>();
-        lock = new ReentrantLock();
         rebalanceFlag = new AtomicBoolean(false);
 
         // Subscribe to one topic or to several; check whether this is a multi-topic config.
@@ -275,8 +270,8 @@ public RebalanceListener(AtomicBoolean rebalanceFlag) {
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
         for (TopicPartition partition : partitions) {
             log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
-            rebalanceFlag.set(true);
         }
+        rebalanceFlag.set(true);
     }
 
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
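
For reference, a sketch of how the finished spout might be wired into a topology. The SpoutConfig constructor shown is the existing storm-kafka one; connection strings, names, and parallelism are placeholders, and the consumer properties are put in the topology conf because open() passes that map straight to the KafkaConsumer constructor.

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaJavaApiSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;

    public class KafkaSpoutTopologySketch {
        public static void main(String[] args) {
            // SpoutConfig(BrokerHosts, topic, zkRoot, id) is the existing storm-kafka constructor.
            SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("localhost:2181"), "events", "/kafka-spout", "spout-id");

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaJavaApiSpout(spoutConfig), 1);
            // Downstream bolts consume the declared fields: key, message, topic, attempt.

            Config conf = new Config();
            conf.put(Config.KAFKA_POLL_TIMEOUT, 200L);
            conf.put(Config.KAFKA_MAX_FAIL_ATTEMPTS_COUNT, 3L);
            // Consumer properties ride along in the topology conf, which open() hands to the KafkaConsumer.
            conf.put("bootstrap.servers", "localhost:9092");
            conf.put("group.id", "storm-kafka-spout");
            conf.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            conf.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            conf.put("enable.auto.commit", "false");

            new LocalCluster().submitTopology("kafka-09-demo", conf, builder.createTopology());
        }
    }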