Skip to content

Commit

Permalink
chore: cleanup from metric prefix change (#13566)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Aug 19, 2024
1 parent a143a95 commit e45bc07
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,63 +29,48 @@ object ClientConfigurationSupport {
// TODO move these metrics into a centralized metric registry as part of the MetricClient refactor/cleanup
.onAbort { l ->
logger.warn { "Attempt aborted. Attempt count ${l.attemptCount}" }
meterRegistry?.let {
r ->
r.counter(
"$metricPrefix.abort",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
).increment()
}
meterRegistry?.counter(
"$metricPrefix.abort",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
)?.increment()
}
.onFailure { l ->
logger.error(l.exception) { "Failed to call ${l.result?.request?.url ?: UNKNOWN}. Last response: ${l.result}" }
meterRegistry?.let {
r ->
r.counter(
"$metricPrefix.failure",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
).increment()
}
meterRegistry?.counter(
"$metricPrefix.failure",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
)?.increment()
}
.onRetry { l ->
logger.warn { "Retry attempt ${l.attemptCount} of $maxRetries. Last response: ${l.lastResult}" }
meterRegistry?.let {
r ->
r.counter(
"$metricPrefix.retry",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.lastResult?.request?.method ?: UNKNOWN),
*getUrlTags(l.lastResult?.request?.url),
).increment()
}
meterRegistry?.counter(
"$metricPrefix.retry",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.lastResult?.request?.method ?: UNKNOWN),
*getUrlTags(l.lastResult?.request?.url),
)?.increment()
}
.onRetriesExceeded { l ->
logger.error(l.exception) { "Retry attempts exceeded." }
meterRegistry?.let {
r ->
r.counter(
"$metricPrefix.retries_exceeded",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
).increment()
}
meterRegistry?.counter(
"$metricPrefix.retries_exceeded",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
)?.increment()
}
.onSuccess { l ->
logger.debug { "Successfully called ${l.result.request.url}. Response: ${l.result}, isRetry: ${l.isRetry}" }
meterRegistry?.let {
r ->
r.counter(
"$metricPrefix.success",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
).increment()
}
meterRegistry?.counter(
"$metricPrefix.success",
*metricTags,
*arrayOf("retry-attempt", l.attemptCount.toString(), "method", l.result?.request?.method ?: UNKNOWN),
*getUrlTags(l.result?.request?.url),
)?.increment()
}
.withDelay(Duration.ofSeconds(retryDelaySeconds))
.withJitter(jitterFactor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import jakarta.inject.Singleton
import java.io.FileInputStream
import java.security.interfaces.RSAPrivateKey
import java.util.Date
import java.util.Optional
import java.util.concurrent.TimeUnit

private val logger = KotlinLogging.logger {}
Expand Down Expand Up @@ -63,7 +62,7 @@ class InternalApiAuthenticationFactory {
@Value("\${airbyte.control.plane.auth-endpoint}") controlPlaneAuthEndpoint: String,
@Value("\${airbyte.data.plane.service-account.email}") dataPlaneServiceAccountEmail: String,
@Value("\${airbyte.data.plane.service-account.credentials-path}") dataPlaneServiceAccountCredentialsPath: String,
meterRegistry: Optional<MeterRegistry>,
meterRegistry: MeterRegistry?,
): String {
return try {
val now = Date()
Expand All @@ -86,10 +85,10 @@ class InternalApiAuthenticationFactory {
val key = cred.privateKey as RSAPrivateKey
val algorithm: com.auth0.jwt.algorithms.Algorithm = com.auth0.jwt.algorithms.Algorithm.RSA256(null, key)
val signedToken = token.sign(algorithm)
meterRegistry.ifPresent { registry -> registry.counter("api-client.auth-token.success").increment() }
meterRegistry?.counter("api-client.auth-token.success")?.increment()
return "Bearer $signedToken"
} catch (e: Exception) {
meterRegistry.ifPresent { registry -> registry.counter("api-client.auth-token.failure").increment() }
meterRegistry?.counter("api-client.auth-token.failure")?.increment()
logger.error(e) { "An issue occurred while generating a data plane auth token. Defaulting to empty string." }
""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@ import io.airbyte.metrics.lib.MetricAttribute
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import jakarta.inject.Singleton
import java.util.Optional
import java.util.function.ToDoubleFunction
import java.util.stream.Collectors
import java.util.stream.Stream

@Singleton
class CustomMetricPublisher(
private val maybeMeterRegistry: Optional<MeterRegistry>,
private val maybeMeterRegistry: MeterRegistry?,
) {
fun count(
metricName: String,
vararg attributes: MetricAttribute,
) {
maybeMeterRegistry.ifPresent { it.counter(metricName, toTags(*attributes)).increment() }
maybeMeterRegistry?.counter(metricName, toTags(*attributes))?.increment()
}

fun <T> gauge(
Expand All @@ -30,7 +29,7 @@ class CustomMetricPublisher(
valueFunction: ToDoubleFunction<T>,
vararg attributes: MetricAttribute,
) {
maybeMeterRegistry.ifPresent { it.gauge(metricName, toTags(*attributes), stateObject, valueFunction) }
maybeMeterRegistry?.gauge(metricName, toTags(*attributes), stateObject, valueFunction)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.Test
import java.util.Optional

class CustomMetricPublisherTest {
@Test
Expand All @@ -25,7 +24,7 @@ class CustomMetricPublisherTest {
every { counter.increment() } returns Unit
every { meterRegistry.counter(metricName, toTags(metricAttribute)) } returns counter

val publisher = CustomMetricPublisher(Optional.of(meterRegistry))
val publisher = CustomMetricPublisher(meterRegistry)

publisher.count(metricName, metricAttribute)

Expand All @@ -42,7 +41,7 @@ class CustomMetricPublisherTest {

every { meterRegistry.gauge(metricName, toTags(metricAttribute), stateObject, any()) } returns stateObject

val publisher = CustomMetricPublisher(Optional.of(meterRegistry))
val publisher = CustomMetricPublisher(meterRegistry)

publisher.gauge(metricName, stateObject, valueFunction, metricAttribute)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import jakarta.inject.Singleton
import okhttp3.internal.http2.StreamResetException
import java.net.SocketTimeoutException
import java.time.Duration
import java.util.Optional

/**
* Micronaut bean factory for general application beans.
Expand Down Expand Up @@ -89,15 +88,15 @@ class ApplicationBeanFactory {
@Value("\${airbyte.kubernetes.client.retries.delay-seconds}") retryDelaySeconds: Long,
@Value("\${airbyte.kubernetes.client.retries.max}") maxRetries: Int,
@Named("kubeHttpErrorRetryPredicate") predicate: (Throwable) -> Boolean,
meterRegistry: Optional<MeterRegistry>,
meterRegistry: MeterRegistry?,
): RetryPolicy<Any> {
val metricTags = arrayOf("max_retries", maxRetries.toString())

return RetryPolicy.builder<Any>()
.handleIf(predicate)
.onRetry { l ->
meterRegistry.ifPresent { r ->
r.counter(
meterRegistry
?.counter(
"kube_api_client.retry",
*metricTags,
*arrayOf(
Expand All @@ -108,35 +107,31 @@ class ApplicationBeanFactory {
"exception_type",
l.lastException.javaClass.name,
),
).increment()
}
)?.increment()
}
.onAbort { l ->
meterRegistry.ifPresent { r ->
r.counter(
meterRegistry
?.counter(
"kube_api_client.abort",
*metricTags,
*arrayOf("retry_attempt", l.attemptCount.toString()),
).increment()
}
)?.increment()
}
.onFailedAttempt { l ->
meterRegistry.ifPresent { r ->
r.counter(
meterRegistry
?.counter(
"kube_api_client.failed",
*metricTags,
*arrayOf("retry_attempt", l.attemptCount.toString()),
).increment()
}
)?.increment()
}
.onSuccess { l ->
meterRegistry.ifPresent { r ->
r.counter(
meterRegistry
?.counter(
"kube_api_client.success",
*metricTags,
*arrayOf("retry_attempt", l.attemptCount.toString()),
).increment()
}
)?.increment()
}
.withDelay(Duration.ofSeconds(retryDelaySeconds))
.withMaxRetries(maxRetries)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,27 @@ import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import jakarta.inject.Singleton
import java.time.Duration
import java.util.Optional
import java.util.function.ToDoubleFunction
import java.util.stream.Collectors
import java.util.stream.Stream

@Singleton
class CustomMetricPublisher(
private val maybeMeterRegistry: Optional<MeterRegistry>,
private val maybeMeterRegistry: MeterRegistry?,
) {
fun count(
workloadLauncherMetricMetadata: WorkloadLauncherMetricMetadata,
vararg attributes: MetricAttribute,
) {
maybeMeterRegistry.ifPresent { it.counter(workloadLauncherMetricMetadata.metricName, toTags(*attributes)).increment() }
maybeMeterRegistry?.counter(workloadLauncherMetricMetadata.metricName, toTags(*attributes))?.increment()
}

fun timer(
workloadLauncherMetricMetadata: WorkloadLauncherMetricMetadata,
duration: Duration,
vararg attributes: MetricAttribute,
) {
maybeMeterRegistry.ifPresent { it.timer(workloadLauncherMetricMetadata.metricName, toTags(*attributes)).record(duration) }
maybeMeterRegistry?.timer(workloadLauncherMetricMetadata.metricName, toTags(*attributes))?.record(duration)
}

fun <T> gauge(
Expand All @@ -35,7 +34,7 @@ class CustomMetricPublisher(
valueFunction: ToDoubleFunction<T>,
vararg attributes: MetricAttribute,
) {
maybeMeterRegistry.ifPresent { it.gauge(workloadLauncherMetricMetadata.metricName, toTags(*attributes), stateObject, valueFunction) }
maybeMeterRegistry?.gauge(workloadLauncherMetricMetadata.metricName, toTags(*attributes), stateObject, valueFunction)
}

private fun toTags(vararg attributes: MetricAttribute): List<Tag> {
Expand Down

0 comments on commit e45bc07

Please sign in to comment.