diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 58347e334f5..fcef7aca256 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -139,8 +139,8 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { - LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", + context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); initialize(partitions); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index 01cc9b71878..3abadf2e12d 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -82,11 +82,11 @@ public void open(Map conf, final TopologyContext context, final SpoutOutputColle if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + totalTasks, context.getThisTaskId(), topologyInstanceId); } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + totalTasks, context.getThisTaskId(), topologyInstanceId); } context.registerMetric("kafkaOffset", new IMetric() { diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index f23c873ddf4..73e86e97b6f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -254,7 +254,8 @@ public static Iterable> generateTuples(MessageMetadataSchemeAsMulti } - public static List calculatePartitionsForTask(List partitons, int totalTasks, int taskIndex) { + public static List calculatePartitionsForTask(List partitons, + int totalTasks, int taskIndex, int taskId) { Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks"); List taskPartitions = new ArrayList(); List partitions = new ArrayList(); @@ -269,20 +270,20 @@ public static List calculatePartitionsForTask(List taskPartitions) { - String taskPrefix = taskId(taskIndex, totalTasks); + private static void logPartitionMapping(int totalTasks, int taskIndex, List taskPartitions, int taskId) { + String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId); if (taskPartitions.isEmpty()) { - LOG.warn(taskPrefix + "no partitions assigned"); + LOG.warn(taskPrefix + " no partitions assigned"); } else { - LOG.info(taskPrefix + "assigned " + taskPartitions); + LOG.info(taskPrefix + " assigned " + taskPartitions); } } - public static String taskId(int taskIndex, int totalTasks) { - return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] "; + public static String taskPrefix(int taskIndex, int totalTasks, int taskId) { + return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId; } } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java index 628bfc0ac59..c3c5e97990b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java @@ -26,11 +26,11 @@ public class StaticCoordinator implements PartitionCoordinator { Map _managers = new HashMap(); List _allManagers = new ArrayList<>(); - public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { + public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { StaticHosts hosts = (StaticHosts) config.hosts; List partitions = new ArrayList(); partitions.add(hosts.getPartitionInformation()); - List myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex); + List myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId); for (Partition myPartition : myPartitions) { _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition)); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index 14be5845007..d9dbfb34f3e 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -23,7 +23,7 @@ import java.util.*; -import static org.apache.storm.kafka.KafkaUtils.taskId; +import static org.apache.storm.kafka.KafkaUtils.taskPrefix; public class ZkCoordinator implements PartitionCoordinator { private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class); @@ -31,6 +31,7 @@ public class ZkCoordinator implements PartitionCoordinator { SpoutConfig _spoutConfig; int _taskIndex; int _totalTasks; + int _taskId; String _topologyInstanceId; Map _managers = new HashMap(); List _cachedList = new ArrayList(); @@ -41,15 +42,16 @@ public class ZkCoordinator implements PartitionCoordinator { ZkState _state; Map _stormConf; - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { - this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); + public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { + this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(stormConf, spoutConfig)); } - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { + public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; + _taskId = taskId; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; @@ -75,9 +77,9 @@ public List getMyManagedPartitions() { @Override public void refresh() { try { - LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections"); List brokerInfo = _reader.getBrokerInfo(); - List mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); + List mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId); Set curr = _managers.keySet(); Set newPartitions = new HashSet(mine); @@ -86,7 +88,7 @@ public void refresh() { Set deletedPartitions = new HashSet(curr); deletedPartitions.removeAll(mine); - LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString()); Map deletedManagers = new HashMap<>(); for (Partition id : deletedPartitions) { @@ -95,7 +97,7 @@ public void refresh() { for (PartitionManager manager : deletedManagers.values()) { if (manager != null) manager.close(); } - LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager( @@ -113,7 +115,7 @@ public void refresh() { throw new RuntimeException(e); } _cachedList = new ArrayList(_managers.values()); - LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing"); } @Override diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java index 9da6c0a5187..9362f91e1ae 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java @@ -271,7 +271,7 @@ public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTa partitions.add(globalPartitionInformation); int numTasks = numPartitions / partitionsPerTask; for (int i = 0 ; i < numTasks ; i++) { - assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size()); + assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size()); } } @@ -281,8 +281,8 @@ public void moreTasksThanPartitions() { List partitions = new ArrayList(); partitions.add(globalPartitionInformation); int numTasks = 2; - assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size()); - assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size()); + assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size()); + assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size()); } @Test (expected = IllegalArgumentException.class ) @@ -290,6 +290,6 @@ public void assignInvalidTask() { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); List partitions = new ArrayList(); partitions.add(globalPartitionInformation); - KafkaUtils.calculatePartitionsForTask(partitions, 1, 1); + KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1); } } diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index b23d5bcea2b..0b8684511e0 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -174,7 +174,7 @@ private void waitForRefresh() throws InterruptedException { private List buildCoordinators(int totalTasks) { List coordinatorList = new ArrayList(); for (int i = 0; i < totalTasks; i++) { - ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader); + ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, i, "test-id", reader); coordinatorList.add(coordinator); } return coordinatorList;