Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to name timers / scheduled jobs #2911

Merged
merged 1 commit into from
Jun 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.time.ZonedDateTime;

import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.xtext.xbase.XExpression;
import org.eclipse.xtext.xbase.lib.Procedures.Procedure0;
import org.eclipse.xtext.xbase.lib.Procedures.Procedure1;
Expand Down Expand Up @@ -76,11 +77,25 @@ public static Object callScript(String scriptName) throws ScriptExecutionExcepti
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimer(ZonedDateTime instant, Procedure0 closure) {
return createTimer(null, instant, closure);
}

/**
* Schedules a block of code for later execution.
*
* @param identifier an optional identifier
* @param instant the point in time when the code should be executed
* @param closure the code block to execute
*
* @return a handle to the created timer, so that it can be canceled or rescheduled
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimer(@Nullable String identifier, ZonedDateTime instant, Procedure0 closure) {
Scheduler scheduler = ScriptServiceUtil.getScheduler();

return new TimerImpl(scheduler, instant, () -> {
closure.apply();
});
}, identifier);
}

/**
Expand All @@ -94,11 +109,25 @@ public static Timer createTimer(ZonedDateTime instant, Procedure0 closure) {
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimerWithArgument(ZonedDateTime instant, Object arg1, Procedure1<Object> closure) {
return createTimerWithArgument(null, instant, arg1, closure);
}

/**
* Schedules a block of code (with argument) for later execution
*
* @param identifier an optional identifier
* @param instant the point in time when the code should be executed
* @param arg1 the argument to pass to the code block
* @param closure the code block to execute
*
* @return a handle to the created timer, so that it can be canceled or rescheduled
* @throws ScriptExecutionException if an error occurs during the execution
*/
public static Timer createTimerWithArgument(@Nullable String identifier, ZonedDateTime instant, Object arg1, Procedure1<Object> closure) {
Scheduler scheduler = ScriptServiceUtil.getScheduler();

return new TimerImpl(scheduler, instant, () -> {
closure.apply(arg1);
});
}, identifier);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,20 @@ public class TimerImpl implements Timer {
private final Scheduler scheduler;
private final ZonedDateTime startTime;
private final SchedulerRunnable runnable;
private final @Nullable String identifier;
private ScheduledCompletableFuture<?> future;

public TimerImpl(Scheduler scheduler, ZonedDateTime startTime, SchedulerRunnable runnable) {
this(scheduler, startTime, runnable, null);
}

public TimerImpl(Scheduler scheduler, ZonedDateTime startTime, SchedulerRunnable runnable, @Nullable String identifier) {
this.scheduler = scheduler;
this.startTime = startTime;
this.runnable = runnable;
this.identifier = identifier;

future = scheduler.schedule(runnable, startTime.toInstant());
future = scheduler.schedule(runnable, identifier, startTime.toInstant());
}

@Override
Expand Down
9 changes: 9 additions & 0 deletions bundles/org.openhab.core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@

<name>openHAB Core :: Bundles :: Core</name>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.scheduler.ScheduledCompletableFuture;
import org.openhab.core.scheduler.Scheduler;
import org.openhab.core.scheduler.SchedulerRunnable;
Expand Down Expand Up @@ -96,6 +97,12 @@ public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, Te
return add(delegate.schedule(runnable, temporalAdjuster));
}

@Override
public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, @Nullable String identifier,
TemporalAdjuster temporalAdjuster) {
return add(delegate.schedule(runnable, identifier, temporalAdjuster));
}

private <T> ScheduledCompletableFuture<T> add(ScheduledCompletableFuture<T> t) {
synchronized (scheduledJobs) {
scheduledJobs.add(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAdjuster;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -67,7 +68,7 @@ public ScheduledCompletableFuture<Instant> after(Duration duration) {

@Override
public <T> ScheduledCompletableFuture<T> after(Callable<T> callable, Duration duration) {
return afterInternal(new ScheduledCompletableFutureOnce<>(duration), callable);
return afterInternal(new ScheduledCompletableFutureOnce<>(null, duration), callable);
}

private <T> ScheduledCompletableFutureOnce<T> afterInternal(ScheduledCompletableFutureOnce<T> deferred,
Expand All @@ -89,7 +90,8 @@ private <T> ScheduledCompletableFutureOnce<T> afterInternal(ScheduledCompletable
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.warn("Scheduled job failed and stopped", e);
logger.warn("Scheduled job '{}' failed and stopped",
Objects.requireNonNullElse(deferred.identifier, "<unknown>"), e);
deferred.completeExceptionally(e);
}
}, duration, TimeUnit.MILLISECONDS);
Expand All @@ -114,7 +116,7 @@ public <T> ScheduledCompletableFuture<T> before(CompletableFuture<T> promise, Du
runnable.run();
}
};
final ScheduledCompletableFutureOnce<T> wrappedPromise = new ScheduledCompletableFutureOnce<>(timeout);
final ScheduledCompletableFutureOnce<T> wrappedPromise = new ScheduledCompletableFutureOnce<>(null, timeout);
Callable<T> callable = () -> {
wrappedPromise.completeExceptionally(new TimeoutException());
return null;
Expand Down Expand Up @@ -144,7 +146,7 @@ public ScheduledCompletableFuture<Instant> at(Instant instant) {
@Override
public <T> ScheduledCompletableFuture<T> at(Callable<T> callable, Instant instant) {
return atInternal(
new ScheduledCompletableFutureOnce<>(ZonedDateTime.ofInstant(instant, ZoneId.systemDefault())),
new ScheduledCompletableFutureOnce<>(null, ZonedDateTime.ofInstant(instant, ZoneId.systemDefault())),
callable);
}

Expand All @@ -155,25 +157,30 @@ private <T> ScheduledCompletableFuture<T> atInternal(ScheduledCompletableFutureO

@Override
public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, TemporalAdjuster temporalAdjuster) {
final ScheduledCompletableFutureRecurring<T> schedule = new ScheduledCompletableFutureRecurring<>(
ZonedDateTime.now());
return schedule(runnable, null, temporalAdjuster);
}

schedule(schedule, runnable, temporalAdjuster);
@Override
public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, @Nullable String identifier,
TemporalAdjuster temporalAdjuster) {
final ScheduledCompletableFutureRecurring<T> schedule = new ScheduledCompletableFutureRecurring<>(identifier,
ZonedDateTime.now());
schedule(schedule, runnable, identifier, temporalAdjuster);
return schedule;
}

private <T> void schedule(ScheduledCompletableFutureRecurring<T> recurringSchedule, SchedulerRunnable runnable,
TemporalAdjuster temporalAdjuster) {
@Nullable String identifier, TemporalAdjuster temporalAdjuster) {
final Temporal newTime = recurringSchedule.getScheduledTime().with(temporalAdjuster);
final ScheduledCompletableFutureOnce<T> deferred = new ScheduledCompletableFutureOnce<>(
final ScheduledCompletableFutureOnce<T> deferred = new ScheduledCompletableFutureOnce<>(identifier,
ZonedDateTime.from(newTime));

deferred.thenAccept(v -> {
if (temporalAdjuster instanceof SchedulerTemporalAdjuster) {
final SchedulerTemporalAdjuster schedulerTemporalAdjuster = (SchedulerTemporalAdjuster) temporalAdjuster;

if (!schedulerTemporalAdjuster.isDone(newTime)) {
schedule(recurringSchedule, runnable, temporalAdjuster);
schedule(recurringSchedule, runnable, identifier, temporalAdjuster);
return;
}
}
Expand All @@ -195,9 +202,10 @@ private <T> void schedule(ScheduledCompletableFutureRecurring<T> recurringSchedu
*/
private static class ScheduledCompletableFutureRecurring<T> extends ScheduledCompletableFutureOnce<T> {
private @Nullable volatile ScheduledCompletableFuture<T> scheduledPromise;
private @Nullable String identifier;

public ScheduledCompletableFutureRecurring(ZonedDateTime scheduledTime) {
super(scheduledTime);
public ScheduledCompletableFutureRecurring(@Nullable String identifier, ZonedDateTime scheduledTime) {
super(identifier, scheduledTime);
exceptionally(e -> {
synchronized (this) {
if (e instanceof CancellationException) {
Expand Down Expand Up @@ -245,12 +253,14 @@ public ZonedDateTime getScheduledTime() {
private static class ScheduledCompletableFutureOnce<T> extends CompletableFuture<T>
implements ScheduledCompletableFuture<T> {
private ZonedDateTime scheduledTime;
private @Nullable String identifier;

public ScheduledCompletableFutureOnce(Duration duration) {
this(ZonedDateTime.now().plusNanos(duration.toNanos()));
public ScheduledCompletableFutureOnce(@Nullable String identifier, Duration duration) {
this(identifier, ZonedDateTime.now().plusNanos(duration.toNanos()));
}

public ScheduledCompletableFutureOnce(ZonedDateTime scheduledTime) {
public ScheduledCompletableFutureOnce(@Nullable String identifier, ZonedDateTime scheduledTime) {
this.identifier = identifier;
this.scheduledTime = scheduledTime;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,17 @@ public interface Scheduler {
* @return A {@link ScheduledCompletableFuture}
*/
<T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable callable, TemporalAdjuster temporalAdjuster);

/**
* Schedules the callable once or repeating using the temporalAdjuster to determine the
* time the callable should run. Runs until the job is cancelled or if the temporalAdjuster
* method {@link SchedulerTemporalAdjuster#isDone()) returns true.
*
* @param callable Provides the result
* @param an optional identifier for this job
* @param temporalAdjuster the temperalAdjuster to return the time the callable should run
* @return A {@link ScheduledCompletableFuture}
*/
<T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable callable, @Nullable String identifier,
TemporalAdjuster temporalAdjuster);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
import org.openhab.core.scheduler.ScheduledCompletableFuture;
import org.openhab.core.scheduler.SchedulerRunnable;
import org.openhab.core.scheduler.SchedulerTemporalAdjuster;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;

/**
* Test class for {@link SchedulerImpl}.
Expand Down Expand Up @@ -222,24 +228,41 @@ public void testScheduleCancel() throws InterruptedException {
@Test
@Timeout(value = 15, unit = TimeUnit.SECONDS)
public void testScheduleException() throws InterruptedException {
// add logging interceptor
Logger logger = (Logger) LoggerFactory.getLogger(SchedulerImpl.class);
logger.setLevel(Level.DEBUG);
ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
listAppender.start();
logger.addAppender(listAppender);

Semaphore s = new Semaphore(0);
TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter();
SchedulerRunnable runnable = () -> {
// Pass a exception not very likely thrown by the scheduler it self to avoid missing real exceptions.
// Pass an exception not very likely thrown by the scheduler itself to avoid missing real exceptions.
throw new FileNotFoundException("testBeforeTimeoutException");
};

ScheduledCompletableFuture<@Nullable Void> schedule = scheduler.schedule(runnable, temporalAdjuster);
ScheduledCompletableFuture<@Nullable Void> schedule = scheduler.schedule(runnable, "myScheduledJob",
temporalAdjuster);
schedule.getPromise().exceptionally(e -> {
if (e instanceof FileNotFoundException) {
s.release();
}
return null;
});

s.acquire(1);
Thread.sleep(1000); // wait a little longer to see if not more are scheduled.
Thread.sleep(300); // wait a little longer to see if not more are scheduled.

logger.detachAppender(listAppender);

assertEquals(0, s.availablePermits(), "Scheduler should not have released more after cancel");
assertEquals(0, temporalAdjuster.getCount(), "Scheduler should have run 0 time");

assertEquals(1, listAppender.list.size());
ILoggingEvent loggingEvent = listAppender.list.get(0);
assertEquals(Level.WARN, loggingEvent.getLevel());
assertEquals("Scheduled job 'myScheduledJob' failed and stopped", loggingEvent.getFormattedMessage());
}

@Test
Expand Down