diff --git a/docs/modules/plugins/pages/plugin-kafka.adoc b/docs/modules/plugins/pages/plugin-kafka.adoc
index db28aed93e..8fcf0cd93f 100644
--- a/docs/modules/plugins/pages/plugin-kafka.adoc
+++ b/docs/modules/plugins/pages/plugin-kafka.adoc
@@ -29,24 +29,30 @@ Where `<producer-key>` is the key of the producer configuration which should be
 
 === Steps
 
-==== *Send the data*
+==== *Send event with value*
 
-Sends the data to the provided topic with no key or partition.
+Sends the event with the value to the provided topic with no key or partition.
 
+[source,gherkin]
+----
+When I send event with value `$value` to `$producerKey` Kafka topic `$topic`
+----
+
+_Deprecated syntax (will be removed in VIVIDUS 0.6.0)_:
 [source,gherkin]
 ----
 When I send data `$data` to `$producerKey` Kafka topic `$topic`
 ----
 
-* `$producerKey` - the key of Kafka producer configuration
-* `$data` - the data to send
-* `$topic` - the topic name
+* `$value` - The event value.
+* `$producerKey` - The key of the Kafka producer configuration.
+* `$topic` - The topic name.
 
 === Examples
 
-.Send the data to the Kafka topic
+.Send the event to the Kafka topic
 [source,gherkin]
 ----
-When I send data `my-data` to `dev` Kafka topic `my-topic`
+When I send event with value `my-data` to `dev` Kafka topic `my-topic`
 ----
 
 == Consumer
@@ -82,80 +88,108 @@ Where `<consumer-key>` is the key of the consumer configuration which should be
 
 Starts the Kafka consumer with the provided configuration to listen the specified topics. The consumer must be stopped when it's not needed.
 
+[source,gherkin]
+----
+When I start consuming events from `$consumerKey` Kafka topics `$topics`
+----
+
+_Deprecated syntax (will be removed in VIVIDUS 0.6.0)_:
 [source,gherkin]
 ----
 When I start consuming messages from `$consumerKey` Kafka topics `$topics`
 ----
 
-* `$consumerKey` - the key of the Kafka consumer configuration
-* `$topics` - the comma-separated set of topics to listen
-==== *Drain/Peek the consumed messages*
+* `$consumerKey` - The key of the Kafka consumer configuration.
+* `$topics` - The comma-separated set of topics to listen to.
+
+==== *Drain/Peek the consumed events*
 
-Drains/Peeks the consumed messaged to the specified variable. If the consumer is not stopped, the new messages might arrive after the draining. If the consumer is stopped, all the messages received from the consumer start or after the last draining operation are stored to the variable.
+Drains/Peeks the consumed events to the specified variable. If the consumer is not stopped, new events might arrive after the draining. If the consumer is stopped, all the events received from the consumer start or after the last draining operation are stored to the variable.
 
+[source,gherkin]
+----
+When I $queueOperation consumed `$consumerKey` Kafka events to $scopes variable `$variableName`
+----
+
+_Deprecated syntax (will be removed in VIVIDUS 0.6.0)_:
 [source,gherkin]
 ----
 When I $queueOperation consumed `$consumerKey` Kafka messages to $scopes variable `$variableName`
 ----
 
-* `$queueOperation` - `DRAIN` - saves the messages consumed since the last drain or from the consumption start and moves the consumer cursor to the position after the last consumed message, `PEEK` - saves the messages consumed since the last drain or from the consumption start and doesn't change the consumer cursor position
-* `$consumerKey` - the key of the Kafka consumer configuration
+
+* `$queueOperation` - `DRAIN` - saves the events consumed since the last drain or from the consumption start and moves the consumer cursor to the position after the last consumed event, `PEEK` - saves the events consumed since the last drain or from the consumption start and doesn't change the consumer cursor position
+* `$consumerKey` - The key of the Kafka consumer configuration.
 * `$scopes` - xref:commons:variables.adoc#_scopes[The comma-separated set of the variables scopes].
-* `$variableName` - the variable name to store the messages. The messages are accessible via zero-based index, e.g. `${my-var[0]}` will return the first received message.
+* `$variableName` - The variable name to store the events. The events are accessible via zero-based index, e.g. `${my-var[0]}` will return the first received event.
+
+==== *Wait for the events*
 
-==== *Wait for the messages*
+Waits until the count of the consumed events (from the consumer start or after the last draining operation) matches the rule or until the timeout is exceeded.
 
-Waits until the count of the consumed messaged (from the consumer start or after the last draining operation) matches to the rule or until the timeout is exceeded.
+[source,gherkin]
+----
+When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka events is $comparisonRule `$expectedCount`
+----
 
+_Deprecated syntax (will be removed in VIVIDUS 0.6.0)_:
 [source,gherkin]
 ----
 When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka messages is $comparisonRule `$expectedCount`
 ----
 
-* `$timeout` - the maximum time to wait for the messages in {durations-format-link} format.
-* `$consumerKey` - the key of the Kafka consumer configuration.
-* `$comparisonRule` - xref:parameters:comparison-rule.adoc[the comparison rule].
-* `$expectedCount` - the expected count of the messages to be matched by the rule.
+
+* `$timeout` - The maximum time to wait for the events in {durations-format-link} format.
+* `$consumerKey` - The key of the Kafka consumer configuration.
+* `$comparisonRule` - xref:parameters:comparison-rule.adoc[The comparison rule].
+* `$expectedCount` - The expected count of the events to be matched by the rule.
 
 ==== *Stop the consumer*
 
-Stops the Kafka consumer started by the corresponding step before. All recorded messages are kept and can be drained into the variable using the step described above.
+Stops the Kafka consumer started by the corresponding step before. All recorded events are kept and can be drained into the variable using the step described above.
 
+[source,gherkin]
+----
+When I stop consuming events from `$consumerKey` Kafka
+----
+
+_Deprecated syntax (will be removed in VIVIDUS 0.6.0)_:
 [source,gherkin]
 ----
 When I stop consuming messages from `$consumerKey` Kafka
 ----
 
-* `$consumerKey` - the key of the Kafka consumer configuration
+
+* `$consumerKey` - The key of the Kafka consumer configuration.
 
 === Examples
 
-.Consume messages from the Kafka topic
+.Consume events from the Kafka topic
 [source,gherkin]
 ----
-When I start consuming messages from `dev` Kafka topics `my-topic-1, my-topic-2`
-!-- Perform any actions triggering the publishing of messages to Kafka
-When I wait with `PT30S` timeout until count of consumed `dev` Kafka messages is greater than `1`
-When I stop consuming messages from `dev` Kafka
-When I drain consumed Kafka messages to scenario variable `consumed-messages`
-Then `${consumed-messages[0]}` is equal to `some-expected-message`
+When I start consuming events from `dev` Kafka topics `my-topic-1, my-topic-2`
+!-- Perform any actions triggering the publishing of events to Kafka
+When I wait with `PT30S` timeout until count of consumed `dev` Kafka events is greater than `1`
+When I stop consuming events from `dev` Kafka
+When I drain consumed Kafka events to scenario variable `consumed-events`
+Then `${consumed-events[0]}` is equal to `some-expected-event`
 ----
 
-.Drain messages while listener is rinning
+.Drain events while listener is running
 [source,gherkin]
 ----
-When I start consuming messages from `prod` Kafka topics `my-topic-1, my-topic-2`
-!-- Perform any actions triggering the publishing of messages to Kafka
-When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-X`
-!-- Perform more actions triggering the publishing of messages to Kafka
-When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-Y`
-When I stop consuming messages from `prod` Kafka
+When I start consuming events from `prod` Kafka topics `my-topic-1, my-topic-2`
+!-- Perform any actions triggering the publishing of events to Kafka
+When I drain consumed `prod` Kafka events to scenario variable `events-after-action-X`
+!-- Perform more actions triggering the publishing of events to Kafka
+When I drain consumed `prod` Kafka events to scenario variable `events-after-action-Y`
+When I stop consuming events from `prod` Kafka
 ----
 
-.Peek messages while listener is rinning
+.Peek events while listener is running
 [source,gherkin]
 ----
-When I start consuming messages from `prod` Kafka topics `my-topic-1, my-topic-2`
-!-- Perform any actions triggering the publishing of messages to Kafka
-When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-X`
-!-- Perform more actions triggering the publishing of messages to Kafka
-When I peek consumed `prod` Kafka messages to scenario variable `messages-after-action-Y`
-When I stop consuming messages from `prod` Kafka
+When I start consuming events from `prod` Kafka topics `my-topic-1, my-topic-2`
+!-- Perform any actions triggering the publishing of events to Kafka
+When I drain consumed `prod` Kafka events to scenario variable `events-after-action-X`
+!-- Perform more actions triggering the publishing of events to Kafka
+When I peek consumed `prod` Kafka events to scenario variable `events-after-action-Y`
+When I stop consuming events from `prod` Kafka
 ----
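For context: the `dev`, `prod` and `vividus` producer/consumer keys used in the examples above are defined through properties prefixed with `kafka.producer.<producer-key>.` and `kafka.consumer.<consumer-key>.`, as described at the top of the documentation page. A minimal configuration sketch — the concrete property names below are assumptions based on standard Spring Kafka settings, not part of this change:

[source,properties]
----
# Assumed layout: kafka.producer.<producer-key>.<spring-kafka-property>
kafka.producer.dev.bootstrap-servers=localhost:9092
# Assumed layout: kafka.consumer.<consumer-key>.<spring-kafka-property>
kafka.consumer.dev.bootstrap-servers=localhost:9092
kafka.consumer.dev.group-id=dev-group
----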
diff --git a/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java b/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java
index f9320bc6e6..a10c8e3811 100644
--- a/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java
+++ b/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2022 the original author or authors.
+ * Copyright 2019-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -71,7 +71,7 @@ public class KafkaSteps
     private static final int WAIT_TIMEOUT_IN_MINUTES = 10;
 
     private static final Class<?> LISTENER_KEY = GenericMessageListenerContainer.class;
-    private static final Class<?> MESSAGES_KEY = ConsumerRecord.class;
+    private static final Class<?> EVENTS_KEY = ConsumerRecord.class;
 
     private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;
     private final Map<String, ConsumerFactory<String, String>> consumerFactories;
@@ -113,19 +113,19 @@ private Map<String, Object> convert(String propertiesPrefix, IPropertyParser pr
     }
 
     /**
-     * Send the data to the provided topic with no key or partition.
-     * @param data The data to send
+     * Send the event with the specified value to the provided topic with no key or partition.
+     * @param value The event value
      * @param producerKey The key of the producer configuration
     * @param topic The topic name
     * @throws InterruptedException If the current thread was interrupted while waiting
     * @throws ExecutionException If the computation threw an exception
     * @throws TimeoutException If the wait timed out
      */
-    @When("I send data `$data` to `$producerKey` Kafka topic `$topic`")
-    public void sendData(String data, String producerKey, String topic) throws InterruptedException, ExecutionException,
-            TimeoutException
+    @When("I send event with value `$value` to `$producerKey` Kafka topic `$topic`")
+    public void sendEvent(String value, String producerKey, String topic)
+            throws InterruptedException, ExecutionException, TimeoutException
     {
-        kafkaTemplates.get(producerKey).send(topic, data).get(WAIT_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
+        kafkaTemplates.get(producerKey).send(topic, value).get(WAIT_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
     }
 
     /**
@@ -135,25 +135,25 @@ public void sendData(String data, String producerKey, String topic) throws Inter
     * @param consumerKey The key of the producer configuration
     * @param topics The comma-separated set of topics to listen
      */
-    @When("I start consuming messages from `$consumerKey` Kafka topics `$topics`")
+    @When("I start consuming events from `$consumerKey` Kafka topics `$topics`")
     public void startKafkaListener(String consumerKey, Set<String> topics)
     {
         stopListener(getListeners().remove(consumerKey), false);
-        BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();
-        testContext.get(MESSAGES_KEY, HashMap::new).put(consumerKey, messageQueue);
+        BlockingQueue<String> eventValueQueue = new LinkedBlockingDeque<>();
+        testContext.get(EVENTS_KEY, HashMap::new).put(consumerKey, eventValueQueue);
         ContainerProperties containerProperties = new ContainerProperties(topics.toArray(new String[0]));
         containerProperties.setMessageListener(
-                (MessageListener<String, String>) data -> messageQueue.add(data.value()));
+                (MessageListener<String, String>) data -> eventValueQueue.add(data.value()));
         GenericMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(
                 consumerFactories.get(consumerKey), containerProperties);
         container.start();
         getListeners().put(consumerKey, container);
-        LOGGER.info("Kafka message listener is started");
+        LOGGER.info("Kafka event listener is started");
     }
 
     /**
-     * Waits until the count of the consumed messaged (from the consumer start or after the last draining operation)
+     * Waits until the count of the consumed messages (from the consumer start or after the last draining operation)
      * matches to the rule or until the timeout is exceeded.
      *
     * @param timeout The maximum time to wait for the messages in ISO-8601 format
     * @param consumerKey The key of the producer configuration
     * @param comparisonRule The rule to match the quantity of messages. The supported rules:
     *                       <ul>
@@ -168,46 +168,80 @@ public void startKafkaListener(String consumerKey, Set<String> topics)
     *                       <li>not equal to (!=)</li>
     *                       </ul>
     *
     * @param expectedCount The expected count of the messages to be matched by the rule
+     * @deprecated Use step: "When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka events is
+     * $comparisonRule `$expectedCount`"
      */
+    @Deprecated(since = "0.5.6", forRemoval = true)
     @When("I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka messages is $comparisonRule"
             + " `$expectedCount`")
     public void waitForKafkaMessages(Duration timeout, String consumerKey, ComparisonRule comparisonRule,
             int expectedCount)
     {
+        LOGGER.warn("The step: \"When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka "
+                + "messages is $comparisonRule `$expectedCount`\" is deprecated and will be removed in VIVIDUS 0.6.0. "
+                + "Use step: \"When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka "
+                + "events is $comparisonRule `$expectedCount`\"");
         Matcher<Integer> countMatcher = comparisonRule.getComparisonRule(expectedCount);
         Integer result = new DurationBasedWaiter(timeout, Duration.ofSeconds(1)).wait(
-                () -> getMessagesBy(consumerKey).size(),
+                () -> getEventsBy(consumerKey).size(),
                 countMatcher::matches);
         softAssert.assertThat("Total count of consumed Kafka messages", result, countMatcher);
     }
 
-    private BlockingQueue<String> getMessagesBy(String key)
+    /**
+     * Waits until the count of the consumed events (from the consumer start or after the last draining operation)
+     * matches the rule or until the timeout is exceeded.
+     *
+     * @param timeout The maximum time to wait for the events in ISO-8601 format
+     * @param consumerKey The key of the consumer configuration
+     * @param comparisonRule The rule to match the quantity of events. The supported rules:
+     *                       <ul>
+     *                       <li>less than (&lt;)</li>
+     *                       <li>less than or equal to (&lt;=)</li>
+     *                       <li>greater than (&gt;)</li>
+     *                       <li>greater than or equal to (&gt;=)</li>
+     *                       <li>equal to (=)</li>
+     *                       <li>not equal to (!=)</li>
+     *                       </ul>
+     * @param expectedCount The expected count of the events to be matched by the rule
+     */
+    @When("I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka events is $comparisonRule"
+            + " `$expectedCount`")
+    public void waitForKafkaEvents(Duration timeout, String consumerKey, ComparisonRule comparisonRule,
+            int expectedCount)
+    {
+        Matcher<Integer> countMatcher = comparisonRule.getComparisonRule(expectedCount);
+        Integer result = new DurationBasedWaiter(timeout, Duration.ofSeconds(1)).wait(
+                () -> getEventsBy(consumerKey).size(), countMatcher::matches);
+        softAssert.assertThat("Total count of consumed Kafka events", result, countMatcher);
+    }
+
+    private BlockingQueue<String> getEventsBy(String key)
     {
-        return testContext.<Map<String, BlockingQueue<String>>>get(MESSAGES_KEY).get(key);
+        return testContext.<Map<String, BlockingQueue<String>>>get(EVENTS_KEY).get(key);
    }
 
     /**
-     * Stops the Kafka consumer started by the corresponding step before. All recorded messages are kept and can be
+     * Stops the Kafka consumer started by the corresponding step before. All recorded events are kept and can be
      * drained into the variable using the step described above.
     * @param consumerKey The key of the producer configuration
      */
-    @When("I stop consuming messages from `$consumerKey` Kafka")
+    @When("I stop consuming events from `$consumerKey` Kafka")
     public void stopKafkaListener(String consumerKey)
     {
         stopListener(getListeners().remove(consumerKey), true);
     }
 
     /**
-     * Drains the consumed messaged to the specified variable. If the consumer is not stopped, the new messages might
-     * arrive after the draining. If the consumer is stopped, all the messages received from the consumer start or
+     * Drains the consumed events to the specified variable. If the consumer is not stopped, the new events might
+     * arrive after the draining. If the consumer is stopped, all the events received from the consumer start or
      * after the last draining operation are stored to the variable.
     * @param queueOperation The one of:
    *                       <ul>
-    *                       <li>PEEK - saves the messages consumed since the last drain or from the
+    *                       <li>PEEK - saves the events consumed since the last drain or from the
     *                       consumption start and doesn't change the consumer cursor position</li>
-    *                       <li>DRAIN - saves the messages consumed since the last drain or from the
+    *                       <li>DRAIN - saves the events consumed since the last drain or from the
     *                       consumption start and moves the consumer cursor
-    *                       to the position after the last consumed message</li>
+    *                       to the position after the last consumed event</li>
     *                       </ul>
     * @param consumerKey The key of the producer configuration
     * @param scopes The set (comma separated list of scopes e.g.: STORY, NEXT_BATCHES) of variable's scope
     *                       <ul>
@@ -218,14 +252,14 @@ public void stopKafkaListener(String consumerKey)
     *                       <li>STORY - the variable will be available within the whole story,
     *                       <li>NEXT_BATCHES - the variable will be available starting from next batch
     *                       </ul>
-     * @param variableName the variable name to store the messages. The messages are accessible via zero-based index,
-     * e.g. `${my-var[0]}` will return the first received message.
+     * @param variableName the variable name to store the events. The events are accessible via zero-based index,
+     * e.g. `${my-var[0]}` will return the first received event.
      */
-    @When("I $queueOperation consumed `$consumerKey` Kafka messages to $scopes variable `$variableName`")
-    public void processKafkaMessages(QueueOperation queueOperation, String consumerKey, Set<VariableScope> scopes,
+    @When("I $queueOperation consumed `$consumerKey` Kafka events to $scopes variable `$variableName`")
+    public void processKafkaEvents(QueueOperation queueOperation, String consumerKey, Set<VariableScope> scopes,
             String variableName)
     {
-        variableContext.putVariable(scopes, variableName, queueOperation.performOn(getMessagesBy(consumerKey)));
+        variableContext.putVariable(scopes, variableName, queueOperation.performOn(getEventsBy(consumerKey)));
     }
 
     @AfterStory
@@ -242,12 +276,12 @@ private void stopListener(GenericMessageListenerContainer<String, String> contai
         if (container != null)
         {
             container.stop();
-            LOGGER.info("Kafka message listener is stopped");
+            LOGGER.info("Kafka event listener is stopped");
         }
         else if (throwExceptionIfNoListener)
         {
             throw new IllegalStateException(
-                    "No Kafka message listener is running, did you forget to start consuming messages?");
+                    "No Kafka event listener is running, did you forget to start consuming events?");
         }
     }
 
@@ -261,22 +295,22 @@ protected enum QueueOperation
         PEEK
         {
             @Override
-            List<String> performOn(BlockingQueue<String> messagesQueue)
+            List<String> performOn(BlockingQueue<String> eventsQueue)
             {
-                return new ArrayList<>(messagesQueue);
+                return new ArrayList<>(eventsQueue);
             }
         },
         DRAIN
         {
             @Override
-            List<String> performOn(BlockingQueue<String> messagesQueue)
+            List<String> performOn(BlockingQueue<String> eventsQueue)
             {
-                List<String> messages = new ArrayList<>();
-                messagesQueue.drainTo(messages);
-                return messages;
+                List<String> events = new ArrayList<>();
+                eventsQueue.drainTo(events);
+                return events;
             }
         };
 
-        abstract List<String> performOn(BlockingQueue<String> messagesQueue);
+        abstract List<String> performOn(BlockingQueue<String> eventsQueue);
     }
 }
diff --git a/vividus-plugin-kafka/src/main/resources/steps/defaults/deprecated.steps b/vividus-plugin-kafka/src/main/resources/steps/defaults/deprecated.steps
new file mode 100644
index 0000000000..ae03e3aa52
--- /dev/null
+++ b/vividus-plugin-kafka/src/main/resources/steps/defaults/deprecated.steps
@@ -0,0 +1,18 @@
+Composite: When I send data `$data` to `$producerKey` Kafka topic `$topic`
+!-- WARNING: This step "When I send data `$data` to `$producerKey` Kafka topic `$topic`" is deprecated and will be removed in VIVIDUS 0.6.0
+When I send event with value `<data>` to `<producerKey>` Kafka topic `<topic>`
+
+
+Composite: When I start consuming messages from `$consumerKey` Kafka topics `$topics`
+!-- WARNING: This step "When I start consuming messages from `$consumerKey` Kafka topics `$topics`" is deprecated and will be removed in VIVIDUS 0.6.0
+When I start consuming events from `<consumerKey>` Kafka topics `<topics>`
+
+
+Composite: When I stop consuming messages from `$consumerKey` Kafka
+!-- WARNING: This step "When I stop consuming messages from `$consumerKey` Kafka" is deprecated and will be removed in VIVIDUS 0.6.0
+When I stop consuming events from `<consumerKey>` Kafka
+
+
+Composite: When I $queueOperation consumed `$consumerKey` Kafka messages to $scopes variable `$variableName`
+!-- WARNING: This step "When I $queueOperation consumed `$consumerKey` Kafka messages to $scopes variable `$variableName`" is deprecated and will be removed in VIVIDUS 0.6.0
+When I <queueOperation> consumed `<consumerKey>` Kafka events to <scopes> variable `<variableName>`
diff --git a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java
index ae80f79e55..e5ff32fedc 100644
--- a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java
+++ b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2022 the original author or authors.
+ * Copyright 2019-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 package org.vividus.steps.kafka;
 
 import static com.github.valfirst.slf4jtest.LoggingEvent.info;
+import static com.github.valfirst.slf4jtest.LoggingEvent.warn;
 import static java.util.stream.Collectors.toMap;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -35,6 +36,7 @@
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
+import com.github.valfirst.slf4jtest.LoggingEvent;
 import com.github.valfirst.slf4jtest.TestLogger;
 import com.github.valfirst.slf4jtest.TestLoggerFactory;
 import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension;
@@ -78,6 +80,12 @@ class KafkaStepsIntegrationTests
 
     private static final String VARIABLE_NAME = "var";
 
+    private static final String EQUAL_TO_ONE_MATCHER = "a value equal to <1>";
+
+    private static final String LISTENER_STARTED = "Kafka event listener is started";
+
+    private static final String LISTENER_STOPPED = "Kafka event listener is stopped";
+
     private static final Set<VariableScope> SCOPES = Set.of(VariableScope.SCENARIO);
 
     private final TestLogger logger = TestLoggerFactory.getTestLogger(KafkaSteps.class);
@@ -113,36 +121,60 @@ static Stream<Arguments> kafkaOperations()
     {
         return Stream.of(
                 Arguments.of((BiConsumer<KafkaSteps, VariableContext>) (steps, context) -> {
-                    steps.processKafkaMessages(QueueOperation.DRAIN, CONSUMER, SCOPES, VARIABLE_NAME);
-                    steps.processKafkaMessages(QueueOperation.DRAIN, CONSUMER, SCOPES, VARIABLE_NAME);
+                    steps.processKafkaEvents(QueueOperation.DRAIN, CONSUMER, SCOPES, VARIABLE_NAME);
+                    steps.processKafkaEvents(QueueOperation.DRAIN, CONSUMER, SCOPES, VARIABLE_NAME);
                     InOrder ordered = Mockito.inOrder(context);
                     ordered.verify(context).putVariable(SCOPES, VARIABLE_NAME, List.of(ANY_DATA));
                     ordered.verify(context).putVariable(SCOPES, VARIABLE_NAME, List.of());
                 }),
                 Arguments.of((BiConsumer<KafkaSteps, VariableContext>) (steps, context) -> {
-                    steps.processKafkaMessages(QueueOperation.PEEK, CONSUMER, SCOPES, VARIABLE_NAME);
-                    steps.processKafkaMessages(QueueOperation.PEEK, CONSUMER, SCOPES, VARIABLE_NAME);
+                    steps.processKafkaEvents(QueueOperation.PEEK, CONSUMER, SCOPES, VARIABLE_NAME);
+                    steps.processKafkaEvents(QueueOperation.PEEK, CONSUMER, SCOPES, VARIABLE_NAME);
                     verify(context, times(2)).putVariable(SCOPES, VARIABLE_NAME, List.of(ANY_DATA));
                 }));
     }
 
     @ParameterizedTest
     @MethodSource("kafkaOperations")
-    void shouldProduceToAndConsumerFromKafka(BiConsumer<KafkaSteps, VariableContext> test)
+    void shouldProduceMessagesToAndConsumeMessagesFromKafka(BiConsumer<KafkaSteps, VariableContext> test)
             throws InterruptedException, ExecutionException, TimeoutException
     {
         kafkaSteps.startKafkaListener(CONSUMER, Set.of(TOPIC));
 
-        kafkaSteps.sendData(ANY_DATA, PRODUCER, TOPIC);
+        kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
 
         ComparisonRule comparisonRule = ComparisonRule.EQUAL_TO;
         kafkaSteps.waitForKafkaMessages(Duration.ofSeconds(10), CONSUMER, comparisonRule, 1);
         verify(softAssert).assertThat(eq("Total count of consumed Kafka messages"), eq(1),
-                argThat(matcher -> "a value equal to <1>".equals(matcher.toString())));
+                argThat(matcher -> EQUAL_TO_ONE_MATCHER.equals(matcher.toString())));
+
+        kafkaSteps.stopKafkaListener(CONSUMER);
+
+        LoggingEvent deprecatedStep = warn("The step: \"When I wait with `$timeout` timeout until count of consumed "
+                + "`$consumerKey` Kafka messages is $comparisonRule `$expectedCount`\" is deprecated and will be "
+                + "removed in VIVIDUS 0.6.0. Use step: \"When I wait with `$timeout` timeout until count of consumed "
+                + "`$consumerKey` Kafka events is $comparisonRule `$expectedCount`\"");
+        assertThat(logger.getLoggingEvents(), is(List.of(info(LISTENER_STARTED), deprecatedStep,
+                info(LISTENER_STOPPED))));
+
+        test.accept(kafkaSteps, variableContext);
+    }
+
+    @ParameterizedTest
+    @MethodSource("kafkaOperations")
+    void shouldProduceEventsToAndConsumeEventsFromKafka(BiConsumer<KafkaSteps, VariableContext> test)
+            throws InterruptedException, ExecutionException, TimeoutException
+    {
+        kafkaSteps.startKafkaListener(CONSUMER, Set.of(TOPIC));
+
+        kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
+
+        ComparisonRule comparisonRule = ComparisonRule.EQUAL_TO;
+        kafkaSteps.waitForKafkaEvents(Duration.ofSeconds(10), CONSUMER, comparisonRule, 1);
+        verify(softAssert).assertThat(eq("Total count of consumed Kafka events"), eq(1),
+                argThat(matcher -> EQUAL_TO_ONE_MATCHER.equals(matcher.toString())));
 
         kafkaSteps.stopKafkaListener(CONSUMER);
 
-        assertThat(logger.getLoggingEvents(),
-                is(List.of(info("Kafka message listener is started"), info("Kafka message listener is stopped"))));
+        assertThat(logger.getLoggingEvents(), is(List.of(info(LISTENER_STARTED), info(LISTENER_STOPPED))));
 
         test.accept(kafkaSteps, variableContext);
     }
diff --git a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java
index 195fcab9be..81f3d87826 100644
--- a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java
+++ b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2022 the original author or authors.
+ * Copyright 2019-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -63,7 +63,7 @@ class KafkaStepsTests
 
     private static final Class<?> LISTENER_KEY = GenericMessageListenerContainer.class;
 
-    private static final String LISTENER_IS_STOPPED = "Kafka message listener is stopped";
+    private static final String LISTENER_IS_STOPPED = "Kafka event listener is stopped";
 
     private final TestLogger logger = TestLoggerFactory.getTestLogger(KafkaSteps.class);
 
@@ -90,7 +90,7 @@ void shouldThrowExceptionWhenTryingToStopNotRunningListener()
         mockListeners(new HashMap<>());
         IllegalStateException exception = assertThrows(IllegalStateException.class,
                 () -> kafkaSteps.stopKafkaListener(KEY2));
-        assertEquals("No Kafka message listener is running, did you forget to start consuming messages?",
+        assertEquals("No Kafka event listener is running, did you forget to start consuming events?",
                 exception.getMessage());
     }
 
@@ -151,7 +151,7 @@ void shouldStopStartedKafkaListenerIfNewKafkaListenerIsCreated()
         assertEquals(construction.constructed().get(1), listeners.get(consumerKey));
         assertThat(construction.constructed(), hasSize(2));
 
-        String listenerIsStarted = "Kafka message listener is started";
+        String listenerIsStarted = "Kafka event listener is started";
         assertThat(logger.getLoggingEvents(),
                 is(List.of(info(listenerIsStarted),
                            info(LISTENER_IS_STOPPED),
diff --git a/vividus-tests/src/main/resources/story/system/Kafka.story b/vividus-tests/src/main/resources/story/system/Kafka.story
index 98db4581f6..21c3686c18 100644
--- a/vividus-tests/src/main/resources/story/system/Kafka.story
+++ b/vividus-tests/src/main/resources/story/system/Kafka.story
@@ -2,9 +2,11 @@ Meta:
     @epic vividus-plugin-kafka
     @requirementId 1049
 
-Scenario: Produce/consume data to/from Kafka
-Given I initialize scenario variable `message` with value `message-from-system-vividus-test-#{generate(regexify '[a-z]{8}')}`
+Scenario: Init
 Given I initialize story variable `topic` with value `l6eo4ztm-default`
+
+Scenario: [DEPRECATED] Produce/consume data to/from Kafka
+Given I initialize scenario variable `message` with value `message-from-system-vividus-test-#{generate(regexify '[a-z]{8}')}`
 When I start consuming messages from `vividus` Kafka topics `${topic}`
 When I send data `${message}` to `vividus` Kafka topic `${topic}`
 When I wait with `PT30S` timeout until count of consumed `vividus` Kafka messages is equal to `1`
@@ -13,14 +15,37 @@
 When I drain consumed `vividus` Kafka messages to scenario variable `consumed-messages`
 Then `${consumed-messages[0]}` is equal to `${message}`
 
 
-Scenario: Wait until expected message appears in the Kafka topic
+Scenario: [DEPRECATED] Wait until expected message appears in the Kafka topic
 When I start consuming messages from `vividus` Kafka topics `${topic}`
 When I send data `{"key" : "failed"}` to `vividus` Kafka topic `${topic}`
 When I send data `{"key" : "passed"}` to `vividus` Kafka topic `${topic}`
 When I execute steps with delay `PT1S` at most 30 times while variable `messageCount` is = `0`:
-|step                                                                                                                                 |
-|When I peek consumed `vividus` Kafka messages to scenario variable `messages`                                                        |
+|step                                                                                                                                |
+|When I peek consumed `vividus` Kafka messages to scenario variable `messages`                                                       |
 |When I save number of elements from `${messages}` found by JSON path `$..[?(@.key == "failed")]` to scenario variable `messageCount`|
 When I drain consumed `vividus` Kafka messages to scenario variable `consumed-messages`
 Then `${consumed-messages}` is equal to `[{"key" : "failed"}, {"key" : "passed"}]`
 When I stop consuming messages from `vividus` Kafka
+
+
+Scenario: Produce/consume events to/from Kafka
+Given I initialize scenario variable `event-value` with value `event-from-system-vividus-test-#{generate(regexify '[a-z]{8}')}`
+When I start consuming events from `vividus` Kafka topics `${topic}`
+When I send event with value `${event-value}` to `vividus` Kafka topic `${topic}`
+When I wait with `PT30S` timeout until count of consumed `vividus` Kafka events is equal to `1`
+When I stop consuming events from `vividus` Kafka
+When I drain consumed `vividus` Kafka events to scenario variable `consumed-events`
+Then `${consumed-events[0]}` is equal to `${event-value}`
+
+
+Scenario: Wait until expected event appears in the Kafka topic
+When I start consuming events from `vividus` Kafka topics `${topic}`
+When I send event with value `{"status" : "failed"}` to `vividus` Kafka topic `${topic}`
+When I send event with value `{"status" : "passed"}` to `vividus` Kafka topic `${topic}`
+When I execute steps with delay `PT1S` at most 30 times while variable `eventCount` is = `0`:
+|step                                                                                                                               |
+|When I peek consumed `vividus` Kafka events to scenario variable `events`                                                          |
+|When I save number of elements from `${events}` found by JSON path `$..[?(@.status == "failed")]` to scenario variable `eventCount`|
+When I drain consumed `vividus` Kafka events to scenario variable `consumed-events`
+Then `${consumed-events}` is equal to `[{"status" : "failed"}, {"status" : "passed"}]`
+When I stop consuming events from `vividus` Kafka
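
To make the `DRAIN`/`PEEK` semantics of `KafkaSteps.QueueOperation` concrete: `PEEK` copies the queued event values and leaves the consumer cursor (the queue contents) untouched, while `DRAIN` removes them, so a subsequent drain returns only events consumed in between. A self-contained sketch of the same `BlockingQueue` mechanics used by the enum above (the sample values are hypothetical):

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class QueueOperationSketch
{
    public static void main(String[] args)
    {
        // Stand-in for the per-consumer queue filled by the Kafka message listener
        BlockingQueue<String> queue = new LinkedBlockingDeque<>();
        queue.add("event-1");
        queue.add("event-2");

        // PEEK: copy the events, the queue (consumer cursor) is unchanged
        List<String> peeked = new ArrayList<>(queue);
        System.out.println(peeked + ", left in queue: " + queue.size());  // [event-1, event-2], left in queue: 2

        // DRAIN: move the events out, a second drain would see only new arrivals
        List<String> drained = new ArrayList<>();
        queue.drainTo(drained);
        System.out.println(drained + ", left in queue: " + queue.size()); // [event-1, event-2], left in queue: 0
    }
}
----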