diff --git a/.gitignore b/.gitignore index a204456d4208..d029de1ae024 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ conf/spark-env.sh conf/streaming-env.sh conf/log4j.properties conf/spark-defaults.conf +conf/*.xml docs/_site docs/api target/ diff --git a/assembly/pom.xml b/assembly/pom.xml index bdb38806492a..057324e88907 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -84,11 +84,6 @@ spark-sql_${scala.binary.version} ${project.version} - - net.sf.py4j - py4j - 0.8.1 - @@ -173,6 +168,21 @@ + + python + + + net.sf.py4j + py4j + 0.8.1 + + + org.apache.spark + python-api_${scala.binary.version} + ${project.version} + + + spark-ganglia-lgpl diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index b0218531e9eb..70ac71e45db4 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -44,6 +44,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes" DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar` CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR" diff --git a/bin/spark-class b/bin/spark-class index e8160c8af64c..7506b82a48a2 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -110,8 +110,8 @@ export JAVA_OPTS if [ ! -f "$FWDIR/RELEASE" ]; then # Exit if the user hasn't compiled Spark - num_jars=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar" | wc -l) - jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep "spark-assembly.*hadoop.*.jar") + num_jars=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep -E "spark-assembly.*hadoop.*.jar$" | wc -l) + jars_list=$(ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/ | grep -E "spark-assembly.*hadoop.*.jar$") if [ "$num_jars" -eq "0" ]; then echo "Failed to find Spark assembly in $FWDIR/assembly/target/scala-$SCALA_VERSION/" >&2 echo "You need to build Spark with 'sbt/sbt assembly' before running this program." >&2 diff --git a/core/pom.xml b/core/pom.xml index 822b5b1dd7cc..b4e608588fd3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -254,35 +254,6 @@ target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes - - org.apache.maven.plugins - maven-antrun-plugin - - - test - - run - - - true - - - - - - - - - - - - - - - - - - org.scalatest scalatest-maven-plugin @@ -294,48 +265,6 @@ - - - org.codehaus.mojo - exec-maven-plugin - 1.2.1 - - - generate-resources - - exec - - - - - unzip - ../python - - -o - lib/py4j*.zip - -d - build - - - - - - - src/main/resources - - - ../python - - pyspark/*.py - - - - ../python/build - - py4j/*.py - - - diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index bea435ec34ce..454a6e744e4d 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -26,7 +26,6 @@ import akka.actor._ import com.google.common.collect.MapMaker import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.ConnectionManager @@ -67,15 +66,14 @@ class SparkEnv ( // A mapping of thread ID to amount of memory used for shuffle in bytes // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() - - private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + val closeables = mutable.ListBuffer[java.io.Closeable]() // A general, soft-reference map for metadata needed during HadoopRDD split computation // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() private[spark] def stop() { - pythonWorkers.foreach { case(key, worker) => worker.stop() } + closeables.toList.foreach(_.close()) httpFileServer.stop() mapOutputTracker.stop() shuffleFetcher.stop() @@ -89,22 +87,6 @@ class SparkEnv ( // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it. // actorSystem.awaitTermination() } - - private[spark] - def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { - synchronized { - val key = (pythonExec, envVars) - pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() - } - } - - private[spark] - def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) { - synchronized { - val key = (pythonExec, envVars) - pythonWorkers(key).stop() - } - } } object SparkEnv extends Logging { diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md index a5e530346740..e447dfea3bac 100644 --- a/docs/building-with-maven.md +++ b/docs/building-with-maven.md @@ -45,17 +45,20 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions wit For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you can enable the "yarn-alpha" or "yarn" profile and set the "hadoop.version", "yarn.version" property. Note that Hadoop 0.23.X requires a special `-Phadoop-0.23` profile: # Apache Hadoop 2.0.5-alpha - $ mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package + $ mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -DskipTests clean package # Cloudera CDH 4.2.0 with MapReduce v2 - $ mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-cdh4.2.0 -DskipTests clean package + $ mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -DskipTests clean package # Apache Hadoop 2.2.X (e.g. 2.2.0 as below) and newer - $ mvn -Pyarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTests clean package + $ mvn -Pyarn -Dhadoop.version=2.2.0 -DskipTests clean package # Apache Hadoop 0.23.x $ mvn -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.7 -Dyarn.version=0.23.7 -DskipTests clean package + # Different versions of HDFS and YARN. + $ mvn -Pyarn-alpha -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -DskipTests clean package + ## Spark Tests in Maven ## Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin). Some of the require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. You can then run the tests with `mvn -Dhadoop.version=... test`. diff --git a/make-distribution.sh b/make-distribution.sh index c05dcd89d90a..dc7723e80183 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -55,6 +55,7 @@ SPARK_HADOOP_VERSION=1.0.4 SPARK_YARN=false SPARK_HIVE=false SPARK_TACHYON=false +SPARK_PYTHON=true MAKE_TGZ=false NAME=none @@ -105,6 +106,12 @@ else echo "YARN disabled" fi +if [ "$SPARK_PYTHON" == "true" ]; then + echo "Python enabled" +else + echo "Python disabled" +fi + if [ "$SPARK_TACHYON" == "true" ]; then echo "Tachyon Enabled" else @@ -122,22 +129,31 @@ else MAYBE_HIVE="" fi +if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then + MAYBE_HADOOP023="-Phadoop-0.23" +else + MAYBE_HADOOP023="" +fi + if [ "$SPARK_YARN" == "true" ]; then - if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then - mvn clean package -DskipTests -Pyarn-alpha -Dhadoop.version=$SPARK_HADOOP_VERSION \ - -Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE -Phadoop-0.23 + if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." || "$SPARK_HADOOP_VERSION" =~ "2.0." ]]; then + MAYBE_YARN="-Pyarn-alpha -Dyarn.version=$SPARK_HADOOP_VERSION" else - mvn clean package -DskipTests -Pyarn -Dhadoop.version=$SPARK_HADOOP_VERSION \ - -Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE + MAYBE_YARN="-Pyarn -Dyarn.version=$SPARK_HADOOP_VERSION" fi else - if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then - mvn clean package -Phadoop-0.23 -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE - else - mvn clean package -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE - fi + MAYBE_YARN="" +fi + +if [ "$SPARK_PYTHON" == "true" ]; then + MAYBE_PYTHON="-Ppython" +else + MAYBE_PYTHON="" fi +mvn package -Dhadoop.version=$SPARK_HADOOP_VERSION \ +-DskipTests $MAYBE_HIVE $MAYBE_HADOOP023 $MAYBE_YARN $MAYBE_PYTHON + # Make directories rm -rf "$DISTDIR" mkdir -p "$DISTDIR/lib" @@ -152,9 +168,11 @@ mkdir "$DISTDIR"/conf cp "$FWDIR"/conf/*.template "$DISTDIR"/conf cp "$FWDIR"/conf/slaves "$DISTDIR"/conf cp -r "$FWDIR/bin" "$DISTDIR" -cp -r "$FWDIR/python" "$DISTDIR" cp -r "$FWDIR/sbin" "$DISTDIR" +if [ "$SPARK_PYTHON" == "true" ]; then + cp -r "$FWDIR/python" "$DISTDIR" +fi # Download and copy in tachyon, if requested if [ "$SPARK_TACHYON" == "true" ]; then diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 471546cd82c7..b17b95f62dde 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -23,7 +23,6 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.api.python.PythonMLLibAPI /** @@ -68,20 +67,6 @@ class MatrixFactorizationModel( } } - /** - * :: DeveloperApi :: - * Predict the rating of many users for many products. - * This is a Java stub for python predictAll() - * - * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) - * @return JavaRDD of serialized Rating objects. - */ - def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { - val pythonAPI = new PythonMLLibAPI() - val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) - predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) - } - // TODO: Figure out what other good bulk prediction methods would look like. // Probably want a way to get the top users for a product or vice-versa. } diff --git a/pom.xml b/pom.xml index 646753fe3030..c825fac5c04d 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,8 @@ ~ limitations under the License. --> - + 4.0.0 org.apache @@ -93,7 +94,6 @@ streaming sql/catalyst sql/core - sql/hive repl assembly external/twitter @@ -102,6 +102,8 @@ external/zeromq external/mqtt examples + sql/hive + python-api @@ -119,7 +121,7 @@ 1.2.17 1.0.4 2.4.1 - 0.23.7 + ${hadoop.version} 0.94.6 0.12.0 1.3.2 @@ -135,7 +137,8 @@ - maven-repo + maven-repo + Maven Repository http://repo.maven.apache.org/maven2 @@ -371,6 +374,11 @@ commons-net 2.2 + + commons-lang + commons-lang + 2.5 + io.netty netty-all @@ -558,64 +566,7 @@ jets3t 0.7.1 - - org.apache.hadoop - hadoop-yarn-api - ${yarn.version} - - - asm - asm - - - org.ow2.asm - asm - - - org.jboss.netty - netty - - - - - org.apache.hadoop - hadoop-yarn-common - ${yarn.version} - - - asm - asm - - - org.ow2.asm - asm - - - org.jboss.netty - netty - - - - - org.apache.hadoop - hadoop-yarn-client - ${yarn.version} - - - asm - asm - - - org.ow2.asm - asm - - - org.jboss.netty - netty - - - org.codehaus.jackson @@ -737,6 +688,10 @@ ${project.build.directory}/SparkTestSuite.txt -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m + + ${session.executionRootDirectory} + 1 + @@ -850,12 +805,78 @@ yarn - - - org.apache.avro - avro - - + + + + org.apache.hadoop + hadoop-yarn-api + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + + + org.apache.hadoop + hadoop-yarn-common + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + + javax.servlet + servlet-api + + + + + + org.apache.hadoop + hadoop-yarn-client + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + javax.servlet + servlet-api + + + + + @@ -901,7 +922,76 @@ yarn + + + + org.apache.hadoop + hadoop-yarn-api + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + + + org.apache.hadoop + hadoop-yarn-common + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + javax.servlet + servlet-api + + + + + org.apache.hadoop + hadoop-yarn-client + ${yarn.version} + + + asm + asm + + + org.ow2.asm + asm + + + org.jboss.netty + netty + + + javax.servlet + servlet-api + + + + + @@ -949,6 +1039,5 @@ - diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 51f733511116..ed30e3b39559 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -55,7 +55,7 @@ object SparkBuild extends Build { val SCALAC_JVM_VERSION = "jvm-1.6" val JAVAC_JVM_VERSION = "1.6" - lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects: _*) + lazy val root = Project("spark", file("."), settings = rootSettings) aggregate(allProjects: _*) lazy val core = Project("core", file("core"), settings = coreSettings) @@ -266,16 +266,16 @@ object SparkBuild extends Build { */ libraryDependencies ++= Seq( - "io.netty" % "netty-all" % "4.0.17.Final", - "org.eclipse.jetty" % "jetty-server" % jettyVersion, - "org.eclipse.jetty" % "jetty-util" % jettyVersion, - "org.eclipse.jetty" % "jetty-plus" % jettyVersion, - "org.eclipse.jetty" % "jetty-security" % jettyVersion, - "org.scalatest" %% "scalatest" % "1.9.1" % "test", - "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", - "com.novocode" % "junit-interface" % "0.10" % "test", - "org.easymock" % "easymock" % "3.1" % "test", - "org.mockito" % "mockito-all" % "1.8.5" % "test" + "io.netty" % "netty-all" % "4.0.17.Final", + "org.eclipse.jetty" % "jetty-server" % jettyVersion, + "org.eclipse.jetty" % "jetty-util" % jettyVersion, + "org.eclipse.jetty" % "jetty-plus" % jettyVersion, + "org.eclipse.jetty" % "jetty-security" % jettyVersion, + "org.scalatest" %% "scalatest" % "1.9.1" % "test", + "org.scalacheck" %% "scalacheck" % "1.10.0" % "test", + "com.novocode" % "junit-interface" % "0.10" % "test", + "org.easymock" % "easymock" % "3.1" % "test", + "org.mockito" % "mockito-all" % "1.8.5" % "test" ), testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), @@ -315,6 +315,7 @@ object SparkBuild extends Build { val excludeFastutil = ExclusionRule(organization = "it.unimi.dsi") val excludeJruby = ExclusionRule(organization = "org.jruby") val excludeThrift = ExclusionRule(organization = "org.apache.thrift") + val excludeJettyServlet= ExclusionRule(organization = "org.eclipse.jetty.orbit") def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark", version: String = "0.9.0-incubating", crossVersion: String = "2.10"): Option[sbt.ModuleID] = { diff --git a/python-api/pom.xml b/python-api/pom.xml new file mode 100644 index 000000000000..69a4739e8254 --- /dev/null +++ b/python-api/pom.xml @@ -0,0 +1,111 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.0.0-SNAPSHOT + ../pom.xml + + + org.apache.spark + python-api_2.10 + jar + Spark Project Python API + http://spark.apache.org/ + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-mllib_${scala.binary.version} + ${project.version} + provided + + + org.spark-project + pyrolite + 2.0.1 + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + generate-resources + + exec + + + + + unzip + ../python + + -o + lib/py4j*.zip + -d + build + + + + + + + src/main/resources + + + ../python + + pyspark/*.py + + + + ../python/build + + py4j/*.py + + + + + diff --git a/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala b/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala new file mode 100644 index 000000000000..40bc23c6df7f --- /dev/null +++ b/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.api.python.PythonWorkerFactory + +/** + * :: DeveloperApi :: + * Holds all the runtime environment objects for a running Spark instance (either master or worker), + * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently + * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these + * objects needs to have the right SparkEnv set. You can get the current environment with + * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set. + * + * NOTE: This is not intended for external use. This is exposed for Shark and may be made private + * in a future release. + */ +@DeveloperApi +class PythonSparkEnv(val sparkEnv: SparkEnv) { + private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + + sparkEnv.closeables += new java.io.Closeable { + override def close() { + pythonWorkers.foreach { + case (key, worker) => worker.stop() + } + } + } + + private[spark] + def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() + } + } + + private[spark] + def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers(key).stop() + } + } + +} + +object PythonSparkEnv extends Logging { + private val env = new ThreadLocal[PythonSparkEnv] + + def get: PythonSparkEnv = { + if (env.get == null) { + env.set(new PythonSparkEnv(SparkEnv.get)) + } + env.get + } + + def set(e: PythonSparkEnv) { + env.set(e) + } + +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala similarity index 100% rename from core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala rename to python-api/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala similarity index 99% rename from core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala rename to python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 61407007087c..4d0da61b7236 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -53,7 +53,7 @@ private[spark] class PythonRDD[T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis - val env = SparkEnv.get + val env = PythonSparkEnv.get val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) // Ensure worker socket is closed on task completion. Closing sockets is idempotent. @@ -71,7 +71,7 @@ private[spark] class PythonRDD[T: ClassTag]( new Thread("stdin writer for " + pythonExec) { override def run() { try { - SparkEnv.set(env) + PythonSparkEnv.set(env) val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) // Partition index diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala similarity index 99% rename from core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala rename to python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 02799ce0091b..8f8b103e42c6 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConversions._ import org.apache.spark._ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) - extends Logging { + extends Logging { // Because forking processes from Java is expensive, we prefer to launch a single Python daemon // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently @@ -86,6 +86,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect the worker's stderr to ours new Thread("stderr reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. @@ -103,6 +104,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect worker's stdout to our stderr new Thread("stdout reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. @@ -159,6 +161,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect the stderr to ours new Thread("stderr reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. @@ -179,6 +182,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect further stdout output to our stderr new Thread("stdout reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala similarity index 95% rename from mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala rename to python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 7c65b0d4750f..f0852f290fdd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -27,6 +27,8 @@ import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} import org.apache.spark.mllib.recommendation._ import org.apache.spark.mllib.regression._ import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.api.python.recommendation. +{MatrixFactorizationModel=> PythonMatrixFactorizationModel} /** * :: DeveloperApi :: @@ -384,9 +386,9 @@ class PythonMLLibAPI extends Serializable { rank: Int, iterations: Int, lambda: Double, - blocks: Int): MatrixFactorizationModel = { + blocks: Int): PythonMatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - ALS.train(ratings, rank, iterations, lambda, blocks) + pythonModel(ALS.train(ratings, rank, iterations, lambda, blocks)) } /** @@ -401,8 +403,12 @@ class PythonMLLibAPI extends Serializable { iterations: Int, lambda: Double, blocks: Int, - alpha: Double): MatrixFactorizationModel = { + alpha: Double): PythonMatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) + pythonModel(ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)) + } + + private def pythonModel(model: MatrixFactorizationModel):PythonMatrixFactorizationModel= { + new PythonMatrixFactorizationModel(model.rank,model.userFeatures,model.productFeatures) } } diff --git a/python-api/src/main/scala/org/apache/spark/mllib/api/python/recommendation/MatrixFactorizationModel.scala b/python-api/src/main/scala/org/apache/spark/mllib/api/python/recommendation/MatrixFactorizationModel.scala new file mode 100644 index 000000000000..ddb868c368cd --- /dev/null +++ b/python-api/src/main/scala/org/apache/spark/mllib/api/python/recommendation/MatrixFactorizationModel.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.api.python.recommendation + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.api.python.PythonMLLibAPI + +class MatrixFactorizationModel( + override val rank: Int, + override val userFeatures: RDD[(Int, Array[Double])], + override val productFeatures: RDD[(Int, Array[Double])]) + extends org.apache.spark.mllib.recommendation.MatrixFactorizationModel(rank, + userFeatures, productFeatures) { + + /** + * :: DeveloperApi :: + * Predict the rating of many users for many products. + * This is a Java stub for python predictAll() + * + * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) + * @return JavaRDD of serialized Rating objects. + */ + def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { + val pythonAPI = new PythonMLLibAPI() + val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) + predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) + } + +} diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/python-api/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala similarity index 100% rename from core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala rename to python-api/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala diff --git a/repl/pom.xml b/repl/pom.xml index b761a176ce25..bcdb24b040cc 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -92,42 +92,12 @@ target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes - - org.apache.maven.plugins - maven-antrun-plugin - - - test - - run - - - true - - - - - - - - - - - - - - - - - - org.scalatest scalatest-maven-plugin ${basedir}/.. - 1 diff --git a/yarn/pom.xml b/yarn/pom.xml index 02f36627431b..b8714b54502e 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -28,7 +28,7 @@ yarn-parent_2.10 pom Spark Project YARN Parent POM - + org.apache.spark @@ -50,7 +50,6 @@ org.apache.hadoop hadoop-client - ${yarn.version} org.scalatest @@ -114,42 +113,12 @@ - - org.apache.maven.plugins - maven-antrun-plugin - - - test - - run - - - true - - - - - - - - - - - - - - - - - - org.scalatest scalatest-maven-plugin ${basedir}/../.. - 1 ${spark.classpath}