Skip to content

Commit

Permalink
Merge pull request wildfly#17962 from pferraro/WFLY-19361
Browse files Browse the repository at this point in the history
WFLY-19419 Distributed timer service should consolidate timeouts that would execute in the past
  • Loading branch information
pferraro authored Jul 22, 2024
2 parents 110706b + 732a00e commit 84e42d7
Show file tree
Hide file tree
Showing 14 changed files with 600 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class IntervalTimerMetaDataEntry<C> extends AbstractTimerMetaDataEntry<C>
private final Duration interval;

public IntervalTimerMetaDataEntry(C context, IntervalTimerConfiguration config) {
this(context, config.getStart(), config.getInterval());
super(context, config);
this.interval = config.getInterval();
}

public IntervalTimerMetaDataEntry(C context, Instant start, Duration interval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.wildfly.clustering.ejb.cache.timer;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Instant;
import java.util.ServiceLoader;
import java.util.function.UnaryOperator;
Expand All @@ -25,7 +27,12 @@ public enum ScheduleTimerOperatorFactory implements ScheduleTimerOperationProvid
}

private static ScheduleTimerOperationProvider load() {
return ServiceLoader.load(ScheduleTimerOperationProvider.class, ScheduleTimerOperationProvider.class.getClassLoader()).findFirst().orElseThrow(IllegalStateException::new);
return AccessController.doPrivileged(new PrivilegedAction<>() {
@Override
public ScheduleTimerOperationProvider run() {
return ServiceLoader.load(ScheduleTimerOperationProvider.class, ScheduleTimerOperationProvider.class.getClassLoader()).findFirst().orElseThrow(IllegalStateException::new);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.UUID;

Expand All @@ -26,7 +27,7 @@ public abstract class AbstractIntervalTimerMetaDataEntryTestCase extends Abstrac

@Parameters
public static Iterable<IntervalTimerConfiguration> parameters() {
Instant start = Instant.now();
Instant start = Instant.now().truncatedTo(ChronoUnit.MILLIS);
return List.of(new IntervalTimerConfiguration() {
@Override
public Instant getStart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,45 +138,53 @@ public Boolean call() throws Exception {

// Safeguard : ensure timeout was not already triggered elsewhere
if (currentTimeoutReference.isEmpty()) {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Unexpected timeout event triggered.", id);
InfinispanEjbLogger.ROOT_LOGGER.debugf("Unexpected timeout event triggered for %s", id);
return false;
}

Instant now = Instant.now();
Instant currentTimeout = currentTimeoutReference.get();
if (currentTimeout.isAfter(Instant.now())) {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Timeout for timer %s initiated prematurely.", id);
if (currentTimeout.isAfter(now)) {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Timeout for timer %s initiated prematurely @ %s", id, currentTimeout);
return false;
}

Timer<I> timer = factory.createTimer(id, metaData, manager, scheduler);

InfinispanEjbLogger.ROOT_LOGGER.debugf("Triggering timeout for timer %s [%s]", id, timer.getMetaData().getContext());

// In case we need to reset the last timeout
Optional<Instant> lastTimeout = metaData.getLastTimeout();
// Record last timeout - expected to be set prior to triggering timeout
// Capture previous last timeout in case we need to reset it
Optional<Instant> originalLastTimeout = metaData.getLastTimeout();
// Record new last timeout - expected to be set prior to triggering timeout
metaData.setLastTimeout(currentTimeout);

try {
timer.invoke();
} catch (ExecutionException e) {
// Log error and proceed as if it was successful
InfinispanEjbLogger.ROOT_LOGGER.error(e.getLocalizedMessage(), e);
} catch (RejectedExecutionException e) {
// Component is not started or is suspended
InfinispanEjbLogger.ROOT_LOGGER.debugf("EJB component is suspended - could not invoke timeout for timer %s", id);
// Reset last timeout
metaData.setLastTimeout(lastTimeout.orElse(null));
return false;
}

// If timeout callback canceled this timer
if (timer.isCanceled()) {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Timeout callback canceled timer %s", id);
return true;
}

// Determine next timeout
Optional<Instant> nextTimeout = metaData.getNextTimeout();
// WFLY-19361: If next timeout is in the past do not trigger event
if (nextTimeout.orElse(now).isBefore(now)) {
// This has the effect of consolidating missed timeouts into a single event
InfinispanEjbLogger.ROOT_LOGGER.debugf("Skipping notification of missed timeout for timer %s @ %s", id, currentTimeout);
} else {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Triggering timeout for timer %s @ %s", id, currentTimeout);

Timer<I> timer = factory.createTimer(id, metaData, manager, scheduler);
try {
timer.invoke();
} catch (ExecutionException e) {
// Log error and proceed as if it was successful
InfinispanEjbLogger.ROOT_LOGGER.error(e.getLocalizedMessage(), e);
} catch (RejectedExecutionException e) {
// Component is not started or is suspended
InfinispanEjbLogger.ROOT_LOGGER.debugf("EJB component is suspended - could not invoke timeout for timer %s", id);
// Reset last timeout
metaData.setLastTimeout(originalLastTimeout.orElse(null));
batch.discard();
return false;
}

// If timeout callback canceled this timer
if (timer.isCanceled()) {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Timeout callback canceled timer %s", id);
return true;
}
}

if (nextTimeout.isEmpty()) {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Timer %s has expired", id);
registry.unregister(id);
Expand All @@ -191,7 +199,7 @@ public Boolean call() throws Exception {
}

// Reschedule using next timeout
InfinispanEjbLogger.ROOT_LOGGER.debugf("Rescheduling timer %s for next timeout %s", id, nextTimeout);
InfinispanEjbLogger.ROOT_LOGGER.debugf("Rescheduling timer %s for next timeout %s", id, nextTimeout.get());
entries.add(id, nextTimeout.get());
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Date;
import java.util.GregorianCalendar;

import org.jboss.as.ejb3.timerservice.schedule.CalendarBasedTimeout;
import org.jboss.as.ejb3.timerservice.spi.TimedObjectInvoker;

/**
Expand Down Expand Up @@ -52,14 +53,17 @@ protected Date calculateNextTimeout(TimerImpl timer) {
if (currentTimeout == null) {
return null;
}
Calendar cal = new GregorianCalendar();
cal.setTime(currentTimeout);
// now compute the next timeout date
Calendar nextTimeout = ((CalendarTimer) timer).getCalendarTimeout().getNextTimeout(cal);
if (nextTimeout != null) {
return nextTimeout.getTime();
}
return null;
Calendar nextTimeout = new GregorianCalendar();
nextTimeout.setTime(currentTimeout);

CalendarBasedTimeout timeout = ((CalendarTimer) timer).getCalendarTimeout();
Date now = new Date();
do {
nextTimeout = timeout.getNextTimeout(nextTimeout);
// Ensure next timeout is in the future
} while ((nextTimeout != null) && nextTimeout.getTime().before(now));

return (nextTimeout != null) ? nextTimeout.getTime() : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.IdentityHashMap;
Expand Down Expand Up @@ -39,6 +40,7 @@
import org.jboss.as.test.clustering.cluster.ejb.timer.servlet.TimerServlet;
import org.jboss.as.test.clustering.ejb.EJBDirectory;
import org.jboss.as.test.http.util.TestHttpClientUtils;
import org.jboss.as.test.shared.TimeoutUtil;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import org.junit.Assert;
Expand All @@ -60,6 +62,7 @@ protected static WebArchive createArchive(Class<? extends AbstractTimerServiceTe
;
}

private static final Duration GRACE_PERIOD = Duration.ofSeconds(TimeoutUtil.adjust(2));
private final String moduleName;

protected AbstractTimerServiceTestCase() {
Expand All @@ -77,7 +80,7 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE

try (CloseableHttpClient client = TestHttpClientUtils.promiscuousCookieHttpClient()) {

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

// Create manual timers on node 1 only
try (CloseableHttpResponse response = client.execute(new HttpPut(uris.get(NODE_1)))) {
Expand All @@ -98,7 +101,7 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE
}
}

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

Map<Class<? extends TimerBean>, Map<String, List<Instant>>> timeouts = new IdentityHashMap<>();
for (Class<? extends TimerBean> beanClass : TimerServlet.TIMER_CLASSES) {
Expand Down Expand Up @@ -150,7 +153,7 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE
}
}

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

for (Map<String, List<Instant>> beanTimeouts : timeouts.values()) {
beanTimeouts.clear();
Expand Down Expand Up @@ -188,7 +191,7 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE

this.stop(NODE_1);

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

try (CloseableHttpResponse response = client.execute(new HttpHead(uris.get(NODE_2)))) {
Assert.assertEquals(HttpServletResponse.SC_OK, response.getStatusLine().getStatusCode());
Expand Down Expand Up @@ -224,7 +227,7 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE

this.start(NODE_1);

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

for (Map<String, List<Instant>> beanTimeouts : timeouts.values()) {
beanTimeouts.clear();
Expand Down Expand Up @@ -253,7 +256,7 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE
}
}

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

for (Map<String, List<Instant>> beanTimeouts : timeouts.values()) {
beanTimeouts.clear();
Expand Down Expand Up @@ -284,7 +287,7 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE

this.stop(NODE_2);

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

for (Map<String, List<Instant>> beanTimeouts : timeouts.values()) {
beanTimeouts.clear();
Expand All @@ -308,15 +311,15 @@ public void test(@ArquillianResource(TimerServlet.class) @OperateOnDeployment(DE

this.start(NODE_2);

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

try (CloseableHttpResponse response = client.execute(new HttpDelete(uris.get(NODE_1)))) {
Assert.assertEquals(HttpServletResponse.SC_OK, response.getStatusLine().getStatusCode());
}

Instant cancellation = Instant.now();

TimeUnit.SECONDS.sleep(2);
TimeUnit.SECONDS.sleep(GRACE_PERIOD.getSeconds());

for (Map<String, List<Instant>> beanTimeouts : timeouts.values()) {
beanTimeouts.clear();
Expand Down
Loading

0 comments on commit 84e42d7

Please sign in to comment.