Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new metric to monitor deliveries #15

Merged
merged 9 commits into from
Sep 18, 2024
Merged
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
fabric8Version=5.12.4
horizonParentVersion=4.1.0
horizonParentVersion=4.3.0
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper;
import de.telekom.eni.pandora.horizon.model.event.Status;
import de.telekom.eni.pandora.horizon.model.event.StatusMessage;
import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage;
Expand Down Expand Up @@ -40,6 +41,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static de.telekom.eni.pandora.horizon.metrics.HorizonMetricsConstants.METRIC_SENT_SSE_EVENTS;

/**
* Represents a Server-Sent Events (SSE) task responsible for emitting events to clients.
*
Expand All @@ -61,6 +64,7 @@ public class SseTask implements Runnable {
private final EventWriter eventWriter;
private final DeDuplicationService deDuplicationService;
private final HorizonTracer tracingHelper;
private final HorizonMetricsHelper metricsHelper;

@Setter
private String contentType = APPLICATION_STREAM_JSON_VALUE;
Expand Down Expand Up @@ -99,6 +103,7 @@ public SseTask(SseTaskStateContainer sseTaskStateContainer,
this.eventWriter = factory.getEventWriter();
this.deDuplicationService = factory.getDeDuplicationService();
this.tracingHelper = factory.getTracingHelper();
this.metricsHelper = factory.getMetricsHelper();
}

/**
Expand Down Expand Up @@ -316,6 +321,8 @@ private void sendEvent(SubscriptionEventMessage msg, boolean includeHttpHeaders)

bytesConsumed.addAndGet(eventJson.getBytes(StandardCharsets.UTF_8).length);
numberConsumed.incrementAndGet();

metricsHelper.getRegistry().counter(METRIC_SENT_SSE_EVENTS, metricsHelper.buildTagsFromSubscriptionEventMessage(msg)).increment();
} catch (JsonProcessingException e) {
var err = String.format("Error occurred while emitting the event: %s", e.getMessage());
log.info(err, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper;
import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo;
import de.telekom.eni.pandora.horizon.tracing.HorizonTracer;
import de.telekom.horizon.pulsar.cache.ConnectionCache;
Expand Down Expand Up @@ -36,6 +37,7 @@ public class SseTaskFactory {
private final DeDuplicationService deDuplicationService;
private final HorizonTracer tracingHelper;
private final EventWriter eventWriter;
private final HorizonMetricsHelper metricsHelper;

/**
* Constructs an instance of {@code SseTaskFactory}.
Expand All @@ -57,6 +59,7 @@ public SseTaskFactory(
KafkaPicker kafkaPicker,
MessageStateMongoRepo messageStateMongoRepo,
DeDuplicationService deDuplicationService,
HorizonMetricsHelper metricsHelper,
HorizonTracer tracingHelper) {

this.pulsarConfig = pulsarConfig;
Expand All @@ -66,8 +69,8 @@ public SseTaskFactory(
this.tracingHelper = tracingHelper;
this.connectionCache = connectionCache;
this.deDuplicationService = deDuplicationService;

this.eventWriter = eventWriter;
this.metricsHelper = metricsHelper;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import de.telekom.eni.pandora.horizon.model.event.StatusMessage;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.testutils.MockHelper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -83,7 +86,6 @@ void testGetEventMessageContext(int polls) {
var eventWriterMock = mock(EventWriter.class);
ReflectionTestUtils.setField(eventMessageSupplier, "eventWriter", eventWriterMock, EventWriter.class);


// We do multiple calls to EventMessageSupplier.get() in order to test
// that each call will fetch the next event message in the queue
for (int i = 0; i < polls; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SseTaskFactoryTest {
void setupSseTaskFactoryTest() {
MockHelper.init();

sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.tracingHelper));
sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.metricsHelper, MockHelper.tracingHelper));
}

SubscriptionResource createSubscriptionResource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import de.telekom.horizon.pulsar.helper.SseTaskStateContainer;
import de.telekom.horizon.pulsar.helper.StreamLimit;
import de.telekom.horizon.pulsar.testutils.MockHelper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -39,6 +42,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static de.telekom.horizon.pulsar.testutils.MockHelper.metricsHelper;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand All @@ -59,7 +63,6 @@ void setupSseServiceTest() {
MockHelper.init();

var sseTask = new SseTask(sseTaskStateContainerMock, eventMessageSupplierMock, MockHelper.openConnectionGaugeValue, MockHelper.sseTaskFactory);

sseTaskSpy = spy(sseTask);
}

Expand Down Expand Up @@ -108,6 +111,12 @@ void testRun() throws IOException, InterruptedException {
// Used for verifying the timout worked
var started = Instant.now();

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

// PUBLIC METHOD WE WANT TO TEST
sseTaskSpy.run();

Expand Down Expand Up @@ -191,6 +200,12 @@ void testTerminateConnection() throws InterruptedException, IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

var reachedZero = latch.await(10000, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -243,6 +258,12 @@ void testTerminateConnectionThroughMaxNumberStreamLimit() throws IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

verify(emitterMock, times(maxNumberLimit)).send(anyString());
Expand Down Expand Up @@ -292,6 +313,12 @@ void testTerminateConnectionThroughMaxByteStreamLimit() throws IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

ArgumentCaptor<String> jsonCaptor = ArgumentCaptor.forClass(String.class);
Expand Down Expand Up @@ -348,6 +375,12 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException {
var succeededFuture = CompletableFuture.completedFuture(sendResult);
when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture);

var counterMock = Mockito.mock(Counter.class);
var registryMock = Mockito.mock(MeterRegistry.class);
when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock);
when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty());
when(metricsHelper.getRegistry()).thenReturn(registryMock);

sseTaskSpy.run();

assertEquals(maxMinutes, ChronoUnit.MINUTES.between(sseTaskSpy.getStartTime(), sseTaskSpy.getStopTime()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService;
import de.telekom.eni.pandora.horizon.kafka.config.KafkaProperties;
import de.telekom.eni.pandora.horizon.kafka.event.EventWriter;
import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper;
import de.telekom.eni.pandora.horizon.model.db.Coordinates;
import de.telekom.eni.pandora.horizon.model.db.PartialEvent;
import de.telekom.eni.pandora.horizon.model.db.State;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class MockHelper {
public static Environment environment;
public static ResponseBodyEmitter emitter;
public static TokenService tokenService;
public static HorizonMetricsHelper metricsHelper;

public static DeDuplicationService deDuplicationService;

Expand All @@ -85,6 +87,7 @@ public static void init() {
subscriberCache = mock(SubscriberCache.class);
pulsarConfig = mock(PulsarConfig.class);
messageStateMongoRepo = mock(MessageStateMongoRepo.class);
metricsHelper = mock(HorizonMetricsHelper.class);
tracingHelper = mock(HorizonTracer.class);
environment = mock(Environment.class);
emitter = mock(ResponseBodyEmitter.class);
Expand All @@ -99,7 +102,7 @@ public static void init() {
lenient().when(pulsarConfig.getQueueCapacity()).thenReturn(100);

kafkaPicker = new KafkaPicker(kafkaTemplate);
sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, tracingHelper);
sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, metricsHelper, tracingHelper);
}

public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType, boolean withAdditionalFields) {
Expand Down
Loading