@@ -17,16 +17,15 @@
*******************************************************************************/
package org.apache.storm.eventhubs.trident;

-import java.util.List;
-import java.util.Map;

import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;

+import java.util.List;
+import java.util.Map;

/**
* A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
*/
@@ -35,7 +35,6 @@

import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -71,7 +70,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident

private TopologyContext topologyContext;

-public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
+public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
this.kafkaManager = kafkaManager;
this.topologyContext = topologyContext;
@@ -87,14 +86,14 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, Topo
/**
* Creates instance of this class with default 500 millisecond refresh subscription timer
*/
-public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext) {
+public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
this(kafkaManager, topologyContext, new Timer(500,
kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
}

@Override
public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
-KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
+KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {

LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, lastBatch, collector);
Expand All @@ -105,10 +104,10 @@ public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttemp
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();

if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
LOG.warn("SKIPPING processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
"[collector = {}] because it is not assigned {} to consumer instance [{}] of consumer group [{}]",
tx, currBatchPartition, lastBatch, collector, assignments, kafkaConsumer,
kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
"[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
"of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
} else {
try {
// pause other topic-partitions to only poll from current topic-partition
@@ -205,67 +204,40 @@ public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionRes
/**
* Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
* for this task must be assigned to the Kafka consumer running on this task.
*
* @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
* @return ordered list of topic partitions for this task
*/
@Override
public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<TopicPartition> allPartitionInfo) {
-final int numTopicPartitions = allPartitionInfo == null ? 0 : allPartitionInfo.size();
-final int taskIndex = topologyContext.getThisTaskIndex();
-final int numTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
-
-LOG.debug("Computing task ordered list of topic-partitions from all partitions list {}, " +
-"for task with index [{}] of total tasks [{}] ", allPartitionInfo, taskIndex, numTasks);
-
-final Set<TopicPartition> assignment = kafkaConsumer.assignment();
-LOG.debug("Consumer [{}] has assigned topic-partitions {}", kafkaConsumer, assignment);
-
-List<KafkaTridentSpoutTopicPartition> taskOrderedTps = new ArrayList<>(numTopicPartitions);
-
-if (numTopicPartitions > 0) {
-final KafkaTridentSpoutTopicPartition[] tps = new KafkaTridentSpoutTopicPartition[numTopicPartitions];
-int tpTaskComputedIdx = taskIndex;
-/*
-* Put this task's Kafka consumer assigned topic-partitions in the right index locations such
-* that distribution by OpaquePartitionedTridentSpoutExecutor can be done correctly. This algorithm
-* does the distribution in exactly the same way as the one used in OpaquePartitionedTridentSpoutExecutor
-*/
-for (TopicPartition assignedTp : assignment) {
-if (tpTaskComputedIdx >= numTopicPartitions) {
-LOG.warn("Ignoring attempt to add consumer [{}] assigned topic-partition [{}] to index [{}], " +
-"out of bounds [{}]. ", kafkaConsumer, assignedTp, tpTaskComputedIdx, numTopicPartitions);
-break;
-}
-tps[tpTaskComputedIdx] = new KafkaTridentSpoutTopicPartition(assignedTp);
-LOG.debug("Added consumer assigned topic-partition [{}] to position [{}] for task with index [{}]",
-assignedTp, tpTaskComputedIdx, taskIndex);
-tpTaskComputedIdx += numTasks;
-}
+final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allPartitionInfo);
+LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
+allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
+return allPartitions;
+}

-// Put topic-partitions assigned to consumer instances running in different tasks in the empty slots
-int i = 0;
-for (TopicPartition tp : allPartitionInfo) {
-/*
-* Topic-partition not assigned to the Kafka consumer associated with this emitter task, hence not yet
-* added to the list of task ordered partitions. To be processed next.
-*/
-if (!assignment.contains(tp)) {
-for (; i < numTopicPartitions; i++) {
-if (tps[i] == null) { // find empty slot to put the topic-partition
-tps[i] = new KafkaTridentSpoutTopicPartition(tp);
-LOG.debug("Added to position [{}] topic-partition [{}], which is assigned to a consumer " +
-"running on a task other than task with index [{}] ", i, tp, taskIndex);
-i++;
-break;
-}
-}
-}
+@Override
+public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, List<TopicPartition> allPartitionInfo) {
+final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
+LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
+final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
+LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
+return taskTps;
+}

+private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
+final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 0 : tps.size());
+if (tps != null) {
+for (TopicPartition tp : tps) {
+LOG.trace("Added topic-partition [{}]", tp);
+kttp.add(new KafkaTridentSpoutTopicPartition(tp));
+}
-taskOrderedTps = Arrays.asList(tps);
}
LOG.debug("Returning ordered list of topic-partitions {} for task with index [{}], of total tasks [{}] ",
taskOrderedTps, taskIndex, numTasks);
return taskOrderedTps;
return kttp;
}

+private int getNumTasks() {
+return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
+}

@Override
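Taken together, the emitter-side change means `getOrderedPartitions` now returns every topic-partition reported by the coordinator, and the new `getPartitionsForTask` override returns exactly the topic-partitions that Kafka's own group coordinator assigned to this task's consumer, rather than re-deriving them by round-robin index. Below is a minimal, self-contained sketch of that assignment-driven selection; the class and values are hypothetical and only `java.util` is used, so this illustrates the idea rather than reproducing code from this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo: a task trusts the consumer's own assignment instead of
// computing "its" partitions from a position in the globally ordered list.
public class AssignmentDrivenSelectionDemo {
    public static void main(String[] args) {
        // All topic-partitions known to the coordinator (stand-in values).
        List<String> allPartitions = Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3");

        // What the Kafka group coordinator actually assigned to this consumer.
        Set<String> consumerAssignment = new LinkedHashSet<>(Arrays.asList("topic-1", "topic-3"));

        // Mirror of the override's approach: wrap each assigned partition directly.
        List<String> taskPartitions = new ArrayList<>(consumerAssignment);

        System.out.println("all ordered : " + allPartitions);
        System.out.println("this task   : " + taskPartitions); // [topic-1, topic-3]
    }
}
```

Presumably this closes the gap flagged by the `LOG.warn` above, where a batch was skipped whenever the partition a task computed for itself was not among the consumer group's actual assignments.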
@@ -35,10 +35,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);

private final KafkaTridentSpoutManager<K, V> kafkaManager;
-private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter;
-private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator;
-

public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
this(new KafkaTridentSpoutManager<>(conf));
}
@@ -73,9 +70,6 @@ public Fields getOutputFields() {
@Override
public String toString() {
return super.toString() +
"{kafkaManager=" + kafkaManager +
", kafkaTridentSpoutEmitter=" + kafkaTridentSpoutEmitter +
", coordinator=" + coordinator +
'}';
"{kafkaManager=" + kafkaManager + '}';
}
}
@@ -160,11 +160,6 @@ private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition pa

/**
* re-emit the batch described by the metadata provided
-*
-* @param attempt
-* @param collector
-* @param partition
-* @param meta
*/
private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
LOG.info("re-emitting batch, attempt " + attempt);
@@ -176,7 +171,7 @@ private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector c
ByteBufferMessageSet msgs = null;
msgs = fetchMessages(consumer, partition, offset);

-if(msgs != null) {
+if (msgs != null) {
for (MessageAndOffset msg : msgs) {
if (offset == nextOffset) {
break;
@@ -22,6 +22,7 @@
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.tuple.Fields;

+import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@@ -52,13 +53,35 @@ interface Emitter<Partitions, Partition extends ISpoutPartition, M> {
* This method is called when this task is responsible for a new set of partitions. Should be used
* to manage things like connections to brokers.
*/
-void refreshPartitions(List<Partition> partitionResponsibilities);
+void refreshPartitions(List<Partition> partitionResponsibilities);

+/**
+* @return The ordered list of partitions being processed by all the tasks
+*/
List<Partition> getOrderedPartitions(Partitions allPartitionInfo);

+/**
+* @return The list of partitions that are to be processed by the task with id {@code taskId}
+*/
+default List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo) {
+final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
+final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
+if (orderedPartitions != null) {
+for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
+taskPartitions.add(orderedPartitions.get(i));
+}
+}
+return taskPartitions;
+}

void close();
}

-Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
-Coordinator getCoordinator(Map conf, TopologyContext context);
+Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
+
+Coordinator getCoordinator(Map conf, TopologyContext context);

Map<String, Object> getComponentConfiguration();

Fields getOutputFields();
}
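The new `default getPartitionsForTask` keeps the distribution that `OpaquePartitionedTridentSpoutExecutor` previously hard-coded: task `i` of `n` takes every `n`-th entry of the ordered list, starting at index `i`. A standalone sketch of that stride (plain Java, not code from this PR):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinPartitionDemo {
    // Same stride as the default getPartitionsForTask: start at taskId, step by numTasks.
    static List<String> partitionsForTask(int taskId, int numTasks, List<String> ordered) {
        List<String> taskPartitions = new ArrayList<>();
        for (int i = taskId; i < ordered.size(); i += numTasks) {
            taskPartitions.add(ordered.get(i));
        }
        return taskPartitions;
    }

    public static void main(String[] args) {
        List<String> ordered = Arrays.asList("p0", "p1", "p2", "p3", "p4", "p5", "p6");
        for (int task = 0; task < 3; task++) {
            System.out.println("task " + task + " -> " + partitionsForTask(task, 3, ordered));
        }
        // task 0 -> [p0, p3, p6]
        // task 1 -> [p1, p4]
        // task 2 -> [p2, p5]
    }
}
```

Because it is a `default` method, an emitter that knows its real assignment (such as the Kafka emitter above) can override it, while existing emitters inherit the old behavior unchanged.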
@@ -27,7 +27,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -111,16 +110,14 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl
tx, coordinatorMeta, collector, this);

if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
-List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
-List<ISpoutPartition> myPartitions = new ArrayList<>();
-for(int i=_index; i < partitions.size(); i+=_numTasks) {
-ISpoutPartition p = partitions.get(i);
-String id = p.getId();
-myPartitions.add(p);
-_partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
+final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+for (ISpoutPartition partition : taskPartitions) {
+_partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
}
-_emitter.refreshPartitions(myPartitions);

+// refresh all partitions for backwards compatibility with old spout
+_emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
_savedCoordinatorMeta = coordinatorMeta;
_changedMeta = true;
}
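The rewritten `emitBatch` now keys per-task state off `getPartitionsForTask`, but still passes the complete ordered list to `refreshPartitions` so emitters written before this change keep seeing every partition. A sketch of that call sequence against a toy emitter; `DemoEmitter` and its methods are hypothetical stand-ins, not Storm's actual types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for the executor/emitter interaction sketched in the diff above.
public class EmitBatchSequenceDemo {
    static class DemoEmitter {
        List<String> getOrderedPartitions(List<String> coordinatorMeta) {
            return coordinatorMeta; // coordinator metadata is already ordered here
        }
        List<String> getPartitionsForTask(int taskId, int numTasks, List<String> meta) {
            List<String> mine = new ArrayList<>();
            for (int i = taskId; i < meta.size(); i += numTasks) {
                mine.add(meta.get(i));
            }
            return mine;
        }
        void refreshPartitions(List<String> partitions) {
            System.out.println("refreshPartitions saw: " + partitions);
        }
    }

    public static void main(String[] args) {
        List<String> coordinatorMeta = Arrays.asList("p0", "p1", "p2", "p3");
        DemoEmitter emitter = new DemoEmitter();
        int taskIndex = 1;
        int numTasks = 2;

        // 1) State is created only for this task's partitions.
        Map<String, String> partitionStates = new LinkedHashMap<>();
        for (String p : emitter.getPartitionsForTask(taskIndex, numTasks, coordinatorMeta)) {
            partitionStates.put(p, "state-for-" + p);
        }
        System.out.println("state keys: " + partitionStates.keySet()); // [p1, p3]

        // 2) refreshPartitions still receives ALL partitions, matching the
        //    backwards-compatibility comment in the executor diff.
        emitter.refreshPartitions(emitter.getOrderedPartitions(coordinatorMeta));
    }
}
```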
@@ -37,6 +37,10 @@
import java.util.List;
import java.util.Map;

+/**
+* Class that contains the logic to extract the transactional state info from ZooKeeper. All transactional state
+* is kept in ZooKeeper. This class only contains references to Curator, which is used to get all info from ZooKeeper.
+*/
public class TransactionalState {
private static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);
