10 changes: 8 additions & 2 deletions external/storm-kafka-client/pom.xml
@@ -56,13 +56,19 @@
<!--test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
<version>${mockito.version}</version>
</dependency>
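<!-- mockito-core, unlike the mockito-all uber jar, no longer shades Hamcrest,
so Hamcrest (and a JUnit version compatible with it) are declared explicitly -->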
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -51,6 +51,8 @@
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;

public class KafkaSpout<K, V> extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
@@ -61,6 +63,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {

// Kafka
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
private final KafkaConsumerFactory kafkaConsumerFactory;
private transient KafkaConsumer<K, V> kafkaConsumer;
private transient boolean consumerAutoCommitMode;

@@ -83,8 +86,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {


public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
}

//This constructor is here for testing
KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
this.kafkaConsumerFactory = kafkaConsumerFactory;
}

@Override
@@ -144,6 +153,16 @@ private void initialize(Collection<TopicPartition> partitions) {
}

retryService.retainAll(partitions);

//Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
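//Copying into a HashSet keeps the containment check in the loop below O(1) even when the assigned partitions arrive as a list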
Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
while (msgIdIterator.hasNext()) {
KafkaSpoutMessageId msgId = msgIdIterator.next();
if (!partitionsSet.contains(msgId.getTopicPartition())) {
msgIdIterator.remove();
}
}

for (TopicPartition tp : partitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -281,15 +300,18 @@ private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
final List<Object> tuple = tuplesBuilder.buildTuple(record);
kafkaSpoutStreams.emit(collector, tuple, msgId);
emitted.add(msgId);
numUncommittedOffsets++;
if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
retryService.remove(msgId); // re-emitted hence remove from failed
} else {
boolean isScheduled = retryService.isScheduled(msgId);
if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
final List<Object> tuple = tuplesBuilder.buildTuple(record);
kafkaSpoutStreams.emit(collector, tuple, msgId);
emitted.add(msgId);
numUncommittedOffsets++;
if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
retryService.remove(msgId);
}
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
}

@@ -323,6 +345,11 @@ private void commitOffsetsForAckedTuples() {
@Override
public void ack(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}

if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
acked.get(msgId.getTopicPartition()).add(msgId);
}
@@ -334,8 +361,12 @@ public void ack(Object messageId) {
@Override
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
emitted.remove(msgId);
if (!emitted.contains(msgId)) {
LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
if (msgId.numFails() < maxRetries) {
emitted.remove(msgId);
msgId.incrementNumFails();
retryService.schedule(msgId);
} else { // limit to max number of retries
@@ -352,8 +383,7 @@ public void activate() {
}

private void subscribeKafkaConsumer() {
kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);

if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -0,0 +1,27 @@
/*
* Copyright 2016 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout.internal;

import java.io.Serializable;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

/**
* Factory for the KafkaConsumer used by the spout. This indirection exists so tests can inject a mock consumer.
*/
public interface KafkaConsumerFactory<K, V> extends Serializable {
KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
}
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
/*
* Copyright 2016 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout.internal;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {

@Override
public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
}

}
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -0,0 +1,180 @@
/*
* Copyright 2016 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;

import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;

import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.when;

import org.junit.Before;
import org.mockito.Captor;

import static org.mockito.Mockito.reset;

import org.mockito.MockitoAnnotations;

import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class KafkaSpoutRebalanceTest {

@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

private TopologyContext contextMock;
private SpoutOutputCollector collectorMock;
private Map<String, Object> conf;
private KafkaConsumer<String, String> consumerMock;
private KafkaConsumerFactory<String, String> consumerFactoryMock;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
contextMock = mock(TopologyContext.class);
collectorMock = mock(SpoutOutputCollector.class);
conf = new HashMap<>();
consumerMock = mock(KafkaConsumer.class);
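//Hand-rolled factory stub (rather than a Mockito mock) that hands the spout the consumer mock, so the test controls what poll() returns and can capture the rebalance listener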
consumerFactoryMock = new KafkaConsumerFactory<String, String>() {
@Override
public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
return consumerMock;
}
};
}

//Returns messageIds in order of emission
private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
//Setup spout with mock consumer so we can get at the rebalance listener
spout.open(conf, contextMock, collectorMock);
spout.activate();

ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
verify(consumerMock).subscribe(anyList(), rebalanceListenerCapture.capture());

//Assign partitions to the spout
ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
List<TopicPartition> assignedPartitions = new ArrayList<>();
assignedPartitions.add(partitionThatWillBeRevoked);
assignedPartitions.add(assignedPartition);
consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);

//Make the consumer return a single message for each partition
Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
when(consumerMock.poll(anyLong()))
.thenReturn(new ConsumerRecords(firstPartitionRecords))
.thenReturn(new ConsumerRecords(secondPartitionRecords))
.thenReturn(new ConsumerRecords(Collections.emptyMap()));

//Emit the messages
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
reset(collectorMock);
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());

//Now rebalance
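//Revoke both partitions, then hand back only assignedPartition; the spout should drop its emitted-tuple tracking for partitionThatWillBeRevoked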
consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));

List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
emittedMessageIds.add(messageIdForRevokedPartition.getValue());
emittedMessageIds.add(messageIdForAssignedPartition.getValue());
return emittedMessageIds;
}

@Test
public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);

//Emit a message on each partition and revoke the first partition
List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);

//Ack both emitted tuples
spout.ack(emittedMessageIds.get(0));
spout.ack(emittedMessageIds.get(1));

//Ensure the commit timer has expired
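//510 ms is assumed to be just over the offset commit period configured by getKafkaSpoutConfig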
Thread.sleep(510);

//Make the spout commit any acked tuples
spout.nextTuple();
//Verify that it only committed the message on the assigned partition
verify(consumerMock).commitSync(commitCapture.capture());

Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
assertThat(commitCaptureMap, hasKey(assignedPartition));
assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
}

@Test
public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);

//Emit a message on each partition and revoke the first partition
List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);

//Fail both emitted tuples
spout.fail(emittedMessageIds.get(0));
spout.fail(emittedMessageIds.get(1));

//Check that only the tuple on the currently assigned partition is retried
verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
verify(retryServiceMock).schedule(emittedMessageIds.get(1));
}
}