Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request log with timeouts #61

Merged
merged 3 commits into from
Dec 21, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 80 additions & 64 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,27 +416,20 @@ public final R execute() {
return getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited");
}

try {

if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
R r = queueInThread().get();
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// we want to run in a separate thread with timeout protection
return queueInThread().get();
} else {
try {
R r = executeWithSemaphore();
return r;
} else {
try {
R r = executeWithSemaphore();
return r;
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}
} catch (RuntimeException e) {
// count that we're throwing an exception and rethrow
metrics.markExceptionThrown();
throw e;
}

} finally {
// the total execution time for the user thread including queuing, thread scheduling, run() execution
metrics.addUserThreadExecutionTime(System.currentTimeMillis() - invocationStartTime.get());
}

} catch (Throwable e) {
if (e instanceof HystrixBadRequestException) {
throw (HystrixBadRequestException) e;
Expand All @@ -457,20 +450,7 @@ public final R execute() {
throw new HystrixRuntimeException(FailureType.COMMAND_EXCEPTION, this.getClass(), message, e, null);
}
} finally {
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (concurrencyStrategy instanceof HystrixConcurrencyStrategyDefault) {
// if we're using the default we support only optionally using a request context
if (HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
} else {
// if it's a custom strategy it must ensure the context is initialized
if (HystrixRequestLog.getCurrentRequest(concurrencyStrategy) != null) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
}
}
recordExecutedCommand();
}
}

Expand All @@ -495,6 +475,9 @@ private R executeWithSemaphore() {
return response;
} finally {
executionSemaphore.release();

/* execution time on execution via semaphore */
recordTotalExecutionTime(invocationStartTime.get());
}
} else {
// mark on counter
Expand Down Expand Up @@ -557,20 +540,7 @@ public final Future<R> queue() {
throw e;
}
} finally {
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (concurrencyStrategy instanceof HystrixConcurrencyStrategyDefault) {
// if we're using the default we support only optionally using a request context
if (HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
} else {
// if it's a custom strategy it must ensure the context is initialized
if (HystrixRequestLog.getCurrentRequest(concurrencyStrategy) != null) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
}
}
recordExecutedCommand();
}
}

Expand Down Expand Up @@ -655,6 +625,9 @@ public ExecutionResult getExecutionResult() {
executionCompleted.countDown();
// release the semaphore
executionSemaphore.release();

/* execution time on queue via semaphore */
recordTotalExecutionTime(invocationStartTime.get());
}
} else {
metrics.markSemaphoreRejection();
Expand All @@ -665,9 +638,6 @@ public ExecutionResult getExecutionResult() {
}

private Future<R> queueInThread() {
// we want to run as a thread
long startTime = System.currentTimeMillis();

// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
isExecutedInThread.set(true);

Expand All @@ -677,7 +647,7 @@ private Future<R> queueInThread() {
// a snapshot of time so the thread can measure how long it waited before executing
final long timeBeforeExecution = System.currentTimeMillis();
// wrap the synchronous execute() method in a Callable and execute in the threadpool
QueuedExecutionFuture future = new QueuedExecutionFuture(this, startTime, threadPool.getExecutor(), new HystrixContextCallable<R>(new Callable<R>() {
QueuedExecutionFuture future = new QueuedExecutionFuture(this, threadPool.getExecutor(), new HystrixContextCallable<R>(new Callable<R>() {

@Override
public R call() throws Exception {
Expand Down Expand Up @@ -806,14 +776,6 @@ private R executeCommand() {
executionResult = executionResult.setException(e);
return getFallbackOrThrowException(HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", e);
} finally {
/*
* we record the executionTime for command execution
* if the command is never executed (rejected, short-circuited, etc) then it will be left unset
* for this metric we include failures and successes as we use it for per-request profiling and debugging
* whereas 'metrics.addCommandExecutionTime(duration)' is used by stats across many requests
*/
executionResult = executionResult.setExecutionTime((int) (System.currentTimeMillis() - startTime));

// record that we're completed
isExecutionComplete.set(true);
}
Expand Down Expand Up @@ -850,6 +812,47 @@ private R getFallbackWithProtection() {
}
}

/**
* Record the duration of execution as response or exception is being returned to the caller.
*/
private void recordTotalExecutionTime(long startTime) {
long duration = System.currentTimeMillis() - startTime;
// the total execution time for the user thread including queuing, thread scheduling, run() execution
metrics.addUserThreadExecutionTime(duration);

/*
* We record the executionTime for command execution.
*
* If the command is never executed (rejected, short-circuited, etc) then it will be left unset.
*
* For this metric we include failures and successes as we use it for per-request profiling and debugging
* whereas 'metrics.addCommandExecutionTime(duration)' is used by stats across many requests.
*/
executionResult = executionResult.setExecutionTime((int) duration);
}

/**
* Record that this command was executed in the HystrixRequestLog.
* <p>
* This can be treated as an async operation as it just adds a references to "this" in the log even if the current command is still executing.
*/
private void recordExecutedCommand() {
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (concurrencyStrategy instanceof HystrixConcurrencyStrategyDefault) {
// if we're using the default we support only optionally using a request context
if (HystrixRequestContext.isCurrentThreadInitialized()) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
} else {
// if it's a custom strategy it must ensure the context is initialized
if (HystrixRequestLog.getCurrentRequest(concurrencyStrategy) != null) {
HystrixRequestLog.getCurrentRequest(concurrencyStrategy).addExecutedCommand(this);
}
}
}
}

/**
* Whether the 'circuit-breaker' is open meaning that <code>execute()</code> will immediately return
* the <code>getFallback()</code> response and not attempt a HystrixCommand execution.
Expand Down Expand Up @@ -1240,7 +1243,6 @@ private class QueuedExecutionFuture implements CommandFuture<R> {
private final ThreadPoolExecutor executor;
private final Callable<R> callable;
private final HystrixCommand<R> command;
private final long startTime;
private final CountDownLatch actualResponseReceived = new CountDownLatch(1);
private final AtomicBoolean actualFutureExecuted = new AtomicBoolean(false);
private volatile R result; // the result of the get()
Expand All @@ -1249,9 +1251,8 @@ private class QueuedExecutionFuture implements CommandFuture<R> {
private final CountDownLatch futureStarted = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);

public QueuedExecutionFuture(HystrixCommand<R> command, long startTime, ThreadPoolExecutor executor, Callable<R> callable) {
public QueuedExecutionFuture(HystrixCommand<R> command, ThreadPoolExecutor executor, Callable<R> callable) {
this.command = command;
this.startTime = startTime;
this.executor = executor;
this.callable = callable;
}
Expand Down Expand Up @@ -1374,7 +1375,7 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
// mark this command as timed-out so the run() when it completes can ignore it
if (isCommandTimedOut.compareAndSet(false, true)) {
// report timeout failure (or skip this if the compareAndSet failed as that means a thread-race occurred with the execution as the object lived in the queue too long)
metrics.markTimeout(System.currentTimeMillis() - startTime);
metrics.markTimeout(System.currentTimeMillis() - invocationStartTime.get());
}

try {
Expand All @@ -1388,6 +1389,9 @@ private void performActualGet() throws CancellationException, InterruptedExcepti
} finally {
// mark that we are done and other threads can proceed
actualResponseReceived.countDown();

/* execution time on threaded execution */
recordTotalExecutionTime(invocationStartTime.get());
}
}

Expand Down Expand Up @@ -2356,7 +2360,7 @@ public void testExecutionTimeoutWithNoFallback() {
command.execute();
fail("we shouldn't get here");
} catch (Exception e) {
e.printStackTrace();
// e.printStackTrace();
if (e instanceof HystrixRuntimeException) {
HystrixRuntimeException de = (HystrixRuntimeException) e;
assertNotNull(de.getFallbackException());
Expand All @@ -2368,6 +2372,9 @@ public void testExecutionTimeoutWithNoFallback() {
fail("the exception should be HystrixRuntimeException");
}
}
// the time should be 50+ since we timeout at 50ms
assertTrue("Execution Time is: " + command.getExecutionTimeInMilliseconds(), command.getExecutionTimeInMilliseconds() >= 50);

assertTrue(command.isResponseTimedOut());
assertFalse(command.isResponseFromFallback());
assertFalse(command.isResponseRejected());
Expand Down Expand Up @@ -2397,6 +2404,8 @@ public void testExecutionTimeoutWithFallback() {
TestHystrixCommand<Boolean> command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_SUCCESS);
try {
assertEquals(false, command.execute());
// the time should be 50+ since we timeout at 50ms
assertTrue("Execution Time is: " + command.getExecutionTimeInMilliseconds(), command.getExecutionTimeInMilliseconds() >= 50);
assertTrue(command.isResponseTimedOut());
assertTrue(command.isResponseFromFallback());
} catch (Exception e) {
Expand Down Expand Up @@ -2442,7 +2451,8 @@ public void testExecutionTimeoutFallbackFailure() {
fail("the exception should be HystrixRuntimeException");
}
}

// the time should be 50+ since we timeout at 50ms
assertTrue("Execution Time is: " + command.getExecutionTimeInMilliseconds(), command.getExecutionTimeInMilliseconds() >= 50);
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
Expand Down Expand Up @@ -4404,6 +4414,12 @@ protected Boolean run() {
Thread.sleep(timeout * 10);
} catch (InterruptedException e) {
e.printStackTrace();
// ignore and sleep some more to simulate a dependency that doesn't obey interrupts
try {
Thread.sleep(timeout * 2);
} catch (Exception e2) {
// ignore
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public static Collection<HystrixCommandMetrics> getInstances() {
this.group = commandGroup;
this.properties = properties;
this.counter = new HystrixRollingNumber(properties.metricsRollingStatisticalWindowInMilliseconds(), properties.metricsRollingStatisticalWindowBuckets());
this.percentileExecution = new HystrixRollingPercentile(properties.metricsRollingPercentileWindow(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.percentileTotal = new HystrixRollingPercentile(properties.metricsRollingPercentileWindow(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.percentileExecution = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.percentileTotal = new HystrixRollingPercentile(properties.metricsRollingPercentileWindowInMilliseconds(), properties.metricsRollingPercentileWindowBuckets(), properties.metricsRollingPercentileBucketSize());
this.eventNotifier = eventNotifier;
}

Expand Down