Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Prometheus to 1.2.1 #2145

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ lazy val prometheusBackend = (projectMatrix in file("observability/prometheus-ba
.settings(
name := "prometheus-backend",
libraryDependencies ++= Seq(
"io.prometheus" % "simpleclient" % "0.16.0"
"io.prometheus" % "prometheus-metrics-core" % "1.2.1"
),
scalaTest
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package sttp.client4.prometheus

import io.prometheus.metrics.core.datapoints.{GaugeDataPoint, Timer}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another follow-up task: in tapir, we are using some standardized labels for the http metrics; I suspect there might be some standards for opentelemetry / prometheus as well. If so, we should probably use these by default

import io.prometheus.metrics.core.metrics.{Counter, Gauge, Histogram, Summary}
import io.prometheus.metrics.model.registry.{Collector, PrometheusRegistry}

import java.util.concurrent.ConcurrentHashMap
import sttp.client4.{wrappers, _}
import io.prometheus.client.{CollectorRegistry, Counter, Gauge, Histogram, Summary}
import sttp.client4.listener.{ListenerBackend, RequestListener}
import sttp.client4.prometheus.PrometheusBackend.RequestCollectors
import sttp.client4.wrappers.FollowRedirectsBackend
Expand Down Expand Up @@ -64,11 +67,11 @@ object PrometheusBackend {
(r: (GenericRequest[_, _], Throwable)) => config.requestToFailureCounterMapper(r._1, r._2),
(req: GenericRequest[_, _]) => config.requestToSizeSummaryMapper(req),
(rr: (GenericRequest[_, _], Response[_])) => config.responseToSizeSummaryMapper(rr._1, rr._2),
config.collectorRegistry,
cacheFor(histograms, config.collectorRegistry),
cacheFor(gauges, config.collectorRegistry),
cacheFor(counters, config.collectorRegistry),
cacheFor(summaries, config.collectorRegistry)
config.prometheusRegistry,
cacheFor(histograms, config.prometheusRegistry),
cacheFor(gauges, config.prometheusRegistry),
cacheFor(counters, config.prometheusRegistry),
cacheFor(summaries, config.prometheusRegistry)
)

/** Add, if not present, a "method" label. That is, if the user already supplied such a label, it is left as-is.
Expand Down Expand Up @@ -117,12 +120,15 @@ object PrometheusBackend {

/** Clear cached collectors (gauges and histograms) both from the given collector registry, and from the backend.
*/
def clear(collectorRegistry: CollectorRegistry): Unit = {
collectorRegistry.clear()
histograms.remove(collectorRegistry)
gauges.remove(collectorRegistry)
counters.remove(collectorRegistry)
summaries.remove(collectorRegistry)
def clear(prometheusRegistry: PrometheusRegistry): Unit = {
unregister(prometheusRegistry, histograms)
histograms.remove(prometheusRegistry)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the .remove could be part of the unregister maybe?

unregister(prometheusRegistry, gauges)
gauges.remove(prometheusRegistry)
unregister(prometheusRegistry, counters)
counters.remove(prometheusRegistry)
unregister(prometheusRegistry, summaries)
summaries.remove(prometheusRegistry)
()
}

Expand All @@ -132,19 +138,25 @@ object PrometheusBackend {
Hence, we need to store a global cache o created histograms/gauges, so that we can properly re-use them.
*/

private val histograms = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Histogram]]
private val gauges = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Gauge]]
private val counters = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Counter]]
private val summaries = new mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, Summary]]
private def unregister[T <: Collector](prometheusRegistry: PrometheusRegistry, collectors: mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, T]]): Unit =
collectors
kciesielski marked this conversation as resolved.
Show resolved Hide resolved
.getOrElse(prometheusRegistry, new ConcurrentHashMap[String, T]())
.values()
.forEach(c => prometheusRegistry.unregister(c))

private val histograms = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Histogram]]
private val gauges = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Gauge]]
private val counters = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Counter]]
private val summaries = new mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, Summary]]

private def cacheFor[T](
cache: mutable.WeakHashMap[CollectorRegistry, ConcurrentHashMap[String, T]],
collectorRegistry: CollectorRegistry
cache: mutable.WeakHashMap[PrometheusRegistry, ConcurrentHashMap[String, T]],
prometheusRegistry: PrometheusRegistry
): ConcurrentHashMap[String, T] =
cache.synchronized {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a follow-up task, we should remove the .synchronized, as it shouldn't be used in Java21+, and repalce it with some kind of a concurrent data structure (or improve the cache in general)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or maybe we can do it right away, this is invoked only during the backend's construction, so a simple lock should suffice in fact; but we should also protect the cleanup method, then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about separate PR focused on cache I would prefer not to mix it with this update

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok :)

cache.getOrElseUpdate(collectorRegistry, new ConcurrentHashMap[String, T]())
cache.getOrElseUpdate(prometheusRegistry, new ConcurrentHashMap[String, T]())
}
final case class RequestCollectors(maybeTimer: Option[Histogram.Timer], maybeGauge: Option[Gauge.Child])
final case class RequestCollectors(maybeTimer: Option[Timer], maybeGauge: Option[GaugeDataPoint])
}

class PrometheusListener(
Expand All @@ -155,22 +167,22 @@ class PrometheusListener(
requestToFailureCounterMapper: ((GenericRequest[_, _], Exception)) => Option[CollectorConfig],
requestToSizeSummaryMapper: GenericRequest[_, _] => Option[CollectorConfig],
responseToSizeSummaryMapper: ((GenericRequest[_, _], Response[_])) => Option[CollectorConfig],
collectorRegistry: CollectorRegistry,
prometheusRegistry: PrometheusRegistry,
histogramsCache: ConcurrentHashMap[String, Histogram],
gaugesCache: ConcurrentHashMap[String, Gauge],
countersCache: ConcurrentHashMap[String, Counter],
summariesCache: ConcurrentHashMap[String, Summary]
) extends RequestListener[Identity, RequestCollectors] {

override def beforeRequest(request: GenericRequest[_, _]): RequestCollectors = {
val requestTimer: Option[Histogram.Timer] = for {
val requestTimer: Option[Timer] = for {
histogramData <- requestToHistogramNameMapper(request)
histogram: Histogram = getOrCreateMetric(histogramsCache, histogramData, createNewHistogram)
} yield histogram.labels(histogramData.labelValues: _*).startTimer()
} yield histogram.labelValues(histogramData.labelValues: _*).startTimer()

val gauge: Option[Gauge.Child] = for {
val gauge: Option[GaugeDataPoint] = for {
gaugeData <- requestToInProgressGaugeNameMapper(request)
} yield getOrCreateMetric(gaugesCache, gaugeData, createNewGauge).labels(gaugeData.labelValues: _*)
} yield getOrCreateMetric(gaugesCache, gaugeData, createNewGauge).labelValues(gaugeData.labelValues: _*)

observeRequestContentLengthSummaryIfMapped(request, requestToSizeSummaryMapper)

Expand Down Expand Up @@ -214,7 +226,7 @@ class PrometheusListener(
mapper: T => Option[BaseCollectorConfig]
): Unit =
mapper(request).foreach { data =>
getOrCreateMetric(countersCache, data, createNewCounter).labels(data.labelValues: _*).inc()
getOrCreateMetric(countersCache, data, createNewCounter).labelValues(data.labelValues: _*).inc()
}

private def observeResponseContentLengthSummaryIfMapped(
Expand All @@ -224,7 +236,7 @@ class PrometheusListener(
): Unit =
mapper((request, response)).foreach { data =>
response.contentLength.map(_.toDouble).foreach { size =>
getOrCreateMetric(summariesCache, data, createNewSummary).labels(data.labelValues: _*).observe(size)
getOrCreateMetric(summariesCache, data, createNewSummary).labelValues(data.labelValues: _*).observe(size)
}
}

Expand All @@ -234,7 +246,7 @@ class PrometheusListener(
): Unit =
mapper(request).foreach { data =>
(request.contentLength: Option[Long]).map(_.toDouble).foreach { size =>
getOrCreateMetric(summariesCache, data, createNewSummary).labels(data.labelValues: _*).observe(size)
getOrCreateMetric(summariesCache, data, createNewSummary).labelValues(data.labelValues: _*).observe(size)
}
}

Expand All @@ -252,36 +264,36 @@ class PrometheusListener(

private def createNewHistogram(data: HistogramCollectorConfig): Histogram =
Histogram
.build()
.buckets(data.buckets: _*)
.builder()
.classicUpperBounds(data.buckets: _*)
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)

private def createNewGauge(data: BaseCollectorConfig): Gauge =
Gauge
.build()
.builder()
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)

private def createNewCounter(data: BaseCollectorConfig): Counter =
Counter
.build()
.builder()
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)

private def createNewSummary(data: BaseCollectorConfig): Summary =
Summary
.build()
.builder()
.name(data.collectorName)
.labelNames(data.labelNames: _*)
.help(data.collectorName)
.register(collectorRegistry)
.register(prometheusRegistry)
}

trait BaseCollectorConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sttp.client4.prometheus

import io.prometheus.client.CollectorRegistry
import io.prometheus.metrics.model.registry.PrometheusRegistry
import sttp.client4.GenericRequest
import sttp.client4.Response
import sttp.client4.prometheus.PrometheusBackend._
Expand All @@ -25,7 +25,7 @@ final case class PrometheusConfig(
responseToSizeSummaryMapper: (GenericRequest[_, _], Response[_]) => Option[CollectorConfig] =
(req: GenericRequest[_, _], resp: Response[_]) =>
Some(addStatusLabel(addMethodLabel(CollectorConfig(DefaultResponseSizeName), req), resp)),
collectorRegistry: CollectorRegistry = CollectorRegistry.defaultRegistry
prometheusRegistry: PrometheusRegistry = PrometheusRegistry.defaultRegistry
)

object PrometheusConfig {
Expand Down
Loading
Loading