Skip to content

Commit

Permalink
Update daprd, fix invoke it and improve binding it (#823)
Browse files Browse the repository at this point in the history
* Use app health check in ITs to minimize flakiness.

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Fix DaprRun and update Dapr runtime ref.

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Add retry logic to tracing example's validation

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Printing validation steps.

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Change retry logic for tracing it

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Updare Dapr ref.

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Revert secret.json

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Add gRPC HealthCheck.

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Change expectations for invoke tests.

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Fix typo causing compilation error.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update Dapr ref to latest master.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix binding and method invoke ITs.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add wait time to reduce flakiness of ApiIT.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Re-enable health-check.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Remove AppHealth as it breaks actor ITs.

Signed-off-by: Artur Souza <artur.barbalho@outlook.com>

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Artur Souza <artur.barbalho@outlook.com>
  • Loading branch information
artursouza authored Jan 17, 2023
1 parent 96259e4 commit b83661d
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
DAPR_RUNTIME_VER: 1.9.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.9.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF: a8c698ad897e42d6624f5fc6ccfd0630e2a8fd00
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
DAPR_RUNTIME_VER: 1.9.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.9.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF: a8c698ad897e42d6624f5fc6ccfd0630e2a8fd00
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
Expand Down
5 changes: 5 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@
<artifactId>dapr-sdk</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.evanlennick</groupId>
<artifactId>retry4j</artifactId>
<version>0.15.0</version>
</dependency>
</dependencies>

<build>
Expand Down
32 changes: 29 additions & 3 deletions examples/src/main/java/io/dapr/examples/tracing/Validation.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

package io.dapr.examples.tracing;

import com.evanlennick.retry4j.CallExecutorBuilder;
import com.evanlennick.retry4j.Status;
import com.evanlennick.retry4j.config.RetryConfig;
import com.evanlennick.retry4j.config.RetryConfigBuilder;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
Expand All @@ -21,13 +25,26 @@
import okhttp3.Request;
import okhttp3.Response;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.TimeZone;

import static java.time.temporal.ChronoUnit.SECONDS;

/**
* Class used to verify that traces are present as expected.
*/
final class Validation {

private static final OkHttpClient HTTP_CLIENT = new OkHttpClient();

private static final RetryConfig RETRY_CONFIG = new RetryConfigBuilder()
.withMaxNumberOfTries(5)
.withFixedBackoff().withDelayBetweenTries(10, SECONDS)
.retryOnAnyException()
.build();

public static final String JSONPATH_PROXY_ECHO_SPAN_ID =
"$..[?(@.parentId=='%s' && @.name=='calllocal/tracingdemoproxy/proxy_echo')]['id']";

Expand All @@ -40,9 +57,16 @@ final class Validation {
public static final String JSONPATH_SLEEP_SPAN_ID =
"$..[?(@.parentId=='%s' && @.duration > 1000000 && @.name=='calllocal/tracingdemo/sleep')]['id']";

static void validate() throws Exception {
// Must wait for some time to make sure Zipkin receives all spans.
Thread.sleep(5000);
static void validate() {
Status<Void> result = new CallExecutorBuilder().config(RETRY_CONFIG).build().execute(() -> doValidate());
if (!result.wasSuccessful()) {
throw new RuntimeException(result.getLastExceptionThatCausedRetry());
}
}

private static Void doValidate() throws Exception {
System.out.println("Performing validation of tracing events ...");

HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme("http")
.host("localhost")
Expand Down Expand Up @@ -83,6 +107,8 @@ static void validate() throws Exception {
.toString();
readOne(documentContext,
String.format(JSONPATH_SLEEP_SPAN_ID, proxySleepSpanId2));
System.out.println("Validation of tracing events has succeeded.");
return null;
}

private static Object readOne(DocumentContext documentContext, String path) {
Expand Down
9 changes: 6 additions & 3 deletions sdk-tests/components/kafka_bindings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ spec:
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
value: "topic-{appID}"
- name: consumerGroup
value: group1
value: "{appID}"
# publisher configuration: topic
- name: publishTopic
value: sample
value: "topic-{appID}"
- name: authRequired
value: "false"
- name: initialOffset
value: oldest
scopes:
- bindingit-http-inputbindingservice
- bindingit-grpc-inputbindingservice
2 changes: 1 addition & 1 deletion sdk-tests/components/secret.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{}
{}
5 changes: 1 addition & 4 deletions sdk-tests/configurations/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,4 @@ spec:
tracing:
samplingRate: "1"
zipkin:
endpointAddress: http://localhost:9411/api/v2/spans
features:
- name: PubSub.Routing
enabled: true
endpointAddress: http://localhost:9411/api/v2/spans
35 changes: 35 additions & 0 deletions sdk-tests/src/test/java/io/dapr/grpc/GrpcHealthCheckService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.grpc;

import io.dapr.v1.AppCallbackHealthCheckGrpc;
import io.dapr.v1.DaprAppCallbackProtos;

/**
* Handles apps' health check callback.
*/
public class GrpcHealthCheckService extends AppCallbackHealthCheckGrpc.AppCallbackHealthCheckImplBase {

/**
* Handler for health check.
* @param request Empty request.
* @param responseObserver Response for gRPC response.
*/
public void healthCheck(
com.google.protobuf.Empty request,
io.grpc.stub.StreamObserver<io.dapr.v1.DaprAppCallbackProtos.HealthCheckResponse> responseObserver) {
responseObserver.onNext(DaprAppCallbackProtos.HealthCheckResponse.newBuilder().build());
responseObserver.onCompleted();
}
}
2 changes: 2 additions & 0 deletions sdk-tests/src/test/java/io/dapr/it/api/ApiIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public void testShutdownAPI() throws Exception {
run.switchToHTTP();
}

// TODO(artursouza): change this to wait for the sidecar to be healthy (new method needed in DaprClient).
Thread.sleep(3000);
try (DaprClient client = new DaprClientBuilder().build()) {
logger.info("Sending shutdown request.");
client.shutdown().block();
Expand Down
20 changes: 17 additions & 3 deletions sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static io.dapr.it.Retry.callWithRetry;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -65,23 +66,36 @@ public static Collection<Object[]> data() {
@Test
public void inputOutputBinding() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
String serviceNameVariant = this.useGrpc ? "-grpc" : "-http";

DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName(),
this.getClass().getSimpleName() + serviceNameVariant,
InputBindingService.SUCCESS_MESSAGE,
InputBindingService.class,
true,
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
// TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet?
Thread.sleep(120000);
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}

try(DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking if input binding is up before publishing events ...");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, "ping").block();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}

client.invokeMethod(daprRun.getAppName(), "initialized", "", HttpExtension.GET).block();
}, 120000);

// This is an example of sending data in a user-defined object. The input binding will receive:
// {"message":"hello"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,30 @@
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* SpringBoot Controller to handle input binding.
*/
@RestController
public class InputBindingController {

private static final List<String> messagesReceived = new ArrayList();
private static final List<String> messagesReceived = Collections.synchronizedList(new ArrayList());

private static final AtomicBoolean initialized = new AtomicBoolean(false);

@PostMapping(path = "/sample123")
@PutMapping(path = "/sample123")
public void handleInputBinding(@RequestBody(required = false) String body) {
if ("\"ping\"".equals(body)) {
// Initialization messages are useful to detect if input binding is up.
initialized.set(true);
System.out.println("Input binding is up: " + body);
return;
}

messagesReceived.add(body);
System.out.println("Received message through binding: " + (body == null ? "" : body));
}
Expand All @@ -47,4 +58,15 @@ public String hello() {
return "hello";
}

@GetMapping(path = "/health")
public void health() {
}

@GetMapping(path = "/initialized")
public void initialized() {
if (!initialized.get()) {
throw new RuntimeException("Input binding is not initialized yet.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand Down Expand Up @@ -57,4 +58,8 @@ public Mono<Void> handleMessage(
}
);
}

@GetMapping(path = "/health")
public void health() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@
import io.dapr.it.DaprRun;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest;
Expand All @@ -25,33 +21,17 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class MethodInvokeIT extends BaseIT {

//Number of messages to be sent: 10
private static final int NUM_MESSAGES = 10;

/**
* Parameters for this test.
* Param #1: useGrpc.
* @return Collection of parameter tuples.
*/
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } });
}

/**
* Run of a Dapr application.
*/
private DaprRun daprRun = null;

@Parameter
public boolean useGrpc;

@Before
public void init() throws Exception {
daprRun = startDaprApp(
Expand All @@ -60,13 +40,7 @@ public void init() throws Exception {
MethodInvokeService.class,
DaprApiProtocol.GRPC, // appProtocol
60000);

if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}

daprRun.switchToGRPC();
// Wait since service might be ready even after port is available.
Thread.sleep(2000);
}
Expand Down Expand Up @@ -138,12 +112,9 @@ public void testInvokeException() throws Exception {
DaprException exception = assertThrows(DaprException.class, () ->
client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST).block());

assertEquals("UNKNOWN", exception.getErrorCode());
if (this.useGrpc) {
assertEquals("UNKNOWN: ", exception.getMessage());
} else {
assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage());
}
assertEquals("INTERNAL", exception.getErrorCode());
assertEquals("INTERNAL: fail to invoke, id: methodinvokeit-methodinvokeservice, err: message is nil",
exception.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.it.methodinvoke.grpc;

import com.google.protobuf.Any;
import io.dapr.grpc.GrpcHealthCheckService;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.grpc.Server;
Expand Down Expand Up @@ -60,6 +61,7 @@ private void start(int port) throws IOException {
this.server = ServerBuilder
.forPort(port)
.addService(this)
.addService(new GrpcHealthCheckService())
.build()
.start();
System.out.printf("Server: started listening on port %d\n", port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ public void sleep(@RequestBody int seconds) throws InterruptedException {
}
Thread.sleep(seconds * 1000);
}

@GetMapping(path = "/health")
public void health() {
}
}
Loading

0 comments on commit b83661d

Please sign in to comment.