diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4cdc65383d52c..f6e28656c8dd2 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -36,7 +36,6 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition} -import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.collection.mutable.HashMap import scala.util.control.ControlThrowable @@ -58,7 +57,10 @@ import scala.util.{Failure, Success, Try} * enable.auto.commit=false * 3. Mirror Maker Setting: * abort.on.send.failure=true + * + * @deprecated Since 3.0, use the Connect-based MirrorMaker instead (aka MM2). */ +@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0") object MirrorMaker extends Logging with KafkaMetricsGroup { private[tools] var producer: MirrorMakerProducer = null @@ -80,6 +82,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def main(args: Array[String]): Unit = { + warn("This tool is deprecated and may be removed in a future major release.") info("Starting mirror maker") try { val opts = new MirrorMakerOptions(args) @@ -191,7 +194,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { setName(threadName) - @nowarn("cat=deprecation") private def toBaseConsumerRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord = BaseConsumerRecord(record.topic, record.partition, @@ -414,12 +416,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { * If message.handler.args is specified. A constructor that takes in a String as argument must exist. */ trait MirrorMakerMessageHandler { - @nowarn("cat=deprecation") def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] } private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { - @nowarn("cat=deprecation") override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, record.headers)) diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala index 0fca86b85517b..ef75787bb7deb 100644 --- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala @@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.BeforeEach +@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", since = "3.0") class MirrorMakerIntegrationTest extends KafkaServerTestHarness { override def generateConfigs: Seq[KafkaConfig] = diff --git a/docs/upgrade.html b/docs/upgrade.html index 678984cf6b506..5a9555e9e489f 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -97,6 +97,10 @@
Notable changes in 3 DefaultReplicationPolicy, cannot prevent replication cycles based on topic names, so take care to avoid cycles when constructing your replication topology. +
  • The original MirrorMaker (MM1) and related classes have been deprecated. Please use the Connect-based + MirrorMaker (MM2), as described in the + Geo-Replication section. +
  • Notable changes in 2.8.0