docs/storm-kafka-client.md (7 changes: 2 additions & 5 deletions)
@@ -247,12 +247,9 @@
streams. If you are doing this for Trident a value must be in the List returned
otherwise Trident can throw exceptions.


### Manual Partition Control (ADVANCED)
### Manual Partition Assignment (ADVANCED)

By default Kafka will automatically assign partitions to the current set of spouts. This handles many cases automatically, but in some cases you may want to manually assign the partitions.
Manual assignment can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing
Subscription, and we have a few implementations you can look at as examples of how to do this: ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again,
please be careful when using these or implementing your own.
By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality.
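
To make the new wiring concrete, here is a minimal sketch, assuming the `ManualPartitioner` contract implied by `RoundRobinManualPartitioner` (a single `partition(List<TopicPartition>, TopologyContext)` method) and the `KafkaSpoutConfig.Builder` constructor that accepts a `Subscription` directly, as described above; the class name, chunking strategy, broker address, and topic are illustrative only:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.task.TopologyContext;

// Assigns each spout task a contiguous chunk of the partition list instead of
// round-robin striping. Every partition falls into exactly one task's chunk,
// preserving the invariant described above: no partition is left unread, and no
// partition is read by two spout instances at once.
// Serializable is declared explicitly since the Subscription (and everything in it)
// ships with the serialized topology.
public class ChunkedManualPartitioner implements ManualPartitioner, Serializable {
    @Override
    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
        int taskIndex = context.getThisTaskIndex();
        int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
        int chunkSize = (allPartitions.size() + taskCount - 1) / taskCount; // ceiling division
        int from = Math.min(taskIndex * chunkSize, allPartitions.size());
        int to = Math.min(from + chunkSize, allPartitions.size());
        return new ArrayList<>(allPartitions.subList(from, to));
    }
}
```

It could then be handed to the spout config in the usual way:

```java
KafkaSpoutConfig<String, String> config = new KafkaSpoutConfig.Builder<String, String>(
        "broker1:9092",
        new ManualPartitionSubscription(new ChunkedManualPartitioner(), new NamedTopicFilter("my-topic")))
    .build();
```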

## Use the Maven Shade Plugin to Build the Uber Jar

KafkaSpoutConfig.java

@@ -28,6 +28,7 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -55,12 +56,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
/**
* Retry in a tight loop (to keep unit tests fast); do not use in production.
*/
public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));

// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
@@ -128,7 +123,7 @@ public static enum FirstPollOffsetStrategy {
public static class Builder<K, V> {

private final Map<String, Object> kafkaProps;
private Subscription subscription;
private final Subscription subscription;
private final SerializableDeserializer<K> keyDes;
private final Class<? extends Deserializer<K>> keyDesClazz;
private final SerializableDeserializer<V> valueDes;
@@ -143,15 +138,16 @@ public static class Builder<K, V> {
private boolean emitNullTuples = false;

public Builder(String bootstrapServers, String... topics) {
this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
}

public Builder(String bootstrapServers, Collection<String> topics) {
this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
new NamedTopicFilter(new HashSet<String>(topics))));
}

public Builder(String bootstrapServers, Pattern topics) {
this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new PatternSubscription(topics));
this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
}

/**
@@ -161,7 +157,7 @@ public Builder(String bootstrapServers, Pattern topics) {
*/
@Deprecated
public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String... topics) {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
}

/**
Expand All @@ -171,7 +167,7 @@ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Seri
*/
@Deprecated
public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
}

/**
Expand All @@ -181,7 +177,7 @@ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Seri
*/
@Deprecated
public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
}

/**
@@ -199,23 +195,23 @@ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Seri
*/
@Deprecated
public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String... topics) {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
}

/**
* @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
*/
@Deprecated
public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
}

/**
* @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
*/
@Deprecated
public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
}

/**
@@ -479,7 +475,6 @@ public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
* documentation in {@link FirstPollOffsetStrategy}
*
* @param firstPollOffsetStrategy Offset used by Kafka spout first poll
*
*/
public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
this.firstPollOffsetStrategy = firstPollOffsetStrategy;
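
The deprecation notices above all point the deserializer-taking constructors at `setProp`. A short sketch of what that replacement might look like, using the standard Kafka `ConsumerConfig` keys named in the javadoc and `StringDeserializer`; the broker address and topic are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSpoutConfig<String, String> config =
    new KafkaSpoutConfig.Builder<String, String>("broker1:9092", "my-topic")
        .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
        .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
        .build();
```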
ManualPartitionSubscription.java

@@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.TopicPartitionComparator;
import org.apache.storm.task.TopologyContext;

public class ManualPartitionSubscription extends Subscription {
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.kafka.spout;

import java.util.List;
NamedSubscription.java

@@ -30,8 +30,10 @@
import org.slf4j.LoggerFactory;

/**
* Subscribe to all topics that follow a given list of values
* Subscribe to all topics that follow a given list of values.
* @deprecated Please use {@link ManualPartitionSubscription} with {@link NamedTopicFilter} instead
*/
@Deprecated
public class NamedSubscription extends Subscription {
private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
private static final long serialVersionUID = 3438543305215813839L;
PatternSubscription.java

@@ -26,8 +26,10 @@
import org.slf4j.LoggerFactory;

/**
* Subscribe to all topics that match a given pattern
* Subscribe to all topics that match a given pattern.
* @deprecated Please use {@link ManualPartitionSubscription} with {@link PatternTopicFilter} instead
*/
@Deprecated
public class PatternSubscription extends Subscription {
private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
private static final long serialVersionUID = 3438543305215813839L;
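
Both deprecations above funnel into the same replacement. A brief migration sketch under that reading, pairing the round-robin partitioner with the matching topic filter (topic names and pattern are illustrative):

```java
// Before (deprecated):
Subscription named = new NamedSubscription("topic-a", "topic-b");
Subscription byPattern = new PatternSubscription(Pattern.compile("topic-.*"));

// After:
Subscription namedReplacement = new ManualPartitionSubscription(
    new RoundRobinManualPartitioner(), new NamedTopicFilter("topic-a", "topic-b"));
Subscription patternReplacement = new ManualPartitionSubscription(
    new RoundRobinManualPartitioner(), new PatternTopicFilter(Pattern.compile("topic-.*")));
```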
KafkaSpoutCommitTest.java

@@ -26,16 +26,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
@@ -50,58 +49,38 @@ public class KafkaSpoutCommitTest {
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
private KafkaSpout<String, String> spout;
private KafkaSpoutConfig spoutConfig;
private KafkaSpoutConfig<String, String> spoutConfig;

@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

private void setupSpout(Set<TopicPartition> assignedPartitions) {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
spoutConfig = getKafkaSpoutConfigBuilder(-1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.build();

.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.build();
consumerMock = mock(KafkaConsumer.class);
KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
@Override
public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
return consumerMock;
}
};

//Set up a spout listening to 1 topic partition
spout = new KafkaSpout<>(spoutConfig, consumerFactory);

spout.open(conf, contextMock, collectorMock);
spout.activate();

ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());

//Assign partitions to the spout
ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}

@Test
public void testCommitSuccessWithOffsetVoids() {
//Verify that the commit logic can handle offset voids
try (SimulatedTime simulatedTime = new SimulatedTime()) {
setupSpout(Collections.singleton(partition));
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
// Offsets emitted are 0,1,2,3,4,<void>,8,9
for (int i = 0; i < 5; i++) {
recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
for (int i = 8; i < 10; i++) {
recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);

when(consumerMock.poll(anyLong()))
.thenReturn(new ConsumerRecords(records));
.thenReturn(new ConsumerRecords<>(records));

for (int i = 0; i < recordsForPartition.size(); i++) {
spout.nextTuple();