-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-9726 IdentityReplicationPolicy #10652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...st/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
Show resolved
Hide resolved
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
Outdated
Show resolved
Hide resolved
|
@ryannedolan Since #10762 was merged maybe it makes sense to rebase against the current |
|
@mimaison As my PR at #10648 has been superseded by this one, would it be possible to do a quick review of it? Afaik its ready and @ryannedolan has done great work in avoiding the need for a KIP so there shouldn't be any blockers. |
|
None of the failing tests are related. Ready to merge. |
|
@ryannedolan @mdedetrich Thanks, I'll try to take a look this week |
mimaison
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ryannedolan for the PR. It looks good overall, I left a few small comments
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
Outdated
Show resolved
Hide resolved
connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
Show resolved
Hide resolved
connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
Show resolved
Hide resolved
|
@mimaison thanks for the feedback and suggestions. I've updated accordingly. |
...t/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java
Outdated
Show resolved
Hide resolved
|
@ryannedolan I actually just realized, does it make sense to add the |
|
Yes, we can add a line in https://github.com/apache/kafka/blob/trunk/docs/upgrade.html#L85 to introduce this new ReplicationPolicy |
|
Is there anything left on this PR to be merged (apart from the changelog which is a nice to have)? |
|
I think changelog and documentation updates can come after this is merged. |
|
Let's put a line in the changelog now so we're sure it's included in the release notes. I'm happy to merge once this is done. I agree the documentation can come later. |
| in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details. | ||
| </li> | ||
|
|
||
| <li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimaison @mdedetrich wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make a quick amendment stating that it works like the original MM1 to make it more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think it's enough, thanks
|
@ryannedolan I was about to merge but I noticed |
Yep good catch. Fixed! |
|
Thanks @ryannedolan, @mdedetrich and @ivanyu |
This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy". This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously. When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles. Reviewers: Mickael Maison <mickael.maison@gmail.com> Co-authored-by: Ryanne Dolan <rdolan@twitter.com> Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com> Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
KAFKA-9726: Add IdentityReplicationPolicy to MirrorMaker2 (apache#10652)
This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy". This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously. When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles. Reviewers: Mickael Maison <mickael.maison@gmail.com> Co-authored-by: Ryanne Dolan <rdolan@twitter.com> Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com> Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
| props.putAll(stringsWithPrefix("header.converter")); | ||
| props.putAll(stringsWithPrefix("task")); | ||
| props.putAll(stringsWithPrefix("worker")); | ||
| props.putAll(stringsWithPrefix("replication.policy")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In MirrorClientConfig ,it seems that it's not necessary to add replication.policy into the props.
As i see , org.apache.kafka.connect.mirror.MirrorClientConfig#replicationPolicy initialise the ReplicationPolicy instance,
and in MirrorClient this instance is used ,where ReplicationPolicy takes effect for real.
|
|
||
| <li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics. | ||
| The existing <code>DefaultReplicationPolicy</code> is still used by default, but identity replication can be enabled via the | ||
| <code>replication.policy</code> configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it's more clear to say "replication.policy.class " here ,you know , which means it's configured in that form in mm2.properties file.
Friendly to beginners .
This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy". This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously. When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles. Reviewers: Mickael Maison <mickael.maison@gmail.com> Co-authored-by: Ryanne Dolan <rdolan@twitter.com> Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com> Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy". This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously. When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles. Reviewers: Mickael Maison <mickael.maison@gmail.com> Co-authored-by: Ryanne Dolan <rdolan@twitter.com> Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com> Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy". This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously. When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles. Reviewers: Mickael Maison <mickael.maison@gmail.com> Co-authored-by: Ryanne Dolan <rdolan@twitter.com> Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com> Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
Enable active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy".
This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously.
When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles.
This PR includes major contributions from @mdedetrich and @ivanyu, so please include them in the commit log!