Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
447b38f
WIP LegacyReplicationPolicy and tests
ryannedolan May 7, 2021
8a1eec1
Merge branch 'trunk' of github.com:ryannedolan/kafka into trunk
ryannedolan May 7, 2021
697d212
WIP fix integration tests -- two FIXMEs
May 18, 2021
9af883e
steal mdedetrich's tests and IdentityReplicationPolicy, but without A…
May 20, 2021
53a2e2c
dropped LegacyReplicationPolicy
May 21, 2021
aaa10b2
WIP LegacyReplicationPolicy and tests
ryannedolan May 7, 2021
29a95e6
WIP fix integration tests -- two FIXMEs
May 18, 2021
a3d7cc8
steal mdedetrich's tests and IdentityReplicationPolicy, but without A…
May 20, 2021
7f621cf
dropped LegacyReplicationPolicy
May 21, 2021
2a03f8c
Merge branch 'KAFKA-9726' of github.com:ryannedolan/kafka into KAFKA-…
ryannedolan May 28, 2021
5050370
Merge branch 'apache:trunk' into KAFKA-9726
ryannedolan Jun 3, 2021
10ebae2
Merge branch 'apache:trunk' into KAFKA-9726
ryannedolan Jun 16, 2021
a9cf4e4
drop unnecessary change to mirror-client api
Jun 16, 2021
bf85481
improve tests for IdentityReplicationPolicy h/t @mimaison
Jun 16, 2021
627e099
drop unnecessary constructor from IdentityReplicationPolicy
Jun 21, 2021
b6628c4
Merge branch 'apache:trunk' into KAFKA-9726
ryannedolan Jun 21, 2021
5a5da9b
Merge branch 'KAFKA-9726' of github.com:ryannedolan/kafka into KAFKA-…
Jun 21, 2021
ada302c
mention IdentityReplicationPolicy in upgrade notes
Jun 29, 2021
5c99348
Merge branch 'apache:trunk' into KAFKA-9726
ryannedolan Jun 30, 2021
f9edd32
fix testOffsetSyncsTopicsOnTarget integration test to work with ident…
Jun 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating
* from legacy MM1, or for any use-case involving one-way replication.
*
* N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that
* your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely
* already be the case.
*/
public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class);

public static final String SOURCE_CLUSTER_ALIAS_CONFIG = "source.cluster.alias";

private String sourceClusterAlias = null;

@Override
public void configure(Map<String, ?> props) {
super.configure(props);
if (props.containsKey(SOURCE_CLUSTER_ALIAS_CONFIG)) {
sourceClusterAlias = (String) props.get(SOURCE_CLUSTER_ALIAS_CONFIG);
log.info("Using source cluster alias `{}`.", sourceClusterAlias);
}
}

/** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not include the source
* cluster alias in the remote topic name. Instead, topic names are unchanged.
*
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
*/
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
if (looksLikeHeartbeat(topic)) {
return super.formatRemoteTopic(sourceClusterAlias, topic);
} else {
return topic;
}
}

/** Unlike DefaultReplicationPolicy, IdendityReplicationPolicy cannot know the source of
* a remote topic based on its name alone. If `source.cluster.alias` is provided,
* `topicSource` will return that.
*
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
*/
@Override
public String topicSource(String topic) {
if (looksLikeHeartbeat(topic)) {
return super.topicSource(topic);
} else {
return sourceClusterAlias;
}
}

/** Since any topic may be a "remote topic", this just returns `topic`.
*
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
*/
@Override
public String upstreamTopic(String topic) {
if (looksLikeHeartbeat(topic)) {
return super.upstreamTopic(topic);
} else {
return topic;
}
}

private boolean looksLikeHeartbeat(String topic) {
return topic != null && topic.endsWith(MirrorClientConfig.HEARTBEATS_TOPIC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ Set<String> listTopics() throws InterruptedException {

int countHopsForTopic(String topic, String sourceClusterAlias) {
int hops = 0;
Set<String> visited = new HashSet<>();
while (true) {
hops++;
String source = replicationPolicy.topicSource(topic);
Expand All @@ -201,6 +202,12 @@ int countHopsForTopic(String topic, String sourceClusterAlias) {
if (source.equals(sourceClusterAlias)) {
return hops;
}
if (visited.contains(source)) {
// Extra check for IdentityReplicationPolicy and similar impls that cannot prevent cycles.
// We assume we're stuck in a cycle and will never find sourceClusterAlias.
return -1;
}
visited.add(source);
topic = replicationPolicy.upstreamTopic(topic);
}
}
Expand All @@ -223,7 +230,8 @@ boolean isRemoteTopic(String topic) {
Set<String> allSources(String topic) {
Set<String> sources = new HashSet<>();
String source = replicationPolicy.topicSource(topic);
while (source != null) {
while (source != null && !sources.contains(source)) {
// The extra Set.contains above is for ReplicationPolicies that cannot prevent cycles.
sources.add(source);
topic = replicationPolicy.upstreamTopic(topic);
source = replicationPolicy.topicSource(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface ReplicationPolicy {
*/
default String originalTopic(String topic) {
String upstream = upstreamTopic(topic);
if (upstream == null) {
if (upstream == null || upstream.equals(topic)) {
return topic;
} else {
return originalTopic(upstream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ private static class FakeMirrorClient extends MirrorClient {
List<String> topics;

FakeMirrorClient(List<String> topics) {
super(null, new DefaultReplicationPolicy(), null);
this(new DefaultReplicationPolicy(), topics);
}

FakeMirrorClient(ReplicationPolicy replicationPolicy, List<String> topics) {
super(null, replicationPolicy, null);
this.topics = topics;
}

Expand Down Expand Up @@ -131,6 +135,23 @@ public void upstreamClustersTest() throws InterruptedException {
assertFalse(sources.contains(null));
}

@Test
public void testIdentityReplicationUpstreamClusters() throws InterruptedException {
// IdentityReplicationPolicy treats heartbeats as a special case, so these should work as usual.
MirrorClient client = new FakeMirrorClient(identityReplicationPolicy("source"), Arrays.asList("topic1",
"topic2", "heartbeats", "source1.heartbeats", "source1.source2.heartbeats",
"source3.source4.source5.heartbeats"));
Set<String> sources = client.upstreamClusters();
assertTrue(sources.contains("source1"));
assertTrue(sources.contains("source2"));
assertTrue(sources.contains("source3"));
assertTrue(sources.contains("source4"));
assertTrue(sources.contains("source5"));
assertFalse(sources.contains(""));
assertFalse(sources.contains(null));
assertEquals(5, sources.size());
}

@Test
public void remoteTopicsTest() throws InterruptedException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
Expand All @@ -144,6 +165,20 @@ public void remoteTopicsTest() throws InterruptedException {
assertTrue(remoteTopics.contains("source3.source4.source5.topic6"));
}

@Test
public void testIdentityReplicationRemoteTopics() throws InterruptedException {
// IdentityReplicationPolicy should consider any topic to be remote.
MirrorClient client = new FakeMirrorClient(identityReplicationPolicy("source"), Arrays.asList(
"topic1", "topic2", "topic3", "heartbeats", "backup.heartbeats"));
Set<String> remoteTopics = client.remoteTopics();
assertTrue(remoteTopics.contains("topic1"));
assertTrue(remoteTopics.contains("topic2"));
assertTrue(remoteTopics.contains("topic3"));
// Heartbeats are treated as a special case
assertFalse(remoteTopics.contains("heartbeats"));
assertTrue(remoteTopics.contains("backup.heartbeats"));
}

@Test
public void remoteTopicsSeparatorTest() throws InterruptedException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
Expand All @@ -159,4 +194,25 @@ public void remoteTopicsSeparatorTest() throws InterruptedException {
assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
}

@Test
public void testIdentityReplicationTopicSource() {
MirrorClient client = new FakeMirrorClient(
identityReplicationPolicy("primary"), Arrays.asList());
assertEquals("topic1", client.replicationPolicy()
.formatRemoteTopic("primary", "topic1"));
assertEquals("primary", client.replicationPolicy()
.topicSource("topic1"));
// Heartbeats are handled as a special case
assertEquals("backup.heartbeats", client.replicationPolicy()
.formatRemoteTopic("backup", "heartbeats"));
assertEquals("backup", client.replicationPolicy()
.topicSource("backup.heartbeats"));
}

private ReplicationPolicy identityReplicationPolicy(String source) {
IdentityReplicationPolicy policy = new IdentityReplicationPolicy();
policy.configure(Collections.singletonMap(
IdentityReplicationPolicy.SOURCE_CLUSTER_ALIAS_CONFIG, source));
return policy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public Map<String, String> workerConfig(SourceAndTarget sourceAndTarget) {
props.putAll(stringsWithPrefix("header.converter"));
props.putAll(stringsWithPrefix("task"));
props.putAll(stringsWithPrefix("worker"));
props.putAll(stringsWithPrefix("replication.policy"));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In MirrorClientConfig ,it seems that it's not necessary to add replication.policy into the props.
As i see , org.apache.kafka.connect.mirror.MirrorClientConfig#replicationPolicy initialise the ReplicationPolicy instance,
and in MirrorClient this instance is used ,where ReplicationPolicy takes effect for real.


// transform any expression like ${provider:path:key}, since the worker doesn't do so
props = transform(props);
Expand Down Expand Up @@ -203,6 +204,7 @@ public Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget,
props.keySet().retainAll(MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.names());

props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
props.putAll(stringsWithPrefix("replication.policy"));

Map<String, String> sourceClusterProps = clusterProps(sourceAndTarget.source());
// attrs non prefixed with producer|consumer|admin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,12 @@ boolean isCycle(String topic) {
} else if (source.equals(sourceAndTarget.target())) {
return true;
} else {
return isCycle(replicationPolicy.upstreamTopic(topic));
String upstreamTopic = replicationPolicy.upstreamTopic(topic);
if (upstreamTopic.equals(topic)) {
// Extra check for IdentityReplicationPolicy and similar impls that don't prevent cycles.
return false;
}
return isCycle(upstreamTopic);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,32 @@ public void testNoCycles() {
assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.source.topic1"), "should not allow cycles");
assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else");
assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else");
}

@Test
public void testIdentityReplication() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new IdentityReplicationPolicy(), x -> true, x -> true);
assertTrue(connector.shouldReplicateTopic("target.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("target.source.target.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("source.target.source.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("topic1"), "should allow normal topics");
assertTrue(connector.shouldReplicateTopic("othersource.topic1"), "should allow normal topics");
assertFalse(connector.shouldReplicateTopic("target.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("target.source.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("source.target.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("target.source.target.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("source.target.source.heartbeats"), "should not allow heartbeat cycles");
assertTrue(connector.shouldReplicateTopic("heartbeats"), "should allow heartbeat topics");
assertTrue(connector.shouldReplicateTopic("othersource.heartbeats"), "should allow heartbeat topics");
}

@Test
public void testAclFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
Expand Down
Loading