Skip to content

Commit

Permalink
LC-290 Handle Mbean already registered (#213)
Browse files Browse the repository at this point in the history
* LC-290 Handle Mbean already registered

Connect does not support Mbean isolation per connector. As a result when more tasks are used it will end up creating the same instance and it will fail with exception. The code change adds the task into the mbean name. Also on task being stopped it unregisters the mbean.

* Fix the unit tests

---------

Co-authored-by: stheppi <jenkins@lenses.io>
  • Loading branch information
stheppi and stheppi authored Feb 10, 2025
1 parent 4db242c commit 3efef7c
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ class HttpSinkConnector extends SinkConnector with LazyLogging with JarManifestP

override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] = {
logger.info(s"[$sinkName] Creating $maxTasks tasks config")
List.fill(maxTasks) {
props.map(_.asJava).getOrElse(Map.empty[String, String].asJava)
}.asJava
val taskConfigs = (0 until maxTasks).map { taskNumber =>
val taskProps = props.getOrElse(Map.empty).updated(HttpSinkConfigDef.TaskNumberProp, taskNumber.toString)
taskProps.asJava
}
taskConfigs.asJava
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@ import cats.effect.Deferred
import cats.effect.IO
import cats.effect.Ref
import cats.effect.unsafe.IORuntime
import cats.syntax.all._
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.util.AsciiArtPrinter.printAsciiHeader
import io.lenses.streamreactor.common.utils.JarManifestProvided
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfig
import io.lenses.streamreactor.connect.http.sink.config.HttpSinkConfigDef
import io.lenses.streamreactor.connect.http.sink.metrics.HttpSinkMetrics
import io.lenses.streamreactor.connect.http.sink.metrics.MetricsRegistrar
import io.lenses.streamreactor.connect.http.sink.tpl.RawTemplate
import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask
import cats.syntax.all._
import io.lenses.streamreactor.common.utils.JarManifestProvided
import io.lenses.streamreactor.connect.http.sink.metrics.HttpSinkMetrics
import io.lenses.streamreactor.connect.http.sink.metrics.MetricsRegistrar

import java.util
import scala.jdk.CollectionConverters.IterableHasAsScala
Expand All @@ -54,7 +55,8 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided {
private def sinkName = maybeSinkName.getOrElse("Lenses.io HTTP Sink")
private val deferred: Deferred[IO, Either[Throwable, Unit]] = Deferred.unsafe[IO, Either[Throwable, Unit]]

private val errorRef: Ref[IO, List[Throwable]] = Ref.unsafe[IO, List[Throwable]](List.empty)
private val taskNumberRef: Ref[IO, Int] = Ref.unsafe[IO, Int](0)
private val errorRef: Ref[IO, List[Throwable]] = Ref.unsafe[IO, List[Throwable]](List.empty)

override def start(props: util.Map[String, String]): Unit = {

Expand All @@ -69,9 +71,14 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided {
}

(for {
config <- IO.fromEither(HttpSinkConfig.from(propsAsScala))
taskNumber <- propsAsScala.getOrElse(HttpSinkConfigDef.TaskNumberProp, "1").toIntOption match {
case Some(value) => IO(value)
case None => IO.raiseError(new IllegalArgumentException("Task number must be an integer"))
}
_ <- taskNumberRef.set(taskNumber)
config <- IO.fromEither(HttpSinkConfig.from(propsAsScala - HttpSinkConfigDef.TaskNumberProp))
metrics <- IO(new HttpSinkMetrics())
_ <- IO(MetricsRegistrar.registerMetricsMBean(metrics, sinkName))
_ <- IO(MetricsRegistrar.registerMetricsMBean(metrics, sinkName, taskNumber))
template = RawTemplate(config.endpoint, config.content, config.headers, config.nullPayloadHandler)
writerManager <- HttpWriterManager.apply(sinkName, config, template, deferred, metrics)
_ <- writerManager.start(refUpdateCallback)
Expand Down Expand Up @@ -180,6 +187,8 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided {

override def stop(): Unit =
(for {
taskNumber <- taskNumberRef.get
_ <- IO(MetricsRegistrar.unregisterMetricsMBean(sinkName, taskNumber))
_ <- maybeWriterManager.traverse { x =>
x.closeReportingControllers()
x.close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import scala.jdk.CollectionConverters._

object HttpSinkConfigDef {

val ConnectorPrefix: String = "connect.http"
val HttpMethodProp: String = "connect.http.method"
val TaskNumberProp: String = "connect.http.task.number"

val HttpMethodProp: String = "connect.http.method"
val HttpMethodDoc: String =
"""
|The HTTP method to use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,51 @@
package io.lenses.streamreactor.connect.http.sink.metrics

import java.lang.management.ManagementFactory
import javax.management.MBeanServer
import javax.management.ObjectName

object MetricsRegistrar {

val NameTemplate = "io.lenses.streamreactor.connect.http.sink:type=metrics,name=%s"
val NameTemplate = "io.lenses.streamreactor.connect.http.sink:type=metrics,name=%s,task=%d"

/**
* Register the metrics MBean exposing the count on 200, 400, 500 and other response codes as well as the http request time percentiles
* @param metrics
* @param sinkName
*/
def registerMetricsMBean(metrics: HttpSinkMetricsMBean, sinkName: String): Unit = {
val mbs = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName(s"io.lenses.streamreactor.connect.http.sink:type=metrics,name=$sinkName")
mbs.registerMBean(metrics, objectName)
def registerMetricsMBean(metrics: HttpSinkMetricsMBean, sinkName: String, taskNumber: Int): Unit = {
val mbs: MBeanServer = ManagementFactory.getPlatformMBeanServer
val objectName =
new ObjectName(s"io.lenses.streamreactor.connect.http.sink:type=metrics,name=$sinkName,task=$taskNumber")
registerMBeanWithRetry(mbs, metrics, objectName)
()
}

def unregisterMetricsMBean(sinkName: String, taskNumber: Int): Unit = {
val mbs: MBeanServer = ManagementFactory.getPlatformMBeanServer
val objectName =
new ObjectName(s"io.lenses.streamreactor.connect.http.sink:type=metrics,name=$sinkName,task=$taskNumber")
if (mbs.isRegistered(objectName)) {
mbs.unregisterMBean(objectName)
}
}

private def registerMBeanWithRetry(mbs: MBeanServer, metrics: HttpSinkMetricsMBean, objectName: ObjectName): Unit = {
@annotation.tailrec
def retry(): Unit =
try {
if (mbs.isRegistered(objectName)) {
mbs.unregisterMBean(objectName)
}
mbs.registerMBean(metrics, objectName)
()
} catch {
// Connect might recreate the task on the same worker and thus the MBean might already be registered
// If so unregister and retry
case _: javax.management.InstanceAlreadyExistsException =>
mbs.unregisterMBean(objectName)
retry()
}
retry()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc
val userName = "user"
val password = "password"
val metrics = new HttpSinkMetrics
MetricsRegistrar.registerMetricsMBean(metrics, sinkName)
MetricsRegistrar.registerMetricsMBean(metrics, sinkName, 1)
val sender = new BasicAuthenticationHttpRequestSender(sinkName, method, client, userName, password, metrics)
val template = ProcessedTemplate(
"http://localhost:8080",
Expand All @@ -73,7 +73,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc
metrics.get5xxCount shouldBe 0

val mbs = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName))
val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName, 1))
val mbean = mbs.getMBeanInfo(objectName)
mbean.getAttributes.map(_.getName) should contain allElementsOf List("4xxCount",
"5xxCount",
Expand Down Expand Up @@ -102,7 +102,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc
val userName = "user"
val password = "password"
val metrics = new HttpSinkMetrics
MetricsRegistrar.registerMetricsMBean(metrics, sinkName)
MetricsRegistrar.registerMetricsMBean(metrics, sinkName, 1)
val sender = new BasicAuthenticationHttpRequestSender(sinkName, method, client, userName, password, metrics)
val template = ProcessedTemplate(
"http://localhost:8080",
Expand All @@ -129,7 +129,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc
metrics.get5xxCount shouldBe 1

val mbs = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName))
val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName, 1))
val mbean = mbs.getMBeanInfo(objectName)
mbean.getAttributes.map(_.getName) should contain allElementsOf List("4xxCount",
"5xxCount",
Expand Down Expand Up @@ -159,7 +159,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc
val userName = "user"
val password = "password"
val metrics = new HttpSinkMetrics
MetricsRegistrar.registerMetricsMBean(metrics, sinkName)
MetricsRegistrar.registerMetricsMBean(metrics, sinkName, 1)
val sender = new BasicAuthenticationHttpRequestSender(sinkName, method, client, userName, password, metrics)
val template = ProcessedTemplate(
"http://localhost:8080",
Expand All @@ -186,7 +186,7 @@ class BasicAuthenticationHttpRequestSenderTest extends AnyFunSuiteLike with Matc
metrics.get5xxCount shouldBe 0

val mbs = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName))
val objectName = new ObjectName(MetricsRegistrar.NameTemplate.format(sinkName, 1))
val mbean = mbs.getMBeanInfo(objectName)
mbean.getAttributes.map(_.getName) should contain allElementsOf List("4xxCount",
"5xxCount",
Expand Down

0 comments on commit 3efef7c

Please sign in to comment.