diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java new file mode 100644 index 000000000..f63b548fa --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/DaprPubSubIT.java @@ -0,0 +1,637 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.http; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.BulkPublishEntry; +import io.dapr.client.domain.BulkPublishRequest; +import io.dapr.client.domain.BulkPublishResponse; +import io.dapr.client.domain.CloudEvent; +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.Metadata; +import io.dapr.client.domain.PublishEventRequest; +import io.dapr.config.Properties; +import io.dapr.it.pubsub.http.PubSubIT; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.spring.boot.autoconfigure.client.DaprClientAutoConfiguration; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.utils.TypeRef; +import org.assertj.core.api.SoftAssertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; + +import static io.dapr.it.Retry.callWithRetry; +import static io.dapr.it.TestUtils.assertThrowsDaprException; +import static io.dapr.it.TestUtils.assertThrowsDaprExceptionWithReason; +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT, + classes = { + TestPubSubApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class DaprPubSubIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + private static final Random RANDOM = new Random(); + private static final int PORT = RANDOM.nextInt(1000) + 8000; + private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final String PUBSUB_APP_ID = "pubsub-dapr-app"; + private static final String PUBSUB_NAME = "pubsub"; + + // topics + private static final String TOPIC_BULK = "testingbulktopic"; + private static final String TOPIC_NAME = "testingtopic"; + private static final String ANOTHER_TOPIC_NAME = "anothertopic"; + private static final String TYPED_TOPIC_NAME = "typedtestingtopic"; + private static final String BINARY_TOPIC_NAME = "binarytopic"; + private static final String TTL_TOPIC_NAME = "ttltopic"; + private static final String LONG_TOPIC_NAME = "testinglongvalues"; + + + private static final int NUM_MESSAGES = 10; + + // typeRefs + private static final TypeRef> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() { + }; + private static final TypeRef>> CLOUD_EVENT_LONG_LIST_TYPE_REF = + new TypeRef<>() { + }; + private static final TypeRef>> CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF = + new TypeRef<>() { + }; + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName(PUBSUB_APP_ID) + .withNetwork(DAPR_NETWORK) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .withAppPort(PORT); + + /** + * Expose the Dapr ports to the host. + * + * @param registry the dynamic property registry + */ + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint); + registry.add("server.port", () -> PORT); + } + + + @BeforeEach + public void setUp() { + org.testcontainers.Testcontainers.exposeHostPorts(PORT); + } + + @Test + @DisplayName("Should receive INVALID_ARGUMENT when the specified Pub/Sub name does not exist") + public void shouldReceiveInvalidArgument() throws Exception { + Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER); + + try (DaprClient client = createDaprClientBuilder().build()) { + assertThrowsDaprExceptionWithReason( + "INVALID_ARGUMENT", + "INVALID_ARGUMENT: pubsub unknown pubsub is not found", + "DAPR_PUBSUB_NOT_FOUND", + () -> client.publishEvent("unknown pubsub", "mytopic", "payload").block()); + } + } + + @Test + @DisplayName("Should receive INVALID_ARGUMENT using bulk publish when the specified Pub/Sub name does not exist") + public void shouldReceiveInvalidArgumentWithBulkPublish() throws Exception { + try (DaprPreviewClient client = createDaprClientBuilder().buildPreviewClient()) { + assertThrowsDaprException( + "INVALID_ARGUMENT", + "INVALID_ARGUMENT: pubsub unknown pubsub is not found", + () -> client.publishEvents("unknown pubsub", "mytopic", "text/plain", "message").block()); + } + } + + @Test + @DisplayName("Should publish some payload types successfully") + public void shouldPublishSomePayloadTypesWithNoError() throws Exception { + + DaprObjectSerializer serializer = createJacksonObjectSerializer(); + + try ( + DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build(); + DaprPreviewClient previewClient = createDaprClientBuilder().withObjectSerializer(serializer) + .buildPreviewClient() + ) { + + publishBulkStringsAsserting(previewClient); + + publishMyObjectAsserting(previewClient); + + publishByteAsserting(previewClient); + + publishCloudEventAsserting(previewClient); + + Thread.sleep(10000); + + callWithRetry(() -> validatePublishedMessages(client), 2000); + } + } + + @Test + @DisplayName("Should publish various payload types to different topics") + public void testPubSub() throws Exception { + + DaprObjectSerializer serializer = createJacksonObjectSerializer(); + + // Send a batch of messages on one topic + try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) { + + sendBulkMessagesAsText(client, TOPIC_NAME); + + sendBulkMessagesAsText(client, ANOTHER_TOPIC_NAME); + + //Publishing an object. + PubSubIT.MyObject object = new PubSubIT.MyObject(); + object.setId("123"); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block(); + System.out.println("Published one object."); + + client.publishEvent(PUBSUB_NAME, TYPED_TOPIC_NAME, object).block(); + System.out.println("Published another object."); + + //Publishing a single byte: Example of non-string based content published + publishOneByteSync(client, TOPIC_NAME); + + CloudEvent cloudEvent = new CloudEvent<>(); + cloudEvent.setId("1234"); + cloudEvent.setData("message from cloudevent"); + cloudEvent.setSource("test"); + cloudEvent.setSpecversion("1"); + cloudEvent.setType("myevent"); + cloudEvent.setDatacontenttype("text/plain"); + + //Publishing a cloud event. + client.publishEvent(new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEvent) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event."); + + { + CloudEvent cloudEventV2 = new CloudEvent<>(); + cloudEventV2.setId("2222"); + cloudEventV2.setData("message from cloudevent v2"); + cloudEventV2.setSource("test"); + cloudEventV2.setSpecversion("1"); + cloudEventV2.setType("myevent.v2"); + cloudEventV2.setDatacontenttype("text/plain"); + client.publishEvent( + new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV2) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event for v2."); + } + + { + CloudEvent cloudEventV3 = new CloudEvent<>(); + cloudEventV3.setId("3333"); + cloudEventV3.setData("message from cloudevent v3"); + cloudEventV3.setSource("test"); + cloudEventV3.setSpecversion("1"); + cloudEventV3.setType("myevent.v3"); + cloudEventV3.setDatacontenttype("text/plain"); + client.publishEvent( + new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV3) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event for v3."); + } + + Thread.sleep(2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testingtopic", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(13) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .contains( + "AQ==", + "message from cloudevent" + ); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String expectedMessage = String.format("This is message #%d on topic %s", i, TOPIC_NAME); + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .anyMatch(expectedMessage::equals); + } + + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(LinkedHashMap.class::isInstance) + .map(data -> (String) ((LinkedHashMap) data).get("id")) + .contains("123"); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME + " V2"); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testingtopicV2", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(1); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME + " V3"); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testingtopicV3", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(1); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TYPED_TOPIC_NAME); + + List> messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/typedtestingtopic", + null, + HttpExtension.GET, + CLOUD_EVENT_MYOBJECT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .filteredOn(PubSubIT.MyObject.class::isInstance) + .map(PubSubIT.MyObject::getId) + .contains("123"); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME); + + List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/anothertopic", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(messages) + .hasSize(10); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String expectedMessage = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME); + assertThat(messages) + .extracting(CloudEvent::getData) + .filteredOn(Objects::nonNull) + .anyMatch(expectedMessage::equals); + } + }, 2000); + + } + } + + @Test + @DisplayName("Should publish binary payload type successfully") + public void shouldPublishBinary() throws Exception { + + DaprObjectSerializer serializer = createBinaryObjectSerializer(); + + try (DaprClient client = createDaprClientBuilder().withObjectSerializer(serializer).build()) { + publishOneByteSync(client, BINARY_TOPIC_NAME); + } + + Thread.sleep(3000); + + try (DaprClient client = createDaprClientBuilder().build()) { + callWithRetry(() -> { + System.out.println("Checking results for topic " + BINARY_TOPIC_NAME); + final List messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/binarytopic", + null, + HttpExtension.GET, CLOUD_EVENT_LIST_TYPE_REF).block(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(messages.size()).isEqualTo(1); + softly.assertThat(messages.get(0).getData()).isNull(); + softly.assertThat(messages.get(0).getBinaryData()).isEqualTo(new byte[] {1}); + }); + }, 2000); + } + } + + private static void publishOneByteSync(DaprClient client, String topicName) { + client.publishEvent( + PUBSUB_NAME, + topicName, + new byte[] {1}).block(); + } + + private static void sendBulkMessagesAsText(DaprClient client, String topicName) { + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d on topic %s", i, topicName); + client.publishEvent(PUBSUB_NAME, topicName, message).block(); + } + } + + private void publishMyObjectAsserting(DaprPreviewClient previewClient) { + PubSubIT.MyObject object = new PubSubIT.MyObject(); + object.setId("123"); + BulkPublishResponse response = previewClient.publishEvents( + PUBSUB_NAME, + TOPIC_BULK, + "application/json", + Collections.singletonList(object) + ).block(); + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void publishBulkStringsAsserting(DaprPreviewClient previewClient) { + List messages = new ArrayList<>(); + for (int i = 0; i < NUM_MESSAGES; i++) { + messages.add(String.format("This is message #%d on topic %s", i, TOPIC_BULK)); + } + BulkPublishResponse response = previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", messages).block(); + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void publishByteAsserting(DaprPreviewClient previewClient) { + BulkPublishResponse response = previewClient.publishEvents( + PUBSUB_NAME, + TOPIC_BULK, + "", + Collections.singletonList(new byte[] {1}) + ).block(); + SoftAssertions.assertSoftly(softly -> { + assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void publishCloudEventAsserting(DaprPreviewClient previewClient) { + CloudEvent cloudEvent = new CloudEvent<>(); + cloudEvent.setId("1234"); + cloudEvent.setData("message from cloudevent"); + cloudEvent.setSource("test"); + cloudEvent.setSpecversion("1"); + cloudEvent.setType("myevent"); + cloudEvent.setDatacontenttype("text/plain"); + + BulkPublishRequest> req = new BulkPublishRequest<>( + PUBSUB_NAME, + TOPIC_BULK, + Collections.singletonList( + new BulkPublishEntry<>("1", cloudEvent, "application/cloudevents+json", null) + ) + ); + BulkPublishResponse> response = previewClient.publishEvents(req).block(); + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(response).isNotNull(); + softly.assertThat(response.getFailedEntries().size()).isZero(); + }); + } + + private void validatePublishedMessages(DaprClient client) { + List cloudEventMessages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/redis/testingbulktopic", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF + ).block(); + + assertThat(cloudEventMessages) + .as("expected non-null list of cloud events") + .isNotNull() + .hasSize(13); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String expectedMessage = String.format("This is message #%d on topic %s", i, TOPIC_BULK); + assertThat(cloudEventMessages) + .as("expected text payload to match for message %d", i) + .anySatisfy(event -> assertThat(event.getData()).isEqualTo(expectedMessage)); + } + + assertThat(cloudEventMessages) + .filteredOn(event -> event.getData() instanceof LinkedHashMap) + .map(event -> (LinkedHashMap) event.getData()) + .anySatisfy(map -> assertThat(map.get("id")).isEqualTo("123")); + + assertThat(cloudEventMessages) + .map(CloudEvent::getData) + .anySatisfy(data -> assertThat(data).isEqualTo("AQ==")); + + assertThat(cloudEventMessages) + .map(CloudEvent::getData) + .anySatisfy(data -> assertThat(data).isEqualTo("message from cloudevent")); + } + + @Test + @DisplayName("Should publish with TTL") + public void testPubSubTTLMetadata() throws Exception { + + // Send a batch of messages on one topic, all to be expired in 1 second. + try (DaprClient client = createDaprClientBuilder().build()) { + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d on topic %s", i, TTL_TOPIC_NAME); + //Publishing messages + client.publishEvent( + PUBSUB_NAME, + TTL_TOPIC_NAME, + message, + Map.of(Metadata.TTL_IN_SECONDS, "1")) + .block(); + System.out.printf("Published message: '%s' to topic '%s' pubsub_name '%s'%n", message, TOPIC_NAME, PUBSUB_NAME); + } + } + + // Sleeps for two seconds to let them expire. + Thread.sleep(2000); + + try (DaprClient client = createDaprClientBuilder().build()) { + callWithRetry(() -> { + System.out.println("Checking results for topic " + TTL_TOPIC_NAME); + final List + messages = client.invokeMethod(PUBSUB_APP_ID, "messages/" + TTL_TOPIC_NAME, null, HttpExtension.GET, List.class).block(); + assertThat(messages).hasSize(0); + }, 2000); + } + } + + @Test + @DisplayName("Should publish long values") + public void testLongValues() throws Exception { + + Random random = new Random(590518626939830271L); + Set values = new HashSet<>(); + values.add(new PubSubIT.ConvertToLong().setVal(590518626939830271L)); + PubSubIT.ConvertToLong val; + for (int i = 0; i < NUM_MESSAGES - 1; i++) { + do { + val = new PubSubIT.ConvertToLong().setVal(random.nextLong()); + } while (values.contains(val)); + values.add(val); + } + Iterator valuesIt = values.iterator(); + try (DaprClient client = createDaprClientBuilder().build()) { + for (int i = 0; i < NUM_MESSAGES; i++) { + PubSubIT.ConvertToLong value = valuesIt.next(); + System.out.println("The long value sent " + value.getValue()); + //Publishing messages + client.publishEvent( + PUBSUB_NAME, + LONG_TOPIC_NAME, + value, + Map.of(Metadata.TTL_IN_SECONDS, "30")).block(); + + try { + Thread.sleep((long) (1000 * Math.random())); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + return; + } + } + } + + Set actual = new HashSet<>(); + try (DaprClient client = createDaprClientBuilder().build()) { + callWithRetry(() -> { + System.out.println("Checking results for topic " + LONG_TOPIC_NAME); + final List> messages = client.invokeMethod( + PUBSUB_APP_ID, + "messages/testinglongvalues", + null, + HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block(); + assertNotNull(messages); + for (CloudEvent message : messages) { + actual.add(message.getData()); + } + assertThat(values).isEqualTo(actual); + }, 2000); + } + } + + private static DaprClientBuilder createDaprClientBuilder() { + return new DaprClientBuilder() + .withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getHttpPort()) + .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getGrpcPort()); + } + + private DaprObjectSerializer createJacksonObjectSerializer() { + return new DaprObjectSerializer() { + @Override + public byte[] serialize(Object o) throws JsonProcessingException { + return OBJECT_MAPPER.writeValueAsBytes(o); + } + + @Override + public T deserialize(byte[] data, TypeRef type) throws IOException { + return OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType())); + } + + @Override + public String getContentType() { + return "application/json"; + } + }; + } + + private @NotNull DaprObjectSerializer createBinaryObjectSerializer() { + return new DaprObjectSerializer() { + @Override + public byte[] serialize(Object o) { + return (byte[]) o; + } + + @Override + public T deserialize(byte[] data, TypeRef type) { + return (T) data; + } + + @Override + public String getContentType() { + return "application/octet-stream"; + } + }; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java new file mode 100644 index 000000000..0fc85a2a8 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/SubscriberController.java @@ -0,0 +1,271 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers.pubsub.http; + +import io.dapr.Rule; +import io.dapr.Topic; +import io.dapr.client.domain.BulkSubscribeAppResponse; +import io.dapr.client.domain.BulkSubscribeAppResponseEntry; +import io.dapr.client.domain.BulkSubscribeAppResponseStatus; +import io.dapr.client.domain.BulkSubscribeMessage; +import io.dapr.client.domain.BulkSubscribeMessageEntry; +import io.dapr.client.domain.CloudEvent; +import io.dapr.it.pubsub.http.PubSubIT; +import io.dapr.springboot.annotations.BulkSubscribe; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * SpringBoot Controller to handle input binding. + */ +@RestController +public class SubscriberController { + + private final Map>> messagesByTopic = Collections.synchronizedMap(new HashMap<>()); + + @GetMapping(path = "/messages/{topic}") + public List> getMessagesByTopic(@PathVariable("topic") String topic) { + return messagesByTopic.getOrDefault(topic, Collections.emptyList()); + } + + private static final List messagesReceivedBulkPublishTopic = new ArrayList(); + private static final List messagesReceivedTestingTopic = new ArrayList(); + private static final List messagesReceivedTestingTopicV2 = new ArrayList(); + private static final List messagesReceivedTestingTopicV3 = new ArrayList(); + private static final List responsesReceivedTestingTopicBulkSub = new ArrayList<>(); + + @GetMapping(path = "/messages/redis/testingbulktopic") + public List getMessagesReceivedBulkTopic() { + return messagesReceivedBulkPublishTopic; + } + + + + @GetMapping(path = "/messages/testingtopic") + public List getMessagesReceivedTestingTopic() { + return messagesReceivedTestingTopic; + } + + @GetMapping(path = "/messages/testingtopicV2") + public List getMessagesReceivedTestingTopicV2() { + return messagesReceivedTestingTopicV2; + } + + @GetMapping(path = "/messages/testingtopicV3") + public List getMessagesReceivedTestingTopicV3() { + return messagesReceivedTestingTopicV3; + } + + @GetMapping(path = "/messages/topicBulkSub") + public List getMessagesReceivedTestingTopicBulkSub() { + System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + return responsesReceivedTestingTopicBulkSub; + } + + @Topic(name = "testingtopic", pubsubName = "pubsub") + @PostMapping("/route1") + public Mono handleMessage(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopic.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingbulktopic", pubsubName = "pubsub") + @PostMapping("/route1_redis") + public Mono handleBulkTopicMessage(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing bulk publish topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedBulkPublishTopic.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingtopic", pubsubName = "pubsub", + rule = @Rule(match = "event.type == 'myevent.v2'", priority = 2)) + @PostMapping(path = "/route1_v2") + public Mono handleMessageV2(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopicV2.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingtopic", pubsubName = "pubsub", + rule = @Rule(match = "event.type == 'myevent.v3'", priority = 1)) + @PostMapping(path = "/route1_v3") + public Mono handleMessageV3(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopicV3.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "typedtestingtopic", pubsubName = "pubsub") + @PostMapping(path = "/route1b") + public Mono handleMessageTyped(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String id = envelope.getData() == null ? "" : envelope.getData().getId(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing typed topic Subscriber got message with ID: " + id + "; Content-type: " + contentType); + messagesByTopic.compute("typedtestingtopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "binarytopic", pubsubName = "pubsub") + @PostMapping(path = "/route2") + public Mono handleBinaryMessage(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Binary topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesByTopic.compute("binarytopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "#{'another'.concat('topic')}", pubsubName = "${pubsubName:pubsub}") + @PostMapping(path = "/route3") + public Mono handleMessageAnotherTopic(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + System.out.println("Another topic Subscriber got message: " + message); + messagesByTopic.compute("anothertopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @PostMapping(path = "/route4") + public Mono handleMessageTTLTopic(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + System.out.println("TTL topic Subscriber got message: " + message); + messagesByTopic.compute("ttltopic", merge(envelope)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testinglongvalues", pubsubName = "pubsub") + @PostMapping(path = "/testinglongvalues") + public Mono handleMessageLongValues(@RequestBody(required = false) CloudEvent cloudEvent) { + return Mono.fromRunnable(() -> { + try { + Long message = cloudEvent.getData().getValue(); + System.out.println("Subscriber got: " + message); + messagesByTopic.compute("testinglongvalues", merge(cloudEvent)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Receive messages using the bulk subscribe API. + * The maxBulkSubCount and maxBulkSubAwaitDurationMs are adjusted to ensure + * that all the test messages arrive in a single batch. + * + * @param bulkMessage incoming bulk of messages from the message bus. + * @return status for each message received. + */ + @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 100) + @Topic(name = "topicBulkSub", pubsubName = "pubsub") + @PostMapping(path = "/routeBulkSub") + public Mono handleMessageBulk( + @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { + return Mono.fromCallable(() -> { + System.out.println("bulkMessage: " + bulkMessage.getEntries().size()); + + if (bulkMessage.getEntries().size() == 0) { + BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(new ArrayList<>()); + responsesReceivedTestingTopicBulkSub.add(response); + System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + return response; + } + + List entries = new ArrayList<>(); + for (BulkSubscribeMessageEntry entry: bulkMessage.getEntries()) { + try { + System.out.printf("Bulk Subscriber got entry ID: %s\n", entry.getEntryId()); + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); + } catch (Exception e) { + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); + } + } + BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(entries); + responsesReceivedTestingTopicBulkSub.add(response); + System.out.println("res size: " + responsesReceivedTestingTopicBulkSub.size()); + + return response; + }); + } + + private BiFunction>, List>> merge(final CloudEvent item) { + return (key, value) -> { + final List> list = value == null ? new ArrayList<>() : value; + list.add(item); + return list; + }; + } + + @GetMapping(path = "/health") + public void health() { + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/TestPubSubApplication.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/TestPubSubApplication.java new file mode 100644 index 000000000..2ed550cac --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/http/TestPubSubApplication.java @@ -0,0 +1,23 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.http; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class TestPubSubApplication { + public static void main(String[] args) { + SpringApplication.run(TestPubSubApplication.class, args); + } +}