From 0202ab7a66fc5a40a019dc553e7b8fd1bc94d5f0 Mon Sep 17 00:00:00 2001 From: MregXN Date: Mon, 5 Jun 2023 16:57:06 +0800 Subject: [PATCH] cover validation to grpc subscriber Signed-off-by: MregXN --- .../java/io/dapr/examples/pubsub/README.md | 470 ++++++++++++------ 1 file changed, 306 insertions(+), 164 deletions(-) diff --git a/examples/src/main/java/io/dapr/examples/pubsub/README.md b/examples/src/main/java/io/dapr/examples/pubsub/README.md index 4c713af45..6b1f94a26 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/README.md @@ -38,168 +38,10 @@ Then get into the examples directory: cd examples ``` -### Running the subscriber - -The first is the subscriber. It will subscribe to the topic to be used by the publisher and read the messages published. The Subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There are gRPC version and HTTP version of subscriber in grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below: - -```java -public class Subscriber { - - public static void main(String[] args) throws Exception { - ///... - // Start Dapr's callback endpoint. - DaprApplication.start([PROTOCAL],port); - } -} -``` -`DaprApplication.start()` Method will run an Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implemente the grpc methods that sidecar will call. - -**HTTP Version** - -The Dapr's sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body. - -The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in -[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations). - -The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`). - -```java -@RestController -public class SubscriberController { - ///... - @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}") - @PostMapping(path = "/testingtopic") - public Mono handleMessage(@RequestBody(required = false) byte[] body, - @RequestHeader Map headers) { - return Mono.fromRunnable(() -> { - try { - // Dapr's event is compliant to CloudEvent. - CloudEventEnvelope envelope = SERIALIZER.deserialize(body, CloudEventEnvelope.class); - - String message = envelope.getData() == null ? "" : envelope.getData(); - System.out.println("Subscriber got message: " + message); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } -} -``` - -The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example on how to handle the bulk messages and respond correctly. - -```java -@RestController -public class SubscriberController { - ///... - @BulkSubscribe() - @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}") - @PostMapping(path = "/testingtopicbulk") - public Mono handleBulkMessage( - @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { - return Mono.fromCallable(() -> { - System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages."); - - List entries = new ArrayList(); - for (BulkSubscribeMessageEntry entry : bulkMessage.getEntries()) { - try { - System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId()); - CloudEvent cloudEvent = (CloudEvent) entry.getEvent(); - System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData()); - entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); - } catch (Exception e) { - e.printStackTrace(); - entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); - } - } - return new BulkSubscribeAppResponse(entries); - }); - } -} -``` - - - -Execute the follow script in order to run the HTTP Subscriber example: - - - -```bash -dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000 -``` - - - -**gRPC Version** - -The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar. - -The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get topic and pubsubname that are contained in response before subscription starts. After the pubsub component in sidecar subscribes successfully from the specified topic, message will be sent to the method `onTopicEvent` in request parameter. - -```java -@GrpcService -public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase { - private final List topicSubscriptionList = new ArrayList<>(); - private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer(); - - @Override - public void listTopicSubscriptions(Empty request, - StreamObserver responseObserver) { - registerConsumer("messagebus","testingtopic"); - try { - DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos.ListTopicSubscriptionsResponse - .newBuilder(); - topicSubscriptionList.forEach(builder::addSubscriptions); - DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build(); - responseObserver.onNext(response); - } catch (Throwable e) { - responseObserver.onError(e); - } finally { - responseObserver.onCompleted(); - } - } - - @Override - public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, - StreamObserver responseObserver) { - try { - System.out.println("Subscriber got: " + request.getData()); - DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder() - .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (Throwable e) { - responseObserver.onError(e); - } - } - ///... -} -``` - - -Execute the follow script in order to run the gRPC Subscriber example: - -```bash -dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000 -``` - ### Running the publisher -Another component is the publisher. It is a simple java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. +The first is the publisher. It is a simple java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and received objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below: Dapr sidecar will automatically wrap the payload received into a CloudEvent object, which will later on parsed by the subscriber. @@ -475,7 +317,6 @@ match_order: sequential expected_stdout_lines: - '== APP == Published the set of messages in a single call to Dapr' background: true -sleep: 20 --> ```bash @@ -537,9 +378,7 @@ The Subscriber started previously [here](#running-the-subscriber) should print t Messages have been retrieved from the topic. -### Bulk Subscription - -You can also run the publisher to publish messages to `testingtopicbulk` topic, and receive messages using the bulk subscription. +You can also run the publisher to publish messages to `testingtopicbulk` topic, so that it can receive messages using the bulk subscription. ```bash @@ -583,6 +421,310 @@ Once running, the Publisher should print the same output as seen [above](#runnin ``` +### Running the subscriber + +Another component is the subscriber. It will subscribe to the topic to be used by the publisher and read the messages published. The Subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There are gRPC version and HTTP version of subscriber in grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below: + +```java +public class Subscriber { + + public static void main(String[] args) throws Exception { + ///... + // Start Dapr's callback endpoint. + DaprApplication.start([PROTOCAL],port); + } +} +``` +`DaprApplication.start()` Method will run an Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implemente the grpc methods that sidecar will call. + +**HTTP Version** + +The Dapr's sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body. + +The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in +[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations). + +The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`). + +```java +@RestController +public class SubscriberController { + ///... + @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}") + @PostMapping(path = "/testingtopic") + public Mono handleMessage(@RequestBody(required = false) byte[] body, + @RequestHeader Map headers) { + return Mono.fromRunnable(() -> { + try { + // Dapr's event is compliant to CloudEvent. + CloudEventEnvelope envelope = SERIALIZER.deserialize(body, CloudEventEnvelope.class); + + String message = envelope.getData() == null ? "" : envelope.getData(); + System.out.println("Subscriber got message: " + message); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +} +``` + +The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example on how to handle the bulk messages and respond correctly. + +```java +@RestController +public class SubscriberController { + ///... + @BulkSubscribe() + @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}") + @PostMapping(path = "/testingtopicbulk") + public Mono handleBulkMessage( + @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { + return Mono.fromCallable(() -> { + System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages."); + + List entries = new ArrayList(); + for (BulkSubscribeMessageEntry entry : bulkMessage.getEntries()) { + try { + System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId()); + CloudEvent cloudEvent = (CloudEvent) entry.getEvent(); + System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData()); + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); + } catch (Exception e) { + e.printStackTrace(); + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); + } + } + return new BulkSubscribeAppResponse(entries); + }); + } +} +``` + + + +Execute the follow script in order to run the HTTP Subscriber example: + + + +```bash +dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000 +``` + + + +**gRPC Version** + +The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar. + +The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get topic and pubsubname that are contained in response before subscription starts. After the pubsub component in sidecar subscribes successfully from the specified topic, message will be sent to the method `onTopicEvent` in request parameter. + +```java +@GrpcService +public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase { + private final List topicSubscriptionList = new ArrayList<>(); + private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer(); + + @Override + public void listTopicSubscriptions(Empty request, + StreamObserver responseObserver) { + registerConsumer("messagebus", "testingtopic", false); + registerConsumer("messagebus", "testingtopicbulk", true); + try { + DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos + .ListTopicSubscriptionsResponse.newBuilder(); + topicSubscriptionList.forEach(builder::addSubscriptions); + DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build(); + responseObserver.onNext(response); + } catch (Throwable e) { + responseObserver.onError(e); + } finally { + responseObserver.onCompleted(); + } + } + + @Override + public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, + StreamObserver responseObserver) { + try { + String data = request.getData().toStringUtf8().replace("\"", ""); + System.out.println("Subscriber got: " + data); + DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable e) { + responseObserver.onError(e); + } + } + ///... +} +``` +The `BulkSubscriberGrpcService.java` file is responsible for implementing the processing of bulk message subscriptions. When Dapr's sidecar successfully subscribes to bulk messages, it will call `onBulkTopicEventAlpha1` and pass them as a request parameter. You can refer to the example on how to handle bulk messages and respond correctly over gPRC. + +```java +public class BulkSubscriberGrpcService extends AppCallbackAlphaGrpc.AppCallbackAlphaImplBase { + + @Override + public void onBulkTopicEventAlpha1(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request, + io.grpc.stub.StreamObserver responseObserver) { + try { + TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder(); + + if (request.getEntriesCount() == 0) { + responseObserver.onNext(responseBuilder.build()); + responseObserver.onCompleted(); + } + + System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages."); + + for (TopicEventBulkRequestEntry entry : request.getEntriesList()) { + try { + System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId()); + System.out.printf("Bulk Subscriber got: %s\n", entry.getCloudEvent().getData().toStringUtf8()); + TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry + .newBuilder() + .setEntryId(entry.getEntryId()) + .setStatusValue(TopicEventResponseStatus.SUCCESS_VALUE); + responseBuilder.addStatuses(responseEntryBuilder); + } catch (Throwable e) { + TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry + .newBuilder() + .setEntryId(entry.getEntryId()) + .setStatusValue(TopicEventResponseStatus.RETRY_VALUE); + responseBuilder.addStatuses(responseEntryBuilder); + } + } + TopicEventBulkResponse response = responseBuilder.build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable e) { + responseObserver.onError(e); + } + } + +} +``` + +Execute the follow script in order to run the gRPC Subscriber example: + + +```bash +// stop http subscriber if you have started one. +dapr stop --app-id subscriber + + +// start a grpc subscriber +dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000 +``` + + + +Use the follow command to start a publisher + + +```bash +dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher +``` + + + +Messages will be subscribed by the gRPC Subscriber and will print the output as follows: + +``` +== APP == Subscriber got: This is message #0 +== APP == Subscriber got: This is message #1 +== APP == Subscriber got: This is message #2 +== APP == Subscriber got: This is message #3 +== APP == Subscriber got: This is message #4 +== APP == Subscriber got: This is message #5 +== APP == Subscriber got: This is message #6 +== APP == Subscriber got: This is message #7 +== APP == Subscriber got: This is message #8 +== APP == Subscriber got: This is message #9 +``` + +If you run a bulk publisher using following command + + +```bash +dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher testingtopicbulk +``` + + + + +The console will print the output: + +``` +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 0c917871-557c-41db-b038-a250d23ee07c +== APP == Bulk Subscriber got: "This is message #0" +== APP == Bulk Subscriber message has entry ID: df114819-0db9-4a04-b06b-d5844d3ff731 +== APP == Bulk Subscriber got: "This is message #1" +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 8161f1e2-caf0-446a-81f7-0b40e7350e19 +== APP == Bulk Subscriber got: "This is message #2" +== APP == Bulk Subscriber message has entry ID: 173fafa1-d187-4b2d-83bf-b4da00616a3a +== APP == Bulk Subscriber got: "This is message #3" +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 94f89996-155d-4b52-8a8c-c268662ff3a2 +== APP == Bulk Subscriber got: "This is message #4" +== APP == Bulk Subscriber message has entry ID: 497c47a4-e4f7-4aeb-abde-099068dea30c +== APP == Bulk Subscriber got: "This is message #5" +== APP == Bulk Subscriber received 1 messages. +== APP == Bulk Subscriber message has entry ID: 892fb1ae-b027-457a-860e-52b9c3219270 +== APP == Bulk Subscriber got: "This is message #6" +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 95abf8cc-e033-483d-9a55-e0491bd97930 +== APP == Bulk Subscriber got: "This is message #7" +== APP == Bulk Subscriber message has entry ID: eaf75a31-335e-4c1a-b19a-6aa1710f625a +== APP == Bulk Subscriber got: "This is message #8" +== APP == Bulk Subscriber received 1 messages. +== APP == Bulk Subscriber message has entry ID: 9836ef69-6d3c-4738-ba99-1d0ce68ec06b +== APP == Bulk Subscriber got: "This is message #9" +``` +>The order of the events that are published are not guaranteed if use redis. + ### Tracing Dapr handles tracing in PubSub automatically. Open Zipkin on [http://localhost:9411/zipkin](http://localhost:9411/zipkin). You should see a screen like the one below: