Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SimpleScheduler - fix CronTrigger#evaluate() #17763

Merged
merged 1 commit into from
Jun 8, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@Singleton
public class SimpleScheduler implements Scheduler {

private static final Logger LOGGER = Logger.getLogger(SimpleScheduler.class);
private static final Logger LOG = Logger.getLogger(SimpleScheduler.class);

// milliseconds
private static final long CHECK_PERIOD = 1000L;
Expand All @@ -65,10 +65,10 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule

if (!schedulerRuntimeConfig.enabled) {
this.scheduledExecutor = null;
LOGGER.info("Simple scheduler is disabled by config property and will not be started");
LOG.info("Simple scheduler is disabled by config property and will not be started");
} else if (context.getScheduledMethods().isEmpty()) {
this.scheduledExecutor = null;
LOGGER.info("No scheduled business methods found - Simple scheduler will not be started");
LOG.info("No scheduled business methods found - Simple scheduler will not be started");
} else {
this.scheduledExecutor = new JBossScheduledThreadPoolExecutor(1, new Runnable() {
@Override
Expand Down Expand Up @@ -118,16 +118,17 @@ void stop() {
scheduledExecutor.shutdownNow();
}
} catch (Exception e) {
LOGGER.warn("Unable to shutdown the scheduler executor", e);
LOG.warn("Unable to shutdown the scheduler executor", e);
}
}

void checkTriggers() {
if (!running) {
LOGGER.trace("Skip all triggers - scheduler paused");
LOG.trace("Skip all triggers - scheduler paused");
return;
}
ZonedDateTime now = ZonedDateTime.now();
LOG.tracef("Check triggers at %s", now);
for (ScheduledTask task : scheduledTasks) {
task.execute(now, executor);
}
Expand All @@ -136,7 +137,7 @@ void checkTriggers() {
@Override
public void pause() {
if (!enabled) {
LOGGER.warn("Scheduler is disabled and cannot be paused");
LOG.warn("Scheduler is disabled and cannot be paused");
} else {
running = false;
}
Expand All @@ -146,7 +147,7 @@ public void pause() {
public void pause(String identity) {
Objects.requireNonNull(identity, "Cannot pause - identity is null");
if (identity.isEmpty()) {
LOGGER.warn("Cannot pause - identity is empty");
LOG.warn("Cannot pause - identity is empty");
return;
}
String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity);
Expand All @@ -161,7 +162,7 @@ public void pause(String identity) {
@Override
public void resume() {
if (!enabled) {
LOGGER.warn("Scheduler is disabled and cannot be resumed");
LOG.warn("Scheduler is disabled and cannot be resumed");
} else {
running = true;
}
Expand All @@ -171,7 +172,7 @@ public void resume() {
public void resume(String identity) {
Objects.requireNonNull(identity, "Cannot resume - identity is null");
if (identity.isEmpty()) {
LOGGER.warn("Cannot resume - identity is empty");
LOG.warn("Cannot resume - identity is empty");
return;
}
String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity);
Expand Down Expand Up @@ -250,13 +251,12 @@ public void run() {
try {
invoker.invoke(new SimpleScheduledExecution(now, scheduledFireTime, trigger));
} catch (Throwable t) {
LOGGER.errorf(t, "Error occured while executing task for trigger %s", trigger);
LOG.errorf(t, "Error occured while executing task for trigger %s", trigger);
}
}
});
LOGGER.debugf("Executing scheduled task for trigger %s", trigger);
} catch (RejectedExecutionException e) {
LOGGER.warnf("Rejected execution of a scheduled task for trigger %s", trigger);
LOG.warnf("Rejected execution of a scheduled task for trigger %s", trigger);
}
}
}
Expand All @@ -268,6 +268,7 @@ static abstract class SimpleTrigger implements Trigger {
private final String id;
private volatile boolean running;
protected final ZonedDateTime start;
protected volatile ZonedDateTime lastFireTime;

public SimpleTrigger(String id, ZonedDateTime start) {
this.id = id;
Expand Down Expand Up @@ -298,8 +299,8 @@ public synchronized void setRunning(boolean running) {

static class IntervalTrigger extends SimpleTrigger {

// milliseconds
private final long interval;
private volatile ZonedDateTime lastFireTime;

public IntervalTrigger(String id, ZonedDateTime start, long interval) {
super(id, start);
Expand All @@ -316,9 +317,11 @@ ZonedDateTime evaluate(ZonedDateTime now) {
lastFireTime = now.truncatedTo(ChronoUnit.SECONDS);
return now;
}
if (ChronoUnit.MILLIS.between(lastFireTime, now) >= interval) {
long diff = ChronoUnit.MILLIS.between(lastFireTime, now);
if (diff >= interval) {
ZonedDateTime scheduledFireTime = lastFireTime.plus(Duration.ofMillis(interval));
lastFireTime = now.truncatedTo(ChronoUnit.SECONDS);
LOG.tracef("%s fired, diff=%s ms", this, diff);
return scheduledFireTime;
}
return null;
Expand All @@ -345,16 +348,14 @@ public String toString() {

static class CronTrigger extends SimpleTrigger {

// microseconds
private static final long DIFF_THRESHOLD = CHECK_PERIOD * 1000;

private final Cron cron;
private final ExecutionTime executionTime;

public CronTrigger(String id, ZonedDateTime start, Cron cron) {
super(id, start);
this.cron = cron;
this.executionTime = ExecutionTime.forCron(cron);
this.lastFireTime = ZonedDateTime.now();
}

@Override
Expand All @@ -373,17 +374,13 @@ ZonedDateTime evaluate(ZonedDateTime now) {
if (now.isBefore(start)) {
return null;
}
Optional<ZonedDateTime> lastFireTime = executionTime.lastExecution(now);
if (lastFireTime.isPresent()) {
ZonedDateTime trunc = lastFireTime.get().truncatedTo(ChronoUnit.SECONDS);
if (now.isBefore(trunc)) {
return null;
}
// Use microseconds precision to workaround incompatibility between jdk8 and jdk9+
long diff = ChronoUnit.MICROS.between(trunc, now);
if (diff <= DIFF_THRESHOLD) {
LOGGER.debugf("%s fired, diff=%s μs", this, diff);
return trunc;
Optional<ZonedDateTime> lastExecution = executionTime.lastExecution(now);
if (lastExecution.isPresent()) {
ZonedDateTime lastTruncated = lastExecution.get().truncatedTo(ChronoUnit.SECONDS);
if (now.isAfter(lastTruncated) && lastFireTime.isBefore(lastTruncated)) {
LOG.tracef("%s fired, last=", this, lastTruncated);
lastFireTime = now;
return lastTruncated;
}
}
return null;
Expand Down