Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial bulk publish impl for java #789

Merged
merged 44 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
db5a915
initial bulk publish impl for java
mukundansundar Sep 26, 2022
2c08948
Merge branch 'master' into bulk-publish-sdk
mukundansundar Sep 26, 2022
5bbbaaa
add UTs and clean up java doc for new interface methods.
mukundansundar Sep 26, 2022
7879e31
add more interface methods for bulk publish
mukundansundar Sep 27, 2022
c03cd4c
adding examples and ITs for bulk publish
mukundansundar Sep 28, 2022
93d4b99
addressing review comments
mukundansundar Sep 28, 2022
8bd8b33
use latest ref from dapr branch
mukundansundar Sep 28, 2022
fcd33cc
add example validation
mukundansundar Sep 28, 2022
3e8ae6c
fix bindings example validation
mukundansundar Sep 28, 2022
ad66e82
Merge branch 'master' into bulk-publish-sdk
mukundansundar Dec 16, 2022
2a5c50d
make changes for latest bulk publish dapr changes
mukundansundar Dec 16, 2022
c37ebe0
fix examples
mukundansundar Dec 16, 2022
28218b3
fix examples
mukundansundar Dec 16, 2022
c423e40
fix typo
mukundansundar Dec 16, 2022
48b3a6c
test against java 11 only
mukundansundar Dec 20, 2022
f2a8b35
Merge branch 'master' into bulk-publish-sdk
mukundansundar Jan 6, 2023
f49386d
change to latest dapr commit
mukundansundar Jan 6, 2023
f457fdb
run only pubsub IT, upload failsafe reports as run artifact
mukundansundar Jan 6, 2023
f9da15a
fix checkstyle
mukundansundar Jan 6, 2023
8ff181d
fix IT report upload condition
mukundansundar Jan 6, 2023
21e7334
fix compile issues
mukundansundar Jan 6, 2023
4d4beda
fix spotbugs issue
mukundansundar Jan 6, 2023
5408ac3
run pubsubIT only
mukundansundar Jan 6, 2023
d618c85
change upload artifact name for IT
mukundansundar Jan 6, 2023
6372227
fix tests
mukundansundar Jan 6, 2023
75a1c0f
fix
mukundansundar Jan 6, 2023
0040ead
introduce sleep
mukundansundar Jan 6, 2023
c1d29eb
test bulk publish with redis
mukundansundar Jan 6, 2023
ec8622f
change longvalues test to kafka
mukundansundar Jan 6, 2023
88620e8
change bulk pub to kafka and revert long values changes
mukundansundar Jan 6, 2023
66bf0bb
remove kafka pubsub from pubsub IT
mukundansundar Jan 6, 2023
481ed34
change match order in examples
mukundansundar Jan 6, 2023
9bc9a5c
set fail fast as false
mukundansundar Jan 6, 2023
84f3273
fix Internal Invoke exception in ITs
mukundansundar Jan 6, 2023
59dd45d
address review comments
mukundansundar Jan 9, 2023
2241d7c
fix IT
mukundansundar Jan 9, 2023
a9807d9
fix app scopes in examples
mukundansundar Jan 9, 2023
546818b
add content to daprdocs
mukundansundar Jan 9, 2023
a822a7a
Merge remote-tracking branch 'upstream/master' into bulk-publish-sdk
artursouza Jan 18, 2023
6a5e962
address review comments
mukundansundar Jan 18, 2023
0a98ed1
fix mm.py step comment
mukundansundar Jan 18, 2023
8aaba36
reset bindings examples readme
mukundansundar Jan 18, 2023
89daf58
Merge branch 'master' into bulk-publish-sdk
mukundansundar Jan 19, 2023
99639df
fix example, IT and make classes immutable
mukundansundar Jan 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/components/pubsub/kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
mukundansundar marked this conversation as resolved.
Show resolved Hide resolved
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # Required. Kafka broker connection setting
value: "localhost:9092"
- name: "authType"
value: "none"
mukundansundar marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.grpc;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishRequestEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseEntry;
import io.dapr.client.domain.CloudEvent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BulkPublisher
*/
public class BulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "kafka-pubsub";

/**
* main method.
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).withObjectSerializer(new CustomSerializer()).buildPreviewClient()) {
System.out.println("Using preview client...");
BulkPublishRequest<Object> request = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME);
List<BulkPublishRequestEntry<Object>> entries = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
BulkPublishRequestEntry<Object> entry = new BulkPublishRequestEntry<>("" + (i + 1),
message, "text/plain", new HashMap<>());
entries.add(entry);
}
request.setEntries(entries);
BulkPublishResponse res = client.publishEvents(request).block();
if (res != null) {
for (BulkPublishResponseEntry entry : res.getStatuses()) {
System.out.println("EntryID : " + entry.getEntryID() + " Status : " + entry.getStatus());
}
} else {
throw new Exception("null response from dapr");
}
System.out.println("Done");
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.grpc;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishRequestEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;


import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -Ddapr.grpc.port="50010" \
* -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
*/
public class CloudEventBulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "kafka-pubsub";

/**
* main method.
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).withObjectSerializer(new CustomSerializer()).buildPreviewClient()) {
System.out.println("Using preview client...");
BulkPublishRequest<CloudEvent<Map<String, String>>> request = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME);
List<BulkPublishRequestEntry<CloudEvent<Map<String, String>>>> entries = new ArrayList<>();
for (int i = 0; i < 1; i++) {
CloudEvent<Map<String, String>> cloudEvent = new CloudEvent<>();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("application/json");
String val = String.format("This is message #%d", i);
cloudEvent.setData(new HashMap<>(){{
put("dataKey", val);
}});
BulkPublishRequestEntry<CloudEvent<Map<String, String>>> entry = new BulkPublishRequestEntry<>("" + (i + 1),

cloudEvent, CloudEvent.CONTENT_TYPE, new HashMap<>());
entries.add(entry);
}
request.setEntries(entries);
BulkPublishResponse res = client.publishEvents(request).block();
System.out.println(res);
if (res != null) {
for (BulkPublishResponseEntry entry : res.getStatuses()) {
System.out.println("EntryID : " + entry.getEntryID() + " Status : " + entry.getStatus());
}
} else {
throw new Exception("null response");
}
System.out.println("Done");
}
}
}

class CustomSerializer implements DaprObjectSerializer {

private final DefaultObjectSerializer serializer = new DefaultObjectSerializer();

@Override
public byte[] serialize(Object o) throws IOException {
return serializer.serialize(o);
}

@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return serializer.deserialize(data, type);
}

@Override
public String getContentType() {
return "application/json";
}
}
10 changes: 10 additions & 0 deletions examples/src/main/java/io/dapr/examples/pubsub/grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
```bash
dapr run --components-path ./components/pubsub --app-id pubsub-test --log-level debug -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
```
java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BulkPublisher
java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BinaryBulkPublisher

java -Ddapr.api.protocol="http" -Ddapr.http.port="3500" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BulkPublisher
java -Ddapr.api.protocol="http" -Ddapr.http.port="3500" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.CloudEventBulkPublisher
java -Ddapr.api.protocol="http" -Ddapr.http.port="3500" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.BinaryBulkPublisher
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class Publisher {
private static final String TOPIC_NAME = "testingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";
private static final String PUBSUB_NAME = "kafka-pubsub";

/**
* This is the entry point of the publisher app example.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class SubscriberController {
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@Topic(name = "testingtopic", pubsubName = "kafka-pubsub")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.42.1</grpc.version>
<protobuf.version>3.17.3</protobuf.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/683a436ff1367e4ff2d27da2d79069c04ec2c46d/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/83a6b114667808665cab282313ee1db12eb49105/dapr/proto</dapr.proto.baseurl>
mukundansundar marked this conversation as resolved.
Show resolved Hide resolved
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
Expand Down
32 changes: 32 additions & 0 deletions sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package io.dapr.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishRequestEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
Expand All @@ -39,6 +42,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -389,6 +393,34 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Cla
return this.queryState(request, TypeRef.get(clazz));
}

/**
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse> publishEvents(String pubsubName, String topicName, List<T> events,
String contentType) {
return publishEvents(pubsubName, topicName, events, contentType, null);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse> publishEvents(String pubsubName, String topicName, List<T> events,
String contentType, Map<String, String> requestMetadata) {
BulkPublishRequest<T> request = new BulkPublishRequest<>(pubsubName, topicName);
if (events == null || events.size() == 0) {
throw new IllegalArgumentException("list of events cannot be null or empty");
}
List<BulkPublishRequestEntry<T>> entries = new ArrayList<>();
for (int i = 0; i < events.size(); i++) {
// entryID field is generated based on order of events in the request
entries.add(new BulkPublishRequestEntry<>("" + (i + 1), events.get(i), contentType, null));
}
request.setMetadata(requestMetadata);
return publishEvents(request.setEntries(entries));
}

/**
* {@inheritDoc}
*/
Expand Down
Loading