Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/maven/sdk-tests/com.jayway.json…
Browse files Browse the repository at this point in the history
…path-json-path-2.9.0
  • Loading branch information
artursouza authored Feb 16, 2024
2 parents 8eb9a1c + 14cc3f8 commit a993067
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 220 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
experimental: [ false ]
include:
- java: 17
spring-boot-version: 2.7.8
spring-boot-version: 2.7.18
experimental: false
- java: 17
spring-boot-version: 2.6.14
Expand Down
473 changes: 308 additions & 165 deletions examples/src/main/java/io/dapr/examples/pubsub/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.examples.pubsub.grpc;

import io.dapr.v1.AppCallbackAlphaGrpc;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequestEntry;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponseEntry;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus;

/**
* Class that encapsulates all client-side logic for Grpc.
*/
public class BulkSubscriberGrpcService extends AppCallbackAlphaGrpc.AppCallbackAlphaImplBase {

@Override
public void onBulkTopicEventAlpha1(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request,
io.grpc.stub.StreamObserver<io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse> responseObserver) {
try {
TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder();

if (request.getEntriesCount() == 0) {
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}

System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages.");

for (TopicEventBulkRequestEntry entry : request.getEntriesList()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
System.out.printf("Bulk Subscriber got: %s\n", entry.getCloudEvent().getData().toStringUtf8());
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.SUCCESS_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
} catch (Throwable e) {
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.RETRY_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
}
}
TopicEventBulkResponse response = responseBuilder.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static void main(String[] args) throws Exception {
//start a grpc server
Server server = ServerBuilder.forPort(port)
.addService(new SubscriberGrpcService())
.addService(new BulkSubscriberGrpcService())
.build();
server.start();
server.awaitTermination();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
@Override
public void listTopicSubscriptions(Empty request,
StreamObserver<DaprAppCallbackProtos.ListTopicSubscriptionsResponse> responseObserver) {
registerConsumer("messagebus","testingtopic");
registerConsumer("messagebus", "testingtopic", false);
registerConsumer("messagebus", "bulkpublishtesting", false);
registerConsumer("messagebus", "testingtopicbulk", true);
try {
DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos
.ListTopicSubscriptionsResponse.newBuilder();
Expand Down Expand Up @@ -65,12 +67,14 @@ public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request,
*
* @param topic the topic
* @param pubsubName the pubsub name
* @param isBulkMessage flag to enable/disable bulk subscribe
*/
public void registerConsumer(String pubsubName, String topic) {
public void registerConsumer(String pubsubName, String topic, boolean isBulkMessage) {
topicSubscriptionList.add(DaprAppCallbackProtos.TopicSubscription
.newBuilder()
.setPubsubName(pubsubName)
.setTopic(topic)
.setBulkSubscribe(DaprAppCallbackProtos.BulkSubscribeConfig.newBuilder().setEnabled(isBulkMessage))
.build());
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk-actors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.8</version>
<version>2.7.18</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion sdk-springboot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<properties>
<maven.deploy.skip>false</maven.deploy.skip>
<springboot.version>2.7.8</springboot.version>
<springboot.version>2.7.18</springboot.version>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 The Dapr Authors
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -18,11 +18,11 @@
import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
import io.grpc.ClientInterceptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WorkflowRuntimeBuilder {
private static volatile WorkflowRuntime instance;
Expand All @@ -31,14 +31,20 @@ public class WorkflowRuntimeBuilder {
private Set<String> workflows = new HashSet<String>();
private Set<String> activities = new HashSet<String>();
private static ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor();
private final Set<String> activitySet = Collections.synchronizedSet(new HashSet<>());
private final Set<String> workflowSet = Collections.synchronizedSet(new HashSet<>());

/**
* Constructs the WorkflowRuntimeBuilder.
*/
public WorkflowRuntimeBuilder() {
this(LoggerFactory.getLogger(WorkflowRuntimeBuilder.class));
}

WorkflowRuntimeBuilder(Logger logger) {
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(
NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR));
this.logger = Logger.getLogger(WorkflowRuntimeBuilder.class.getName());
NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR));
this.logger = logger;
}

/**
Expand All @@ -54,7 +60,9 @@ public WorkflowRuntime build() {
}
}
}
this.logger.log(Level.INFO, "Successfully built dapr workflow runtime");
this.logger.info("List of registered workflows: " + this.workflowSet);
this.logger.info("List of registered activites: " + this.activitySet);
this.logger.info("Successfully built dapr workflow runtime");
return instance;
}

Expand All @@ -69,7 +77,8 @@ public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> cla
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
this.logger.log(Level.INFO, "Registered Workflow: " + clazz.getSimpleName());
this.workflowSet.add(clazz.getCanonicalName());
this.logger.info("Registered Workflow: " + clazz.getSimpleName());
this.workflows.add(clazz.getSimpleName());
return this;
}
Expand All @@ -84,7 +93,8 @@ public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
this.logger.log(Level.INFO, "Registered Activity: " + clazz.getSimpleName());
this.activitySet.add(clazz.getCanonicalName());
this.logger.info("Registered Activity: " + clazz.getSimpleName());
this.activities.add(clazz.getSimpleName());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows.runtime;


import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;

import static org.junit.Assert.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.logging.Logger;
import java.util.logging.Handler;
import java.util.logging.LogRecord;

public class WorkflowRuntimeBuilderTest {
public static class TestWorkflow extends Workflow {
Expand Down Expand Up @@ -51,36 +61,19 @@ public void loggingOutputTest() {
ByteArrayOutputStream outStreamCapture = new ByteArrayOutputStream();
System.setOut(new PrintStream(outStreamCapture));

LogCaptureHandler testLoggerHandler = new LogCaptureHandler();
Logger testLogger = Logger.getLogger(WorkflowRuntimeBuilder.class.getName());
Logger testLogger = Mockito.mock(Logger.class);

testLogger.addHandler(testLoggerHandler);

// indexOf will return -1 if the string is not found.
assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class));
assertNotEquals(-1, testLoggerHandler.capturedLog.indexOf("Registered Workflow: TestWorkflow"));
assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(TestActivity.class));
assertNotEquals(-1, testLoggerHandler.capturedLog.indexOf("Registered Activity: TestActivity"));
assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerWorkflow(TestWorkflow.class));
assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerActivity(TestActivity.class));

WorkflowRuntimeBuilder wfRuntime = new WorkflowRuntimeBuilder();

wfRuntime.build();
}

private static class LogCaptureHandler extends Handler {
private StringBuilder capturedLog = new StringBuilder();

@Override
public void publish(LogRecord record) {
capturedLog.append(record.getMessage()).append(System.lineSeparator());
}

@Override
public void flush(){
}

@Override
public void close() throws SecurityException {
}
Mockito.verify(testLogger, Mockito.times(1))
.info(Mockito.eq("Registered Workflow: TestWorkflow"));
Mockito.verify(testLogger, Mockito.times(1))
.info(Mockito.eq("Registered Activity: TestActivity"));
}

}
4 changes: 2 additions & 2 deletions sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.8</version>
<version>2.7.18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>2.7.8</version>
<version>2.7.18</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
29 changes: 19 additions & 10 deletions sdk/src/test/java/io/dapr/client/GrpcChannelFacadeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import io.grpc.ServerBuilder;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.testing.GrpcCleanupRule;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.ResponseBody;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import org.junit.Before;
import org.junit.Rule;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -37,6 +39,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static io.dapr.utils.TestUtils.findFreePort;
Expand All @@ -54,6 +57,9 @@ public class GrpcChannelFacadeTest {

private static DaprHttp daprHttp;

@Rule
public static final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();

/**
* Enable the waitForSidecar to allow the gRPC to check the http endpoint for the health check
*/
Expand All @@ -66,20 +72,19 @@ public void setUp() {
@BeforeAll
public static void setup() throws IOException {
port = findFreePort();
server = ServerBuilder.forPort(port)

// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(ServerBuilder.forPort(port)
.addService(new DaprGrpc.DaprImplBase() {
})
.build();
server.start();
.build().start());
}

@AfterAll
public static void teardown() throws InterruptedException {
if (daprHttp != null) {
daprHttp.close();
}
server.shutdown();
server.awaitTermination();
}

@Test
Expand All @@ -88,13 +93,14 @@ public void waitForSidecarTimeoutHealthCheck() throws Exception {
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient);

ManagedChannel channel = InProcessChannelBuilder.forName("waitForSidecarTimeoutHealthCheck").build();
grpcCleanup.register(channel);
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, daprHttp);

mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.times(6)
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));
mockInterceptor.addRule()
.get()
.path("/v1.0/healthz/outbound")
.times(6)
.respond(404, ResponseBody.create("Not Found", MediaType.get("application/json")));

StepVerifier.create(channelFacade.waitForChannelReady(1000))
.expectSubscription()
Expand All @@ -110,6 +116,9 @@ public void waitForSidecarOK() {

ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port)
.usePlaintext().build();

grpcCleanup.register(channel);

final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel, daprHttp);

// added since this is doing a check against the http health check endpoint
Expand Down

0 comments on commit a993067

Please sign in to comment.