From d3398e11d6ea5420d5180d0e9214db553ec32e1f Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Tue, 17 Sep 2024 10:58:20 +0200 Subject: [PATCH 1/9] feat(gradle.properties): add new parent SNAPSHOT with new METRIC_SENT_SSE_EVENTS --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index c85b76f..a24dd8b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ fabric8Version=5.12.4 -horizonParentVersion=4.1.0 \ No newline at end of file +horizonParentVersion=0.0.0-feature-new-sse-metric-SNAPSHOT \ No newline at end of file From 36d73db4e9d030918df9024571155b3831143bda Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Tue, 17 Sep 2024 10:59:18 +0200 Subject: [PATCH 2/9] feat(src/pulsar/service): add new metric and metricsHelper from parent library --- .../horizon/pulsar/service/EventMessageSupplier.java | 8 ++++++++ .../de/telekom/horizon/pulsar/service/SseTaskFactory.java | 4 ++++ .../horizon/pulsar/service/SseTaskFactoryTest.java | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java index d3df8da..365b9b4 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.db.State; import de.telekom.eni.pandora.horizon.model.event.DeliveryType; import de.telekom.eni.pandora.horizon.model.event.Status; @@ -38,6 +39,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Supplier; +import static de.telekom.eni.pandora.horizon.metrics.HorizonMetricsConstants.METRIC_SENT_SSE_EVENTS; + /** * Supplier for providing {@link EventMessageContext} instances. * @@ -60,6 +63,7 @@ public class EventMessageSupplier implements Supplier { private final EventWriter eventWriter; private final MessageStateMongoRepo messageStateMongoRepo; private final HorizonTracer tracingHelper; + private final HorizonMetricsHelper metricsHelper; private final ConcurrentLinkedQueue messageStates = new ConcurrentLinkedQueue<>(); private Instant lastPoll; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -78,6 +82,7 @@ public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boole this.messageStateMongoRepo = factory.getMessageStateMongoRepo(); this.kafkaPicker = factory.getKafkaPicker(); this.eventWriter = factory.getEventWriter(); + this.metricsHelper = factory.getMetricsHelper(); this.tracingHelper = factory.getTracingHelper(); this.includeHttpHeaders = includeHttpHeaders; this.streamLimit = streamLimit; @@ -122,6 +127,9 @@ public EventMessageContext get() { } } + metricsHelper.getRegistry().counter(METRIC_SENT_SSE_EVENTS, metricsHelper.buildTagsFromSubscriptionEventMessage(message)).increment(); + span.annotate("export metrics"); + return new EventMessageContext(message, includeHttpHeaders, streamLimit, span, spanInScope); } catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) { handleException(state, e); diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java index 83253c2..fc6b0b6 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java @@ -6,6 +6,7 @@ import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.mongo.repository.MessageStateMongoRepo; import de.telekom.eni.pandora.horizon.tracing.HorizonTracer; import de.telekom.horizon.pulsar.cache.ConnectionCache; @@ -36,6 +37,7 @@ public class SseTaskFactory { private final DeDuplicationService deDuplicationService; private final HorizonTracer tracingHelper; private final EventWriter eventWriter; + private final HorizonMetricsHelper metricsHelper; /** * Constructs an instance of {@code SseTaskFactory}. @@ -57,6 +59,7 @@ public SseTaskFactory( KafkaPicker kafkaPicker, MessageStateMongoRepo messageStateMongoRepo, DeDuplicationService deDuplicationService, + HorizonMetricsHelper metricsHelper, HorizonTracer tracingHelper) { this.pulsarConfig = pulsarConfig; @@ -68,6 +71,7 @@ public SseTaskFactory( this.deDuplicationService = deDuplicationService; this.eventWriter = eventWriter; + this.metricsHelper = metricsHelper; } /** diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java index ee21908..2b63732 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskFactoryTest.java @@ -38,7 +38,7 @@ class SseTaskFactoryTest { void setupSseTaskFactoryTest() { MockHelper.init(); - sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.tracingHelper)); + sseTaskFactorySpy = spy(new SseTaskFactory(MockHelper.pulsarConfig, MockHelper.connectionCache, MockHelper.connectionGaugeCache, MockHelper.eventWriter, MockHelper.kafkaPicker, MockHelper.messageStateMongoRepo, MockHelper.deDuplicationService, MockHelper.metricsHelper, MockHelper.tracingHelper)); } SubscriptionResource createSubscriptionResource() { From 0eaee4da426d8cfce6184741b7d1c56dbdf4b1d1 Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Tue, 17 Sep 2024 10:59:51 +0200 Subject: [PATCH 3/9] feat(src/pulsar/testutils): update mockHelper with metricsHelper --- .../java/de/telekom/horizon/pulsar/testutils/MockHelper.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java index 30e8cd4..063b723 100644 --- a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java +++ b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java @@ -10,6 +10,7 @@ import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService; import de.telekom.eni.pandora.horizon.kafka.config.KafkaProperties; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.db.Coordinates; import de.telekom.eni.pandora.horizon.model.db.PartialEvent; import de.telekom.eni.pandora.horizon.model.db.State; @@ -62,6 +63,7 @@ public class MockHelper { public static Environment environment; public static ResponseBodyEmitter emitter; public static TokenService tokenService; + public static HorizonMetricsHelper metricsHelper; public static DeDuplicationService deDuplicationService; @@ -99,7 +101,7 @@ public static void init() { lenient().when(pulsarConfig.getQueueCapacity()).thenReturn(100); kafkaPicker = new KafkaPicker(kafkaTemplate); - sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, tracingHelper); + sseTaskFactory = new SseTaskFactory(pulsarConfig, connectionCache, connectionGaugeCache, eventWriter, kafkaPicker, messageStateMongoRepo, deDuplicationService, metricsHelper, tracingHelper); } public static SubscriptionEventMessage createSubscriptionEventMessageForTesting(DeliveryType deliveryType, boolean withAdditionalFields) { From 71422fea3d0bc86122219ce3185f854e1f0e80db Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Tue, 17 Sep 2024 11:26:41 +0200 Subject: [PATCH 4/9] fix(src/pulsar/service): try to fix test with mock of metricsHelper --- .../pulsar/service/EventMessageSupplierTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java index fa7fbdc..a8ef0e7 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java @@ -7,11 +7,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.event.DeliveryType; import de.telekom.eni.pandora.horizon.model.event.Status; import de.telekom.eni.pandora.horizon.model.event.StatusMessage; import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; @@ -22,6 +26,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.SliceImpl; import org.springframework.data.domain.Sort; @@ -38,6 +43,9 @@ @Slf4j class EventMessageSupplierTest { + @MockBean + HorizonMetricsHelper metricsHelper; + EventMessageSupplier eventMessageSupplier; @BeforeEach @@ -83,6 +91,11 @@ void testGetEventMessageContext(int polls) { var eventWriterMock = mock(EventWriter.class); ReflectionTestUtils.setField(eventMessageSupplier, "eventWriter", eventWriterMock, EventWriter.class); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + + doNothing().when(counterMock).increment(); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); // We do multiple calls to EventMessageSupplier.get() in order to test // that each call will fetch the next event message in the queue @@ -90,6 +103,9 @@ void testGetEventMessageContext(int polls) { // We mock the actual picking of a message from Kafka here when(MockHelper.kafkaTemplate.receive(eq(MockHelper.TEST_TOPIC), eq(states.get(i).getCoordinates().partition()), eq(states.get(i).getCoordinates().offset()), eq(Duration.ofMillis(30000)))).thenReturn(record); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + // PUBLIC METHOD WE WANT TO TEST var result = eventMessageSupplier.get(); assertNotNull(result); From 5b024926a970165aeeb7627c40b2b8be1d8d7399 Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Tue, 17 Sep 2024 12:14:40 +0200 Subject: [PATCH 5/9] fix(src/pulsar/service): remove span and fix eventMessageSupplierTest --- .../pulsar/service/EventMessageSupplier.java | 1 - .../pulsar/service/EventMessageSupplierTest.java | 13 +++---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java index 365b9b4..49a70eb 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java @@ -128,7 +128,6 @@ public EventMessageContext get() { } metricsHelper.getRegistry().counter(METRIC_SENT_SSE_EVENTS, metricsHelper.buildTagsFromSubscriptionEventMessage(message)).increment(); - span.annotate("export metrics"); return new EventMessageContext(message, includeHttpHeaders, streamLimit, span, spanInScope); } catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) { diff --git a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java index a8ef0e7..a962248 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; -import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.event.DeliveryType; import de.telekom.eni.pandora.horizon.model.event.Status; import de.telekom.eni.pandora.horizon.model.event.StatusMessage; @@ -26,7 +25,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.SliceImpl; import org.springframework.data.domain.Sort; @@ -36,6 +34,7 @@ import java.util.List; import java.util.Optional; +import static de.telekom.horizon.pulsar.testutils.MockHelper.metricsHelper; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -43,9 +42,6 @@ @Slf4j class EventMessageSupplierTest { - @MockBean - HorizonMetricsHelper metricsHelper; - EventMessageSupplier eventMessageSupplier; @BeforeEach @@ -93,9 +89,9 @@ void testGetEventMessageContext(int polls) { var counterMock = Mockito.mock(Counter.class); var registryMock = Mockito.mock(MeterRegistry.class); - - doNothing().when(counterMock).increment(); when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); // We do multiple calls to EventMessageSupplier.get() in order to test // that each call will fetch the next event message in the queue @@ -103,9 +99,6 @@ void testGetEventMessageContext(int polls) { // We mock the actual picking of a message from Kafka here when(MockHelper.kafkaTemplate.receive(eq(MockHelper.TEST_TOPIC), eq(states.get(i).getCoordinates().partition()), eq(states.get(i).getCoordinates().offset()), eq(Duration.ofMillis(30000)))).thenReturn(record); - when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); - when(metricsHelper.getRegistry()).thenReturn(registryMock); - // PUBLIC METHOD WE WANT TO TEST var result = eventMessageSupplier.get(); assertNotNull(result); From b5798db3244c136690f393c2e8469519b172ab7c Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Tue, 17 Sep 2024 12:15:02 +0200 Subject: [PATCH 6/9] fix(src/pulsar/testutils): update mockHelper with metricsHelper --- .../java/de/telekom/horizon/pulsar/testutils/MockHelper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java index 063b723..adf3c86 100644 --- a/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java +++ b/src/test/java/de/telekom/horizon/pulsar/testutils/MockHelper.java @@ -87,6 +87,7 @@ public static void init() { subscriberCache = mock(SubscriberCache.class); pulsarConfig = mock(PulsarConfig.class); messageStateMongoRepo = mock(MessageStateMongoRepo.class); + metricsHelper = mock(HorizonMetricsHelper.class); tracingHelper = mock(HorizonTracer.class); environment = mock(Environment.class); emitter = mock(ResponseBodyEmitter.class); From e9a7bb4a33715ebeaeed531a2da6a54eec061eb2 Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Tue, 17 Sep 2024 14:50:40 +0200 Subject: [PATCH 7/9] fix(src/pulsar/service): remove metric from here and add the metric to the right place --- .../pulsar/service/EventMessageSupplier.java | 4 --- .../horizon/pulsar/service/SseTask.java | 7 ++++ .../pulsar/service/SseTaskFactory.java | 1 - .../service/EventMessageSupplierTest.java | 7 ---- .../horizon/pulsar/service/SseTaskTest.java | 35 ++++++++++++++++++- 5 files changed, 41 insertions(+), 13 deletions(-) diff --git a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java index 49a70eb..31fcac4 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java @@ -63,7 +63,6 @@ public class EventMessageSupplier implements Supplier { private final EventWriter eventWriter; private final MessageStateMongoRepo messageStateMongoRepo; private final HorizonTracer tracingHelper; - private final HorizonMetricsHelper metricsHelper; private final ConcurrentLinkedQueue messageStates = new ConcurrentLinkedQueue<>(); private Instant lastPoll; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -82,7 +81,6 @@ public EventMessageSupplier(String subscriptionId, SseTaskFactory factory, boole this.messageStateMongoRepo = factory.getMessageStateMongoRepo(); this.kafkaPicker = factory.getKafkaPicker(); this.eventWriter = factory.getEventWriter(); - this.metricsHelper = factory.getMetricsHelper(); this.tracingHelper = factory.getTracingHelper(); this.includeHttpHeaders = includeHttpHeaders; this.streamLimit = streamLimit; @@ -127,8 +125,6 @@ public EventMessageContext get() { } } - metricsHelper.getRegistry().counter(METRIC_SENT_SSE_EVENTS, metricsHelper.buildTagsFromSubscriptionEventMessage(message)).increment(); - return new EventMessageContext(message, includeHttpHeaders, streamLimit, span, spanInScope); } catch (CouldNotPickMessageException | SubscriberDoesNotMatchSubscriptionException e) { handleException(state, e); diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java index 6d18bdc..0d8fcad 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTask.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.telekom.eni.pandora.horizon.cache.service.DeDuplicationService; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; +import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.event.Status; import de.telekom.eni.pandora.horizon.model.event.StatusMessage; import de.telekom.eni.pandora.horizon.model.event.SubscriptionEventMessage; @@ -40,6 +41,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import static de.telekom.eni.pandora.horizon.metrics.HorizonMetricsConstants.METRIC_SENT_SSE_EVENTS; + /** * Represents a Server-Sent Events (SSE) task responsible for emitting events to clients. * @@ -61,6 +64,7 @@ public class SseTask implements Runnable { private final EventWriter eventWriter; private final DeDuplicationService deDuplicationService; private final HorizonTracer tracingHelper; + private final HorizonMetricsHelper metricsHelper; @Setter private String contentType = APPLICATION_STREAM_JSON_VALUE; @@ -99,6 +103,7 @@ public SseTask(SseTaskStateContainer sseTaskStateContainer, this.eventWriter = factory.getEventWriter(); this.deDuplicationService = factory.getDeDuplicationService(); this.tracingHelper = factory.getTracingHelper(); + this.metricsHelper = factory.getMetricsHelper(); } /** @@ -316,6 +321,8 @@ private void sendEvent(SubscriptionEventMessage msg, boolean includeHttpHeaders) bytesConsumed.addAndGet(eventJson.getBytes(StandardCharsets.UTF_8).length); numberConsumed.incrementAndGet(); + + metricsHelper.getRegistry().counter(METRIC_SENT_SSE_EVENTS, metricsHelper.buildTagsFromSubscriptionEventMessage(msg)).increment(); } catch (JsonProcessingException e) { var err = String.format("Error occurred while emitting the event: %s", e.getMessage()); log.info(err, e); diff --git a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java index fc6b0b6..493d846 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/SseTaskFactory.java @@ -69,7 +69,6 @@ public SseTaskFactory( this.tracingHelper = tracingHelper; this.connectionCache = connectionCache; this.deDuplicationService = deDuplicationService; - this.eventWriter = eventWriter; this.metricsHelper = metricsHelper; } diff --git a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java index a962248..847b2c7 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/EventMessageSupplierTest.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Optional; -import static de.telekom.horizon.pulsar.testutils.MockHelper.metricsHelper; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -87,12 +86,6 @@ void testGetEventMessageContext(int polls) { var eventWriterMock = mock(EventWriter.class); ReflectionTestUtils.setField(eventMessageSupplier, "eventWriter", eventWriterMock, EventWriter.class); - var counterMock = Mockito.mock(Counter.class); - var registryMock = Mockito.mock(MeterRegistry.class); - when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); - when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); - when(metricsHelper.getRegistry()).thenReturn(registryMock); - // We do multiple calls to EventMessageSupplier.get() in order to test // that each call will fetch the next event message in the queue for (int i = 0; i < polls; i++) { diff --git a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java index 539cbbd..fc8a327 100644 --- a/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java +++ b/src/test/java/de/telekom/horizon/pulsar/service/SseTaskTest.java @@ -18,6 +18,9 @@ import de.telekom.horizon.pulsar.helper.SseTaskStateContainer; import de.telekom.horizon.pulsar.helper.StreamLimit; import de.telekom.horizon.pulsar.testutils.MockHelper; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static de.telekom.horizon.pulsar.testutils.MockHelper.metricsHelper; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -59,7 +63,6 @@ void setupSseServiceTest() { MockHelper.init(); var sseTask = new SseTask(sseTaskStateContainerMock, eventMessageSupplierMock, MockHelper.openConnectionGaugeValue, MockHelper.sseTaskFactory); - sseTaskSpy = spy(sseTask); } @@ -108,6 +111,12 @@ void testRun() throws IOException, InterruptedException { // Used for verifying the timout worked var started = Instant.now(); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + // PUBLIC METHOD WE WANT TO TEST sseTaskSpy.run(); @@ -191,6 +200,12 @@ void testTerminateConnection() throws InterruptedException, IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); var reachedZero = latch.await(10000, TimeUnit.MILLISECONDS); @@ -243,6 +258,12 @@ void testTerminateConnectionThroughMaxNumberStreamLimit() throws IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); verify(emitterMock, times(maxNumberLimit)).send(anyString()); @@ -292,6 +313,12 @@ void testTerminateConnectionThroughMaxByteStreamLimit() throws IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); ArgumentCaptor jsonCaptor = ArgumentCaptor.forClass(String.class); @@ -348,6 +375,12 @@ void testTerminateConnectionThroughMaxMinutesStreamLimit() throws IOException { var succeededFuture = CompletableFuture.completedFuture(sendResult); when(eventWriterMock.send(anyString(), notNull(), any())).thenReturn(succeededFuture); + var counterMock = Mockito.mock(Counter.class); + var registryMock = Mockito.mock(MeterRegistry.class); + when(registryMock.counter(any(), any(Tags.class))).thenReturn(counterMock); + when(metricsHelper.buildTagsFromSubscriptionEventMessage(any())).thenReturn(Tags.empty()); + when(metricsHelper.getRegistry()).thenReturn(registryMock); + sseTaskSpy.run(); assertEquals(maxMinutes, ChronoUnit.MINUTES.between(sseTaskSpy.getStartTime(), sseTaskSpy.getStopTime())); From 79730f2fd0575679fada9ae1de3f8c8bd1137650 Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Wed, 18 Sep 2024 11:34:45 +0200 Subject: [PATCH 8/9] feat(gradle.properties): update spring-parent to 4.3.0 --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index a24dd8b..f184a04 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ fabric8Version=5.12.4 -horizonParentVersion=0.0.0-feature-new-sse-metric-SNAPSHOT \ No newline at end of file +horizonParentVersion=4.3.0 \ No newline at end of file From 09e5ae6fca0a10077c3eeb6e780beb6589deb4fe Mon Sep 17 00:00:00 2001 From: Julian Spierefka Date: Wed, 18 Sep 2024 11:42:54 +0200 Subject: [PATCH 9/9] feat(src/pulsar/service): remove unused imports --- .../telekom/horizon/pulsar/service/EventMessageSupplier.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java index 31fcac4..d3df8da 100644 --- a/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java +++ b/src/main/java/de/telekom/horizon/pulsar/service/EventMessageSupplier.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.telekom.eni.pandora.horizon.kafka.event.EventWriter; -import de.telekom.eni.pandora.horizon.metrics.HorizonMetricsHelper; import de.telekom.eni.pandora.horizon.model.db.State; import de.telekom.eni.pandora.horizon.model.event.DeliveryType; import de.telekom.eni.pandora.horizon.model.event.Status; @@ -39,8 +38,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Supplier; -import static de.telekom.eni.pandora.horizon.metrics.HorizonMetricsConstants.METRIC_SENT_SSE_EVENTS; - /** * Supplier for providing {@link EventMessageContext} instances. *