Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Fault Tolerance 1.1.3 #253

Merged
merged 16 commits into from
Dec 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ public class BulkheadAntn extends MethodAntn implements Bulkhead {
/**
* Constructor.
*
* @param beanClass Bean class.
* @param method The method.
*/
public BulkheadAntn(Method method) {
super(method);
public BulkheadAntn(Class<?> beanClass, Method method) {
super(beanClass, method);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public class CircuitBreakerAntn extends MethodAntn implements CircuitBreaker {
/**
* Constructor.
*
* @param beanClass The bean class.
* @param method The method.
*/
public CircuitBreakerAntn(Method method) {
super(method);
public CircuitBreakerAntn(Class<?> beanClass, Method method) {
super(beanClass, method);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.helidon.microprofile.faulttolerance;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import javax.enterprise.inject.spi.CDI;
Expand All @@ -25,13 +26,17 @@
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.FallbackHandler;

import static io.helidon.microprofile.faulttolerance.ExceptionUtil.toException;

/**
* Class CommandFallback.
*/
class CommandFallback {

private final InvocationContext context;

private final Throwable throwable;

private Class<? extends FallbackHandler<?>> handlerClass;

private Method fallbackMethod;
Expand All @@ -41,9 +46,11 @@ class CommandFallback {
*
* @param context Invocation context.
* @param introspector Method introspector.
* @param throwable Throwable that resulted in this fallback being called.
*/
CommandFallback(InvocationContext context, MethodIntrospector introspector) {
CommandFallback(InvocationContext context, MethodIntrospector introspector, Throwable throwable) {
this.context = context;
this.throwable = throwable;

// Establish fallback strategy
final Fallback fallback = introspector.getFallback();
Expand Down Expand Up @@ -71,33 +78,56 @@ class CommandFallback {
public Object execute() throws Exception {
assert handlerClass != null || fallbackMethod != null;

updateMetrics();

if (handlerClass != null) {
// Instantiate handler using CDI
FallbackHandler<?> handler = CDI.current().select(handlerClass).get();
return handler.handle(
new ExecutionContext() {
@Override
public Method getMethod() {
return context.getMethod();
}

@Override
public Object[] getParameters() {
return context.getParameters();
}
}
);
} else {
return fallbackMethod.invoke(context.getTarget(), context.getParameters());
Object result;
try {
if (handlerClass != null) {
// Instantiate handler using CDI
FallbackHandler<?> handler = CDI.current().select(handlerClass).get();
result = handler.handle(
new ExecutionContext() {
@Override
public Method getMethod() {
return context.getMethod();
}

@Override
public Object[] getParameters() {
return context.getParameters();
}

@Override
public Throwable getFailure() {
return throwable;
}
}
);
} else {
result = fallbackMethod.invoke(context.getTarget(), context.getParameters());
}
} catch (Throwable t) {
updateMetrics(t);

// If InvocationTargetException, then unwrap underlying cause
if (t instanceof InvocationTargetException) {
t = t.getCause();
}
throw toException(t);
}

updateMetrics(null);
tjquinno marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

/**
* Updates fallback metrics.
* Updates fallback metrics and adjust failed invocations based on outcome of fallback.
*/
private void updateMetrics() {
FaultToleranceMetrics.getCounter(context.getMethod(), FaultToleranceMetrics.FALLBACK_CALLS_TOTAL).inc();
private void updateMetrics(Throwable throwable) {
final Method method = context.getMethod();
FaultToleranceMetrics.getCounter(method, FaultToleranceMetrics.FALLBACK_CALLS_TOTAL).inc();

// If fallback was successful, it is not a failed invocation
if (throwable == null) {
FaultToleranceMetrics.getCounter(method, FaultToleranceMetrics.INVOCATIONS_FAILED_TOTAL).dec();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public Object interceptCommand(InvocationContext context) throws Throwable {
+ "::" + context.getMethod().getName() + "'");

// Create method introspector and executer retrier
final MethodIntrospector introspector = new MethodIntrospector(context.getMethod());
final MethodIntrospector introspector = new MethodIntrospector(
context.getTarget().getClass(), context.getMethod());
final CommandRetrier retrier = new CommandRetrier(context, introspector);
return retrier.execute();
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.hystrix.exception.HystrixRuntimeException;
import net.jodah.failsafe.AsyncFailsafe;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.FailsafeException;
import net.jodah.failsafe.FailsafeFuture;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.SyncFailsafe;
Expand All @@ -37,6 +38,23 @@
import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException;
import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;

import static io.helidon.microprofile.faulttolerance.ExceptionUtil.toException;
import static io.helidon.microprofile.faulttolerance.FaultToleranceExtension.isFaultToleranceMetricsEnabled;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.BULKHEAD_CALLS_ACCEPTED_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.BULKHEAD_CALLS_REJECTED_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.BULKHEAD_EXECUTION_DURATION;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.INVOCATIONS_FAILED_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.INVOCATIONS_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.RETRY_CALLS_FAILED_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.RETRY_CALLS_SUCCEEDED_NOT_RETRIED_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.RETRY_CALLS_SUCCEEDED_RETRIED_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.RETRY_RETRIES_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.TIMEOUT_CALLS_NOT_TIMED_OUT_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.TIMEOUT_CALLS_TIMED_OUT_TOTAL;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.TIMEOUT_EXECUTION_DURATION;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.getCounter;
import static io.helidon.microprofile.faulttolerance.FaultToleranceMetrics.getHistogram;

/**
* Class CommandRetrier.
*/
Expand All @@ -53,7 +71,7 @@ public class CommandRetrier {

private final Method method;

private boolean firstInvocation = true;
private int invocationCount = 0;

private FaultToleranceCommand command;

Expand Down Expand Up @@ -114,28 +132,33 @@ public CommandRetrier(InvocationContext context, MethodIntrospector introspector
* Retries running a command according to retry policy.
*
* @return Object returned by command.
* @throws Exception If something goes wrong.
*/
@SuppressWarnings("unchecked")
public Object execute() {
public Object execute() throws Exception {
LOGGER.fine("Executing command with isAsynchronous = " + isAsynchronous);

CheckedFunction fallbackFunction = t -> {
final CommandFallback fallback = new CommandFallback(context, introspector);
CheckedFunction<? extends Throwable, ?> fallbackFunction = t -> {
final CommandFallback fallback = new CommandFallback(context, introspector, t);
return fallback.execute();
};

if (isAsynchronous) {
Scheduler scheduler = CommandScheduler.create();
AsyncFailsafe<Object> failsafe = Failsafe.with(retryPolicy).with(scheduler);
FailsafeFuture<?> chainedFuture = (FailsafeFuture<?>) (introspector.hasFallback()
? failsafe.withFallback(fallbackFunction).get(this::retryExecute)
: failsafe.get(this::retryExecute));
return new FailsafeChainedFuture<>(chainedFuture);
} else {
SyncFailsafe<Object> failsafe = Failsafe.with(retryPolicy);
return introspector.hasFallback()
? failsafe.withFallback(fallbackFunction).get(this::retryExecute)
: failsafe.get(this::retryExecute);
try {
if (isAsynchronous) {
Scheduler scheduler = CommandScheduler.create();
AsyncFailsafe<Object> failsafe = Failsafe.with(retryPolicy).with(scheduler);
FailsafeFuture<?> chainedFuture = (introspector.hasFallback()
? failsafe.withFallback(fallbackFunction).get(this::retryExecute)
: failsafe.get(this::retryExecute));
return new FailsafeChainedFuture<>(chainedFuture);
} else {
SyncFailsafe<Object> failsafe = Failsafe.with(retryPolicy);
return introspector.hasFallback()
? failsafe.withFallback(fallbackFunction).get(this::retryExecute)
: failsafe.get(this::retryExecute);
}
} catch (FailsafeException e) {
throw toException(e.getCause());
}
}

Expand All @@ -146,32 +169,33 @@ public Object execute() {
*
* @return Object returned by command.
*/
private Object retryExecute() {
private Object retryExecute() throws Exception {
final String commandKey = createCommandKey();
command = new FaultToleranceCommand(commandKey, introspector, context);

Object result;
try {
LOGGER.info("About to execute command with key " + command.getCommandKey());
invocationCount++;
updateMetricsBefore();
result = command.execute();
updateMetricsAfter(null);
} catch (HystrixRuntimeException e) {
} catch (ExceptionUtil.WrappedException e) {
Throwable cause = e.getCause();
if (cause instanceof HystrixRuntimeException) {
cause = cause.getCause();
}
updateMetricsAfter(cause);
if (cause instanceof TimeoutException) {
throw new org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException(cause);
}
if (cause instanceof RejectedExecutionException) {
throw new BulkheadException(cause);
}
if (!(cause instanceof RuntimeException)) {
cause = new RuntimeException(cause);
}
if (command.isCircuitBreakerOpen()) {
throw new CircuitBreakerOpenException(cause);
}
throw (RuntimeException) cause;
throw toException(cause);
}
return result;
}
Expand All @@ -180,8 +204,10 @@ private Object retryExecute() {
* Update metrics before method is called.
*/
private void updateMetricsBefore() {
if (introspector.hasRetry() && !firstInvocation) {
FaultToleranceMetrics.getCounter(method, FaultToleranceMetrics.RETRY_RETRIES_TOTAL).inc();
if (isFaultToleranceMetricsEnabled()) {
if (introspector.hasRetry() && invocationCount > 1) {
getCounter(method, RETRY_RETRIES_TOTAL).inc();
}
}
}

Expand All @@ -191,41 +217,51 @@ private void updateMetricsBefore() {
* @param cause Exception cause or {@code null} if execution successful.
*/
private void updateMetricsAfter(Throwable cause) {
// Global method counters
FaultToleranceMetrics.getCounter(method, cause == null ? FaultToleranceMetrics.INVOCATIONS_TOTAL
: FaultToleranceMetrics.INVOCATIONS_FAILED_TOTAL).inc();
if (!isFaultToleranceMetricsEnabled()) {
return;
}

// Retry counters
// Special logic for methods with retries
if (introspector.hasRetry()) {
final Retry retry = introspector.getRetry();
boolean firstInvocation = (invocationCount == 1);

if (cause == null) {
FaultToleranceMetrics.getCounter(method, firstInvocation
? FaultToleranceMetrics.RETRY_CALLS_SUCCEEDED_NOT_RETRIED_TOTAL
: FaultToleranceMetrics.RETRY_CALLS_SUCCEEDED_RETRIED_TOTAL).inc();
} else if (!firstInvocation) {
FaultToleranceMetrics.getCounter(method, FaultToleranceMetrics.RETRY_CALLS_FAILED_TOTAL).inc();
getCounter(method, INVOCATIONS_TOTAL).inc();
getCounter(method, firstInvocation
? RETRY_CALLS_SUCCEEDED_NOT_RETRIED_TOTAL
: RETRY_CALLS_SUCCEEDED_RETRIED_TOTAL).inc();
} else if (retry.maxRetries() == invocationCount - 1) {
tjquinno marked this conversation as resolved.
Show resolved Hide resolved
getCounter(method, RETRY_CALLS_FAILED_TOTAL).inc();
getCounter(method, INVOCATIONS_FAILED_TOTAL).inc();
getCounter(method, INVOCATIONS_TOTAL).inc();
}
} else {
// Global method counters
getCounter(method, INVOCATIONS_TOTAL).inc();
if (cause != null) {
getCounter(method, INVOCATIONS_FAILED_TOTAL).inc();
}
}

// Timeout
if (introspector.hasTimeout()) {
FaultToleranceMetrics.getHistogram(method, FaultToleranceMetrics.TIMEOUT_EXECUTION_DURATION)
getHistogram(method, TIMEOUT_EXECUTION_DURATION)
.update(command.getExecutionTime());
FaultToleranceMetrics.getCounter(method, cause instanceof TimeoutException
? FaultToleranceMetrics.TIMEOUT_CALLS_TIMED_OUT_TOTAL
: FaultToleranceMetrics.TIMEOUT_CALLS_NOT_TIMED_OUT_TOTAL).inc();
getCounter(method, cause instanceof TimeoutException
? TIMEOUT_CALLS_TIMED_OUT_TOTAL
: TIMEOUT_CALLS_NOT_TIMED_OUT_TOTAL).inc();
}

// Bulkhead
if (introspector.hasBulkhead()) {
FaultToleranceMetrics.getHistogram(method, FaultToleranceMetrics.BULKHEAD_EXECUTION_DURATION)
.update(command.getExecutionTime());
FaultToleranceMetrics.getCounter(method, cause instanceof RejectedExecutionException
? FaultToleranceMetrics.BULKHEAD_CALLS_REJECTED_TOTAL
: FaultToleranceMetrics.BULKHEAD_CALLS_ACCEPTED_TOTAL).inc();
boolean bulkheadRejection = (cause instanceof RejectedExecutionException);
if (!bulkheadRejection) {
getHistogram(method, BULKHEAD_EXECUTION_DURATION).update(command.getExecutionTime());
}
getCounter(method, bulkheadRejection ? BULKHEAD_CALLS_REJECTED_TOTAL
: BULKHEAD_CALLS_ACCEPTED_TOTAL).inc();
}

// Update firstInvocation flag
firstInvocation = false;
}

/**
Expand Down
Loading