@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.common.Configurable;

import java.util.Map;

import static org.apache.kafka.connect.mirror.MirrorClientConfig.HEARTBEATS_TOPIC;

/**
* The replication policy that imitates the behavior of MirrorMaker 1.
*
* <p>The policy doesn't rename topics: {@code topic1} remains {@code topic1} after replication.
* There is one exception to this: for {@code heartbeats}, it behaves identically to {@link DefaultReplicationPolicy}.
*
* <p>The policy has some notable limitations. The most important one is that the policy is unable to detect
* cycles for any topic apart from {@code heartbeats}. This makes cross-replication effectively impossible.
*
* <p>Another limitation is that {@link MirrorClient#remoteTopics()} will be able to list only
* {@code heartbeats} topics.
*
* <p>{@link MirrorClient#countHopsForTopic(String, String)} will return {@code -1} for any topic
* apart from {@code heartbeats}.
*
* <p>The policy supports {@link DefaultReplicationPolicy}'s configurations
* for the behavior related to {@code heartbeats}.
*/
public class LegacyReplicationPolicy implements ReplicationPolicy, Configurable {
// Replication sub-policy for heartbeats topics
private final DefaultReplicationPolicy heartbeatTopicReplicationPolicy = new DefaultReplicationPolicy();

@Override
public void configure(final Map<String, ?> props) {
heartbeatTopicReplicationPolicy.configure(props);
}

@Override
public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return heartbeatTopicReplicationPolicy.formatRemoteTopic(sourceClusterAlias, topic);
} else {
return topic;
}
}

@Override
public String topicSource(final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return heartbeatTopicReplicationPolicy.topicSource(topic);
} else {
return null;
Contributor

I've seen alternative solutions floating around that use a configurable source here. Basically, the configuration passed to configure() is consulted to find the "source cluster", rather than looking at the topic name. That approach lets you return an actual source here, which obviates the new canTrackSource() method etc.
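
A minimal sketch of that alternative, for illustration only. The class name and the property key `replication.policy.source.alias` are hypothetical and are not part of this PR or of Kafka; the sketch also leaves out the heartbeats handling and the `ReplicationPolicy` interface itself so that it stays self-contained.

```java
import java.util.Map;

// Hypothetical sketch: a policy that keeps topic names unchanged and reports
// a source cluster taken from its configuration instead of the topic name.
final class ConfiguredSourcePolicySketch {
    // Assumed property key, chosen only for this example.
    static final String SOURCE_ALIAS_CONFIG = "replication.policy.source.alias";

    private String sourceAlias;

    public void configure(final Map<String, ?> props) {
        final Object value = props.get(SOURCE_ALIAS_CONFIG);
        sourceAlias = value == null ? null : value.toString();
    }

    public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
        // Topics are not renamed, as in MirrorMaker 1.
        return topic;
    }

    public String topicSource(final String topic) {
        // Every topic is attributed to the configured source (null if unset).
        return sourceAlias;
    }
}
```

With this shape, `topicSource` returns the same answer for every topic, so the policy cannot distinguish local topics from replicated ones on its own.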

Contributor Author

I've explored this possibility, too. The main problem with it is that the replication policy should answer differently for source and target clusters. It's essential for methods like MirrorSourceConnector.isCycle and MirrorClient.remoteTopics. For a source, topicSource should return null; for a target, a predefined value.

That leaves two possibilities. In the first, we set up two different replication policy instances with different configurations, e.g.:

replication.policy.source.class=org.apache.kafka.connect.mirror.LegacyReplicationPolicy
replication.policy.source.source=
replication.policy.target.class=org.apache.kafka.connect.mirror.LegacyReplicationPolicy
replication.policy.target.source=primary-cluster

Of course, we can make sure that the current configurations keep working as before.

Another possibility is to modify the ReplicationPolicy interface to allow it to pass additional information out (like canTrackSource or similar) or in (like topicSource(String topic, boolean isSourceCluster)).
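
A rough sketch of the "pass information in" variant, under stated assumptions: the interface and class names below are hypothetical stand-ins rather than Kafka types, and the two-argument `topicSource` overload is only the shape suggested above, not an existing API.

```java
// Hypothetical stand-in for ReplicationPolicy with the extra overload.
interface SideAwareReplicationPolicy {
    String topicSource(String topic);

    // Assumed overload: existing policies ignore the flag by default,
    // so they would keep their current behavior.
    default String topicSource(String topic, boolean isSourceCluster) {
        return topicSource(topic);
    }
}

// Hypothetical legacy-style policy: no source on the source cluster,
// a predefined source on the target cluster.
final class SideAwareLegacyPolicySketch implements SideAwareReplicationPolicy {
    private final String configuredSource;

    SideAwareLegacyPolicySketch(final String configuredSource) {
        this.configuredSource = configuredSource;
    }

    @Override
    public String topicSource(final String topic) {
        // Without knowing the side, the topic name carries no source information.
        return null;
    }

    @Override
    public String topicSource(final String topic, final boolean isSourceCluster) {
        return isSourceCluster ? null : configuredSource;
    }
}
```

The default method keeps the change source-compatible for existing implementations, but callers such as `MirrorSourceConnector.isCycle` and `MirrorClient.remoteTopics` would still need to know which side they are asking about.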

What do you think would be the best approach?

}
}

@Override
public String upstreamTopic(final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return heartbeatTopicReplicationPolicy.upstreamTopic(topic);
} else {
return topic;
}
}

@Override
public String originalTopic(final String topic) {
if (isOriginalTopicHeartbeats(topic)) {
return HEARTBEATS_TOPIC;
} else {
return topic;
}
}

@Override
public boolean canTrackSource(final String topic) {
return isOriginalTopicHeartbeats(topic);
}

private boolean isOriginalTopicHeartbeats(final String topic) {
return HEARTBEATS_TOPIC.equals(heartbeatTopicReplicationPolicy.originalTopic(topic));
}
}
@@ -57,4 +57,9 @@ default boolean isInternalTopic(String topic) {
return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
|| topic.startsWith(".");
}

/** Checks if the policy can track back to the source of the topic. */
Contributor

I'm not sure what you mean by "track back to the source of the topic". The word "track" might mean a few things here, and it's not obvious what you mean. Can you clarify?

default boolean canTrackSource(String topic) {
Contributor

If a public API change like this is required, you will need to propose a small KIP. I'm unclear why it's required tho, and ideally we would not alter the existing API if possible.

If a new method is required, I think "track" is too ambiguous and should not be used here.

return !isInternalTopic(topic);
}
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.junit.Test;

import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class LegacyReplicationPolicyTest {
@Test
public void testFormatRemoteTopic() {
final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
assertEquals("aaa", legacyReplicationPolicy.formatRemoteTopic("source1", "aaa"));
assertEquals("source1.heartbeats", legacyReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
assertEquals("source2.source1.heartbeats", legacyReplicationPolicy.formatRemoteTopic("source2", "source1.heartbeats"));

legacyReplicationPolicy.configure(Collections.singletonMap("replication.policy.separator", "__"));
assertEquals("aaa", legacyReplicationPolicy.formatRemoteTopic("source1", "aaa"));
assertEquals("source1__heartbeats", legacyReplicationPolicy.formatRemoteTopic("source1", "heartbeats"));
}

@Test
public void testTopicSource() {
final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
assertNull(legacyReplicationPolicy.topicSource("source1.aaa"));
assertNull(legacyReplicationPolicy.topicSource("heartbeats"));
assertEquals("source1", legacyReplicationPolicy.topicSource("source1.heartbeats"));
assertEquals("source2", legacyReplicationPolicy.topicSource("source2.source1.heartbeats"));
}

@Test
public void testUpstreamTopic() {
final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
assertEquals("aaa", legacyReplicationPolicy.upstreamTopic("aaa"));
assertEquals("source1.aaa", legacyReplicationPolicy.upstreamTopic("source1.aaa"));
assertEquals("heartbeats", legacyReplicationPolicy.upstreamTopic("source1.heartbeats"));
}

@Test
public void testOriginalTopic() {
final LegacyReplicationPolicy legacyReplicationPolicy = new LegacyReplicationPolicy();
assertEquals("aaa", legacyReplicationPolicy.originalTopic("aaa"));
assertEquals("source1.aaa", legacyReplicationPolicy.originalTopic("source1.aaa"));
assertEquals("source2.source1.aaa", legacyReplicationPolicy.originalTopic("source2.source1.aaa"));
assertEquals("heartbeats", legacyReplicationPolicy.originalTopic("heartbeats"));
assertEquals("heartbeats", legacyReplicationPolicy.originalTopic("source2.source1.heartbeats"));
}
}
@@ -31,7 +31,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public class MirrorClientTest {
public class MirrorClientDefaultReplicationPolicyTest {

private static class FakeMirrorClient extends MirrorClient {

@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.common.Configurable;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MirrorClientLegacyReplicationPolicyTest {

private static class FakeMirrorClient extends MirrorClient {

List<String> topics;

FakeMirrorClient(List<String> topics) {
super(null, new LegacyReplicationPolicy(), null);
this.topics = topics;
}

FakeMirrorClient() {
this(Collections.emptyList());
}

@Override
protected Set<String> listTopics() {
return new HashSet<>(topics);
}
}

@Test
public void testIsHeartbeatTopic() throws InterruptedException, TimeoutException {
MirrorClient client = new FakeMirrorClient();
assertTrue(client.isHeartbeatTopic("heartbeats"));
assertTrue(client.isHeartbeatTopic("source1.heartbeats"));
assertTrue(client.isHeartbeatTopic("source2.source1.heartbeats"));
assertFalse(client.isHeartbeatTopic("heartbeats!"));
assertFalse(client.isHeartbeatTopic("!heartbeats"));
assertFalse(client.isHeartbeatTopic("source1heartbeats"));
assertFalse(client.isHeartbeatTopic("source1-heartbeats"));
}

@Test
public void testIsCheckpointTopic() throws InterruptedException, TimeoutException {
MirrorClient client = new FakeMirrorClient();
assertTrue(client.isCheckpointTopic("source1.checkpoints.internal"));
assertFalse(client.isCheckpointTopic("checkpoints.internal"));
assertFalse(client.isCheckpointTopic("checkpoints-internal"));
assertFalse(client.isCheckpointTopic("checkpoints.internal!"));
assertFalse(client.isCheckpointTopic("!checkpoints.internal"));
assertFalse(client.isCheckpointTopic("source1checkpointsinternal"));
}

@Test
public void countHopsForTopicTest() throws InterruptedException, TimeoutException {
MirrorClient client = new FakeMirrorClient();

// We can count hops only for heartbeats topics.
assertEquals(-1, client.countHopsForTopic("topic", "source"));
assertEquals(-1, client.countHopsForTopic("source", "source"));
assertEquals(-1, client.countHopsForTopic("sourcetopic", "source"));
assertEquals(-1, client.countHopsForTopic("source1.topic", "source2"));
assertEquals(-1, client.countHopsForTopic("source1.topic", "source1"));
assertEquals(-1, client.countHopsForTopic("source2.source1.topic", "source2"));
assertEquals(-1, client.countHopsForTopic("source2.source1.topic", "source1"));
assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source1"));
assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source4"));

assertEquals(-1, client.countHopsForTopic("heartbeats", "source"));
assertEquals(1, client.countHopsForTopic("source1.heartbeats", "source1"));
assertEquals(1, client.countHopsForTopic("source2.source1.heartbeats", "source2"));
assertEquals(2, client.countHopsForTopic("source2.source1.heartbeats", "source1"));
assertEquals(3, client.countHopsForTopic("source3.source2.source1.heartbeats", "source1"));
assertEquals(-1, client.countHopsForTopic("source3.source2.source1.heartbeats", "source4"));
}

@Test
public void heartbeatTopicsTest() throws InterruptedException, TimeoutException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
"source1.heartbeats", "source2.source1.heartbeats", "source3.heartbeats"));
Set<String> heartbeatTopics = client.heartbeatTopics();
assertEquals(new HashSet<>(Arrays.asList("heartbeats", "source1.heartbeats",
"source2.source1.heartbeats", "source3.heartbeats")), heartbeatTopics);
}

@Test
public void checkpointsTopicsTest() throws InterruptedException, TimeoutException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "checkpoints.internal",
"source1.checkpoints.internal", "source2.source1.checkpoints.internal", "source3.checkpoints.internal"));
Set<String> checkpointTopics = client.checkpointTopics();
assertEquals(new HashSet<>(Arrays.asList("source1.checkpoints.internal",
"source2.source1.checkpoints.internal", "source3.checkpoints.internal")), checkpointTopics);
}

@Test
public void replicationHopsTest() throws InterruptedException, TimeoutException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
"source1.heartbeats", "source1.source2.heartbeats", "source3.heartbeats", "source4.topic1"));
assertEquals(1, client.replicationHops("source1"));
assertEquals(2, client.replicationHops("source2"));
assertEquals(1, client.replicationHops("source3"));
assertEquals(-1, client.replicationHops("source4")); // can't count hops for non-heartbeats topic
assertEquals(-1, client.replicationHops("source5"));
}

@Test
public void upstreamClustersTest() throws InterruptedException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
"source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats", "source6.topic1"));
Set<String> sources = client.upstreamClusters();
assertTrue(sources.contains("source1"));
assertTrue(sources.contains("source2"));
assertTrue(sources.contains("source3"));
assertTrue(sources.contains("source4"));
assertTrue(sources.contains("source5"));
assertFalse(sources.contains("source6")); // non-heartbeats topic can't indicate upstream cluster
assertFalse(sources.contains("sourceX"));
assertFalse(sources.contains(""));
assertFalse(sources.contains(null));
}

@Test
public void remoteTopicsTest() throws InterruptedException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
"source1.topic4", "source1.source2.topic5", "source3.source4.source5.topic6",
"source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats"));
Set<String> remoteTopics = client.remoteTopics();

// We recognize as remote only heartbeats topics.
assertFalse(remoteTopics.contains("topic1"));
assertFalse(remoteTopics.contains("topic2"));
assertFalse(remoteTopics.contains("topic3"));
assertFalse(remoteTopics.contains("source1.topic4"));
assertFalse(remoteTopics.contains("source1.source2.topic5"));
assertFalse(remoteTopics.contains("source3.source4.source5.topic6"));

assertTrue(remoteTopics.contains("source1.heartbeats"));
assertTrue(remoteTopics.contains("source1.source2.heartbeats"));
assertTrue(remoteTopics.contains("source3.source4.source5.heartbeats"));
}

@Test
public void remoteTopicsSeparatorTest() throws InterruptedException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
"source1__topic4", "source1__source2__topic5", "source3__source4__source5__topic6",
"source1__heartbeats", "source1__source2__heartbeats", "source3__source4__source5__heartbeats"));
((Configurable) client.replicationPolicy()).configure(
Collections.singletonMap("replication.policy.separator", "__"));
Set<String> remoteTopics = client.remoteTopics();

// We recognize as remote only heartbeats topics.
assertFalse(remoteTopics.contains("topic1"));
assertFalse(remoteTopics.contains("topic2"));
assertFalse(remoteTopics.contains("topic3"));
assertFalse(remoteTopics.contains("source1__topic4"));
assertFalse(remoteTopics.contains("source1__source2__topic5"));
assertFalse(remoteTopics.contains("source3__source4__source5__topic6"));

assertTrue(remoteTopics.contains("source1__heartbeats"));
assertTrue(remoteTopics.contains("source1__source2__heartbeats"));
assertTrue(remoteTopics.contains("source3__source4__source5__heartbeats"));
}

}
@@ -201,7 +201,10 @@ List<TopicPartition> findSourceTopicPartitions()
List<TopicPartition> findTargetTopicPartitions()
throws InterruptedException, ExecutionException {
Set<String> topics = listTopics(targetAdminClient).stream()
.filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
// Allow replication policies that can't track back to the source of a topic (like LegacyReplicationPolicy)
// to consider all topics as target topics.
.filter(t -> !replicationPolicy.canTrackSource(t)
|| sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
.collect(Collectors.toSet());
return describeTopics(targetAdminClient, topics).stream()
.flatMap(MirrorSourceConnector::expandTopicDescription)
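
To see what the changed filter in `findTargetTopicPartitions()` selects, here is a small self-contained sketch. It is not the connector code: the class name is hypothetical, the policy is passed in as two lambdas, and `prefixSource` is a simplified stand-in that treats everything before the first `.` as the source alias.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper that mirrors only the filter condition from the diff.
final class TargetTopicFilterSketch {
    static Set<String> targetTopics(final List<String> topics,
                                    final String source,
                                    final Predicate<String> canTrackSource,
                                    final Function<String, String> topicSource) {
        return topics.stream()
            // Keep a topic if its source cannot be determined,
            // or if its source is the cluster we replicate from.
            .filter(t -> !canTrackSource.test(t) || source.equals(topicSource.apply(t)))
            .collect(Collectors.toCollection(TreeSet::new));
    }

    // Simplified stand-in for a prefix-based topicSource: "source1.topic" -> "source1".
    static String prefixSource(final String topic) {
        final int separator = topic.indexOf('.');
        return separator < 0 ? null : topic.substring(0, separator);
    }
}
```

With a policy that can always determine the source (`t -> true`), only topics prefixed with the requested source alias pass the filter. With a policy that never can (`t -> false`, as `LegacyReplicationPolicy` does for non-heartbeats topics), every topic on the target cluster passes.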