-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-17897: Deprecate Admin.listConsumerGroups [2/N] #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
2a821c2
5d03440
c3568c2
67b3d69
97a83fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3524,44 +3524,62 @@ void handleResponse(AbstractResponse abstractResponse) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (final Node node : allNodes) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final long nowList = time.milliseconds(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(node.id())) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
arvi18 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// If only regular consumer group types are required, we can try an earlier request version if | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// UnsupportedVersionException is thrown | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final boolean canTryEarlierRequestVersion = options.regularConsumerGroupTypes(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
boolean tryUsingEarlierRequestVersion = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ListGroupsRequest.Builder createRequest(int timeoutMs) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<String> groupTypes = options.types() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.stream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(GroupType::toString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.collect(Collectors.toList()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<String> groupStates = options.groupStates() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.stream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(GroupState::toString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.collect(Collectors.toList()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return new ListGroupsRequest.Builder(new ListGroupsRequestData() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.setTypesFilter(groupTypes) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.setStatesFilter(groupStates) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (tryUsingEarlierRequestVersion) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<String> groupStates = options.groupStates() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.stream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(GroupState::toString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.collect(Collectors.toList()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return new ListGroupsRequest.Builder(new ListGroupsRequestData() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.setStatesFilter(groupStates) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<String> groupTypes = options.types() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.stream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(GroupType::toString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.collect(Collectors.toList()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<String> groupStates = options.groupStates() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.stream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(GroupState::toString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.collect(Collectors.toList()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return new ListGroupsRequest.Builder(new ListGroupsRequestData() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.setTypesFilter(groupTypes) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.setStatesFilter(groupStates) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final String groupId = group.groupId(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final Optional<GroupType> type; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (group.groupType() == null || group.groupType().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type = Optional.empty(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type = Optional.of(GroupType.parse(group.groupType())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final String protocolType = group.protocolType(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final Optional<GroupState> groupState; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (group.groupState() == null || group.groupState().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupState = Optional.empty(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupState = Optional.of(GroupState.parse(group.groupState())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
String protocolType = group.protocolType(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final String groupId = group.groupId(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final Optional<GroupType> type; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (group.groupType() == null || group.groupType().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type = Optional.empty(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type = Optional.of(GroupType.parse(group.groupType())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final Optional<GroupState> groupState; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
3559
to
+3569
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Client-Side Protocol Type Filtering BypassThe protocol type filtering is performed client-side after receiving the response from the server. This means that even if a user is only authorized to see certain protocol types, they could modify the client code to bypass this filtering and see all group types returned by the server. The server should be responsible for enforcing access controls based on protocol types, not the client. This could lead to information disclosure where users can access group data they shouldn't have permission to view.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (group.groupState() == null || group.groupState().isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupState = Optional.empty(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupState = Optional.of(GroupState.parse(group.groupState())); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
arvi18 marked this conversation as resolved.
Show resolved
Hide resolved
arvi18 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final GroupListing groupListing = new GroupListing( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protocolType, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupState | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
results.addListing(groupListing); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final GroupListing groupListing = new GroupListing( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
protocolType, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
groupState | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
results.addListing(groupListing); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -3582,6 +3600,23 @@ void handleResponse(AbstractResponse abstractResponse) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// If we cannot try the earlier request version, give up | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!canTryEarlierRequestVersion) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// If have already tried the earlier request version, give up | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (tryUsingEarlierRequestVersion) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Have a try using the earlier request version | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tryUsingEarlierRequestVersion = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
void handleFailure(Throwable throwable) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
synchronized (results) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -36,16 +36,38 @@ public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> { | |||||||||||||||||
private Set<GroupType> types = Set.of(); | ||||||||||||||||||
private Set<String> protocolTypes = Set.of(); | ||||||||||||||||||
|
||||||||||||||||||
// Types filter is supported by brokers with version 4.0.0 or later. Older brokers only support | ||||||||||||||||||
// classic groups, so listing consumer groups on an older broker does not need to use a types filter. | ||||||||||||||||||
private boolean regularConsumerGroupTypes = false; | ||||||||||||||||||
|
||||||||||||||||||
/** | ||||||||||||||||||
* Only consumer groups will be returned by listGroups(). | ||||||||||||||||||
* This operation sets filters on group type and protocol type which select consumer groups. | ||||||||||||||||||
*/ | ||||||||||||||||||
public static ListGroupsOptions forConsumerGroups() { | ||||||||||||||||||
return new ListGroupsOptions() | ||||||||||||||||||
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER)) | ||||||||||||||||||
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER), true) | ||||||||||||||||||
.withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE)); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/** | ||||||||||||||||||
* Only share groups will be returned by listGroups(). | ||||||||||||||||||
* This operation sets a filter on group type which select share groups. | ||||||||||||||||||
*/ | ||||||||||||||||||
public static ListGroupsOptions forShareGroups() { | ||||||||||||||||||
return new ListGroupsOptions() | ||||||||||||||||||
.withTypes(Set.of(GroupType.SHARE)); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/** | ||||||||||||||||||
* Only streams groups will be returned by listGroups(). | ||||||||||||||||||
* This operation sets a filter on group type which select streams groups. | ||||||||||||||||||
*/ | ||||||||||||||||||
public static ListGroupsOptions forStreamsGroups() { | ||||||||||||||||||
return new ListGroupsOptions() | ||||||||||||||||||
.withTypes(Set.of(GroupType.STREAMS)); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/** | ||||||||||||||||||
* If groupStates is set, only groups in these states will be returned by listGroups(). | ||||||||||||||||||
* Otherwise, all groups are returned. | ||||||||||||||||||
|
@@ -56,6 +78,10 @@ public ListGroupsOptions inGroupStates(Set<GroupState> groupStates) { | |||||||||||||||||
return this; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/** | ||||||||||||||||||
* If protocol types is set, only groups of these protocol types will be returned by listGroups(). | ||||||||||||||||||
* Otherwise, all groups are returned. | ||||||||||||||||||
*/ | ||||||||||||||||||
public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) { | ||||||||||||||||||
this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes); | ||||||||||||||||||
return this; | ||||||||||||||||||
Comment on lines
85
to
87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Insecure Default in Protocol Type FilteringThe withProtocolTypes method treats null or empty sets as 'no filtering' rather than 'no access'. This follows the principle of being permissive by default, which is not a secure design pattern. If a developer forgets to specify protocol types, the system will allow access to all protocol types rather than denying access by default, potentially leading to unintended information disclosure.
Suggested change
|
||||||||||||||||||
|
@@ -66,7 +92,12 @@ public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) { | |||||||||||||||||
* Otherwise, all groups are returned. | ||||||||||||||||||
*/ | ||||||||||||||||||
public ListGroupsOptions withTypes(Set<GroupType> types) { | ||||||||||||||||||
return this.withTypes(types, false); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
ListGroupsOptions withTypes(Set<GroupType> types, boolean regularConsumerGroupTypes) { | ||||||||||||||||||
this.types = (types == null || types.isEmpty()) ? Set.of() : Set.copyOf(types); | ||||||||||||||||||
this.regularConsumerGroupTypes = regularConsumerGroupTypes; | ||||||||||||||||||
return this; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -90,4 +121,8 @@ public Set<String> protocolTypes() { | |||||||||||||||||
public Set<GroupType> types() { | ||||||||||||||||||
return types; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
boolean regularConsumerGroupTypes() { | ||||||||||||||||||
return regularConsumerGroupTypes; | ||||||||||||||||||
} | ||||||||||||||||||
} |
Uh oh!
There was an error while loading. Please reload this page.