Skip to content

Commit cd3b930

Browse files
committed
STORM-2407: KafkaTridentSpoutOpaque Doesn't Poll Data From All Topic-Partitions When Parallelism Hint Is Not a Multiple of Total Topic-Partitions
- Introduce logic to poll data from the topic partitions assigned to each task
1 parent 943bef6 commit cd3b930

File tree

7 files changed

+74
-90
lines changed

7 files changed

+74
-90
lines changed

external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,15 @@
1717
*******************************************************************************/
1818
package org.apache.storm.eventhubs.trident;
1919

20-
import java.util.List;
21-
import java.util.Map;
22-
2320
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
2421
import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
25-
2622
import org.apache.storm.trident.operation.TridentCollector;
2723
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
2824
import org.apache.storm.trident.topology.TransactionAttempt;
2925

26+
import java.util.List;
27+
import java.util.Map;
28+
3029
/**
3130
* A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
3231
*/

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java

Lines changed: 33 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535

3636
import java.io.Serializable;
3737
import java.util.ArrayList;
38-
import java.util.Arrays;
3938
import java.util.Collection;
4039
import java.util.Collections;
4140
import java.util.HashSet;
@@ -71,7 +70,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
7170

7271
private TopologyContext topologyContext;
7372

74-
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
73+
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
7574
this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
7675
this.kafkaManager = kafkaManager;
7776
this.topologyContext = topologyContext;
@@ -87,14 +86,14 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, Topo
8786
/**
8887
* Creates instance of this class with default 500 millisecond refresh subscription timer
8988
*/
90-
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext) {
89+
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
9190
this(kafkaManager, topologyContext, new Timer(500,
9291
kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
9392
}
9493

9594
@Override
9695
public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
97-
KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
96+
KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
9897

9998
LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
10099
tx, currBatchPartition, lastBatch, collector);
@@ -105,10 +104,10 @@ public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttemp
105104
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
106105

107106
if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
108-
LOG.warn("SKIPPING processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
109-
"[collector = {}] because it is not assigned {} to consumer instance [{}] of consumer group [{}]",
110-
tx, currBatchPartition, lastBatch, collector, assignments, kafkaConsumer,
111-
kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
107+
LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
108+
"[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
109+
"of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
110+
kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
112111
} else {
113112
try {
114113
// pause other topic-partitions to only poll from current topic-partition
@@ -205,67 +204,40 @@ public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionRes
205204
/**
206205
* Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
207206
* for this task must be assigned to the Kafka consumer running on this task.
207+
*
208208
* @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
209209
* @return ordered list of topic partitions for this task
210210
*/
211211
@Override
212212
public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<TopicPartition> allPartitionInfo) {
213-
final int numTopicPartitions = allPartitionInfo == null ? 0 : allPartitionInfo.size();
214-
final int taskIndex = topologyContext.getThisTaskIndex();
215-
final int numTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
216-
217-
LOG.debug("Computing task ordered list of topic-partitions from all partitions list {}, " +
218-
"for task with index [{}] of total tasks [{}] ", allPartitionInfo, taskIndex, numTasks);
219-
220-
final Set<TopicPartition> assignment = kafkaConsumer.assignment();
221-
LOG.debug("Consumer [{}] has assigned topic-partitions {}", kafkaConsumer, assignment);
222-
223-
List<KafkaTridentSpoutTopicPartition> taskOrderedTps = new ArrayList<>(numTopicPartitions);
224-
225-
if (numTopicPartitions > 0) {
226-
final KafkaTridentSpoutTopicPartition[] tps = new KafkaTridentSpoutTopicPartition[numTopicPartitions];
227-
int tpTaskComputedIdx = taskIndex;
228-
/*
229-
* Put this task's Kafka consumer assigned topic-partitions in the right index locations such
230-
* that distribution by OpaquePartitionedTridentSpoutExecutor can be done correctly. This algorithm
231-
* does the distribution in exactly the same way as the one used in OpaquePartitionedTridentSpoutExecutor
232-
*/
233-
for (TopicPartition assignedTp : assignment) {
234-
if (tpTaskComputedIdx >= numTopicPartitions) {
235-
LOG.warn("Ignoring attempt to add consumer [{}] assigned topic-partition [{}] to index [{}], " +
236-
"out of bounds [{}]. ", kafkaConsumer, assignedTp, tpTaskComputedIdx, numTopicPartitions);
237-
break;
238-
}
239-
tps[tpTaskComputedIdx] = new KafkaTridentSpoutTopicPartition(assignedTp);
240-
LOG.debug("Added consumer assigned topic-partition [{}] to position [{}] for task with index [{}]",
241-
assignedTp, tpTaskComputedIdx, taskIndex);
242-
tpTaskComputedIdx += numTasks;
243-
}
213+
final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allPartitionInfo);
214+
LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
215+
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
216+
return allPartitions;
217+
}
244218

245-
// Put topic-partitions assigned to consumer instances running in different tasks in the empty slots
246-
int i = 0;
247-
for (TopicPartition tp : allPartitionInfo) {
248-
/*
249-
* Topic-partition not assigned to the Kafka consumer associated with this emitter task, hence not yet
250-
* added to the list of task ordered partitions. To be processed next.
251-
*/
252-
if (!assignment.contains(tp)) {
253-
for (; i < numTopicPartitions; i++) {
254-
if (tps[i] == null) { // find empty slot to put the topic-partition
255-
tps[i] = new KafkaTridentSpoutTopicPartition(tp);
256-
LOG.debug("Added to position [{}] topic-partition [{}], which is assigned to a consumer " +
257-
"running on a task other than task with index [{}] ", i, tp, taskIndex);
258-
i++;
259-
break;
260-
}
261-
}
262-
}
219+
@Override
220+
public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, List<TopicPartition> allPartitionInfo) {
221+
final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
222+
LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
223+
final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
224+
LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
225+
return taskTps;
226+
}
227+
228+
private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
229+
final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 0 : tps.size());
230+
if (tps != null) {
231+
for (TopicPartition tp : tps) {
232+
LOG.trace("Added topic-partition [{}]", tp);
233+
kttp.add(new KafkaTridentSpoutTopicPartition(tp));
263234
}
264-
taskOrderedTps = Arrays.asList(tps);
265235
}
266-
LOG.debug("Returning ordered list of topic-partitions {} for task with index [{}], of total tasks [{}] ",
267-
taskOrderedTps, taskIndex, numTasks);
268-
return taskOrderedTps;
236+
return kttp;
237+
}
238+
239+
private int getNumTasks() {
240+
return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
269241
}
270242

271243
@Override

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
3535
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
3636

3737
private final KafkaTridentSpoutManager<K, V> kafkaManager;
38-
private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter;
39-
private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator;
4038

41-
4239
public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
4340
this(new KafkaTridentSpoutManager<>(conf));
4441
}
@@ -73,9 +70,6 @@ public Fields getOutputFields() {
7370
@Override
7471
public String toString() {
7572
return super.toString() +
76-
"{kafkaManager=" + kafkaManager +
77-
", kafkaTridentSpoutEmitter=" + kafkaTridentSpoutEmitter +
78-
", coordinator=" + coordinator +
79-
'}';
73+
"{kafkaManager=" + kafkaManager + '}';
8074
}
8175
}

external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,6 @@ private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition pa
160160

161161
/**
162162
* re-emit the batch described by the meta data provided
163-
*
164-
* @param attempt
165-
* @param collector
166-
* @param partition
167-
* @param meta
168163
*/
169164
private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
170165
LOG.info("re-emitting batch, attempt " + attempt);
@@ -176,7 +171,7 @@ private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector c
176171
ByteBufferMessageSet msgs = null;
177172
msgs = fetchMessages(consumer, partition, offset);
178173

179-
if(msgs != null) {
174+
if (msgs != null) {
180175
for (MessageAndOffset msg : msgs) {
181176
if (offset == nextOffset) {
182177
break;

storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.storm.trident.topology.TransactionAttempt;
2323
import org.apache.storm.tuple.Fields;
2424

25+
import java.util.ArrayList;
2526
import java.util.List;
2627
import java.util.Map;
2728

@@ -52,13 +53,35 @@ interface Emitter<Partitions, Partition extends ISpoutPartition, M> {
5253
* This method is called when this task is responsible for a new set of partitions. Should be used
5354
* to manage things like connections to brokers.
5455
*/
55-
void refreshPartitions(List<Partition> partitionResponsibilities);
56+
void refreshPartitions(List<Partition> partitionResponsibilities);
57+
58+
/**
59+
* @return The ordered list of partitions being processed by all the tasks
60+
*/
5661
List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
62+
63+
/**
64+
* @return The list of partitions that are to be processed by the task with id {@code taskId}
65+
*/
66+
default List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo){
67+
final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
68+
final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
69+
if (orderedPartitions != null) {
70+
for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
71+
taskPartitions.add(orderedPartitions.get(i));
72+
}
73+
}
74+
return taskPartitions;
75+
}
76+
5777
void close();
5878
}
5979

60-
Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
61-
Coordinator getCoordinator(Map conf, TopologyContext context);
80+
Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
81+
82+
Coordinator getCoordinator(Map conf, TopologyContext context);
83+
6284
Map<String, Object> getComponentConfiguration();
85+
6386
Fields getOutputFields();
6487
}

storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
import java.util.ArrayList;
3130
import java.util.HashMap;
3231
import java.util.HashSet;
3332
import java.util.List;
@@ -111,16 +110,14 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl
111110
tx, coordinatorMeta, collector, this);
112111

113112
if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
114-
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
115113
_partitionStates.clear();
116-
List<ISpoutPartition> myPartitions = new ArrayList<>();
117-
for(int i=_index; i < partitions.size(); i+=_numTasks) {
118-
ISpoutPartition p = partitions.get(i);
119-
String id = p.getId();
120-
myPartitions.add(p);
121-
_partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
114+
final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
115+
for (ISpoutPartition partition : taskPartitions) {
116+
_partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
122117
}
123-
_emitter.refreshPartitions(myPartitions);
118+
119+
// refresh all partitions for backwards compatibility with old spout
120+
_emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
124121
_savedCoordinatorMeta = coordinatorMeta;
125122
_changedMeta = true;
126123
}

storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737
import java.util.List;
3838
import java.util.Map;
3939

40+
/**
41+
* Class that contains the logic to extract the transactional state info from zookeeper. All transactional state
42+
* is kept in zookeeper. This class only contains references to Curator, which is used to get all info from zookeeper.
43+
*/
4044
public class TransactionalState {
4145
private static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);
4246

0 commit comments

Comments
 (0)