Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);

initialize(partitions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ public void open(Map conf, final TopologyContext context, final SpoutOutputColle
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, topologyInstanceId);
totalTasks, context.getThisTaskId(), topologyInstanceId);
} else {
_coordinator = new ZkCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, topologyInstanceId);
totalTasks, context.getThisTaskId(), topologyInstanceId);
}

context.registerMetric("kafkaOffset", new IMetric() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMulti
}


public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons,
int totalTasks, int taskIndex, int taskId) {
Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
List<Partition> taskPartitions = new ArrayList<Partition>();
List<Partition> partitions = new ArrayList<Partition>();
Expand All @@ -269,20 +270,20 @@ public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInf
Partition taskPartition = partitions.get(i);
taskPartitions.add(taskPartition);
}
logPartitionMapping(totalTasks, taskIndex, taskPartitions);
logPartitionMapping(totalTasks, taskIndex, taskPartitions, taskId);
return taskPartitions;
}

private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
String taskPrefix = taskId(taskIndex, totalTasks);
private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions, int taskId) {
String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId);
if (taskPartitions.isEmpty()) {
LOG.warn(taskPrefix + "no partitions assigned");
LOG.warn(taskPrefix + " no partitions assigned");
} else {
LOG.info(taskPrefix + "assigned " + taskPartitions);
LOG.info(taskPrefix + " assigned " + taskPartitions);
}
}

public static String taskId(int taskIndex, int totalTasks) {
return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
public static String taskPrefix(int taskIndex, int totalTasks, int taskId) {
return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ public class StaticCoordinator implements PartitionCoordinator {
Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
List<PartitionManager> _allManagers = new ArrayList<>();

public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
StaticHosts hosts = (StaticHosts) config.hosts;
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(hosts.getPartitionInformation());
List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId);
for (Partition myPartition : myPartitions) {
_managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@

import java.util.*;

import static org.apache.storm.kafka.KafkaUtils.taskId;
import static org.apache.storm.kafka.KafkaUtils.taskPrefix;

public class ZkCoordinator implements PartitionCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);

SpoutConfig _spoutConfig;
int _taskIndex;
int _totalTasks;
int _taskId;
String _topologyInstanceId;
Map<Partition, PartitionManager> _managers = new HashMap();
List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
Expand All @@ -41,15 +42,16 @@ public class ZkCoordinator implements PartitionCoordinator {
ZkState _state;
Map _stormConf;

public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(stormConf, spoutConfig));
}

public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) {
_spoutConfig = spoutConfig;
_connections = connections;
_taskIndex = taskIndex;
_totalTasks = totalTasks;
_taskId = taskId;
_topologyInstanceId = topologyInstanceId;
_stormConf = stormConf;
_state = state;
Expand All @@ -75,9 +77,9 @@ public List<PartitionManager> getMyManagedPartitions() {
@Override
public void refresh() {
try {
LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections");
List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo();
List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);

Set<Partition> curr = _managers.keySet();
Set<Partition> newPartitions = new HashSet<Partition>(mine);
Expand All @@ -86,7 +88,7 @@ public void refresh() {
Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
deletedPartitions.removeAll(mine);

LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());

Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
for (Partition id : deletedPartitions) {
Expand All @@ -95,7 +97,7 @@ public void refresh() {
for (PartitionManager manager : deletedManagers.values()) {
if (manager != null) manager.close();
}
LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());

for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(
Expand All @@ -113,7 +115,7 @@ public void refresh() {
throw new RuntimeException(e);
}
_cachedList = new ArrayList<PartitionManager>(_managers.values());
LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTa
partitions.add(globalPartitionInformation);
int numTasks = numPartitions / partitionsPerTask;
for (int i = 0 ; i < numTasks ; i++) {
assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size());
}
}

Expand All @@ -281,15 +281,15 @@ public void moreTasksThanPartitions() {
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(globalPartitionInformation);
int numTasks = 2;
assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size());
assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size());
}

@Test (expected = IllegalArgumentException.class )
public void assignInvalidTask() {
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(globalPartitionInformation);
KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void waitForRefresh() throws InterruptedException {
private List<ZkCoordinator> buildCoordinators(int totalTasks) {
List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
for (int i = 0; i < totalTasks; i++) {
ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, i, "test-id", reader);
coordinatorList.add(coordinator);
}
return coordinatorList;
Expand Down