Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor thing, just pointing it out, though it's probably fine this way: the "task ID" is referred to in 3 different styles in these log lines through this PR:

  • taskId
  • Task-ID
  • task-ID

I think each is consistent within their respective log lines. Just wondering if there's any value to it being consistent across them. Also wonder if there's any existing convention in other log lines in the code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how I derived the various styles:

  • taskId: Task.java has component ID named as componentId, so used taskId as the variable name for task ID.
  • Task-ID: This style is only being used in the print statement and is consistent with the existing style.
  • task-ID: Only used once, consistent with the other variable names in the log statement here. This can be renamed to task-Id.
  • I have used taskID as the variable name in the rest of the files because taskId is the name of a function in the same set of files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Srishty. Regarding taskId() being a function name as the justification for taskID being the variable name: in at least one case the name of the function is bad and should be changed. i.e., KafkaUtils.java's taskId() should be taskPrefix() or something else. It's not the "Task ID".

context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);

initialize(partitions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ public void open(Map<String, Object> conf, final TopologyContext context, final
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, topologyInstanceId);
totalTasks, context.getThisTaskId(), topologyInstanceId);
} else {
_coordinator = new ZkCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, topologyInstanceId);
totalTasks, context.getThisTaskId(), topologyInstanceId);
}

context.registerMetric("kafkaOffset", new IMetric() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMulti
}


public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons,
int totalTasks, int taskIndex, int taskId) {
Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
List<Partition> taskPartitions = new ArrayList<Partition>();
List<Partition> partitions = new ArrayList<Partition>();
Expand All @@ -273,20 +274,20 @@ public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInf
Partition taskPartition = partitions.get(i);
taskPartitions.add(taskPartition);
}
logPartitionMapping(totalTasks, taskIndex, taskPartitions);
logPartitionMapping(totalTasks, taskIndex, taskPartitions, taskId);
return taskPartitions;
}

private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
String taskPrefix = taskId(taskIndex, totalTasks);
private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions, int taskId) {
String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId);
if (taskPartitions.isEmpty()) {
LOG.warn(taskPrefix + "no partitions assigned");
LOG.warn(taskPrefix + " no partitions assigned");
} else {
LOG.info(taskPrefix + "assigned " + taskPartitions);
LOG.info(taskPrefix + " assigned " + taskPartitions);
}
}

public static String taskId(int taskIndex, int totalTasks) {
return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
public static String taskPrefix(int taskIndex, int totalTasks, int taskId) {
return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ public class StaticCoordinator implements PartitionCoordinator {
Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
List<PartitionManager> _allManagers = new ArrayList<>();

public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state,
int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
StaticHosts hosts = (StaticHosts) config.hosts;
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(hosts.getPartitionInformation());
List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId);
for (Partition myPartition : myPartitions) {
_managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, topoConf, config, myPartition));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@

import java.util.*;

import static org.apache.storm.kafka.KafkaUtils.taskId;
import static org.apache.storm.kafka.KafkaUtils.taskPrefix;

public class ZkCoordinator implements PartitionCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);

SpoutConfig _spoutConfig;
int _taskIndex;
int _totalTasks;
int _taskId;
String _topologyInstanceId;
Map<Partition, PartitionManager> _managers = new HashMap();
List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
Expand All @@ -41,15 +42,18 @@ public class ZkCoordinator implements PartitionCoordinator {
ZkState _state;
Map _topoConf;

public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(topoConf, spoutConfig));
public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(topoConf, spoutConfig));
}

public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) {
_spoutConfig = spoutConfig;
_connections = connections;
_taskIndex = taskIndex;
_totalTasks = totalTasks;
_taskId = taskId;
_topologyInstanceId = topologyInstanceId;
_topoConf = topoConf;
_state = state;
Expand All @@ -75,9 +79,9 @@ public List<PartitionManager> getMyManagedPartitions() {
@Override
public void refresh() {
try {
LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections");
List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo();
List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);

Set<Partition> curr = _managers.keySet();
Set<Partition> newPartitions = new HashSet<Partition>(mine);
Expand All @@ -86,7 +90,7 @@ public void refresh() {
Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
deletedPartitions.removeAll(mine);

LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());

Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
for (Partition id : deletedPartitions) {
Expand All @@ -95,7 +99,7 @@ public void refresh() {
for (PartitionManager manager : deletedManagers.values()) {
if (manager != null) manager.close();
}
LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());

for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(
Expand All @@ -113,7 +117,7 @@ public void refresh() {
throw new RuntimeException(e);
}
_cachedList = new ArrayList<PartitionManager>(_managers.values());
LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTa
partitions.add(globalPartitionInformation);
int numTasks = numPartitions / partitionsPerTask;
for (int i = 0 ; i < numTasks ; i++) {
assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size());
}
}

Expand All @@ -281,15 +281,15 @@ public void moreTasksThanPartitions() {
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(globalPartitionInformation);
int numTasks = 2;
assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size());
assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size());
}

@Test (expected = IllegalArgumentException.class )
public void assignInvalidTask() {
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
partitions.add(globalPartitionInformation);
KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private void waitForRefresh() throws InterruptedException {
private List<ZkCoordinator> buildCoordinators(int totalTasks) {
List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
for (int i = 0; i < totalTasks; i++) {
ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, "test-id", reader);
ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, i, "test-id", reader);
coordinatorList.add(coordinator);
}
return coordinatorList;
Expand Down
2 changes: 1 addition & 1 deletion storm-client/src/jvm/org/apache/storm/daemon/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public List<Integer> getOutgoingTasks(Integer outTaskId, String stream, List<Obj

public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
if (debug) {
LOG.info("Emitting: {} {} {}", componentId, stream, values);
LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
}

List<Integer> outTasks = new ArrayList<>();
Expand Down