Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hystrix 1.4 - Async/Non-Blocking #218

Merged
merged 39 commits into from
Mar 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1eb528e
Hystrix non blocking command, a copy paste of hystrix command with
neerajrj Nov 13, 2013
d31dc2d
fixed test cases
neerajrj Nov 15, 2013
a0fc403
add comments and copyrights
neerajrj Nov 15, 2013
a3c6d8a
minor formatting
neerajrj Jan 14, 2014
f3c5bf0
move setter down to subclasses for backwards compatibility
neerajrj Jan 17, 2014
146898b
Organize Imports
benjchristensen Feb 5, 2014
9f8c742
Upgrade to RxJava 0.17 - work in progress, tests failing
benjchristensen Feb 7, 2014
545ee76
Upgrade HystrixCommand to RxJava 0.17
benjchristensen Feb 12, 2014
56527c4
Upgrade HystrixNonBlockingCommand to RxJava 0.17
benjchristensen Feb 12, 2014
7f031e5
Moved Unit Tests
benjchristensen Feb 12, 2014
b2fdc25
Opened Access for Unit Tests
benjchristensen Feb 12, 2014
09f2c23
Upgrade Deprecated RxJava Usage
benjchristensen Feb 15, 2014
98ee491
Fixed Unit Test: Subscription Behavior Changed
benjchristensen Feb 17, 2014
e4759d4
Fixed Unit Test: Class Names
benjchristensen Feb 17, 2014
1e536ef
Make UnitTests use NonBlocking Observables
benjchristensen Feb 17, 2014
a296d80
rx.util.functions -> rx.functions
benjchristensen Feb 19, 2014
3ec6e4b
Format Code
benjchristensen Feb 21, 2014
b4c3363
RxJava 0.17.0-RC3
benjchristensen Feb 21, 2014
706a0f2
HystrixObservableCommand
benjchristensen Feb 21, 2014
5aeab65
RxJava 0.17.0-RC4
benjchristensen Feb 21, 2014
a8ea50f
Remove debug println
benjchristensen Feb 21, 2014
bfaba6f
And another debug println ...
benjchristensen Feb 21, 2014
9a5adde
TimeoutObservable -> HystrixObservableTimeoutOperator
benjchristensen Feb 23, 2014
15898cf
HystrixObservableCommand Fixes
benjchristensen Feb 25, 2014
8bca0cb
Fix Thread and Semaphore Start/Stop Logic
benjchristensen Feb 25, 2014
1ada23e
RxJava 0.17.0-RC5 -> error handling fixes
benjchristensen Feb 25, 2014
42eb8bd
Timeout Thread and RequestContext
benjchristensen Feb 25, 2014
ad06ecd
Upgrade core_test.clj to RxJava 0.17
benjchristensen Feb 25, 2014
c59ed29
RxJava 0.17.0-RC6
benjchristensen Feb 25, 2014
ac9768d
Fix isExecutedInThread
benjchristensen Feb 25, 2014
94c708f
RequestContext and Thread Life-cycle Fixes and Tests
benjchristensen Feb 26, 2014
e0826d3
HystrixCommand now uses HystrixObservableCommand
benjchristensen Feb 27, 2014
d2bf17f
HystrixCommand composes HystrixObservableCommand
benjchristensen Feb 27, 2014
460aa8e
Organize Imports
benjchristensen Feb 27, 2014
beb1fd6
Remove stray debug println
benjchristensen Feb 28, 2014
999e7d3
Revert HystrixExecutable change
benjchristensen Feb 28, 2014
fc99fce
BugFix: Plugin WrapCallable of RequestContext in Timeout
benjchristensen Mar 5, 2014
b97165e
Merge branch 'hystrix-1.4-non-blocking' into hystrix-1-4-non-blocking…
benjchristensen Mar 10, 2014
005a344
HystrixCollapser onComplete fix
benjchristensen Mar 11, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@
(wait-for-observable o)))))
(testing "observes command with a Scheduler"
(let [o (observe-later-on (normalize base-def)
(rx.concurrency.Schedulers/newThread)
(rx.schedulers.Schedulers/newThread)
75 19 23)]
(is (instance? rx.Observable o))
(is (= (+ 75 19 23)
Expand Down Expand Up @@ -317,7 +317,7 @@
(is (= 103 (wait-for-observable (observe #'my-fn-command 90 13))))
(is (= 105 (wait-for-observable (observe-later #'my-fn-command 91 14))))
(is (= 107 (wait-for-observable (observe-later-on #'my-fn-command
(rx.concurrency.Schedulers/newThread)
(rx.schedulers.Schedulers/newThread)
92 15)))))))

(defcollapser my-collapser
Expand Down
4 changes: 2 additions & 2 deletions hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ apply plugin: 'idea'

dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'com.netflix.rxjava:rxjava-core:0.16.1'
compile 'com.netflix.rxjava:rxjava-core:0.17.0'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit-dep:4.10'
testCompile 'junit:junit-dep:4.10'
}

javadoc {
Expand Down
190 changes: 22 additions & 168 deletions hystrix-core/src/main/java/com/netflix/hystrix/Hystrix.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package com.netflix.hystrix;

import static org.junit.Assert.*;

import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.hystrix.HystrixCommand.Setter;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import rx.functions.Action0;

/**
* Lifecycle management of Hystrix.
Expand Down Expand Up @@ -85,18 +81,35 @@ public static HystrixCommandKey getCurrentThreadExecutingCommand() {
return currentCommand.get().peek();
}

/* package */static void startCurrentThreadExecutingCommand(HystrixCommandKey key) {
/**
*
* @return Action0 to perform the same work as `endCurrentThreadExecutingCommand()` but can be done from any thread
*/
/* package */static Action0 startCurrentThreadExecutingCommand(HystrixCommandKey key) {
final LinkedList<HystrixCommandKey> list = currentCommand.get();
try {
currentCommand.get().push(key);
list.push(key);
} catch (Exception e) {
logger.warn("Unable to record command starting", e);
}
return new Action0() {

@Override
public void call() {
endCurrentThreadExecutingCommand(list);
}

};
}

/* package */static void endCurrentThreadExecutingCommand() {
endCurrentThreadExecutingCommand(currentCommand.get());
}

private static void endCurrentThreadExecutingCommand(LinkedList<HystrixCommandKey> list) {
try {
if (!currentCommand.get().isEmpty()) {
currentCommand.get().pop();
if (!list.isEmpty()) {
list.pop();
}
} catch (NoSuchElementException e) {
// this shouldn't be possible since we check for empty above and this is thread-isolated
Expand All @@ -106,163 +119,4 @@ public static HystrixCommandKey getCurrentThreadExecutingCommand() {
}
}

public static class UnitTest {
@Test
public void testNotInThread() {
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideHystrixThread() {

assertNull(getCurrentThreadExecutingCommand());

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CommandName"))) {

@Override
protected Boolean run() {
assertEquals("CommandName", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

assertTrue(command.execute());
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideNestedHystrixThread() {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("OuterCommand"))) {

@Override
protected Boolean run() {

assertEquals("OuterCommand", getCurrentThreadExecutingCommand().name());

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("BEFORE expected it to run inside a thread");
}

HystrixCommand<Boolean> command2 = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("InnerCommand"))) {

@Override
protected Boolean run() {
assertEquals("InnerCommand", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("AFTER expected it to run inside a thread");
}

return command2.execute();
}

};

assertTrue(command.execute());

assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideHystrixSemaphoreExecute() {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreIsolatedCommandName"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))) {

@Override
protected Boolean run() {
assertEquals("SemaphoreIsolatedCommandName", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

// it should be true for semaphore isolation as well
assertTrue(command.execute());
// and then be null again once done
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testInsideHystrixSemaphoreQueue() throws Exception {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreIsolatedCommandName"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))) {

@Override
protected Boolean run() {
assertEquals("SemaphoreIsolatedCommandName", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

// it should be true for semaphore isolation as well
assertTrue(command.queue().get());
// and then be null again once done
assertNull(getCurrentThreadExecutingCommand());
}

@Test
public void testThreadNestedInsideHystrixSemaphore() {

HystrixCommand<Boolean> command = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("OuterSemaphoreCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))) {

@Override
protected Boolean run() {

assertEquals("OuterSemaphoreCommand", getCurrentThreadExecutingCommand().name());

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("BEFORE expected it to run inside a semaphore");
}

HystrixCommand<Boolean> command2 = new HystrixCommand<Boolean>(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestUtil"))
.andCommandKey(HystrixCommandKey.Factory.asKey("InnerCommand"))) {

@Override
protected Boolean run() {
assertEquals("InnerCommand", getCurrentThreadExecutingCommand().name());

return getCurrentThreadExecutingCommand() != null;
}

};

if (getCurrentThreadExecutingCommand() == null) {
throw new RuntimeException("AFTER expected it to run inside a semaphore");
}

return command2.execute();
}

};

assertTrue(command.execute());

assertNull(getCurrentThreadExecutingCommand());
}
}
}
Loading