Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -55,14 +55,14 @@ public class MirrorCheckpointConnector extends SourceConnector {
private Admin sourceAdminClient;
private Admin targetAdminClient;
private SourceAndTarget sourceAndTarget;
private List<String> knownConsumerGroups = Collections.emptyList();
private Set<String> knownConsumerGroups = Collections.emptySet();

public MirrorCheckpointConnector() {
// nop
}

// visible for testing
MirrorCheckpointConnector(List<String> knownConsumerGroups, MirrorCheckpointConfig config) {
MirrorCheckpointConnector(Set<String> knownConsumerGroups, MirrorCheckpointConfig config) {
this.knownConsumerGroups = knownConsumerGroups;
this.config = config;
}
Expand Down Expand Up @@ -116,7 +116,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
return Collections.emptyList();
}
int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(knownConsumerGroups, numTasks);
List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks);
return IntStream.range(0, numTasks)
.mapToObj(i -> config.taskConfigForConsumerGroups(groupsPartitioned.get(i), i))
.collect(Collectors.toList());
Expand All @@ -134,12 +134,10 @@ public String version() {

private void refreshConsumerGroups()
throws InterruptedException, ExecutionException {
List<String> consumerGroups = findConsumerGroups();
Set<String> newConsumerGroups = new HashSet<>();
newConsumerGroups.addAll(consumerGroups);
Set<String> consumerGroups = findConsumerGroups();
Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
newConsumerGroups.removeAll(knownConsumerGroups);
Set<String> deadConsumerGroups = new HashSet<>();
deadConsumerGroups.addAll(knownConsumerGroups);
Set<String> deadConsumerGroups = new HashSet<>(knownConsumerGroups);
deadConsumerGroups.removeAll(consumerGroups);
if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
Expand All @@ -156,15 +154,15 @@ private void loadInitialConsumerGroups()
knownConsumerGroups = findConsumerGroups();
}

List<String> findConsumerGroups()
Set<String> findConsumerGroups()
throws InterruptedException, ExecutionException {
List<String> filteredGroups = listConsumerGroups().stream()
.map(ConsumerGroupListing::groupId)
.filter(this::shouldReplicateByGroupFilter)
.collect(Collectors.toList());

List<String> checkpointGroups = new LinkedList<>();
List<String> irrelevantGroups = new LinkedList<>();
Set<String> checkpointGroups = new HashSet<>();
Set<String> irrelevantGroups = new HashSet<>();

for (String group : filteredGroups) {
Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -49,7 +48,7 @@ public void testMirrorCheckpointConnectorDisabled() {
MirrorCheckpointConfig config = new MirrorCheckpointConfig(
makeProps("emit.checkpoints.enabled", "false"));

List<String> knownConsumerGroups = new ArrayList<>();
Set<String> knownConsumerGroups = new HashSet<>();
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
Expand All @@ -65,7 +64,7 @@ public void testMirrorCheckpointConnectorEnabled() {
MirrorCheckpointConfig config = new MirrorCheckpointConfig(
makeProps("emit.checkpoints.enabled", "true"));

List<String> knownConsumerGroups = new ArrayList<>();
Set<String> knownConsumerGroups = new HashSet<>();
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
Expand All @@ -81,7 +80,7 @@ public void testMirrorCheckpointConnectorEnabled() {
@Test
public void testNoConsumerGroup() {
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new ArrayList<>(), config);
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new HashSet<>(), config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
Expand All @@ -92,7 +91,7 @@ public void testReplicationDisabled() {
// disable the replication
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps("enabled", "false"));

List<String> knownConsumerGroups = new ArrayList<>();
Set<String> knownConsumerGroups = new HashSet<>();
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
Expand All @@ -106,7 +105,7 @@ public void testReplicationEnabled() {
// enable the replication
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps("enabled", "true"));

List<String> knownConsumerGroups = new ArrayList<>();
Set<String> knownConsumerGroups = new HashSet<>();
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
Expand All @@ -120,7 +119,7 @@ public void testReplicationEnabled() {
@Test
public void testFindConsumerGroups() throws Exception {
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptyList(), config);
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
connector = spy(connector);

Collection<ConsumerGroupListing> groups = Arrays.asList(
Expand All @@ -132,21 +131,21 @@ public void testFindConsumerGroups() throws Exception {
doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
List<String> groupFound = connector.findConsumerGroups();
Set<String> groupFound = connector.findConsumerGroups();

Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
assertEquals(expectedGroups, new HashSet<>(groupFound),
assertEquals(expectedGroups, groupFound,
"Expected groups are not the same as findConsumerGroups");

doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());
List<String> topicFilterGroupFound = connector.findConsumerGroups();
assertEquals(Collections.emptyList(), topicFilterGroupFound);
Set<String> topicFilterGroupFound = connector.findConsumerGroups();
assertEquals(Collections.emptySet(), topicFilterGroupFound);
}

@Test
public void testFindConsumerGroupsInCommonScenarios() throws Exception {
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptyList(), config);
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
connector = spy(connector);

Collection<ConsumerGroupListing> groups = Arrays.asList(
Expand Down Expand Up @@ -177,8 +176,11 @@ public void testFindConsumerGroupsInCommonScenarios() throws Exception {
doReturn(offsetsForGroup3).when(connector).listConsumerGroupOffsets("g3");
doReturn(offsetsForGroup4).when(connector).listConsumerGroupOffsets("g4");

List<String> groupFound = connector.findConsumerGroups();
assertEquals(groupFound, Arrays.asList("g1", "g2"));
Set<String> groupFound = connector.findConsumerGroups();
Set<String> verifiedSet = new HashSet<>();
verifiedSet.add("g1");
verifiedSet.add("g2");
assertEquals(groupFound, verifiedSet);
}

}