@@ -17,8 +17,8 @@

package org.apache.spark.examples.streaming

import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -31,8 +31,6 @@ import org.apache.spark.SparkConf
*/
object MQTTPublisher {

var client: MqttClient = _

def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
@@ -42,25 +40,36 @@ object MQTTPublisher {
StreamingExamples.setStreamingLogLevels()

val Seq(brokerUrl, topic) = args.toSeq

var client: MqttClient = null

try {
var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
val persistence = new MemoryPersistence()
client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)

client.connect()

val msgtopic = client.getTopic(topic)
val msgContent = "hello mqtt demo for spark streaming"
val message = new MqttMessage(msgContent.getBytes("utf-8"))

while (true) {
try {
msgtopic.publish(message)
println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
Member:
Is there any risk here that it will just keep erroring forever? Should there be a max number of failures?

Contributor Author:
This is an example, and it's supposed to run forever; people have to kill the process anyway (with or without errors). Or should I edit this to check for a max number of failures?

Member:
OK, at least log a warning. Otherwise this could silently spin forever as long as the queue is full. Yes, it's an example, but it still seems odd.

Also, really minor, but the org.apache.spark.streaming.StreamingContext import you moved in MQTTUtils is now out of order.

(A sketch of a bounded publish loop with a warning appears after the MQTTPublisher diff below.)

Thread.sleep(10)
println("Queue is full, wait for to consume data from the message queue")
}
}
} catch {
case e: MqttException => println("Exception Caught: " + e)
Contributor:
I don't see the point in swallowing this exception. Why not let it percolate? It will print the stack trace and stop the thread anyway.

(A sketch of that alternative also appears after the MQTTPublisher diff below.)

} finally {
if (client != null) {
client.disconnect()
}
}

client.connect()

val msgtopic: MqttTopic = client.getTopic(topic)
val msg: String = "hello mqtt demo for spark streaming"

while (true) {
val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8"))
msgtopic.publish(message)
println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
}
client.disconnect()
}
}
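For the thread above about the publish loop erroring forever: a minimal sketch of the warning-plus-cap idea, assuming the Paho client classes this example already imports. The helper name publishWithBackoff and the maxConsecutiveFailures cap are illustrative and not part of this patch.

import org.eclipse.paho.client.mqttv3.{MqttException, MqttMessage, MqttTopic}

// Illustrative sketch only: keep publishing, back off while Paho's in-flight
// window is full, and give up after too many consecutive full-queue errors
// instead of spinning silently.
def publishWithBackoff(msgtopic: MqttTopic, message: MqttMessage,
    maxConsecutiveFailures: Int = 1000): Unit = {
  var failures = 0
  while (failures < maxConsecutiveFailures) {
    try {
      msgtopic.publish(message)
      failures = 0  // reset the counter once a publish gets through
    } catch {
      case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
        failures += 1
        println(s"Queue is full ($failures consecutive failures), waiting for the broker to drain")
        Thread.sleep(10)
    }
  }
  System.err.println(s"Giving up after $maxConsecutiveFailures consecutive full-queue errors")
}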
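For the comment about swallowing the MqttException: a sketch of the alternative the reviewer describes, with no catch block, so the exception percolates up and stops the thread while the finally block still disconnects the client. The helper name publishOnce is illustrative and not part of this patch.

import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

// Illustrative sketch only: any MqttException thrown here propagates to the
// caller (printing a stack trace if it reaches the top of the thread), but the
// client is still disconnected on the way out.
def publishOnce(brokerUrl: String, topic: String, payload: String): Unit = {
  var client: MqttClient = null
  try {
    client = new MqttClient(brokerUrl, MqttClient.generateClientId(), new MemoryPersistence())
    client.connect()
    client.getTopic(topic).publish(new MqttMessage(payload.getBytes("utf-8")))
  } finally {
    if (client != null) {
      client.disconnect()
    }
  }
}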

@@ -96,9 +105,9 @@ object MQTTWordCount {
val sparkConf = new SparkConf().setAppName("MQTTWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)

val words = lines.flatMap(x => x.toString.split(" "))
val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()
ssc.start()
ssc.awaitTermination()
@@ -17,23 +17,23 @@

package org.apache.spark.streaming.mqtt

import java.io.IOException
import java.util.concurrent.Executors
import java.util.Properties

import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import java.util.Properties
import java.util.concurrent.Executors
import java.io.IOException

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
Contributor Author:
Actually, which is the right import style: the one below or the existing one?

import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}

Member:
I think the multiple-import syntax is often used, especially where it's definitely shorter, but I would not change it just to change it.


import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
@@ -88,18 +88,18 @@ class MQTTReceiver(
client.subscribe(topic)

// Callback automatically triggers as and when new message arrives on specified topic
val callback: MqttCallback = new MqttCallback() {
val callback = new MqttCallback() {

// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
store(new String(arg1.getPayload(),"utf-8"))
override def messageArrived(topic: String, message: MqttMessage) {
store(new String(message.getPayload(),"utf-8"))
}

override def deliveryComplete(arg0: IMqttDeliveryToken) {
override def deliveryComplete(token: IMqttDeliveryToken) {
}

override def connectionLost(arg0: Throwable) {
restart("Connection lost ", arg0)
override def connectionLost(cause: Throwable) {
restart("Connection lost ", cause)
}
}

@@ -17,10 +17,11 @@

package org.apache.spark.streaming.mqtt

import scala.reflect.ClassTag

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}

object MQTTUtils {
@@ -38,8 +38,8 @@ import org.apache.spark.util.Utils
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {

private val batchDuration = Milliseconds(500)
private val master: String = "local[2]"
private val framework: String = this.getClass.getSimpleName
private val master = "local[2]"
private val framework = this.getClass.getSimpleName
private val freePort = findFreePort()
private val brokerUri = "//localhost:" + freePort
private val topic = "def"
@@ -65,7 +65,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {

test("mqtt input stream") {
val sendMessage = "MQTT demo for spark streaming"
val receiveStream: ReceiverInputDStream[String] =
val receiveStream =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
@@ -113,12 +113,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
def publishData(data: String): Unit = {
var client: MqttClient = null
try {
val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
client.connect()
if (client.isConnected) {
val msgTopic: MqttTopic = client.getTopic(topic)
val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
val msgTopic = client.getTopic(topic)
val message = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)
for (i <- 0 to 100) {