From 1d0b5cb47433ff9781e9388049d7827fd338187b Mon Sep 17 00:00:00 2001
From: core2048 <43812079+core2048@users.noreply.github.com>
Date: Tue, 4 Dec 2018 13:20:32 +0200
Subject: [PATCH 1/7] Delete pom.xml

---
 pom.xml | 275 --------------------------------------------------------
 1 file changed, 275 deletions(-)
 delete mode 100644 pom.xml

diff --git a/pom.xml b/pom.xml
deleted file mode 100644
index fe57109..0000000
--- a/pom.xml
+++ /dev/null
@@ -1,275 +0,0 @@
-
-4.0.0
-
-com.stratio
-parent
-0.9.0
-
-com.stratio.receiver
-spark-rabbitmq
-0.6.0-SNAPSHOT
-pom
-
-https://github.com/Stratio/RabbitMQ-Receiver
-Spark Streaming RabbitMQ Receiver
-
-RabbitMQ-Receiver is a library that allows the user to read data with Apache Spark from RabbitMQ.
-
-https://github.com/Stratio/RabbitMQ-Receiver.git
-scm:git:git://github.com/Stratio/RabbitMQ-Receiver.git
-scm:git:git@github.com:Stratio/RabbitMQ-Receiver.git
-HEAD
-
-jcgarcia
-Jose Carlos Garcia
-jcgarcia@stratio.com
-developer
-
-gschiavon
-Germán Schiavón
-gschiavon@stratio.com
-developer
-
-dcarroza
-Daniel Carroza Santana
-dcarroza@stratio.com
-developer
-
-UTF-8
-UTF-8
-2.11.8
-2.11
-3.6.6
-2.2.0
-2.5.3
-2.8.2
-2.2.2
-4.8.1
-1.11.3
-
-org.apache.spark
-spark-core_${scala.binary.version}
-${spark.version}
-provided
-
-org.apache.spark
-spark-streaming_${scala.binary.version}
-${spark.version}
-provided
-
-org.apache.spark
-spark-sql_${scala.binary.version}
-${spark.version}
-provided
-
-com.typesafe.akka
-akka-actor_${scala.binary.version}
-${akka.version}
-
-com.rabbitmq
-amqp-client
-${rabbitmq.version}
-
-joda-time
-joda-time
-${joda.version}
-
-com.github.sstone
-amqp-client_${scala.binary.version}
-1.5
-test
-
-org.scalatest
-scalatest_${scala.binary.version}
-${scalatest.version}
-test
-
-org.scalacheck
-scalacheck_${scala.binary.version}
-${scala.check.version}
-test
-
-junit
-junit
-4.12
-test
-
-com.typesafe.akka
-akka-testkit_${scala.binary.version}
-${akka.version}
-test
-
-src/test/resources
-true
-
-src/main/scala
-src/test/scala
-
-org.apache.maven.plugins
-maven-dependency-plugin
-2.8
-
-copy-dependencies
-package
-
-copy-dependencies
-
-${project.build.directory}/alternateLocation
-false
-false
-true
-::*
-
-net.alchim31.maven
-scala-maven-plugin
-
-org.apache.maven.plugins
-maven-surefire-plugin
-
-org.apache.maven.plugins
-maven-failsafe-plugin
-
-false
-
-org.jacoco
-jacoco-maven-plugin
-
-org/apache/spark/streaming/rabbitmq/**
-
-org.apache.maven.plugins
-maven-source-plugin
-2.2.1
-
-attach-sources
-
-jar
-
-org.apache.maven.plugins
-maven-jar-plugin
-2.2
-
-test-jar
-
-com.mycila
-license-maven-plugin
-
-**/README
-**/src/test/resources/**
-**/src/main/resources/**
-**/*.csv
-**/*.json
-**/*.conf
-**/*.txt
-**/*.properties
-**/jetty*
-**/node*/**
-**/.tmp/**
-**/*.scss
-**/*.woff
-**/*.woff2
-**/*.ttf
-**/*.svg
-**/*.eot
-**/*.otf
-**/*.htaccess
-**/*.jshintrc
-**/*.html
-
-check
-
-validate
-

From 1b5f0100d43c59c28cd5a1679deded53ee1335f0 Mon Sep 17 00:00:00 2001
From: core2048 <43812079+core2048@users.noreply.github.com>
Date: Tue, 4 Dec 2018 13:25:12 +0200
Subject: [PATCH 2/7] Update README.md

---
 README.md | 25 +++----------------------
 1 file changed, 3 insertions(+), 22 deletions(-)

diff --git a/README.md b/README.md
index 7e500bf..b244215 100644
--- a/README.md
+++ b/README.md
@@ -8,28 +8,10 @@ from [RabbitMQ](https://www.rabbitmq.com/).
 
 ## Requirements
 
-This library requires Spark 2.0+, Scala 2.11+, RabbitMQ 3.5+
+This library was updated to work with Spark 2.3+ and sbt instead of maven.
 
 ## Using the library
 
-There are two ways of using RabbitMQ-Receiver library:
-
-The first one is to add the next dependency in your pom.xml:
-
-```
-<dependency>
-<groupId>com.stratio.receiver</groupId>
-<artifactId>spark-rabbitmq</artifactId>
-<version>LATEST</version>
-</dependency>
-```
-
-The other one is to clone the full repository and build the project:
-
-```
-git clone https://github.com/Stratio/spark-rabbitmq.git
-mvn clean install
-```
+Create assembly with sbt and import into your project
 
 This library includes two implementations for consuming messages from RabbitMQ with Spark Streaming:
@@ -42,8 +24,7 @@ This library includes two implementations for consuming messages from RabbitMQ w
 
 ### Build
 
-`mvn clean package`
-
+sbt assembly
 
 ### Distributed Approach
 
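With the Maven instructions gone from the README, usage comes down to building the assembly jar and wiring the stream up in code. The receiver API itself is unchanged by these patches; a minimal Scala sketch (connection values are placeholders, and the createStream call mirrors the integration tests removed in PATCH 6/7 below):

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils

val conf = new SparkConf().setAppName("rabbitmq-example").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

// Receiver-based stream: each element is a message body decoded as String
val messages = RabbitMQUtils.createStream(ssc, Map(
  "hosts" -> "localhost",
  "queueName" -> "rabbitmq-queue",
  "exchangeName" -> "rabbitmq-exchange",
  "vHost" -> "/",
  "userName" -> "guest",
  "password" -> "guest"
))

messages.print()
ssc.start()
ssc.awaitTermination()
```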
From fc339f583a4caeacd35dcdf97e6d0bf368cf5db3 Mon Sep 17 00:00:00 2001
From: Daniel Brener
Date: Tue, 4 Dec 2018 13:36:59 +0200
Subject: [PATCH 3/7] Add sbt and build configuration

---
 build.sbt                | 15 +++++++++++++++
 project/assembly.sbt     |  1 +
 project/build.properties |  1 +
 3 files changed, 17 insertions(+)
 create mode 100644 build.sbt
 create mode 100644 project/assembly.sbt
 create mode 100644 project/build.properties

diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..8432565
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,15 @@
+name := "qstream"
+
+version := "0.1"
+
+scalaVersion := "2.11.8"
+
+libraryDependencies ++= Seq(
+  "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "2.7.4" % "provided",
+  "org.apache.spark" %% "spark-core" % "2.3.2" % "provided",
+  "org.apache.spark" %% "spark-sql" % "2.3.2" % "provided",
+  "org.apache.spark" %% "spark-streaming" % "2.3.2" % "provided",
+  "com.softwaremill.sttp" %% "core" % "1.3.8",
+  "com.softwaremill.sttp" %% "circe" % "1.3.8",
+  "com.typesafe.akka" %% "akka-actor" % "2.5.18",
+  "com.rabbitmq" % "amqp-client" % "3.6.6")
\ No newline at end of file
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..784f19d
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.8")
\ No newline at end of file
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..c03cdb8
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version = 1.2.7
\ No newline at end of file

From 9a04003d0a633e611e90ed473c39fc148b05a68b Mon Sep 17 00:00:00 2001
From: core2048 <43812079+core2048@users.noreply.github.com>
Date: Tue, 4 Dec 2018 13:38:10 +0200
Subject: [PATCH 4/7] Update build.sbt

---
 build.sbt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.sbt b/build.sbt
index 8432565..4b5ef98 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,4 +1,4 @@
-name := "qstream"
+name := "spark-rabbitmq"
 
 version := "0.1"
 
@@ -12,4 +12,4 @@ libraryDependencies ++= Seq(
   "com.softwaremill.sttp" %% "core" % "1.3.8",
   "com.softwaremill.sttp" %% "circe" % "1.3.8",
   "com.typesafe.akka" %% "akka-actor" % "2.5.18",
-  "com.rabbitmq" % "amqp-client" % "3.6.6")
\ No newline at end of file
+  "com.rabbitmq" % "amqp-client" % "3.6.6")
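Once `sbt assembly` has run, the jar lands (with sbt-assembly's default naming for the name/version/scalaVersion above) in target/scala-2.11/spark-rabbitmq-assembly-0.1.jar. One way to consume it from another sbt project, sketched under the assumption that the jar has been copied into that project's lib/ directory:

```
// downstream build.sbt: reference the copied assembly jar as an unmanaged dependency.
// Copying it into lib/ already suffices (sbt's default unmanaged-jars directory);
// the explicit setting just makes the path visible.
Compile / unmanagedJars += Attributed.blank(baseDirectory.value / "lib" / "spark-rabbitmq-assembly-0.1.jar")
```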
From 5fcf0ecb472483d4d7e1df9ce7a008f96b3a5d72 Mon Sep 17 00:00:00 2001
From: Core2048
Date: Thu, 19 Dec 2019 14:23:50 +0200
Subject: [PATCH 5/7] add auto recovery

---
 .../spark/streaming/rabbitmq/ConfigParameters.scala |  6 +++++-
 .../streaming/rabbitmq/consumer/Consumer.scala      | 13 +++++++++++++
 .../rabbitmq/distributed/RabbitMQRDD.scala          |  2 +-
 .../rabbitmq/receiver/RabbitMQInputDStream.scala    |  6 ------
 4 files changed, 19 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/org/apache/spark/streaming/rabbitmq/ConfigParameters.scala b/src/main/scala/org/apache/spark/streaming/rabbitmq/ConfigParameters.scala
index 645787b..72b9305 100644
--- a/src/main/scala/org/apache/spark/streaming/rabbitmq/ConfigParameters.scala
+++ b/src/main/scala/org/apache/spark/streaming/rabbitmq/ConfigParameters.scala
@@ -45,8 +45,10 @@ object ConfigParameters {
   val AckTypeKey = "ackType"
   val FairDispatchKey = "fairDispatch"
   val PrefetchCount = "prefetchCount"
+  val AutomaticRecovery = "automaticRecovery"
+  val NetworkRecoveryInterval = "networkRecoveryInterval"
   val QueueConnectionPropertiesKeys =
-    List(DurableKey, ExclusiveKey, AutoDeleteKey, AckTypeKey, FairDispatchKey, PrefetchCount)
+    List(DurableKey, ExclusiveKey, AutoDeleteKey, AckTypeKey, FairDispatchKey, PrefetchCount, AutomaticRecovery,NetworkRecoveryInterval)
 
   /**
   * Queue Connection Defaults
@@ -61,6 +63,8 @@ object ConfigParameters {
   val DefaultHost = "localhost"
   val DefaultPrefetchCount = 1
   val DefaultCodeClose = "320"
+  val DefaultAutomaticRecovery = true
+  val DefaultNetworkRecoveryInterval = 10000
 
   /**
   * Message Consumed properties
diff --git a/src/main/scala/org/apache/spark/streaming/rabbitmq/consumer/Consumer.scala b/src/main/scala/org/apache/spark/streaming/rabbitmq/consumer/Consumer.scala
index 8d8f068..b8d9c8d 100644
--- a/src/main/scala/org/apache/spark/streaming/rabbitmq/consumer/Consumer.scala
+++ b/src/main/scala/org/apache/spark/streaming/rabbitmq/consumer/Consumer.scala
@@ -203,6 +203,7 @@ object Consumer extends Logging with ConsumerParamsUtils {
 
     setVirtualHost(params)
     setUserPassword(params)
+    setAutomaticRecoveryEnabled(params)
 
     getChannel(params) match {
       case Success(channel) =>
@@ -225,6 +226,18 @@ object Consumer extends Logging with ConsumerParamsUtils {
     }
   }
 
+  /**
+   * https://www.rabbitmq.com/api-guide.html#recovery
+   * @param params
+   */
+  private def setAutomaticRecoveryEnabled(params: Map[String,String]): Unit = {
+    val recoveryState = params.getOrElse(AutomaticRecovery, DefaultAutomaticRecovery.toString).toBoolean
+    factory.setAutomaticRecoveryEnabled(recoveryState)
+    val recoveryInterval = params.getOrElse(NetworkRecoveryInterval, DefaultNetworkRecoveryInterval.toString).toInt
+    factory.setNetworkRecoveryInterval(recoveryInterval)
+  }
+
+
   private def setVirtualHost(params: Map[String, String]): Unit = {
 
     val vHost = params.get(VirtualHostKey)
diff --git a/src/main/scala/org/apache/spark/streaming/rabbitmq/distributed/RabbitMQRDD.scala b/src/main/scala/org/apache/spark/streaming/rabbitmq/distributed/RabbitMQRDD.scala
index aa546b5..eef7acd 100644
--- a/src/main/scala/org/apache/spark/streaming/rabbitmq/distributed/RabbitMQRDD.scala
+++ b/src/main/scala/org/apache/spark/streaming/rabbitmq/distributed/RabbitMQRDD.scala
@@ -231,7 +231,7 @@ class RabbitMQRDD[R: ClassTag](
       log.info(s"******* Received $numMessages messages by Partition : ${part.index} before close Channel ******")
       //Close the scheduler and the channel in the consumer
       scheduleProcess.cancel()
-      consumer.close()
+      //consumer.close()
     }
 
     private def finishIterationAndReturn(): R = {
diff --git a/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala b/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala
index 3759290..1e720f5 100644
--- a/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala
+++ b/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala
@@ -104,12 +104,6 @@ class RabbitMQReceiver[R: ClassTag](
     } finally {
 
       log.info("it has been stopped")
-      try {
-        consumer.close()
-      } catch {
-        case e: Throwable =>
-          log.error(s"error on close consumer, ignoring it : ${e.getLocalizedMessage}", e)
-      }
       restart("Trying to connect again")
     }
   }
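The two new keys join QueueConnectionPropertiesKeys, so recovery is tuned through the same parameter map as every other connection option. A sketch of passing them to a stream (interval value arbitrary; the defaults above are automaticRecovery=true and networkRecoveryInterval=10000 ms):

```
val messages = RabbitMQUtils.createStream(ssc, Map(
  "hosts" -> "localhost",
  "queueName" -> "rabbitmq-queue",
  "userName" -> "guest",
  "password" -> "guest",
  // keys added by this patch; Consumer parses them with .toBoolean / .toInt,
  // so the values must be well-formed literals
  "automaticRecovery" -> "true",
  "networkRecoveryInterval" -> "5000"
))
```

Commenting out consumer.close() in RabbitMQRDD and dropping the close in the receiver fits the same goal: the RabbitMQ Java client does not auto-recover a connection that was closed explicitly by the application.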
From fc3a26380d295f4b1b55662bb3fb5c28316986d7 Mon Sep 17 00:00:00 2001
From: Core2048
Date: Thu, 19 Dec 2019 14:27:48 +0200
Subject: [PATCH 6/7] remove unused tests

---
 .../rabbitmq/JavaRabbitMQConsumer.java          |  59 ------
 .../JavaRabbitMQDistributedConsumer.java        |  67 -------
 .../rabbitmq/RabbitMQConsumerIT.scala           |  63 -------
 .../RabbitMQDistributedConsumerIT.scala         | 108 ----------
 .../streaming/rabbitmq/RabbitMQSuite.scala      |  27 ---
 .../rabbitmq/TemporalDataSuite.scala            | 173 ------------------
 6 files changed, 497 deletions(-)
 delete mode 100644 src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQConsumer.java
 delete mode 100644 src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQDistributedConsumer.java
 delete mode 100644 src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQConsumerIT.scala
 delete mode 100644 src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQDistributedConsumerIT.scala
 delete mode 100644 src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQSuite.scala
 delete mode 100644 src/test/scala/org/apache/spark/streaming/rabbitmq/TemporalDataSuite.scala

diff --git a/src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQConsumer.java b/src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQConsumer.java
deleted file mode 100644
index f422b07..0000000
--- a/src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQConsumer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (C) 2015 Stratio (http://stratio.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.rabbitmq;
-
-import com.rabbitmq.client.QueueingConsumer.Delivery;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public final class JavaRabbitMQConsumer {
-
-    public static void main(String[] args) throws InterruptedException {
-
-        SparkConf sparkConf = new SparkConf().setAppName("JavaRabbitMQConsumer").setMaster("local[2]");
-        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
-        Map<String, String> params = new HashMap<String, String>();
-
-        params.put("hosts", "localhost");
-        params.put("queueName", "rabbitmq-queue");
-        params.put("exchangeName", "rabbitmq-exchange");
-        params.put("vHost", "/");
-        params.put("userName", "guest");
-        params.put("password", "guest");
-
-        Function<Delivery, String> messageHandler = new Function<Delivery, String>() {
-
-            public String call(Delivery message) {
-                return new String(message.getBody());
-            }
-        };
-
-        JavaReceiverInputDStream<String> messages =
-                RabbitMQUtils.createJavaStream(jssc, String.class, params, messageHandler);
-
-        messages.print();
-
-        jssc.start();
-        jssc.awaitTermination();
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQDistributedConsumer.java b/src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQDistributedConsumer.java
deleted file mode 100644
index 2de4581..0000000
--- a/src/test/java/org/apache/spark/streaming/rabbitmq/JavaRabbitMQDistributedConsumer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2015 Stratio (http://stratio.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.rabbitmq;
-
-import com.rabbitmq.client.QueueingConsumer.Delivery;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.rabbitmq.distributed.JavaRabbitMQDistributedKey;
-import org.apache.spark.streaming.rabbitmq.models.ExchangeAndRouting;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-
-public final class JavaRabbitMQDistributedConsumer implements Serializable {
-
-    public static void main(String[] args) throws InterruptedException {
-
-        SparkConf sparkConf = new SparkConf().setAppName("JavaRabbitMQConsumer").setMaster("local[2]");
-        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
-        java.util.Map<String, String> params = new HashMap<String, String>();
-        List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>();
-
-        params.put("hosts", "localhost");
-        params.put("vHost", "/");
-        params.put("userName", "guest");
-        params.put("password", "guest");
-
-        distributedKeys.add(new JavaRabbitMQDistributedKey("rabbitmq-queue",
-                new ExchangeAndRouting("rabbitmq-exchange"),
-                params
-        ));
-
-        Function<Delivery, String> messageHandler = new Function<Delivery, String>() {
-
-            public String call(Delivery message) {
-                return new String(message.getBody());
-            }
-        };
-
-        JavaInputDStream<String> messages =
-                RabbitMQUtils.createJavaDistributedStream(jssc, String.class, distributedKeys, params, messageHandler);
-
-        messages.print();
-
-        jssc.start();
-        jssc.awaitTermination();
-    }
-}
\ No newline at end of file
diff --git a/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQConsumerIT.scala b/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQConsumerIT.scala
deleted file mode 100644
index 873bafc..0000000
--- a/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQConsumerIT.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (C) 2015 Stratio (http://stratio.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.rabbitmq
-
-import java.util.UUID
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-
-@RunWith(classOf[JUnitRunner])
-class RabbitMQConsumerIT extends TemporalDataSuite {
-
-  override val queueName = s"$configQueueName-${this.getClass().getName()}-${UUID.randomUUID().toString}"
-
-  override val exchangeName = s"$configExchangeName-${this.getClass().getName()}-${UUID.randomUUID().toString}"
-
-  test("RabbitMQ Receiver should read all the records") {
-
-    val receiverStream = RabbitMQUtils.createStream(ssc, Map(
-      "hosts" -> hosts,
-      "queueName" -> queueName,
-      "exchangeName" -> exchangeName,
-      "exchangeType" -> exchangeType,
-      "vHost" -> vHost,
-      "userName" -> userName,
-      "password" -> password
-    ))
-    val totalEvents = ssc.sparkContext.longAccumulator("My Accumulator")
-
-    // Start up the receiver.
-    receiverStream.start()
-
-    // Fires each time the configured window has passed.
-    receiverStream.foreachRDD(rdd => {
-      if (!rdd.isEmpty()) {
-        val count = rdd.count()
-        // Do something with this message
-        println(s"EVENTS COUNT : \t $count")
-        totalEvents.add(count)
-        //rdd.collect().sortBy(event => event.toInt).foreach(event => print(s"$event, "))
-      } else println("RDD is empty")
-      println(s"TOTAL EVENTS : \t $totalEvents")
-    })
-
-    ssc.start() // Start the computation
-    ssc.awaitTerminationOrTimeout(10000L) // Wait for the computation to terminate
-
-    assert(totalEvents.value === totalRegisters.toLong)
-  }
-}
diff --git a/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQDistributedConsumerIT.scala b/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQDistributedConsumerIT.scala
deleted file mode 100644
index 659c69d..0000000
--- a/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQDistributedConsumerIT.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright (C) 2015 Stratio (http://stratio.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.rabbitmq
-
-import java.util.UUID
-
-import com.rabbitmq.client.QueueingConsumer.Delivery
-import org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDistributedKey
-import org.apache.spark.streaming.rabbitmq.models.ExchangeAndRouting
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-
-@RunWith(classOf[JUnitRunner])
-class RabbitMQDistributedConsumerIT extends TemporalDataSuite {
-
-  override val queueName = s"$configQueueName-${this.getClass().getName()}-${UUID.randomUUID().toString}"
-
-  override val exchangeName = s"$configExchangeName-${this.getClass().getName()}-${UUID.randomUUID().toString}"
-
-  test("RabbitMQ Receiver should read all the records") {
-    /**
-     * Is possible to use this params example:
-     * Map(
-     *   "maxMessagesPerPartition" -> "1000",
-     *   "storageLevel" -> "MEMORY_AND_DISK",
-     *   "ackType" -> "auto",
-     *   "maxReceiveTime" -> "9000",
-     *   "rememberDuration" -> "20000",
-     *   "levelParallelism" -> "1"
-     * )
-     */
-    val rabbitMQParams = Map.empty[String, String]
-
-    val rabbitMQConnection = Map(
-      "hosts" -> hosts,
-      "queueName" -> queueName,
-      "exchangeName" -> exchangeName,
-      "vHost" -> vHost,
-      "userName" -> userName,
-      "password" -> password
-    )
-
-    val distributedKey = Seq(
-      RabbitMQDistributedKey(
-        queueName,
-        new ExchangeAndRouting(exchangeName, routingKey),
-        rabbitMQConnection
-      )
-    )
-
-    //Delivery is not Serializable by Spark, is possible use Map, Seq or native Classes
-    import scala.collection.JavaConverters._
-    val distributedStream = RabbitMQUtils.createDistributedStream[Map[String, Any]](
-      ssc,
-      distributedKey,
-      rabbitMQParams,
-      (rawMessage: Delivery) =>
-        Map(
-          "body" -> new Predef.String(rawMessage.getBody),
-          "exchange" -> rawMessage.getEnvelope.getExchange,
-          "routingKey" -> rawMessage.getEnvelope.getRoutingKey,
-          "deliveryTag" -> rawMessage.getEnvelope.getDeliveryTag
-        ) ++ {
-          //Avoid null pointer Exception
-          Option(rawMessage.getProperties.getHeaders) match {
-            case Some(headers) => Map("headers" -> headers.asScala)
-            case None => Map.empty[String, Any]
-          }
-        }
-    )
-
-    val totalEvents = ssc.sparkContext.longAccumulator("Number of events received")
-
-    // Start up the receiver.
-    distributedStream.start()
-
-    // Fires each time the configured window has passed.
-    distributedStream.foreachRDD(rdd => {
-      if (!rdd.isEmpty()) {
-        val count = rdd.count()
-        // Do something with this message
-        println(s"EVENTS COUNT : \t $count")
-        totalEvents.add(count)
-        //rdd.collect().foreach(event => print(s"${event.toString}, "))
-      } else println("RDD is empty")
-      println(s"TOTAL EVENTS : \t $totalEvents")
-    })
-
-    ssc.start() // Start the computation
-    ssc.awaitTerminationOrTimeout(10000L) // Wait for the computation to terminate
-
-    assert(totalEvents.value === totalRegisters.toLong)
-  }
-}
-
diff --git a/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQSuite.scala b/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQSuite.scala
deleted file mode 100644
index 07d0c15..0000000
--- a/src/test/scala/org/apache/spark/streaming/rabbitmq/RabbitMQSuite.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (C) 2015 Stratio (http://stratio.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.rabbitmq
-
-import org.apache.spark.internal.Logging
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.{TimeLimitedTests, Timeouts}
-import org.scalatest.time.SpanSugar._
-
-private[rabbitmq] trait RabbitMQSuite extends FunSuite with Timeouts with Logging with TimeLimitedTests {
-
-  val timeLimit = 1 minutes
-
-}
\ No newline at end of file
diff --git a/src/test/scala/org/apache/spark/streaming/rabbitmq/TemporalDataSuite.scala b/src/test/scala/org/apache/spark/streaming/rabbitmq/TemporalDataSuite.scala
deleted file mode 100644
index e8d28ba..0000000
--- a/src/test/scala/org/apache/spark/streaming/rabbitmq/TemporalDataSuite.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright (C) 2015 Stratio (http://stratio.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.rabbitmq
-
-import akka.actor.ActorSystem
-import com.github.sstone.amqp.Amqp._
-import com.github.sstone.amqp.{Amqp, ChannelOwner, ConnectionOwner, Consumer}
-import com.rabbitmq.client.AMQP.BasicProperties
-import com.rabbitmq.client.ConnectionFactory
-import com.typesafe.config.ConfigFactory
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import akka.testkit.TestProbe
-import akka.util.Timeout
-import akka.pattern.{ask, gracefulStop}
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.util.Try
-
-private[rabbitmq] trait TemporalDataSuite extends RabbitMQSuite with BeforeAndAfter with BeforeAndAfterAll {
-
-  implicit val system = ActorSystem("ActorRabbitMQSystem")
-  implicit val timeout = Timeout(10 seconds)
-
-  private lazy val config = ConfigFactory.load()
-
-  val queueName : String
-
-  val exchangeName : String
-
-  val totalRegisters = 10000
-
-  /**
-   * Spark Properties
-   */
-  val conf = new SparkConf()
-    .setAppName("datasource-receiver-example")
-    .setIfMissing("spark.master", "local[*]")
-  var sc: SparkContext = null
-  var ssc: StreamingContext = null
-
-  /**
-   * RabbitMQ Properties
-   */
-  val configQueueName = Try(config.getString("rabbitmq.queueName")).getOrElse("rabbitmq-queue")
-  val configExchangeName = Try(config.getString("rabbitmq.exchangeName")).getOrElse("rabbitmq-exchange")
-  val exchangeType = Try(config.getString("rabbitmq.exchangeType")).getOrElse("topic")
-  val routingKey = Try(config.getString("rabbitmq.routingKey")).getOrElse("")
-  val vHost = Try(config.getString("rabbitmq.vHost")).getOrElse("/")
-  val hosts = Try(config.getString("rabbitmq.hosts")).getOrElse("127.0.0.1")
-  val userName = Try(config.getString("rabbitmq.userName")).getOrElse("guest")
-  val password = Try(config.getString("rabbitmq.password")).getOrElse("guest")
-
-  before {
-
-    val probe = TestProbe()
-    val queue = QueueParameters(
-      name = queueName,
-      passive = false,
-      exclusive = false,
-      durable = true,
-      autodelete = false
-    )
-    val exchange = ExchangeParameters(
-      name = exchangeName,
-      passive = false,
-      exchangeType = exchangeType,
-      durable = true,
-      autodelete = false
-    )
-    val connFactory = new ConnectionFactory()
-    connFactory.setUri(s"amqp://$userName:$password@$hosts/%2F")
-    val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
-    Amqp.waitForConnection(system, conn).await()
-    val consumer = ConnectionOwner.createChildActor(
-      conn,
-      Consumer.props(listener = Some(probe.ref)),
-      timeout = 5000 millis,
-      name = Some("RabbitMQ.consumer")
-    )
-    val producer = ConnectionOwner.createChildActor(
-      conn,
-      ChannelOwner.props(),
-      timeout = 5000 millis,
-      name = Some("RabbitMQ.producer")
-    )
-
-    Amqp.waitForConnection(system, conn, consumer, producer).await()
-
-    val deleteQueueResult = consumer ? DeleteQueue(queueName)
-    Await.result(deleteQueueResult, 5 seconds)
-    val deleteExchangeResult = consumer ? DeleteExchange(exchangeName)
-    Await.result(deleteExchangeResult, 5 seconds)
-    val bindingResult = consumer ? AddBinding(Binding(exchange, queue, routingKey))
-    Await.result(bindingResult, 5 seconds)
-
-    sc = new SparkContext(conf)
-    ssc = new StreamingContext(sc, Seconds(1))
-
-    for (register <- 1 to totalRegisters) {
-      val publishResult = producer ? Publish(
-        exchange = exchange.name,
-        key = "",
-        body = register.toString.getBytes,
-        properties = Some(new BasicProperties.Builder().contentType("my " + "content").build())
-      )
-      Await.result(publishResult, 1 seconds)
-    }
-
-    /**
-     * Close Producer actors and connections
-     */
-    conn ! Close()
-    Await.result(gracefulStop(conn, 5 seconds), 10 seconds)
-    Await.result(gracefulStop(consumer, 5 seconds), 10 seconds)
-    Await.result(gracefulStop(producer, 5 seconds), 10 seconds)
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop()
-      ssc = null
-    }
-    if (sc != null) {
-      sc.stop()
-      sc = null
-    }
-
-    System.gc()
-
-    val probe = TestProbe()
-    val connFactory = new ConnectionFactory()
-    connFactory.setUri(s"amqp://$userName:$password@$hosts/%2F")
-    val conn = system.actorOf(ConnectionOwner.props(connFactory, 1 second))
-    Amqp.waitForConnection(system, conn).await()
-    val consumer = ConnectionOwner.createChildActor(
-      conn,
-      Consumer.props(listener = Some(probe.ref)),
-      timeout = 5000 millis,
-      name = Some("RabbitMQ.consumer")
-    )
-    val deleteQueueResult = consumer ? DeleteQueue(queueName)
-    Await.result(deleteQueueResult, 5 seconds)
-    val deleteExchangeResult = consumer ? DeleteExchange(exchangeName)
-    Await.result(deleteExchangeResult, 5 seconds)
-
-    /**
-     * Close Producer actors and connections
-     */
-    conn ! Close()
-    Await.result(gracefulStop(conn, 5 seconds), 10 seconds)
-    Await.result(gracefulStop(consumer, 5 seconds), 10 seconds)
-  }
-
-  override def afterAll : Unit = {
-    system.terminate()
-  }
-}
From be401400050337f3fdc00b93ee45026b40cd814c Mon Sep 17 00:00:00 2001
From: Core2048
Date: Thu, 19 Dec 2019 14:51:21 +0200
Subject: [PATCH 7/7] remove unused log entry

---
 .../spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala b/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala
index 1e720f5..9998a0d 100644
--- a/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala
+++ b/src/main/scala/org/apache/spark/streaming/rabbitmq/receiver/RabbitMQInputDStream.scala
@@ -103,7 +103,6 @@ class RabbitMQReceiver[R: ClassTag](
       log.error("Got this Exception: " + exception, exception)
     } finally {
 
-      log.info("it has been stopped")
       restart("Trying to connect again")
     }
   }