From 3efef7cac22e8c37be27266960bbaabd27815514 Mon Sep 17 00:00:00 2001 From: Stefan Bocutiu Date: Mon, 10 Feb 2025 14:54:36 +0000 Subject: [PATCH] LC-290 Handle Mbean already registered (#213) * LC-290 Handle Mbean already registered Connect does not support Mbean isolation per connector. As a result when more tasks are used it will end up creating the same instance and it will fail with exception. The code change adds the task into the mbean name. Also on task being stopped it unregisters the mbean. * Fix the unit tests --------- Co-authored-by: stheppi --- .../connect/http/sink/HttpSinkConnector.scala | 8 ++-- .../connect/http/sink/HttpSinkTask.scala | 23 +++++++---- .../http/sink/config/HttpSinkConfigDef.scala | 5 ++- .../http/sink/metrics/MetricsRegistrar.scala | 40 ++++++++++++++++--- ...cAuthenticationHttpRequestSenderTest.scala | 12 +++--- 5 files changed, 65 insertions(+), 23 deletions(-) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala index 762e319f5..59c718864 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConnector.scala @@ -51,8 +51,10 @@ class HttpSinkConnector extends SinkConnector with LazyLogging with JarManifestP override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = { logger.info(s"[$sinkName] Creating $maxTasks tasks config") - List.fill(maxTasks) { - props.map(_.asJava).getOrElse(Map.empty[String, String].asJava) - }.asJava + val taskConfigs = (0 until maxTasks).map { taskNumber => + val taskProps = props.getOrElse(Map.empty).updated(HttpSinkConfigDef.TaskNumberProp, taskNumber.toString) + taskProps.asJava + } + taskConfigs.asJava } } diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala index a95855603..693cfb8d9 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala @@ -21,12 +21,17 @@ import cats.effect.Deferred import cats.effect.IO import cats.effect.Ref import cats.effect.unsafe.IORuntime +import cats.syntax.all._ import com.typesafe.scalalogging.LazyLogging import io.lenses.streamreactor.common.util.AsciiArtPrinter.printAsciiHeader +import io.lenses.streamreactor.common.utils.JarManifestProvided import io.lenses.streamreactor.connect.cloud.common.model.Offset import io.lenses.streamreactor.connect.cloud.common.model.Topic import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig +import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef +import io.lenses.streamreactor.connect.http.sink.metrics.HttpSinkMetrics +import io.lenses.streamreactor.connect.http.sink.metrics.MetricsRegistrar import io.lenses.streamreactor.connect.http.sink.tpl.RawTemplate import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType import org.apache.kafka.clients.consumer.OffsetAndMetadata @@ -34,10 +39,6 @@ import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition } import org.apache.kafka.connect.errors.ConnectException import org.apache.kafka.connect.sink.SinkRecord import org.apache.kafka.connect.sink.SinkTask -import cats.syntax.all._ -import io.lenses.streamreactor.common.utils.JarManifestProvided -import io.lenses.streamreactor.connect.http.sink.metrics.HttpSinkMetrics -import io.lenses.streamreactor.connect.http.sink.metrics.MetricsRegistrar import java.util import scala.jdk.CollectionConverters.IterableHasAsScala @@ -54,7 +55,8 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided { private def sinkName = maybeSinkName.getOrElse("Lenses.io HTTP Sink") private val deferred: Deferred[IO, Either[Throwable, Unit]] = Deferred.unsafe[IO, Either[Throwable, Unit]] - private val errorRef: Ref[IO, List[Throwable]] = Ref.unsafe[IO, List[Throwable]](List.empty) + private val taskNumberRef: Ref[IO, Int] = Ref.unsafe[IO, Int](0) + private val errorRef: Ref[IO, List[Throwable]] = Ref.unsafe[IO, List[Throwable]](List.empty) override def start(props: util.Map[String, String]): Unit = { @@ -69,9 +71,14 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided { } (for { - config <- IO.fromEither(HttpSinkConfig.from(propsAsScala)) + taskNumber <- propsAsScala.getOrElse(HttpSinkConfigDef.TaskNumberProp, "1").toIntOption match { + case Some(value) => IO(value) + case None => IO.raiseError(new IllegalArgumentException("Task number must be an integer")) + } + _ <- taskNumberRef.set(taskNumber) + config <- IO.fromEither(HttpSinkConfig.from(propsAsScala - HttpSinkConfigDef.TaskNumberProp)) metrics <- IO(new HttpSinkMetrics()) - _ <- IO(MetricsRegistrar.registerMetricsMBean(metrics, sinkName)) + _ <- IO(MetricsRegistrar.registerMetricsMBean(metrics, sinkName, taskNumber)) template = RawTemplate(config.endpoint, config.content, config.headers, config.nullPayloadHandler) writerManager <- HttpWriterManager.apply(sinkName, config, template, deferred, metrics) _ <- writerManager.start(refUpdateCallback) @@ -180,6 +187,8 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided { override def stop(): Unit = (for { + taskNumber <- taskNumberRef.get + _ <- IO(MetricsRegistrar.unregisterMetricsMBean(sinkName, taskNumber)) _ <- maybeWriterManager.traverse { x => x.closeReportingControllers() x.close diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala index 2167aab26..13e24b047 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala @@ -31,8 +31,9 @@ import scala.jdk.CollectionConverters._ object HttpSinkConfigDef { - val ConnectorPrefix: String = "connect.http" - val HttpMethodProp: String = "connect.http.method" + val TaskNumberProp: String = "connect.http.task.number" + + val HttpMethodProp: String = "connect.http.method" val HttpMethodDoc: String = """ |The HTTP method to use. diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/metrics/MetricsRegistrar.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/metrics/MetricsRegistrar.scala index 9430d5db8..957f77ba9 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/metrics/MetricsRegistrar.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/metrics/MetricsRegistrar.scala @@ -16,21 +16,51 @@ package io.lenses.streamreactor.connect.http.sink.metrics import java.lang.management.ManagementFactory +import javax.management.MBeanServer import javax.management.ObjectName object MetricsRegistrar { - val NameTemplate = "io.lenses.streamreactor.connect.http.sink:type=metrics,name=%s" + val NameTemplate = "io.lenses.streamreactor.connect.http.sink:type=metrics,name=%s,task=%d" /** * Register the metrics MBean exposing the count on 200, 400, 500 and other response codes as well as the http request time percentiles * @param metrics * @param sinkName */ - def registerMetricsMBean(metrics: HttpSinkMetricsMBean, sinkName: String): Unit = { - val mbs = ManagementFactory.getPlatformMBeanServer - val objectName = new ObjectName(s"io.lenses.streamreactor.connect.http.sink:type=metrics,name=$sinkName") - mbs.registerMBean(metrics, objectName) + def registerMetricsMBean(metrics: HttpSinkMetricsMBean, sinkName: String, taskNumber: Int): Unit = { + val mbs: MBeanServer = ManagementFactory.getPlatformMBeanServer + val objectName = + new ObjectName(s"io.lenses.streamreactor.connect.http.sink:type=metrics,name=$sinkName,task=$taskNumber") + registerMBeanWithRetry(mbs, metrics, objectName) () } + + def unregisterMetricsMBean(sinkName: String, taskNumber: Int): Unit = { + val mbs: MBeanServer = ManagementFactory.getPlatformMBeanServer + val objectName = + new ObjectName(s"io.lenses.streamreactor.connect.http.sink:type=metrics,name=$sinkName,task=$taskNumber") + if (mbs.isRegistered(objectName)) { + mbs.unregisterMBean(objectName) + } + } + + private def registerMBeanWithRetry(mbs: MBeanServer, metrics: HttpSinkMetricsMBean, objectName: ObjectName): Unit = { + @annotation.tailrec + def retry(): Unit = + try { + if (mbs.isRegistered(objectName)) { + mbs.unregisterMBean(objectName) + } + mbs.registerMBean(metrics, objectName) + () + } catch { + // Connect might recreate the task on the same worker and thus the MBean might already be registered + // If so unregister and retry + case _: javax.management.InstanceAlreadyExistsException => + mbs.unregisterMBean(objectName) + retry() + } + retry() + } } diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/client/BasicAuthenticationHttpRequestSenderTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/client/BasicAuthenticationHttpRequestSenderTest.scala index 3f02a6f32..ffa30b3e2 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/client/BasicAuthenticationHttpRequestSenderTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/client/BasicAuthenticationHttpRequestSenderTest.scala @@ -46,7 +46,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc val userName = "user" val password = "password" val metrics = new HttpSinkMetrics - MetricsRegistrar.registerMetricsMBean(metrics, sinkName) + MetricsRegistrar.registerMetricsMBean(metrics, sinkName, 1) val sender = new BasicAuthenticationHttpRequestSender(sinkName, method, client, userName, password, metrics) val template = ProcessedTemplate( "http://localhost:8080", @@ -73,7 +73,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc metrics.get5xxCount shouldBe 0 val mbs = ManagementFactory.getPlatformMBeanServer - val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName)) + val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName, 1)) val mbean = mbs.getMBeanInfo(objectName) mbean.getAttributes.map(_.getName) should contain allElementsOf List("4xxCount", "5xxCount", @@ -102,7 +102,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc val userName = "user" val password = "password" val metrics = new HttpSinkMetrics - MetricsRegistrar.registerMetricsMBean(metrics, sinkName) + MetricsRegistrar.registerMetricsMBean(metrics, sinkName, 1) val sender = new BasicAuthenticationHttpRequestSender(sinkName, method, client, userName, password, metrics) val template = ProcessedTemplate( "http://localhost:8080", @@ -129,7 +129,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc metrics.get5xxCount shouldBe 1 val mbs = ManagementFactory.getPlatformMBeanServer - val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName)) + val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName, 1)) val mbean = mbs.getMBeanInfo(objectName) mbean.getAttributes.map(_.getName) should contain allElementsOf List("4xxCount", "5xxCount", @@ -159,7 +159,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc val userName = "user" val password = "password" val metrics = new HttpSinkMetrics - MetricsRegistrar.registerMetricsMBean(metrics, sinkName) + MetricsRegistrar.registerMetricsMBean(metrics, sinkName, 1) val sender = new BasicAuthenticationHttpRequestSender(sinkName, method, client, userName, password, metrics) val template = ProcessedTemplate( "http://localhost:8080", @@ -186,7 +186,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc metrics.get5xxCount shouldBe 0 val mbs = ManagementFactory.getPlatformMBeanServer - val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName)) + val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName, 1)) val mbean = mbs.getMBeanInfo(objectName) mbean.getAttributes.map(_.getName) should contain allElementsOf List("4xxCount", "5xxCount",