Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-175] Make consumerId unique per thread #176

Merged
merged 5 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.3.1 (UNRELEASED)
#### Bug Fixes
- [ISSUE-175](https://github.com/SourceLabOrg/kafka-webview/issues/175) Update multi-threaded consumers with unique consumerId [PR](https://github.com/SourceLabOrg/kafka-webview/pull/176).

## 2.3.0 (06/19/2019)
#### New Features
- [ISSUE-166](https://github.com/SourceLabOrg/kafka-webview/issues/166) Add groupSearchFilter property to specify the filter used to list LDAP group membership. Thanks for the contribution @[BlueIcarus](https://github.com/BlueIcarus)!
Expand Down
4 changes: 2 additions & 2 deletions dev-cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<artifactId>kafka-webview</artifactId>
<groupId>org.sourcelab</groupId>
<version>2.3.0</version>
<version>2.3.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dev-cluster</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>

<!-- Require Maven 3.3.9 -->
<prerequisites>
Expand Down
2 changes: 1 addition & 1 deletion kafka-webview-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-webview</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-webview-plugin</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions kafka-webview-ui/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
<parent>
<artifactId>kafka-webview</artifactId>
<groupId>org.sourcelab</groupId>
<version>2.3.0</version>
<version>2.3.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-webview-ui</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>

<!-- Module Description and Ownership -->
<name>Kafka WebView UI</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ public String readTopic(
/**
* GET Displays info about a specific consumers group in a cluster.
*/
@RequestMapping(path = "/{clusterId}/consumer/{consumerId:.+}", method = RequestMethod.GET)
@RequestMapping(path = "/{clusterId}/consumer/{groupId:.+}", method = RequestMethod.GET)
public String readConsumer(
@PathVariable final Long clusterId,
@PathVariable final String consumerId,
@PathVariable final String groupId,
final Model model,
final RedirectAttributes redirectAttributes) {

Expand All @@ -182,12 +182,12 @@ public String readConsumer(
return "redirect:/";
}
model.addAttribute("cluster", cluster);
model.addAttribute("consumerId", consumerId);
model.addAttribute("groupId", groupId);

// Setup breadcrumbs
setupBreadCrumbs(model)
.addCrumb(cluster.getName(), "/cluster/" + clusterId)
.addCrumb("Consumer " + consumerId, null);
.addCrumb("Consumer Group " + groupId, null);


// Display template
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,22 @@ private Map<String, Object> applyCommonSettings(
final String consumerId,
final Map<String, Object> config
) {
// Generate consumerId with our configured static prefix.
final String prefixedConsumerId = consumerIdPrefix.concat("-").concat(consumerId);
// Generate groupId with our configured static prefix.
final String prefixedGroupId = consumerIdPrefix.concat("-").concat(consumerId);

// Generate consumerId, which should be unique per user and thread.
final String prefixedConsumerId = prefixedGroupId.concat("-") + Thread.currentThread().getId();

// Set common config items
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterConfig.getConnectString());
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);

// ClientId and ConsumerGroupId are intended to be unique for each user session.
// groupId is intended to be unique for each user session.
// clientId is intended to be unique per user session and thread.
// See Issue-57 https://github.com/SourceLabOrg/kafka-webview/issues/57#issuecomment-363508531
// See Issue-175 https://github.com/SourceLabOrg/kafka-webview/issues/175
config.put(ConsumerConfig.CLIENT_ID_CONFIG, prefixedConsumerId);
config.put(ConsumerConfig.GROUP_ID_CONFIG, prefixedConsumerId);
config.put(ConsumerConfig.GROUP_ID_CONFIG, prefixedGroupId);

// Optionally configure SSL
applySslSettings(clusterConfig, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public List<ConsumerGroupDetails> getConsumerGroupDetails(final List<String> con
});

// Sort list by consumer group id
consumerGroupDetails.sort(Comparator.comparing(ConsumerGroupDetails::getConsumerId));
consumerGroupDetails.sort(Comparator.comparing(ConsumerGroupDetails::getGroupId));

// Return immutable list.
return Collections.unmodifiableList(consumerGroupDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public ParallelWebKafkaConsumer(

@Override
public KafkaResults consumePerPartition() {
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
final List<TopicPartition> allTopicPartitions = getAllPartitions(kafkaConsumer);

// To preserve order
Expand Down Expand Up @@ -131,7 +131,7 @@ public KafkaResults consumePerPartition() {

@Override
public ConsumerState seek(final Map<Integer, Long> partitionOffsetMap) {
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
for (final Map.Entry<Integer, Long> entry : partitionOffsetMap.entrySet()) {
kafkaConsumer.seek(
new TopicPartition(clientConfig.getTopicConfig().getTopicName(), entry.getKey()),
Expand All @@ -145,7 +145,7 @@ public ConsumerState seek(final Map<Integer, Long> partitionOffsetMap) {

@Override
public ConsumerState seek(final long timestamp) {
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
// Find offsets for timestamp
final Map<TopicPartition, Long> timestampMap = new HashMap<>();
for (final TopicPartition topicPartition : getAllPartitions(kafkaConsumer)) {
Expand Down Expand Up @@ -173,7 +173,7 @@ public void close() {

@Override
public void previous() {
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
// Get all available partitions
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);

Expand Down Expand Up @@ -203,7 +203,7 @@ public void previous() {

@Override
public void next() {
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
// Get all available partitions
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);

Expand Down Expand Up @@ -231,7 +231,7 @@ public void next() {

@Override
public ConsumerState toHead() {
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
// Get all available partitions
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);

Expand All @@ -253,7 +253,7 @@ public ConsumerState toHead() {

@Override
public ConsumerState toTail() {
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
// Get all available partitions
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);

Expand All @@ -274,7 +274,20 @@ public ConsumerState toTail() {
}
}

/**
* Creates a new consumer, but does NOT subscribe to any partitions.
* Will be required to subscribe to the partitions manually you require.
* @return KafkaConsumer
*/
private KafkaConsumer createNewConsumer() {
return kafkaConsumerFactory.createConsumer(clientConfig);
}

/**
* Creates a new consumer AND subscribes to all partitions.
* @return KafkaConsumer
*/
private KafkaConsumer createNewConsumerAndSubscribeAllPartitions() {
return kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* Details about a specific consumer group.
*/
public class ConsumerGroupDetails {
private final String consumerId;
private final String groupId;
private final boolean isSimple;
private final String partitionAssignor;
private final String state;
Expand All @@ -43,30 +43,30 @@ public class ConsumerGroupDetails {

/**
* Constructor.
* @param consumerId consumer group id.
* @param groupId consumer group id.
* @param isSimple if its a simple consumer group
* @param partitionAssignor How partitions are assigned
* @param state state of consumer
* @param members members in the group.
* @param coordinator node that is acting as the coordinator for this group.
*/
public ConsumerGroupDetails(
final String consumerId,
final String groupId,
final boolean isSimple,
final String partitionAssignor,
final String state,
final List<Member> members,
final NodeDetails coordinator) {
this.consumerId = consumerId;
this.groupId = groupId;
this.isSimple = isSimple;
this.partitionAssignor = partitionAssignor;
this.members = Collections.unmodifiableList(new ArrayList<>(members));
this.state = state;
this.coordinator = coordinator;
}

public String getConsumerId() {
return consumerId;
public String getGroupId() {
return groupId;
}

public boolean isSimple() {
Expand Down Expand Up @@ -142,7 +142,7 @@ public String toString() {
@Override
public String toString() {
return "ConsumerGroupDetails{"
+ "consumerId='" + consumerId + '\''
+ "groupId='" + groupId + '\''
+ ", isSimple=" + isSimple
+ ", partitionAssignor='" + partitionAssignor + '\''
+ ", state='" + state + '\''
Expand Down
10 changes: 5 additions & 5 deletions kafka-webview-ui/src/main/resources/templates/cluster/read.html
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@

jQuery.each(ClusterInfo.cachedConsumers, function(index, result) {
// Filter
if (!SearchTools.doesMatchText(searchStr, result.consumerId)) {
if (!SearchTools.doesMatchText(searchStr, result.groupId)) {
// Skip
return;
}
Expand All @@ -175,8 +175,8 @@

// Generate html from template
var properties = {
id: result.consumerId,
encodedId: encodeURIComponent(result.consumerId),
id: result.groupId,
encodedId: encodeURIComponent(result.groupId),
topic: uniqueTopics,
state: result.state,
numberOfMembers: result.members.length,
Expand Down Expand Up @@ -214,11 +214,11 @@
ClusterInfo.handleAllConsumers(results);
});
},
removeConsumer: function(consumerId) {
removeConsumer: function(groupId) {
if (!confirm('Are you sure you want to remove this consumer group?')) {
return;
}
ApiClient.removeConsumer(ClusterInfo.clusterId, consumerId, function(result) {
ApiClient.removeConsumer(ClusterInfo.clusterId, groupId, function(result) {
UITools.showSuccess('Successfully remove consumer group.');

// Display success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
var ConsumerInfo = {
clusterId: '[[${cluster.id}]]',
clusterName: '[[${cluster.name}]]',
consumerId: '[[${consumerId}]]',
groupId: '[[${groupId}]]',

// Holds reference to last consumer membership data.
lastMembershipDetails: [],
Expand Down Expand Up @@ -49,7 +49,7 @@
jQuery('#consumer-membership-table').toggle(false);

// Request Cluster Information
ApiClient.getConsumerDetails(ConsumerInfo.clusterId, ConsumerInfo.consumerId, ConsumerInfo.handleConsumerDetails);
ApiClient.getConsumerDetails(ConsumerInfo.clusterId, ConsumerInfo.groupId, ConsumerInfo.handleConsumerDetails);
},

// Handle results from ConsumerDetails request
Expand Down Expand Up @@ -78,7 +78,7 @@
// Generate html from template
var properties = {
clusterName: ConsumerInfo.clusterName,
consumerId: ConsumerInfo.consumerId,
groupId: ConsumerInfo.groupId,
topic: uniqueTopics,
state: results.state,
numberOfMembers: results.members.length,
Expand Down Expand Up @@ -152,7 +152,7 @@
if (!ConsumerInfo.autoRefreshOffsets) {
return;
}
ApiClient.getConsumerOffsetsWithTailPositions(ConsumerInfo.clusterId, ConsumerInfo.consumerId, ConsumerInfo.handleConsumerOffsets);
ApiClient.getConsumerOffsetsWithTailPositions(ConsumerInfo.clusterId, ConsumerInfo.groupId, ConsumerInfo.handleConsumerOffsets);
},

// Handle ConsumerOffsets Results
Expand Down Expand Up @@ -425,7 +425,7 @@
<div class="card">
<div class="card-header">
<i class="fa fa-align-justify"></i>
Cluster <strong th:text="${cluster.name}"></strong> Consumer <strong th:text="${consumerId}"></strong>
Cluster <strong th:text="${cluster.name}"></strong> Consumer <strong th:text="${groupId}"></strong>
</div>
<div class="card-body">
<!-- Display Loader First -->
Expand Down Expand Up @@ -461,7 +461,7 @@
<div class="card">
<div class="card-header">
<i class="fa fa-align-justify"></i>
Consumer <strong th:text="${consumerId}"></strong> Membership
Consumer Group <strong th:text="${groupId}"></strong> Membership

<div class="btn-group float-right" role="group" aria-label="Button group">
<a id="enable-refresh-details-toggle" class="btn" href="#" style="padding-bottom: 0;" onclick="ConsumerInfo.toggleAutoRefreshForConsumerDetails(false); return false;">
Expand Down Expand Up @@ -509,7 +509,7 @@
<div class="card">
<div class="card-header">
<i class="fa fa-align-justify"></i>
Consumer <strong th:text="${consumerId}"></strong> Positions
Consumer Group <strong th:text="${groupId}"></strong> Positions

<div class="btn-group float-right" role="group" aria-label="Button group">
<a id="enable-refresh-offsets-toggle" class="btn" href="#" style="padding-bottom: 0;" onclick="ConsumerInfo.toggleAutoRefreshForConsumerOffsets(false); return false;">
Expand Down Expand Up @@ -565,8 +565,8 @@
<td>{{clusterName}}</td>
</tr>
<tr>
<td><strong>Consumer Id</strong></td>
<td>{{consumerId}}</td>
<td><strong>Group Id</strong></td>
<td>{{groupId}}</td>
</tr>
<tr>
<td><strong>Topic</strong></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void test_listConsumersAndDetails() throws Exception {
// {"consumerId":"test-consumer-id-1543825835154","partitionAssignor":"","state":"Empty","members":[],"coordinator":{"id":1,"host":"127.0.0.1","port":52168,"rack":null},"simple":false}]

// Validate submit button seems to show up.
.andExpect(content().string(containsString("{\"consumerId\":\"" + consumerId )))
.andExpect(content().string(containsString("{\"groupId\":\"" + consumerId )))
.andExpect(content().string(containsString("partitionAssignor")))
.andExpect(content().string(containsString("state")))
.andExpect(content().string(containsString("members")))
Expand Down Expand Up @@ -435,7 +435,7 @@ public void test_specificConsumerDetails() throws Exception {
// {"consumerId":"test-consumer-id-1543909384618","partitionAssignor":"","state":"Empty","members":[],"coordinator":{"id":1,"host":"127.0.0.1","port":51229,"rack":null},"simple":false}

// Validate submit button seems to show up.
.andExpect(content().string(containsString("{\"consumerId\":\"" + consumerId )))
.andExpect(content().string(containsString("{\"groupId\":\"" + consumerId )))
.andExpect(content().string(containsString("partitionAssignor")))
.andExpect(content().string(containsString("state")))
.andExpect(content().string(containsString("members")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class KafkaClientConfigUtilTest {
private final KafkaClientConfigUtil util = new KafkaClientConfigUtil("/tmp", "TestPrefix");

private final String consumerId = "MyConsumer";
private final String expectedFinalConsumerId = "TestPrefix-MyConsumer";
private final String expectedFinalGroupId = "TestPrefix-MyConsumer";
private final String expectedFinalConsumerId = "TestPrefix-MyConsumer-" + Thread.currentThread().getId();
private final String expectedBrokerHosts = "localhost:9092,yourHost:8282";
private final int expectedRequestTimeoutValue = 15000;

Expand Down Expand Up @@ -226,7 +227,7 @@ private void validateDefaultKeys(final Map<String, Object> config) {
validateKey(config, AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, expectedBrokerHosts);
validateKey(config, AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, expectedRequestTimeoutValue);
validateKey(config, AdminClientConfig.CLIENT_ID_CONFIG, expectedFinalConsumerId);
validateKey(config, ConsumerConfig.GROUP_ID_CONFIG, expectedFinalConsumerId);
validateKey(config, ConsumerConfig.GROUP_ID_CONFIG, expectedFinalGroupId);
}

private void validateKey(final Map<String, Object> config, final String key, final Object expectedValue) {
Expand Down
Loading