KAFKA-9726: Add LegacyReplicationPolicy for MM2 #9395
This commit adds a new replication policy for MirrorMaker 2, `LegacyReplicationPolicy`. The policy imitates the MirrorMaker 1 behavior of not renaming replicated topics. The one exception is the `heartbeats` topic, which is replicated according to `DefaultReplicationPolicy`.
Not renaming topics brings a number of limitations, the most important being that replication cycles cannot be detected. This makes cross-replication using `LegacyReplicationPolicy` effectively impossible. See the `LegacyReplicationPolicy` Javadoc for details.
A new method, `canTrackSource`, is added to `ReplicationPolicy`. Its result indicates whether the replication policy can track a topic back to its source topic. It is needed so that detection of target topics works when `LegacyReplicationPolicy` is used.
The tests follow the same strategy as those for `DefaultReplicationPolicy`, with the necessary adjustments (e.g. active/active replication is not tested).
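A rough sketch of the behavior described above may help when reading the review comments. It is not the code from this PR: `SketchReplicationPolicy` is a hypothetical, trimmed-down stand-in for the real `ReplicationPolicy` interface, the heartbeats check is a simplified assumption, and the prefixing policy only approximates the `alias.topic` naming of `DefaultReplicationPolicy`.

```java
// Hypothetical stand-in for MM2's ReplicationPolicy, declared locally so the
// sketch compiles without Kafka on the classpath. Only the methods discussed
// in this PR are included.
interface SketchReplicationPolicy {
    String formatRemoteTopic(String sourceClusterAlias, String topic);

    String topicSource(String topic);

    /** Proposed method: can the source cluster be derived from the topic name? */
    default boolean canTrackSource(String topic) {
        return true;
    }
}

// Approximates the prefixing behavior of DefaultReplicationPolicy ("alias.topic").
class SketchDefaultPolicy implements SketchReplicationPolicy {
    static final String SEPARATOR = ".";

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + SEPARATOR + topic;
    }

    @Override
    public String topicSource(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(0, i);
    }
}

// Keeps topic names unchanged, except for heartbeats, which are delegated
// to the prefixing policy.
class SketchLegacyPolicy implements SketchReplicationPolicy {
    private final SketchReplicationPolicy heartbeatPolicy = new SketchDefaultPolicy();

    // Assumption: a topic counts as heartbeats if it is "heartbeats" or ends
    // with ".heartbeats". The check in the PR may differ.
    private boolean isOriginalTopicHeartbeats(String topic) {
        return topic.equals("heartbeats") || topic.endsWith(".heartbeats");
    }

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return isOriginalTopicHeartbeats(topic)
                ? heartbeatPolicy.formatRemoteTopic(sourceClusterAlias, topic)
                : topic; // MirrorMaker 1 style: no renaming
    }

    @Override
    public String topicSource(String topic) {
        // Without a prefix there is nothing in the name to identify the source.
        return isOriginalTopicHeartbeats(topic) ? heartbeatPolicy.topicSource(topic) : null;
    }

    @Override
    public boolean canTrackSource(String topic) {
        return isOriginalTopicHeartbeats(topic);
    }
}
```

Because `topicSource` returns `null` for every non-heartbeats topic, a caller cannot distinguish a local topic from a replicated one by name alone, which is why cycle detection is not possible with this policy.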
        || topic.startsWith(".");
}

/** Checks if the policy can track back to the source of the topic. */
I'm not sure what you mean by "track back to the source of the topic". The word "track" might mean a few things here, and it's not obvious what you mean. Can you clarify?
}

/** Checks if the policy can track back to the source of the topic. */
default boolean canTrackSource(String topic) {
If a public API change like this is required, you will need to propose a small KIP. I'm unclear why it's required though, and ideally we would not alter the existing API if possible.
If a new method is required, I think "track" is too ambiguous and should not be used here.
if (isOriginalTopicHeartbeats(topic)) {
    return heartbeatTopicReplicationPolicy.topicSource(topic);
} else {
    return null;
I've seen alternative solutions floating around that use a configurable source here. Basically, the configuration passed to configure() is consulted to find the "source cluster", rather than looking at the topic name. That approach lets you return an actual source here, which obviates the new canTrackSource() method etc.
I've explored this possibility, too. The main problem with it is that the replication policy should answer differently for source and target clusters. It's essential for methods like MirrorSourceConnector.isCycle and MirrorClient.remoteTopics. For a source, topicSource should return null; for a target, a predefined value.
That leaves two possibilities. In the first, we set up two different replication policy instances with different configurations, e.g.:
replication.policy.source.class=org.apache.kafka.connect.mirror.LegacyReplicationPolicy
replication.policy.source.source=
replication.policy.target.class=org.apache.kafka.connect.mirror.LegacyReplicationPolicy
replication.policy.target.source=primary-cluster
Of course, we can make sure that the current configurations keep working as before.
Another possibility is to modify the ReplicationPolicy interface to allow it to pass additional information out (like canTrackSource or similar) or in (like topicSource(String topic, boolean isSourceCluster)).
What do you think would be the best approach?
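To make the first option concrete, here is a minimal sketch of a policy that takes its source from configuration instead of the topic name. Everything in it is an assumption for illustration: the class name `ConfiguredSourcePolicySketch` and the `source` key are hypothetical, and the class deliberately does not implement the real `ReplicationPolicy` interface so that it compiles on its own.

```java
import java.util.Map;

// Hypothetical policy whose answer depends on how the instance is configured,
// not on the topic name.
class ConfiguredSourcePolicySketch {
    // Hypothetical key: what would remain of "replication.policy.<role>.source"
    // once the per-role prefix has been stripped.
    static final String SOURCE_CONFIG = "source";

    // null means this instance is used for the source cluster.
    private String configuredSource;

    public void configure(Map<String, ?> props) {
        Object value = props.get(SOURCE_CONFIG);
        String source = value == null ? "" : value.toString().trim();
        configuredSource = source.isEmpty() ? null : source;
    }

    // Topics keep their names, as in MirrorMaker 1.
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }

    // The source-side instance reports no source; the target-side instance
    // reports the predefined alias for every topic.
    public String topicSource(String topic) {
        return configuredSource;
    }
}
```

With this shape, the source-side instance would be configured with an empty `source` and the target-side instance with `primary-cluster`, matching the properties above. The cost is that every caller has to know which of the two instances to consult.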
@ivanyu Can you close this since a new PR has been created that is up to date with the latest
Yep, I'm closing this one