diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DisabledSchedulerTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DisabledSchedulerTest.java index 725479588688a..701f183f0845b 100644 --- a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DisabledSchedulerTest.java +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/DisabledSchedulerTest.java @@ -1,8 +1,8 @@ package io.quarkus.quartz.test; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; @@ -31,14 +31,19 @@ public class DisabledSchedulerTest { "application.properties")); @Test - public void testNoSchedulerInvocations() throws InterruptedException { + public void testSchedulerInvocations() throws InterruptedException { + assertNotNull(scheduler); + assertFalse(scheduler.isStarted()); assertFalse(scheduler.isRunning()); - assertTrue(quartzScheduler.isResolvable()); - try { - quartzScheduler.get(); - fail(); - } catch (IllegalStateException expected) { - } + assertNotNull(scheduler.implementation()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.newJob("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.unscheduleJob("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.pause()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.pause("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.resume()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.resume("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.getScheduledJobs()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.getScheduledJob("bar")); } static class Jobs { diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index 8e6d0c620ace5..e67a8c9f41ac9 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -76,6 +76,7 @@ import io.quarkus.scheduler.SuccessfulExecution; import io.quarkus.scheduler.Trigger; import io.quarkus.scheduler.common.runtime.AbstractJobDefinition; +import io.quarkus.scheduler.common.runtime.BaseScheduler; import io.quarkus.scheduler.common.runtime.CronParser; import io.quarkus.scheduler.common.runtime.DefaultInvoker; import io.quarkus.scheduler.common.runtime.Events; @@ -87,7 +88,6 @@ import io.quarkus.scheduler.runtime.SchedulerConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; -import io.quarkus.scheduler.runtime.SimpleScheduler; import io.quarkus.scheduler.spi.JobInstrumenter; import io.smallrye.mutiny.Uni; import io.vertx.core.Vertx; @@ -99,32 +99,18 @@ */ @Typed({ QuartzScheduler.class, Scheduler.class }) @Singleton -public class QuartzSchedulerImpl implements QuartzScheduler { +public class QuartzSchedulerImpl extends BaseScheduler implements QuartzScheduler { private static final Logger LOGGER = Logger.getLogger(QuartzSchedulerImpl.class.getName()); private static final String INVOKER_KEY = "invoker"; private final org.quartz.Scheduler scheduler; - private final Vertx vertx; private final boolean startHalted; private final Duration shutdownWaitTime; - private final boolean enabled; - private final CronParser cronParser; - private final Duration defaultOverdueGracePeriod; private final Map scheduledTasks = new ConcurrentHashMap<>(); - private final Event skippedExecutionEvent; - private final Event successExecutionEvent; - private final Event failedExecutionEvent; - private final Event delayedExecutionEvent; - private final Event schedulerPausedEvent; - private final Event schedulerResumedEvent; - private final Event scheduledJobPausedEvent; - private final Event scheduledJobResumedEvent; private final QuartzRuntimeConfig runtimeConfig; private final SchedulerConfig schedulerConfig; - private final Instance jobInstrumenter; private final StoreType storeType; - private final ScheduledExecutorService blockingExecutor; public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport, SchedulerRuntimeConfig schedulerRuntimeConfig, @@ -136,23 +122,14 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport Instance jobs, Instance userTransaction, Vertx vertx, SchedulerConfig schedulerConfig, Instance jobInstrumenter, ScheduledExecutorService blockingExecutor) { + super(vertx, new CronParser(context.getCronType()), schedulerRuntimeConfig.overdueGracePeriod, + new Events(skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, delayedExecutionEvent, + schedulerPausedEvent, schedulerResumedEvent, scheduledJobPausedEvent, scheduledJobResumedEvent), + jobInstrumenter, blockingExecutor); this.shutdownWaitTime = quartzSupport.getRuntimeConfig().shutdownWaitTime; - this.skippedExecutionEvent = skippedExecutionEvent; - this.successExecutionEvent = successExecutionEvent; - this.failedExecutionEvent = failedExecutionEvent; - this.delayedExecutionEvent = delayedExecutionEvent; - this.schedulerPausedEvent = schedulerPausedEvent; - this.schedulerResumedEvent = schedulerResumedEvent; - this.scheduledJobPausedEvent = scheduledJobPausedEvent; - this.scheduledJobResumedEvent = scheduledJobResumedEvent; this.runtimeConfig = quartzSupport.getRuntimeConfig(); - this.enabled = schedulerRuntimeConfig.enabled; - this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod; this.schedulerConfig = schedulerConfig; - this.jobInstrumenter = jobInstrumenter; this.storeType = quartzSupport.getBuildTimeConfig().storeType; - this.vertx = vertx; - this.blockingExecutor = blockingExecutor; StartMode startMode = initStartMode(schedulerRuntimeConfig, runtimeConfig); @@ -182,14 +159,12 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport .collect(Collectors.joining(", "))); } - cronParser = new CronParser(context.getCronType()); - JobInstrumenter instrumenter = null; if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { instrumenter = jobInstrumenter.get(); } - if (!enabled) { + if (!schedulerRuntimeConfig.enabled) { LOGGER.info("Quartz scheduler is disabled by config property and will not be started"); this.scheduler = null; } else if (!forceStart && context.getScheduledMethods(Scheduled.QUARTZ).isEmpty() @@ -240,11 +215,11 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) { } ScheduledInvoker invoker = context.createInvoker(method.getInvokerClassName()); - invoker = SimpleScheduler.initInvoker( + invoker = initInvoker( invoker, - skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, delayedExecutionEvent, + events, scheduled.concurrentExecution(), - SimpleScheduler.initSkipPredicate(scheduled.skipExecutionIf()), instrumenter, vertx, + initSkipPredicate(scheduled.skipExecutionIf()), instrumenter, vertx, invoker.isBlocking() && runtimeConfig.runBlockingScheduledMethodOnQuartzThread, SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); @@ -346,6 +321,11 @@ public org.quartz.Scheduler getScheduler() { return scheduler; } + @Override + public boolean isStarted() { + return scheduler != null; + } + @Override public String implementation() { return Scheduled.QUARTZ; @@ -353,22 +333,24 @@ public String implementation() { @Override public void pause() { - if (!enabled) { - LOGGER.warn("Quartz Scheduler is disabled and cannot be paused"); - } else { - try { - if (scheduler != null) { - scheduler.standby(); - Events.fire(schedulerPausedEvent, SchedulerPaused.INSTANCE); - } - } catch (SchedulerException e) { - throw new RuntimeException("Unable to pause scheduler", e); + if (!isStarted()) { + throw notStarted(); + } + try { + if (scheduler != null) { + scheduler.standby(); + events.fireSchedulerPaused(); } + } catch (SchedulerException e) { + throw new RuntimeException("Unable to pause scheduler", e); } } @Override public void pause(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity, "Cannot pause - identity is null"); if (identity.isEmpty()) { LOGGER.warn("Cannot pause - identity is empty"); @@ -379,7 +361,7 @@ public void pause(String identity) { QuartzTrigger trigger = scheduledTasks.get(parsedIdentity); if (trigger != null) { scheduler.pauseJob(new JobKey(parsedIdentity, Scheduler.class.getName())); - Events.fire(scheduledJobPausedEvent, new ScheduledJobPaused(trigger)); + events.fireScheduledJobPaused(new ScheduledJobPaused(trigger)); } } catch (SchedulerException e) { throw new RuntimeException("Unable to pause job", e); @@ -388,6 +370,9 @@ public void pause(String identity) { @Override public boolean isPaused(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (identity.isEmpty()) { return false; @@ -417,22 +402,24 @@ public boolean isPaused(String identity) { @Override public void resume() { - if (!enabled) { - LOGGER.warn("Quartz Scheduler is disabled and cannot be resumed"); - } else { - try { - if (scheduler != null) { - scheduler.start(); - Events.fire(schedulerResumedEvent, SchedulerResumed.INSTANCE); - } - } catch (SchedulerException e) { - throw new RuntimeException("Unable to resume scheduler", e); + if (!isStarted()) { + throw notStarted(); + } + try { + if (scheduler != null) { + scheduler.start(); + events.fireSchedulerResumed(); } + } catch (SchedulerException e) { + throw new RuntimeException("Unable to resume scheduler", e); } } @Override public void resume(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity, "Cannot resume - identity is null"); if (identity.isEmpty()) { LOGGER.warn("Cannot resume - identity is empty"); @@ -443,7 +430,7 @@ public void resume(String identity) { QuartzTrigger trigger = scheduledTasks.get(parsedIdentity); if (trigger != null) { scheduler.resumeJob(new JobKey(SchedulerUtils.lookUpPropertyValue(parsedIdentity), Scheduler.class.getName())); - Events.fire(scheduledJobResumedEvent, new ScheduledJobResumed(trigger)); + events.fireScheduledJobResumed(new ScheduledJobResumed(trigger)); } } catch (SchedulerException e) { throw new RuntimeException("Unable to resume job", e); @@ -452,7 +439,7 @@ public void resume(String identity) { @Override public boolean isRunning() { - if (!enabled || scheduler == null) { + if (!isStarted()) { return false; } else { try { @@ -465,11 +452,17 @@ public boolean isRunning() { @Override public List getScheduledJobs() { + if (!isStarted()) { + throw notStarted(); + } return List.copyOf(scheduledTasks.values()); } @Override public Trigger getScheduledJob(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (identity.isEmpty()) { return null; @@ -479,6 +472,9 @@ public Trigger getScheduledJob(String identity) { @Override public JobDefinition newJob(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (scheduledTasks.containsKey(identity)) { throw new IllegalStateException("A job with this identity is already scheduled: " + identity); @@ -488,6 +484,9 @@ public JobDefinition newJob(String identity) { @Override public Trigger unscheduleJob(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (!identity.isEmpty()) { String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity); @@ -1010,10 +1009,8 @@ public boolean isBlocking() { if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { instrumenter = jobInstrumenter.get(); } - invoker = SimpleScheduler.initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, - failedExecutionEvent, delayedExecutionEvent, scheduled.concurrentExecution(), skipPredicate, instrumenter, - vertx, - task != null && runtimeConfig.runBlockingScheduledMethodOnQuartzThread, + invoker = initInvoker(invoker, events, scheduled.concurrentExecution(), skipPredicate, instrumenter, + vertx, task != null && runtimeConfig.runBlockingScheduledMethodOnQuartzThread, SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); QuartzTrigger quartzTrigger = new QuartzTrigger(trigger.getKey(), new Function<>() { diff --git a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java index b66d871fa61b9..d0d2467c160cc 100644 --- a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java +++ b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java @@ -17,7 +17,18 @@ public interface Scheduler { /** - * Pause the scheduler. No triggers are fired. + * By default, the scheduler is not started unless a {@link Scheduled} business method is discovered. However, it is + * possible to set the {@code forced} start mode with the {@code quarkus.scheduler.start-mode} configuration property. In + * this case, the scheduler will be started even if no scheduled business methods are found. + * + * @return {@code true} if the scheduler was started, {@code false} otherwise + */ + boolean isStarted(); + + /** + * Pause the scheduler. No triggers are fired when a scheduler is paused. + * + * @throws UnsupportedOperationException If the scheduler was not started */ void pause(); @@ -26,11 +37,14 @@ public interface Scheduler { * * @param identity * @see Scheduled#identity() + * @throws UnsupportedOperationException If the scheduler was not started */ void pause(String identity); /** * Resume the scheduler. Triggers can be fired again. + * + * @throws UnsupportedOperationException If the scheduler was not started */ void resume(); @@ -39,6 +53,7 @@ public interface Scheduler { * * @param identity * @see Scheduled#identity() + * @throws UnsupportedOperationException If the scheduler was not started */ void resume(String identity); @@ -52,21 +67,28 @@ public interface Scheduler { * @return {@code true} if the job with the given identity is paused, {@code false} otherwise * @see Scheduled#identity() * @see #pause(String) + * @throws UnsupportedOperationException If the scheduler was not started */ boolean isPaused(String identity); /** - * @return {@code true} if a scheduler is running the triggers are fired and jobs are executed, {@code false} otherwise + * @return {@code true} if a scheduler is started the triggers are fired and jobs are executed, {@code false} otherwise + * @see #pause() + * @see #resume() */ boolean isRunning(); /** - * @return an immutable list of scheduled jobs represented by their trigger. + * + * @return an immutable list of scheduled jobs represented by their trigger + * @throws UnsupportedOperationException If the scheduler was not started */ List getScheduledJobs(); /** - * @return the trigger of a specific job or null for non-existent identity. + * + * @return the trigger of a specific job or null for non-existent identity + * @throws UnsupportedOperationException If the scheduler was not started */ Trigger getScheduledJob(String identity); @@ -79,6 +101,7 @@ public interface Scheduler { * @param identity The identity must be unique for the scheduler * @return a new job definition * @see Scheduled#identity() + * @throws UnsupportedOperationException If the scheduler was not started */ JobDefinition newJob(String identity); @@ -89,6 +112,7 @@ public interface Scheduler { * * @param identity * @return the trigger or {@code null} if no such job exists + * @throws UnsupportedOperationException If the scheduler was not started */ Trigger unscheduleJob(String identity); diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/BaseScheduler.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/BaseScheduler.java new file mode 100644 index 0000000000000..fc262496c163f --- /dev/null +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/BaseScheduler.java @@ -0,0 +1,70 @@ +package io.quarkus.scheduler.common.runtime; + +import java.time.Duration; +import java.util.OptionalLong; +import java.util.concurrent.ScheduledExecutorService; + +import jakarta.enterprise.inject.Instance; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduled.ConcurrentExecution; +import io.quarkus.scheduler.Scheduled.SkipPredicate; +import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; +import io.quarkus.scheduler.spi.JobInstrumenter; +import io.vertx.core.Vertx; + +public class BaseScheduler { + + protected final Vertx vertx; + protected final CronParser cronParser; + protected final Duration defaultOverdueGracePeriod; + protected final Events events; + protected final Instance jobInstrumenter; + protected final ScheduledExecutorService blockingExecutor; + + public BaseScheduler(Vertx vertx, CronParser cronParser, + Duration defaultOverdueGracePeriod, Events events, Instance jobInstrumenter, + ScheduledExecutorService blockingExecutor) { + this.vertx = vertx; + this.cronParser = cronParser; + this.defaultOverdueGracePeriod = defaultOverdueGracePeriod; + this.events = events; + this.jobInstrumenter = jobInstrumenter; + this.blockingExecutor = blockingExecutor; + } + + protected UnsupportedOperationException notStarted() { + return new UnsupportedOperationException("Scheduler was not started"); + } + + protected ScheduledInvoker initInvoker(ScheduledInvoker invoker, Events events, + ConcurrentExecution concurrentExecution, Scheduled.SkipPredicate skipPredicate, JobInstrumenter instrumenter, + Vertx vertx, boolean skipOffloadingInvoker, + OptionalLong delay, ScheduledExecutorService blockingExecutor) { + invoker = new StatusEmitterInvoker(invoker, events.successExecution, events.failedExecution); + if (concurrentExecution == ConcurrentExecution.SKIP) { + invoker = new SkipConcurrentExecutionInvoker(invoker, events.skippedExecution); + } + if (skipPredicate != null) { + invoker = new SkipPredicateInvoker(invoker, skipPredicate, events.skippedExecution); + } + if (instrumenter != null) { + invoker = new InstrumentedInvoker(invoker, instrumenter); + } + if (!skipOffloadingInvoker) { + invoker = new OffloadingInvoker(invoker, vertx); + } + if (delay.isPresent()) { + invoker = new DelayedExecutionInvoker(invoker, delay.getAsLong(), blockingExecutor, events.delayedExecution); + } + return invoker; + } + + protected Scheduled.SkipPredicate initSkipPredicate(Class predicateClass) { + if (predicateClass.equals(Scheduled.Never.class)) { + return null; + } + return SchedulerUtils.instantiateBeanOrClass(predicateClass); + } + +} diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/Events.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/Events.java index 511b07ecc5ec8..60f30504883d2 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/Events.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/Events.java @@ -7,11 +7,57 @@ import org.jboss.logging.Logger; +import io.quarkus.scheduler.DelayedExecution; +import io.quarkus.scheduler.FailedExecution; +import io.quarkus.scheduler.ScheduledJobPaused; +import io.quarkus.scheduler.ScheduledJobResumed; +import io.quarkus.scheduler.SchedulerPaused; +import io.quarkus.scheduler.SchedulerResumed; +import io.quarkus.scheduler.SkippedExecution; +import io.quarkus.scheduler.SuccessfulExecution; + public final class Events { private static final Logger LOG = Logger.getLogger(Events.class); - private Events() { + public final Event skippedExecution; + public final Event successExecution; + public final Event failedExecution; + public final Event delayedExecution; + public final Event schedulerPaused; + public final Event schedulerResumed; + public final Event scheduledJobPaused; + public final Event scheduledJobResumed; + + public Events(Event skippedExecution, Event successExecution, + Event failedExecution, Event delayedExecution, + Event schedulerPaused, Event schedulerResumed, + Event scheduledJobPaused, Event scheduledJobResumed) { + super(); + this.skippedExecution = skippedExecution; + this.successExecution = successExecution; + this.failedExecution = failedExecution; + this.delayedExecution = delayedExecution; + this.schedulerPaused = schedulerPaused; + this.schedulerResumed = schedulerResumed; + this.scheduledJobPaused = scheduledJobPaused; + this.scheduledJobResumed = scheduledJobResumed; + } + + public void fireSchedulerPaused() { + fire(schedulerPaused, SchedulerPaused.INSTANCE); + } + + public void fireSchedulerResumed() { + fire(schedulerResumed, SchedulerResumed.INSTANCE); + } + + public void fireScheduledJobPaused(ScheduledJobPaused payload) { + fire(scheduledJobPaused, payload); + } + + public void fireScheduledJobResumed(ScheduledJobResumed payload) { + fire(scheduledJobResumed, payload); } /** diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/DisabledSchedulerTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/DisabledSchedulerTest.java index 7dd3aebdcc7a8..39156c53eaad4 100644 --- a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/DisabledSchedulerTest.java +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/DisabledSchedulerTest.java @@ -1,6 +1,8 @@ package io.quarkus.scheduler.test; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import jakarta.inject.Inject; @@ -16,7 +18,7 @@ public class DisabledSchedulerTest { @RegisterExtension static final QuarkusUnitTest test = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar + .withApplicationRoot(root -> root .addClasses(Jobs.class) .addAsResource(new StringAsset("quarkus.scheduler.enabled=false"), "application.properties")); @@ -25,8 +27,19 @@ public class DisabledSchedulerTest { Scheduler scheduler; @Test - public void testNoSchedulerInvocations() throws InterruptedException { + public void testSchedulerInvocations() throws InterruptedException { + assertNotNull(scheduler); + assertFalse(scheduler.isStarted()); assertFalse(scheduler.isRunning()); + assertNotNull(scheduler.implementation()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.newJob("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.unscheduleJob("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.pause()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.pause("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.resume()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.resume("foo")); + assertThrows(UnsupportedOperationException.class, () -> scheduler.getScheduledJobs()); + assertThrows(UnsupportedOperationException.class, () -> scheduler.getScheduledJob("bar")); } static class Jobs { diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java index 55d90854249bd..3832f3a1fb436 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java @@ -32,6 +32,17 @@ public class CompositeScheduler implements Scheduler { this.schedulerContext = schedulerContext; } + @Override + public boolean isStarted() { + // IMPL NOTE: we return true if at least one of the schedulers is started + for (Scheduler scheduler : schedulers) { + if (scheduler.isStarted()) { + return true; + } + } + return false; + } + @Override public void pause() { for (Scheduler scheduler : schedulers) { diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index a9a4f9558fa05..595a23e9404aa 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -39,8 +39,6 @@ import io.quarkus.scheduler.DelayedExecution; import io.quarkus.scheduler.FailedExecution; import io.quarkus.scheduler.Scheduled; -import io.quarkus.scheduler.Scheduled.ConcurrentExecution; -import io.quarkus.scheduler.Scheduled.SkipPredicate; import io.quarkus.scheduler.ScheduledExecution; import io.quarkus.scheduler.ScheduledJobPaused; import io.quarkus.scheduler.ScheduledJobResumed; @@ -51,18 +49,13 @@ import io.quarkus.scheduler.SuccessfulExecution; import io.quarkus.scheduler.Trigger; import io.quarkus.scheduler.common.runtime.AbstractJobDefinition; +import io.quarkus.scheduler.common.runtime.BaseScheduler; import io.quarkus.scheduler.common.runtime.CronParser; import io.quarkus.scheduler.common.runtime.DefaultInvoker; -import io.quarkus.scheduler.common.runtime.DelayedExecutionInvoker; import io.quarkus.scheduler.common.runtime.Events; -import io.quarkus.scheduler.common.runtime.InstrumentedInvoker; -import io.quarkus.scheduler.common.runtime.OffloadingInvoker; import io.quarkus.scheduler.common.runtime.ScheduledInvoker; import io.quarkus.scheduler.common.runtime.ScheduledMethod; import io.quarkus.scheduler.common.runtime.SchedulerContext; -import io.quarkus.scheduler.common.runtime.SkipConcurrentExecutionInvoker; -import io.quarkus.scheduler.common.runtime.SkipPredicateInvoker; -import io.quarkus.scheduler.common.runtime.StatusEmitterInvoker; import io.quarkus.scheduler.common.runtime.SyntheticScheduled; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.quarkus.scheduler.runtime.SchedulerRuntimeConfig.StartMode; @@ -71,7 +64,7 @@ @Typed(Scheduler.class) @Singleton -public class SimpleScheduler implements Scheduler { +public class SimpleScheduler extends BaseScheduler implements Scheduler { private static final Logger LOG = Logger.getLogger(SimpleScheduler.class); @@ -79,49 +72,24 @@ public class SimpleScheduler implements Scheduler { public static final long CHECK_PERIOD = 1000L; private final ScheduledExecutorService scheduledExecutor; - private final Vertx vertx; private volatile boolean running; private final ConcurrentMap scheduledTasks; - private final boolean enabled; - private final CronParser cronParser; - private final Duration defaultOverdueGracePeriod; - private final Event skippedExecutionEvent; - private final Event successExecutionEvent; - private final Event failedExecutionEvent; - private final Event delayedExecutionEvent; - private final Event schedulerPausedEvent; - private final Event schedulerResumedEvent; - private final Event scheduledJobPausedEvent; - private final Event scheduledJobResumedEvent; private final SchedulerConfig schedulerConfig; - private final Instance jobInstrumenter; - private final ScheduledExecutorService blockingExecutor; public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedulerRuntimeConfig, Event skippedExecutionEvent, Event successExecutionEvent, Event failedExecutionEvent, Event delayedExecutionEvent, - Event schedulerPausedEvent, - Event schedulerResumedEvent, Event scheduledJobPausedEvent, + Event schedulerPausedEvent, Event schedulerResumedEvent, + Event scheduledJobPausedEvent, Event scheduledJobResumedEvent, Vertx vertx, SchedulerConfig schedulerConfig, Instance jobInstrumenter, ScheduledExecutorService blockingExecutor) { + super(vertx, new CronParser(context.getCronType()), schedulerRuntimeConfig.overdueGracePeriod, + new Events(skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, delayedExecutionEvent, + schedulerPausedEvent, schedulerResumedEvent, scheduledJobPausedEvent, scheduledJobResumedEvent), + jobInstrumenter, blockingExecutor); this.running = true; - this.enabled = schedulerRuntimeConfig.enabled; this.scheduledTasks = new ConcurrentHashMap<>(); - this.vertx = vertx; - this.skippedExecutionEvent = skippedExecutionEvent; - this.successExecutionEvent = successExecutionEvent; - this.failedExecutionEvent = failedExecutionEvent; - this.delayedExecutionEvent = delayedExecutionEvent; - this.schedulerPausedEvent = schedulerPausedEvent; - this.schedulerResumedEvent = schedulerResumedEvent; - this.scheduledJobPausedEvent = scheduledJobPausedEvent; - this.scheduledJobResumedEvent = scheduledJobResumedEvent; this.schedulerConfig = schedulerConfig; - this.jobInstrumenter = jobInstrumenter; - this.blockingExecutor = blockingExecutor; - - this.cronParser = new CronParser(context.getCronType()); - this.defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod; if (!schedulerRuntimeConfig.enabled) { this.scheduledExecutor = null; @@ -186,8 +154,7 @@ public void run() { if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { instrumenter = jobInstrumenter.get(); } - ScheduledInvoker invoker = initInvoker(context.createInvoker(method.getInvokerClassName()), - skippedExecutionEvent, successExecutionEvent, failedExecutionEvent, delayedExecutionEvent, + ScheduledInvoker invoker = initInvoker(context.createInvoker(method.getInvokerClassName()), events, scheduled.concurrentExecution(), initSkipPredicate(scheduled.skipExecutionIf()), instrumenter, vertx, false, SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); scheduledTasks.put(trigger.get().id, new ScheduledTask(trigger.get(), invoker, false)); @@ -196,6 +163,11 @@ public void run() { } } + @Override + public boolean isStarted() { + return scheduledExecutor != null; + } + @Override public String implementation() { return Scheduled.SIMPLE; @@ -203,6 +175,9 @@ public String implementation() { @Override public JobDefinition newJob(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (scheduledTasks.containsKey(identity)) { throw new IllegalStateException("A job with this identity is already scheduled: " + identity); @@ -212,6 +187,9 @@ public JobDefinition newJob(String identity) { @Override public Trigger unscheduleJob(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (!identity.isEmpty()) { String parsedIdentity = SchedulerUtils.lookUpPropertyValue(identity); @@ -263,16 +241,18 @@ void checkTriggers() { @Override public void pause() { - if (!enabled) { - LOG.warn("Scheduler is disabled and cannot be paused"); - } else { - running = false; - Events.fire(schedulerPausedEvent, SchedulerPaused.INSTANCE); + if (!isStarted()) { + throw notStarted(); } + running = false; + events.fireSchedulerPaused(); } @Override public void pause(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity, "Cannot pause - identity is null"); if (identity.isEmpty()) { LOG.warn("Cannot pause - identity is empty"); @@ -282,12 +262,15 @@ public void pause(String identity) { ScheduledTask task = scheduledTasks.get(parsedIdentity); if (task != null) { task.trigger.setRunning(false); - Events.fire(scheduledJobPausedEvent, new ScheduledJobPaused(task.trigger)); + events.fireScheduledJobPaused(new ScheduledJobPaused(task.trigger)); } } @Override public boolean isPaused(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (identity.isEmpty()) { return false; @@ -302,16 +285,18 @@ public boolean isPaused(String identity) { @Override public void resume() { - if (!enabled) { - LOG.warn("Scheduler is disabled and cannot be resumed"); - } else { - running = true; - Events.fire(schedulerResumedEvent, SchedulerResumed.INSTANCE); + if (!isStarted()) { + throw notStarted(); } + running = true; + events.fireSchedulerResumed(); } @Override public void resume(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity, "Cannot resume - identity is null"); if (identity.isEmpty()) { LOG.warn("Cannot resume - identity is empty"); @@ -321,22 +306,28 @@ public void resume(String identity) { ScheduledTask task = scheduledTasks.get(parsedIdentity); if (task != null) { task.trigger.setRunning(true); - Events.fire(scheduledJobResumedEvent, new ScheduledJobResumed(task.trigger)); + events.fireScheduledJobResumed(new ScheduledJobResumed(task.trigger)); } } @Override public boolean isRunning() { - return enabled && running; + return isStarted() && running; } @Override public List getScheduledJobs() { + if (!isStarted()) { + throw notStarted(); + } return scheduledTasks.values().stream().map(task -> task.trigger).collect(Collectors.toUnmodifiableList()); } @Override public Trigger getScheduledJob(String identity) { + if (!isStarted()) { + throw notStarted(); + } Objects.requireNonNull(identity); if (identity.isEmpty()) { return null; @@ -382,38 +373,6 @@ Optional createTrigger(String id, String methodDescription, Sched } } - public static ScheduledInvoker initInvoker(ScheduledInvoker invoker, Event skippedExecutionEvent, - Event successExecutionEvent, Event failedExecutionEvent, - Event delayedExecutionEvent, - ConcurrentExecution concurrentExecution, Scheduled.SkipPredicate skipPredicate, JobInstrumenter instrumenter, - Vertx vertx, boolean skipOffloadingInvoker, - OptionalLong delay, ScheduledExecutorService blockingExecutor) { - invoker = new StatusEmitterInvoker(invoker, successExecutionEvent, failedExecutionEvent); - if (concurrentExecution == ConcurrentExecution.SKIP) { - invoker = new SkipConcurrentExecutionInvoker(invoker, skippedExecutionEvent); - } - if (skipPredicate != null) { - invoker = new SkipPredicateInvoker(invoker, skipPredicate, skippedExecutionEvent); - } - if (instrumenter != null) { - invoker = new InstrumentedInvoker(invoker, instrumenter); - } - if (!skipOffloadingInvoker) { - invoker = new OffloadingInvoker(invoker, vertx); - } - if (delay.isPresent()) { - invoker = new DelayedExecutionInvoker(invoker, delay.getAsLong(), blockingExecutor, delayedExecutionEvent); - } - return invoker; - } - - public static Scheduled.SkipPredicate initSkipPredicate(Class predicateClass) { - if (predicateClass.equals(Scheduled.Never.class)) { - return null; - } - return SchedulerUtils.instantiateBeanOrClass(predicateClass); - } - static class ScheduledTask { final boolean isProgrammatic; @@ -706,10 +665,8 @@ public boolean isBlocking() { if (schedulerConfig.tracingEnabled && jobInstrumenter.isResolvable()) { instrumenter = jobInstrumenter.get(); } - invoker = initInvoker(invoker, skippedExecutionEvent, successExecutionEvent, - failedExecutionEvent, delayedExecutionEvent, concurrentExecution, skipPredicate, instrumenter, vertx, - false, - SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); + invoker = initInvoker(invoker, events, concurrentExecution, skipPredicate, instrumenter, vertx, + false, SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); ScheduledTask scheduledTask = new ScheduledTask(trigger.get(), invoker, true); ScheduledTask existing = scheduledTasks.putIfAbsent(simpleTrigger.id, scheduledTask); if (existing != null) {