Skip to content

Commit

Permalink
Merge pull request #11 from Azquelt/microprofile-changes-integration
Browse files Browse the repository at this point in the history
Microprofile Fault Tolerance changes (integration branch)
  • Loading branch information
Azquelt authored Sep 19, 2017
2 parents 4ebf0ef + 7a36ade commit 06f1160
Show file tree
Hide file tree
Showing 17 changed files with 322 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public void beforeBeanDiscovery(@Observes BeforeBeanDiscovery beforeBeanDiscover
beforeBeanDiscovery.addInterceptorBinding(bindingType);
AnnotatedType<FaultToleranceInterceptor> interceptorType = beanManager.createAnnotatedType(FaultToleranceInterceptor.class);
beforeBeanDiscovery.addAnnotatedType(interceptorType);
AnnotatedType<FaultToleranceInterceptor.ExecutorCleanup> executorCleanup = beanManager.createAnnotatedType(FaultToleranceInterceptor.ExecutorCleanup.class);
beforeBeanDiscovery.addAnnotatedType(executorCleanup);
}

public <T> void processAnnotatedType(@Observes @WithAnnotations({ Asynchronous.class, Fallback.class, Timeout.class, CircuitBreaker.class, Retry.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

import javax.annotation.PreDestroy;
import javax.annotation.Priority;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
Expand All @@ -32,6 +34,8 @@
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.ffdc.annotation.FFDCIgnore;
import com.ibm.ws.microprofile.faulttolerance.cdi.config.AsynchronousConfig;
import com.ibm.ws.microprofile.faulttolerance.cdi.config.BulkheadConfig;
Expand All @@ -58,6 +62,11 @@ public class FaultToleranceInterceptor {

private final ConcurrentHashMap<Method, AggregatedFTPolicy> policyCache = new ConcurrentHashMap<>();

@Inject
public FaultToleranceInterceptor(ExecutorCleanup executorCleanup) {
executorCleanup.setPolicies(policyCache.values());
}

@AroundInvoke
public Object executeFT(InvocationContext context) throws Throwable {

Expand Down Expand Up @@ -214,7 +223,12 @@ private Object execute(InvocationContext invocationContext, AggregatedFTPolicy a
};

Executor<Future<Object>> async = (Executor<Future<Object>>) executor;
result = async.execute(callable, executionContext);
try {
result = async.execute(callable, executionContext);
} catch (ExecutionException e) {
throw e.getCause();
}

} else {

Callable<Object> callable = () -> {
Expand All @@ -235,11 +249,25 @@ private Object execute(InvocationContext invocationContext, AggregatedFTPolicy a
return result;
}

@PreDestroy
public void cleanUpExecutors(InvocationContext ctx) throws Exception {
ctx.proceed();
policyCache.forEach((k, v) -> {
v.close();
});
@Dependent
public static class ExecutorCleanup {
private static final TraceComponent tc = Tr.register(ExecutorCleanup.class);

private Collection<AggregatedFTPolicy> policies;

public void setPolicies(Collection<AggregatedFTPolicy> policies) {
this.policies = policies;
}

@PreDestroy
public void cleanUpExecutors() {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Cleaning up executors");
}

policies.forEach((e) -> {
e.close();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,33 @@ public void testCBFailureThresholdWithException() throws Exception {
"SUCCESS");
}

@Test
public void testCBAsync() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/circuitbreaker?testMethod=testCBAsync",
"SUCCESS");
}

@Test
public void testCBAsyncFallback() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/circuitbreaker?testMethod=testCBAsyncFallback",
"SUCCESS");
}

/** {@inheritDoc} */
@Override
protected SharedServer getSharedServer() {
return SHARED_SERVER;
}
@BeforeClass
public static void setUp() throws Exception {
if (!SHARED_SERVER.getLibertyServer().isStarted()) {
SHARED_SERVER.getLibertyServer().startServer();
}
}

@BeforeClass
public static void setUp() throws Exception {
if (!SHARED_SERVER.getLibertyServer().isStarted()) {
SHARED_SERVER.getLibertyServer().startServer();
}

}

@AfterClass
public static void tearDown() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.ClassRule;
import org.junit.Test;

import com.ibm.websphere.simplicity.RemoteFile;
import com.ibm.ws.fat.util.LoggingTest;
import com.ibm.ws.fat.util.SharedServer;
import com.ibm.ws.fat.util.browser.WebBrowser;
Expand Down Expand Up @@ -72,6 +73,23 @@ public void testRetryDurationZero() throws Exception {
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/retry?testMethod=testRetryDurationZero", "SUCCESS");
}

/**
* Not really related to retry but it's easiest to test it here
*/
@Test
public void testExecutorsClose() throws Exception {

RemoteFile traceLog = SHARED_SERVER.getLibertyServer().getMostRecentTraceFile();
SHARED_SERVER.getLibertyServer().setMarkToEndOfLog(traceLog);

// This calls a RequestScoped bean which only has fault tolerance annotations on the method
// This should cause executors to get cleaned up
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/retry?testMethod=testRetryAbortOn", "SUCCESS");

SHARED_SERVER.getLibertyServer().waitForStringInLog("Cleaning up executors", traceLog);
}

/** {@inheritDoc} */
@Override
protected SharedServer getSharedServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ public void testTimeoutZero() throws Exception {
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/timeout?testMethod=testTimeoutZero", "SUCCESS");
}

@Test
public void testNonInterruptableTimeout() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/timeout?testMethod=testNonInterruptableTimeout", "SUCCESS");
}

@Test
public void testNonInterruptableDoesntTimeout() throws Exception {
WebBrowser browser = createWebBrowserForTestCase();
getSharedServer().verifyResponse(browser, "/CDIFaultTolerance/timeout?testMethod=testNonInterruptableDoesntTimeout", "SUCCESS");
}

/** {@inheritDoc} */
@Override
protected SharedServer getSharedServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void testAsyncDisabled(HttpServletRequest request, HttpServletResponse re
long duration = end - start;

// Ensure that this method was executed synchronously
assertThat("Call duration", duration, greaterThan(TestConstants.WORK_TIME));
assertThat("Call duration", duration, greaterThan(TestConstants.WORK_TIME - TestConstants.TEST_TWEAK_TIME_UNIT));
assertThat("Call result", future.get(), is(notNullValue()));
assertThat("Call result", future.get().getData(), equalTo(AsyncBean.CONNECT_A_DATA));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.ibm.ws.microprofile.faulttolerance_fat.cdi;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/*******************************************************************************
* Copyright (c) 2017 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
Expand All @@ -12,6 +16,8 @@
*******************************************************************************/

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import javax.inject.Inject;
import javax.servlet.ServletException;
Expand Down Expand Up @@ -106,6 +112,44 @@ public void testCBFailureThresholdWithException(HttpServletRequest request,
}
}

public void testCBAsync() throws Exception {
for (int i = 0; i < 3; i++) {
try {
bean.serviceC().get();
fail("Exception not thrown");
} catch (ExecutionException e) {
//assertThat("Execution exception cause", e.getCause(), instanceOf(ConnectException.class));
}
}

// Circuit should now be open

try {
bean.serviceC();
fail("Exception not thrown");
} catch (CircuitBreakerOpenException e) {
// Expected
}
}

public void testCBAsyncFallback() throws Exception {
for (int i = 0; i < 3; i++) {
assertThat(bean.serviceD().get(), is("serviceDFallback"));
}

assertThat(bean.getExecutionCounterD(), is(3));

// Circuit should now be open
Future<String> future = bean.serviceD();
String result = future.get();

// CB is open, expect to fall back
assertThat(result, is("serviceDFallback"));

// However, we don't expect the call to have reached the serviceD method
assertThat(bean.getExecutionCounterD(), is(3));
}

/**
* This test should only pass if MP_Fault_Tolerance_NonFallback_Enabled is set to false
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,27 @@ public void testTimeoutZero(HttpServletRequest request, HttpServletResponse resp
// No TimeoutException expected
}

public void testNonInterruptableTimeout() throws InterruptedException {
try {
bean.busyWait(1000); // Busy wait time is greater than timeout (=500)
fail("No exception thrown");
} catch (TimeoutException e) {
if (Thread.interrupted()) {
fail("Thread was in interrupted state upon return");
}
// This wait is to ensure our thread doesn't get interrupted later, after the method has finished
Thread.sleep(1000);
}
}

public void testNonInterruptableDoesntTimeout() throws Exception {
bean.busyWait(10); // Busy wait time is less than timeout (=500)

if (Thread.interrupted()) {
fail("Thread was in interrupted state upon return");
}

Thread.sleep(2000); // Wait to ensure that our thread isn't interrupted after the method has finished
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
package com.ibm.ws.microprofile.faulttolerance_fat.cdi.beans;

import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import javax.enterprise.context.RequestScoped;

import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.CircuitBreaker;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Timeout;

import com.ibm.ws.microprofile.faulttolerance_fat.util.ConnectException;
Expand All @@ -24,6 +28,7 @@ public class CircuitBreakerBean {

private int executionCounterA = 0;
private int executionCounterB = 0;
private int executionCounterD = 0;

@CircuitBreaker(delay = 1, delayUnit = ChronoUnit.SECONDS, requestVolumeThreshold = 3, failureRatio = 1.0)
@Timeout(value = 3, unit = ChronoUnit.SECONDS)
Expand Down Expand Up @@ -52,4 +57,26 @@ public String serviceB() throws ConnectException {
}
return "serviceB: " + executionCounterB;
}

@Asynchronous
@CircuitBreaker(requestVolumeThreshold = 3, failureRatio = 1.0)
public Future<String> serviceC() throws ConnectException {
throw new ConnectException("serviceC");
}

@Asynchronous
@CircuitBreaker(requestVolumeThreshold = 3, failureRatio = 1.0)
@Fallback(fallbackMethod = "serviceDFallback")
public Future<String> serviceD() throws ConnectException {
executionCounterD++;
throw new ConnectException("serviceD: " + executionCounterD);
}

public Future<String> serviceDFallback() {
return CompletableFuture.completedFuture("serviceDFallback");
}

public int getExecutionCounterD() {
return executionCounterD;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*******************************************************************************/
package com.ibm.ws.microprofile.faulttolerance_fat.cdi.beans;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -91,4 +92,18 @@ public int getConnectDCalls() {
public void connectE() throws InterruptedException {
Thread.sleep(2000);
}

/**
* Used for testing timeout with workloads which are not interruptable
*
* @param milliseconds number of milliseconds to busy wait for
*/
@Timeout(500)
public void busyWait(int milliseconds) {
long duration = Duration.ofMillis(milliseconds).toNanos();
long start = System.nanoTime();
while (System.nanoTime() - start < duration) {
// Do nothing
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.microprofile.faulttolerance.spi.CircuitBreakerPolicy;

import net.jodah.failsafe.CircuitBreaker;
Expand All @@ -22,11 +24,9 @@
*/
public class CircuitBreakerImpl extends CircuitBreaker {

private final boolean async;
private volatile boolean nested = false;
private static final TraceComponent tc = Tr.register(CircuitBreakerImpl.class);

public CircuitBreakerImpl(CircuitBreakerPolicy policy, boolean async) {
this.async = async;
public CircuitBreakerImpl(CircuitBreakerPolicy policy) {

Duration delay = policy.getDelay();
Class<? extends Throwable>[] failOn = policy.getFailOn();
Expand All @@ -48,14 +48,4 @@ public CircuitBreakerImpl(CircuitBreakerPolicy policy, boolean async) {
withSuccessThreshold(successThreshold);
}

@Override
public void recordSuccess() {
if (!async || nested) {
super.recordSuccess();
}
}

public void setNested() {
this.nested = true;
}
}
Loading

0 comments on commit 06f1160

Please sign in to comment.