Skip to content

Commit

Permalink
initial bulk publish impl for java (#789)
Browse files Browse the repository at this point in the history
* initial bulk publish impl for java

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add UTs and clean up java doc for new interface methods.

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add more interface methods for bulk publish

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* adding examples and ITs for bulk publish

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* addressing review comments

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* use latest ref from dapr branch

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add example validation

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix bindings example validation

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* make changes for latest bulk publish dapr changes

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix typo

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* test against java 11 only

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change to latest dapr commit

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* run only pubsub IT, upload failsafe reports as run artifact

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix checkstyle

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix IT report upload condition

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix compile issues

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix spotbugs issue

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* run pubsubIT only

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change upload artifact name for IT

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix tests

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* introduce sleep

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* test bulk publish with redis

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change longvalues test to kafka

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change bulk pub to kafka and revert long values changes

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* remove kafka pubsub from pubsub IT

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change match order in examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* set fail fast as false

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix Internal Invoke exception in ITs

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* address review comments

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix IT

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix app scopes in examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add content to daprdocs

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* address review comments

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix mm.py step comment

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* reset bindings examples readme

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix example, IT and make classes immutable

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
mukundansundar and artursouza authored Jan 19, 2023
1 parent eb8565c commit 81591b9
Show file tree
Hide file tree
Showing 32 changed files with 1,942 additions and 50 deletions.
18 changes: 16 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ jobs:
- name: Codecov
uses: codecov/codecov-action@v3.1.0
- name: Install jars
run: mvn install -q
run: mvn install -q
- name: Integration tests
run: mvn -f sdk-tests/pom.xml verify -q
id: integration_tests
run: mvn -f sdk-tests/pom.xml verify
- name: Upload test report for sdk
uses: actions/upload-artifact@master
with:
Expand All @@ -113,6 +114,19 @@ jobs:
with:
name: report-dapr-java-sdk-actors
path: sdk-actors/target/jacoco-report/
- name: Upload failsafe test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: failsafe-report-sdk-tests
path: sdk-tests/target/failsafe-reports
- name: Upload surefire test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: surefire-report-sdk-tests
path: sdk-tests/target/surefire-reports

publish:
runs-on: ubuntu-latest
needs: build
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
validate:
runs-on: ubuntu-latest
strategy:
fail-fast: false # Keep running if one leg fails.
matrix:
java: [ 11, 13, 15, 16 ]
env:
Expand Down Expand Up @@ -129,10 +130,10 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/state/README.md
- name: Validate pubsub HTTP example
- name: Validate pubsub example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/http/README.md
mm.py ./src/main/java/io/dapr/examples/pubsub/README.md
- name: Validate bindings HTTP example
working-directory: ./examples
run: |
Expand Down
29 changes: 29 additions & 0 deletions daprdocs/content/en/java-sdk-docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,35 @@ public class SubscriberController {
}
```

##### Bulk Publish Messages
> Note: API is in Alpha stage

```java
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class Solution {
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}

// Publish list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block()
}
}
}
```

- For a full guide on publishing messages and subscribing to a topic [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}).
- Visit [Java SDK examples](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/pubsub/http) for code samples and instructions to try out pub/sub

Expand Down
5 changes: 5 additions & 0 deletions examples/components/pubsub/redis_messagebus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ spec:
value: localhost:6379
- name: redisPassword
value: ""
scopes:
- publisher
- bulk-publisher
- subscriber
- publisher-tracing
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ cd examples

Before getting into the application code, follow these steps in order to set up a local instance of Kafka. This is needed for the local instances. Steps are:

1. To run container locally run:
1. To run container locally run:

<!-- STEP
name: Setup kafka container
Expand Down
101 changes: 101 additions & 0 deletions examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;

import java.util.ArrayList;
import java.util.List;

import static io.dapr.examples.OpenTelemetryConfig.getReactorContext;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id bulk-publisher -- \
* java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.BulkPublisher
*/
public class BulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "bulkpublishtesting";

//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";

/**
* main method.
*
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient) client;
c.waitForSidecar(10000);
try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");
List<String> messages = new ArrayList<>();
System.out.println("Constructing the list of messages to publish");
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.subscriberContext(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response from dapr");
}
}
// Close the span.

span.end();
// Allow plenty of time for Dapr to export all relevant spans to the tracing infra.
Thread.sleep(10000);
// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();

System.out.println("Done");
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.CloudEvent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -Ddapr.grpc.port="50010" \
* -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.CloudEventBulkPublisher
*/
public class CloudEventBulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "bulkpublishtesting";

//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";

/**
* main method.
*
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
List<BulkPublishEntry<CloudEvent<Map<String, String>>>> entries = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent<Map<String, String>> cloudEvent = new CloudEvent<>();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("application/json");
String val = String.format("This is message #%d", i);
cloudEvent.setData(new HashMap<>() {
{
put("dataKey", val);
}
});
BulkPublishEntry<CloudEvent<Map<String, String>>> entry = new BulkPublishEntry<>(
"" + (i + 1), cloudEvent, CloudEvent.CONTENT_TYPE, null);
entries.add(entry);
}
BulkPublishRequest<CloudEvent<Map<String, String>>> request = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
entries);
BulkPublishResponse<?> res = client.publishEvents(request).block();
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response");
}
System.out.println("Done");
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
limitations under the License.
*/

package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
Expand All @@ -30,7 +30,7 @@
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.CloudEventPublisher
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.CloudEventPublisher
*/
public class CloudEventPublisher {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
limitations under the License.
*/

package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
Expand All @@ -26,7 +26,7 @@
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
*/
public class Publisher {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
limitations under the License.
*/

package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
Expand All @@ -31,7 +31,7 @@
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher-tracing -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.PublisherWithTracing
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.PublisherWithTracing
*/
public class PublisherWithTracing {

Expand Down
Loading

0 comments on commit 81591b9

Please sign in to comment.