diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java new file mode 100644 index 0000000000000..7320b18020112 --- /dev/null +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating + * from legacy MM1, or for any use-case involving one-way replication. + * + * N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that + * your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely + * already be the case. 
+ */ +public class IdentityReplicationPolicy extends DefaultReplicationPolicy { + private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class); + + public static final String SOURCE_CLUSTER_ALIAS_CONFIG = "source.cluster.alias"; + + private String sourceClusterAlias = null; + + @Override + public void configure(Map props) { + super.configure(props); + if (props.containsKey(SOURCE_CLUSTER_ALIAS_CONFIG)) { + sourceClusterAlias = (String) props.get(SOURCE_CLUSTER_ALIAS_CONFIG); + log.info("Using source cluster alias `{}`.", sourceClusterAlias); + } + } + + /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not include the source + * cluster alias in the remote topic name. Instead, topic names are unchanged. + * + * In the special case of heartbeats, we defer to DefaultReplicationPolicy. + */ + @Override + public String formatRemoteTopic(String sourceClusterAlias, String topic) { + if (looksLikeHeartbeat(topic)) { + return super.formatRemoteTopic(sourceClusterAlias, topic); + } else { + return topic; + } + } + + /** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy cannot know the source of + * a remote topic based on its name alone. If `source.cluster.alias` is provided, + * `topicSource` will return that; if it was never configured, `topicSource` returns null. + * + * In the special case of heartbeats, we defer to DefaultReplicationPolicy. + */ + @Override + public String topicSource(String topic) { + if (looksLikeHeartbeat(topic)) { + return super.topicSource(topic); + } else { + return sourceClusterAlias; + } + } + + /** Since any topic may be a "remote topic", this just returns `topic`. + * + * In the special case of heartbeats, we defer to DefaultReplicationPolicy. 
+ */ + @Override + public String upstreamTopic(String topic) { + if (looksLikeHeartbeat(topic)) { + return super.upstreamTopic(topic); + } else { + return topic; + } + } + + private boolean looksLikeHeartbeat(String topic) { + return topic != null && topic.endsWith(MirrorClientConfig.HEARTBEATS_TOPIC); + } +} diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java index 17d18ecb58a27..aa0bca1f9194d 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java @@ -192,6 +192,7 @@ Set listTopics() throws InterruptedException { int countHopsForTopic(String topic, String sourceClusterAlias) { int hops = 0; + Set visited = new HashSet<>(); while (true) { hops++; String source = replicationPolicy.topicSource(topic); @@ -201,6 +202,12 @@ int countHopsForTopic(String topic, String sourceClusterAlias) { if (source.equals(sourceClusterAlias)) { return hops; } + if (visited.contains(source)) { + // Extra check for IdentityReplicationPolicy and similar impls that cannot prevent cycles. + // We assume we're stuck in a cycle and will never find sourceClusterAlias. + return -1; + } + visited.add(source); topic = replicationPolicy.upstreamTopic(topic); } } @@ -223,7 +230,8 @@ boolean isRemoteTopic(String topic) { Set allSources(String topic) { Set sources = new HashSet<>(); String source = replicationPolicy.topicSource(topic); - while (source != null) { + while (source != null && !sources.contains(source)) { + // The extra Set.contains above is for ReplicationPolicies that cannot prevent cycles. 
sources.add(source); topic = replicationPolicy.upstreamTopic(topic); source = replicationPolicy.topicSource(topic); diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java index 11f73f50ceac5..b6eb26c8707bc 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java @@ -45,7 +45,7 @@ public interface ReplicationPolicy { */ default String originalTopic(String topic) { String upstream = upstreamTopic(topic); - if (upstream == null) { + if (upstream == null || upstream.equals(topic)) { return topic; } else { return originalTopic(upstream); diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java index 766b91ce69944..2e1b9b771942e 100644 --- a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java +++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java @@ -37,7 +37,11 @@ private static class FakeMirrorClient extends MirrorClient { List topics; FakeMirrorClient(List topics) { - super(null, new DefaultReplicationPolicy(), null); + this(new DefaultReplicationPolicy(), topics); + } + + FakeMirrorClient(ReplicationPolicy replicationPolicy, List topics) { + super(null, replicationPolicy, null); this.topics = topics; } @@ -131,6 +135,23 @@ public void upstreamClustersTest() throws InterruptedException { assertFalse(sources.contains(null)); } + @Test + public void testIdentityReplicationUpstreamClusters() throws InterruptedException { + // IdentityReplicationPolicy treats heartbeats as a special case, so these should work as usual. 
+ MirrorClient client = new FakeMirrorClient(identityReplicationPolicy("source"), Arrays.asList("topic1", + "topic2", "heartbeats", "source1.heartbeats", "source1.source2.heartbeats", + "source3.source4.source5.heartbeats")); + Set sources = client.upstreamClusters(); + assertTrue(sources.contains("source1")); + assertTrue(sources.contains("source2")); + assertTrue(sources.contains("source3")); + assertTrue(sources.contains("source4")); + assertTrue(sources.contains("source5")); + assertFalse(sources.contains("")); + assertFalse(sources.contains(null)); + assertEquals(5, sources.size()); + } + @Test public void remoteTopicsTest() throws InterruptedException { MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3", @@ -144,6 +165,20 @@ public void remoteTopicsTest() throws InterruptedException { assertTrue(remoteTopics.contains("source3.source4.source5.topic6")); } + @Test + public void testIdentityReplicationRemoteTopics() throws InterruptedException { + // IdentityReplicationPolicy should consider any topic to be remote. 
+ MirrorClient client = new FakeMirrorClient(identityReplicationPolicy("source"), Arrays.asList( + "topic1", "topic2", "topic3", "heartbeats", "backup.heartbeats")); + Set remoteTopics = client.remoteTopics(); + assertTrue(remoteTopics.contains("topic1")); + assertTrue(remoteTopics.contains("topic2")); + assertTrue(remoteTopics.contains("topic3")); + // Heartbeats are treated as a special case + assertFalse(remoteTopics.contains("heartbeats")); + assertTrue(remoteTopics.contains("backup.heartbeats")); + } + @Test public void remoteTopicsSeparatorTest() throws InterruptedException { MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3", @@ -159,4 +194,25 @@ public void remoteTopicsSeparatorTest() throws InterruptedException { assertTrue(remoteTopics.contains("source3__source4__source5__topic6")); } + @Test + public void testIdentityReplicationTopicSource() { + MirrorClient client = new FakeMirrorClient( + identityReplicationPolicy("primary"), Arrays.asList()); + assertEquals("topic1", client.replicationPolicy() + .formatRemoteTopic("primary", "topic1")); + assertEquals("primary", client.replicationPolicy() + .topicSource("topic1")); + // Heartbeats are handled as a special case + assertEquals("backup.heartbeats", client.replicationPolicy() + .formatRemoteTopic("backup", "heartbeats")); + assertEquals("backup", client.replicationPolicy() + .topicSource("backup.heartbeats")); + } + + private ReplicationPolicy identityReplicationPolicy(String source) { + IdentityReplicationPolicy policy = new IdentityReplicationPolicy(); + policy.configure(Collections.singletonMap( + IdentityReplicationPolicy.SOURCE_CLUSTER_ALIAS_CONFIG, source)); + return policy; + } } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index 298bce34fe531..33cd8a7ac3031 100644 --- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -175,6 +175,7 @@ public Map workerConfig(SourceAndTarget sourceAndTarget) { props.putAll(stringsWithPrefix("header.converter")); props.putAll(stringsWithPrefix("task")); props.putAll(stringsWithPrefix("worker")); + props.putAll(stringsWithPrefix("replication.policy")); // transform any expression like ${provider:path:key}, since the worker doesn't do so props = transform(props); @@ -203,6 +204,7 @@ public Map connectorBaseConfig(SourceAndTarget sourceAndTarget, props.keySet().retainAll(MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.names()); props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG)); + props.putAll(stringsWithPrefix("replication.policy")); Map sourceClusterProps = clusterProps(sourceAndTarget.source()); // attrs non prefixed with producer|consumer|admin diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index e1db62a7b6ce0..fd6bf9f27649c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -492,7 +492,12 @@ boolean isCycle(String topic) { } else if (source.equals(sourceAndTarget.target())) { return true; } else { - return isCycle(replicationPolicy.upstreamTopic(topic)); + String upstreamTopic = replicationPolicy.upstreamTopic(topic); + if (upstreamTopic == null || upstreamTopic.equals(topic)) { + // Stop when there is no upstream topic (null) or the policy maps the topic to itself, as IdentityReplicationPolicy and similar impls that don't prevent cycles do. 
+ return false; + } + return isCycle(upstreamTopic); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index 68d149c755ff8..b4c8ca651d29a 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -77,10 +77,32 @@ public void testNoCycles() { assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles"); assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles"); assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles"); + assertFalse(connector.shouldReplicateTopic("target.source.target.topic1"), "should not allow cycles"); + assertFalse(connector.shouldReplicateTopic("source.target.source.topic1"), "should not allow cycles"); assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else"); assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else"); } + @Test + public void testIdentityReplication() { + MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), + new IdentityReplicationPolicy(), x -> true, x -> true); + assertTrue(connector.shouldReplicateTopic("target.topic1"), "should allow cycles"); + assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should allow cycles"); + assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should allow cycles"); + assertTrue(connector.shouldReplicateTopic("target.source.target.topic1"), "should allow cycles"); + assertTrue(connector.shouldReplicateTopic("source.target.source.topic1"), "should allow cycles"); + assertTrue(connector.shouldReplicateTopic("topic1"), "should allow normal topics"); + 
assertTrue(connector.shouldReplicateTopic("othersource.topic1"), "should allow normal topics"); + assertFalse(connector.shouldReplicateTopic("target.heartbeats"), "should not allow heartbeat cycles"); + assertFalse(connector.shouldReplicateTopic("target.source.heartbeats"), "should not allow heartbeat cycles"); + assertFalse(connector.shouldReplicateTopic("source.target.heartbeats"), "should not allow heartbeat cycles"); + assertFalse(connector.shouldReplicateTopic("target.source.target.heartbeats"), "should not allow heartbeat cycles"); + assertFalse(connector.shouldReplicateTopic("source.target.source.heartbeats"), "should not allow heartbeat cycles"); + assertTrue(connector.shouldReplicateTopic("heartbeats"), "should allow heartbeat topics"); + assertTrue(connector.shouldReplicateTopic("othersource.heartbeats"), "should allow heartbeat topics"); + } + @Test public void testAclFiltering() { MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java new file mode 100644 index 0000000000000..43b1fcbf6d1d3 --- /dev/null +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.mirror.IdentityReplicationPolicy; +import org.apache.kafka.connect.mirror.MirrorClient; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Tag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeEach; + +/** + * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}. + * + *

MM2 is configured with active/passive replication between two Kafka clusters with {@link IdentityReplicationPolicy}. + * Tests validate that records sent to the primary cluster arrive at the backup cluster. Then, a consumer group is + * migrated from the primary cluster to the backup cluster. Tests validate that consumer offsets + * are translated and replicated from the primary cluster to the backup cluster during this failover. + */ +@Tag("integration") +public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest { + @BeforeEach + public void startClusters() throws Exception { + super.startClusters(new HashMap() {{ + put("replication.policy.class", IdentityReplicationPolicy.class.getName()); + put("topics", "test-topic-.*"); + put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true"); + }}); + } + + @Test + public void testReplication() throws Exception { + produceMessages(primary, "test-topic-1"); + String consumerGroupName = "consumer-group-testReplication"; + Map consumerProps = new HashMap() {{ + put("group.id", consumerGroupName); + put("auto.offset.reset", "latest"); + }}; + // warm up consumers before starting the connectors so we don't need to wait for discovery + warmUpConsumer(consumerProps); + + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS); + + MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS)); + MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)); + + // make sure the topic is auto-created in the other cluster + waitForTopicCreated(primary, "test-topic-1"); + 
waitForTopicCreated(backup, "test-topic-1"); + assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG), + "topic config was not synced"); + + assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(), + "Records were not produced to primary cluster."); + assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(), + "Records were not replicated to backup cluster."); + + assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0, + "Heartbeats were not emitted to primary cluster."); + assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0, + "Heartbeats were not emitted to backup cluster."); + assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0, + "Heartbeats were not replicated downstream to backup cluster."); + assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0, + "Heartbeats were not replicated downstream to primary cluster."); + + assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster."); + assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly."); + assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0, + "Checkpoints were not emitted downstream to backup cluster."); + + Map backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS, + Duration.ofMillis(CHECKPOINT_DURATION_MS)); + + assertTrue(backupOffsets.containsKey( + new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. 
Found: " + backupOffsets); + + // Failover consumer group to backup cluster. + try (Consumer backupConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) { + backupConsumer.assign(backupOffsets.keySet()); + backupOffsets.forEach(backupConsumer::seek); + backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + backupConsumer.commitAsync(); + + assertTrue(backupConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedover to zero offset."); + assertTrue(backupConsumer.position( + new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset."); + } + + primaryClient.close(); + backupClient.close(); + + // create more matching topics + primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS); + + // make sure the topic is auto-created in the other cluster + waitForTopicCreated(backup, "test-topic-2"); + + // only produce messages to the first partition + produceMessages(primary, "test-topic-2", 1); + + // expect the total number of consumed messages to equal NUM_RECORDS_PER_PARTITION + assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(), + "Records were not produced to primary cluster."); + assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(), + "New topic was not replicated to backup cluster."); + } + + @Test + public void testReplicationWithEmptyPartition() throws Exception { + String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition"; + Map consumerProps = Collections.singletonMap("group.id", consumerGroupName); + + // create topic + String topic = "test-topic-with-empty-partition"; + primary.kafka().createTopic(topic, NUM_PARTITIONS); + + // produce to all partitions of the topic, except the last partition + produceMessages(primary, topic, NUM_PARTITIONS - 
1); + + // consume before starting the connectors so we don't need to wait for discovery + int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1); + try (Consumer primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) { + waitForConsumingAllRecords(primaryConsumer, expectedRecords); + } + + // one way replication from primary to backup + mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + mm2Config = new MirrorMakerConfig(mm2Props); + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record + Thread.sleep(TimeUnit.SECONDS.toMillis(3)); + + // note that with IdentityReplicationPolicy, topics on the backup are NOT renamed to PRIMARY_CLUSTER_ALIAS + "." + topic + String backupTopic = topic; + + // consume all records from backup cluster + try (Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, + backupTopic)) { + waitForConsumingAllRecords(backupConsumer, expectedRecords); + } + + try (Admin backupClient = backup.kafka().createAdminClient()) { + // retrieve the consumer group offset from backup cluster + Map remoteOffsets = + backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get(); + + // pinpoint the offset of the last partition which does not receive records + OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1)); + // offset of the last partition should exist, but its value should be 0 + assertNotNull(offset, "Offset of last partition was not replicated"); + assertEquals(0, offset.offset(), "Offset of last partition is not zero"); + } + } + + @Test + public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException { + produceMessages(primary, "test-topic-1"); + String consumerGroupName = 
"consumer-group-testOneWayReplicationWithAutoOffsetSync"; + Map consumerProps = new HashMap() {{ + put("group.id", consumerGroupName); + put("auto.offset.reset", "earliest"); + }}; + // create consumers before starting the connectors so we don't need to wait for discovery + try (Consumer primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, + "test-topic-1")) { + // we need to wait for consuming all the records for MM2 replicating the expected offsets + waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED); + } + + // enable automated consumer group offset sync + mm2Props.put("sync.group.offsets.enabled", "true"); + mm2Props.put("sync.group.offsets.interval.seconds", "1"); + // one way replication from primary to backup + mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // make sure the topic is created in the other cluster + waitForTopicCreated(primary, "backup.test-topic-1"); + waitForTopicCreated(backup, "test-topic-1"); + // create a consumer at backup cluster with same consumer group Id to consume 1 topic + Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo( + consumerProps, "test-topic-1"); + + waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"), + consumerGroupName, NUM_RECORDS_PRODUCED); + + ConsumerRecords records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + + // the size of consumer record should be zero, because the offsets of the same consumer group + // have been automatically synchronized from primary to backup by the background job, so no + // more records to consume from the replicated topic by the same consumer group at backup cluster + assertEquals(0, records.count(), "consumer record size is not zero"); + + // now create a new topic in 
primary cluster + primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS); + // make sure the topic is created in backup cluster + waitForTopicCreated(backup, "test-topic-2"); + + // produce some records to the new topic in primary cluster + produceMessages(primary, "test-topic-2"); + + // create a consumer at primary cluster to consume the new topic + try (Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( + "group.id", "consumer-group-1"), "test-topic-2")) { + // we need to wait for consuming all the records for MM2 replicating the expected offsets + waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED); + } + + // create a consumer at backup cluster with same consumer group Id to consume old and new topic. NOTE(review): the consumer previously held in backupConsumer is not closed before this reassignment; only the new instance is closed at the end of the test - confirm whether that is intended. + backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( + "group.id", consumerGroupName), "test-topic-1", "test-topic-2"); + + waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"), + consumerGroupName, NUM_RECORDS_PRODUCED); + + records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + // similar reasoning as above, no more records to consume by the same consumer group at backup cluster + assertEquals(0, records.count(), "consumer record size is not zero"); + backupConsumer.close(); + } +} diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index b82b26305e41a..6fb7a81676bc2 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.mirror.SourceAndTarget; import org.apache.kafka.connect.mirror.Checkpoint; 
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.ReplicationPolicy; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; @@ -80,29 +81,29 @@ public abstract class MirrorConnectorsIntegrationBaseTest { private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class); - private static final int NUM_RECORDS_PER_PARTITION = 10; - private static final int NUM_PARTITIONS = 10; - private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; - private static final int RECORD_TRANSFER_DURATION_MS = 30_000; - private static final int CHECKPOINT_DURATION_MS = 20_000; + protected static final int NUM_RECORDS_PER_PARTITION = 10; + protected static final int NUM_PARTITIONS = 10; + protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; + protected static final int RECORD_TRANSFER_DURATION_MS = 30_000; + protected static final int CHECKPOINT_DURATION_MS = 20_000; private static final int RECORD_CONSUME_DURATION_MS = 20_000; private static final int OFFSET_SYNC_DURATION_MS = 30_000; private static final int TOPIC_SYNC_DURATION_MS = 60_000; private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000; private static final int NUM_WORKERS = 3; - private static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500); + protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500); protected static final String PRIMARY_CLUSTER_ALIAS = "primary"; protected static final String BACKUP_CLUSTER_ALIAS = "backup"; - private static final List> CONNECTOR_LIST = + protected static final List> CONNECTOR_LIST = Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class); private volatile boolean 
shuttingDown; protected Map mm2Props = new HashMap<>(); - private MirrorMakerConfig mm2Config; - private EmbeddedConnectCluster primary; - private EmbeddedConnectCluster backup; + protected MirrorMakerConfig mm2Config; + protected EmbeddedConnectCluster primary; + protected EmbeddedConnectCluster backup; - private Exit.Procedure exitProcedure; + protected Exit.Procedure exitProcedure; private Exit.Procedure haltProcedure; protected Properties primaryBrokerProps = new Properties(); @@ -112,6 +113,14 @@ public abstract class MirrorConnectorsIntegrationBaseTest { @BeforeEach public void startClusters() throws Exception { + startClusters(new HashMap() {{ + put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*"); + put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true"); + put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true"); + }}); + } + + public void startClusters(Map additionalMM2Config) throws Exception { shuttingDown = false; exitProcedure = (code, message) -> { if (shuttingDown) { @@ -142,10 +151,11 @@ public void startClusters() throws Exception { primaryBrokerProps.put("auto.create.topics.enable", "false"); backupBrokerProps.put("auto.create.topics.enable", "false"); - + mm2Props.putAll(basicMM2Config()); + mm2Props.putAll(additionalMM2Config); - mm2Config = new MirrorMakerConfig(mm2Props); + mm2Config = new MirrorMakerConfig(mm2Props); primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS)); backupWorkerProps.putAll(mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS))); @@ -174,7 +184,7 @@ public void startClusters() throws Exception { waitForTopicCreated(primary, "mm2-status.backup.internal"); waitForTopicCreated(primary, "mm2-offsets.backup.internal"); waitForTopicCreated(primary, "mm2-configs.backup.internal"); - + backup.start(); backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, "Workers of 
" + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time."); @@ -377,9 +387,11 @@ public void testReplicationWithEmptyPartition() throws Exception { // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record Thread.sleep(TimeUnit.SECONDS.toMillis(3)); + String backupTopic = PRIMARY_CLUSTER_ALIAS + "." + topic; + // consume all records from backup cluster - try (Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, - PRIMARY_CLUSTER_ALIAS + "." + topic)) { + try (Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, + backupTopic)) { waitForConsumingAllRecords(backupConsumer, expectedRecords); } @@ -389,7 +401,7 @@ public void testReplicationWithEmptyPartition() throws Exception { backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get(); // pinpoint the offset of the last partition which does not receive records - OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(PRIMARY_CLUSTER_ALIAS + "." 
+ topic, NUM_PARTITIONS - 1)); + OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1)); // offset of the last partition should exist, but its value should be 0 assertNotNull(offset, "Offset of last partition was not replicated"); assertEquals(0, offset.offset(), "Offset of last partition is not zero"); @@ -482,6 +494,9 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { produceMessages(primary, "test-topic-1"); + ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy(); + String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1"); + // Check offsets are pushed to the checkpoint topic Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal"); @@ -489,13 +504,13 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { ConsumerRecords records = backupConsumer.poll(Duration.ofSeconds(1L)); for (ConsumerRecord record : records) { Checkpoint checkpoint = Checkpoint.deserializeRecord(record); - if ((PRIMARY_CLUSTER_ALIAS + ".test-topic-1").equals(checkpoint.topicPartition().topic())) { + if (remoteTopic.equals(checkpoint.topicPartition().topic())) { return true; } } return false; }, 30_000, - "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + "test-topic-1" + "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1" ); // Ensure no offset-syncs topics have been created on the primary cluster @@ -507,7 +522,7 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception { /* * launch the connectors on kafka connect cluster and check if they are running */ - private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster, + protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster, List> connectorClasses, 
MirrorMakerConfig mm2Config, String primary, String backup) throws InterruptedException { for (Class connector : connectorClasses) { @@ -527,7 +542,7 @@ private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connect /* * wait for the topic created on the cluster */ - private static void waitForTopicCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException { + protected static void waitForTopicCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException { try (final Admin adminClient = cluster.kafka().createAdminClient()) { waitForCondition(() -> adminClient.listTopics().names().get().contains(topicName), TOPIC_SYNC_DURATION_MS, "Topic: " + topicName + " didn't get created on cluster: " + cluster.getName() @@ -549,7 +564,7 @@ private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Excepti /* * retrieve the config value based on the input cluster, topic and config name */ - private static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName) throws Exception { + protected static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName) throws Exception { try (Admin client = cluster.createAdminClient()) { Collection cr = Collections.singleton( new ConfigResource(ConfigResource.Type.TOPIC, topic)); @@ -584,7 +599,7 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, * given consumer group, topics and expected number of records, make sure the consumer group * offsets are eventually synced to the expected offset numbers */ - private static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, + protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, Consumer consumer, List topics, String consumerGroupId, int numRecords) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { @@ -614,7 +629,7 @@ private static void 
waitForConsumerGroupOffsetSync(EmbeddedConnectCluster co /* * make sure the consumer to consume expected number of records */ - private static void waitForConsumingAllRecords(Consumer consumer, int numExpectedRecords) + protected static void waitForConsumingAllRecords(Consumer consumer, int numExpectedRecords) throws InterruptedException { final AtomicInteger totalConsumedRecords = new AtomicInteger(0); waitForCondition(() -> { @@ -627,14 +642,11 @@ private static void waitForConsumingAllRecords(Consumer consumer, int /* * MM2 config to use in integration tests */ - private static Map basicMM2Config() { + protected static Map basicMM2Config() { Map mm2Props = new HashMap<>(); mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS); mm2Props.put("max.tasks", "10"); - mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*"); mm2Props.put("groups", "consumer-group-.*"); - mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true"); - mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true"); mm2Props.put("sync.topic.acls.enabled", "false"); mm2Props.put("emit.checkpoints.interval.seconds", "1"); mm2Props.put("emit.heartbeats.interval.seconds", "1"); @@ -672,7 +684,7 @@ private void createTopics() { /* * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly */ - private void warmUpConsumer(Map consumerProps) throws InterruptedException { + protected void warmUpConsumer(Map consumerProps) throws InterruptedException { Consumer dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1"); dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); dummyConsumer.commitSync(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index 
83168d1518175..db6f67e56e60f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -371,7 +371,7 @@ public void createTopic(String topic, int partitions, int replication, MapNotable changes in 3 understood by brokers of version 2.5 or higher, so you must upgrade your Kafka cluster to get the stronger semantics. Otherwise, you can just pass in new ConsumerGroupMetadata(consumerGroupId) to work with older brokers. See KIP-732 for more details. - +

  • The Connect-based MirrorMaker (MM2) includes changes to support IdentityReplicationPolicy, enabling replication without renaming topics. + The existing DefaultReplicationPolicy is still used by default, but identity replication can be enabled via the + replication.policy configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for + use-cases with simple one-way replication topologies where topic renaming is undesirable. Note that IdentityReplicationPolicy, unlike + DefaultReplicationPolicy, cannot prevent replication cycles based on topic names, so take care to avoid cycles when constructing your + replication topology. +
  • Notable changes in 2.8.0