Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
artursouza authored Jan 25, 2023
2 parents 22a7d78 + beafb5a commit af4bcff
Show file tree
Hide file tree
Showing 84 changed files with 3,576 additions and 591 deletions.
20 changes: 17 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
DAPR_RUNTIME_VER: 1.9.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.9.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF: a8c698ad897e42d6624f5fc6ccfd0630e2a8fd00
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down Expand Up @@ -100,9 +100,10 @@ jobs:
- name: Codecov
uses: codecov/codecov-action@v3.1.0
- name: Install jars
run: mvn install -q
run: mvn install -q
- name: Integration tests
run: mvn -f sdk-tests/pom.xml verify -q
id: integration_tests
run: mvn -f sdk-tests/pom.xml verify
- name: Upload test report for sdk
uses: actions/upload-artifact@master
with:
Expand All @@ -113,6 +114,19 @@ jobs:
with:
name: report-dapr-java-sdk-actors
path: sdk-actors/target/jacoco-report/
- name: Upload failsafe test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: failsafe-report-sdk-tests
path: sdk-tests/target/failsafe-reports
- name: Upload surefire test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: surefire-report-sdk-tests
path: sdk-tests/target/surefire-reports

publish:
runs-on: ubuntu-latest
needs: build
Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
validate:
runs-on: ubuntu-latest
strategy:
fail-fast: false # Keep running if one leg fails.
matrix:
java: [ 11, 13, 15, 16 ]
env:
Expand All @@ -40,7 +41,7 @@ jobs:
DAPR_RUNTIME_VER: 1.9.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.9.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF: a8c698ad897e42d6624f5fc6ccfd0630e2a8fd00
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down Expand Up @@ -129,10 +130,10 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/state/README.md
- name: Validate pubsub HTTP example
- name: Validate pubsub example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/http/README.md
mm.py ./src/main/java/io/dapr/examples/pubsub/README.md
- name: Validate bindings HTTP example
working-directory: ./examples
run: |
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# IDE generated files and directories
*.iml
.idea/
.run/
.vs/
.vscode/

Expand Down
62 changes: 62 additions & 0 deletions daprdocs/content/en/java-sdk-docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,13 @@ try (DaprClient client = (new DaprClientBuilder()).build()) {
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
Expand Down Expand Up @@ -186,6 +192,62 @@ public class SubscriberController {
});
}

@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
}

System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");

List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
```

##### Bulk Publish Messages
> Note: API is in Alpha stage

```java
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class Solution {
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}

// Publish list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block()
}
}
}
```

Expand Down
5 changes: 5 additions & 0 deletions examples/components/pubsub/redis_messagebus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ spec:
value: localhost:6379
- name: redisPassword
value: ""
scopes:
- publisher
- bulk-publisher
- subscriber
- publisher-tracing
5 changes: 5 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@
<artifactId>dapr-sdk</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.evanlennick</groupId>
<artifactId>retry4j</artifactId>
<version>0.15.0</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ cd examples

Before getting into the application code, follow these steps in order to set up a local instance of Kafka. This is needed for the local instances. Steps are:

1. To run container locally run:
1. To run container locally run:

<!-- STEP
name: Setup kafka container
Expand Down
4 changes: 2 additions & 2 deletions examples/src/main/java/io/dapr/examples/exception/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ sleep: 5
-->

```bash
dapr run --app-id exception_example -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.exception.Client
dapr run --app-id exception-example -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.exception.Client
```

<!-- END_STEP -->
Expand Down Expand Up @@ -124,7 +124,7 @@ name: Cleanup
-->

```bash
dapr stop --app-id exception_example
dapr stop --app-id exception-example
```

<!-- END_STEP -->
101 changes: 101 additions & 0 deletions examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;

import java.util.ArrayList;
import java.util.List;

import static io.dapr.examples.OpenTelemetryConfig.getReactorContext;

/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id bulk-publisher -- \
* java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.BulkPublisher
*/
public class BulkPublisher {

private static final int NUM_MESSAGES = 10;

private static final String TOPIC_NAME = "bulkpublishtesting";

//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";

/**
* main method.
*
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient) client;
c.waitForSidecar(10000);
try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");
List<String> messages = new ArrayList<>();
System.out.println("Constructing the list of messages to publish");
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.subscriberContext(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response from dapr");
}
}
// Close the span.

span.end();
// Allow plenty of time for Dapr to export all relevant spans to the tracing infra.
Thread.sleep(10000);
// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();

System.out.println("Done");
}
}
}

Loading

0 comments on commit af4bcff

Please sign in to comment.