Skip to content

Commit

Permalink
Expose metrics defined by the spec in Prometheus format (#244)
Browse files Browse the repository at this point in the history
This PR exposes the following metrics:
1. http_events_sent_total
2. http_requests_produce_total
3. http_requests_malformed_total

All of the above metrics are counters.

(1) is incremented when the dispatcher commits the offset to Kafka.
(2) is incremented when the receiver sends successfully the message to Kafka.
(3) is incremented when the receiver receives a malformed event.

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored Oct 1, 2020
1 parent 867a8cb commit d964c4b
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 56 deletions.
5 changes: 5 additions & 0 deletions data-plane/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class MetricsOptionsProvider {
* Get metrics options from the given metrics configurations.
*
* @param metricsConfigs metrics configurations.
* @param registry registry name
* @return metrics options
*/
public static MetricsOptions get(final BaseEnv metricsConfigs) {
public static MetricsOptions get(final BaseEnv metricsConfigs, final String registry) {
return new MicrometerMetricsOptions()
.setEnabled(true)
.setRegistryName(registry)
.setPrometheusOptions(new VertxPrometheusOptions()
.setEmbeddedServerOptions(new HttpServerOptions().setPort(metricsConfigs.getMetricsPort()))
.setEmbeddedServerEndpoint(metricsConfigs.getMetricsPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
package dev.knative.eventing.kafka.broker.core.metrics;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import dev.knative.eventing.kafka.broker.core.utils.BaseEnv;
import io.micrometer.core.instrument.MeterRegistry;
import org.junit.jupiter.api.Test;

public class MetricsOptionsProviderTest {

@Test
public void get() {
final var metricsOptions = MetricsOptionsProvider.get(new BaseEnv(s -> "1"));
final var metricsOptions = MetricsOptionsProvider.get(new BaseEnv(s -> "1"), "name");
assertThat(metricsOptions.isEnabled()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import dev.knative.eventing.kafka.broker.core.Egress;
import dev.knative.eventing.kafka.broker.core.Resource;
import dev.knative.eventing.kafka.broker.dispatcher.strategy.UnorderedConsumerRecordOffsetStrategy;
import io.micrometer.core.instrument.Counter;
import io.vertx.kafka.client.consumer.KafkaConsumer;

@FunctionalInterface
Expand All @@ -27,8 +28,8 @@ public interface ConsumerRecordOffsetStrategyFactory<K, V> {
ConsumerRecordOffsetStrategy<K, V> get(final KafkaConsumer<K, V> consumer, final Resource resource,
final Egress egress);

static <K, V> ConsumerRecordOffsetStrategyFactory<K, V> unordered() {
return (consumer, broker, trigger) -> new UnorderedConsumerRecordOffsetStrategy<>(consumer);
static <K, V> ConsumerRecordOffsetStrategyFactory<K, V> unordered(final Counter eventsSentCounter) {
return (consumer, broker, trigger) -> new UnorderedConsumerRecordOffsetStrategy<>(consumer, eventsSentCounter);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.vertx.core.VertxOptions;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.micrometer.backends.BackendRegistries;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
Expand All @@ -37,6 +38,16 @@

public class Main {

// Micrometer employs a naming convention that separates lowercase words with a '.' (dot) character.
// Different monitoring systems have different recommendations regarding naming convention, and some naming
// conventions may be incompatible for one system and not another.
// Each Micrometer implementation for a monitoring system comes with a naming convention that transforms lowercase
// dot notation names to the monitoring system’s recommended naming convention.
// Additionally, this naming convention implementation sanitizes metric names and tags of special characters that
// are disallowed by the monitoring system.
public static final String HTTP_EVENTS_SENT_COUNT = "http.events.sent"; // prometheus format --> http_events_sent_total
public static final String METRICS_REGISTRY_NAME = "metrics";

private static final Logger logger = LoggerFactory.getLogger(Main.class);

/**
Expand All @@ -57,17 +68,20 @@ public static void main(final String[] args) {
logger.info("Starting Dispatcher {}", keyValue("env", env));

final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env))
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);
final var eventsSentCounter = metricsRegistry.counter(HTTP_EVENTS_SENT_COUNT);

Runtime.getRuntime().addShutdownHook(new Thread(vertx::close));

final var producerConfig = Configurations.getProperties(env.getProducerConfigFilePath());
final var consumerConfig = Configurations.getProperties(env.getConsumerConfigFilePath());
final var webClientConfig = Configurations.getPropertiesAsJson(env.getWebClientConfigFilePath());

final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered();
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered(eventsSentCounter);

final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static net.logstash.logback.argument.StructuredArguments.keyValue;

import dev.knative.eventing.kafka.broker.dispatcher.ConsumerRecordOffsetStrategy;
import io.micrometer.core.instrument.Counter;
import io.vertx.core.Future;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
Expand All @@ -36,16 +37,20 @@ public final class UnorderedConsumerRecordOffsetStrategy<K, V> implements
.getLogger(UnorderedConsumerRecordOffsetStrategy.class);

private final KafkaConsumer<K, V> consumer;
private final Counter eventsSentCounter;

/**
* All args constructor.
*
* @param consumer Kafka consumer.
* @param consumer Kafka consumer.
* @param eventsSentCounter events sent counter
*/
public UnorderedConsumerRecordOffsetStrategy(final KafkaConsumer<K, V> consumer) {
public UnorderedConsumerRecordOffsetStrategy(final KafkaConsumer<K, V> consumer, final Counter eventsSentCounter) {
Objects.requireNonNull(consumer, "provide consumer");
Objects.requireNonNull(eventsSentCounter, "provide eventsSentCounter");

this.consumer = consumer;
this.eventsSentCounter = eventsSentCounter;
}

/**
Expand All @@ -64,12 +69,15 @@ public void successfullySentToSubscriber(final KafkaConsumerRecord<K, V> record)
// TODO evaluate if it's worth committing offsets at specified intervals per partition.
// commit each record
commit(record)
.onSuccess(ignored -> logger.debug(
"committed {} {} {}",
keyValue("topic", record.topic()),
keyValue("partition", record.partition()),
keyValue("offset", record.offset())
))
.onSuccess(ignored -> {
eventsSentCounter.increment();
logger.debug(
"committed {} {} {}",
keyValue("topic", record.topic()),
keyValue("partition", record.partition()),
keyValue("offset", record.offset())
);
})
.onFailure(cause -> logger.error(
"failed to commit {} {} {}",
keyValue("topic", record.topic()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.mock;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.Egress;
Expand All @@ -33,6 +34,7 @@
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import io.micrometer.core.instrument.Counter;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxExtension;
Expand Down Expand Up @@ -64,7 +66,7 @@ public void shouldAlwaysSucceed(final Vertx vertx) {
.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());

final var verticleFactory = new HttpConsumerVerticleFactory(
ConsumerRecordOffsetStrategyFactory.unordered(),
ConsumerRecordOffsetStrategyFactory.unordered(mock(Counter.class)),
consumerProperties,
WebClient.create(vertx),
vertx,
Expand Down Expand Up @@ -151,7 +153,7 @@ public void shouldNotThrowIllegalArgumentExceptionIfNotDLQ(final Vertx vertx) {
.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class.getName());

final var verticleFactory = new HttpConsumerVerticleFactory(
ConsumerRecordOffsetStrategyFactory.unordered(),
ConsumerRecordOffsetStrategyFactory.unordered(mock(Counter.class)),
consumerProperties,
WebClient.create(vertx),
vertx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static dev.knative.eventing.kafka.broker.core.file.FileWatcherTest.write;
import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.contract;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.ObjectsCreator;
Expand All @@ -31,6 +32,7 @@
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.http.vertx.VertxMessageFactory;
import io.micrometer.core.instrument.Counter;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxExtension;
Expand Down Expand Up @@ -69,7 +71,7 @@ public void testUnorderedConsumer(final Vertx vertx, final VertxTestContext cont
client,
vertx,
producerConfigs,
ConsumerRecordOffsetStrategyFactory.unordered()
ConsumerRecordOffsetStrategyFactory.unordered(mock(Counter.class))
);

final var event = new CloudEventBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Meter.Id;
import io.micrometer.core.instrument.cumulative.CumulativeCounter;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
Expand All @@ -40,58 +41,58 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.mockito.Mockito;

@ExtendWith(VertxExtension.class)
@Execution(value = ExecutionMode.CONCURRENT)
public class UnorderedConsumerRecordOffsetStrategyTest {

private static final int TIMEOUT_MS = 200;

@Test
@SuppressWarnings("unchecked")
public void recordReceived() {
final KafkaConsumer<Object, Object> consumer = mock(KafkaConsumer.class);
new UnorderedConsumerRecordOffsetStrategy<>(consumer).recordReceived(null);
final Counter eventsSentCounter = mock(Counter.class);
new UnorderedConsumerRecordOffsetStrategy<>(consumer, eventsSentCounter).recordReceived(null);

shouldNeverCommit(consumer);
shouldNeverPause(consumer);
verify(eventsSentCounter, never()).increment();
}

@Test
public void shouldCommitSuccessfullyOnSuccessfullySentToSubscriber(final Vertx vertx) {
shouldCommit(vertx, (kafkaConsumerRecord, unorderedConsumerOffsetManager)
public void shouldCommitSuccessfullyOnSuccessfullySentToSubscriber() {
shouldCommit((kafkaConsumerRecord, unorderedConsumerOffsetManager)
-> unorderedConsumerOffsetManager.successfullySentToSubscriber(kafkaConsumerRecord));
}

@Test
public void shouldCommitSuccessfullyOnSuccessfullySentToDLQ(final Vertx vertx) {
shouldCommit(vertx, (kafkaConsumerRecord, unorderedConsumerOffsetManager)
public void shouldCommitSuccessfullyOnSuccessfullySentToDLQ() {
shouldCommit((kafkaConsumerRecord, unorderedConsumerOffsetManager)
-> unorderedConsumerOffsetManager.successfullySentToDLQ(kafkaConsumerRecord));
}

@Test
@SuppressWarnings("unchecked")
public void failedToSendToDLQ() {
final KafkaConsumer<Object, Object> consumer = mock(KafkaConsumer.class);
new UnorderedConsumerRecordOffsetStrategy<>(consumer).failedToSendToDLQ(null, null);
final Counter eventsSentCounter = mock(Counter.class);

new UnorderedConsumerRecordOffsetStrategy<>(consumer, eventsSentCounter).failedToSendToDLQ(null, null);

shouldNeverCommit(consumer);
shouldNeverPause(consumer);
verify(eventsSentCounter, never()).increment();
}

@Test
public void ShouldCommitSuccessfullyOnRecordDiscarded(final Vertx vertx) {
shouldCommit(vertx, (kafkaConsumerRecord, unorderedConsumerOffsetManager)
-> unorderedConsumerOffsetManager.recordDiscarded(kafkaConsumerRecord));
public void ShouldCommitSuccessfullyOnRecordDiscarded() {
shouldCommit((kafkaConsumerRecord, unorderedConsumerOffsetManager) ->
unorderedConsumerOffsetManager.recordDiscarded(kafkaConsumerRecord));
}

@SuppressWarnings("unchecked")
private static <K, V> void shouldCommit(
final Vertx vertx,
final BiConsumer<KafkaConsumerRecord<K, V>, UnorderedConsumerRecordOffsetStrategy<K, V>> rConsumer) {

final var topic = "topic-42";
Expand Down Expand Up @@ -122,7 +123,10 @@ private static <K, V> void shouldCommit(
}).when(consumer).commit(
(Map<io.vertx.kafka.client.common.TopicPartition, io.vertx.kafka.client.consumer.OffsetAndMetadata>) any());

final var offsetManager = new UnorderedConsumerRecordOffsetStrategy<>(consumer);
final Counter eventsSentCounter = new CumulativeCounter(mock(Id.class));

final var offsetManager = new UnorderedConsumerRecordOffsetStrategy<>(consumer, eventsSentCounter);

final var record = new KafkaConsumerRecordImpl<>(
new ConsumerRecord<K, V>(
topic,
Expand All @@ -140,6 +144,7 @@ final var record = new KafkaConsumerRecordImpl<>(
assertThat(committed).hasSize(1);
assertThat(committed.keySet()).containsExactlyInAnyOrder(topicPartition);
assertThat(committed.values()).containsExactlyInAnyOrder(new OffsetAndMetadata(offset + 1, ""));
assertThat(eventsSentCounter.count()).isEqualTo(1.0D);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.micrometer.backends.BackendRegistries;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
Expand All @@ -38,6 +39,17 @@

public class Main {

// Micrometer employs a naming convention that separates lowercase words with a '.' (dot) character.
// Different monitoring systems have different recommendations regarding naming convention, and some naming
// conventions may be incompatible for one system and not another.
// Each Micrometer implementation for a monitoring system comes with a naming convention that transforms lowercase
// dot notation names to the monitoring system’s recommended naming convention.
// Additionally, this naming convention implementation sanitizes metric names and tags of special characters that
// are disallowed by the monitoring system.
public static final String HTTP_REQUESTS_MALFORMED_COUNT = "http.requests.malformed"; // prometheus format --> http_requests_malformed_total
public static final String HTTP_REQUESTS_PRODUCE_COUNT = "http.requests.produce"; // prometheus format --> http_requests_produce_total
public static final String METRICS_REGISTRY_NAME = "metrics";

private static final Logger logger = LoggerFactory.getLogger(Main.class);

/**
Expand All @@ -60,15 +72,22 @@ public static void main(final String[] args) {
logger.info("Starting Receiver {}", keyValue("env", env));

final var vertx = Vertx.vertx(
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env))
new VertxOptions().setMetricsOptions(MetricsOptionsProvider.get(env, METRICS_REGISTRY_NAME))
);

final var metricsRegistry = BackendRegistries.getNow(METRICS_REGISTRY_NAME);

final var badRequestCounter = metricsRegistry.counter(HTTP_REQUESTS_MALFORMED_COUNT);
final var produceEventsCounter = metricsRegistry.counter(HTTP_REQUESTS_PRODUCE_COUNT);

producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final var handler = new RequestHandler<>(
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(vertx, properties)
properties -> KafkaProducer.create(vertx, properties),
badRequestCounter,
produceEventsCounter
);

final var httpServerOptions = new HttpServerOptions(
Expand Down
Loading

0 comments on commit d964c4b

Please sign in to comment.