diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index 5390f5916fc0..7e8d5c707519 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -84,9 +84,9 @@ MVN="build/mvn --force" # Hive-specific profiles for some builds HIVE_PROFILES="-Phive -Phive-thriftserver" # Profiles for publishing snapshots and release to Maven Central -PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl" +PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl" # Profiles for building binary releases -BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr" +BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr" # Scala 2.11 only profiles for some builds SCALA_2_11_PROFILES="-Pkafka-0-8" # Scala 2.12 only profiles for some builds diff --git a/dev/mima b/dev/mima index fdb21f5007cf..1e3ca9700bc0 100755 --- a/dev/mima +++ b/dev/mima @@ -24,7 +24,7 @@ set -e FWDIR="$(cd "`dirname "$0"`"/..; pwd)" cd "$FWDIR" -SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" +SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)" OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)" diff --git a/dev/scalastyle b/dev/scalastyle index e5aa58986953..89ecc8abd6f8 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \ -Pmesos \ -Pkafka-0-8 \ -Pyarn \ + -Pflume \ -Phive \ -Phive-thriftserver \ scalastyle test:scalastyle \ diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 50e14b60545a..91d5667ed1f0 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -279,6 +279,12 @@ def __hash__(self): source_file_regexes=[ "external/flume-sink", ], + build_profile_flags=[ + "-Pflume", + ], + environ={ + "ENABLE_FLUME_TESTS": "1" + }, sbt_test_goals=[ "streaming-flume-sink/test", ] @@ -291,6 +297,12 @@ def __hash__(self): source_file_regexes=[ "external/flume", ], + build_profile_flags=[ + "-Pflume", + ], + environ={ + "ENABLE_FLUME_TESTS": "1" + }, sbt_test_goals=[ "streaming-flume/test", ] @@ -302,7 +314,13 @@ def __hash__(self): dependencies=[streaming_flume, streaming_flume_sink], source_file_regexes=[ "external/flume-assembly", - ] + ], + build_profile_flags=[ + "-Pflume", + ], + environ={ + "ENABLE_FLUME_TESTS": "1" + } ) diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index c7714578bd00..58b295d4f6e0 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -29,7 +29,7 @@ export LC_ALL=C # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution. # NOTE: These should match those in the release publishing script -HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive" +HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive" MVN="build/mvn" HADOOP_PROFILES=( hadoop-2.6 diff --git a/docs/building-spark.md b/docs/building-spark.md index 57baa503259c..98f7df155456 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -100,6 +100,13 @@ Note: Kafka 0.8 support is deprecated as of Spark 2.3.0. Kafka 0.10 support is still automatically built. +## Building with Flume support + +Apache Flume support must be explicitly enabled with the `flume` profile. +Note: Flume support is deprecated as of Spark 2.3.0. + + ./build/mvn -Pflume -DskipTests clean package + ## Building submodules individually It's possible to build Spark sub-modules using the `mvn -pl` option. diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md index a5d36da5b6de..257a4f7d4f3c 100644 --- a/docs/streaming-flume-integration.md +++ b/docs/streaming-flume-integration.md @@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide [Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this. +**Note: Flume support is deprecated as of Spark 2.3.0.** + ## Approach 1: Flume-style Push-based Approach Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps. @@ -44,8 +46,7 @@ configuring Flume agents. val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) - See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala). + See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$).
import org.apache.spark.streaming.flume.*; @@ -53,8 +54,7 @@ configuring Flume agents. JavaReceiverInputDStream flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]); - See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java). + See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html).
from pyspark.streaming.flume import FlumeUtils @@ -62,8 +62,7 @@ configuring Flume agents. flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type. - See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils) - and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/flume_wordcount.py). + See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
@@ -162,8 +161,6 @@ configuring Flume agents. - See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala). - Note that each input DStream can be configured to receive data from multiple sinks. 3. **Deploying:** This is same as the first approach. diff --git a/examples/pom.xml b/examples/pom.xml index 52a6764ae26a..1791dbaad775 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -34,7 +34,6 @@ examples none package - provided provided provided provided @@ -78,12 +77,6 @@ ${project.version} provided - - org.apache.spark - spark-streaming-flume_${scala.binary.version} - ${project.version} - provided - org.apache.spark spark-streaming-kafka-0-10_${scala.binary.version} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java similarity index 98% rename from examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java rename to external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java index 0c651049d0ff..4e3420d9c3b0 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java @@ -48,8 +48,6 @@ public static void main(String[] args) throws Exception { System.exit(1); } - StreamingExamples.setStreamingLogLevels(); - String host = args[0]; int port = Integer.parseInt(args[1]); diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala similarity index 98% rename from examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala rename to external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala index 91e52e4eff5a..f877f79391b3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala +++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala @@ -47,8 +47,6 @@ object FlumeEventCount { System.exit(1) } - StreamingExamples.setStreamingLogLevels() - val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala similarity index 98% rename from examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala rename to external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala index dd725d72c23e..79a4027ca5bd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala +++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala @@ -44,8 +44,6 @@ object FlumePollingEventCount { System.exit(1) } - StreamingExamples.setStreamingLogLevels() - val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 3e3ed712f0db..707193a95770 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -30,6 +30,7 @@ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream +@deprecated("Deprecated without replacement", "2.3.0") object FlumeUtils { private val DEFAULT_POLLING_PARALLELISM = 5 private val DEFAULT_POLLING_BATCH_SIZE = 1000 diff --git a/pom.xml b/pom.xml index 87a468c3a6f5..9fac8b1e5378 100644 --- a/pom.xml +++ b/pom.xml @@ -98,15 +98,13 @@ sql/core sql/hive assembly - external/flume - external/flume-sink - external/flume-assembly examples repl launcher external/kafka-0-10 external/kafka-0-10-assembly external/kafka-0-10-sql + @@ -2583,6 +2581,15 @@ + + flume + + external/flume + external/flume-sink + external/flume-assembly + + + spark-ganglia-lgpl diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a568d264cb2d..9501eed1e906 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -43,11 +43,8 @@ object BuildCommons { "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10" ).map(ProjectRef(buildLocation, _)) - val streamingProjects@Seq( - streaming, streamingFlumeSink, streamingFlume, streamingKafka010 - ) = Seq( - "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-10" - ).map(ProjectRef(buildLocation, _)) + val streamingProjects@Seq(streaming, streamingKafka010) = + Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _)) val allProjects@Seq( core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _* @@ -56,9 +53,13 @@ object BuildCommons { "tags", "sketch", "kvstore" ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects - val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, sparkGangliaLgpl, - streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) = - Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl", + val optionallyEnabledProjects@Seq(mesos, yarn, + streamingFlumeSink, streamingFlume, + streamingKafka, sparkGangliaLgpl, streamingKinesisAsl, + dockerIntegrationTests, hadoopCloud) = + Seq("mesos", "yarn", + "streaming-flume-sink", "streaming-flume", + "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) = diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py index cd30483fc636..2fed5940b31e 100644 --- a/python/pyspark/streaming/flume.py +++ b/python/pyspark/streaming/flume.py @@ -53,6 +53,8 @@ def createStream(ssc, hostname, port, :param enableDecompression: Should netty server decompress input stream :param bodyDecoder: A function used to decode body (default is utf8_decoder) :return: A DStream object + + .. note:: Deprecated in 2.3.0 """ jlevel = ssc._sc._getJavaStorageLevel(storageLevel) helper = FlumeUtils._get_helper(ssc._sc) @@ -79,6 +81,8 @@ def createPollingStream(ssc, addresses, will result in this stream using more threads :param bodyDecoder: A function used to decode body (default is utf8_decoder) :return: A DStream object + + .. note:: Deprecated in 2.3.0 """ jlevel = ssc._sc._getJavaStorageLevel(storageLevel) hosts = [] diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 229cf53e4735..5b86c1cb2c39 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1478,7 +1478,7 @@ def search_kafka_assembly_jar(): ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + "You need to build Spark with " "'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or " - "'build/mvn package' before running this test.") + "'build/mvn -Pkafka-0-8 package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please " "remove all but one") % (", ".join(jars))) @@ -1495,7 +1495,7 @@ def search_flume_assembly_jar(): ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) + "You need to build Spark with " "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or " - "'build/mvn package' before running this test.") + "'build/mvn -Pflume package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please " "remove all but one") % (", ".join(jars))) @@ -1516,6 +1516,9 @@ def search_kinesis_asl_assembly_jar(): return jars[0] +# Must be same as the variable and condition defined in modules.py +flume_test_environ_var = "ENABLE_FLUME_TESTS" +are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1' # Must be same as the variable and condition defined in modules.py kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS" are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1' @@ -1538,9 +1541,16 @@ def search_kinesis_asl_assembly_jar(): os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests, - FlumeStreamTests, FlumePollingStreamTests, StreamingListenerTests] + if are_flume_tests_enabled: + testcases.append(FlumeStreamTests) + testcases.append(FlumePollingStreamTests) + else: + sys.stderr.write( + "Skipped test_flume_stream (enable by setting environment variable %s=1" + % flume_test_environ_var) + if are_kafka_tests_enabled: testcases.append(KafkaStreamTests) else: