diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java index aa01ba5a..fa7295b6 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/ExecutePicked.java @@ -29,6 +29,7 @@ import java.time.Instant; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; @SuppressWarnings("rawtypes") class ExecutePicked implements Runnable { @@ -61,39 +62,37 @@ public ExecutePicked(Executor executor, TaskRepository taskRepository, Scheduler public void run() { // FIXLATER: need to cleanup all the references back to scheduler fields final UUID executionId = executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock)); - try { - statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED); - executePickedExecution(pickedExecution); - } finally { - executor.removeCurrentlyProcessing(executionId); - } + statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED); + executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId)); } - private void executePickedExecution(Execution execution) { + private CompletableFuture executePickedExecution(Execution execution) { final Optional task = taskResolver.resolve(execution.taskInstance.getTaskName()); if (!task.isPresent()) { LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName()); statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR); - return; + return new CompletableFuture<>(); } Instant executionStarted = clock.now(); - try { - LOG.debug("Executing " + execution); - CompletionHandler completion = task.get().execute(execution.taskInstance, new ExecutionContext(schedulerState, execution, schedulerClient)); - LOG.debug("Execution done"); + LOG.debug("Executing " + execution); + CompletableFuture completableFuture = task.get().execute(execution.taskInstance, new ExecutionContext(schedulerState, execution, schedulerClient)); + return completableFuture.whenCompleteAsync((completion, ex) -> { + if (ex != null) { + if (ex instanceof RuntimeException) { + failure(task.get(), execution, ex, executionStarted, "Unhandled exception"); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); + } else { + failure(task.get(), execution, ex, executionStarted, "Error"); + statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); + } + return; + } + LOG.debug("Execution done"); complete(completion, execution, executionStarted); statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED); - - } catch (RuntimeException unhandledException) { - failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception"); - statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); - - } catch (Throwable unhandledError) { - failure(task.get(), execution, unhandledError, executionStarted, "Error"); - statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED); - } + }, executor.getExecutorService()); } private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java index 38283c87..281aa30a 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Executor.java @@ -99,4 +99,8 @@ public void removeCurrentlyProcessing(UUID executionId) { LOG.warn("Released execution was not found in collection of executions currently being processed. Should never happen. Execution-id: " + executionId); } } + + public ExecutorService getExecutorService() { + return executorService; + } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java index 9fa113f9..e56c9db7 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/ExecutionHandler.java @@ -15,6 +15,8 @@ */ package com.github.kagkarlsson.scheduler.task; +import java.util.concurrent.CompletableFuture; + public interface ExecutionHandler { - CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext); + CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/StateReturningExecutionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/StateReturningExecutionHandler.java index fa2ae45a..dd50cc65 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/StateReturningExecutionHandler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/StateReturningExecutionHandler.java @@ -15,6 +15,8 @@ */ package com.github.kagkarlsson.scheduler.task; +import java.util.concurrent.CompletableFuture; + public interface StateReturningExecutionHandler { /** @@ -24,5 +26,5 @@ public interface StateReturningExecutionHandler { * @param executionContext * @return */ - T execute(TaskInstance taskInstance, ExecutionContext executionContext); + CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/VoidExecutionHandler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/VoidExecutionHandler.java index 9842b9e0..30ba3947 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/VoidExecutionHandler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/VoidExecutionHandler.java @@ -15,6 +15,8 @@ */ package com.github.kagkarlsson.scheduler.task; +import java.util.concurrent.CompletableFuture; + public interface VoidExecutionHandler { - void execute(TaskInstance taskInstance, ExecutionContext executionContext); + CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext); } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java index 4b58c040..633922f0 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/OneTimeTask.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.CompletableFuture; public abstract class OneTimeTask extends AbstractTask { @@ -44,12 +45,14 @@ public SchedulableInstance schedulableInstance(String id, T data) { } @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { - executeOnce(taskInstance, executionContext); - return new OnCompleteRemove<>(); + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { + CompletableFuture voidFuture = executeOnce(taskInstance, executionContext); + return voidFuture.thenApply((nextData) -> { + return new OnCompleteRemove<>(); + }); } - public abstract void executeOnce(TaskInstance taskInstance, ExecutionContext executionContext); + public abstract CompletableFuture executeOnce(TaskInstance taskInstance, ExecutionContext executionContext); @Override public String toString() { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java index 2570a165..1672473c 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/RecurringTask.java @@ -24,6 +24,7 @@ import java.time.Instant; import java.util.Optional; +import java.util.concurrent.CompletableFuture; public abstract class RecurringTask extends AbstractTask implements OnStartup { @@ -65,12 +66,15 @@ public void onStartup(SchedulerClient scheduler, Clock clock) { } @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { - executeRecurringly(taskInstance, executionContext); - return onComplete; + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { + CompletableFuture voidFuture = executeRecurringly(taskInstance, executionContext); + return voidFuture.thenApply((v) -> { + return onComplete; + }); + } - public abstract void executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext); + public abstract CompletableFuture executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext); public TaskInstanceId getDefaultTaskInstance() { return TaskInstanceId.of(name, INSTANCE); diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java index e8a0ba4d..b544f5cd 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/helper/Tasks.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; public class Tasks { @@ -96,8 +97,8 @@ public RecurringTask execute(VoidExecutionHandler executionHandler) { return new RecurringTask(name, schedule, dataClass, scheduleOnStartup, onFailure, onDeadExecution) { @Override - public void executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { - executionHandler.execute(taskInstance, executionContext); + public CompletableFuture executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { + return executionHandler.execute(taskInstance, executionContext); } }; } @@ -106,14 +107,18 @@ public RecurringTask executeStateful(StateReturningExecutionHandler execut return new RecurringTask(name, schedule, dataClass, scheduleOnStartup, onFailure, onDeadExecution) { @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { - final T nextData = executionHandler.execute(taskInstance, executionContext); - return new CompletionHandler.OnCompleteReschedule<>(schedule, nextData); + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { + final CompletableFuture nextDataFuture = executionHandler.execute(taskInstance, executionContext); + return nextDataFuture.thenApply((nextData) -> { + return new CompletionHandler.OnCompleteReschedule<>(schedule, nextData); + }); + } @Override - public void executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { + public CompletableFuture executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { // never called + return CompletableFuture.completedFuture(null); } }; } @@ -131,17 +136,16 @@ public RecurringTaskWithPersistentScheduleBuilder(String name, Class dataClas public RecurringTaskWithPersistentSchedule execute(VoidExecutionHandler executionHandler) { return new RecurringTaskWithPersistentSchedule(name, dataClass) { @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { - executionHandler.execute(taskInstance, executionContext); - - return (executionComplete, executionOperations) -> { - executionOperations.reschedule( + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { + CompletableFuture voidFuture = executionHandler.execute(taskInstance, executionContext); + return voidFuture.thenApply((v) -> { + return (CompletionHandler) (executionComplete, executionOperations) -> executionOperations.reschedule( executionComplete, taskInstance.getData().getSchedule().getNextExecutionTime(executionComplete) ); - }; + }); - } + }; }; } @@ -149,16 +153,15 @@ public RecurringTaskWithPersistentSchedule executeStateful(StateReturningExec return new RecurringTaskWithPersistentSchedule(name, dataClass) { @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { - final T nextData = executionHandler.execute(taskInstance, executionContext); - - return (executionComplete, executionOperations) -> { - executionOperations.reschedule( + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { + final CompletableFuture nextDataFuture = executionHandler.execute(taskInstance, executionContext); + return nextDataFuture.thenApply((nextData) -> { + return (CompletionHandler) (executionComplete, executionOperations) -> executionOperations.reschedule( executionComplete, nextData.getSchedule().getNextExecutionTime(executionComplete), nextData ); - }; + }); } }; } @@ -201,8 +204,8 @@ public OneTimeTaskBuilder onDeadExecution(DeadExecutionHandler deadExecuti public OneTimeTask execute(VoidExecutionHandler executionHandler) { return new OneTimeTask(name, dataClass, onFailure, onDeadExecution) { @Override - public void executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { - executionHandler.execute(taskInstance, executionContext); + public CompletableFuture executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { + return executionHandler.execute(taskInstance, executionContext); } }; } @@ -264,7 +267,7 @@ public TaskBuilder defaultExecutionTime(Function defaultExec public CustomTask execute(ExecutionHandler executionHandler) { return new CustomTask(name, dataClass, onStartup, defaultExecutionTime, onFailure, onDeadExecution) { @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { return executionHandler.execute(taskInstance, executionContext); } }; diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java index ebb1e51a..4e8019d7 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ClusterTest.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -112,6 +113,7 @@ public void test_concurrency_recurring() throws InterruptedException { .execute((taskInstance, executionContext) -> { // do nothing // System.out.println(counter.incrementAndGet() + " " + Thread.currentThread().getName()); + return CompletableFuture.completedFuture(null); }); final TestTasks.SimpleStatsRegistry stats = new TestTasks.SimpleStatsRegistry(); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java index 4d43695f..fd91836b 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/DeadExecutionsTest.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME; import static org.hamcrest.MatcherAssert.assertThat; @@ -140,9 +141,11 @@ public NonCompletingTask(String name, Class dataClass, VoidExecutionHandler taskInstance, ExecutionContext executionContext) { - handler.execute(taskInstance, executionContext); - throw new RuntimeException("simulated unexpected exception"); + public CompletableFuture executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { + return handler.execute(taskInstance, executionContext) + .thenApply((v) -> { + throw new RuntimeException("simulated unexpected exception"); + }); } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java index 445e1d79..df745c7a 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ExecutionTest.java @@ -7,6 +7,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -23,7 +24,7 @@ public class ExecutionTest { @Test public void test_equals() { Instant now = Instant.now(); - OneTimeTask task = TestTasks.oneTime("OneTime", Void.class, (instance, executionContext) -> {}); + OneTimeTask task = TestTasks.oneTime("OneTime", Void.class, (instance, executionContext) -> CompletableFuture.completedFuture(null)); RecurringTask task2 = TestTasks.recurring("Recurring", FixedDelay.of(Duration.ofHours(1)), TestTasks.DO_NOTHING); assertEquals(new Execution(now, task.instance("id1")), new Execution(now, task.instance("id1"))); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java index 125cbb81..95f967cc 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/ScheduledExecutionTest.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -23,14 +24,14 @@ public void test_equals() { } private ScheduledExecution createExecution(String taskname, String id, Instant executionTime) { - OneTimeTask task = TestTasks.oneTime(taskname, Integer.class, (instance, executionContext) -> {}); + OneTimeTask task = TestTasks.oneTime(taskname, Integer.class, (instance, executionContext) -> CompletableFuture.completedFuture(null)); return new ScheduledExecution(Void.class, new Execution(executionTime, task.instance(id))); } @Test public void test_data_class_type_equals() { Instant now = Instant.now(); - OneTimeTask task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> {}); + OneTimeTask task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> CompletableFuture.completedFuture(null)); Execution execution = new Execution(now, task.instance("id1", new Integer(1))); ScheduledExecution scheduledExecution = new ScheduledExecution<>(Integer.class, execution); @@ -42,8 +43,7 @@ public void test_data_class_type_not_equals() { DataClassMismatchException dataClassMismatchException = assertThrows(DataClassMismatchException.class, () -> { Instant now = Instant.now(); - OneTimeTask task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> { - }); + OneTimeTask task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> CompletableFuture.completedFuture(null)); Execution execution = new Execution(now, task.instance("id1", new Integer(1))); // Data class is an integer new ScheduledExecution<>(String.class, execution).getData(); // Instantiate with incorrect type diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java index a79a2a4b..f77ed7c7 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerClientTest.java @@ -16,6 +16,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import static java.time.Duration.ofSeconds; @@ -65,16 +66,18 @@ public void setUp() { } @Test - public void client_should_be_able_to_schedule_executions() { + public void client_should_be_able_to_schedule_executions() throws InterruptedException { SchedulerClient client = SchedulerClient.Builder.create(DB.getDataSource()).build(); client.schedule(oneTimeTaskA.instance("1"), settableClock.now()); scheduler.runAnyDueExecutions(); + // Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting + Thread.sleep(1000); assertThat(onetimeTaskHandlerA.timesExecuted.get(), CoreMatchers.is(1)); } @Test - public void should_be_able_to_schedule_other_executions_from_an_executionhandler() { + public void should_be_able_to_schedule_other_executions_from_an_executionhandler() throws InterruptedException { scheduler.schedule(scheduleAnotherTask.instance("1"), settableClock.now()); scheduler.runAnyDueExecutions(); assertThat(scheduleAnother.timesExecuted, CoreMatchers.is(1)); @@ -82,6 +85,8 @@ public void should_be_able_to_schedule_other_executions_from_an_executionhandler scheduler.tick(ofSeconds(1)); scheduler.runAnyDueExecutions(); + // Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting + Thread.sleep(1000); assertThat(onetimeTaskHandlerA.timesExecuted.get(), CoreMatchers.is(1)); } @@ -155,9 +160,10 @@ public ScheduleAnotherTaskHandler(TaskInstance secondTask, Instant instant } @Override - public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { + public CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext) { executionContext.getSchedulerClient().schedule(secondTask, instant); this.timesExecuted++; + return CompletableFuture.completedFuture(null); } } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java index c4d7ac27..e953cf3d 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/SchedulerTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,7 +73,7 @@ private Scheduler schedulerFor(ExecutorService executor, Task ... tasks) { } @Test - public void scheduler_should_execute_task_when_exactly_due() { + public void scheduler_should_execute_task_when_exactly_due() throws InterruptedException { OneTimeTask oneTimeTask = TestTasks.oneTime("OneTime", Void.class, handler); Scheduler scheduler = schedulerFor(oneTimeTask); @@ -84,11 +85,13 @@ public void scheduler_should_execute_task_when_exactly_due() { clock.set(executionTime); scheduler.executeDue(); + // Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting + Thread.sleep(1000); assertThat(handler.timesExecuted.get(), is(1)); } @Test - public void scheduler_should_execute_rescheduled_task_when_exactly_due() { + public void scheduler_should_execute_rescheduled_task_when_exactly_due() throws InterruptedException { OneTimeTask oneTimeTask = TestTasks.oneTime("OneTime", Void.class, handler); Scheduler scheduler = schedulerFor(oneTimeTask); @@ -107,6 +110,8 @@ public void scheduler_should_execute_rescheduled_task_when_exactly_due() { clock.set(reScheduledExecutionTime); scheduler.executeDue(); + // Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting + Thread.sleep(1000); assertThat(handler.timesExecuted.get(), is(1)); } @@ -129,18 +134,21 @@ public void scheduler_should_not_execute_canceled_tasks() { } @Test - public void scheduler_should_execute_recurring_task_and_reschedule() { + public void scheduler_should_execute_recurring_task_and_reschedule() throws InterruptedException { RecurringTask recurringTask = TestTasks.recurring("Recurring", FixedDelay.of(ofHours(1)), handler); Scheduler scheduler = schedulerFor(recurringTask); scheduler.schedule(recurringTask.instance("single"), clock.now()); scheduler.executeDue(); - + // Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting + Thread.sleep(1000); assertThat(handler.timesExecuted.get(), is(1)); Instant nextExecutionTime = clock.now().plus(ofHours(1)); clock.set(nextExecutionTime); scheduler.executeDue(); + // Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting + Thread.sleep(1000); assertThat(handler.timesExecuted.get(), is(2)); } @@ -166,16 +174,16 @@ public void scheduler_should_track_duration() throws InterruptedException { public void should_expose_cause_of_failure_to_completion_handler() throws InterruptedException { TestTasks.ResultRegisteringFailureHandler failureHandler = new TestTasks.ResultRegisteringFailureHandler<>(); Task oneTimeTask = ComposableTask.customTask("cause-testing-task", Void.class, TestTasks.REMOVE_ON_COMPLETE, failureHandler, - (inst, ctx) -> { throw new RuntimeException("Failed!");}); + (inst, ctx) -> { return CompletableFuture.runAsync(() -> { throw new RuntimeException("Failed!"); });}); Scheduler scheduler = schedulerFor(oneTimeTask); scheduler.schedule(oneTimeTask.instance("1"), clock.now()); scheduler.executeDue(); -// failureHandler.waitForNotify.await(); + failureHandler.waitForNotify.await(); assertThat(failureHandler.result, is(ExecutionComplete.Result.FAILED)); - assertThat(failureHandler.cause.get().getMessage(), is("Failed!")); + assertThat(failureHandler.cause.get().getMessage(), is("java.lang.RuntimeException: Failed!")); } @@ -187,8 +195,8 @@ public void should_only_attempt_task_when_max_retries_handler_used() throws Inte OneTimeTask oneTimeTask = Tasks.oneTime("max-retries-task") .onFailure(failureHandler) .execute((inst, ctx) -> { - handler.execute(inst, ctx); - throw new RuntimeException("Failed!"); + return handler.execute(inst, ctx) + .thenRun(() -> { throw new RuntimeException("Failed!"); }); }); Scheduler scheduler = schedulerFor(oneTimeTask); @@ -214,10 +222,12 @@ public void should_reschedule_failure_on_exponential_backoff_with_default_rate() OneTimeTask oneTimeTask = Tasks.oneTime("exponential-defaults-task") .onFailure(new FailureHandler.ExponentialBackoffFailureHandler<>(expectedSleepDuration)) .execute((inst, ctx) -> { - executionTimes.add(ctx.getExecution().executionTime); - if(executionTimes.size() < 10){ - throw new RuntimeException("Failed!"); - } + return CompletableFuture.runAsync(() -> { + executionTimes.add(ctx.getExecution().executionTime); + if(executionTimes.size() < 10){ + throw new RuntimeException("Failed!"); + } + }); }); Scheduler scheduler = schedulerFor(oneTimeTask); @@ -254,10 +264,12 @@ public void should_reschedule_failure_on_exponential_backoff_with_defined_rate() OneTimeTask oneTimeTask = Tasks.oneTime("exponential-custom-rate-task") .onFailure(new FailureHandler.ExponentialBackoffFailureHandler<>(expectedSleepDuration, customRate)) .execute((inst, ctx) -> { - executionTimes.add(ctx.getExecution().executionTime); - if(executionTimes.size() < 10){ - throw new RuntimeException("Failed!"); - } + return CompletableFuture.runAsync(() -> { + executionTimes.add(ctx.getExecution().executionTime); + if(executionTimes.size() < 10){ + throw new RuntimeException("Failed!"); + } + }); }); Scheduler scheduler = schedulerFor(oneTimeTask); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java index e2278a8c..0516d061 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/TestTasks.java @@ -13,6 +13,7 @@ import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay; import java.time.Duration; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.LoggerFactory; @@ -20,13 +21,15 @@ public class TestTasks { public static final CompletionHandler REMOVE_ON_COMPLETE = new CompletionHandler.OnCompleteRemove<>(); - public static final VoidExecutionHandler DO_NOTHING = (taskInstance, executionContext) -> {}; + public static final VoidExecutionHandler DO_NOTHING = (taskInstance, executionContext) -> { + return CompletableFuture.completedFuture(null); + }; public static OneTimeTask oneTime(String name, Class dataClass, VoidExecutionHandler handler) { return new OneTimeTask(name, dataClass) { @Override - public void executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { - handler.execute(taskInstance, executionContext); + public CompletableFuture executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { + return handler.execute(taskInstance, executionContext); } }; } @@ -34,8 +37,8 @@ public void executeOnce(TaskInstance taskInstance, ExecutionContext execution public static OneTimeTask oneTimeWithType(String name, Class dataClass, VoidExecutionHandler handler) { return new OneTimeTask(name, dataClass) { @Override - public void executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { - handler.execute(taskInstance, executionContext); + public CompletableFuture executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { + return handler.execute(taskInstance, executionContext); } }; } @@ -43,8 +46,8 @@ public void executeOnce(TaskInstance taskInstance, ExecutionContext execution public static RecurringTask recurring(String name, FixedDelay schedule, VoidExecutionHandler handler) { return new RecurringTask(name, schedule, Void.class) { @Override - public void executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { - handler.execute(taskInstance, executionContext); + public CompletableFuture executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { + return handler.execute(taskInstance, executionContext); } }; } @@ -52,8 +55,8 @@ public void executeRecurringly(TaskInstance taskInstance, ExecutionContext public static RecurringTask recurringWithData(String name, Class dataClass, T initialData, FixedDelay schedule, VoidExecutionHandler handler) { return new RecurringTask(name, schedule, dataClass, initialData) { @Override - public void executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { - handler.execute(taskInstance, executionContext); + public CompletableFuture executeRecurringly(TaskInstance taskInstance, ExecutionContext executionContext) { + return handler.execute(taskInstance, executionContext); } }; } @@ -98,13 +101,15 @@ public CountingHandler(Duration wait) { } @Override - public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { - this.timesExecuted.incrementAndGet(); - try { - Thread.sleep(wait.toMillis()); - } catch (InterruptedException e) { - LoggerFactory.getLogger(CountingHandler.class).info("Interrupted."); - } + public CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext) { + return CompletableFuture.runAsync(() -> { + this.timesExecuted.incrementAndGet(); + try { + Thread.sleep(wait.toMillis()); + } catch (InterruptedException e) { + LoggerFactory.getLogger(CountingHandler.class).info("Interrupted."); + } + }); } } @@ -117,12 +122,14 @@ public WaitingHandler() { } @Override - public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { - try { - waitForNotify.await(); - } catch (InterruptedException e) { - LoggerFactory.getLogger(WaitingHandler.class).info("Interrupted."); - } + public CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext) { + return CompletableFuture.runAsync(() -> { + try { + waitForNotify.await(); + } catch (InterruptedException e) { + LoggerFactory.getLogger(WaitingHandler.class).info("Interrupted."); + } + }); } } @@ -137,13 +144,15 @@ public PausingHandler() { } @Override - public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { - try { - waitForExecute.countDown(); - waitInExecuteUntil.await(); - } catch (InterruptedException e) { - LoggerFactory.getLogger(WaitingHandler.class).info("Interrupted."); - } + public CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext) { + return CompletableFuture.runAsync(() -> { + try { + waitForExecute.countDown(); + waitInExecuteUntil.await(); + } catch (InterruptedException e) { + LoggerFactory.getLogger(WaitingHandler.class).info("Interrupted."); + } + }); } } @@ -156,19 +165,22 @@ public SleepingHandler(int millis) { } @Override - public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - LoggerFactory.getLogger(WaitingHandler.class).info("Interrupted."); - } + public CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext) { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + LoggerFactory.getLogger(WaitingHandler.class).info("Interrupted."); + } + }); } } public static class DoNothingHandler implements VoidExecutionHandler { @Override - public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { + public CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext) { + return CompletableFuture.completedFuture(null); } } @@ -176,8 +188,9 @@ public static class SavingHandler implements VoidExecutionHandler { public T savedData; @Override - public void execute(TaskInstance taskInstance, ExecutionContext executionContext) { + public CompletableFuture execute(TaskInstance taskInstance, ExecutionContext executionContext) { savedData = taskInstance.getData(); + return CompletableFuture.completedFuture(null); } } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java index 608d8c0a..c7f20f4a 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DeadExecutionTest.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import static com.github.kagkarlsson.scheduler.stats.StatsRegistry.SchedulerStatsEvent; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -32,12 +33,12 @@ public class DeadExecutionTest { public void test_dead_execution() { Assertions.assertTimeoutPreemptively(Duration.ofSeconds(5), () -> { CustomTask customTask = Tasks.custom("custom-a", Void.class) - .execute((taskInstance, executionContext) -> new CompletionHandler() { + .execute((taskInstance, executionContext) -> CompletableFuture.supplyAsync(() -> new CompletionHandler() { @Override public void complete(ExecutionComplete executionComplete, ExecutionOperations executionOperations) { //do nothing on complete, row will be left as-is in database } - }); + })); TestableRegistry.Condition completedCondition = TestableRegistry.Conditions.completed(2); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DynamicRecurringTaskTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DynamicRecurringTaskTest.java index 89b7818a..f7b177cd 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DynamicRecurringTaskTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/DynamicRecurringTaskTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static co.unruly.matchers.OptionalMatchers.contains; import static java.util.Collections.singletonList; @@ -57,6 +58,7 @@ public void should_schedule_multiple_instances_with_different_schedules() { final RecurringTaskWithPersistentSchedule task = Tasks.recurringWithPersistentSchedule(taskName, PlainScheduleAndData.class) .execute((taskInstance, executionContext) -> { + return CompletableFuture.completedFuture(null); }); ManualScheduler scheduler = manualSchedulerFor(singletonList(task)); @@ -77,16 +79,18 @@ public void should_schedule_multiple_instances_with_different_schedules() { } @Test - public void should_support_statechanging_tasks() { + public void should_support_statechanging_tasks() throws InterruptedException { final PersistentFixedDelaySchedule scheduleAndData1 = new PersistentFixedDelaySchedule(Schedules.fixedDelay(Duration.ofSeconds(10)), 1); final String taskName = "dynamic-recurring"; final RecurringTaskWithPersistentSchedule task = Tasks.recurringWithPersistentSchedule(taskName, PersistentFixedDelaySchedule.class) .executeStateful((taskInstance, executionContext) -> { - final PersistentFixedDelaySchedule persistentFixedDelaySchedule = taskInstance.getData().returnIncremented(); - System.out.println(persistentFixedDelaySchedule); - return persistentFixedDelaySchedule; + return CompletableFuture.supplyAsync(() -> { + final PersistentFixedDelaySchedule persistentFixedDelaySchedule = taskInstance.getData().returnIncremented(); + System.out.println(persistentFixedDelaySchedule); + return persistentFixedDelaySchedule; + }); }); ManualScheduler scheduler = manualSchedulerFor(singletonList(task)); @@ -97,7 +101,8 @@ public void should_support_statechanging_tasks() { assertScheduled(scheduler, task.instanceId("id1"), clock.now(), scheduleAndData1); // FixedDelay has initial execution-time now() scheduler.runAnyDueExecutions(); - + // Since execution is executed in an async way, we need to wait for a while to let the execution finish before asserting + Thread.sleep(1000); assertScheduled(scheduler, task.instanceId("id1"), clock.now().plus(Duration.ofSeconds(10)), scheduleAndData1.returnIncremented()); } diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java index 50eaec33..bb761000 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/functional/RecurringTaskTest.java @@ -21,6 +21,7 @@ import java.time.ZonedDateTime; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static co.unruly.matchers.OptionalMatchers.contains; import static java.util.Collections.singletonList; @@ -120,7 +121,7 @@ public void should_update_preexisting_exeutions_with_new_deterministic_schedule_ public void should_not_update_data_of_preexisting_exeutions_even_if_rescheduling_because_of_updated_schedule() { RecurringTask recurringTask = Tasks.recurring(RECURRING_A, Schedules.daily(LocalTime.of(23, 59)), Integer.class) .initialData(1) - .execute((taskInstance, executionContext) -> {}); + .execute((taskInstance, executionContext) -> CompletableFuture.completedFuture(null)); ManualScheduler scheduler = manualSchedulerFor(singletonList(recurringTask)); @@ -133,7 +134,7 @@ public void should_not_update_data_of_preexisting_exeutions_even_if_rescheduling LocalTime.of(12, 0), LocalTime.of(23, 59)), Integer.class) .initialData(2) - .execute((taskInstance, executionContext) -> {}); + .execute((taskInstance, executionContext) -> CompletableFuture.completedFuture(null)); ManualScheduler schedulerUpdatedTask = manualSchedulerFor(singletonList(recurringTaskNewSchedule)); diff --git a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java index bef9c146..baa50608 100644 --- a/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java +++ b/db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/task/helper/ComposableTask.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.CompletableFuture; @Deprecated public class ComposableTask { @@ -27,8 +28,8 @@ public class ComposableTask { public static OneTimeTask onetimeTask(String name, Class dataClass, VoidExecutionHandler executionHandler) { return new OneTimeTask(name, dataClass) { @Override - public void executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { - executionHandler.execute(taskInstance, executionContext); + public CompletableFuture executeOnce(TaskInstance taskInstance, ExecutionContext executionContext) { + return executionHandler.execute(taskInstance, executionContext); } }; } @@ -46,9 +47,9 @@ public SchedulableInstance schedulableInstance(String id, T data) { } @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { - executionHandler.execute(taskInstance, executionContext); - return completionHandler; + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { + return executionHandler.execute(taskInstance, executionContext) + .thenApply((v) -> completionHandler); } }; } @@ -66,9 +67,9 @@ public SchedulableInstance schedulableInstance(String id, T data) { } @Override - public CompletionHandler execute(TaskInstance taskInstance, ExecutionContext executionContext) { - executionHandler.execute(taskInstance, executionContext); - return completionHandler; + public CompletableFuture> execute(TaskInstance taskInstance, ExecutionContext executionContext) { + return executionHandler.execute(taskInstance, executionContext) + .thenApply((v) -> completionHandler); } }; }