diff --git a/examples/src/main/java/io/dapr/examples/pubsub/README.md b/examples/src/main/java/io/dapr/examples/pubsub/README.md index ce723510b8..11bb5b5597 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/README.md @@ -37,168 +37,10 @@ Then get into the examples directory: ```sh cd examples ``` - ### Initialize Dapr Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. -### Running the subscriber - -The subscriber will subscribe to the topic to be used by the publisher and read the messages published. The subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There is a gRPC version and HTTP version of the subscriber in the grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below: - -```java -public class Subscriber { - - public static void main(String[] args) throws Exception { - ///... - // Start Dapr's callback endpoint. - DaprApplication.start([PROTOCAL],port); - } -} -``` -`DaprApplication.start()` Method will run a Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implement the grpc methods that the sidecar will call. - -**HTTP Version** - -The Dapr sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body. - -The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in -[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations). - -The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`). - -```java -@RestController -public class SubscriberController { - ///... - @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}") - @PostMapping(path = "/testingtopic") - public Mono handleMessage(@RequestBody(required = false) byte[] body, - @RequestHeader Map headers) { - return Mono.fromRunnable(() -> { - try { - // Dapr's event is compliant to CloudEvent. - CloudEventEnvelope envelope = SERIALIZER.deserialize(body, CloudEventEnvelope.class); - - String message = envelope.getData() == null ? "" : envelope.getData(); - System.out.println("Subscriber got message: " + message); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } -} -``` - -The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example below on how to handle the bulk messages and respond correctly. - -```java -@RestController -public class SubscriberController { - ///... - @BulkSubscribe() - @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}") - @PostMapping(path = "/testingtopicbulk") - public Mono handleBulkMessage( - @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { - return Mono.fromCallable(() -> { - System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages."); - - List entries = new ArrayList(); - for (BulkSubscribeMessageEntry entry : bulkMessage.getEntries()) { - try { - System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId()); - CloudEvent cloudEvent = (CloudEvent) entry.getEvent(); - System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData()); - entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); - } catch (Exception e) { - e.printStackTrace(); - entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); - } - } - return new BulkSubscribeAppResponse(entries); - }); - } -} -``` - - -Execute the following command to run the HTTP Subscriber example: - - - -```bash -dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000 -``` - - - -**gRPC Version** - -The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar. - -The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get the topic and pubsub name that are contained in the response before the subscription starts. After the pubsub component in the sidecar subscribes successfully to the specified topic, a message will be sent to the method `onTopicEvent` in the request parameter. - -```java -@GrpcService -public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase { - private final List topicSubscriptionList = new ArrayList<>(); - private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer(); - - @Override - public void listTopicSubscriptions(Empty request, - StreamObserver responseObserver) { - registerConsumer("messagebus","testingtopic"); - try { - DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos.ListTopicSubscriptionsResponse - .newBuilder(); - topicSubscriptionList.forEach(builder::addSubscriptions); - DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build(); - responseObserver.onNext(response); - } catch (Throwable e) { - responseObserver.onError(e); - } finally { - responseObserver.onCompleted(); - } - } - - @Override - public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, - StreamObserver responseObserver) { - try { - System.out.println("Subscriber got: " + request.getData()); - DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder() - .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } catch (Throwable e) { - responseObserver.onError(e); - } - } - ///... -} -``` - -Execute the following command to run the gRPC Subscriber example: - -```bash -dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000 -``` - - ### Running the publisher The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. @@ -477,7 +319,6 @@ match_order: sequential expected_stdout_lines: - '== APP == Published the set of messages in a single call to Dapr' background: true -sleep: 20 --> ```bash @@ -539,9 +380,7 @@ The Subscriber started previously [here](#running-the-subscriber) should print t Messages have been retrieved from the topic. -### Bulk Subscription - -You can also run the publisher to publish messages to `testingtopicbulk` topic, and receive messages using the bulk subscription. +You can also run the publisher to publish messages to `testingtopicbulk` topic, so that it can receive messages using the bulk subscription. ```bash @@ -585,6 +423,306 @@ Once running, the Publisher should print the same output as seen [above](#runnin ``` +### Running the subscriber + +The subscriber will subscribe to the topic to be used by the publisher and read the messages published. The subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There is a gRPC version and HTTP version of the subscriber in the grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below: + +```java +public class Subscriber { + + public static void main(String[] args) throws Exception { + ///... + // Start Dapr's callback endpoint. + DaprApplication.start([PROTOCAL],port); + } +} +``` +`DaprApplication.start()` Method will run a Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implement the grpc methods that the sidecar will call. + +**HTTP Version** + +The Dapr sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body. + +The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in +[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations). + +The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`). + +```java +@RestController +public class SubscriberController { + ///... + @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}") + @PostMapping(path = "/testingtopic") + public Mono handleMessage(@RequestBody(required = false) byte[] body, + @RequestHeader Map headers) { + return Mono.fromRunnable(() -> { + try { + // Dapr's event is compliant to CloudEvent. + CloudEventEnvelope envelope = SERIALIZER.deserialize(body, CloudEventEnvelope.class); + + String message = envelope.getData() == null ? "" : envelope.getData(); + System.out.println("Subscriber got message: " + message); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +} +``` + +The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example below on how to handle the bulk messages and respond correctly. + +```java +@RestController +public class SubscriberController { + ///... + @BulkSubscribe() + @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}") + @PostMapping(path = "/testingtopicbulk") + public Mono handleBulkMessage( + @RequestBody(required = false) BulkSubscribeMessage> bulkMessage) { + return Mono.fromCallable(() -> { + System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages."); + + List entries = new ArrayList(); + for (BulkSubscribeMessageEntry entry : bulkMessage.getEntries()) { + try { + System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId()); + CloudEvent cloudEvent = (CloudEvent) entry.getEvent(); + System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData()); + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS)); + } catch (Exception e) { + e.printStackTrace(); + entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY)); + } + } + return new BulkSubscribeAppResponse(entries); + }); + } +} +``` + +Execute the following command to run the HTTP Subscriber example: + + + +```bash +dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000 +``` + + + +**gRPC Version** + +The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar. + +The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get the topic and pubsub name that are contained in the response before the subscription starts. After the pubsub component in the sidecar subscribes successfully to the specified topic, a message will be sent to the method `onTopicEvent` in the request parameter. + +```java +@GrpcService +public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase { + private final List topicSubscriptionList = new ArrayList<>(); + private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer(); + + @Override + public void listTopicSubscriptions(Empty request, + StreamObserver responseObserver) { + registerConsumer("messagebus", "testingtopic", false); + registerConsumer("messagebus", "testingtopicbulk", true); + try { + DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos + .ListTopicSubscriptionsResponse.newBuilder(); + topicSubscriptionList.forEach(builder::addSubscriptions); + DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build(); + responseObserver.onNext(response); + } catch (Throwable e) { + responseObserver.onError(e); + } finally { + responseObserver.onCompleted(); + } + } + + @Override + public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, + StreamObserver responseObserver) { + try { + String data = request.getData().toStringUtf8().replace("\"", ""); + System.out.println("Subscriber got: " + data); + DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder() + .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS) + .build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable e) { + responseObserver.onError(e); + } + } + ///... +} +``` +The `BulkSubscriberGrpcService.java` file is responsible for implementing the processing of bulk message subscriptions. When Dapr's sidecar successfully subscribes to bulk messages, it will call `onBulkTopicEventAlpha1` and pass them as a request parameter. You can refer to the example on how to handle bulk messages and respond correctly over gPRC. + +```java +public class BulkSubscriberGrpcService extends AppCallbackAlphaGrpc.AppCallbackAlphaImplBase { + + @Override + public void onBulkTopicEventAlpha1(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request, + io.grpc.stub.StreamObserver responseObserver) { + try { + TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder(); + + if (request.getEntriesCount() == 0) { + responseObserver.onNext(responseBuilder.build()); + responseObserver.onCompleted(); + } + + System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages."); + + for (TopicEventBulkRequestEntry entry : request.getEntriesList()) { + try { + System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId()); + System.out.printf("Bulk Subscriber got: %s\n", entry.getCloudEvent().getData().toStringUtf8()); + TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry + .newBuilder() + .setEntryId(entry.getEntryId()) + .setStatusValue(TopicEventResponseStatus.SUCCESS_VALUE); + responseBuilder.addStatuses(responseEntryBuilder); + } catch (Throwable e) { + TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry + .newBuilder() + .setEntryId(entry.getEntryId()) + .setStatusValue(TopicEventResponseStatus.RETRY_VALUE); + responseBuilder.addStatuses(responseEntryBuilder); + } + } + TopicEventBulkResponse response = responseBuilder.build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable e) { + responseObserver.onError(e); + } + } + +} +``` + +Execute the following command to run the gRPC Subscriber example: + + +```bash +// stop http subscriber if you have started one. +dapr stop --app-id subscriber + + +// start a grpc subscriber +dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000 +``` + + + +Use the follow command to start a publisher + + +```bash +dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher +``` + + + +Messages will be subscribed by the gRPC Subscriber and will print the output as follows: + +``` +== APP == Subscriber got: This is message #0 +== APP == Subscriber got: This is message #1 +== APP == Subscriber got: This is message #2 +== APP == Subscriber got: This is message #3 +== APP == Subscriber got: This is message #4 +== APP == Subscriber got: This is message #5 +== APP == Subscriber got: This is message #6 +== APP == Subscriber got: This is message #7 +== APP == Subscriber got: This is message #8 +== APP == Subscriber got: This is message #9 +``` + +If you run a bulk publisher using following command + + + +```bash +dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher testingtopicbulk +``` + + + + +The console will print the output: + +``` +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 0c917871-557c-41db-b038-a250d23ee07c +== APP == Bulk Subscriber got: "This is message #0" +== APP == Bulk Subscriber message has entry ID: df114819-0db9-4a04-b06b-d5844d3ff731 +== APP == Bulk Subscriber got: "This is message #1" +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 8161f1e2-caf0-446a-81f7-0b40e7350e19 +== APP == Bulk Subscriber got: "This is message #2" +== APP == Bulk Subscriber message has entry ID: 173fafa1-d187-4b2d-83bf-b4da00616a3a +== APP == Bulk Subscriber got: "This is message #3" +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 94f89996-155d-4b52-8a8c-c268662ff3a2 +== APP == Bulk Subscriber got: "This is message #4" +== APP == Bulk Subscriber message has entry ID: 497c47a4-e4f7-4aeb-abde-099068dea30c +== APP == Bulk Subscriber got: "This is message #5" +== APP == Bulk Subscriber received 1 messages. +== APP == Bulk Subscriber message has entry ID: 892fb1ae-b027-457a-860e-52b9c3219270 +== APP == Bulk Subscriber got: "This is message #6" +== APP == Bulk Subscriber received 2 messages. +== APP == Bulk Subscriber message has entry ID: 95abf8cc-e033-483d-9a55-e0491bd97930 +== APP == Bulk Subscriber got: "This is message #7" +== APP == Bulk Subscriber message has entry ID: eaf75a31-335e-4c1a-b19a-6aa1710f625a +== APP == Bulk Subscriber got: "This is message #8" +== APP == Bulk Subscriber received 1 messages. +== APP == Bulk Subscriber message has entry ID: 9836ef69-6d3c-4738-ba99-1d0ce68ec06b +== APP == Bulk Subscriber got: "This is message #9" +``` +>The order of the events that are published are not guaranteed if use redis. + ### Tracing Dapr handles tracing in PubSub automatically. Open Zipkin on [http://localhost:9411/zipkin](http://localhost:9411/zipkin). You should see a screen like the one below: @@ -684,11 +822,16 @@ For more details on Dapr Spring Boot integration, please refer to [Dapr Spring B +Use the following command to stop your running http subscriber or gRPC subscriber. +```bash +dapr stop --app-id subscriber +``` + +After completing publish , the application will automatically exit. However, you can still use the following command to stop your running publisher. ```bash dapr stop --app-id publisher dapr stop --app-id bulk-publisher -dapr stop --app-id subscriber ``` - + diff --git a/examples/src/main/java/io/dapr/examples/pubsub/grpc/BulkSubscriberGrpcService.java b/examples/src/main/java/io/dapr/examples/pubsub/grpc/BulkSubscriberGrpcService.java new file mode 100644 index 0000000000..5422f5be75 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/pubsub/grpc/BulkSubscriberGrpcService.java @@ -0,0 +1,65 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.pubsub.grpc; + +import io.dapr.v1.AppCallbackAlphaGrpc; +import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequestEntry; +import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse; +import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponseEntry; +import io.dapr.v1.DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus; + +/** + * Class that encapsulates all client-side logic for Grpc. + */ +public class BulkSubscriberGrpcService extends AppCallbackAlphaGrpc.AppCallbackAlphaImplBase { + + @Override + public void onBulkTopicEventAlpha1(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request, + io.grpc.stub.StreamObserver responseObserver) { + try { + TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder(); + + if (request.getEntriesCount() == 0) { + responseObserver.onNext(responseBuilder.build()); + responseObserver.onCompleted(); + } + + System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages."); + + for (TopicEventBulkRequestEntry entry : request.getEntriesList()) { + try { + System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId()); + System.out.printf("Bulk Subscriber got: %s\n", entry.getCloudEvent().getData().toStringUtf8()); + TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry + .newBuilder() + .setEntryId(entry.getEntryId()) + .setStatusValue(TopicEventResponseStatus.SUCCESS_VALUE); + responseBuilder.addStatuses(responseEntryBuilder); + } catch (Throwable e) { + TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry + .newBuilder() + .setEntryId(entry.getEntryId()) + .setStatusValue(TopicEventResponseStatus.RETRY_VALUE); + responseBuilder.addStatuses(responseEntryBuilder); + } + } + TopicEventBulkResponse response = responseBuilder.build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } catch (Throwable e) { + responseObserver.onError(e); + } + } + +} diff --git a/examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java b/examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java index e2df82b00f..7893416b70 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/grpc/Subscriber.java @@ -50,6 +50,7 @@ public static void main(String[] args) throws Exception { //start a grpc server Server server = ServerBuilder.forPort(port) .addService(new SubscriberGrpcService()) + .addService(new BulkSubscriberGrpcService()) .build(); server.start(); server.awaitTermination(); diff --git a/examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java b/examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java index dc8f86924d..d454280530 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/grpc/SubscriberGrpcService.java @@ -30,7 +30,9 @@ public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase { @Override public void listTopicSubscriptions(Empty request, StreamObserver responseObserver) { - registerConsumer("messagebus","testingtopic"); + registerConsumer("messagebus", "testingtopic", false); + registerConsumer("messagebus", "bulkpublishtesting", false); + registerConsumer("messagebus", "testingtopicbulk", true); try { DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos .ListTopicSubscriptionsResponse.newBuilder(); @@ -65,12 +67,14 @@ public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request, * * @param topic the topic * @param pubsubName the pubsub name + * @param isBulkMessage flag to enable/disable bulk subscribe */ - public void registerConsumer(String pubsubName, String topic) { + public void registerConsumer(String pubsubName, String topic, boolean isBulkMessage) { topicSubscriptionList.add(DaprAppCallbackProtos.TopicSubscription .newBuilder() .setPubsubName(pubsubName) .setTopic(topic) + .setBulkSubscribe(DaprAppCallbackProtos.BulkSubscribeConfig.newBuilder().setEnabled(isBulkMessage)) .build()); } }