Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ class MQTTInputDStream(
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
) extends ReceiverInputDStream[String](ssc_) with Logging {
) extends ReceiverInputDStream[String](ssc_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change, but why this change for this fix? Was it necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it was not necessary, I just noticed that it wasn't needed, so I thought I should be a boy scout :)


def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
}

private[streaming]
private[streaming]
class MQTTReceiver(
brokerUrl: String,
topic: String,
Expand All @@ -72,21 +72,15 @@ class MQTTReceiver(
def onStop() {

}

def onStart() {

// Set up persistence for messages
// Set up persistence for messages
val persistence = new MemoryPersistence()

// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)

// Connect to MqttBroker
client.connect()

// Subscribe to Mqtt topic
client.subscribe(topic)

// Callback automatically triggers as and when new message arrives on specified topic
val callback: MqttCallback = new MqttCallback() {

Expand All @@ -103,7 +97,15 @@ class MQTTReceiver(
}
}

// Set up callback for MqttClient
// Set up callback for MqttClient. This needs to happen before
// connecting or subscribing, otherwise messages may be lost
client.setCallback(callback)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this modification.


// Connect to MqttBroker
client.connect()

// Subscribe to Mqtt topic
client.subscribe(topic)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.spark.streaming.mqtt

import java.net.{URI, ServerSocket}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._
import scala.language.postfixOps
Expand All @@ -32,6 +34,8 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -67,14 +71,19 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val sendMessage = "MQTT demo for spark streaming"
val receiveStream: ReceiverInputDStream[String] =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
var receiveMessage: List[String] = List()
@volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
if (rdd.collect.length > 0) {
receiveMessage = receiveMessage ::: List(rdd.first)
receiveMessage
}
}
ssc.start()

// wait for the receiver to start before publishing data, or we risk failing
// the test nondeterministically. See SPARK-4631
waitForReceiverToStart()

publishData(sendMessage)
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sendMessage.equals(receiveMessage(0)))
Expand Down Expand Up @@ -121,8 +130,14 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)
for (i <- 0 to 100) {
msgTopic.publish(message)

for (i <- 0 to 10) {
try {
msgTopic.publish(message)
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
Thread.sleep(50) // wait for Spark streaming to consume something from the message queue
}
}
}
} finally {
Expand All @@ -131,4 +146,18 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
client = null
}
}

/**
* Block until at least one receiver has started or timeout occurs.
*/
private def waitForReceiverToStart() = {
val latch = new CountDownLatch(1)
ssc.addStreamingListener(new StreamingListener {
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
latch.countDown()
}
})

assert(latch.await(10, TimeUnit.SECONDS), "Timeout waiting for receiver to start.")
}
}