7 changes: 2 additions & 5 deletions docs/storm-kafka-client.md
@@ -240,12 +240,9 @@ streams. If you are doing this for Trident a value must be in the List returned
otherwise Trident can throw exceptions.


-### Manual Partition Control (ADVANCED)
+### Manual Partition Assignment (ADVANCED)

-By default Kafka will automatically assign partitions to the current set of spouts. It handles lots of things, but in some cases you may want to manually assign the partitions.
-This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing
-Subscription and we have a few implementations that you can look at for examples on how to do this. ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again
-please be careful when using these or implementing your own.
+By default the KafkaSpout instances will be assigned partitions using a round-robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Take care when supplying a custom implementation: an incorrect `ManualPartitioner` can leave some partitions unread, or read concurrently by multiple spout instances. See `RoundRobinManualPartitioner` for a reference implementation, and the sketch below this diff for end-to-end wiring.

## Use the Maven Shade Plugin to Build the Uber Jar

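To make the new docs paragraph concrete, here is a minimal sketch of the customization path it describes. It assumes the `partition(List<TopicPartition>, TopologyContext)` signature suggested by the `ManualPartitioner` imports further down this page, a `ManualPartitionSubscription(ManualPartitioner, NamedTopicFilter)` constructor, and a `NamedTopicFilter` that accepts topic names; the partitioner class, broker address, and topic name are illustrative, not part of this PR.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.task.TopologyContext;

/**
 * Illustrative ManualPartitioner that gives each spout task a contiguous
 * block of partitions instead of the default round-robin striping.
 */
public class ContiguousBlockManualPartitioner implements ManualPartitioner {

    @Override
    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
        int taskIndex = context.getThisTaskIndex();
        int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
        List<TopicPartition> myPartitions = new ArrayList<>();
        //Every partition must be claimed by exactly one task, and every task
        //must see allPartitions in the same order; otherwise partitions end
        //up unread or read by multiple spout instances.
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i * taskCount / allPartitions.size() == taskIndex) {
                myPartitions.add(allPartitions.get(i));
            }
        }
        return myPartitions;
    }
}
```

Wiring it into the spout config would then look roughly like this (deserializer and other settings omitted):

```java
KafkaSpoutConfig<String, String> spoutConfig = new KafkaSpoutConfig.Builder<String, String>(
        "broker1:9092", //illustrative bootstrap servers
        new ManualPartitionSubscription(new ContiguousBlockManualPartitioner(), new NamedTopicFilter("my-topic")))
    .build();
```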
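On the Trident caveat in the unchanged context near the top of this file's diff (the translator must return a value in the emitted List): below is a minimal sketch of a translator that always emits a non-empty list, assuming the `RecordTranslator` methods `apply`, `getFieldsFor`, and `streams` from `org.apache.storm.kafka.spout`; the class name and the `<null>` placeholder are illustrative.

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Illustrative Trident-safe translator: apply() always returns a non-empty
 * list, substituting a placeholder for null (tombstone) values rather than
 * returning null or an empty list, which can make Trident throw exceptions.
 */
public class TridentSafeTranslator implements RecordTranslator<String, String> {

    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        String value = record.value() == null ? "<null>" : record.value();
        return new Values(record.key(), value); //Values implements List<Object>
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("key", "value");
    }

    @Override
    public List<String> streams() {
        return Collections.singletonList("default"); //single default stream
    }
}
```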

Large diffs are not rendered by default.

This file was deleted.

This file was deleted.

@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;

import java.util.Collections;
import java.util.HashSet;
@@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
import org.apache.storm.task.TopologyContext;

public class ManualPartitionSubscription extends Subscription {
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;

import java.util.List;
import org.apache.kafka.common.TopicPartition;
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;

import java.util.ArrayList;
import java.util.Arrays;
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;

import java.util.ArrayList;
import java.util.HashSet;
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;

import java.util.ArrayList;
import java.util.HashSet;
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;

import java.io.Serializable;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-package org.apache.storm.kafka.spout;
+package org.apache.storm.kafka.spout.subscription;

import java.io.Serializable;
import java.util.List;
@@ -26,16 +26,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
@@ -50,53 +49,38 @@ public class KafkaSpoutCommitTest {
    private final Map<String, Object> conf = new HashMap<>();
    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    private KafkaConsumer<String, String> consumerMock;
-   private KafkaSpout<String, String> spout;
-   private KafkaSpoutConfig spoutConfig;
+   private KafkaSpoutConfig<String, String> spoutConfig;

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

-   private void setupSpout(Set<TopicPartition> assignedPartitions) {
+   @Before
+   public void setUp() {
        MockitoAnnotations.initMocks(this);
        spoutConfig = getKafkaSpoutConfigBuilder(-1)
-               .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-               .build();
-
+           .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+           .build();
        consumerMock = mock(KafkaConsumer.class);
-       KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
-       //Set up a spout listening to 1 topic partition
-       spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-       spout.open(conf, contextMock, collectorMock);
-       spout.activate();
-
-       ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-       verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
-       //Assign partitions to the spout
-       ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-       consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
    }

    @Test
    public void testCommitSuccessWithOffsetVoids() {
        //Verify that the commit logic can handle offset voids
        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-           setupSpout(Collections.singleton(partition));
+           KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
            // Offsets emitted are 0,1,2,3,4,<void>,8,9
            for (int i = 0; i < 5; i++) {
-               recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+               recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
            }
            for (int i = 8; i < 10; i++) {
-               recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+               recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
            }
            records.put(partition, recordsForPartition);

            when(consumerMock.poll(anyLong()))
-               .thenReturn(new ConsumerRecords(records));
+               .thenReturn(new ConsumerRecords<>(records));

            for (int i = 0; i < recordsForPartition.size(); i++) {
                spout.nextTuple();