Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial bulk publish impl for java #789

Merged
merged 44 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
db5a915
initial bulk publish impl for java
mukundansundar Sep 26, 2022
2c08948
Merge branch 'master' into bulk-publish-sdk
mukundansundar Sep 26, 2022
5bbbaaa
add UTs and clean up java doc for new interface methods.
mukundansundar Sep 26, 2022
7879e31
add more interface methods for bulk publish
mukundansundar Sep 27, 2022
c03cd4c
adding examples and ITs for bulk publish
mukundansundar Sep 28, 2022
93d4b99
addressing review comments
mukundansundar Sep 28, 2022
8bd8b33
use latest ref from dapr branch
mukundansundar Sep 28, 2022
fcd33cc
add example validation
mukundansundar Sep 28, 2022
3e8ae6c
fix bindings example validation
mukundansundar Sep 28, 2022
ad66e82
Merge branch 'master' into bulk-publish-sdk
mukundansundar Dec 16, 2022
2a5c50d
make changes for latest bulk publish dapr changes
mukundansundar Dec 16, 2022
c37ebe0
fix examples
mukundansundar Dec 16, 2022
28218b3
fix examples
mukundansundar Dec 16, 2022
c423e40
fix typo
mukundansundar Dec 16, 2022
48b3a6c
test against java 11 only
mukundansundar Dec 20, 2022
f2a8b35
Merge branch 'master' into bulk-publish-sdk
mukundansundar Jan 6, 2023
f49386d
change to latest dapr commit
mukundansundar Jan 6, 2023
f457fdb
run only pubsub IT, upload failsafe reports as run artifact
mukundansundar Jan 6, 2023
f9da15a
fix checkstyle
mukundansundar Jan 6, 2023
8ff181d
fix IT report upload condition
mukundansundar Jan 6, 2023
21e7334
fix compile issues
mukundansundar Jan 6, 2023
4d4beda
fix spotbugs issue
mukundansundar Jan 6, 2023
5408ac3
run pubsubIT only
mukundansundar Jan 6, 2023
d618c85
change upload artifact name for IT
mukundansundar Jan 6, 2023
6372227
fix tests
mukundansundar Jan 6, 2023
75a1c0f
fix
mukundansundar Jan 6, 2023
0040ead
introduce sleep
mukundansundar Jan 6, 2023
c1d29eb
test bulk publish with redis
mukundansundar Jan 6, 2023
ec8622f
change longvalues test to kafka
mukundansundar Jan 6, 2023
88620e8
change bulk pub to kafka and revert long values changes
mukundansundar Jan 6, 2023
66bf0bb
remove kafka pubsub from pubsub IT
mukundansundar Jan 6, 2023
481ed34
change match order in examples
mukundansundar Jan 6, 2023
9bc9a5c
set fail fast as false
mukundansundar Jan 6, 2023
84f3273
fix Internal Invoke exception in ITs
mukundansundar Jan 6, 2023
59dd45d
address review comments
mukundansundar Jan 9, 2023
2241d7c
fix IT
mukundansundar Jan 9, 2023
a9807d9
fix app scopes in examples
mukundansundar Jan 9, 2023
546818b
add content to daprdocs
mukundansundar Jan 9, 2023
a822a7a
Merge remote-tracking branch 'upstream/master' into bulk-publish-sdk
artursouza Jan 18, 2023
6a5e962
address review comments
mukundansundar Jan 18, 2023
0a98ed1
fix mm.py step comment
mukundansundar Jan 18, 2023
8aaba36
reset bindings examples readme
mukundansundar Jan 18, 2023
89daf58
Merge branch 'master' into bulk-publish-sdk
mukundansundar Jan 19, 2023
99639df
fix example, IT and make classes immutable
mukundansundar Jan 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false # Keep running if one leg fails.
matrix:
java: [ 11, 13, 15, 16 ]
env:
Expand All @@ -29,7 +30,7 @@ jobs:
DAPR_RUNTIME_VER: 1.9.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.9.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF: c73b9596979ecf15b8207cd9baec86ab158be91d
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down Expand Up @@ -100,9 +101,10 @@ jobs:
- name: Codecov
uses: codecov/codecov-action@v3.1.0
- name: Install jars
run: mvn install -q
run: mvn install -q
- name: Integration tests
run: mvn -f sdk-tests/pom.xml verify -q
id: integration_tests
run: mvn -f sdk-tests/pom.xml verify
- name: Upload test report for sdk
uses: actions/upload-artifact@master
with:
Expand All @@ -113,6 +115,19 @@ jobs:
with:
name: report-dapr-java-sdk-actors
path: sdk-actors/target/jacoco-report/
- name: Upload failsafe test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: failsafe-report-sdk-tests
path: sdk-tests/target/failsafe-reports
- name: Upload surefire test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: surefire-report-sdk-tests
path: sdk-tests/target/surefire-reports

publish:
runs-on: ubuntu-latest
needs: build
Expand Down
11 changes: 10 additions & 1 deletion .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
validate:
runs-on: ubuntu-latest
strategy:
fail-fast: false # Keep running if one leg fails.
matrix:
java: [ 11, 13, 15, 16 ]
env:
Expand All @@ -40,7 +41,7 @@ jobs:
DAPR_RUNTIME_VER: 1.9.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.9.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF: c73b9596979ecf15b8207cd9baec86ab158be91d
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down Expand Up @@ -99,6 +100,10 @@ jobs:
echo "PATH=$PATH:$HOME/.local/bin" >> $GITHUB_ENV
pip3 install setuptools wheel
pip3 install mechanical-markdown
- name: Install Local kafka using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d
docker ps
- name: Install Local mongo database using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-mongo.yml up -d
Expand Down Expand Up @@ -133,6 +138,10 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/http/README.md
- name: Validate bulk pubsub gRPC example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/bulk/README.md
- name: Validate bindings HTTP example
working-directory: ./examples
run: |
Expand Down
29 changes: 29 additions & 0 deletions daprdocs/content/en/java-sdk-docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,35 @@ public class SubscriberController {
}
```

##### Bulk Publish Messages
> Note: API is in Alpha stage


```java
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class Solution {
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}

// Publish list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block()
}
}
}
```

- For a full guide on publishing messages and subscribing to a topic [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}).
- Visit [Java SDK examples](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/pubsub/http) for code samples and instructions to try out pub/sub

Expand Down
15 changes: 15 additions & 0 deletions examples/components/pubsub/kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
mukundansundar marked this conversation as resolved.
Show resolved Hide resolved
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers # Required. Kafka broker connection setting
value: "localhost:9092"
- name: "authType"
value: "none"
scopes:
- bulk-publisher
- kafka-subscriber
4 changes: 4 additions & 0 deletions examples/components/pubsub/redis_messagebus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ spec:
value: localhost:6379
- name: redisPassword
value: ""
scopes:
- publisher
- subscriber
- publisher-tracing
19 changes: 2 additions & 17 deletions examples/src/main/java/io/dapr/examples/bindings/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,11 @@ cd examples

Before getting into the application code, follow these steps in order to set up a local instance of Kafka. This is needed for the local instances. Steps are:

1. To run container locally run:

<!-- STEP
name: Setup kafka container
expected_stderr_lines:
- 'Creating network "http_default" with the default driver'
sleep: 5
-->
1. To run container locally run:

```bash
docker-compose -f ./src/main/java/io/dapr/examples/bindings/http/docker-compose-single-kafka.yml up -d
```

<!-- END_STEP -->
````

2. Run `docker ps` to see the container running locally:

Expand Down Expand Up @@ -241,14 +232,8 @@ dapr stop --app-id outputbinding

For bringing down the kafka cluster that was started in the beginning, run

<!-- STEP
name: Cleanup Kafka containers
-->

```bash
docker-compose -f ./src/main/java/io/dapr/examples/bindings/http/docker-compose-single-kafka.yml down
```

<!-- END_STEP -->

For more details on Dapr Spring Boot integration, please refer to [Dapr Spring Boot](../../DaprApplication.java) Application implementation.
101 changes: 101 additions & 0 deletions examples/src/main/java/io/dapr/examples/pubsub/bulk/BulkPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.bulk;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;

import java.util.ArrayList;
import java.util.List;

import static io.dapr.examples.OpenTelemetryConfig.getReactorContext;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id bulk-publisher -- \
* java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.bulk.BulkPublisher
*/
public class BulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "kafkatestingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "kafka-pubsub";

/**
* main method.
*
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient) client;
c.waitForSidecar(10000);
try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");
List<String> messages = new ArrayList<>();
System.out.println("Constructing the list of messages to publish");
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.subscriberContext(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response from dapr");
}
}
// Close the span.

span.end();
// Allow plenty of time for Dapr to export all relevant spans to the tracing infra.
Thread.sleep(10000);
// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();

System.out.println("Done");
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.bulk;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.CloudEvent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -Ddapr.grpc.port="50010" \
* -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.bulk.CloudEventBulkPublisher
*/
public class CloudEventBulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "kafkatestingtopic";

//The name of the pubsub
private static final String PUBSUB_NAME = "kafka-pubsub";

/**
* main method.
*
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
BulkPublishRequest<CloudEvent<Map<String, String>>> request = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME);
List<BulkPublishEntry<CloudEvent<Map<String, String>>>> entries = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent<Map<String, String>> cloudEvent = new CloudEvent<>();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("application/json");
String val = String.format("This is message #%d", i);
cloudEvent.setData(new HashMap<>() {
{
put("dataKey", val);
}
});
BulkPublishEntry<CloudEvent<Map<String, String>>> entry = new BulkPublishEntry<>();
entry.setEntryId("" + (i + 1)).setEvent(cloudEvent).setContentType(CloudEvent.CONTENT_TYPE);
entries.add(entry);
}
request.setEntries(entries);
BulkPublishResponse<?> res = client.publishEvents(request).block();
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response");
}
System.out.println("Done");
}
}
}

Loading