Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance db-scheduler to support asynchronous task execution #304

Open
wants to merge 4 commits into
base: async
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@SuppressWarnings("rawtypes")
class ExecutePicked implements Runnable {
Expand Down Expand Up @@ -61,39 +62,37 @@ public ExecutePicked(Executor executor, TaskRepository taskRepository, Scheduler
public void run() {
// FIXLATER: need to cleanup all the references back to scheduler fields
final UUID executionId = executor.addCurrentlyProcessing(new CurrentlyExecuting(pickedExecution, clock));
try {
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
executePickedExecution(pickedExecution);
} finally {
executor.removeCurrentlyProcessing(executionId);
}
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId));
}

private void executePickedExecution(Execution execution) {
private CompletableFuture<CompletionHandler> executePickedExecution(Execution execution) {
final Optional<Task> task = taskResolver.resolve(execution.taskInstance.getTaskName());
if (!task.isPresent()) {
LOG.error("Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.", execution.taskInstance.getTaskName());
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
return;
return new CompletableFuture<>();
}

Instant executionStarted = clock.now();
try {
LOG.debug("Executing " + execution);
CompletionHandler completion = task.get().execute(execution.taskInstance, new ExecutionContext(schedulerState, execution, schedulerClient));
LOG.debug("Execution done");
LOG.debug("Executing " + execution);
CompletableFuture<CompletionHandler> completableFuture = task.get().execute(execution.taskInstance, new ExecutionContext(schedulerState, execution, schedulerClient));

return completableFuture.whenCompleteAsync((completion, ex) -> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure what happens here. What will the significance of whenCompleteAsync be here?

Will it happen in parallell with executor.removeCurrentlyProcessing here?

executePickedExecution(pickedExecution).whenComplete((c, ex) -> executor.removeCurrentlyProcessing(executionId));

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using whenCompleteAsync to associate the action to the db-scheduler executor explicitly (otherwise, it might use the common forkjoinpool).
I dont think it ll execute in parallel to removeCurrentlyProcessing since *Async API only enables to specify the executor service for the action.
ref link: https://www.linkedin.com/pulse/asynchronous-programming-java-completablefuture-aliaksandr-liakh/
please let me know in case I am off.

if (ex != null) {
if (ex instanceof RuntimeException) {
failure(task.get(), execution, ex, executionStarted, "Unhandled exception");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
} else {
failure(task.get(), execution, ex, executionStarted, "Error");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
}
return;
}
LOG.debug("Execution done");
complete(completion, execution, executionStarted);
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);

} catch (RuntimeException unhandledException) {
failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);

} catch (Throwable unhandledError) {
failure(task.get(), execution, unhandledError, executionStarted, "Error");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
}
}, executor.getExecutorService());
}

private void complete(CompletionHandler completion, Execution execution, Instant executionStarted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,8 @@ public void removeCurrentlyProcessing(UUID executionId) {
LOG.warn("Released execution was not found in collection of executions currently being processed. Should never happen. Execution-id: " + executionId);
}
}

public ExecutorService getExecutorService() {
return executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.github.kagkarlsson.scheduler.task;

import java.util.concurrent.CompletableFuture;

public interface ExecutionHandler<T> {
CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);
CompletableFuture<CompletionHandler<T>> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.github.kagkarlsson.scheduler.task;

import java.util.concurrent.CompletableFuture;

public interface StateReturningExecutionHandler<T> {

/**
Expand All @@ -24,5 +26,5 @@ public interface StateReturningExecutionHandler<T> {
* @param executionContext
* @return
*/
T execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);
CompletableFuture<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.github.kagkarlsson.scheduler.task;

import java.util.concurrent.CompletableFuture;

public interface VoidExecutionHandler<T> {
void execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);
CompletableFuture<Void> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;

public abstract class OneTimeTask<T> extends AbstractTask<T> {

Expand All @@ -44,12 +45,14 @@ public SchedulableInstance<T> schedulableInstance(String id, T data) {
}

@Override
public CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
executeOnce(taskInstance, executionContext);
return new OnCompleteRemove<>();
public CompletableFuture<CompletionHandler<T>> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
CompletableFuture<Void> voidFuture = executeOnce(taskInstance, executionContext);
return voidFuture.thenApply((nextData) -> {
return new OnCompleteRemove<>();
});
}

public abstract void executeOnce(TaskInstance<T> taskInstance, ExecutionContext executionContext);
public abstract CompletableFuture<Void> executeOnce(TaskInstance<T> taskInstance, ExecutionContext executionContext);

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public abstract class RecurringTask<T> extends AbstractTask<T> implements OnStartup {

Expand Down Expand Up @@ -65,12 +66,15 @@ public void onStartup(SchedulerClient scheduler, Clock clock) {
}

@Override
public CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
executeRecurringly(taskInstance, executionContext);
return onComplete;
public CompletableFuture<CompletionHandler<T>> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
CompletableFuture<Void> voidFuture = executeRecurringly(taskInstance, executionContext);
return voidFuture.thenApply((v) -> {
return onComplete;
});

}

public abstract void executeRecurringly(TaskInstance<T> taskInstance, ExecutionContext executionContext);
public abstract CompletableFuture<Void> executeRecurringly(TaskInstance<T> taskInstance, ExecutionContext executionContext);

public TaskInstanceId getDefaultTaskInstance() {
return TaskInstanceId.of(name, INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class Tasks {
Expand Down Expand Up @@ -96,8 +97,8 @@ public RecurringTask<T> execute(VoidExecutionHandler<T> executionHandler) {
return new RecurringTask<T>(name, schedule, dataClass, scheduleOnStartup, onFailure, onDeadExecution) {

@Override
public void executeRecurringly(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
executionHandler.execute(taskInstance, executionContext);
public CompletableFuture<Void> executeRecurringly(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
return executionHandler.execute(taskInstance, executionContext);
}
};
}
Expand All @@ -106,14 +107,18 @@ public RecurringTask<T> executeStateful(StateReturningExecutionHandler<T> execut
return new RecurringTask<T>(name, schedule, dataClass, scheduleOnStartup, onFailure, onDeadExecution) {

@Override
public CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
final T nextData = executionHandler.execute(taskInstance, executionContext);
return new CompletionHandler.OnCompleteReschedule<>(schedule, nextData);
public CompletableFuture<CompletionHandler<T>> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
final CompletableFuture<T> nextDataFuture = executionHandler.execute(taskInstance, executionContext);
return nextDataFuture.thenApply((nextData) -> {
return new CompletionHandler.OnCompleteReschedule<>(schedule, nextData);
});

}

@Override
public void executeRecurringly(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
public CompletableFuture<Void> executeRecurringly(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
// never called
return CompletableFuture.completedFuture(null);
}
};
}
Expand All @@ -131,34 +136,32 @@ public RecurringTaskWithPersistentScheduleBuilder(String name, Class<T> dataClas
public RecurringTaskWithPersistentSchedule<T> execute(VoidExecutionHandler<T> executionHandler) {
return new RecurringTaskWithPersistentSchedule<T>(name, dataClass) {
@Override
public CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
executionHandler.execute(taskInstance, executionContext);

return (executionComplete, executionOperations) -> {
executionOperations.reschedule(
public CompletableFuture<CompletionHandler<T>> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
CompletableFuture<Void> voidFuture = executionHandler.execute(taskInstance, executionContext);
return voidFuture.thenApply((v) -> {
return (CompletionHandler<T>) (executionComplete, executionOperations) -> executionOperations.reschedule(
executionComplete,
taskInstance.getData().getSchedule().getNextExecutionTime(executionComplete)
);
};
});

}
};
};
}

public RecurringTaskWithPersistentSchedule<T> executeStateful(StateReturningExecutionHandler<T> executionHandler) {
return new RecurringTaskWithPersistentSchedule<T>(name, dataClass) {

@Override
public CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
final T nextData = executionHandler.execute(taskInstance, executionContext);

return (executionComplete, executionOperations) -> {
executionOperations.reschedule(
public CompletableFuture<CompletionHandler<T>> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
final CompletableFuture<T> nextDataFuture = executionHandler.execute(taskInstance, executionContext);
return nextDataFuture.thenApply((nextData) -> {
return (CompletionHandler<T>) (executionComplete, executionOperations) -> executionOperations.reschedule(
executionComplete,
nextData.getSchedule().getNextExecutionTime(executionComplete),
nextData
);
};
});
}
};
}
Expand Down Expand Up @@ -201,8 +204,8 @@ public OneTimeTaskBuilder<T> onDeadExecution(DeadExecutionHandler<T> deadExecuti
public OneTimeTask<T> execute(VoidExecutionHandler<T> executionHandler) {
return new OneTimeTask<T>(name, dataClass, onFailure, onDeadExecution) {
@Override
public void executeOnce(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
executionHandler.execute(taskInstance, executionContext);
public CompletableFuture<Void> executeOnce(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
return executionHandler.execute(taskInstance, executionContext);
}
};
}
Expand Down Expand Up @@ -264,7 +267,7 @@ public TaskBuilder<T> defaultExecutionTime(Function<Instant,Instant> defaultExec
public CustomTask<T> execute(ExecutionHandler<T> executionHandler) {
return new CustomTask<T>(name, dataClass, onStartup, defaultExecutionTime, onFailure, onDeadExecution) {
@Override
public CompletionHandler<T> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
public CompletableFuture<CompletionHandler<T>> execute(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
return executionHandler.execute(taskInstance, executionContext);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -112,6 +113,7 @@ public void test_concurrency_recurring() throws InterruptedException {
.execute((taskInstance, executionContext) -> {
// do nothing
// System.out.println(counter.incrementAndGet() + " " + Thread.currentThread().getName());
return CompletableFuture.completedFuture(null);
});

final TestTasks.SimpleStatsRegistry stats = new TestTasks.SimpleStatsRegistry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -140,9 +141,11 @@ public NonCompletingTask(String name, Class<T> dataClass, VoidExecutionHandler<T
}

@Override
public void executeOnce(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
handler.execute(taskInstance, executionContext);
throw new RuntimeException("simulated unexpected exception");
public CompletableFuture<Void> executeOnce(TaskInstance<T> taskInstance, ExecutionContext executionContext) {
return handler.execute(taskInstance, executionContext)
.thenApply((v) -> {
throw new RuntimeException("simulated unexpected exception");
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -23,7 +24,7 @@ public class ExecutionTest {
@Test
public void test_equals() {
Instant now = Instant.now();
OneTimeTask<Void> task = TestTasks.oneTime("OneTime", Void.class, (instance, executionContext) -> {});
OneTimeTask<Void> task = TestTasks.oneTime("OneTime", Void.class, (instance, executionContext) -> CompletableFuture.completedFuture(null));
RecurringTask<Void> task2 = TestTasks.recurring("Recurring", FixedDelay.of(Duration.ofHours(1)), TestTasks.DO_NOTHING);

assertEquals(new Execution(now, task.instance("id1")), new Execution(now, task.instance("id1")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand All @@ -23,14 +24,14 @@ public void test_equals() {
}

private ScheduledExecution<Void> createExecution(String taskname, String id, Instant executionTime) {
OneTimeTask<Integer> task = TestTasks.oneTime(taskname, Integer.class, (instance, executionContext) -> {});
OneTimeTask<Integer> task = TestTasks.oneTime(taskname, Integer.class, (instance, executionContext) -> CompletableFuture.completedFuture(null));
return new ScheduledExecution<Void>(Void.class, new Execution(executionTime, task.instance(id)));
}

@Test
public void test_data_class_type_equals() {
Instant now = Instant.now();
OneTimeTask<Integer> task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> {});
OneTimeTask<Integer> task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> CompletableFuture.completedFuture(null));
Execution execution = new Execution(now, task.instance("id1", new Integer(1)));

ScheduledExecution<Integer> scheduledExecution = new ScheduledExecution<>(Integer.class, execution);
Expand All @@ -42,8 +43,7 @@ public void test_data_class_type_not_equals() {
DataClassMismatchException dataClassMismatchException = assertThrows(DataClassMismatchException.class, () -> {

Instant now = Instant.now();
OneTimeTask<Integer> task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> {
});
OneTimeTask<Integer> task = TestTasks.oneTime("OneTime", Integer.class, (instance, executionContext) -> CompletableFuture.completedFuture(null));
Execution execution = new Execution(now, task.instance("id1", new Integer(1))); // Data class is an integer

new ScheduledExecution<>(String.class, execution).getData(); // Instantiate with incorrect type
Expand Down
Loading