
Conversation

@srdo (Contributor) commented Dec 9, 2016

See https://issues.apache.org/jira/browse/STORM-2239.

This basically just ensures that if the Kafka consumer throws InterruptException, we catch it and interrupt the current thread before returning control to Storm.

I realize that this won't be an issue until the next Kafka release, but it doesn't hurt to get it fixed now.

@hmcl (Contributor) commented Dec 12, 2016

@srdo what is Thread.currentThread().interrupt(); trying to accomplish?

@srdo (Contributor, Author) commented Dec 12, 2016

When Storm is trying to shut down, it interrupts the executor threads. This should cause the async loop (https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L2177) to stop running, since an InterruptedException will be thrown either from within the spout or from the sleep statement in the loop. The loop catches and handles InterruptedException so we don't crash the worker while shutting down.

The Kafka consumer doesn't throw InterruptedException when interrupted, but instead a Kafka-specific unchecked InterruptException. If we don't catch it and convert it to something Storm understands, the spout will crash the worker during shutdown. This is very inconvenient for local mode clusters, since it means the VM running the cluster gets killed. The ways to convert the exception are either to throw a new (wrapped) InterruptedException out of the spout, or to reinterrupt the running thread.
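For reference, a minimal sketch of that conversion. `KafkaInterruptException` here is a stand-in for Kafka's `org.apache.kafka.common.errors.InterruptException` so the snippet compiles without Kafka on the classpath, and the boolean return is only for illustration (the real `nextTuple` is void):

```java
// Minimal sketch of the conversion discussed above. KafkaInterruptException
// is a stand-in for org.apache.kafka.common.errors.InterruptException so the
// snippet compiles without Kafka on the classpath; the real spout catches the
// Kafka class.
public class SpoutInterruptSketch {

    // Stand-in for Kafka's unchecked (RuntimeException-based) InterruptException
    static class KafkaInterruptException extends RuntimeException {}

    // Simulates a consumer poll that is interrupted during shutdown
    static void poll() {
        throw new KafkaInterruptException();
    }

    // Catch the Kafka exception and restore the interrupt flag, so Storm's
    // async loop sees the thread as interrupted and shuts down cleanly.
    public static boolean nextTuple() {
        try {
            poll();
            return true; // a tuple was emitted
        } catch (KafkaInterruptException e) {
            Thread.currentThread().interrupt();
            return false; // interrupted, no tuple emitted
        }
    }
}
```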

I can replace the interrupt with throw new RuntimeException(new InterruptedException()) if you like?

@hmcl (Contributor) commented Dec 12, 2016

@srdo Kafka's InterruptException is a RuntimeException, otherwise the code wouldn't even compile without the exception either being caught or declared in the method signature. I think that the code as is will simply throw a RuntimeException and shut down the JVM.

We don't really do this type of check elsewhere in the codebase, and in particular not in the old spout. My opinion is that we don't need this check, which makes the code a bit more complex for little gain.

@srdo (Contributor, Author) commented Dec 12, 2016

@hmcl The code as is will work fine in distributed mode. The problem is when you're using the spout in an integration test, or are running the cluster in local mode for some other reason.

When you're running an integration test in local mode, exceptions thrown by the executors are propagated to the testing code. If we don't catch and handle InterruptException the testing code has to handle this exception itself during shutdown. As far as I can tell (not sure though), the local mode code also stops shutting down executor threads once one of them throws an exception, so throwing exceptions from the spout might prevent other executor threads from being shut down. This is less of an issue if it only happens when a real error occurs, but it's not great if it happens randomly when this spout is shut down. Either way, there's no reason we shouldn't handle InterruptException internally in the spout instead of showing it to users.

We didn't do it in the old spout because the old spout would be throwing InterruptedException (the built-in Java exception), and not InterruptException (the Kafka-specific exception that Storm doesn't know about). So when the old spout shut down, any InterruptedException would be caught by the check here: https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L2181, which would let the cluster shut down cleanly.

About the last line in my previous comment: the exception being wrapped is a new Java InterruptedException, not a Kafka InterruptException. The point is to convert the InterruptException, which is semantically identical to Java's InterruptedException, into a form that Storm will treat like a normal InterruptedException (which it essentially is).
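To make the wrapping concrete, here is a simplified sketch (not the actual Utils code) of the cause-walking check the async loop performs, which is why a RuntimeException wrapping an InterruptedException is recognized as an interrupt:

```java
// Simplified sketch (not the actual Storm Utils code) of a cause-walking
// check: walk the cause chain of a throwable to decide whether it was
// ultimately caused by an interrupt.
public class CauseCheckSketch {
    public static boolean causedByInterrupt(Throwable t) {
        while (t != null) {
            if (t instanceof InterruptedException) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}
```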

@srdo (Contributor, Author) commented Dec 12, 2016

I went ahead and replaced the interrupts with throw new RuntimeException(new InterruptedException());, so the executor stops immediately instead of waiting for the next check in the async loop.

@hmcl (Contributor) commented Dec 12, 2016

@srdo I think I understand what you mean. However, I think the cleanest way to do this is to catch Kafka's InterruptException and call the spout's close() method. If it is an error from which the spout cannot recover, it should close itself. This exception propagation is very cryptic, not to mention that the two exception names differ by only one 'e'.

Also please put a comment on the exception block explaining why you need to handle this exception like this.

@srdo (Contributor, Author) commented Dec 12, 2016

@hmcl I'd be happy to add comments explaining the propagation. Calling close() isn't enough, though. When the Kafka exception is thrown, the thread's interrupt state is cleared (the same behavior as the Java exception). If we just call close(), the executor thread keeps running because it's no longer marked as interrupted (though it would probably crash on the next call to nextTuple since the consumer was closed). We have to either throw a wrapped InterruptedException, or reinterrupt the thread.
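A small stand-alone demo of that flag-clearing behavior (not spout code): when sleep() throws InterruptedException, the thread's interrupt flag is already cleared inside the catch block, which is why merely calling close() would leave the executor thread looking uninterrupted.

```java
// Stand-alone demo (not spout code): when sleep() throws
// InterruptedException, the interrupt flag is cleared, so code that only
// cleans up in the catch block leaves the thread looking uninterrupted.
public class InterruptFlagDemo {
    public static boolean flagSetAfterCatch() {
        final boolean[] flag = new boolean[1];
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // Unless we re-interrupt here, the flag is already cleared
                flag[0] = Thread.currentThread().isInterrupted();
            }
        });
        t.start();
        t.interrupt();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return flag[0];
    }
}
```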

@hmcl (Contributor) commented Dec 13, 2016

+1

@ptgoetz (Member) commented Dec 15, 2016

One minor nit: It would be helpful to construct the exceptions with a message explaining what happened.

Other than that I'm +1.

@srdo (Contributor, Author) commented Dec 15, 2016

@ptgoetz Sure, I'll add that.

  //Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt.
- throw new RuntimeException(new InterruptedException());
+ throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
  }
@hmcl (Contributor) commented Dec 15, 2016

@srdo Can you please create a constant for all of these fields?

@srdo (Contributor, Author) replied

@hmcl Better?

@ptgoetz (Member) commented Dec 15, 2016

+1

1 similar comment:
@HeartSaVioR (Contributor)

+1

asfgit pushed a commit that referenced this pull request Dec 20, 2016
* Add message to InterruptedExceptions thrown when Kafka consumer is interrupted
* Closes #1821
@asfgit asfgit closed this in 258b0e1 Dec 20, 2016
ptgoetz pushed a commit to ptgoetz/storm that referenced this pull request Jul 11, 2017
* Add message to InterruptedExceptions thrown when Kafka consumer is interrupted
* Closes apache#1821

4 participants