diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/LegacyReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/LegacyReplicationPolicy.java
new file mode 100644
index 0000000000000..261266ea444fb
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/LegacyReplicationPolicy.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+
+import static org.apache.kafka.connect.mirror.MirrorClientConfig.HEARTBEATS_TOPIC;
+
+/**
+ * The replication policy that imitates the behavior of MirrorMaker 1.
+ *
+ * <p>The policy doesn't rename topics: {@code topic1} remains {@code topic1} after replication.
+ * There is one exception to this: for {@code heartbeats}, it behaves identically to {@link DefaultReplicationPolicy}.
+ *
+ * <p>The policy has some notable limitations. The most important one is that the policy is unable to detect
+ * cycles for any topic apart from {@code heartbeats}. This makes cross-replication effectively impossible.
+ *
+ * <p>Another limitation is that {@link MirrorClient#remoteTopics()} will be able to list only
+ * {@code heartbeats} topics.
+ *
+ * <p>{@link MirrorClient#countHopsForTopic(String, String)} will return {@code -1} for any topic
+ * apart from {@code heartbeats}.
+ *
+ * <p>The policy supports {@link DefaultReplicationPolicy}'s configurations
+ * for the behavior related to {@code heartbeats}.
+ * for the behavior related to {@code heartbeats}.
+ */
+public class LegacyReplicationPolicy implements ReplicationPolicy, Configurable {
+ // Replication sub-policy for heartbeats topics
+ private final DefaultReplicationPolicy heartbeatTopicReplicationPolicy = new DefaultReplicationPolicy();
+
+ @Override
+ public void configure(final Map<String, ?> props) {
+ heartbeatTopicReplicationPolicy.configure(props);
+ }
+
+ @Override
+ public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return heartbeatTopicReplicationPolicy.formatRemoteTopic(sourceClusterAlias, topic);
+ } else {
+ return topic;
+ }
+ }
+
+ @Override
+ public String topicSource(final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return heartbeatTopicReplicationPolicy.topicSource(topic);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public String upstreamTopic(final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return heartbeatTopicReplicationPolicy.upstreamTopic(topic);
+ } else {
+ return topic;
+ }
+ }
+
+ @Override
+ public String originalTopic(final String topic) {
+ if (isOriginalTopicHeartbeats(topic)) {
+ return HEARTBEATS_TOPIC;
+ } else {
+ return topic;
+ }
+ }
+
+ @Override
+ public boolean canTrackSource(final String topic) {
+ return isOriginalTopicHeartbeats(topic);
+ }
+
+ private boolean isOriginalTopicHeartbeats(final String topic) {
+ return HEARTBEATS_TOPIC.equals(heartbeatTopicReplicationPolicy.originalTopic(topic));
+ }
+}
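For reviewers, here is a short usage sketch (not part of the patch) of how the new policy is expected to map topic names with the default `.` separator; the expected outputs mirror the unit tests added below.

```java
// Illustration only, based on LegacyReplicationPolicyTest: how the new policy maps topic names.
import org.apache.kafka.connect.mirror.LegacyReplicationPolicy;

public class LegacyReplicationPolicyExample {
    public static void main(String[] args) {
        LegacyReplicationPolicy policy = new LegacyReplicationPolicy();

        // Ordinary topics are never renamed and carry no source information.
        System.out.println(policy.formatRemoteTopic("source1", "topic1"));       // topic1
        System.out.println(policy.topicSource("source1.topic1"));                // null

        // heartbeats topics follow DefaultReplicationPolicy: one source prefix per hop.
        System.out.println(policy.formatRemoteTopic("source1", "heartbeats"));   // source1.heartbeats
        System.out.println(policy.topicSource("source2.source1.heartbeats"));    // source2
        System.out.println(policy.upstreamTopic("source1.heartbeats"));          // heartbeats
        System.out.println(policy.originalTopic("source2.source1.heartbeats"));  // heartbeats
    }
}
```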
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
index 11f73f50ceac5..939c1d3e6103c 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -57,4 +57,9 @@ default boolean isInternalTopic(String topic) {
return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
|| topic.startsWith(".");
}
+
+ /** Checks if the policy can track back to the source of the topic. */
+ default boolean canTrackSource(String topic) {
+ return !isInternalTopic(topic);
+ }
}
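To make the new contract concrete, a minimal sketch (not from the patch) contrasting the default implementation with the override in `LegacyReplicationPolicy`:

```java
// Illustration only: canTrackSource() for the two shipped policies.
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.LegacyReplicationPolicy;
import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class CanTrackSourceExample {
    public static void main(String[] args) {
        ReplicationPolicy defaultPolicy = new DefaultReplicationPolicy();
        ReplicationPolicy legacyPolicy = new LegacyReplicationPolicy();

        // Default: any non-internal topic can be traced back to its source cluster.
        System.out.println(defaultPolicy.canTrackSource("source1.topic1"));      // true
        System.out.println(defaultPolicy.canTrackSource("__consumer_offsets"));  // false (internal)

        // LegacyReplicationPolicy overrides it: only heartbeats topics carry source information.
        System.out.println(legacyPolicy.canTrackSource("topic1"));               // false
        System.out.println(legacyPolicy.canTrackSource("source1.heartbeats"));   // true
    }
}
```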
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/LegacyReplicationPolicyTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/LegacyReplicationPolicyTest.java
new file mode 100644
index 0000000000000..937245108e041
--- /dev/null
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/LegacyReplicationPolicyTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class LegacyReplicationPolicyTest {
+ @Test
+ public void testFormatRemoteTopic() {
+ final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
+ assertEquals("aaa", legacyReplicationPolicy.formatRemoteTopic("source1", "aaa"));
+ assertEquals("source1.heartbeats", legacyReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
+ assertEquals("source2.source1.heartbeats", legacyReplicationPolicy.formatRemoteTopic("source2", "source1.heartbeats"));
+
+ legacyReplicationPolicy.configure(Collections.singletonMap("replication.policy.separator", "__"));
+ assertEquals("aaa", legacyReplicationPolicy.formatRemoteTopic("source1", "aaa"));
+ assertEquals("source1__heartbeats", legacyReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
+ }
+
+ @Test
+ public void testTopicSource() {
+ final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
+ assertNull(legacyReplicationPolicy.topicSource("source1.aaa"));
+ assertNull(legacyReplicationPolicy.topicSource("heartbeats"));
+ assertEquals("source1", legacyReplicationPolicy.topicSource("source1.heartbeats"));
+ assertEquals("source2", legacyReplicationPolicy.topicSource("source2.source1.heartbeats"));
+ }
+
+ @Test
+ public void testUpstreamTopic() {
+ final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
+ assertEquals("aaa", legacyReplicationPolicy.upstreamTopic("aaa"));
+ assertEquals("source1.aaa", legacyReplicationPolicy.upstreamTopic("source1.aaa"));
+ assertEquals("heartbeats", legacyReplicationPolicy.upstreamTopic("source1.heartbeats"));
+ }
+
+ @Test
+ public void testOriginalTopic() {
+ final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
+ assertEquals("aaa", legacyReplicationPolicy.originalTopic("aaa"));
+ assertEquals("source1.aaa", legacyReplicationPolicy.originalTopic("source1.aaa"));
+ assertEquals("source2.source1.aaa", legacyReplicationPolicy.originalTopic("source2.source1.aaa"));
+ assertEquals("heartbeats", legacyReplicationPolicy.originalTopic("heartbeats"));
+ assertEquals("heartbeats", legacyReplicationPolicy.originalTopic("source2.source1.heartbeats"));
+ }
+}
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientDefaultReplicationPolicyTest.java
similarity index 99%
rename from connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
rename to connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientDefaultReplicationPolicyTest.java
index c2536d5db8508..f7c782144f296 100644
--- a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientDefaultReplicationPolicyTest.java
@@ -31,7 +31,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
-public class MirrorClientTest {
+public class MirrorClientDefaultReplicationPolicyTest {
private static class FakeMirrorClient extends MirrorClient {
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientLegacyReplicationPolicyTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientLegacyReplicationPolicyTest.java
new file mode 100644
index 0000000000000..d47dd33903853
--- /dev/null
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientLegacyReplicationPolicyTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.common.Configurable;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MirrorClientLegacyReplicationPolicyTest {
+
+ private static class FakeMirrorClient extends MirrorClient {
+
+ List<String> topics;
+
+ FakeMirrorClient(List<String> topics) {
+ super(null, new LegacyReplicationPolicy(), null);
+ this.topics = topics;
+ }
+
+ FakeMirrorClient() {
+ this(Collections.emptyList());
+ }
+
+ @Override
+ protected Set<String> listTopics() {
+ return new HashSet<>(topics);
+ }
+ }
+
+ @Test
+ public void testIsHeartbeatTopic() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient();
+ assertTrue(client.isHeartbeatTopic("heartbeats"));
+ assertTrue(client.isHeartbeatTopic("source1.heartbeats"));
+ assertTrue(client.isHeartbeatTopic("source2.source1.heartbeats"));
+ assertFalse(client.isHeartbeatTopic("heartbeats!"));
+ assertFalse(client.isHeartbeatTopic("!heartbeats"));
+ assertFalse(client.isHeartbeatTopic("source1heartbeats"));
+ assertFalse(client.isHeartbeatTopic("source1-heartbeats"));
+ }
+
+ @Test
+ public void testIsCheckpointTopic() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient();
+ assertTrue(client.isCheckpointTopic("source1.checkpoints.internal"));
+ assertFalse(client.isCheckpointTopic("checkpoints.internal"));
+ assertFalse(client.isCheckpointTopic("checkpoints-internal"));
+ assertFalse(client.isCheckpointTopic("checkpoints.internal!"));
+ assertFalse(client.isCheckpointTopic("!checkpoints.internal"));
+ assertFalse(client.isCheckpointTopic("source1checkpointsinternal"));
+ }
+
+ @Test
+ public void countHopsForTopicTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient();
+
+ // We can count hops only for heartbeats topics.
+ assertEquals(-1, client.countHopsForTopic("topic", "source"));
+ assertEquals(-1, client.countHopsForTopic("source", "source"));
+ assertEquals(-1, client.countHopsForTopic("sourcetopic", "source"));
+ assertEquals(-1, client.countHopsForTopic("source1.topic", "source2"));
+ assertEquals(-1, client.countHopsForTopic("source1.topic", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source2.source1.topic", "source2"));
+ assertEquals(-1, client.countHopsForTopic("source2.source1.topic", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source4"));
+
+ assertEquals(-1, client.countHopsForTopic("heartbeats", "source"));
+ assertEquals(1, client.countHopsForTopic("source1.heartbeats", "source1"));
+ assertEquals(1, client.countHopsForTopic("source2.source1.heartbeats", "source2"));
+ assertEquals(2, client.countHopsForTopic("source2.source1.heartbeats", "source1"));
+ assertEquals(3, client.countHopsForTopic("source3.source2.source1.heartbeats", "source1"));
+ assertEquals(-1, client.countHopsForTopic("source3.source2.source1.heartbeats", "source4"));
+ }
+
+ @Test
+ public void heartbeatTopicsTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+ "source1.heartbeats", "source2.source1.heartbeats", "source3.heartbeats"));
+ Set<String> heartbeatTopics = client.heartbeatTopics();
+ assertEquals(heartbeatTopics, new HashSet<>(Arrays.asList("heartbeats", "source1.heartbeats",
+ "source2.source1.heartbeats", "source3.heartbeats")));
+ }
+
+ @Test
+ public void checkpointsTopicsTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "checkpoints.internal",
+ "source1.checkpoints.internal", "source2.source1.checkpoints.internal", "source3.checkpoints.internal"));
+ Set<String> checkpointTopics = client.checkpointTopics();
+ assertEquals(new HashSet<>(Arrays.asList("source1.checkpoints.internal",
+ "source2.source1.checkpoints.internal", "source3.checkpoints.internal")), checkpointTopics);
+ }
+
+ @Test
+ public void replicationHopsTest() throws InterruptedException, TimeoutException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+ "source1.heartbeats", "source1.source2.heartbeats", "source3.heartbeats", "source4.topic1"));
+ assertEquals(1, client.replicationHops("source1"));
+ assertEquals(2, client.replicationHops("source2"));
+ assertEquals(1, client.replicationHops("source3"));
+ assertEquals(-1, client.replicationHops("source4")); // can't count hops for non-heartbeats topic
+ assertEquals(-1, client.replicationHops("source5"));
+ }
+
+ @Test
+ public void upstreamClustersTest() throws InterruptedException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+ "source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats", "source6.topic1"));
+ Set<String> sources = client.upstreamClusters();
+ assertTrue(sources.contains("source1"));
+ assertTrue(sources.contains("source2"));
+ assertTrue(sources.contains("source3"));
+ assertTrue(sources.contains("source4"));
+ assertTrue(sources.contains("source5"));
+ assertFalse(sources.contains("source6")); // non-heartbeats topic can't indicate upstream cluster
+ assertFalse(sources.contains("sourceX"));
+ assertFalse(sources.contains(""));
+ assertFalse(sources.contains(null));
+ }
+
+ @Test
+ public void remoteTopicsTest() throws InterruptedException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+ "source1.topic4", "source1.source2.topic5", "source3.source4.source5.topic6",
+ "source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats"));
+ Set<String> remoteTopics = client.remoteTopics();
+
+ // Only heartbeats topics are recognized as remote.
+ assertFalse(remoteTopics.contains("topic1"));
+ assertFalse(remoteTopics.contains("topic2"));
+ assertFalse(remoteTopics.contains("topic3"));
+ assertFalse(remoteTopics.contains("source1.topic4"));
+ assertFalse(remoteTopics.contains("source1.source2.topic5"));
+ assertFalse(remoteTopics.contains("source3.source4.source5.topic6"));
+
+ assertTrue(remoteTopics.contains("source1.heartbeats"));
+ assertTrue(remoteTopics.contains("source1.source2.heartbeats"));
+ assertTrue(remoteTopics.contains("source3.source4.source5.heartbeats"));
+ }
+
+ @Test
+ public void remoteTopicsSeparatorTest() throws InterruptedException {
+ MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+ "source1__topic4", "source1__source2__topic5", "source3__source4__source5__topic6",
+ "source1__heartbeats", "source1__source2__heartbeats", "source3__source4__source5__heartbeats"));
+ ((Configurable) client.replicationPolicy()).configure(
+ Collections.singletonMap("replication.policy.separator", "__"));
+ Set<String> remoteTopics = client.remoteTopics();
+
+ // Only heartbeats topics are recognized as remote.
+ assertFalse(remoteTopics.contains("topic1"));
+ assertFalse(remoteTopics.contains("topic2"));
+ assertFalse(remoteTopics.contains("topic3"));
+ assertFalse(remoteTopics.contains("source1__topic4"));
+ assertFalse(remoteTopics.contains("source1__source2__topic5"));
+ assertFalse(remoteTopics.contains("source3__source4__source5__topic6"));
+
+ assertTrue(remoteTopics.contains("source1__heartbeats"));
+ assertTrue(remoteTopics.contains("source1__source2__heartbeats"));
+ assertTrue(remoteTopics.contains("source3__source4__source5__heartbeats"));
+ }
+
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index e6d6922626183..c06af4ff3c2fb 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -201,7 +201,10 @@ List<TopicPartition> findSourceTopicPartitions()
List<TopicPartition> findTargetTopicPartitions()
throws InterruptedException, ExecutionException {
Set<String> topics = listTopics(targetAdminClient).stream()
- .filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
+ // Allow replication policies that can't track back to the source of a topic (like LegacyReplicationPolicy)
+ // to consider all topics as target topics.
+ .filter(t -> !replicationPolicy.canTrackSource(t)
+ || sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
.collect(Collectors.toSet());
return describeTopics(targetAdminClient, topics).stream()
.flatMap(MirrorSourceConnector::expandTopicDescription)
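Restated as a standalone sketch (the names `policy` and `sourceAlias` are placeholders, not the connector's fields): every topic the policy cannot trace back is kept, and the original source check applies otherwise. With `LegacyReplicationPolicy` this means all non-heartbeats topics on the target cluster pass the filter.

```java
// Illustration only: the target-topic filter introduced above, evaluated with LegacyReplicationPolicy.
import java.util.function.Predicate;

import org.apache.kafka.connect.mirror.LegacyReplicationPolicy;
import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class TargetTopicFilterExample {
    public static void main(String[] args) {
        ReplicationPolicy policy = new LegacyReplicationPolicy();
        String sourceAlias = "source1"; // placeholder for sourceAndTarget.source()

        Predicate<String> isTargetTopic = topic ->
                !policy.canTrackSource(topic) || sourceAlias.equals(policy.topicSource(topic));

        System.out.println(isTargetTopic.test("topic1"));              // true  - untrackable, kept
        System.out.println(isTargetTopic.test("source1.heartbeats"));  // true  - trackable, source matches
        System.out.println(isTargetTopic.test("other.heartbeats"));    // false - trackable, different source
    }
}
```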
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 2567eb454d304..bce512bd04c0f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -32,7 +32,7 @@
public class MirrorCheckpointTaskTest {
@Test
- public void testDownstreamTopicRenaming() {
+ public void testDownstreamTopicRenamingWithDefaultReplicationPolicy() {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("source1.topic3", 4),
@@ -44,7 +44,19 @@ public void testDownstreamTopicRenaming() {
}
@Test
- public void testCheckpoint() {
+ public void testDownstreamTopicNotRenamingWithLegacyReplicationPolicy() {
+ MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+ new LegacyReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
+ assertEquals(new TopicPartition("topic3", 4),
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
+ assertEquals(new TopicPartition("target2.topic3", 5),
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)));
+ assertEquals(new TopicPartition("source6.topic7", 8),
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
+ }
+
+ @Test
+ public void testCheckpointDefaultReplicationPolicy() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
@@ -70,6 +82,33 @@ public void testCheckpoint() {
assertEquals(234L, sourceRecord2.timestamp().longValue());
}
+ @Test
+ public void testCheckpointLegacyReplicationPolicy() {
+ OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+ MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+ new LegacyReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
+ offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
+ offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+ Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
+ new OffsetAndMetadata(10, null));
+ SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
+ assertEquals(new TopicPartition("topic1", 2), checkpoint1.topicPartition());
+ assertEquals("group9", checkpoint1.consumerGroupId());
+ assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()));
+ assertEquals(10, checkpoint1.upstreamOffset());
+ assertEquals(11, checkpoint1.downstreamOffset());
+ assertEquals(123L, sourceRecord1.timestamp().longValue());
+ Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6),
+ new OffsetAndMetadata(12, null));
+ SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
+ assertEquals(new TopicPartition("target2.topic5", 6), checkpoint2.topicPartition());
+ assertEquals("group11", checkpoint2.consumerGroupId());
+ assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()));
+ assertEquals(12, checkpoint2.upstreamOffset());
+ assertEquals(13, checkpoint2.downstreamOffset());
+ assertEquals(234L, sourceRecord2.timestamp().longValue());
+ }
+
@Test
public void testSyncOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsDefaultReplicationPolicyIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsDefaultReplicationPolicyIntegrationTest.java
new file mode 100644
index 0000000000000..7c2b975c34ac0
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsDefaultReplicationPolicyIntegrationTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests MM2 replication and failover/failback logic for {@link DefaultReplicationPolicy}.
+ *
+ * MM2 is configured with active/active replication between two Kafka clusters using {@link DefaultReplicationPolicy}.
+ * Tests validate that records sent to either cluster arrive at the other cluster. Then, a consumer group is migrated
+ * from one cluster to the other and back. Tests validate that consumer offsets are translated and replicated
+ * between clusters during this failover and failback.
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsDefaultReplicationPolicyIntegrationTest extends MirrorConnectorsIntegrationTest {
+
+ private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsDefaultReplicationPolicyIntegrationTest.class);
+
+ @Before
+ public void setup() throws InterruptedException {
+ Properties brokerProps = new Properties();
+ brokerProps.put("auto.create.topics.enable", "false");
+
+ mm2Props = new HashMap<>();
+ mm2Props.put("clusters", "primary, backup");
+ mm2Props.put("max.tasks", "10");
+ mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
+ mm2Props.put("groups", "consumer-group-.*");
+ mm2Props.put("primary->backup.enabled", "true");
+ mm2Props.put("backup->primary.enabled", "true");
+ mm2Props.put("sync.topic.acls.enabled", "false");
+ mm2Props.put("emit.checkpoints.interval.seconds", "1");
+ mm2Props.put("emit.heartbeats.interval.seconds", "1");
+ mm2Props.put("refresh.topics.interval.seconds", "1");
+ mm2Props.put("refresh.groups.interval.seconds", "1");
+ mm2Props.put("checkpoints.topic.replication.factor", "1");
+ mm2Props.put("heartbeats.topic.replication.factor", "1");
+ mm2Props.put("offset-syncs.topic.replication.factor", "1");
+ mm2Props.put("config.storage.replication.factor", "1");
+ mm2Props.put("offset.storage.replication.factor", "1");
+ mm2Props.put("status.storage.replication.factor", "1");
+ mm2Props.put("replication.factor", "1");
+
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ Map<String, String> primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget("backup", "primary"));
+ Map<String, String> backupWorkerProps = mm2Config.workerConfig(new SourceAndTarget("primary", "backup"));
+
+ primary = new EmbeddedConnectCluster.Builder()
+ .name("primary-connect-cluster")
+ .numWorkers(3)
+ .numBrokers(1)
+ .brokerProps(brokerProps)
+ .workerProps(primaryWorkerProps)
+ .build();
+
+ backup = new EmbeddedConnectCluster.Builder()
+ .name("backup-connect-cluster")
+ .numWorkers(3)
+ .numBrokers(1)
+ .brokerProps(brokerProps)
+ .workerProps(backupWorkerProps)
+ .build();
+
+ primary.start();
+ primary.assertions().assertAtLeastNumWorkersAreUp(3,
+ "Workers of primary-connect-cluster did not start in time.");
+ backup.start();
+ backup.assertions().assertAtLeastNumWorkersAreUp(3,
+ "Workers of backup-connect-cluster did not start in time.");
+
+ // create these topics before starting the connectors so we don't need to wait for discovery
+ primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
+ primary.kafka().createTopic("backup.test-topic-1", 1);
+ primary.kafka().createTopic("heartbeats", 1);
+ backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
+ backup.kafka().createTopic("primary.test-topic-1", 1);
+ backup.kafka().createTopic("heartbeats", 1);
+
+ // produce to all partitions of test-topic-1
+ produceMessages(primary, "test-topic-1", "message-1-");
+ produceMessages(backup, "test-topic-1", "message-2-");
+
+ // Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+ Map<String, Object> dummyProps = Collections.singletonMap("group.id", "consumer-group-dummy");
+ Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(dummyProps, "test-topic-1");
+ consumeAllMessages(dummyConsumer);
+ dummyConsumer.close();
+ dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(dummyProps, "test-topic-1");
+ consumeAllMessages(dummyConsumer);
+ dummyConsumer.close();
+
+ log.info("primary REST service: {}", primary.endpointForResource("connectors"));
+ log.info("backup REST service: {}", backup.endpointForResource("connectors"));
+
+ log.info("primary brokers: {}", primary.kafka().bootstrapServers());
+ log.info("backup brokers: {}", backup.kafka().bootstrapServers());
+
+ // now that the brokers are running, we can finish setting up the Connectors
+ mm2Props.put("primary.bootstrap.servers", primary.kafka().bootstrapServers());
+ mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ Exit.setExitProcedure((status, errorCode) -> exited.set(true));
+ }
+
+ @Test
+ public void testReplication() throws InterruptedException {
+ String consumerGroupName = "consumer-group-testReplication";
+ Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+ put("group.id", consumerGroupName);
+ put("auto.offset.reset", "latest");
+ }};
+
+ // create consumers before starting the connectors so we don't need to wait for discovery
+ Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
+ consumeAllMessages(primaryConsumer, 0);
+ primaryConsumer.close();
+
+ Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
+ consumeAllMessages(backupConsumer, 0);
+ backupConsumer.close();
+
+ waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup", false);
+ waitUntilMirrorMakerIsRunning(primary, mm2Config, "backup", "primary", false);
+ MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
+ MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
+
+ assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,
+ primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
+ assertEquals("Records were not replicated to backup cluster.", NUM_RECORDS_PRODUCED,
+ backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count());
+ assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PRODUCED,
+ backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
+ assertEquals("Records were not replicated to primary cluster.", NUM_RECORDS_PRODUCED,
+ primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1").count());
+
+ assertEquals("Primary cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
+ primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1", "test-topic-1").count());
+ assertEquals("Backup cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
+ backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1", "test-topic-1").count());
+
+ assertTrue("Heartbeats were not emitted to primary cluster.",
+ primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
+ assertTrue("Heartbeats were not emitted to backup cluster.",
+ backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
+ assertTrue("Heartbeats were not replicated downstream to backup cluster.",
+ backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0);
+ assertTrue("Heartbeats were not replicated downstream to primary cluster.",
+ primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0);
+
+ assertTrue("Did not find upstream primary cluster.", backupClient.upstreamClusters().contains("primary"));
+ assertEquals("Did not calculate replication hops correctly.", 1, backupClient.replicationHops("primary"));
+ assertTrue("Did not find upstream backup cluster.", primaryClient.upstreamClusters().contains("backup"));
+ assertEquals("Did not calculate replication hops correctly.", 1, primaryClient.replicationHops("backup"));
+
+ assertTrue("Checkpoints were not emitted downstream to backup cluster.",
+ backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0);
+
+ Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, "primary",
+ Duration.ofMillis(CHECKPOINT_DURATION_MS));
+
+ assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+ new TopicPartition("primary.test-topic-1", 0)));
+
+ // Failover consumer group to backup cluster.
+ backupConsumer = backup.kafka().createConsumer(consumerProps);
+ backupConsumer.assign(allPartitions("test-topic-1", "primary.test-topic-1"));
+ seek(backupConsumer, backupOffsets);
+ consumeAllMessages(backupConsumer, 0);
+
+ assertTrue("Consumer failedover to zero offset.", backupConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
+ assertTrue("Consumer failedover beyond expected offset.", backupConsumer.position(
+ new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+ assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
+ CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
+
+ backupConsumer.close();
+
+ waitForCondition(() -> {
+ try {
+ return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
+ Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0));
+ } catch (Throwable e) {
+ return false;
+ }
+ }, CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
+
+ waitForCondition(() -> {
+ try {
+ return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
+ Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
+ } catch (Throwable e) {
+ return false;
+ }
+ }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
+
+ Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
+ Duration.ofMillis(CHECKPOINT_DURATION_MS));
+
+ // Failback consumer group to primary cluster
+ primaryConsumer = primary.kafka().createConsumer(consumerProps);
+ primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
+ seek(primaryConsumer, primaryOffsets);
+ consumeAllMessages(primaryConsumer, 0);
+
+ assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+ assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+ assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
+ new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+ assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
+ new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0);
+ // If offset translation was successful we expect no messages to be consumed after failback
+ assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
+ primaryConsumer.close();
+
+ // create more matching topics
+ primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+ backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+
+ produceMessages(primary, "test-topic-2", "message-3-", 1);
+ produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+ assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+ primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+ assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
+ backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+ assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
+ primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+ assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+ backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+ }
+
+ @Test
+ public void testReplicationWithEmptyPartition() throws Exception {
+ String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
+ Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
+ // create topics
+ String topic = "test-topic-with-empty-partition";
+ primary.kafka().createTopic(topic, NUM_PARTITIONS);
+
+ // produce to all of the topic's partitions *but the last one*, on the primary cluster
+ produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
+ // Consume, from the primary cluster, before starting the connectors so we don't need to wait for discovery
+ int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
+ try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
+ consumeAllMessages(consumer, expectedRecords);
+ }
+
+ waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup", false);
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = waitForConsumerGroupOffsetReplication(
+ Collections.singletonList("primary." + topic), consumerGroupName, false);
+
+ // check translated offset for the last partition (empty partition)
+ OffsetAndMetadata oam = offsets.get(new TopicPartition("primary." + topic, NUM_PARTITIONS - 1));
+ assertNotNull("Offset of last partition was not replicated", oam);
+ assertEquals("Offset of last partition is not zero", 0, oam.offset());
+ }
+
+ @Test
+ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
+ String topic1 = "test-topic-auto-offset-sync-1";
+ String topic2 = "test-topic-auto-offset-sync-2";
+ String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
+ Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+ put("group.id", consumerGroupName);
+ put("auto.offset.reset", "earliest");
+ }};
+
+ // create new topic
+ primary.kafka().createTopic(topic1, NUM_PARTITIONS);
+ backup.kafka().createTopic("primary." + topic1, 1);
+
+ // produce some records to the new topic in primary cluster
+ produceMessages(primary, topic1, "message-1-");
+ // create consumers before starting the connectors so we don't need to wait for discovery
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic1)) {
+ consumeAllMessages(primaryConsumer);
+ }
+
+ // enable automated consumer group offset sync
+ mm2Props.put("sync.group.offsets.enabled", "true");
+ mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ // one way replication from primary to backup
+ mm2Props.put("backup->primary.enabled", "false");
+
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup", false);
+
+ waitForConsumerGroupOffsetReplication(
+ Collections.singletonList("primary." + topic1), consumerGroupName, true);
+
+ // create a consumer at backup cluster with same consumer group Id to consume 1 topic
+ ConsumerRecords<byte[], byte[]> records = null;
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+ consumerProps, "primary." + topic1)) {
+ records = backupConsumer.poll(Duration.ofMillis(500));
+ }
+
+ // the size of consumer record should be zero, because the offsets of the same consumer group
+ // have been automatically synchronized from primary to backup by the background job, so no
+ // more records to consume from the replicated topic by the same consumer group at backup cluster
+ assertEquals("consumer record size is not zero", 0, records.count());
+
+ // create a second topic
+ primary.kafka().createTopic(topic2, NUM_PARTITIONS);
+ backup.kafka().createTopic("primary." + topic2, 1);
+
+ // produce some records to the new topic in primary cluster
+ produceMessages(primary, topic2, "message-1-");
+
+ // create a consumer at primary cluster to consume the new topic
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(
+ consumerProps, topic2)) {
+ // we need to wait for consuming all the records for MM2 replicating the expected offsets
+ consumeAllMessages(primaryConsumer);
+ }
+
+ waitForConsumerGroupOffsetReplication(Arrays.asList("primary." + topic1, "primary." + topic2), consumerGroupName, true);
+
+ // create a consumer at backup cluster with same consumer group Id to consume old and new topic
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+ consumerProps, "primary." + topic1, "primary." + topic2)) {
+ records = backupConsumer.poll(Duration.ofMillis(500));
+ }
+
+ // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+ assertEquals("consumer record size is not zero", 0, records.count());
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
index a71b28c0aa9a1..c2a24394b5f31 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
@@ -22,16 +22,10 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.common.utils.Exit;
import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
@@ -41,7 +35,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,148 +42,25 @@
import java.util.stream.IntStream;
import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-/**
- * Tests MM2 replication and failover/failback logic.
- *
- * MM2 is configured with active/active replication between two Kafka clusters. Tests validate that
- * records sent to either cluster arrive at the other cluster. Then, a consumer group is migrated from
- * one cluster to the other and back. Tests validate that consumer offsets are translated and replicated
- * between clusters during this failover and failback.
- */
-@Category(IntegrationTest.class)
-public class MirrorConnectorsIntegrationTest {
-
- private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class);
-
- private static final int NUM_RECORDS_PER_PARTITION = 10;
- private static final int NUM_PARTITIONS = 10;
- private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
- private static final int RECORD_TRANSFER_DURATION_MS = 30_000;
- private static final int CHECKPOINT_DURATION_MS = 20_000;
- private static final int RECORD_CONSUME_DURATION_MS = 20_000;
- private static final int OFFSET_SYNC_DURATION_MS = 30_000;
-
- private final AtomicBoolean exited = new AtomicBoolean(false);
- private Map<String, String> mm2Props;
- private MirrorMakerConfig mm2Config;
- private EmbeddedConnectCluster primary;
- private EmbeddedConnectCluster backup;
-
- @Before
- public void setup() throws InterruptedException {
- Properties brokerProps = new Properties();
- brokerProps.put("auto.create.topics.enable", "false");
-
- mm2Props = new HashMap<>();
- mm2Props.put("clusters", "primary, backup");
- mm2Props.put("max.tasks", "10");
- mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
- mm2Props.put("groups", "consumer-group-.*");
- mm2Props.put("primary->backup.enabled", "true");
- mm2Props.put("backup->primary.enabled", "true");
- mm2Props.put("sync.topic.acls.enabled", "false");
- mm2Props.put("emit.checkpoints.interval.seconds", "1");
- mm2Props.put("emit.heartbeats.interval.seconds", "1");
- mm2Props.put("refresh.topics.interval.seconds", "1");
- mm2Props.put("refresh.groups.interval.seconds", "1");
- mm2Props.put("checkpoints.topic.replication.factor", "1");
- mm2Props.put("heartbeats.topic.replication.factor", "1");
- mm2Props.put("offset-syncs.topic.replication.factor", "1");
- mm2Props.put("config.storage.replication.factor", "1");
- mm2Props.put("offset.storage.replication.factor", "1");
- mm2Props.put("status.storage.replication.factor", "1");
- mm2Props.put("replication.factor", "1");
-
- mm2Config = new MirrorMakerConfig(mm2Props);
- Map<String, String> primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget("backup", "primary"));
- Map<String, String> backupWorkerProps = mm2Config.workerConfig(new SourceAndTarget("primary", "backup"));
-
- primary = new EmbeddedConnectCluster.Builder()
- .name("primary-connect-cluster")
- .numWorkers(3)
- .numBrokers(1)
- .brokerProps(brokerProps)
- .workerProps(primaryWorkerProps)
- .build();
-
- backup = new EmbeddedConnectCluster.Builder()
- .name("backup-connect-cluster")
- .numWorkers(3)
- .numBrokers(1)
- .brokerProps(brokerProps)
- .workerProps(backupWorkerProps)
- .build();
-
- primary.start();
- primary.assertions().assertAtLeastNumWorkersAreUp(3,
- "Workers of primary-connect-cluster did not start in time.");
- backup.start();
- backup.assertions().assertAtLeastNumWorkersAreUp(3,
- "Workers of backup-connect-cluster did not start in time.");
-
- // create these topics before starting the connectors so we don't need to wait for discovery
- primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
- primary.kafka().createTopic("backup.test-topic-1", 1);
- primary.kafka().createTopic("heartbeats", 1);
- backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
- backup.kafka().createTopic("primary.test-topic-1", 1);
- backup.kafka().createTopic("heartbeats", 1);
+abstract class MirrorConnectorsIntegrationTest {
- // produce to all partitions of test-topic-1
- produceMessages(primary, "test-topic-1", "message-1-");
- produceMessages(backup, "test-topic-1", "message-2-");
+ protected static final int NUM_RECORDS_PER_PARTITION = 10;
+ protected static final int NUM_PARTITIONS = 10;
+ protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
+ protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+ protected static final int CHECKPOINT_DURATION_MS = 20_000;
+ protected static final int RECORD_CONSUME_DURATION_MS = 20_000;
+ protected static final int OFFSET_SYNC_DURATION_MS = 30_000;
- // Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
- Map<String, Object> dummyProps = Collections.singletonMap("group.id", "consumer-group-dummy");
- Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(dummyProps, "test-topic-1");
- consumeAllMessages(dummyConsumer);
- dummyConsumer.close();
- dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(dummyProps, "test-topic-1");
- consumeAllMessages(dummyConsumer);
- dummyConsumer.close();
-
- log.info("primary REST service: {}", primary.endpointForResource("connectors"));
- log.info("backup REST service: {}", backup.endpointForResource("connectors"));
-
- log.info("primary brokers: {}", primary.kafka().bootstrapServers());
- log.info("backup brokers: {}", backup.kafka().bootstrapServers());
-
- // now that the brokers are running, we can finish setting up the Connectors
- mm2Props.put("primary.bootstrap.servers", primary.kafka().bootstrapServers());
- mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
- mm2Config = new MirrorMakerConfig(mm2Props);
-
- Exit.setExitProcedure((status, errorCode) -> exited.set(true));
- }
-
-
- private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
- MirrorMakerConfig mm2Config, String primary, String backup) throws InterruptedException {
-
- connectCluster.configureConnector("MirrorSourceConnector",
- mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorSourceConnector.class));
- connectCluster.configureConnector("MirrorCheckpointConnector",
- mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorCheckpointConnector.class));
- connectCluster.configureConnector("MirrorHeartbeatConnector",
- mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorHeartbeatConnector.class));
-
- // we wait for the connector and tasks to come up for each connector, so that when we do the
- // actual testing, we are certain that the tasks are up and running; this will prevent
- // flaky tests where the connector and tasks didn't start up in time for the tests to be
- // run
- Set<String> connectorNames = new HashSet<>(Arrays.asList("MirrorSourceConnector",
- "MirrorCheckpointConnector", "MirrorHeartbeatConnector"));
+ protected final AtomicBoolean exited = new AtomicBoolean(false);
- for (String connector : connectorNames) {
- connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector, 1,
- "Connector " + connector + " tasks did not start in time on cluster: " + connectCluster);
- }
- }
+ protected Map<String, String> mm2Props;
+ protected MirrorMakerConfig mm2Config;
+ protected EmbeddedConnectCluster primary;
+ protected EmbeddedConnectCluster backup;
@After
public void close() {
@@ -211,165 +81,43 @@ public void close() {
}
}
- @Test
- public void testReplication() throws InterruptedException {
- String consumerGroupName = "consumer-group-testReplication";
- Map<String, Object> consumerProps = new HashMap<String, Object>() {{
- put("group.id", consumerGroupName);
- put("auto.offset.reset", "latest");
- }};
-
- // create consumers before starting the connectors so we don't need to wait for discovery
- Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
- consumeAllMessages(primaryConsumer, 0);
- primaryConsumer.close();
-
- Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
- consumeAllMessages(backupConsumer, 0);
- backupConsumer.close();
-
- waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
- waitUntilMirrorMakerIsRunning(primary, mm2Config, "backup", "primary");
- MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
- MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
-
- assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,
- primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
- assertEquals("Records were not replicated to backup cluster.", NUM_RECORDS_PRODUCED,
- backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count());
- assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PRODUCED,
- backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
- assertEquals("Records were not replicated to primary cluster.", NUM_RECORDS_PRODUCED,
- primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1").count());
-
- assertEquals("Primary cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
- primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1", "test-topic-1").count());
- assertEquals("Backup cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
- backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1", "test-topic-1").count());
-
- assertTrue("Heartbeats were not emitted to primary cluster.",
- primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
- assertTrue("Heartbeats were not emitted to backup cluster.",
- backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
- assertTrue("Heartbeats were not replicated downstream to backup cluster.",
- backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0);
- assertTrue("Heartbeats were not replicated downstream to primary cluster.",
- primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0);
-
- assertTrue("Did not find upstream primary cluster.", backupClient.upstreamClusters().contains("primary"));
- assertEquals("Did not calculate replication hops correctly.", 1, backupClient.replicationHops("primary"));
- assertTrue("Did not find upstream backup cluster.", primaryClient.upstreamClusters().contains("backup"));
- assertEquals("Did not calculate replication hops correctly.", 1, primaryClient.replicationHops("backup"));
-
- assertTrue("Checkpoints were not emitted downstream to backup cluster.",
- backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0);
-
- Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, "primary",
- Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
- assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
- new TopicPartition("primary.test-topic-1", 0)));
-
- // Failover consumer group to backup cluster.
- backupConsumer = backup.kafka().createConsumer(consumerProps);
- backupConsumer.assign(allPartitions("test-topic-1", "primary.test-topic-1"));
- seek(backupConsumer, backupOffsets);
- consumeAllMessages(backupConsumer, 0);
-
- assertTrue("Consumer failedover to zero offset.", backupConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
- assertTrue("Consumer failedover beyond expected offset.", backupConsumer.position(
- new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
- assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
- CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
-
- backupConsumer.close();
-
- waitForCondition(() -> {
- try {
- return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
- Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0));
- } catch (Throwable e) {
- return false;
- }
- }, CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
-
- waitForCondition(() -> {
- try {
- return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
- Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
- } catch (Throwable e) {
- return false;
- }
- }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
-
- Map primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
- Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
- // Failback consumer group to primary cluster
- primaryConsumer = primary.kafka().createConsumer(consumerProps);
- primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
- seek(primaryConsumer, primaryOffsets);
- consumeAllMessages(primaryConsumer, 0);
-
- assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
- assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
- assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
- new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
- assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
- new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
-
- Map>> messages2 = consumeAllMessages(primaryConsumer, 0);
- // If offset translation was successful we expect no messages to be consumed after failback
- assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
- primaryConsumer.close();
-
- // create more matching topics
- primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
- backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
-
- produceMessages(primary, "test-topic-2", "message-3-", 1);
- produceMessages(backup, "test-topic-3", "message-4-", 1);
-
- assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
- primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
- assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
- backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
-
- assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
- primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
- assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
- backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+ protected void deleteAllTopics(EmbeddedKafkaCluster cluster) {
+ try (Admin client = cluster.createAdminClient()) {
+ client.deleteTopics(client.listTopics().names().get());
+ } catch (Throwable e) {
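+ // best-effort cleanup between tests; ignore failures such as topics that were already removed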
+ }
}
- @Test
- public void testReplicationWithEmptyPartition() throws Exception {
- String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
- Map consumerProps = Collections.singletonMap("group.id", consumerGroupName);
-
- // create topics
- String topic = "test-topic-with-empty-partition";
- primary.kafka().createTopic(topic, NUM_PARTITIONS);
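+ // Configures the MM2 connectors on the given Connect cluster and waits until they are running.
+ // When onlyHeartbeat is true, only the MirrorHeartbeatConnector is started; callers use this for
+ // the direction that does not replicate topics or checkpoints.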
+ protected void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
+ MirrorMakerConfig mm2Config, String primary, String backup, boolean onlyHeartbeat) throws InterruptedException {
- // produce to all test-topic-empty's partitions *but the last one*, on the primary cluster
- produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
- // Consume, from the primary cluster, before starting the connectors so we don't need to wait for discovery
- int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
- try (Consumer consumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
- consumeAllMessages(consumer, expectedRecords);
+ if (!onlyHeartbeat) {
+ connectCluster.configureConnector("MirrorSourceConnector",
+ mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorSourceConnector.class));
+ connectCluster.configureConnector("MirrorCheckpointConnector",
+ mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorCheckpointConnector.class));
}
+ connectCluster.configureConnector("MirrorHeartbeatConnector",
+ mm2Config.connectorBaseConfig(new SourceAndTarget(primary, backup), MirrorHeartbeatConnector.class));
- waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
-
- Map offsets = waitForConsumerGroupOffsetReplication(
- Collections.singletonList("primary." + topic), consumerGroupName, false);
+ // we wait for the connector and tasks to come up for each connector, so that when we do the
+ // actual testing, we are certain that the tasks are up and running; this will prevent
+ // flaky tests where the connector and tasks didn't start up in time for the tests to be
+ // run
+ Set<String> connectorNames = new HashSet<>(Collections.singletonList("MirrorHeartbeatConnector"));
+ if (!onlyHeartbeat) {
+ connectorNames.add("MirrorSourceConnector");
+ connectorNames.add("MirrorCheckpointConnector");
+ }
- // check translated offset for the last partition (empty partition)
- OffsetAndMetadata oam = offsets.get(new TopicPartition("primary." + topic, NUM_PARTITIONS - 1));
- assertNotNull("Offset of last partition was not replicated", oam);
- assertEquals("Offset of last partition is not zero", 0, oam.offset());
+ for (String connector : connectorNames) {
+ connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector, 1,
+ "Connector " + connector + " tasks did not start in time on cluster: " + connectCluster);
+ }
}
- private Map<TopicPartition, OffsetAndMetadata> waitForConsumerGroupOffsetReplication(List<String> topics, String consumerGroupId, boolean waitForAutoSync)
+ protected Map<TopicPartition, OffsetAndMetadata> waitForConsumerGroupOffsetReplication(List<String> topics, String consumerGroupId, boolean waitForAutoSync)
throws InterruptedException {
Admin backupClient = backup.kafka().createAdminClient();
List tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
@@ -400,7 +148,7 @@ private Map waitForConsumerGroupOffsetReplica
return offsets;
}
- private Map<TopicPartition, OffsetAndMetadata> getTranslatedOffsets(String consumerGroupId)
+ protected Map<TopicPartition, OffsetAndMetadata> getTranslatedOffsets(String consumerGroupId)
throws TimeoutException, InterruptedException {
return RemoteClusterUtils.translateOffsets(
mm2Config.clientConfig("backup").adminConfig(),
@@ -409,91 +157,11 @@ private Map getTranslatedOffsets(String consu
Duration.ofMillis(CHECKPOINT_DURATION_MS));
}
- @Test
- public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
- String topic1 = "test-topic-auto-offset-sync-1";
- String topic2 = "test-topic-auto-offset-sync-2";
- String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
- Map consumerProps = new HashMap() {{
- put("group.id", consumerGroupName);
- put("auto.offset.reset", "earliest");
- }};
-
- // create new topic
- primary.kafka().createTopic(topic1, NUM_PARTITIONS);
- backup.kafka().createTopic("primary." + topic1, 1);
-
- // produce some records to the new topic in primary cluster
- produceMessages(primary, topic1, "message-1-");
- // create consumers before starting the connectors so we don't need to wait for discovery
- try (Consumer primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic1)) {
- consumeAllMessages(primaryConsumer);
- }
-
- // enable automated consumer group offset sync
- mm2Props.put("sync.group.offsets.enabled", "true");
- mm2Props.put("sync.group.offsets.interval.seconds", "1");
- // one way replication from primary to backup
- mm2Props.put("backup->primary.enabled", "false");
-
- mm2Config = new MirrorMakerConfig(mm2Props);
-
- waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
-
- waitForConsumerGroupOffsetReplication(
- Collections.singletonList("primary." + topic1), consumerGroupName, true);
-
- // create a consumer at backup cluster with same consumer group Id to consume 1 topic
- ConsumerRecords records = null;
- try (Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
- consumerProps, "primary." + topic1)) {
- records = backupConsumer.poll(Duration.ofMillis(500));
- }
-
- // the size of consumer record should be zero, because the offsets of the same consumer group
- // have been automatically synchronized from primary to backup by the background job, so no
- // more records to consume from the replicated topic by the same consumer group at backup cluster
- assertEquals("consumer record size is not zero", 0, records.count());
-
- // create a second topic
- primary.kafka().createTopic(topic2, NUM_PARTITIONS);
- backup.kafka().createTopic("primary." + topic2, 1);
-
- // produce some records to the new topic in primary cluster
- produceMessages(primary, topic2, "message-1-");
-
- // create a consumer at primary cluster to consume the new topic
- try (Consumer primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(
- consumerProps, topic2)) {
- // we need to wait for consuming all the records for MM2 replicating the expected offsets
- consumeAllMessages(primaryConsumer);
- }
-
- waitForConsumerGroupOffsetReplication(Arrays.asList("primary." + topic1, "primary." + topic2), consumerGroupName, true);
-
- // create a consumer at backup cluster with same consumer group Id to consume old and new topic
- try (Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
- consumerProps, "primary." + topic1, "primary." + topic2)) {
- records = backupConsumer.poll(Duration.ofMillis(500));
- }
-
- // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
- assertEquals("consumer record size is not zero", 0, records.count());
- }
-
- private void deleteAllTopics(EmbeddedKafkaCluster cluster) {
- Admin client = cluster.createAdminClient();
- try {
- client.deleteTopics(client.listTopics().names().get());
- } catch (Throwable e) {
- }
- }
-
- private void produceMessages(EmbeddedConnectCluster cluster, String topicName, String msgPrefix) {
+ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, String msgPrefix) {
produceMessages(cluster, topicName, msgPrefix, NUM_PARTITIONS);
}
- private void produceMessages(EmbeddedConnectCluster cluster, String topicName, String msgPrefix, int numPartitions) {
+ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, String msgPrefix, int numPartitions) {
// produce the configured number of records to all specified partitions
int cnt = 0;
for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
@@ -501,15 +169,15 @@ private void produceMessages(EmbeddedConnectCluster cluster, String topicName, S
cluster.kafka().produce(topicName, p, "key", msgPrefix + cnt++);
}
- private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer) throws InterruptedException {
+ protected Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer) throws InterruptedException {
return consumeAllMessages(consumer, null, null);
}
- private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer expectedRecords) throws InterruptedException {
+ protected Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer expectedRecords) throws InterruptedException {
return consumeAllMessages(consumer, expectedRecords, null);
}
- private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer numExpectedRecords, Duration timeoutDurationMs)
+ protected Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer, Integer numExpectedRecords, Duration timeoutDurationMs)
throws InterruptedException {
int expectedRecords = numExpectedRecords != null ? numExpectedRecords : NUM_RECORDS_PRODUCED;
int timeoutMs = (int) (timeoutDurationMs != null ? timeoutDurationMs.toMillis() : RECORD_CONSUME_DURATION_MS);
@@ -528,7 +196,7 @@ private Map>> consumeAllMess
return records.stream().collect(Collectors.groupingBy(c -> new TopicPartition(c.topic(), c.partition())));
}
- private void seek(Consumer<byte[], byte[]> consumer, Map<TopicPartition, OffsetAndMetadata> offsets) throws InterruptedException {
+ protected void seek(Consumer<byte[], byte[]> consumer, Map<TopicPartition, OffsetAndMetadata> offsets) throws InterruptedException {
// In case offsets are replicated faster than actual records, wait until records are replicated before seeking
waitForCondition(() -> {
boolean ready = true;
@@ -546,7 +214,7 @@ private void seek(Consumer<byte[], byte[]> consumer, Map<TopicPartition, OffsetAndMetadata> offsets)
- private List<TopicPartition> allPartitions(String... topics) {
+ protected List<TopicPartition> allPartitions(String... topics) {
return IntStream.range(0, NUM_PARTITIONS)
.boxed()
.flatMap(p -> Arrays.stream(topics).map(t -> new TopicPartition(t, p)))
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsLegacyReplicationPolicyIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsLegacyReplicationPolicyIntegrationTest.java
new file mode 100644
index 0000000000000..33d47547205ba
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsLegacyReplicationPolicyIntegrationTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests MM2 replication and failover logic for {@link LegacyReplicationPolicy}.
+ *
+ * MM2 is configured with active/passive replication between two Kafka clusters with {@link LegacyReplicationPolicy}.
+ * Tests validate that records sent to the primary cluster arrive at the backup cluster. Then, a consumer group is
+ * migrated from the primary cluster to the backup cluster. Tests validate that consumer offsets
+ * are translated and replicated from the primary cluster to the backup cluster during this failover.
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsLegacyReplicationPolicyIntegrationTest extends MirrorConnectorsIntegrationTest {
+
+ private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsLegacyReplicationPolicyIntegrationTest.class);
+
+ @Before
+ public void setup() throws InterruptedException {
+ Properties brokerProps = new Properties();
+ brokerProps.put("auto.create.topics.enable", "false");
+
+ mm2Props = new HashMap<>();
+ mm2Props.put("clusters", "primary, backup");
+ mm2Props.put("max.tasks", "10");
+ mm2Props.put("topics", "test-topic-.*");
+ mm2Props.put("groups", "consumer-group-.*");
+ mm2Props.put("primary->backup.enabled", "true");
+ mm2Props.put("backup->primary.enabled", "false");
+ mm2Props.put("sync.topic.acls.enabled", "false");
+ mm2Props.put("emit.checkpoints.interval.seconds", "1");
+ mm2Props.put("emit.heartbeats.interval.seconds", "1");
+ mm2Props.put("refresh.topics.interval.seconds", "1");
+ mm2Props.put("refresh.groups.interval.seconds", "1");
+ mm2Props.put("checkpoints.topic.replication.factor", "1");
+ mm2Props.put("heartbeats.topic.replication.factor", "1");
+ mm2Props.put("offset-syncs.topic.replication.factor", "1");
+ mm2Props.put("config.storage.replication.factor", "1");
+ mm2Props.put("offset.storage.replication.factor", "1");
+ mm2Props.put("status.storage.replication.factor", "1");
+ mm2Props.put("replication.factor", "1");
+ mm2Props.put("replication.policy.class", LegacyReplicationPolicy.class.getName());
+
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ Map<String, String> primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget("backup", "primary"));
+ Map<String, String> backupWorkerProps = mm2Config.workerConfig(new SourceAndTarget("primary", "backup"));
+
+ primary = new EmbeddedConnectCluster.Builder()
+ .name("primary-connect-cluster")
+ .numWorkers(3)
+ .numBrokers(1)
+ .brokerProps(brokerProps)
+ .workerProps(primaryWorkerProps)
+ .build();
+
+ backup = new EmbeddedConnectCluster.Builder()
+ .name("backup-connect-cluster")
+ .numWorkers(3)
+ .numBrokers(1)
+ .brokerProps(brokerProps)
+ .workerProps(backupWorkerProps)
+ .build();
+
+ primary.start();
+ primary.assertions().assertAtLeastNumWorkersAreUp(3,
+ "Workers of primary-connect-cluster did not start in time.");
+ backup.start();
+ backup.assertions().assertAtLeastNumWorkersAreUp(3,
+ "Workers of backup-connect-cluster did not start in time.");
+
+ // create these topics before starting the connectors so we don't need to wait for discovery
+ primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
+ primary.kafka().createTopic("heartbeats", 1);
+ backup.kafka().createTopic("heartbeats", 1);
+
+ // produce to all partitions of test-topic-1
+ produceMessages(primary, "test-topic-1", "message-1-");
+
+ // Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+ Map<String, Object> dummyProps = Collections.singletonMap("group.id", "consumer-group-dummy");
+ Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(dummyProps, "test-topic-1");
+ consumeAllMessages(dummyConsumer);
+ dummyConsumer.close();
+
+ log.info("primary REST service: {}", primary.endpointForResource("connectors"));
+ log.info("backup REST service: {}", backup.endpointForResource("connectors"));
+
+ log.info("primary brokers: {}", primary.kafka().bootstrapServers());
+ log.info("backup brokers: {}", backup.kafka().bootstrapServers());
+
+ // now that the brokers are running, we can finish setting up the Connectors
+ mm2Props.put("primary.bootstrap.servers", primary.kafka().bootstrapServers());
+ mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ Exit.setExitProcedure((status, errorCode) -> exited.set(true));
+ }
+
+ @Test
+ public void testReplication() throws InterruptedException {
+ String consumerGroupName = "consumer-group-testReplication";
+ Map<String, Object> consumerProps = new HashMap<String, Object>() {
+ {
+ put("group.id", consumerGroupName);
+ put("auto.offset.reset", "latest");
+ }
+ };
+
+ // create consumer before starting the connectors so we don't need to wait for discovery
+ Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
+ consumeAllMessages(primaryConsumer, 0);
+ primaryConsumer.close();
+
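+ // primary->backup replicates topics and checkpoints; the reverse direction only runs the heartbeat connector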
+ waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup", false);
+ waitUntilMirrorMakerIsRunning(primary, mm2Config, "backup", "primary", true);
+ MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
+ MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
+
+ assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,
+ primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
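+ // with LegacyReplicationPolicy the replicated records land in "test-topic-1" on the backup cluster as well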
+ assertEquals("Records were not replicated to backup cluster.", NUM_RECORDS_PRODUCED,
+ backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
+
+ assertTrue("Heartbeats were not emitted to primary cluster.",
+ primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
+ assertTrue("Heartbeats were not replicated downstream to backup cluster.",
+ backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0);
+
+ assertTrue("Did not find upstream primary cluster.", backupClient.upstreamClusters().contains("primary"));
+ assertEquals("Did not calculate replication hops correctly.", 1, backupClient.replicationHops("primary"));
+
+ // Note that replication policy is not used for checkpoint topic names.
+ assertTrue("Checkpoints were not emitted downstream to backup cluster.",
+ backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0);
+
+ Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, "primary",
+ Duration.ofMillis(CHECKPOINT_DURATION_MS));
+
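+ // translated offsets are keyed by the original topic name, since the policy does not add a remote prefix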
+ assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+ new TopicPartition("test-topic-1", 0)));
+
+ // Failover consumer group to backup cluster.
+ Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumer(consumerProps);
+ backupConsumer.assign(allPartitions("test-topic-1"));
+ seek(backupConsumer, backupOffsets);
+ consumeAllMessages(backupConsumer, 0);
+
+ assertTrue("Consumer failedover to zero offset.", backupConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+ assertTrue("Consumer failedover beyond expected offset.", backupConsumer.position(
+ new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+
+ backupConsumer.close();
+
+ // create more matching topics
+ primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+ produceMessages(primary, "test-topic-2", "message-3-", 1);
+
+ assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+ primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+
+ assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+ backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+ }
+
+ @Test
+ public void testReplicationWithEmptyPartition() throws Exception {
+ String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
+ Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
+ // create topics
+ String topic = "test-topic-with-empty-partition";
+ primary.kafka().createTopic(topic, NUM_PARTITIONS);
+
+ // produce to all test-topic-empty's partitions *but the last one*, on the primary cluster
+ produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
+ // Consume, from the primary cluster, before starting the connectors so we don't need to wait for discovery
+ int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
+ try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
+ consumeAllMessages(consumer, expectedRecords);
+ }
+
+ waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup", false);
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = waitForConsumerGroupOffsetReplication(
+ Collections.singletonList(topic), consumerGroupName, false);
+
+ // check translated offset for the last partition (empty partition)
+ OffsetAndMetadata oam = offsets.get(new TopicPartition(topic, NUM_PARTITIONS - 1));
+ assertNotNull("Offset of last partition was not replicated", oam);
+ assertEquals("Offset of last partition is not zero", 0, oam.offset());
+ }
+
+ @Test
+ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
+ String topic1 = "test-topic-auto-offset-sync-1";
+ String topic2 = "test-topic-auto-offset-sync-2";
+ String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
+ Map<String, Object> consumerProps = new HashMap<String, Object>() {
+ {
+ put("group.id", consumerGroupName);
+ put("auto.offset.reset", "earliest");
+ }
+ };
+
+ // create new topic
+ primary.kafka().createTopic(topic1, NUM_PARTITIONS);
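+ // pre-create the topic on the backup cluster under its source name; LegacyReplicationPolicy does not rename it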
+ backup.kafka().createTopic(topic1, 1);
+
+ // produce some records to the new topic in primary cluster
+ produceMessages(primary, topic1, "message-1-");
+ // create consumers before starting the connectors so we don't need to wait for discovery
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic1)) {
+ consumeAllMessages(primaryConsumer);
+ }
+
+ // enable automated consumer group offset sync
+ mm2Props.put("sync.group.offsets.enabled", "true");
+ mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ // one way replication from primary to backup
+ mm2Props.put("backup->primary.enabled", "false");
+
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup", false);
+
+ waitForConsumerGroupOffsetReplication(Collections.singletonList(topic1), consumerGroupName, true);
+
+ // create a consumer at backup cluster with same consumer group Id to consume 1 topic
+ ConsumerRecords<byte[], byte[]> records = null;
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+ consumerProps, topic1)) {
+ records = backupConsumer.poll(Duration.ofMillis(500));
+ }
+
+ // the size of consumer record should be zero, because the offsets of the same consumer group
+ // have been automatically synchronized from primary to backup by the background job, so no
+ // more records to consume from the replicated topic by the same consumer group at backup cluster
+ assertEquals("consumer record size is not zero", 0, records.count());
+
+ // create a second topic
+ primary.kafka().createTopic(topic2, NUM_PARTITIONS);
+ backup.kafka().createTopic(topic2, 1);
+
+ // produce some records to the new topic in primary cluster
+ produceMessages(primary, topic2, "message-1-");
+
+ // create a consumer at primary cluster to consume the new topic
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(
+ consumerProps, topic2)) {
+ // we need to wait for consuming all the records for MM2 replicating the expected offsets
+ consumeAllMessages(primaryConsumer);
+ }
+
+ waitForConsumerGroupOffsetReplication(Arrays.asList(topic1, topic2), consumerGroupName, true);
+
+ // create a consumer at backup cluster with same consumer group Id to consume old and new topic
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+ consumerProps, topic1, topic2)) {
+ records = backupConsumer.poll(Duration.ofMillis(500));
+ }
+
+ // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+ assertEquals("consumer record size is not zero", 0, records.count());
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorDefaultReplicationPolicyTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorDefaultReplicationPolicyTest.java
new file mode 100644
index 0000000000000..4faca168180a9
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorDefaultReplicationPolicyTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MirrorSourceConnectorDefaultReplicationPolicyTest extends MirrorSourceConnectorTest {
+
+ @Override
+ protected ReplicationPolicy replicationPolicy() {
+ return new DefaultReplicationPolicy();
+ }
+
+ @Test
+ public void testNoCycles() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), x -> true, x -> true);
+ assertFalse("should not allow cycles", connector.shouldReplicateTopic("target.topic1"));
+ assertFalse("should not allow cycles", connector.shouldReplicateTopic("target.source.topic1"));
+ assertFalse("should not allow cycles", connector.shouldReplicateTopic("source.target.topic1"));
+ assertTrue("should allow anything else", connector.shouldReplicateTopic("topic1"));
+ assertTrue("should allow anything else", connector.shouldReplicateTopic("source.topic1"));
+ }
+
+ @Test
+ public void testAclTransformation() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new DefaultReplicationPolicy(), x -> true, x -> true);
+ AclBinding allowAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
+ AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
+ String expectedRemoteTopicName = "source" + DefaultReplicationPolicy.SEPARATOR_DEFAULT
+ + allowAllAclBinding.pattern().name();
+ assertTrue("should change topic name",
+ processedAllowAllAclBinding.pattern().name().equals(expectedRemoteTopicName));
+ assertTrue("should change ALL to READ", processedAllowAllAclBinding.entry().operation() == AclOperation.READ);
+ assertTrue("should not change ALLOW",
+ processedAllowAllAclBinding.entry().permissionType() == AclPermissionType.ALLOW);
+
+ AclBinding denyAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
+ AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
+ assertTrue("should not change ALL", processedDenyAllAclBinding.entry().operation() == AclOperation.ALL);
+ assertTrue("should not change DENY",
+ processedDenyAllAclBinding.entry().permissionType() == AclPermissionType.DENY);
+ }
+
+ @Test
+ public void testRefreshTopicPartitions() throws Exception {
+ testRefreshTopicPartitions("source.topic");
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorLegacyReplicationPolicyTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorLegacyReplicationPolicyTest.java
new file mode 100644
index 0000000000000..b98c8075c8475
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorLegacyReplicationPolicyTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MirrorSourceConnectorLegacyReplicationPolicyTest extends MirrorSourceConnectorTest {
+
+ @Override
+ protected ReplicationPolicy replicationPolicy() {
+ return new LegacyReplicationPolicy();
+ }
+
+ @Test
+ public void testCycles() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new LegacyReplicationPolicy(), x -> true, x -> true);
+ // LegacyReplicationPolicy can prevent cycles only for heartbeats topics.
+ assertTrue("should allow cycles", connector.shouldReplicateTopic("target.topic1"));
+ assertTrue("should allow cycles", connector.shouldReplicateTopic("target.source.topic1"));
+ assertTrue("should allow cycles", connector.shouldReplicateTopic("source.target.topic1"));
+ assertTrue("should allow anything else", connector.shouldReplicateTopic("topic1"));
+ assertTrue("should allow anything else", connector.shouldReplicateTopic("source.topic1"));
+
+ assertFalse("should not allow cycles for heartbeats", connector.shouldReplicateTopic("target.heartbeats"));
+ assertFalse("should not allow cycles for heartbeats", connector.shouldReplicateTopic("target.source.heartbeats"));
+ assertFalse("should not allow cycles for heartbeats", connector.shouldReplicateTopic("source.target.heartbeats"));
+ assertTrue("should allow anything else for heartbeats", connector.shouldReplicateTopic("heartbeats"));
+ assertTrue("should allow anything else for heartbeats", connector.shouldReplicateTopic("source.heartbeats"));
+ }
+
+ @Test
+ public void testAclTransformation() {
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ new LegacyReplicationPolicy(), x -> true, x -> true);
+ AclBinding allowAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
+ AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
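+ // LegacyReplicationPolicy does not rename topics, so the expected ACL resource name is the source topic name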
+ String expectedRemoteTopicName = allowAllAclBinding.pattern().name();
+ assertTrue("should change topic name",
+ processedAllowAllAclBinding.pattern().name().equals(expectedRemoteTopicName));
+ assertTrue("should change ALL to READ", processedAllowAllAclBinding.entry().operation() == AclOperation.READ);
+ assertTrue("should not change ALLOW",
+ processedAllowAllAclBinding.entry().permissionType() == AclPermissionType.ALLOW);
+
+ AclBinding denyAllAclBinding = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
+ AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
+ assertTrue("should not change ALL", processedDenyAllAclBinding.entry().operation() == AclOperation.ALL);
+ assertTrue("should not change DENY",
+ processedDenyAllAclBinding.entry().permissionType() == AclPermissionType.DENY);
+ }
+
+ @Test
+ public void testRefreshTopicPartitions() throws Exception {
+ testRefreshTopicPartitions("topic");
+ }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index e86d21ee67e6c..df3e67eb4912a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -16,6 +16,16 @@
*/
package org.apache.kafka.connect.mirror;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
@@ -24,40 +34,30 @@
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
-import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.connect.connector.ConnectorContext;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Test;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.TASK_TOPIC_PARTITIONS;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.any;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MirrorSourceConnectorTest {
+abstract class MirrorSourceConnectorTest {
@Test
public void testReplicatesHeartbeatsByDefault() {
- MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
+ MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+ replicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
assertTrue("should replicate heartbeats", connector.shouldReplicateTopic("heartbeats"));
assertTrue("should replicate upstream heartbeats", connector.shouldReplicateTopic("us-west.heartbeats"));
}
@@ -70,58 +70,21 @@ public void testReplicatesHeartbeatsDespiteFilter() {
assertTrue("should replicate upstream heartbeats", connector.shouldReplicateTopic("us-west.heartbeats"));
}
- @Test
- public void testNoCycles() {
- MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
- assertFalse("should not allow cycles", connector.shouldReplicateTopic("target.topic1"));
- assertFalse("should not allow cycles", connector.shouldReplicateTopic("target.source.topic1"));
- assertFalse("should not allow cycles", connector.shouldReplicateTopic("source.target.topic1"));
- assertTrue("should allow anything else", connector.shouldReplicateTopic("topic1"));
- assertTrue("should allow anything else", connector.shouldReplicateTopic("source.topic1"));
- }
-
@Test
public void testAclFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
+ replicationPolicy(), x -> true, x -> true);
assertFalse("should not replicate ALLOW WRITE", connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))));
+ new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))));
assertTrue("should replicate ALLOW ALL", connector.shouldReplicateAcl(
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))));
- }
-
- @Test
- public void testAclTransformation() {
- MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, x -> true);
- AclBinding allowAllAclBinding = new AclBinding(
- new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
- AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
- String expectedRemoteTopicName = "source" + DefaultReplicationPolicy.SEPARATOR_DEFAULT
- + allowAllAclBinding.pattern().name();
- assertTrue("should change topic name",
- processedAllowAllAclBinding.pattern().name().equals(expectedRemoteTopicName));
- assertTrue("should change ALL to READ", processedAllowAllAclBinding.entry().operation() == AclOperation.READ);
- assertTrue("should not change ALLOW",
- processedAllowAllAclBinding.entry().permissionType() == AclPermissionType.ALLOW);
-
- AclBinding denyAllAclBinding = new AclBinding(
- new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
- new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
- AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
- assertTrue("should not change ALL", processedDenyAllAclBinding.entry().operation() == AclOperation.ALL);
- assertTrue("should not change DENY",
- processedDenyAllAclBinding.entry().permissionType() == AclPermissionType.DENY);
+ new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))));
}
-
@Test
public void testConfigPropertyFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
+ replicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
ArrayList entries = new ArrayList<>();
entries.add(new ConfigEntry("name-1", "value-1"));
entries.add(new ConfigEntry("min.insync.replicas", "2"));
@@ -179,10 +142,9 @@ public void testMirrorSourceConnectorTaskConfig() {
assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS));
}
- @Test
- public void testRefreshTopicPartitions() throws Exception {
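+ // Parameterized by the expected remote topic name; concrete subclasses invoke this from their own @Test methods.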
+ public void testRefreshTopicPartitions(String newTopic) throws Exception {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
- new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
+ replicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);
@@ -196,20 +158,22 @@ public void testRefreshTopicPartitions() throws Exception {
connector.refreshTopicPartitions();
Map expectedPartitionCounts = new HashMap<>();
- expectedPartitionCounts.put("source.topic", 1L);
- List<NewTopic> expectedNewTopics = Arrays.asList(new NewTopic("source.topic", 1, (short) 0));
+ expectedPartitionCounts.put(newTopic, 1L);
+ List<NewTopic> expectedNewTopics = Arrays.asList(new NewTopic(newTopic, 1, (short) 0));
verify(connector, times(2)).computeAndCreateTopicPartitions();
verify(connector, times(2)).createTopicPartitions(
- eq(expectedPartitionCounts),
- eq(expectedNewTopics),
- eq(Collections.emptyMap()));
+ eq(expectedPartitionCounts),
+ eq(expectedNewTopics),
+ eq(Collections.emptyMap()));
- List<TopicPartition> targetTopicPartitions = Arrays.asList(new TopicPartition("source.topic", 0));
+ List<TopicPartition> targetTopicPartitions = Arrays.asList(new TopicPartition(newTopic, 0));
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
connector.refreshTopicPartitions();
// once target topic is created, refreshTopicPartitions() will NOT call computeAndCreateTopicPartitions() again
verify(connector, times(2)).computeAndCreateTopicPartitions();
}
+
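+ // The replication policy under test, supplied by each concrete subclass.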
+ protected abstract ReplicationPolicy replicationPolicy();
}