diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index b8b478268c..1559f5aef4 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -115,6 +115,12 @@ jobs: run: ./mvnw install -q env: DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} + - name: Validate Jobs example + working-directory: ./examples + run: | + mm.py ./src/main/java/io/dapr/examples/jobs/README.md + env: + DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} - name: Validate invoke http example working-directory: ./examples run: | diff --git a/examples/src/main/java/io/dapr/examples/jobs/DemoJobsClient.java b/examples/src/main/java/io/dapr/examples/jobs/DemoJobsClient.java new file mode 100644 index 0000000000..87ccf08016 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/jobs/DemoJobsClient.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.jobs; + +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.GetJobRequest; +import io.dapr.client.domain.GetJobResponse; +import io.dapr.client.domain.JobSchedule; +import io.dapr.client.domain.ScheduleJobRequest; +import io.dapr.config.Properties; +import io.dapr.config.Property; + +import java.util.Map; + +public class DemoJobsClient { + + /** + * The main method of this app to register and fetch jobs. + */ + public static void main(String[] args) throws Exception { + Map, String> overrides = Map.of( + Properties.HTTP_PORT, "3500", + Properties.GRPC_PORT, "51439" + ); + + try (DaprPreviewClient client = new DaprClientBuilder().withPropertyOverrides(overrides).buildPreviewClient()) { + + // Schedule a job. + System.out.println("**** Scheduling a Job with name dapr-jobs-1 *****"); + ScheduleJobRequest scheduleJobRequest = new ScheduleJobRequest("dapr-job-1", + JobSchedule.fromString("* * * * * *")).setData("Hello World!".getBytes()); + client.scheduleJob(scheduleJobRequest).block(); + + System.out.println("**** Scheduling job dapr-jobs-1 completed *****"); + + // Get a job. + System.out.println("**** Retrieving a Job with name dapr-jobs-1 *****"); + GetJobResponse getJobResponse = client.getJob(new GetJobRequest("dapr-job-1")).block(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/jobs/DemoJobsSpringApplication.java b/examples/src/main/java/io/dapr/examples/jobs/DemoJobsSpringApplication.java new file mode 100644 index 0000000000..74993c62e0 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/jobs/DemoJobsSpringApplication.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.jobs; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Spring Boot application to demonstrate Dapr Jobs callback API. + *

+ * This application demonstrates how to use Dapr Jobs API with Spring Boot. + *

+ */ +@SpringBootApplication +public class DemoJobsSpringApplication { + + public static void main(String[] args) throws Exception { + SpringApplication.run(DemoJobsSpringApplication.class, args); + } +} diff --git a/examples/src/main/java/io/dapr/examples/jobs/JobsController.java b/examples/src/main/java/io/dapr/examples/jobs/JobsController.java new file mode 100644 index 0000000000..48548f9740 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/jobs/JobsController.java @@ -0,0 +1,43 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.jobs; + +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +/** + * SpringBoot Controller to handle jobs callback. + */ +@RestController +public class JobsController { + + /** + * Handles jobs callback from Dapr. + * + * @param jobName name of the job. + * @param payload data from the job if payload exists. + * @return Empty Mono. + */ + @PostMapping("/job/{jobName}") + public Mono handleJob(@PathVariable("jobName") String jobName, + @RequestBody(required = false) byte[] payload) { + System.out.println("Job Name: " + jobName); + System.out.println("Job Payload: " + new String(payload)); + + return Mono.empty(); + } +} diff --git a/examples/src/main/java/io/dapr/examples/jobs/README.md b/examples/src/main/java/io/dapr/examples/jobs/README.md new file mode 100644 index 0000000000..2877b31fba --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/jobs/README.md @@ -0,0 +1,118 @@ +## Manage Dapr Jobs via the Jobs API + +This example provides the different capabilities provided by Dapr Java SDK for Jobs. For further information about Job APIs please refer to [this link](https://docs.dapr.io/developing-applications/building-blocks/jobs/jobs-overview/) + +### Using the Jobs API + +The Java SDK exposes several methods for this - +* `client.scheduleJob(...)` for scheduling a job. +* `client.getJob(...)` for retrieving a scheduled job. +* `client.deleteJob(...)` for deleting a job. + +## Pre-requisites + +* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/). +* Java JDK 11 (or greater): + * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + * [OpenJDK 11](https://jdk.java.net/11/) +* [Apache Maven](https://maven.apache.org/install.html) version 3.x. + +### Checking out the code + +Clone this repository: + +```sh +git clone https://github.com/dapr/java-sdk.git +cd java-sdk +``` + +Then build the Maven project: + +```sh +# make sure you are in the `java-sdk` directory. +mvn install +``` + +Then get into the examples directory: + +```sh +cd examples +``` + +### Initialize Dapr + +Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. + +### Running the example + +This example uses the Java SDK Dapr client in order to **Schedule and Get** Jobs. +`DemoJobsClient.java` is the example class demonstrating these features. +Kindly check [DaprPreviewClient.java](https://github.com/dapr/java-sdk/blob/master/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java) for a detailed description of the supported APIs. + +```java +public class DemoJobsClient { + /** + * The main method of this app to register and fetch jobs. + */ + public static void main(String[] args) throws Exception { + Map, String> overrides = Map.of( + Properties.HTTP_PORT, "3500", + Properties.GRPC_PORT, "51439" + ); + + try (DaprPreviewClient client = new DaprClientBuilder().withPropertyOverrides(overrides).buildPreviewClient()) { + + // Schedule a job. + ScheduleJobRequest scheduleJobRequest = new ScheduleJobRequest("dapr-job-1", + JobSchedule.fromString("* * * * * *")).setData("Hello World!".getBytes()); + client.scheduleJob(scheduleJobRequest).block(); + + // Get a job. + GetJobResponse getJobResponse = client.getJob(new GetJobRequest("dapr-job-1")).block(); + } + } +} +``` + +Use the following command to run this example- + + + +```bash +dapr run --resources-path ./components/configuration --app-id myapp --app-port 8080 --dapr-http-port 3500 --dapr-grpc-port 51439 --log-level debug -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.jobs.DemoJobsSpringApplication +``` + +```bash +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.jobs.DemoJobsClient +``` + + + +### Sample output +``` +== APP == Job Name: dapr-job-1 +== APP == Job Payload: Hello World! +``` +### Cleanup + +To stop the app, run (or press CTRL+C): + + + +```bash +dapr stop --app-id myapp +``` + + + diff --git a/pom.xml b/pom.xml index 729eb0a71b..45344dac67 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 1.69.0 3.25.5 protoc - https://raw.githubusercontent.com/dapr/dapr/v1.14.4/dapr/proto + https://raw.githubusercontent.com/dapr/dapr/v1.15.3/dapr/proto 1.15.0-SNAPSHOT 0.15.0-SNAPSHOT 1.7.1 diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/ContainerConstants.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/ContainerConstants.java index 3c2c42e9a1..76cadb815d 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/ContainerConstants.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/ContainerConstants.java @@ -1,6 +1,9 @@ package io.dapr.it.testcontainers; public interface ContainerConstants { - String DAPR_IMAGE_TAG = "daprio/daprd:1.14.1"; String TOXIPROXY_IMAGE_TAG = "ghcr.io/shopify/toxiproxy:2.5.0"; + String DAPR_RUNTIME_VERSION = "1.15.3"; + String DAPR_IMAGE_TAG = "daprio/daprd:" + DAPR_RUNTIME_VERSION; + String DAPR_PLACEMENT_IMAGE_TAG = "daprio/placement:" + DAPR_RUNTIME_VERSION; + String DAPR_SCHEDULER_IMAGE_TAG = "daprio/scheduler:" + DAPR_RUNTIME_VERSION; } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprJobsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprJobsIT.java new file mode 100644 index 0000000000..78e2ec7ecf --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprJobsIT.java @@ -0,0 +1,158 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.DeleteJobRequest; +import io.dapr.client.domain.GetJobRequest; +import io.dapr.client.domain.GetJobResponse; +import io.dapr.client.domain.JobSchedule; +import io.dapr.client.domain.ScheduleJobRequest; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +@SpringBootTest( + webEnvironment = WebEnvironment.RANDOM_PORT, + classes = { + TestDaprJobsConfiguration.class, + TestJobsApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class DaprJobsIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + private static final Random RANDOM = new Random(); + private static final int PORT = RANDOM.nextInt(1000) + 8000; + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(ContainerConstants.DAPR_IMAGE_TAG) + .withAppName("jobs-dapr-app") + .withNetwork(DAPR_NETWORK) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .withAppPort(PORT); + + /** + * Expose the Dapr ports to the host. + * + * @param registry the dynamic property registry + */ + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint); + registry.add("server.port", () -> PORT); + } + + @Autowired + private DaprPreviewClient daprPreviewClient; + + @BeforeEach + public void setUp(){ + org.testcontainers.Testcontainers.exposeHostPorts(PORT); + } + + @Test + public void testJobScheduleCreationWithDueTime() { + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + Instant currentTime = Instant.now(); + daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime)).block(); + + GetJobResponse getJobResponse = + daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); + assertEquals("Job", getJobResponse.getName()); + } + + @Test + public void testJobScheduleCreationWithSchedule() { + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + Instant currentTime = Instant.now(); + daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", JobSchedule.hourly()) + .setDueTime(currentTime)).block(); + + GetJobResponse getJobResponse = + daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); + assertEquals(JobSchedule.hourly().getExpression(), getJobResponse.getSchedule().getExpression()); + assertEquals("Job", getJobResponse.getName()); + } + + @Test + public void testJobScheduleCreationWithAllParameters() { + Instant currentTime = Instant.now(); + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + String cronExpression = "2 * 3 * * FRI"; + + daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) + .setData("Job data".getBytes()) + .setRepeat(3) + .setSchedule(JobSchedule.fromString(cronExpression))).block(); + + GetJobResponse getJobResponse = + daprPreviewClient.getJob(new GetJobRequest("Job")).block(); + assertEquals(iso8601Formatter.format(currentTime), getJobResponse.getDueTime().toString()); + assertEquals("2 * 3 * * FRI", getJobResponse.getSchedule().getExpression()); + assertEquals("Job", getJobResponse.getName()); + assertEquals(Integer.valueOf(3), getJobResponse.getRepeats()); + assertEquals("Job data", new String(getJobResponse.getData())); + assertEquals(iso8601Formatter.format(currentTime.plus(2, ChronoUnit.HOURS)), + getJobResponse.getTtl().toString()); + } + + @Test + public void testDeleteJobRequest() { + Instant currentTime = Instant.now(); + + String cronExpression = "2 * 3 * * FRI"; + + daprPreviewClient.scheduleJob(new ScheduleJobRequest("Job", currentTime) + .setTtl(currentTime.plus(2, ChronoUnit.HOURS)) + .setData("Job data".getBytes()) + .setRepeat(3) + .setSchedule(JobSchedule.fromString(cronExpression))).block(); + + daprPreviewClient.deleteJob(new DeleteJobRequest("Job")).block(); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprPlacementContainerIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprPlacementContainerIT.java index 41f537eb0b..d7069a3f14 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprPlacementContainerIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprPlacementContainerIT.java @@ -26,7 +26,8 @@ public class DaprPlacementContainerIT { @Container - private static final DaprPlacementContainer PLACEMENT_CONTAINER = new DaprPlacementContainer("daprio/placement"); + private static final DaprPlacementContainer PLACEMENT_CONTAINER = + new DaprPlacementContainer(ContainerConstants.DAPR_PLACEMENT_IMAGE_TAG); @Test public void testDaprPlacementContainerDefaults() { diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprJobsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprJobsConfiguration.java new file mode 100644 index 0000000000..5e0e2e8c89 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprJobsConfiguration.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprClientImpl; +import io.dapr.client.DaprPreviewClient; +import io.dapr.config.Properties; +import io.dapr.config.Property; +import io.dapr.serializer.DefaultObjectSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; + +@Configuration +public class TestDaprJobsConfiguration { + @Bean + public DaprPreviewClient daprPreviewClient( + @Value("${dapr.http.endpoint}") String daprHttpEndpoint, + @Value("${dapr.grpc.endpoint}") String daprGrpcEndpoint + ){ + Map, String> overrides = Map.of( + Properties.HTTP_ENDPOINT, daprHttpEndpoint, + Properties.GRPC_ENDPOINT, daprGrpcEndpoint + ); + + return new DaprClientBuilder().withPropertyOverrides(overrides).buildPreviewClient(); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestJobsApplication.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestJobsApplication.java new file mode 100644 index 0000000000..61ddaea792 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestJobsApplication.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class TestJobsApplication { + + public static void main(String[] args) { + SpringApplication.run(TestJobsApplication.class, args); + } + +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 0c1264eb19..aad67bc23d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -14,6 +14,7 @@ package io.dapr.client; import com.google.common.base.Strings; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import io.dapr.client.domain.ActorMetadata; @@ -27,17 +28,21 @@ import io.dapr.client.domain.ComponentMetadata; import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.DaprMetadata; +import io.dapr.client.domain.DeleteJobRequest; import io.dapr.client.domain.DeleteStateRequest; import io.dapr.client.domain.ExecuteStateTransactionRequest; import io.dapr.client.domain.GetBulkSecretRequest; import io.dapr.client.domain.GetBulkStateRequest; import io.dapr.client.domain.GetConfigurationRequest; +import io.dapr.client.domain.GetJobRequest; +import io.dapr.client.domain.GetJobResponse; import io.dapr.client.domain.GetSecretRequest; import io.dapr.client.domain.GetStateRequest; import io.dapr.client.domain.HttpEndpointMetadata; import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.InvokeBindingRequest; import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.JobSchedule; import io.dapr.client.domain.LockRequest; import io.dapr.client.domain.PublishEventRequest; import io.dapr.client.domain.QueryStateItem; @@ -45,6 +50,7 @@ import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.RuleMetadata; import io.dapr.client.domain.SaveStateRequest; +import io.dapr.client.domain.ScheduleJobRequest; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.SubscribeConfigurationRequest; @@ -90,9 +96,12 @@ import reactor.util.retry.Retry; import javax.annotation.Nonnull; - import java.io.IOException; import java.time.Duration; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1290,6 +1299,147 @@ public Mono unsubscribeConfiguration(Unsubscri } } + /** + * {@inheritDoc} + */ + public Mono scheduleJob(ScheduleJobRequest scheduleJobRequest) { + try { + validateScheduleJobRequest(scheduleJobRequest); + + DaprProtos.Job.Builder scheduleJobRequestBuilder = DaprProtos.Job.newBuilder(); + scheduleJobRequestBuilder.setName(scheduleJobRequest.getName()); + + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + if (scheduleJobRequest.getData() != null) { + scheduleJobRequestBuilder.setData(Any.newBuilder() + .setValue(ByteString.copyFrom(scheduleJobRequest.getData())).build()); + } + + if (scheduleJobRequest.getSchedule() != null) { + scheduleJobRequestBuilder.setSchedule(scheduleJobRequest.getSchedule().getExpression()); + } + + if (scheduleJobRequest.getTtl() != null) { + scheduleJobRequestBuilder.setTtl(iso8601Formatter.format(scheduleJobRequest.getTtl())); + } + + if (scheduleJobRequest.getRepeats() != null) { + scheduleJobRequestBuilder.setRepeats(scheduleJobRequest.getRepeats()); + } + + if (scheduleJobRequest.getDueTime() != null) { + scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime())); + } + + Mono scheduleJobResponseMono = + Mono.deferContextual(context -> this.createMono( + it -> intercept(context, asyncStub) + .scheduleJobAlpha1(DaprProtos.ScheduleJobRequest.newBuilder() + .setJob(scheduleJobRequestBuilder.build()).build(), it) + ) + ); + + return scheduleJobResponseMono.then(); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + + /** + * {@inheritDoc} + */ + public Mono getJob(GetJobRequest getJobRequest) { + try { + validateGetJobRequest(getJobRequest); + + Mono getJobResponseMono = + Mono.deferContextual(context -> this.createMono( + it -> intercept(context, asyncStub) + .getJobAlpha1(DaprProtos.GetJobRequest.newBuilder() + .setName(getJobRequest.getName()).build(), it) + ) + ); + + return getJobResponseMono.map(response -> { + DaprProtos.Job job = response.getJob(); + GetJobResponse getJobResponse = null; + + if (job.hasSchedule() && job.hasDueTime()) { + getJobResponse = new GetJobResponse(job.getName(), JobSchedule.fromString(job.getSchedule())); + getJobResponse.setDueTime(Instant.parse(job.getDueTime())); + } else if (job.hasSchedule()) { + getJobResponse = new GetJobResponse(job.getName(), JobSchedule.fromString(job.getSchedule())); + } else { + getJobResponse = new GetJobResponse(job.getName(), Instant.parse(job.getDueTime())); + } + + return getJobResponse + .setTtl(job.hasTtl() ? Instant.parse(job.getTtl()) : null) + .setData(job.hasData() ? job.getData().getValue().toByteArray() : null) + .setRepeat(job.hasRepeats() ? job.getRepeats() : null); + }); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + + /** + * {@inheritDoc} + */ + public Mono deleteJob(DeleteJobRequest deleteJobRequest) { + try { + validateDeleteJobRequest(deleteJobRequest); + + Mono deleteJobResponseMono = + Mono.deferContextual(context -> this.createMono( + it -> intercept(context, asyncStub) + .deleteJobAlpha1(DaprProtos.DeleteJobRequest.newBuilder() + .setName(deleteJobRequest.getName()).build(), it) + ) + ); + + return deleteJobResponseMono.then(); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + + private void validateScheduleJobRequest(ScheduleJobRequest scheduleJobRequest) { + if (scheduleJobRequest == null) { + throw new IllegalArgumentException("scheduleJobRequest cannot be null"); + } + + if (scheduleJobRequest.getName() == null || scheduleJobRequest.getName().isEmpty()) { + throw new IllegalArgumentException("Name in the request cannot be null or empty"); + } + + if (scheduleJobRequest.getSchedule() == null && scheduleJobRequest.getDueTime() == null) { + throw new IllegalArgumentException("At least one of schedule or dueTime must be provided"); + } + } + + private void validateGetJobRequest(GetJobRequest getJobRequest) { + if (getJobRequest == null) { + throw new IllegalArgumentException("getJobRequest cannot be null"); + } + + if (getJobRequest.getName() == null || getJobRequest.getName().isEmpty()) { + throw new IllegalArgumentException("Name in the request cannot be null or empty"); + } + } + + private void validateDeleteJobRequest(DeleteJobRequest deleteJobRequest) { + if (deleteJobRequest == null) { + throw new IllegalArgumentException("deleteJobRequest cannot be null"); + } + + if (deleteJobRequest.getName() == null || deleteJobRequest.getName().isEmpty()) { + throw new IllegalArgumentException("Name in the request cannot be null or empty"); + } + } + /** * Build a new Configuration Item from provided parameter. * @@ -1494,5 +1644,4 @@ private AppConnectionPropertiesHealthMetadata getAppConnectionPropertiesHealth( return new AppConnectionPropertiesHealthMetadata(healthCheckPath, healthProbeInterval, healthProbeTimeout, healthThreshold); } - } diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 95911efc23..b4fba8ef38 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -17,9 +17,13 @@ import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponseFailedEntry; +import io.dapr.client.domain.DeleteJobRequest; +import io.dapr.client.domain.GetJobRequest; +import io.dapr.client.domain.GetJobResponse; import io.dapr.client.domain.LockRequest; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; +import io.dapr.client.domain.ScheduleJobRequest; import io.dapr.client.domain.UnlockRequest; import io.dapr.client.domain.UnlockResponseStatus; import io.dapr.client.domain.query.Query; @@ -268,4 +272,36 @@ Mono> publishEvents(String pubsubName, String topicNa */ Subscription subscribeToEvents( String pubsubName, String topic, SubscriptionListener listener, TypeRef type); + + /** + * Schedules a job using the provided job request details. + * + * @param scheduleJobRequest The request containing the details of the job to schedule. + * Must include a name and optional schedule, data, and other related properties. + * @return A {@link Mono} that completes when the job scheduling operation is successful or raises an error. + * @throws IllegalArgumentException If the request or its required fields like name are null or empty. + */ + public Mono scheduleJob(ScheduleJobRequest scheduleJobRequest); + + /** + * Retrieves details of a specific job. + * + * @param getJobRequest The request containing the job name for which the details are to be fetched. + * The name property is mandatory. + * @return A {@link Mono} that emits the {@link GetJobResponse} containing job details or raises an + * error if the job is not found. + * @throws IllegalArgumentException If the request or its required fields like name are null or empty. + */ + + public Mono getJob(GetJobRequest getJobRequest); + + /** + * Deletes a job based on the given request. + * + * @param deleteJobRequest The request containing the job name to be deleted. + * The name property is mandatory. + * @return A {@link Mono} that completes when the job is successfully deleted or raises an error. + * @throws IllegalArgumentException If the request or its required fields like name are null or empty. + */ + public Mono deleteJob(DeleteJobRequest deleteJobRequest); } diff --git a/sdk/src/main/java/io/dapr/client/domain/DeleteJobRequest.java b/sdk/src/main/java/io/dapr/client/domain/DeleteJobRequest.java new file mode 100644 index 0000000000..31f60a1197 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/DeleteJobRequest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +/** + * Represents a request to schedule a job in Dapr. + */ +public class DeleteJobRequest { + private final String name; + + /** + * Constructor to create Delete Job Request. + * + * @param name of the job to delete. + */ + public DeleteJobRequest(String name) { + this.name = name; + } + + /** + * Gets the name of the job. + * + * @return The job name. + */ + public String getName() { + return name; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/GetJobRequest.java b/sdk/src/main/java/io/dapr/client/domain/GetJobRequest.java new file mode 100644 index 0000000000..cbfe6eed6f --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/GetJobRequest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +/** + * Represents a request to schedule a job in Dapr. + */ +public class GetJobRequest { + private final String name; + + /** + * Constructor to create Get Job Request. + * + * @param name of the job to fetch.. + */ + public GetJobRequest(String name) { + this.name = name; + } + + /** + * Gets the name of the job. + * + * @return The job name. + */ + public String getName() { + return name; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/GetJobResponse.java b/sdk/src/main/java/io/dapr/client/domain/GetJobResponse.java new file mode 100644 index 0000000000..f79c0409ab --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/GetJobResponse.java @@ -0,0 +1,168 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +import java.time.Instant; + +/** + * Represents a request to schedule a job in Dapr. + */ +public class GetJobResponse { + private final String name; + private byte[] data; + private JobSchedule schedule; + private Instant dueTime; + private Integer repeats; + private Instant ttl; + + /** + * Constructor to create GetJobResponse. + * + * @param name of the job. + * @param schedule job has to run. + */ + public GetJobResponse(String name, JobSchedule schedule) { + this.name = name; + this.schedule = schedule; + } + + /** + * Constructor to create GetJobResponse. + * + * @param name of the job. + * @param dueTime An optional time at which the job should be active, or the “one shot” time, if other scheduling + * type fields are not provided. Accepts a “point in time” string in the format of RFC3339, + * Go duration string (calculated from creation time), or non-repeating ISO8601 + */ + public GetJobResponse(String name, Instant dueTime) { + this.name = name; + this.dueTime = dueTime; + } + + /** + * Sets the data payload for the job. + * This should be a JSON serialized value or object. + * + * @param data The job data in byte array format. + * @return This builder instance. + */ + public GetJobResponse setData(byte[] data) { + this.data = data; + return this; + } + + /** + * Sets the schedule for the job. + * The format should follow cron expressions or other supported scheduling formats. + * + * @param schedule The job schedule. + * @return This builder instance. + */ + public GetJobResponse setSchedule(JobSchedule schedule) { + this.schedule = schedule; + return this; + } + + /** + * Sets the due time when the job should become active or execute once. + * This can be in RFC3339, Go duration string, or non-repeating ISO8601 format. + * + * @param dueTime The due time of the job. + * @return This builder instance. + */ + public GetJobResponse setDueTime(Instant dueTime) { + this.dueTime = dueTime; + return this; + } + + /** + * Sets the number of times the job should be triggered. + * If not set, the job runs indefinitely or until expiration. + * + * @param repeats The number of times the job should repeat. + * @return This builder instance. + */ + public GetJobResponse setRepeat(Integer repeats) { + this.repeats = repeats; + return this; + } + + /** + * Sets the time-to-live (TTL) or expiration for the job. + * This can be in RFC3339, Go duration string, or non-repeating ISO8601 format. + * + * @param ttl The time-to-live for the job. + * @return This builder instance. + */ + public GetJobResponse setTtl(Instant ttl) { + this.ttl = ttl; + return this; + } + + // Getters + + /** + * Gets the name of the job. + * + * @return The job name. + */ + public String getName() { + return name; + } + + /** + * Gets the data payload of the job. + * + * @return The job data as a byte array. + */ + public byte[] getData() { + return data; + } + + /** + * Gets the schedule of the job. + * + * @return The job schedule. + */ + public JobSchedule getSchedule() { + return schedule; + } + + /** + * Gets the due time of the job. + * + * @return The due time. + */ + public Instant getDueTime() { + return dueTime; + } + + /** + * Gets the number of times the job should repeat. + * + * @return The repeat count, or null if not set. + */ + public Integer getRepeats() { + return repeats; + } + + /** + * Gets the time-to-live (TTL) or expiration of the job. + * + * @return The TTL value. + */ + public Instant getTtl() { + return ttl; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/JobSchedule.java b/sdk/src/main/java/io/dapr/client/domain/JobSchedule.java new file mode 100644 index 0000000000..5c79d241d8 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/JobSchedule.java @@ -0,0 +1,132 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +import java.time.Duration; + +/** + * Represents a job schedule using cron expressions or fixed intervals. + * This class provides various static methods to create schedules based on predefined periods + * (e.g., daily, weekly, monthly) or using custom cron expressions. + * Example usage: + *
+ * JobsSchedule schedule = JobsSchedule.daily();
+ * System.out.println(schedule.getExpression()); // Outputs: "0 0 0 * * *"
+ * 
+ */ +public class JobSchedule { + + private final String expression; + + /** + * Private constructor to create a job schedule from a cron expression. + * + * @param expression the cron expression defining the schedule. + */ + private JobSchedule(String expression) { + this.expression = expression; + } + + /** + * Creates a job schedule from a fixed period using a {@link Duration}. + * The resulting expression follows the format: "@every XhYmZsWms" + * where X, Y, Z, and W represent hours, minutes, seconds, and milliseconds respectively. + * Example: + *
+   * JobsSchedule schedule = JobsSchedule.fromPeriod(Duration.ofMinutes(30));
+   * System.out.println(schedule.getExpression()); // Outputs: "@every 0h30m0s0ms"
+   * 
+ * + * @param duration the duration of the period. + * @return a {@code JobsSchedule} with the corresponding interval. + * @throws IllegalArgumentException if the duration is null. + */ + public static JobSchedule fromPeriod(Duration duration) { + if (duration == null) { + throw new IllegalArgumentException("duration cannot be null"); + } + + String formattedDuration = String.format("%dh%dm%ds%dms", + duration.toHoursPart(), duration.toMinutesPart(), duration.toSecondsPart(), duration.toMillisPart()); + return new JobSchedule("@every " + formattedDuration); + } + + /** + * Creates a job schedule from a custom cron expression. + * + * @param cronExpression the cron expression. + * @return a {@code JobsSchedule} representing the given cron expression. + */ + public static JobSchedule fromString(String cronExpression) { + if (cronExpression == null) { + throw new IllegalArgumentException("cronExpression cannot be null"); + } + + return new JobSchedule(cronExpression); + } + + /** + * Creates a yearly job schedule, running at midnight on January 1st. + * + * @return a {@code JobsSchedule} for yearly execution. + */ + public static JobSchedule yearly() { + return new JobSchedule("0 0 0 1 1 *"); + } + + /** + * Creates a monthly job schedule, running at midnight on the first day of each month. + * + * @return a {@code JobsSchedule} for monthly execution. + */ + public static JobSchedule monthly() { + return new JobSchedule("0 0 0 1 * *"); + } + + /** + * Creates a weekly job schedule, running at midnight on Sunday. + * + * @return a {@code JobsSchedule} for weekly execution. + */ + public static JobSchedule weekly() { + return new JobSchedule("0 0 0 * * 0"); + } + + /** + * Creates a daily job schedule, running at midnight every day. + * + * @return a {@code JobsSchedule} for daily execution. + */ + public static JobSchedule daily() { + return new JobSchedule("0 0 0 * * *"); + } + + /** + * Creates an hourly job schedule, running at the start of every hour. + * + * @return a {@code JobsSchedule} for hourly execution. + */ + public static JobSchedule hourly() { + return new JobSchedule("0 0 * * * *"); + } + + /** + * Gets the cron expression representing this job schedule. + * + * @return the cron expression as a string. + */ + public String getExpression() { + return this.expression; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/ScheduleJobRequest.java b/sdk/src/main/java/io/dapr/client/domain/ScheduleJobRequest.java new file mode 100644 index 0000000000..9f91e6142f --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/ScheduleJobRequest.java @@ -0,0 +1,168 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +import java.time.Instant; + +/** + * Represents a request to schedule a job in Dapr. + */ +public class ScheduleJobRequest { + private final String name; + private byte[] data; + private JobSchedule schedule; + private Instant dueTime; + private Integer repeats; + private Instant ttl; + + /** + * Constructor to create ScheduleJobRequest. + * + * @param name of the job. + * @param schedule job has to run. + */ + public ScheduleJobRequest(String name, JobSchedule schedule) { + this.name = name; + this.schedule = schedule; + } + + /** + * Constructor to create ScheduleJobRequest. + * + * @param name of the job. + * @param dueTime An optional time at which the job should be active, or the “one shot” time, if other scheduling + * type fields are not provided. Accepts a “point in time” string in the format of RFC3339, + * Go duration string (calculated from creation time), or non-repeating ISO8601 + */ + public ScheduleJobRequest(String name, Instant dueTime) { + this.name = name; + this.dueTime = dueTime; + } + + /** + * Sets the data payload for the job. + * This should be a JSON serialized value or object. + * + * @param data The job data in byte array format. + * @return This builder instance. + */ + public ScheduleJobRequest setData(byte[] data) { + this.data = data; + return this; + } + + /** + * Sets the schedule for the job. + * The format should follow cron expressions or other supported scheduling formats. + * + * @param schedule The job schedule. + * @return This builder instance. + */ + public ScheduleJobRequest setSchedule(JobSchedule schedule) { + this.schedule = schedule; + return this; + } + + /** + * Sets the due time when the job should become active or execute once. + * This can be in RFC3339, Go duration string, or non-repeating ISO8601 format. + * + * @param dueTime The due time of the job. + * @return This builder instance. + */ + public ScheduleJobRequest setDueTime(Instant dueTime) { + this.dueTime = dueTime; + return this; + } + + /** + * Sets the number of times the job should be triggered. + * If not set, the job runs indefinitely or until expiration. + * + * @param repeats The number of times the job should repeat. + * @return This builder instance. + */ + public ScheduleJobRequest setRepeat(Integer repeats) { + this.repeats = repeats; + return this; + } + + /** + * Sets the time-to-live (TTL) or expiration for the job. + * This can be in RFC3339, Go duration string, or non-repeating ISO8601 format. + * + * @param ttl The time-to-live for the job. + * @return This builder instance. + */ + public ScheduleJobRequest setTtl(Instant ttl) { + this.ttl = ttl; + return this; + } + + // Getters + + /** + * Gets the name of the job. + * + * @return The job name. + */ + public String getName() { + return name; + } + + /** + * Gets the data payload of the job. + * + * @return The job data as a byte array. + */ + public byte[] getData() { + return data; + } + + /** + * Gets the schedule of the job. + * + * @return The job schedule. + */ + public JobSchedule getSchedule() { + return schedule; + } + + /** + * Gets the due time of the job. + * + * @return The due time. + */ + public Instant getDueTime() { + return dueTime; + } + + /** + * Gets the number of times the job should repeat. + * + * @return The repeat count, or null if not set. + */ + public Integer getRepeats() { + return repeats; + } + + /** + * Gets the time-to-live (TTL) or expiration of the job. + * + * @return The TTL value. + */ + public Instant getTtl() { + return ttl; + } +} diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 6864a1ee07..ca3c1c5341 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -632,5 +632,4 @@ public void close() throws Exception { daprClientHttp = buildDaprClient(daprHttp); daprClientHttp.close(); } - } diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index a28dad0f42..aec0f287ae 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -16,14 +16,20 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; import io.dapr.client.domain.BulkPublishEntry; import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.CloudEvent; +import io.dapr.client.domain.DeleteJobRequest; +import io.dapr.client.domain.GetJobRequest; +import io.dapr.client.domain.GetJobResponse; +import io.dapr.client.domain.JobSchedule; import io.dapr.client.domain.QueryStateItem; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; +import io.dapr.client.domain.ScheduleJobRequest; import io.dapr.client.domain.UnlockResponseStatus; import io.dapr.client.domain.query.Query; import io.dapr.serializer.DaprObjectSerializer; @@ -39,11 +45,19 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import org.mockito.stubbing.Answer; import reactor.core.publisher.Mono; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -57,515 +71,807 @@ import java.util.concurrent.atomic.AtomicInteger; import static io.dapr.utils.TestUtils.assertThrowsDaprException; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class DaprPreviewClientGrpcTest { - private static final ObjectMapper MAPPER = new ObjectMapper(); - - private static final String QUERY_STORE_NAME = "testQueryStore"; - - private static final String PUBSUB_NAME = "testPubsub"; - - private static final String TOPIC_NAME = "testTopic"; - - private static final String LOCK_STORE_NAME = "MyLockStore"; - - private GrpcChannelFacade channel; - private DaprGrpc.DaprStub daprStub; - private DaprHttp daprHttp; - private DaprPreviewClient previewClient; - - @BeforeEach - public void setup() throws IOException { - channel = mock(GrpcChannelFacade.class); - daprStub = mock(DaprGrpc.DaprStub.class); - daprHttp = mock(DaprHttp.class); - when(daprStub.withInterceptors(any())).thenReturn(daprStub); - previewClient = new DaprClientImpl( - channel, daprStub, daprHttp, new DefaultObjectSerializer(), new DefaultObjectSerializer()); - doNothing().when(channel).close(); - } - - @AfterEach - public void tearDown() throws Exception { - previewClient.close(); - verify(channel).close(); - verifyNoMoreInteractions(channel); - } - - @Test - public void publishEventsExceptionThrownTest() { - doAnswer((Answer) invocation -> { - throw newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"); - }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); - - assertThrowsDaprException( - StatusRuntimeException.class, - "INVALID_ARGUMENT", - "INVALID_ARGUMENT: bad bad argument", - () -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, - Collections.EMPTY_LIST)).block()); - } - - @Test - public void publishEventsCallbackExceptionThrownTest() { - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - observer.onError(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument")); - return null; - }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); - - assertThrowsDaprException( - ExecutionException.class, - "INVALID_ARGUMENT", - "INVALID_ARGUMENT: bad bad argument", - () -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, - Collections.EMPTY_LIST)).block()); - } - - @Test - public void publishEventsContentTypeMismatchException() throws IOException { - DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); - observer.onCompleted(); - return null; - }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); - - - BulkPublishEntry entry = new BulkPublishEntry<>("1", "testEntry" - , "application/octet-stream", null); - BulkPublishRequest wrongReq = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, - Collections.singletonList(entry)); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final String QUERY_STORE_NAME = "testQueryStore"; + + private static final String PUBSUB_NAME = "testPubsub"; + + private static final String TOPIC_NAME = "testTopic"; + + private static final String LOCK_STORE_NAME = "MyLockStore"; + + private GrpcChannelFacade channel; + private DaprGrpc.DaprStub daprStub; + private DaprHttp daprHttp; + private DaprPreviewClient previewClient; + + @BeforeEach + public void setup() throws IOException { + channel = mock(GrpcChannelFacade.class); + daprStub = mock(DaprGrpc.DaprStub.class); + daprHttp = mock(DaprHttp.class); + when(daprStub.withInterceptors(any())).thenReturn(daprStub); + previewClient = new DaprClientImpl( + channel, daprStub, daprHttp, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + doNothing().when(channel).close(); + } + + @AfterEach + public void tearDown() throws Exception { + previewClient.close(); + verify(channel).close(); + verifyNoMoreInteractions(channel); + } + + @Test + public void publishEventsExceptionThrownTest() { + doAnswer((Answer) invocation -> { + throw newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"); + }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); + + assertThrowsDaprException( + StatusRuntimeException.class, + "INVALID_ARGUMENT", + "INVALID_ARGUMENT: bad bad argument", + () -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, + Collections.EMPTY_LIST)).block()); + } + + @Test + public void publishEventsCallbackExceptionThrownTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onError(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument")); + return null; + }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); + + assertThrowsDaprException( + ExecutionException.class, + "INVALID_ARGUMENT", + "INVALID_ARGUMENT: bad bad argument", + () -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, + Collections.EMPTY_LIST)).block()); + } + + @Test + public void publishEventsContentTypeMismatchException() throws IOException { + DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); + + + BulkPublishEntry entry = new BulkPublishEntry<>("1", "testEntry" + , "application/octet-stream", null); + BulkPublishRequest wrongReq = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, + Collections.singletonList(entry)); assertThrows(IllegalArgumentException.class, () -> previewClient.publishEvents(wrongReq).block()); - } - - @Test - public void publishEventsSerializeException() throws IOException { - DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); - previewClient = new DaprClientImpl(channel, daprStub, daprHttp, mockSerializer, new DefaultObjectSerializer()); - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); - observer.onCompleted(); - return null; - }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); - BulkPublishEntry> entry = new BulkPublishEntry<>("1", new HashMap<>(), - "application/json", null); - BulkPublishRequest> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, - Collections.singletonList(entry)); - when(mockSerializer.serialize(any())).thenThrow(IOException.class); - Mono>> result = previewClient.publishEvents(req); - - assertThrowsDaprException( - IOException.class, - "UNKNOWN", - "UNKNOWN: ", - () -> result.block()); - } - - @Test - public void publishEventsTest() { - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder(); - observer.onNext(builder.build()); - observer.onCompleted(); - return null; - }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); - - BulkPublishEntry entry = new BulkPublishEntry<>("1", "test", - "text/plain", null); - BulkPublishRequest req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, - Collections.singletonList(entry)); - Mono> result = previewClient.publishEvents(req); - BulkPublishResponse res = result.block(); - Assertions.assertNotNull(res); - assertEquals( 0, res.getFailedEntries().size(), "expected no entry in failed entries list"); - } - - @Test - public void publishEventsWithoutMetaTest() { - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder(); - observer.onNext(builder.build()); - observer.onCompleted(); - return null; - }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); - - Mono> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME, - "text/plain", Collections.singletonList("test")); - BulkPublishResponse res = result.block(); - Assertions.assertNotNull(res); - assertEquals( 0, res.getFailedEntries().size(), "expected no entries in failed entries list"); - } - - @Test - public void publishEventsWithRequestMetaTest() { - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder(); - observer.onNext(builder.build()); - observer.onCompleted(); - return null; - }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); - - Mono> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME, - "text/plain", new HashMap(){{ - put("ttlInSeconds", "123"); - }}, Collections.singletonList("test")); - BulkPublishResponse res = result.block(); - Assertions.assertNotNull(res); - assertEquals( 0, res.getFailedEntries().size(), "expected no entry in failed entries list"); - } - - @Test - public void publishEventsObjectTest() { - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); - observer.onCompleted(); - return null; - }).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> { - DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0); - if (!"application/json".equals(bulkPublishRequest.getEntries(0).getContentType())) { - return false; - } - - if (!"{\"id\":1,\"value\":\"Event\"}".equals(new String(entry.getEvent().toByteArray())) && - !"{\"value\":\"Event\",\"id\":1}".equals(new String(entry.getEvent().toByteArray()))) { - return false; - } - return true; - }), any()); - - - DaprClientGrpcTest.MyObject event = new DaprClientGrpcTest.MyObject(1, "Event"); - BulkPublishEntry entry = new BulkPublishEntry<>("1", event, - "application/json", null); - BulkPublishRequest req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, - Collections.singletonList(entry)); - BulkPublishResponse result = previewClient.publishEvents(req).block(); - Assertions.assertNotNull(result); - Assertions.assertEquals(0, result.getFailedEntries().size(), "expected no entries to be failed"); - } - - @Test - public void publishEventsContentTypeOverrideTest() { - doAnswer((Answer) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); - observer.onCompleted(); - return null; - }).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> { - DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0); - if (!"application/json".equals(entry.getContentType())) { - return false; - } - - if (!"\"hello\"".equals(new String(entry.getEvent().toByteArray()))) { - return false; - } - return true; - }), any()); - - BulkPublishEntry entry = new BulkPublishEntry<>("1", "hello", - "", null); - BulkPublishRequest req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, - Collections.singletonList(entry)); - BulkPublishResponse result = previewClient.publishEvents(req).block(); - Assertions.assertNotNull(result); - Assertions.assertEquals( 0, result.getFailedEntries().size(), "expected no entries to be failed"); - } - - @Test - public void queryStateExceptionsTest() { - assertThrows(IllegalArgumentException.class, () -> { - previewClient.queryState("", "query", String.class).block(); - }); - assertThrows(IllegalArgumentException.class, () -> { - previewClient.queryState("storeName", "", String.class).block(); - }); - assertThrows(IllegalArgumentException.class, () -> { - previewClient.queryState("storeName", (Query) null, String.class).block(); - }); - assertThrows(IllegalArgumentException.class, () -> { - previewClient.queryState("storeName", (String) null, String.class).block(); - }); - assertThrows(IllegalArgumentException.class, () -> { - previewClient.queryState(new QueryStateRequest("storeName"), String.class).block(); - }); - assertThrows(IllegalArgumentException.class, () -> { - previewClient.queryState(null, String.class).block(); - }); - } - - @Test - public void queryState() throws JsonProcessingException { - List> resp = new ArrayList<>(); - resp.add(new QueryStateItem("1", (Object)"testData", "6f54ad94-dfb9-46f0-a371-e42d550adb7d")); - DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, ""); - doAnswer(invocation -> { - DaprProtos.QueryStateRequest req = (DaprProtos.QueryStateRequest) invocation.getArgument(0); - assertEquals(QUERY_STORE_NAME, req.getStoreName()); - assertEquals("query", req.getQuery()); - assertEquals(0, req.getMetadataCount()); - - StreamObserver observer = (StreamObserver) - invocation.getArguments()[1]; - observer.onNext(responseEnvelope); - observer.onCompleted(); - return null; - }).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any()); - - QueryStateResponse response = previewClient.queryState(QUERY_STORE_NAME, "query", String.class).block(); - assertNotNull(response); - assertEquals(1, response.getResults().size(), "result size must be 1"); - assertEquals("1", response.getResults().get(0).getKey(), "result must be same"); - assertEquals("testData", response.getResults().get(0).getValue(), "result must be same"); - assertEquals("6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag(), "result must be same"); - } - - @Test - public void queryStateMetadataError() throws JsonProcessingException { - List> resp = new ArrayList<>(); - resp.add(new QueryStateItem("1", null, "error data")); - DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, ""); - doAnswer(invocation -> { - DaprProtos.QueryStateRequest req = (DaprProtos.QueryStateRequest) invocation.getArgument(0); - assertEquals(QUERY_STORE_NAME, req.getStoreName()); - assertEquals("query", req.getQuery()); - assertEquals(1, req.getMetadataCount()); - assertEquals(1, req.getMetadataCount()); - - StreamObserver observer = (StreamObserver) - invocation.getArguments()[1]; - observer.onNext(responseEnvelope); - observer.onCompleted(); - return null; - }).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any()); - - QueryStateResponse response = previewClient.queryState(QUERY_STORE_NAME, "query", - new HashMap(){{ put("key", "error"); }}, String.class).block(); - assertNotNull(response); - assertEquals(1, response.getResults().size(), "result size must be 1"); - assertEquals( "1", response.getResults().get(0).getKey(), "result must be same"); - assertEquals( "error data", response.getResults().get(0).getError(), "result must be same"); - } - - @Test - public void tryLock() { - - DaprProtos.TryLockResponse.Builder builder = DaprProtos.TryLockResponse.newBuilder() - .setSuccess(true); - - DaprProtos.TryLockResponse response = builder.build(); - - doAnswer((Answer) invocation -> { - DaprProtos.TryLockRequest req = invocation.getArgument(0); - assertEquals(LOCK_STORE_NAME, req.getStoreName()); - assertEquals("1", req.getResourceId()); - assertEquals("owner", req.getLockOwner()); - assertEquals(10, req.getExpiryInSeconds()); - - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - observer.onNext(response); - observer.onCompleted(); - return null; - }).when(daprStub).tryLockAlpha1(any(DaprProtos.TryLockRequest.class), any()); - - Boolean result = previewClient.tryLock("MyLockStore", "1", "owner", 10).block(); - assertEquals(Boolean.TRUE, result); - } - - @Test - public void unLock() { - - DaprProtos.UnlockResponse.Builder builder = DaprProtos.UnlockResponse.newBuilder() - .setStatus(DaprProtos.UnlockResponse.Status.SUCCESS); - - DaprProtos.UnlockResponse response = builder.build(); - - doAnswer((Answer) invocation -> { - DaprProtos.UnlockRequest req = invocation.getArgument(0); - assertEquals(LOCK_STORE_NAME, req.getStoreName()); - assertEquals("1", req.getResourceId()); - assertEquals("owner", req.getLockOwner()); - - StreamObserver observer = - (StreamObserver) invocation.getArguments()[1]; - observer.onNext(response); - observer.onCompleted(); - return null; - }).when(daprStub).unlockAlpha1(any(DaprProtos.UnlockRequest.class), any()); - - UnlockResponseStatus result = previewClient.unlock("MyLockStore", "1", "owner").block(); - assertEquals(UnlockResponseStatus.SUCCESS, result); - } - - @Test - public void subscribeEventTest() throws Exception { - var numEvents = 100; - var numErrors = 3; - var numDrops = 2; - - var pubsubName = "pubsubName"; - var topicName = "topicName"; - var data = "my message"; - - var started = new Semaphore(0); - - doAnswer((Answer>) invocation -> { - StreamObserver observer = - (StreamObserver) invocation.getArguments()[0]; - var emitterThread = new Thread(() -> { + } + + @Test + public void publishEventsSerializeException() throws IOException { + DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class); + previewClient = new DaprClientImpl(channel, daprStub, daprHttp, mockSerializer, new DefaultObjectSerializer()); + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); + BulkPublishEntry> entry = new BulkPublishEntry<>("1", new HashMap<>(), + "application/json", null); + BulkPublishRequest> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, + Collections.singletonList(entry)); + when(mockSerializer.serialize(any())).thenThrow(IOException.class); + Mono>> result = previewClient.publishEvents(req); + + assertThrowsDaprException( + IOException.class, + "UNKNOWN", + "UNKNOWN: ", + () -> result.block()); + } + + @Test + public void publishEventsTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder(); + observer.onNext(builder.build()); + observer.onCompleted(); + return null; + }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); + + BulkPublishEntry entry = new BulkPublishEntry<>("1", "test", + "text/plain", null); + BulkPublishRequest req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, + Collections.singletonList(entry)); + Mono> result = previewClient.publishEvents(req); + BulkPublishResponse res = result.block(); + Assertions.assertNotNull(res); + assertEquals( 0, res.getFailedEntries().size(), "expected no entry in failed entries list"); + } + + @Test + public void publishEventsWithoutMetaTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder(); + observer.onNext(builder.build()); + observer.onCompleted(); + return null; + }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); + + Mono> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME, + "text/plain", Collections.singletonList("test")); + BulkPublishResponse res = result.block(); + Assertions.assertNotNull(res); + assertEquals( 0, res.getFailedEntries().size(), "expected no entries in failed entries list"); + } + + @Test + public void publishEventsWithRequestMetaTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder(); + observer.onNext(builder.build()); + observer.onCompleted(); + return null; + }).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any()); + + Mono> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME, + "text/plain", new HashMap(){{ + put("ttlInSeconds", "123"); + }}, Collections.singletonList("test")); + BulkPublishResponse res = result.block(); + Assertions.assertNotNull(res); + assertEquals( 0, res.getFailedEntries().size(), "expected no entry in failed entries list"); + } + + @Test + public void publishEventsObjectTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> { + DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0); + if (!"application/json".equals(bulkPublishRequest.getEntries(0).getContentType())) { + return false; + } + + if (!"{\"id\":1,\"value\":\"Event\"}".equals(new String(entry.getEvent().toByteArray())) && + !"{\"value\":\"Event\",\"id\":1}".equals(new String(entry.getEvent().toByteArray()))) { + return false; + } + return true; + }), any()); + + + DaprClientGrpcTest.MyObject event = new DaprClientGrpcTest.MyObject(1, "Event"); + BulkPublishEntry entry = new BulkPublishEntry<>("1", event, + "application/json", null); + BulkPublishRequest req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, + Collections.singletonList(entry)); + BulkPublishResponse result = previewClient.publishEvents(req).block(); + Assertions.assertNotNull(result); + Assertions.assertEquals(0, result.getFailedEntries().size(), "expected no entries to be failed"); + } + + @Test + public void publishEventsContentTypeOverrideTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> { + DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0); + if (!"application/json".equals(entry.getContentType())) { + return false; + } + + if (!"\"hello\"".equals(new String(entry.getEvent().toByteArray()))) { + return false; + } + return true; + }), any()); + + BulkPublishEntry entry = new BulkPublishEntry<>("1", "hello", + "", null); + BulkPublishRequest req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, + Collections.singletonList(entry)); + BulkPublishResponse result = previewClient.publishEvents(req).block(); + Assertions.assertNotNull(result); + Assertions.assertEquals( 0, result.getFailedEntries().size(), "expected no entries to be failed"); + } + + @Test + public void queryStateExceptionsTest() { + assertThrows(IllegalArgumentException.class, () -> { + previewClient.queryState("", "query", String.class).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + previewClient.queryState("storeName", "", String.class).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + previewClient.queryState("storeName", (Query) null, String.class).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + previewClient.queryState("storeName", (String) null, String.class).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + previewClient.queryState(new QueryStateRequest("storeName"), String.class).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + previewClient.queryState(null, String.class).block(); + }); + } + + @Test + public void queryState() throws JsonProcessingException { + List> resp = new ArrayList<>(); + resp.add(new QueryStateItem("1", (Object)"testData", "6f54ad94-dfb9-46f0-a371-e42d550adb7d")); + DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, ""); + doAnswer(invocation -> { + DaprProtos.QueryStateRequest req = (DaprProtos.QueryStateRequest) invocation.getArgument(0); + assertEquals(QUERY_STORE_NAME, req.getStoreName()); + assertEquals("query", req.getQuery()); + assertEquals(0, req.getMetadataCount()); + + StreamObserver observer = (StreamObserver) + invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any()); + + QueryStateResponse response = previewClient.queryState(QUERY_STORE_NAME, "query", String.class).block(); + assertNotNull(response); + assertEquals(1, response.getResults().size(), "result size must be 1"); + assertEquals("1", response.getResults().get(0).getKey(), "result must be same"); + assertEquals("testData", response.getResults().get(0).getValue(), "result must be same"); + assertEquals("6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag(), "result must be same"); + } + + @Test + public void queryStateMetadataError() throws JsonProcessingException { + List> resp = new ArrayList<>(); + resp.add(new QueryStateItem("1", null, "error data")); + DaprProtos.QueryStateResponse responseEnvelope = buildQueryStateResponse(resp, ""); + doAnswer(invocation -> { + DaprProtos.QueryStateRequest req = (DaprProtos.QueryStateRequest) invocation.getArgument(0); + assertEquals(QUERY_STORE_NAME, req.getStoreName()); + assertEquals("query", req.getQuery()); + assertEquals(1, req.getMetadataCount()); + assertEquals(1, req.getMetadataCount()); + + StreamObserver observer = (StreamObserver) + invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(daprStub).queryStateAlpha1(any(DaprProtos.QueryStateRequest.class), any()); + + QueryStateResponse response = previewClient.queryState(QUERY_STORE_NAME, "query", + new HashMap(){{ put("key", "error"); }}, String.class).block(); + assertNotNull(response); + assertEquals(1, response.getResults().size(), "result size must be 1"); + assertEquals( "1", response.getResults().get(0).getKey(), "result must be same"); + assertEquals( "error data", response.getResults().get(0).getError(), "result must be same"); + } + + @Test + public void tryLock() { + + DaprProtos.TryLockResponse.Builder builder = DaprProtos.TryLockResponse.newBuilder() + .setSuccess(true); + + DaprProtos.TryLockResponse response = builder.build(); + + doAnswer((Answer) invocation -> { + DaprProtos.TryLockRequest req = invocation.getArgument(0); + assertEquals(LOCK_STORE_NAME, req.getStoreName()); + assertEquals("1", req.getResourceId()); + assertEquals("owner", req.getLockOwner()); + assertEquals(10, req.getExpiryInSeconds()); + + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(response); + observer.onCompleted(); + return null; + }).when(daprStub).tryLockAlpha1(any(DaprProtos.TryLockRequest.class), any()); + + Boolean result = previewClient.tryLock("MyLockStore", "1", "owner", 10).block(); + assertEquals(Boolean.TRUE, result); + } + + @Test + public void unLock() { + + DaprProtos.UnlockResponse.Builder builder = DaprProtos.UnlockResponse.newBuilder() + .setStatus(DaprProtos.UnlockResponse.Status.SUCCESS); + + DaprProtos.UnlockResponse response = builder.build(); + + doAnswer((Answer) invocation -> { + DaprProtos.UnlockRequest req = invocation.getArgument(0); + assertEquals(LOCK_STORE_NAME, req.getStoreName()); + assertEquals("1", req.getResourceId()); + assertEquals("owner", req.getLockOwner()); + + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(response); + observer.onCompleted(); + return null; + }).when(daprStub).unlockAlpha1(any(DaprProtos.UnlockRequest.class), any()); + + UnlockResponseStatus result = previewClient.unlock("MyLockStore", "1", "owner").block(); + assertEquals(UnlockResponseStatus.SUCCESS, result); + } + + @Test + public void subscribeEventTest() throws Exception { + var numEvents = 100; + var numErrors = 3; + var numDrops = 2; + + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var data = "my message"; + + var started = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + var emitterThread = new Thread(() -> { try { started.acquire(); } catch (InterruptedException e) { throw new RuntimeException(e); } observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); - for (int i = 0; i < numEvents; i++) { - observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() - .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() - .setId(Integer.toString(i)) - .setPubsubName(pubsubName) - .setTopic(topicName) - .setData(ByteString.copyFromUtf8("\"" + data + "\"")) - .setDataContentType("application/json") - .build()) - .build()); - } - - for (int i = 0; i < numDrops; i++) { - // Bad messages - observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() - .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() - .setId(UUID.randomUUID().toString()) - .setPubsubName("bad pubsub") - .setTopic("bad topic") - .setData(ByteString.copyFromUtf8("\"\"")) - .setDataContentType("application/json") - .build()) - .build()); - } - observer.onCompleted(); - }); - emitterThread.start(); - return new StreamObserver<>() { - - @Override - public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { - started.release(); - } - - @Override - public void onError(Throwable throwable) { - } - - @Override - public void onCompleted() { - } - }; - }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); - - final Set success = Collections.synchronizedSet(new HashSet<>()); - final Set errors = Collections.synchronizedSet(new HashSet<>()); - final AtomicInteger dropCounter = new AtomicInteger(); - final Semaphore gotAll = new Semaphore(0); - - final AtomicInteger errorsToBeEmitted = new AtomicInteger(numErrors); - - var subscription = previewClient.subscribeToEvents( - "pubsubname", - "topic", - new SubscriptionListener<>() { - @Override - public Mono onEvent(CloudEvent event) { - if (event.getPubsubName().equals(pubsubName) && - event.getTopic().equals(topicName) && - event.getData().equals(data)) { - - // Simulate an error - if ((success.size() == 4 /* some random entry */) && errorsToBeEmitted.decrementAndGet() >= 0) { - throw new RuntimeException("simulated exception on event " + event.getId()); - } - - success.add(event.getId()); - if (success.size() >= numEvents) { - gotAll.release(); - } - return Mono.just(Status.SUCCESS); - } - - dropCounter.incrementAndGet(); - return Mono.just(Status.DROP); - } - - @Override - public void onError(RuntimeException exception) { - errors.add(exception.getMessage()); - } - - }, - TypeRef.STRING); - - gotAll.acquire(); - subscription.close(); - - assertEquals(numEvents, success.size()); - assertEquals(numDrops, dropCounter.get()); - assertEquals(numErrors, errors.size()); - } - private DaprProtos.QueryStateResponse buildQueryStateResponse(List> resp,String token) - throws JsonProcessingException { - List items = new ArrayList<>(); - for (QueryStateItem item: resp) { - items.add(buildQueryStateItem(item)); - } - return DaprProtos.QueryStateResponse.newBuilder() - .addAllResults(items) - .setToken(token) - .build(); - } - - private DaprProtos.QueryStateItem buildQueryStateItem(QueryStateItem item) throws JsonProcessingException { - DaprProtos.QueryStateItem.Builder it = DaprProtos.QueryStateItem.newBuilder().setKey(item.getKey()); - if (item.getValue() != null) { - it.setData(ByteString.copyFrom(MAPPER.writeValueAsBytes(item.getValue()))); - } - if (item.getEtag() != null) { - it.setEtag(item.getEtag()); - } - if (item.getError() != null) { - it.setError(item.getError()); - } - return it.build(); - } - - private static StatusRuntimeException newStatusRuntimeException(String status, String message) { - return new StatusRuntimeException(Status.fromCode(Status.Code.valueOf(status)).withDescription(message)); - } + for (int i = 0; i < numEvents; i++) { + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build()); + } + + for (int i = 0; i < numDrops; i++) { + // Bad messages + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(UUID.randomUUID().toString()) + .setPubsubName("bad pubsub") + .setTopic("bad topic") + .setData(ByteString.copyFromUtf8("\"\"")) + .setDataContentType("application/json") + .build()) + .build()); + } + observer.onCompleted(); + }); + emitterThread.start(); + return new StreamObserver<>() { + + @Override + public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { + started.release(); + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onCompleted() { + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + final Set success = Collections.synchronizedSet(new HashSet<>()); + final Set errors = Collections.synchronizedSet(new HashSet<>()); + final AtomicInteger dropCounter = new AtomicInteger(); + final Semaphore gotAll = new Semaphore(0); + + final AtomicInteger errorsToBeEmitted = new AtomicInteger(numErrors); + + var subscription = previewClient.subscribeToEvents( + "pubsubname", + "topic", + new SubscriptionListener<>() { + @Override + public Mono onEvent(CloudEvent event) { + if (event.getPubsubName().equals(pubsubName) && + event.getTopic().equals(topicName) && + event.getData().equals(data)) { + + // Simulate an error + if ((success.size() == 4 /* some random entry */) && errorsToBeEmitted.decrementAndGet() >= 0) { + throw new RuntimeException("simulated exception on event " + event.getId()); + } + + success.add(event.getId()); + if (success.size() >= numEvents) { + gotAll.release(); + } + return Mono.just(Status.SUCCESS); + } + + dropCounter.incrementAndGet(); + return Mono.just(Status.DROP); + } + + @Override + public void onError(RuntimeException exception) { + errors.add(exception.getMessage()); + } + + }, + TypeRef.STRING); + + gotAll.acquire(); + subscription.close(); + + assertEquals(numEvents, success.size()); + assertEquals(numDrops, dropCounter.get()); + assertEquals(numErrors, errors.size()); + } + + @Test + public void scheduleJobShouldSucceedWhenAllFieldsArePresentInRequest() { + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob", + JobSchedule.fromString("*/5 * * * *")) + .setData("testData".getBytes()) + .setTtl(Instant.now().plus(1, ChronoUnit.DAYS)) + .setRepeat(5) + .setDueTime(Instant.now().plus(10, ChronoUnit.MINUTES)); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any()); + + assertDoesNotThrow(() -> previewClient.scheduleJob(expectedScheduleJobRequest).block()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class); + + verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any()); + DaprProtos.ScheduleJobRequest actualScheduleJobReq = captor.getValue(); + + assertEquals("testJob", actualScheduleJobReq.getJob().getName()); + assertEquals("testData", + new String(actualScheduleJobReq.getJob().getData().getValue().toByteArray(), StandardCharsets.UTF_8)); + assertEquals("*/5 * * * *", actualScheduleJobReq.getJob().getSchedule()); + assertEquals(iso8601Formatter.format(expectedScheduleJobRequest.getTtl()), actualScheduleJobReq.getJob().getTtl()); + assertEquals(expectedScheduleJobRequest.getRepeats(), actualScheduleJobReq.getJob().getRepeats()); + assertEquals(iso8601Formatter.format(expectedScheduleJobRequest.getDueTime()), actualScheduleJobReq.getJob().getDueTime()); + } + + @Test + public void scheduleJobShouldSucceedWhenRequiredFieldsNameAndDueTimeArePresentInRequest() { + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any()); + + ScheduleJobRequest expectedScheduleJobRequest = + new ScheduleJobRequest("testJob", Instant.now().plus(10, ChronoUnit.MINUTES)); + assertDoesNotThrow(() -> previewClient.scheduleJob(expectedScheduleJobRequest).block()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class); + + verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any()); + DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue(); + DaprProtos.Job job = actualScheduleJobRequest.getJob(); + assertEquals("testJob", job.getName()); + assertFalse(job.hasData()); + assertFalse(job.hasSchedule()); + assertEquals(0, job.getRepeats()); + assertFalse(job.hasTtl()); + assertEquals(iso8601Formatter.format(expectedScheduleJobRequest.getDueTime()), + actualScheduleJobRequest.getJob().getDueTime()); + } + + @Test + public void scheduleJobShouldSucceedWhenRequiredFieldsNameAndScheduleArePresentInRequest() { + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any()); + + ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob", + JobSchedule.fromString("* * * * * *")); + assertDoesNotThrow(() -> previewClient.scheduleJob(expectedScheduleJobRequest).block()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class); + + verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any()); + DaprProtos.ScheduleJobRequest actualScheduleJobRequest = captor.getValue(); + DaprProtos.Job job = actualScheduleJobRequest.getJob(); + assertEquals("testJob", job.getName()); + assertFalse(job.hasData()); + assertEquals( "* * * * * *", job.getSchedule()); + assertEquals(0, job.getRepeats()); + assertFalse(job.hasTtl()); + } + + @Test + public void scheduleJobShouldThrowWhenRequestIsNull() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.scheduleJob(null).block(); + }); + assertEquals("scheduleJobRequest cannot be null", exception.getMessage()); + } + + @Test + public void scheduleJobShouldThrowWhenInvalidRequest() { + ScheduleJobRequest scheduleJobRequest = new ScheduleJobRequest(null, Instant.now()); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.scheduleJob(scheduleJobRequest).block(); + }); + assertEquals("Name in the request cannot be null or empty", exception.getMessage()); + } + + @Test + public void scheduleJobShouldThrowWhenNameInRequestIsEmpty() { + ScheduleJobRequest scheduleJobRequest = new ScheduleJobRequest("", Instant.now()); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.scheduleJob(scheduleJobRequest).block(); + }); + assertEquals("Name in the request cannot be null or empty", exception.getMessage()); + } + + @Test + public void getJobShouldReturnResponseWhenAllFieldsArePresentInRequest() { + DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + .withZone(ZoneOffset.UTC); + + GetJobRequest getJobRequest = new GetJobRequest("testJob"); + + DaprProtos.Job job = DaprProtos.Job.newBuilder() + .setName("testJob") + .setTtl(OffsetDateTime.now().format(iso8601Formatter)) + .setData(Any.newBuilder().setValue(ByteString.copyFrom("testData".getBytes())).build()) + .setSchedule("*/5 * * * *") + .setRepeats(5) + .setDueTime(iso8601Formatter.format(Instant.now().plus(10, ChronoUnit.MINUTES))) + .build(); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onNext(DaprProtos.GetJobResponse.newBuilder() + .setJob(job) + .build()); + observer.onCompleted(); + return null; + }).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any()); + + Mono resultMono = previewClient.getJob(getJobRequest); + + GetJobResponse response = resultMono.block(); + assertNotNull(response); + assertEquals("testJob", response.getName()); + assertEquals("testData", new String(response.getData(), StandardCharsets.UTF_8)); + assertEquals("*/5 * * * *", response.getSchedule().getExpression()); + assertEquals(5, response.getRepeats()); + assertEquals(job.getTtl(), iso8601Formatter.format(response.getTtl())); + assertEquals(job.getDueTime(), iso8601Formatter.format(response.getDueTime())); + } + + @Test + public void getJobShouldReturnResponseWithScheduleSetWhenResponseHasSchedule() { + GetJobRequest getJobRequest = new GetJobRequest("testJob"); + + DaprProtos.Job job = DaprProtos.Job.newBuilder() + .setName("testJob") + .setSchedule("0 0 0 1 1 *") + .build(); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onNext(DaprProtos.GetJobResponse.newBuilder() + .setJob(job) + .build()); + observer.onCompleted(); + return null; + }).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any()); + + Mono resultMono = previewClient.getJob(getJobRequest); + + GetJobResponse response = resultMono.block(); + assertNotNull(response); + assertEquals("testJob", response.getName()); + assertNull(response.getData()); + assertEquals("0 0 0 1 1 *", response.getSchedule().getExpression()); + assertNull(response.getRepeats()); + assertNull(response.getTtl()); + assertNull(response.getDueTime()); + } + + @Test + public void getJobShouldReturnResponseWithDueTimeSetWhenResponseHasDueTime() { + GetJobRequest getJobRequest = new GetJobRequest("testJob"); + + String datetime = OffsetDateTime.now().toString(); + DaprProtos.Job job = DaprProtos.Job.newBuilder() + .setName("testJob") + .setDueTime(datetime) + .build(); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onNext(DaprProtos.GetJobResponse.newBuilder() + .setJob(job) + .build()); + observer.onCompleted(); + return null; + }).when(daprStub).getJobAlpha1(any(DaprProtos.GetJobRequest.class), any()); + + Mono resultMono = previewClient.getJob(getJobRequest); + + GetJobResponse response = resultMono.block(); + assertNotNull(response); + assertEquals("testJob", response.getName()); + assertNull(response.getData()); + assertNull(response.getSchedule()); + assertNull(response.getRepeats()); + assertNull(response.getTtl()); + assertEquals(job.getDueTime(), datetime); + } + + @Test + public void getJobShouldThrowWhenRequestIsNull() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.getJob(null).block(); + }); + assertEquals("getJobRequest cannot be null", exception.getMessage()); + } + + @Test + public void getJobShouldThrowWhenNameIsNullRequest() { + GetJobRequest getJobRequest = new GetJobRequest(null); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.getJob(getJobRequest).block(); + }); + assertEquals("Name in the request cannot be null or empty", exception.getMessage()); + } + + @Test + public void getJobShouldThrowWhenNameIsEmptyRequest() { + GetJobRequest getJobRequest =new GetJobRequest("");; + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.getJob(getJobRequest).block(); + }); + assertEquals("Name in the request cannot be null or empty", exception.getMessage()); + } + + @Test + public void deleteJobShouldSucceedWhenValidRequest() { + DeleteJobRequest deleteJobRequest = new DeleteJobRequest("testJob"); + + doAnswer(invocation -> { + StreamObserver observer = invocation.getArgument(1); + observer.onCompleted(); // Simulate successful response + return null; + }).when(daprStub).deleteJobAlpha1(any(DaprProtos.DeleteJobRequest.class), any()); + + Mono resultMono = previewClient.deleteJob(deleteJobRequest); + + assertDoesNotThrow(() -> resultMono.block()); + } + + @Test + public void deleteJobShouldThrowRequestIsNull() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.deleteJob(null).block(); + }); + assertEquals("deleteJobRequest cannot be null", exception.getMessage()); + } + + @Test + public void deleteJobShouldThrowWhenNameIsNullRequest() { + DeleteJobRequest deleteJobRequest = new DeleteJobRequest(null); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.deleteJob(deleteJobRequest).block(); + }); + assertEquals("Name in the request cannot be null or empty", exception.getMessage()); + } + + @Test + public void deleteJobShouldThrowWhenNameIsEmptyRequest() { + DeleteJobRequest deleteJobRequest = new DeleteJobRequest(""); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + previewClient.deleteJob(deleteJobRequest).block(); + }); + assertEquals("Name in the request cannot be null or empty", exception.getMessage()); + } + + private DaprProtos.QueryStateResponse buildQueryStateResponse(List> resp,String token) + throws JsonProcessingException { + List items = new ArrayList<>(); + for (QueryStateItem item: resp) { + items.add(buildQueryStateItem(item)); + } + return DaprProtos.QueryStateResponse.newBuilder() + .addAllResults(items) + .setToken(token) + .build(); + } + + private DaprProtos.QueryStateItem buildQueryStateItem(QueryStateItem item) throws JsonProcessingException { + DaprProtos.QueryStateItem.Builder it = DaprProtos.QueryStateItem.newBuilder().setKey(item.getKey()); + if (item.getValue() != null) { + it.setData(ByteString.copyFrom(MAPPER.writeValueAsBytes(item.getValue()))); + } + if (item.getEtag() != null) { + it.setEtag(item.getEtag()); + } + if (item.getError() != null) { + it.setError(item.getError()); + } + return it.build(); + } + + private static StatusRuntimeException newStatusRuntimeException(String status, String message) { + return new StatusRuntimeException(Status.fromCode(Status.Code.valueOf(status)).withDescription(message)); + } } diff --git a/sdk/src/test/java/io/dapr/client/domain/JobsScheduleTest.java b/sdk/src/test/java/io/dapr/client/domain/JobsScheduleTest.java new file mode 100644 index 0000000000..8b71f28d51 --- /dev/null +++ b/sdk/src/test/java/io/dapr/client/domain/JobsScheduleTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class JobsScheduleTest { + + @Test + void testFromPeriodValidDuration() { + Duration duration = Duration.ofHours(1).plusMinutes(30) + .plusSeconds(15).plusMillis(500); + JobSchedule schedule = JobSchedule.fromPeriod(duration); + assertEquals("@every 1h30m15s500ms", schedule.getExpression()); + } + + @Test + void testFromPeriodValidDurationWithoutSecondsAndMillSeconds() { + Duration duration = Duration.ofHours(1).plusMinutes(30); + JobSchedule schedule = JobSchedule.fromPeriod(duration); + assertEquals("@every 1h30m0s0ms", schedule.getExpression()); + } + + @Test + void testFromPeriodNullDuration() { + Exception exception = assertThrows(IllegalArgumentException.class, () -> JobSchedule.fromPeriod(null)); + assertEquals("duration cannot be null", exception.getMessage()); + } + + @Test + void testFromStringThrowsIllegalArgumentWhenExpressionIsNull() { + Exception exception = assertThrows(IllegalArgumentException.class, () -> JobSchedule.fromString(null)); + assertEquals("cronExpression cannot be null", exception.getMessage()); + } + + @Test + void testFromString() { + String cronExpression = "0 0 * * *"; + JobSchedule schedule = JobSchedule.fromString(cronExpression); + assertEquals(cronExpression, schedule.getExpression()); + } + + @Test + void testYearly() { + JobSchedule schedule = JobSchedule.yearly(); + assertEquals("0 0 0 1 1 *", schedule.getExpression()); + } + + @Test + void testMonthly() { + JobSchedule schedule = JobSchedule.monthly(); + assertEquals("0 0 0 1 * *", schedule.getExpression()); + } + + @Test + void testWeekly() { + JobSchedule schedule = JobSchedule.weekly(); + assertEquals("0 0 0 * * 0", schedule.getExpression()); + } + + @Test + void testDaily() { + JobSchedule schedule = JobSchedule.daily(); + assertEquals("0 0 0 * * *", schedule.getExpression()); + } + + @Test + void testHourly() { + JobSchedule schedule = JobSchedule.hourly(); + assertEquals("0 0 * * * *", schedule.getExpression()); + } +} diff --git a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java index 145f61deaa..8959d91439 100644 --- a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java +++ b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java @@ -27,6 +27,7 @@ import org.testcontainers.containers.wait.strategy.WaitStrategy; import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; import org.yaml.snakeyaml.Yaml; import java.io.IOException; @@ -62,14 +63,18 @@ public class DaprContainer extends GenericContainer { private DaprLogLevel daprLogLevel = DaprLogLevel.INFO; private String appChannelAddress = "localhost"; private String placementService = "placement"; + private String schedulerService = "scheduler"; private String placementDockerImageName = "daprio/placement"; + private String schedulerDockerImageName = "daprio/scheduler"; private Configuration configuration; private DaprPlacementContainer placementContainer; + private DaprSchedulerContainer schedulerContainer; private String appName; private Integer appPort; private String appHealthCheckPath; private boolean shouldReusePlacement; + private boolean shouldReuseScheduler; /** * Creates a new Dapr container. @@ -157,8 +162,13 @@ public DaprContainer withPlacementImage(String placementDockerImageName) { return this; } - public DaprContainer withReusablePlacement(boolean reuse) { - this.shouldReusePlacement = reuse; + public DaprContainer withReusablePlacement(boolean shouldReusePlacement) { + this.shouldReusePlacement = shouldReusePlacement; + return this; + } + + public DaprContainer withReuseScheduler(boolean shouldReuseScheduler) { + this.shouldReuseScheduler = shouldReuseScheduler; return this; } @@ -167,6 +177,11 @@ public DaprContainer withPlacementContainer(DaprPlacementContainer placementCont return this; } + public DaprContainer withSchedulerContainer(DaprSchedulerContainer schedulerContainer) { + this.schedulerContainer = schedulerContainer; + return this; + } + public DaprContainer withComponent(Component component) { components.add(component); return this; @@ -237,6 +252,14 @@ protected void configure() { this.placementContainer.start(); } + if (this.schedulerContainer == null) { + this.schedulerContainer = new DaprSchedulerContainer(this.schedulerDockerImageName) + .withNetwork(getNetwork()) + .withNetworkAliases(schedulerService) + .withReuse(this.shouldReuseScheduler); + this.schedulerContainer.start(); + } + List cmds = new ArrayList<>(); cmds.add("./daprd"); cmds.add("--app-id"); @@ -246,6 +269,8 @@ protected void configure() { cmds.add(DAPR_PROTOCOL.getName()); cmds.add("--placement-host-address"); cmds.add(placementService + ":50005"); + cmds.add("--scheduler-host-address"); + cmds.add(schedulerService + ":51005"); if (appChannelAddress != null && !appChannelAddress.isEmpty()) { cmds.add("--app-channel-address"); @@ -324,7 +349,7 @@ protected void configure() { withCopyToContainer(Transferable.of(endpointYaml), "/dapr-resources/" + endpoint.getName() + ".yaml"); } - dependsOn(placementContainer); + dependsOn(placementContainer, schedulerContainer); } public String getAppName() { diff --git a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprSchedulerContainer.java b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprSchedulerContainer.java new file mode 100644 index 0000000000..bcac6a426e --- /dev/null +++ b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprSchedulerContainer.java @@ -0,0 +1,82 @@ +/* + * Copyright 2024 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.testcontainers; + +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.images.builder.Transferable; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; + +/** + * Test container for Dapr scheduler service. + */ +public class DaprSchedulerContainer extends GenericContainer { + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("daprio/scheduler"); + private int schedulerPort = 51005; + + /** + * Creates a new Dapr scheduler container. + * @param dockerImageName Docker image name. + */ + public DaprSchedulerContainer(DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + withExposedPorts(schedulerPort); + } + + /** + * Creates a new Dapr scheduler container. + * @param image Docker image name. + */ + public DaprSchedulerContainer(String image) { + this(DockerImageName.parse(image)); + } + + @Override + protected void configure() { + super.configure(); + + withCopyToContainer(Transferable.of("", 0777), "./default-dapr-scheduler-server-0/dapr-0.1/"); + withCopyToContainer(Transferable.of("", 0777), "./dapr-scheduler-existing-cluster/"); + withCommand("./scheduler", "--port", Integer.toString(schedulerPort), "--etcd-data-dir", "."); + } + + public static DockerImageName getDefaultImageName() { + return DEFAULT_IMAGE_NAME; + } + + public DaprSchedulerContainer withPort(Integer port) { + this.schedulerPort = port; + return this; + } + + public int getPort() { + return schedulerPort; + } + + // Required by spotbugs plugin + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } +}