[SPARK-4631][streaming][FIX] Wait for a receiver to start before publishing test data. #4270
According to the MQTT client docs the callback needs to be installed before the client connects or subscribes to a topic. Otherwise messages may be lost.
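To make the ordering concern concrete, here is a minimal sketch using a toy in-memory client. `ToyClient`, `setCallback`, and `connectAndSubscribe` are hypothetical names invented for illustration and are not the Paho API. The sketch only assumes that messages arriving while no callback is installed are dropped.

```scala
object CallbackOrderSketch {
  // Hypothetical in-memory client, NOT the Paho API. Pending messages are
  // handed to whatever callback is installed at connect/subscribe time.
  final class ToyClient(pending: Seq[String]) {
    private var callback: Option[String => Unit] = None
    var dropped = 0

    def setCallback(cb: String => Unit): Unit = {
      callback = Some(cb)
    }

    def connectAndSubscribe(): Unit =
      pending.foreach { m =>
        callback match {
          case Some(cb) => cb(m)
          case None     => dropped += 1 // nobody is listening, so the message is lost
        }
      }
  }
}
```

With the callback installed first, nothing is dropped; with the order reversed, every pending message is lost in this toy model.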
Can one of the admins verify this patch?
Jenkins, this is okay to test.
Can you add "[FIX]" to the title, so that it is clear that this fixes SPARK-4631?
This is a good change, but why this change for this fix? Was it necessary?
No, it was not necessary, I just noticed that it wasn't needed, so I thought I should be a boy scout :)
Jenkins, this is ok to test.
Test build #26366 has started for PR 4270 at commit
Defining this function inside the "if" looks a little weird, too nested. It might be better to just have it in the for loop; the code would look less complicated.
Ok, will do.
Test build #26366 has finished for PR 4270 at commit
Test PASSed.
You can modify the code like this:
    for (i <- 0 to 100) {
      try {
        msgtopic.publish(message)
      } catch {
        case e: MqttException => Thread.sleep(1)
      }
    }
I think a Thread.sleep of 1 ms, as written, is fine.
I can do that, but it won't retry the same message; instead it would just ignore a failure. In that case the total number of messages will be less than 101.
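The difference between skipping a failed publish and retrying it can be sketched as follows. This is an illustration under stated assumptions, not the code from the PR: `FlakyTopic` is a hypothetical stand-in that rejects every second publish attempt, `IllegalStateException` stands in for `MqttException`, and the names `publishIgnoringFailures` and `publishWithRetry` are made up here.

```scala
import scala.annotation.tailrec

object RetryPublishSketch {
  // Hypothetical stand-in for an MQTT topic: every second publish attempt is
  // rejected, imitating a client whose in-flight window is temporarily full.
  final class FlakyTopic {
    private var attempts = 0
    var delivered: Vector[String] = Vector.empty

    def publish(message: String): Unit = {
      attempts += 1
      if (attempts % 2 == 0) throw new IllegalStateException("too many publishes in progress")
      delivered = delivered :+ message
    }
  }

  // Skip-on-failure, as in the suggested loop: a rejected message is never resent.
  def publishIgnoringFailures(topic: FlakyTopic, messages: Seq[String]): Unit =
    messages.foreach { m =>
      try topic.publish(m)
      catch { case _: IllegalStateException => Thread.sleep(1) }
    }

  // Retry-on-failure: resend the same message until it is accepted.
  // Once retriesLeft reaches 0, the exception propagates to the caller.
  @tailrec
  def publishWithRetry(topic: FlakyTopic, message: String, retriesLeft: Int = 50): Unit = {
    val accepted =
      try { topic.publish(message); true }
      catch { case _: IllegalStateException if retriesLeft > 0 => false }
    if (!accepted) {
      Thread.sleep(1) // brief back-off before resending
      publishWithRetry(topic, message, retriesLeft - 1)
    }
  }
}
```

With 101 messages against this toy topic, the skip-on-failure loop delivers fewer than 101, while the retry variant delivers all of them in order.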
I think the waitForReceiverToStart() method is sufficient. Once the receiver is started, since the QoS is set to 1, as soon as the receiver receives a message, it will be flushed from the queue. If the problem still exists, the Thread.sleep in the catch block can solve it. There is no need for tryPublish (I guess).
Ok, I'll check that.
Please try this also.
Along with waitForReceiverToStart(), change the 'for loop' range from 0 to 9. It is just for testing.
I am getting the error only if I change MqttDefaultFilePersistence to memoryPersistence. I solved the problem by applying the solution provided above.
…g test data.
This fixes two sources of non-deterministic failures in this test:
- wait for a receiver to be up before pushing data through MQTT
- gracefully handle the case where the MQTT client is overloaded. There’s a hard-coded limit of 10 in-flight messages, and this test may hit it. Instead of crashing, we retry sending the message.
Both of these are needed to make the test pass reliably on my machine.
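The first fix, waiting for the receiver before publishing, can be sketched with a latch. This conversation does not show how waitForReceiverToStart() is actually implemented, so the following is only an illustration: `ToyReceiver` and `awaitStarted` are hypothetical names, and the receiver startup is simulated with a sleeping thread.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object WaitForReceiverSketch {
  // Hypothetical receiver: counts the latch down once its background thread is up.
  final class ToyReceiver(startupDelayMs: Long) {
    private val started = new CountDownLatch(1)

    def start(): Unit = {
      val t = new Thread(new Runnable {
        def run(): Unit = {
          Thread.sleep(startupDelayMs) // simulated startup work
          started.countDown()
        }
      })
      t.setDaemon(true)
      t.start()
    }

    // Blocks until the receiver reports it is up; returns false if the timeout elapses first.
    def awaitStarted(timeoutMs: Long): Boolean =
      started.await(timeoutMs, TimeUnit.MILLISECONDS)
  }
}
```

A test would call `awaitStarted` and fail fast on `false` before publishing anything, so no message is sent while nobody is listening.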
Force-pushed from 261653f to f66c482.
OK, I went for the amend (the way we do in our projects). This way the commit history is cleaner.
Test build #26397 has started for PR 4270 at commit
Test build #26397 has finished for PR 4270 at commit
Test PASSed.
@dragos you can just add a commit. It is easier to review and commits are squashed on merge anyway.
@srowen if it's not too bad I'll do that the next time. I already amended the last commit, and the change is pretty small, it shouldn't be too hard to re-review.
Merging this. Thanks for catching and fixing this.
…ishing test data.
This fixes two sources of non-deterministic failures in this test:
- wait for a receiver to be up before pushing data through MQTT
- gracefully handle the case where the MQTT client is overloaded. There’s a hard-coded limit of 10 in-flight messages, and this test may hit it. Instead of crashing, we retry sending the message.
Both of these are needed to make the test pass reliably on my machine.
Author: Iulian Dragos <jaguarul@gmail.com>
Closes #4270 from dragos/issue/fix-flaky-test-SPARK-4631 and squashes the following commits:
f66c482 [Iulian Dragos] [SPARK-4631][streaming] Wait for a receiver to start before publishing test data.
d408a8e [Iulian Dragos] Install callback before connecting to MQTT broker.
(cherry picked from commit e908322)
Signed-off-by: Sean Owen <sowen@cloudera.com>
# Conflicts:
# external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala