
initial bulk publish impl for java #789

Merged 44 commits on Jan 19, 2023.

Commits
db5a915
initial bulk publish impl for java
mukundansundar Sep 26, 2022
2c08948
Merge branch 'master' into bulk-publish-sdk
mukundansundar Sep 26, 2022
5bbbaaa
add UTs and clean up java doc for new interface methods.
mukundansundar Sep 26, 2022
7879e31
add more interface methods for bulk publish
mukundansundar Sep 27, 2022
c03cd4c
adding examples and ITs for bulk publish
mukundansundar Sep 28, 2022
93d4b99
addressing review comments
mukundansundar Sep 28, 2022
8bd8b33
use latest ref from dapr branch
mukundansundar Sep 28, 2022
fcd33cc
add example validation
mukundansundar Sep 28, 2022
3e8ae6c
fix bindings example validation
mukundansundar Sep 28, 2022
ad66e82
Merge branch 'master' into bulk-publish-sdk
mukundansundar Dec 16, 2022
2a5c50d
make changes for latest bulk publish dapr changes
mukundansundar Dec 16, 2022
c37ebe0
fix examples
mukundansundar Dec 16, 2022
28218b3
fix examples
mukundansundar Dec 16, 2022
c423e40
fix typo
mukundansundar Dec 16, 2022
48b3a6c
test against java 11 only
mukundansundar Dec 20, 2022
f2a8b35
Merge branch 'master' into bulk-publish-sdk
mukundansundar Jan 6, 2023
f49386d
change to latest dapr commit
mukundansundar Jan 6, 2023
f457fdb
run only pubsub IT, upload failsafe reports as run artifact
mukundansundar Jan 6, 2023
f9da15a
fix checkstyle
mukundansundar Jan 6, 2023
8ff181d
fix IT report upload condition
mukundansundar Jan 6, 2023
21e7334
fix compile issues
mukundansundar Jan 6, 2023
4d4beda
fix spotbugs issue
mukundansundar Jan 6, 2023
5408ac3
run pubsubIT only
mukundansundar Jan 6, 2023
d618c85
change upload artifact name for IT
mukundansundar Jan 6, 2023
6372227
fix tests
mukundansundar Jan 6, 2023
75a1c0f
fix
mukundansundar Jan 6, 2023
0040ead
introduce sleep
mukundansundar Jan 6, 2023
c1d29eb
test bulk publish with redis
mukundansundar Jan 6, 2023
ec8622f
change longvalues test to kafka
mukundansundar Jan 6, 2023
88620e8
change bulk pub to kafka and revert long values changes
mukundansundar Jan 6, 2023
66bf0bb
remove kafka pubsub from pubsub IT
mukundansundar Jan 6, 2023
481ed34
change match order in examples
mukundansundar Jan 6, 2023
9bc9a5c
set fail fast as false
mukundansundar Jan 6, 2023
84f3273
fix Internal Invoke exception in ITs
mukundansundar Jan 6, 2023
59dd45d
address review comments
mukundansundar Jan 9, 2023
2241d7c
fix IT
mukundansundar Jan 9, 2023
a9807d9
fix app scopes in examples
mukundansundar Jan 9, 2023
546818b
add content to daprdocs
mukundansundar Jan 9, 2023
a822a7a
Merge remote-tracking branch 'upstream/master' into bulk-publish-sdk
artursouza Jan 18, 2023
6a5e962
address review comments
mukundansundar Jan 18, 2023
0a98ed1
fix mm.py step comment
mukundansundar Jan 18, 2023
8aaba36
reset bindings examples readme
mukundansundar Jan 18, 2023
89daf58
Merge branch 'master' into bulk-publish-sdk
mukundansundar Jan 19, 2023
99639df
fix example, IT and make classes immutable
mukundansundar Jan 19, 2023
12 changes: 12 additions & 0 deletions examples/components/pubsub/kafka.yaml
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers # Required. Kafka broker connection setting
      value: "localhost:9092"
    - name: "authType"
      value: "none"
@@ -0,0 +1,78 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dapr.examples.pubsub.grpc;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishRequestEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseEntry;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BulkPublisher
*/
public class BulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "kafka-pubsub";

/**
* main method.
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
BulkPublishRequest<Object> request = new BulkPublishRequest<>();
request.setPubsubName(PUBSUB_NAME);
List<BulkPublishRequestEntry<Object>> entries = new ArrayList<>();
request.setTopic(TOPIC_NAME);
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
BulkPublishRequestEntry<Object> entry = new BulkPublishRequestEntry<>("" + (i + 1),
message, "text/plain", new HashMap<>());
entries.add(entry);
}
request.setEntries(entries);
BulkPublishResponse res = client.publishEvents(request).block();
if (res != null) {
for (BulkPublishResponseEntry entry : res.getStatuses()) {
System.out.println("EntryID : " + entry.getEntryID() + " Status : " + entry.getStatus());
}
} else {
throw new Exception("null response from dapr");
}
System.out.println("Done");
}
}
}
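Bulk publish can partially fail: each entry in the response carries its own status, so a natural follow-up is to resend only the failed entries. The sketch below shows that filtering step. To keep it standalone, the SDK's response type is simulated with a plain map of entry ID to status string; in real code you would walk the `BulkPublishResponse.getStatuses()` list shown above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RetryFilter {

  // Stand-in for walking BulkPublishResponse.getStatuses(): entry ID -> "SUCCESS" / "FAILED".
  static List<String> entryIdsToRetry(Map<String, String> statuses) {
    return statuses.entrySet().stream()
        .filter(e -> "FAILED".equals(e.getValue())) // keep only failed entries
        .map(Map.Entry::getKey)
        .sorted()                                   // deterministic order for the demo
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, String> statuses = Map.of(
        "1", "SUCCESS", "2", "FAILED", "3", "SUCCESS", "4", "FAILED");
    System.out.println(entryIdsToRetry(statuses)); // prints [2, 4]
  }
}
```

The returned IDs can then be matched back against the original `BulkPublishRequestEntry` objects to build a smaller retry request.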
@@ -0,0 +1,86 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dapr.examples.pubsub.grpc;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishRequestEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseEntry;
import io.dapr.client.domain.CloudEvent;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -Ddapr.grpc.port="50010" \
* -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
*/
public class CloudEventBulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "kafka-pubsub";

/**
* main method.
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
BulkPublishRequest<CloudEvent<String>> request = new BulkPublishRequest<>();
request.setPubsubName(PUBSUB_NAME);
List<BulkPublishRequestEntry<CloudEvent<String>>> entries = new ArrayList<>();
request.setTopic(TOPIC_NAME);
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent<String> cloudEvent = new CloudEvent<>();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("text/plain");
cloudEvent.setData(String.format("This is message #%d", i));
BulkPublishRequestEntry<CloudEvent<String>> entry = new BulkPublishRequestEntry<>("" + (i + 1),
cloudEvent, CloudEvent.CONTENT_TYPE, new HashMap<>());
entries.add(entry);
}
request.setEntries(entries);
BulkPublishResponse res = client.publishEvents(request).block();
System.out.println(res);
if (res != null) {
for (BulkPublishResponseEntry entry : res.getStatuses()) {
System.out.println("EntryID : " + entry.getEntryID() + " Status : " + entry.getStatus());
}
} else {
throw new Exception("null response");
}
System.out.println("Done");
}
}
}
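For orientation, the CloudEvent built in the loop above serializes to a small JSON envelope. The sketch below assembles that shape by hand purely for illustration — the field names mirror the setters used in the example, but the real serialization is done by the SDK's CloudEvent type, so the exact field order and escaping may differ.

```java
public class CloudEventShape {

  // Illustrative only: the SDK's CloudEvent type performs the real serialization.
  static String toJson(String id, String type, String data) {
    return String.format(
        "{\"id\":\"%s\",\"type\":\"%s\",\"specversion\":\"1\","
            + "\"datacontenttype\":\"text/plain\",\"data\":\"%s\"}",
        id, type, data);
  }

  public static void main(String[] args) {
    System.out.println(toJson("42", "example", "This is message #0"));
  }
}
```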
10 changes: 10 additions & 0 deletions examples/src/main/java/io/dapr/examples/pubsub/grpc/README.md
@@ -0,0 +1,10 @@
```bash
dapr run --components-path ./components/pubsub --app-id pubsub-test --log-level debug -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
```

Run a publisher against a specific gRPC port:

```bash
java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BulkPublisher
java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BinaryBulkPublisher
```

Or over HTTP:

```bash
java -Ddapr.api.protocol="http" -Ddapr.http.port="3500" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BulkPublisher
java -Ddapr.api.protocol="http" -Ddapr.http.port="3500" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
java -Ddapr.api.protocol="http" -Ddapr.http.port="3500" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BinaryBulkPublisher
```
@@ -35,7 +35,7 @@ public class SubscriberController {
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@Topic(name = "testingtopic", pubsubName = "kafka-pubsub")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
2 changes: 1 addition & 1 deletion pom.xml
@@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.42.1</grpc.version>
<protobuf.version>3.17.3</protobuf.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/683a436ff1367e4ff2d27da2d79069c04ec2c46d/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/83a6b114667808665cab282313ee1db12eb49105/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
2 changes: 1 addition & 1 deletion sdk/pom.xml
@@ -167,7 +167,7 @@
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>80%</minimum>
<minimum>70%</minimum>
</limit>
</limits>
<excludes>
82 changes: 82 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
@@ -17,6 +17,10 @@
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishRequestEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseEntry;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
@@ -41,6 +45,8 @@
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
@@ -62,6 +68,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -182,6 +189,81 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
}
}

/**
* {@inheritDoc}
*/
public <T> Mono<BulkPublishResponse> publishEvents(BulkPublishRequest<T> request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
DaprProtos.BulkPublishRequest.Builder envelopeBuilder = DaprProtos.BulkPublishRequest.newBuilder().setTopic(topic)
.setPubsubName(pubsubName);

for (BulkPublishRequestEntry<?> entry: request.getEntries()) {
Object event = entry.getEvent();
byte[] data;
String contentType = entry.getContentType();

// Serialize event into bytes
if (!Strings.isNullOrEmpty(contentType) && objectSerializer instanceof DefaultObjectSerializer) {
Reviewer (Member): This is a problem we need to fix in other places in the SDK. Using the default serializer should not trigger different behavior. It should be based on the content-type only.

@mukundansundar (Sep 29, 2022): This is a change that needs to be done SDK-wide and not only for one API. If it is only changed here, serialized events from this API might become incompatible with the other normal publish API.
// If content type is given by user and default object serializer is used
data = DefaultContentTypeConverter.convertEventToBytesForGrpc(event, contentType);
} else {
// perform the serialization as per user given input of serializer
// this is also the case when content type is empty
data = objectSerializer.serialize(event);
if (Strings.isNullOrEmpty(contentType)) {
// Only override content type if not given in input by user
contentType = objectSerializer.getContentType();
}
}

DaprProtos.BulkPublishRequestEntry.Builder reqEntryBuilder = DaprProtos.BulkPublishRequestEntry.newBuilder()
.setEntryID(entry.getEntryID())
.setEvent(ByteString.copyFrom(data))
.setContentType(contentType);
Map<String, String> metadata = entry.getMetadata();
if (metadata != null) {
reqEntryBuilder.putAllMetadata(metadata);
}
envelopeBuilder.addEntries(reqEntryBuilder.build());
}

// Set metadata if available
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
}

return Mono.subscriberContext().flatMap(
context ->
this.<DaprProtos.BulkPublishResponse>createMono(
it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it)
)
).map(
it -> {
BulkPublishResponse response = new BulkPublishResponse();
List<BulkPublishResponseEntry> statuses = new ArrayList<>();
for (DaprProtos.BulkPublishResponseEntry entry : it.getStatusesList()) {
BulkPublishResponseEntry domainEntry = new BulkPublishResponseEntry();
domainEntry.setEntryID(entry.getEntryID());
if (entry.getStatus() == DaprProtos.BulkPublishResponseEntry.Status.SUCCESS) {
domainEntry.setStatus(BulkPublishResponseEntry.PublishStatus.SUCCESS);
} else {
domainEntry.setStatus(BulkPublishResponseEntry.PublishStatus.FAILED);
}
statuses.add(domainEntry);
}
response.setStatuses(statuses);
return response;
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
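The serialization branch above makes a two-way choice: a caller-supplied content type combined with the default object serializer routes through DefaultContentTypeConverter, while any other combination uses the configured serializer, which also supplies the content type when the entry left it empty. A standalone sketch of that decision, with the SDK types reduced to plain strings and hypothetical labels for the two paths:

```java
public class SerializerChoice {

  // Mirrors the branch in publishEvents(): which serialization path handles an entry,
  // and which content type is sent with it. The path labels are illustrative, not SDK names.
  static String[] choose(String entryContentType, boolean usingDefaultSerializer,
                         String serializerContentType) {
    boolean hasContentType = entryContentType != null && !entryContentType.isEmpty();
    if (hasContentType && usingDefaultSerializer) {
      // Content type given by the user and default serializer in use:
      // bytes come from the content-type converter.
      return new String[] {"content-type-converter", entryContentType};
    }
    // Otherwise the configured object serializer produces the bytes; it also
    // supplies the content type when the entry did not specify one.
    return new String[] {"object-serializer",
        hasContentType ? entryContentType : serializerContentType};
  }

  public static void main(String[] args) {
    System.out.println(String.join(" -> ", choose("text/plain", true, "application/json")));
    // prints content-type-converter -> text/plain
    System.out.println(String.join(" -> ", choose("", false, "application/json")));
    // prints object-serializer -> application/json
  }
}
```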

/**
* {@inheritDoc}
*/