Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

java.lang.IllegalStateException: Observable.toFuture() only supports sequences with a single value. #188

Closed
benjchristensen opened this issue Oct 1, 2013 · 4 comments

Comments

@benjchristensen
Copy link
Contributor

This was originally posted at: https://groups.google.com/forum/#!topic/hystrixoss/NsLRLwLEcFw Moving here as it is a bug.


From chris.mathews:

I wrote a small test for some code that propagates ThreadLocal context information to the executing Hystrix threads for my project. After convincing myself that things were working as designed I committed these changes. Over the next couple days, I noticed that the CI builds would fail occasionally with a strange (to me at least) exception: [java.lang.IllegalStateException: Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected.]

I have been unable to reproduce this issue anyplace other than the CI server... and even there it is intermittent. To try to narrow down the scope I created a very basic test to execute the simplest of all HystrixCommands. No issues locally but still intermittent issues on CI.

Full Stacktrace:
Running xxx.SimpleHystrixCommandTest
2013-09-21 10:04:52,589 [pool-1-thread-1] [] [] [] [] WARN - No URLs will be polled as dynamic configuration sources.
2013-09-21 10:04:52,778 [pool-1-thread-1] [] [] [] [] INFO - To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
2013-09-21 10:04:52,781 [pool-1-thread-1] [] [] [] [] INFO - DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@dd41677
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.283 sec <<< FAILURE! - in xxx.SimpleHystrixCommandTest
testSimpleHystrixCommand(xxx.SimpleHystrixCommandTest) Time elapsed: 3.281 sec <<< ERROR!
java.util.concurrent.ExecutionException: com.netflix.hystrix.exception.HystrixRuntimeException: SimpleHystrixCommand failed while executing.
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at xxx.SimpleHystrixCommandTest.testSimpleHystrixCommand(SimpleHystrixCommandTest.java:55)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: SimpleHystrixCommand failed while executing.
at com.netflix.hystrix.HystrixCommand.decomposeException(HystrixCommand.java:670)
at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:424)
at xxx.SimpleHystrixCommandTest$1.call(SimpleHystrixCommandTest.java:49)
at xxx.SimpleHystrixCommandTest$1.call(SimpleHystrixCommandTest.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.util.concurrent.ExecutionException: Observable onError
at rx.operators.OperationToFuture$2.getValue(OperationToFuture.java:129)
at rx.operators.OperationToFuture$2.get(OperationToFuture.java:115)
at com.netflix.hystrix.HystrixCommand$1.performBlockingGetWithTimeout(HystrixCommand.java:630)
at com.netflix.hystrix.HystrixCommand$1.get(HystrixCommand.java:511)
at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:422)
at xxx.SimpleHystrixCommandTest$1.call(SimpleHystrixCommandTest.java:49)
at xxx.SimpleHystrixCommandTest$1.call(SimpleHystrixCommandTest.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.IllegalStateException: Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected.
at rx.operators.OperationToFuture$1.onNext(OperationToFuture.java:78)
at rx.subjects.ReplaySubject.onNext(ReplaySubject.java:178)
at rx.operators.SafeObserver.onNext(SafeObserver.java:121)
at rx.operators.OperationFinally$Finally$FinallyObserver.onNext(OperationFinally.java:104)
at rx.operators.OperationOnErrorResumeNextViaFunction$OnErrorResumeNextViaFunction$1.onNext(OperationOnErrorResumeNextViaFunction.java:79)
at com.netflix.hystrix.HystrixCommand$TimeoutObservable$1$1.tick(HystrixCommand.java:1014)
at com.netflix.hystrix.HystrixCommand$1.performBlockingGetWithTimeout(HystrixCommand.java:616)
at com.netflix.hystrix.HystrixCommand$1.get(HystrixCommand.java:511)
at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:422)
at xxx.SimpleHystrixCommandTest$1.call(SimpleHystrixCommandTest.java:49)
at xxx.SimpleHystrixCommandTest$1.call(SimpleHystrixCommandTest.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)

Test Case:
public class SimpleHystrixCommandTest {
private static final Logger logger = LoggerFactory.getLogger(SimpleHystrixCommandTest.class);

@Test
public void testSimpleHystrixCommand() throws Exception {
    final AtomicInteger successCounter = new AtomicInteger(0);
    final AtomicInteger fallbackCounter = new AtomicInteger(0);
    final int executorThreadPoolSize = 10;
    final int executionCount = 100000;
    ExecutorService executor = null;
    try {
        executor = Executors.newFixedThreadPool(executorThreadPoolSize);

        final List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
        for (int i = 0; i < executionCount; i++) {
            futures.add(executor.submit(new Callable<Boolean>() {

                @Override
                public Boolean call() throws Exception {
                    return new SimpleHystrixCommand(Boolean.TRUE).execute();
                }

            }));
        }
        for (final Future<Boolean> future : futures) {
            final Boolean response = future.get();
            assertNotNull(response);
            if (response) {
                successCounter.incrementAndGet();
            } else {
                fallbackCounter.incrementAndGet();
            }
        }
    } finally {
        if (executor != null) {
            executor.shutdown();
        }
    }
    logger.info("Success Count [{}]", successCounter.get());
    logger.info("Fallback Count [{}]", fallbackCounter.get());
}

private static class SimpleHystrixCommand extends HystrixCommand<Boolean> {
    private final Boolean response;

    public SimpleHystrixCommand(final Boolean response) {
        super(HystrixCommandGroupKey.Factory.asKey("SimpleHystrixCommand"));
        this.response = response;
    }

    @Override
    protected Boolean run() throws Exception {
        return response;
    }

    @Override
    protected Boolean getFallback() {
        return Boolean.FALSE;
    }
}

}

Library versions:

  • hystrix 1.3.5
  • rxjava 0.13.4

Recap:

  • Intermittent problem
  • Not able to reproduce locally (Windows 7 / Oracle JDK 1.6.0_43)
  • Only occurs on CI server (RHEL 5.8 / Oracle JDK 1.6.0_43)

I am slowly beginning to digest the Hystrix codebase but I am far away from grokking it and haven't even opened the source for RxJava. Can anyone explain what might cause this exception, if I am doing something stupid and if it is expected behavior?

@benjchristensen
Copy link
Contributor Author

My original response:


This is an interesting one, apparently from a race condition that I'm not handling if I read the code and stack traces correctly.

This could happen if run() executes, then the timeout triggers and run() keeps executing (ignores or races the interrupt) and getFallback() also emits a response. In this case 2 responses would have been returned which would trigger this error.

Thus I need to dig in on where I have not correctly synchronized the response propagation when a timeout occurs. It should be a simple compareAndSet I am missing between run() and getFallback() responses.

My schedule over the next several days will not easily allow me to work on this, but I will as soon as I can as this is definitely a bug that can affect production.

Thank you Chris for the great report.

@benjchristensen
Copy link
Contributor Author

I have not been able to replicate this issue but see where the race condition could happen.

I'd appreciate another set of eyes reviewing my change (#189) if possible.

The fix uses compareAndSet to control code flows for the 3 states of NOT_EXECUTED (the run method has not started yet), TIMED_OUT (the timeout happened and won the race to do fallback logic) and COMPLETED (the run method returned and won the race to emit the response, success or failure).

The same logic is used to protect against a race-condition for onError if a timeout occurs.

@chrismathews
Copy link

I was able to recreate the issue on 1.3.6 (I originally hit it on 1.3.5). I pulled down the branch with your changes and was not able to recreate the issue after numerous attempts. So I would say this one is fixed. Thanks Ben!

@benjchristensen
Copy link
Contributor Author

Great, thank you for testing that. I am releasing a new version now.

neerajrj pushed a commit to neerajrj/Hystrix that referenced this issue Nov 2, 2013
This uses compareAndSet to synchronize timeout vs run() completion logic when there is a race.

I have not been able to to replicate the issue reported at Netflix#188 but I can see how the existing code is vulnerable to a race condition. This theoretically should fix it. All unit tests pass for existing functionality. All my attempts to create a unit test to reveal the issue failed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants