diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index a7971e28f88..97ed35919a0 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -71,7 +71,14 @@
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
<scope>test</scope>
</dependency>
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 439492b26fb..ce01a76e241 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -52,6 +52,8 @@
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
public class KafkaSpout<K, V> extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
@@ -62,6 +64,7 @@ public class KafkaSpout extends BaseRichSpout {
// Kafka
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private final KafkaConsumerFactory<K, V> kafkaConsumerFactory;
private transient KafkaConsumer<K, V> kafkaConsumer;
private transient boolean consumerAutoCommitMode;
@@ -84,8 +87,14 @@ public class KafkaSpout extends BaseRichSpout {
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
+ }
+
+ // This constructor is here for testing
+ KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
}
@Override
@@ -145,6 +154,10 @@ private void initialize(Collection partitions) {
}
retryService.retainAll(partitions);
+
+ //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+ Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
+ emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
for (TopicPartition tp : partitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -289,16 +302,19 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord record) {
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
- } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
- final List<Object> tuple = tuplesBuilder.buildTuple(record);
- kafkaSpoutStreams.emit(collector, tuple, msgId);
- emitted.add(msgId);
- numUncommittedOffsets++;
- if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
- retryService.remove(msgId); // re-emitted hence remove from failed
+ } else {
+ boolean isScheduled = retryService.isScheduled(msgId);
+ if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+ final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ kafkaSpoutStreams.emit(collector, tuple, msgId);
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+ if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+ retryService.remove(msgId);
+ }
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+ return true;
}
- LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
- return true;
}
return false;
}
@@ -333,6 +349,11 @@ private void commitOffsetsForAckedTuples() {
@Override
public void ack(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ if (!emitted.contains(msgId)) {
+ LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+ return;
+ }
+
if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
acked.get(msgId.getTopicPartition()).add(msgId);
}
@@ -344,8 +365,12 @@ public void ack(Object messageId) {
@Override
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
- emitted.remove(msgId);
+ if (!emitted.contains(msgId)) {
+ LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+ return;
+ }
if (msgId.numFails() < maxRetries) {
+ emitted.remove(msgId);
msgId.incrementNumFails();
retryService.schedule(msgId);
} else { // limit to max number of retries
@@ -362,8 +387,7 @@ public void activate() {
}
private void subscribeKafkaConsumer() {
- kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
- kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 7c97ac9b04c..8aa525bdc77 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -105,8 +105,8 @@ private Map setDefaultsAndGetKafkaProps(Map kafk
public static class Builder<K, V> {
private final Map<String, Object> kafkaProps;
- private Deserializer<K> keyDeserializer;
- private Deserializer<V> valueDeserializer;
+ private SerializableDeserializer<K> keyDeserializer;
+ private SerializableDeserializer<V> valueDeserializer;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -164,7 +164,7 @@ public Builder(Map kafkaProps, KafkaSpoutStreams kafkaSpoutStrea
/**
* Specifying this key deserializer overrides the property key.deserializer
*/
- public Builder<K, V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+ public Builder<K, V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
this.keyDeserializer = keyDeserializer;
return this;
}
@@ -172,7 +172,7 @@ public Builder setKeyDeserializer(Deserializer keyDeserializer) {
/**
* Specifying this value deserializer overrides the property value.deserializer
*/
- public Builder<K, V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+ public Builder<K, V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
this.valueDeserializer = valueDeserializer;
return this;
}
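
Storm serializes spout instances when a topology is submitted, but Kafka's Deserializer interface does not extend java.io.Serializable, which is what motivates narrowing the setter signatures above. A minimal sketch of the resulting builder contract, assuming a hypothetical MyStringDeserializer that implements SerializableDeserializer<String> (sketched after the new interface below), that the builder's build() method exists, and that the kafkaProps, spoutStreams, tuplesBuilder, and retryService variables are in scope:

    KafkaSpoutConfig<String, String> spoutConfig =
        new KafkaSpoutConfig.Builder<String, String>(kafkaProps, spoutStreams, tuplesBuilder, retryService)
            .setKeyDeserializer(new MyStringDeserializer())     // now must be a SerializableDeserializer<String>
            .setValueDeserializer(new MyStringDeserializer())
            .setOffsetCommitPeriodMs(10_000)
            .build();
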
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
new file mode 100644
index 00000000000..eb76a90697e
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * @param <T> The type this deserializer deserializes to.
+ */
+public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {
+}
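
A minimal sketch of a user-supplied implementation (the class name and behavior are illustrative, not part of the patch); implementing this interface instead of Deserializer directly is what makes the instance safe to ship inside a serialized spout:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class MyStringDeserializer implements SerializableDeserializer<String> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no configuration needed for this sketch
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            // interpret the raw bytes as UTF-8 text
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            // nothing to release
        }
    }
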
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
new file mode 100644
index 00000000000..0b253b41f89
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * This is here to enable testing
+ */
+public interface KafkaConsumerFactory<K, V> extends Serializable {
+ public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
new file mode 100644
index 00000000000..79003882869
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
+
+ @Override
+ public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ }
+
+}
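
Because KafkaConsumerFactory is a Serializable single-method interface, a test can swap in a mocked consumer with a lambda, which is exactly what KafkaSpoutRebalanceTest below does. A minimal sketch of the seam, assuming a spoutConfig built elsewhere and Mockito's mock() statically imported:

    // the factory lambda ignores the config and hands back a Mockito mock
    KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
    KafkaConsumerFactory<String, String> factory = (kafkaSpoutConfig) -> consumerMock;
    // uses the package-private constructor added to KafkaSpout above
    KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig, factory);
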
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
new file mode 100644
index 00000000000..3e077ab6bd3
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.mockito.Captor;
+import static org.mockito.Mockito.reset;
+import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutRebalanceTest {
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private TopologyContext contextMock;
+ private SpoutOutputCollector collectorMock;
+ private Map<String, Object> conf;
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaConsumerFactory<String, String> consumerFactoryMock;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ contextMock = mock(TopologyContext.class);
+ collectorMock = mock(SpoutOutputCollector.class);
+ conf = new HashMap<>();
+ consumerMock = mock(KafkaConsumer.class);
+ consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+ }
+
+ //Returns messageIds in order of emission
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ List<TopicPartition> assignedPartitions = new ArrayList<>();
+ assignedPartitions.add(partitionThatWillBeRevoked);
+ assignedPartitions.add(assignedPartition);
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+ //Make the consumer return a single message for each partition
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
+ firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
+ secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(firstPartitionRecords))
+ .thenReturn(new ConsumerRecords<>(secondPartitionRecords))
+ .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+
+ //Emit the messages
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+ reset(collectorMock);
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+
+ //Now rebalance
+ consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+
+ List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
+ emittedMessageIds.add(messageIdForRevokedPartition.getValue());
+ emittedMessageIds.add(messageIdForAssignedPartition.getValue());
+ return emittedMessageIds;
+ }
+
+ @Test
+ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Ack both emitted tuples
+ spout.ack(emittedMessageIds.get(0));
+ spout.ack(emittedMessageIds.get(1));
+
+ //Ensure the commit timer has expired
+ Thread.sleep(510);
+
+ //Make the spout commit any acked tuples
+ spout.nextTuple();
+ //Verify that it only committed the message on the assigned partition
+ verify(consumerMock).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+ assertThat(commitCaptureMap, hasKey(assignedPartition));
+ assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ }
+
+ @Test
+ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Fail both emitted tuples
+ spout.fail(emittedMessageIds.get(0));
+ spout.fail(emittedMessageIds.get(1));
+
+ //Check that only the tuple on the currently assigned partition is retried
+ verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
+ verify(retryServiceMock).schedule(emittedMessageIds.get(1));
+ }
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index baece9331b5..6921f7c382a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -46,9 +46,17 @@ public static StormTopology getTopologyKafkaSpout(int port) {
return tp.createTopology();
}
- public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
- return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
- .setOffsetCommitPeriodMs(10_000)
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+ }
+
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+ }
+
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+ return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
.setPollTimeoutMs(1000)