From 8390437d5b35c362479e7979fa6c58398f872c1e Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Mon, 26 Jun 2017 16:52:54 -0700 Subject: [PATCH 01/12] Added functionality for CrossValidator and TrainValidationSplit to persist nested estimators such as OneVsRest. --- .../spark/ml/tuning/ValidatorParams.scala | 24 +++++++++-- .../spark/ml/tuning/CrossValidatorSuite.scala | 42 ++++++++++++++++++- .../ml/tuning/TrainValidationSplitSuite.scala | 25 ++++++++++- 3 files changed, 86 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index d55eb14d0345..317ea84915fb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -126,10 +126,22 @@ private[ml] object ValidatorParams { extraMetadata: Option[JObject] = None): Unit = { import org.json4s.JsonDSL._ + var numParamsNotJson = 0 val estimatorParamMapsJson = compact(render( instance.getEstimatorParamMaps.map { case paramMap => paramMap.toSeq.map { case ParamPair(p, v) => - Map("parent" -> p.parent, "name" -> p.name, "value" -> p.jsonEncode(v)) + v match { + case writeableObj: MLWritable => + numParamsNotJson += 1 + val paramPath = new Path(path, "param" + p.name + numParamsNotJson).toString + writeableObj.save(paramPath) + Map("parent" -> p.parent, "name" -> p.name, + "value" -> compact(render(JString(paramPath))), + "isJson" -> compact(render(JBool(false)))) + case _ => + Map("parent" -> p.parent, "name" -> p.name, "value" -> p.jsonEncode(v), + "isJson" -> compact(render(JBool(true)))) + } } }.toSeq )) @@ -183,8 +195,14 @@ private[ml] object ValidatorParams { val paramPairs = pMap.map { case pInfo: Map[String, String] => val est = uidToParams(pInfo("parent")) val param = est.getParam(pInfo("name")) - val value = param.jsonDecode(pInfo("value")) - param -> value + if (pInfo("isJson").toBoolean.booleanValue()) { + val value = param.jsonDecode(pInfo("value")) + param -> value + } else { + val path = param.jsonDecode(pInfo("value")).toString + val value = DefaultParamsReader.loadParamsInstance[MLWritable](path, sc) + param -> value + } } ParamMap(paramPairs: _*) }.toArray diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 2b4e6b53e4f8..3efe97f5e3b4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.tuning import org.apache.spark.SparkFunSuite import org.apache.spark.ml.{Estimator, Model, Pipeline} -import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel} +import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest} import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator} import org.apache.spark.ml.feature.HashingTF @@ -156,6 +156,46 @@ class CrossValidatorSuite CrossValidatorSuite.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) } + test("read/write: CrossValidator with nested estimator") { + val ova = new OneVsRest() + .setClassifier(new LogisticRegression) + val evaluator = new BinaryClassificationEvaluator() + 
.setMetricName("areaUnderPR") // not default metric + + + val classifier1 = new LogisticRegression().setRegParam(2.0) + val classifier2 = new LogisticRegression().setRegParam(3.0) + val paramMaps = new ParamGridBuilder() + .addGrid(ova.classifier, Array(classifier1, classifier2)) + .build() + val cv = new CrossValidator() + .setEstimator(ova) + .setEvaluator(evaluator) + .setNumFolds(20) + .setEstimatorParamMaps(paramMaps) + + val cv2 = testDefaultReadWrite(cv, testParams = false) + + assert(cv.uid === cv2.uid) + assert(cv.getNumFolds === cv2.getNumFolds) + assert(cv.getSeed === cv2.getSeed) + + assert(cv2.getEvaluator.isInstanceOf[BinaryClassificationEvaluator]) + val evaluator2 = cv2.getEvaluator.asInstanceOf[BinaryClassificationEvaluator] + assert(evaluator.uid === evaluator2.uid) + assert(evaluator.getMetricName === evaluator2.getMetricName) + + cv2.getEstimator match { + case ova2: OneVsRest => + assert(ova.uid === ova2.uid) + assert(ova.getClassifier.asInstanceOf[LogisticRegression].getMaxIter + === ova2.getClassifier.asInstanceOf[LogisticRegression].getMaxIter) + case other => + throw new AssertionError(s"Loaded CrossValidator expected estimator of type" + + s" OneVsRest but found ${other.getClass.getName}") + } + } + test("read/write: CrossValidator with complex estimator") { // workflow: CrossValidator[Pipeline[HashingTF, CrossValidator[LogisticRegression]]] val lrEvaluator = new BinaryClassificationEvaluator() diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index a34f930aa11c..520a6a4e0857 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.tuning import org.apache.spark.SparkFunSuite import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel} +import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest} import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator} import org.apache.spark.ml.linalg.Vectors @@ -136,6 +136,29 @@ class TrainValidationSplitSuite assert(tvs.getSeed === tvs2.getSeed) } + test("read/write: TrainValidationSplit with nested estimator") { + val ova = new OneVsRest() + .setClassifier(new LogisticRegression) + val evaluator = new BinaryClassificationEvaluator() + .setMetricName("areaUnderPR") // not default metric + val classifier1 = new LogisticRegression().setRegParam(2.0) + val classifier2 = new LogisticRegression().setRegParam(3.0) + val paramMaps = new ParamGridBuilder() + .addGrid(ova.classifier, Array(classifier1, classifier2)) + .build() + val tvs = new TrainValidationSplit() + .setEstimator(ova) + .setEvaluator(evaluator) + .setTrainRatio(0.5) + .setEstimatorParamMaps(paramMaps) + .setSeed(42L) + + val tvs2 = testDefaultReadWrite(tvs, testParams = false) + + assert(tvs.getTrainRatio === tvs2.getTrainRatio) + assert(tvs.getSeed === tvs2.getSeed) + } + test("read/write: TrainValidationSplitModel") { val lr = new LogisticRegression() .setThreshold(0.6) From 76aece51709a51c66d840dab07c0755c3a038905 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 29 Jun 2017 14:34:56 -0700 Subject: [PATCH 02/12] Responded to first 
round of code review on Scala nested estimator persistence. Implemented python persistence for meta-algorithms. OneVsRest overrides necessary persistence functions. Code still has prints and comments that need to be cleaned up. --- .../spark/ml/tuning/ValidatorParams.scala | 7 +- .../spark/ml/tuning/CrossValidatorSuite.scala | 28 +- .../ml/tuning/TrainValidationSplitSuite.scala | 51 +++- python/pyspark/ml/classification.py | 143 +++++++--- python/pyspark/ml/common.py | 1 + python/pyspark/ml/tests.py | 151 +++++++++- python/pyspark/ml/tuning.py | 257 +++++++++++++++++- python/pyspark/ml/wrapper.py | 12 +- 8 files changed, 589 insertions(+), 61 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index 317ea84915fb..cab6f9215007 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -131,13 +131,16 @@ private[ml] object ValidatorParams { instance.getEstimatorParamMaps.map { case paramMap => paramMap.toSeq.map { case ParamPair(p, v) => v match { - case writeableObj: MLWritable => + case writeableObj: DefaultParamsWritable => + val paramPath = new Path(path, "epm_" + p.name + numParamsNotJson).toString numParamsNotJson += 1 - val paramPath = new Path(path, "param" + p.name + numParamsNotJson).toString writeableObj.save(paramPath) Map("parent" -> p.parent, "name" -> p.name, "value" -> compact(render(JString(paramPath))), "isJson" -> compact(render(JBool(false)))) + case _: MLWritable => + throw new NotImplementedError("ValidatorParams.saveImpl does not handle parameters " + + "of type: MLWritable that are not DefaultParamsWritable") case _ => Map("parent" -> p.parent, "name" -> p.name, "value" -> p.jsonEncode(v), "isJson" -> compact(render(JBool(true)))) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 3efe97f5e3b4..2d74ff2e8ff8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.ml.{Estimator, Model, Pipeline} import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest} import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput -import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator} +import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator} import org.apache.spark.ml.feature.HashingTF import org.apache.spark.ml.linalg.{DenseMatrix, Vectors} import org.apache.spark.ml.param.{ParamMap, ParamPair} @@ -157,14 +157,12 @@ class CrossValidatorSuite } test("read/write: CrossValidator with nested estimator") { - val ova = new OneVsRest() - .setClassifier(new LogisticRegression) - val evaluator = new BinaryClassificationEvaluator() - .setMetricName("areaUnderPR") // not default metric - - + val ova = new OneVsRest().setClassifier(new LogisticRegression) + val evaluator = new MulticlassClassificationEvaluator() + .setMetricName("accuracy") val classifier1 = new LogisticRegression().setRegParam(2.0) val classifier2 = new LogisticRegression().setRegParam(3.0) + // params that are not JSON 
serializable must inherit from Params val paramMaps = new ParamGridBuilder() .addGrid(ova.classifier, Array(classifier1, classifier2)) .build() @@ -180,16 +178,24 @@ class CrossValidatorSuite assert(cv.getNumFolds === cv2.getNumFolds) assert(cv.getSeed === cv2.getSeed) - assert(cv2.getEvaluator.isInstanceOf[BinaryClassificationEvaluator]) - val evaluator2 = cv2.getEvaluator.asInstanceOf[BinaryClassificationEvaluator] + assert(cv2.getEvaluator.isInstanceOf[MulticlassClassificationEvaluator]) + val evaluator2 = cv2.getEvaluator.asInstanceOf[MulticlassClassificationEvaluator] assert(evaluator.uid === evaluator2.uid) assert(evaluator.getMetricName === evaluator2.getMetricName) cv2.getEstimator match { case ova2: OneVsRest => assert(ova.uid === ova2.uid) - assert(ova.getClassifier.asInstanceOf[LogisticRegression].getMaxIter - === ova2.getClassifier.asInstanceOf[LogisticRegression].getMaxIter) + val classifier = ova2.getClassifier + classifier match { + case lr: LogisticRegression => + assert(ova.getClassifier.asInstanceOf[LogisticRegression].getMaxIter + === lr.asInstanceOf[LogisticRegression].getMaxIter) + case _ => + throw new AssertionError(s"Loaded CrossValidator expected estimator of type" + + s" LogisticREgression but found ${classifier.getClass.getName}") + } + case other => throw new AssertionError(s"Loaded CrossValidator expected estimator of type" + s" OneVsRest but found ${other.getClass.getName}") diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index 520a6a4e0857..3a09396e79e7 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressio import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator} import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.param.{ParamMap, ParamPair} import org.apache.spark.ml.param.shared.HasInputCol import org.apache.spark.ml.regression.LinearRegression import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} @@ -95,7 +95,7 @@ class TrainValidationSplitSuite } test("transformSchema should check estimatorParamMaps") { - import TrainValidationSplitSuite._ + import TrainValidationSplitSuite.{MyEstimator, MyEvaluator} val est = new MyEstimator("est") val eval = new MyEvaluator @@ -134,6 +134,18 @@ class TrainValidationSplitSuite assert(tvs.getTrainRatio === tvs2.getTrainRatio) assert(tvs.getSeed === tvs2.getSeed) + + TrainValidationSplitSuite + .compareParamMaps(tvs.getEstimatorParamMaps, tvs2.getEstimatorParamMaps) + + tvs2.getEstimator match { + case lr2: LogisticRegression => + assert(lr.uid === lr2.uid) + assert(lr.getMaxIter === lr2.getMaxIter) + case other => + throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" + + s" LogisticRegression but found ${other.getClass.getName}") + } } test("read/write: TrainValidationSplit with nested estimator") { @@ -157,6 +169,24 @@ class TrainValidationSplitSuite assert(tvs.getTrainRatio === tvs2.getTrainRatio) assert(tvs.getSeed === tvs2.getSeed) + + tvs2.getEstimator match { + case ova2: OneVsRest => + assert(ova.uid === ova2.uid) + val classifier = 
ova2.getClassifier + classifier match { + case lr: LogisticRegression => + assert(ova.getClassifier.asInstanceOf[LogisticRegression].getMaxIter + === lr.asInstanceOf[LogisticRegression].getMaxIter) + case _ => + throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" + + s" LogisticREgression but found ${classifier.getClass.getName}") + } + + case other => + throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" + + s" OneVsRest but found ${other.getClass.getName}") + } } test("read/write: TrainValidationSplitModel") { @@ -183,8 +213,21 @@ class TrainValidationSplitSuite } } -object TrainValidationSplitSuite { - +object TrainValidationSplitSuite extends SparkFunSuite{ + /** + * Assert sequences of estimatorParamMaps are identical. + * Params must be simple types comparable with `===`. + */ + def compareParamMaps(pMaps: Array[ParamMap], pMaps2: Array[ParamMap]): Unit = { + assert(pMaps.length === pMaps2.length) + pMaps.zip(pMaps2).foreach { case (pMap, pMap2) => + assert(pMap.size === pMap2.size) + pMap.toSeq.foreach { case ParamPair(p, v) => + assert(pMap2.contains(p)) + assert(pMap2(p) === v) + } + } + } abstract class MyModel extends Model[MyModel] class MyEstimator(override val uid: String) extends Estimator[MyModel] with HasInputCol { diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 60bdeedd6a14..ae33dbe7a458 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -25,7 +25,7 @@ from pyspark.ml.util import * from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams from pyspark.ml.wrapper import JavaWrapper -from pyspark.ml.common import inherit_doc +from pyspark.ml.common import inherit_doc, _java2py, _py2java from pyspark.sql import DataFrame from pyspark.sql.functions import udf, when from pyspark.sql.types import ArrayType, DoubleType @@ -1468,7 +1468,7 @@ def getClassifier(self): @inherit_doc -class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable): +class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): """ .. 
note:: Experimental @@ -1519,8 +1519,24 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred classifier=None) """ super(OneVsRest, self).__init__() + + # self._java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", self.uid) + # print "JAVA OBJ:",self._java_obj + #self._setDefault(classifier=LogisticRegression()) + kwargs = self._input_kwargs self._set(**kwargs) + # if classifier is None: + # self.setClassifier(LogisticRegression()) + + + # super(LogisticRegression, self).__init__() + # self._java_obj = self._new_java_obj( + # "org.apache.spark.ml.classification.LogisticRegression", self.uid) + # self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto") + # kwargs = self._input_kwargs + # self.setParams(**kwargs) + # self._checkThresholdConsistency() @keyword_only @since("2.0.0") @@ -1567,6 +1583,9 @@ def trainSingleClass(index): multiclassLabeled.unpersist() return self._copyValues(OneVsRestModel(models=models)) + # ovr_model = OneVsRestModel() + # ovr_model.setModels(models=models) + # return self._copyValues(ovr_model) @since("2.0.0") def copy(self, extra=None): @@ -1585,21 +1604,21 @@ def copy(self, extra=None): newOvr.setClassifier(self.getClassifier().copy(extra)) return newOvr - @since("2.0.0") - def write(self): - """Returns an MLWriter instance for this ML instance.""" - return JavaMLWriter(self) + # @since("2.0.0") + # def write(self): + # """Returns an MLWriter instance for this ML instance.""" + # return JavaMLWriter(self) - @since("2.0.0") - def save(self, path): - """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - self.write().save(path) + # @since("2.0.0") + # def save(self, path): + # """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" + # self.write().save(path) - @classmethod - @since("2.0.0") - def read(cls): - """Returns an MLReader instance for this class.""" - return JavaMLReader(cls) + # @classmethod + # @since("2.0.0") + # def read(cls): + # """Returns an MLReader instance for this class.""" + # return JavaMLReader(cls) @classmethod def _from_java(cls, java_stage): @@ -1630,8 +1649,58 @@ def _to_java(self): _java_obj.setPredictionCol(self.getPredictionCol()) return _java_obj + def _make_java_param_pair(self, param, value): + """ + Makes a Java parm pair. + """ + sc = SparkContext._active_spark_context + param = self._resolveParam(param) + _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", + self.uid) + java_param = _java_obj.getParam(param.name) + if isinstance(value, Estimator) or isinstance(value, Model): + # used in the case of an estimator having another estimator as a parameter + # such as with the OneVsRest classifier + # the reason why this is not in _py2java in common.py is that importing Estimator and Model in common.py results + # in a circular import with inherit_doc + # print "IN THE ESTIMATOR CASE - estimator: {}, model: {}".format(isinstance(value, Estimator), isinstance(value, Model)) + java_value = value._to_java() + else: + java_value = _py2java(sc, value) + return java_param.w(java_value) + + def _transfer_param_map_to_java(self, pyParamMap): + """ + Transforms a Python ParamMap into a Java ParamMap. 
+ """ + paramMap = JavaWrapper._new_java_obj("org.apache.spark.ml.param.ParamMap") + for param in self.params: + if param in pyParamMap: + pair = self._make_java_param_pair(param, pyParamMap[param]) + paramMap.put([pair]) + return paramMap + + def _transfer_param_map_from_java(self, javaParamMap): + """ + Transforms a Java ParamMap into a Python ParamMap. + """ + sc = SparkContext._active_spark_context + paramMap = dict() + for pair in javaParamMap.toList(): + param = pair.param() + if self.hasParam(str(param.name())): + if param.name() == "classifier": + paramMap[self.getParam(param.name())] = JavaParams._from_java(pair.value()) + # print "PAIR VALUE:",pair.value() + # print "AFTER FROM JAVA:",JavaParams._from_java(pair.value()) + else: + paramMap[self.getParam(param.name())] = _java2py(sc, pair.value()) + return paramMap + + + -class OneVsRestModel(Model, OneVsRestParams, MLReadable, MLWritable): +class OneVsRestModel(Model, OneVsRestParams, JavaMLReadable, JavaMLWritable): """ .. note:: Experimental @@ -1642,10 +1711,18 @@ class OneVsRestModel(Model, OneVsRestParams, MLReadable, MLWritable): .. versionadded:: 2.0.0 """ - - def __init__(self, models): + # this used to be __init__ ... can't do because JavaModel has an init that takes only 1 parameter + def __init__(self, models): # one fix was models=[] super(OneVsRestModel, self).__init__() self.models = models + java_models = [model._to_java() for model in self.models] + sc = SparkContext._active_spark_context + java_models_array = JavaWrapper._new_java_array( + java_models, sc._gateway.jvm.org.apache.spark.ml.classification.ClassificationModel) + metadata = JavaParams._new_java_obj("org.apache.spark.sql.types.Metadata") + self._java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRestModel", + self.uid, metadata.empty(), java_models_array) + def _transform(self, dataset): # determine the input columns: these need to be passed through @@ -1711,21 +1788,21 @@ def copy(self, extra=None): newModel.models = [model.copy(extra) for model in self.models] return newModel - @since("2.0.0") - def write(self): - """Returns an MLWriter instance for this ML instance.""" - return JavaMLWriter(self) - - @since("2.0.0") - def save(self, path): - """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - self.write().save(path) - - @classmethod - @since("2.0.0") - def read(cls): - """Returns an MLReader instance for this class.""" - return JavaMLReader(cls) + # @since("2.0.0") + # def write(self): + # """Returns an MLWriter instance for this ML instance.""" + # return JavaMLWriter(self) + + # @since("2.0.0") + # def save(self, path): + # """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" + # self.write().save(path) + + # @classmethod + # @since("2.0.0") + # def read(cls): + # """Returns an MLReader instance for this class.""" + # return JavaMLReader(cls) @classmethod def _from_java(cls, java_stage): diff --git a/python/pyspark/ml/common.py b/python/pyspark/ml/common.py index 387c5d7309de..af09204fd8ff 100644 --- a/python/pyspark/ml/common.py +++ b/python/pyspark/ml/common.py @@ -26,6 +26,7 @@ from py4j.java_collections import JavaArray, JavaList from pyspark import RDD, SparkContext +# from pyspark.ml import Estimator, Model from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.sql import DataFrame, SQLContext diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 17a39472e1fe..b0bc24a4e6a9 100755 --- 
a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -49,7 +49,7 @@ from pyspark.ml.classification import * from pyspark.ml.clustering import * from pyspark.ml.common import _java2py, _py2java -from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator +from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator from pyspark.ml.feature import * from pyspark.ml.fpm import FPGrowth, FPGrowthModel from pyspark.ml.linalg import DenseMatrix, DenseMatrix, DenseVector, Matrices, MatrixUDT, \ @@ -657,7 +657,7 @@ def test_fit_maximize_metric(self): "Best model should have zero induced error") self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1") - def test_save_load(self): + def test_save_load_trained_model(self): # This tests saving and loading the trained model only. # Save/load for CrossValidator will be added later: SPARK-13786 temp_path = tempfile.mkdtemp() @@ -681,6 +681,77 @@ def test_save_load(self): self.assertEqual(loadedLrModel.uid, lrModel.uid) self.assertEqual(loadedLrModel.intercept, lrModel.intercept) + def test_save_load_simple_estimator(self): + temp_path = tempfile.mkdtemp() + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + evaluator = BinaryClassificationEvaluator() + + # test save/load of CrossValidator + cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + cvModel = cv.fit(dataset) + cvPath = temp_path + "/cv" + cv.save(cvPath) + loadedCV = CrossValidator.load(cvPath) + self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid) + self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid) + self.assertEqual(loadedCV.getEstimatorParamMaps(), cv.getEstimatorParamMaps()) + + # test save/load of CrossValidatorModel + cvModelPath = temp_path + "/cvModel" + cvModel.save(cvModelPath) + loadedModel = CrossValidatorModel.load(cvModelPath) + self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid) + + def test_save_load_nested_stimator(self): + temp_path = tempfile.mkdtemp() + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + + ova = OneVsRest(classifier=LogisticRegression()) + lr1 = LogisticRegression().setMaxIter(100) + lr2 = LogisticRegression().setMaxIter(150) + grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() + evaluator = MulticlassClassificationEvaluator() + + # test save/load of CrossValidator + cv = CrossValidator(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator) + cvModel = cv.fit(dataset) + cvPath = temp_path + "/cv" + cv.save(cvPath) + loadedCV = CrossValidator.load(cvPath) + self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid) + self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid) + + originalParamMap = cv.getEstimatorParamMaps() + loadedParamMap = loadedCV.getEstimatorParamMaps() + for i,param in enumerate(loadedParamMap): + # print "PARAM:",param + for p in param: + if p.name == "classifier": + self.assertEqual(param[p].uid, originalParamMap[i][p].uid) + else: + self.assertEqual(param[p], 
originalParamMap[i][p]) + + # test save/load of CrossValidatorModel + cvModelPath = temp_path + "/cvModel" + cvModel.save(cvModelPath) + loadedModel = CrossValidatorModel.load(cvModelPath) + self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid) + class TrainValidationSplitTests(SparkSessionTestCase): @@ -738,7 +809,7 @@ def test_fit_maximize_metric(self): "validationMetrics has the same size of grid parameter") self.assertEqual(1.0, max(validationMetrics)) - def test_save_load(self): + def test_save_load_trained_model(self): # This tests saving and loading the trained model only. # Save/load for TrainValidationSplit will be added later: SPARK-13786 temp_path = tempfile.mkdtemp() @@ -762,6 +833,74 @@ def test_save_load(self): self.assertEqual(loadedLrModel.uid, lrModel.uid) self.assertEqual(loadedLrModel.intercept, lrModel.intercept) + def test_save_load_simple_estimator(self): + # This tests saving and loading the trained model only. + # Save/load for TrainValidationSplit will be added later: SPARK-13786 + temp_path = tempfile.mkdtemp() + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + evaluator = BinaryClassificationEvaluator() + tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + tvsModel = tvs.fit(dataset) + + tvsPath = temp_path + "/tvs" + tvs.save(tvsPath) + loadedTvs = TrainValidationSplit.load(tvsPath) + self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid) + self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid) + self.assertEqual(loadedTvs.getEstimatorParamMaps(), tvs.getEstimatorParamMaps()) + + tvsModelPath = temp_path + "/tvsModel" + tvsModel.save(tvsModelPath) + loadedModel = TrainValidationSplitModel.load(tvsModelPath) + self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid) + + def test_save_load_nested_estimator(self): + # This tests saving and loading the trained model only. 
+ # Save/load for TrainValidationSplit will be added later: SPARK-13786 + temp_path = tempfile.mkdtemp() + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + ova = OneVsRest(classifier=LogisticRegression()) + lr1 = LogisticRegression().setMaxIter(100) + lr2 = LogisticRegression().setMaxIter(150) + grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() + evaluator = MulticlassClassificationEvaluator() + + tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator) + tvsModel = tvs.fit(dataset) + tvsPath = temp_path + "/tvs" + tvs.save(tvsPath) + loadedTvs = TrainValidationSplit.load(tvsPath) + self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid) + self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid) + + originalParamMap = tvs.getEstimatorParamMaps() + loadedParamMap = loadedTvs.getEstimatorParamMaps() + for i,param in enumerate(loadedParamMap): + for p in param: + if p.name == "classifier": + self.assertEqual(param[p].uid, originalParamMap[i][p].uid) + else: + self.assertEqual(param[p], originalParamMap[i][p]) + + tvsModelPath = temp_path + "/tvsModel" + tvsModel.save(tvsModelPath) + loadedModel = TrainValidationSplitModel.load(tvsModelPath) + self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid) + def test_copy(self): dataset = self.spark.createDataFrame([ (10, 10.0), @@ -1349,8 +1488,8 @@ def check_params(self, py_stage): py_has_default = py_stage.hasDefault(p) java_has_default = java_stage.hasDefault(java_param) self.assertEqual(py_has_default, java_has_default, - "Default value mismatch of param %s for Params %s" - % (p.name, str(py_stage))) + "Default value mismatch of param %s for Params %s, py has default: %s java has default: %s" + % (p.name, str(py_stage), py_has_default, java_has_default)) if py_has_default: if p.name == "seed": return # Random seeds between Spark and PySpark are different @@ -1358,6 +1497,8 @@ def check_params(self, py_stage): _java2py(self.sc, java_stage.clear(java_param).getOrDefault(java_param)) py_stage._clear(p) py_default = py_stage.getOrDefault(p) + if p.name == "classifier": + print "py default: {} java default: {}".format(py_default, java_default) if isinstance(py_stage, pyspark.ml.feature.Imputer) and p.name == "missingValue": # SPARK-15040 - default value for Imputer param 'missingValue' is NaN, # and NaN != NaN, so handle it specially here diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index b64858214d20..013b7b5b2f72 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -20,8 +20,11 @@ from pyspark import since, keyword_only from pyspark.ml import Estimator, Model +from pyspark.ml.common import _py2java from pyspark.ml.param import Params, Param, TypeConverters from pyspark.ml.param.shared import HasSeed +from pyspark.ml.util import * +from pyspark.ml.wrapper import JavaParams from pyspark.sql.functions import rand __all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel', 'TrainValidationSplit', @@ -137,8 +140,43 @@ def getEvaluator(self): """ return self.getOrDefault(self.evaluator) - -class CrossValidator(Estimator, ValidatorParams): + def getEvaluator(self): + """ + Gets the value of evaluator or its default value. 
+ """ + return self.getOrDefault(self.evaluator) + + @classmethod + def _from_java_impl(cls, java_stage): + """ + Return Python estimator, estimatorParamMaps, and evaluator from a Java ValidatorParams. + """ + + # Load information from java_stage to the instance. + estimator = JavaParams._from_java(java_stage.getEstimator()) + evaluator = JavaParams._from_java(java_stage.getEvaluator()) + epms = [estimator._transfer_param_map_from_java(epm) + for epm in java_stage.getEstimatorParamMaps()] + return estimator, epms, evaluator + + def _to_java_impl(self): + """ + Return Java estimator, estimatorParamMaps, and evaluator from this Python instance. + """ + + gateway = SparkContext._gateway + cls = SparkContext._jvm.org.apache.spark.ml.param.ParamMap + + java_epms = gateway.new_array(cls, len(self.getEstimatorParamMaps())) + for idx, epm in enumerate(self.getEstimatorParamMaps()): + java_epms[idx] = self.getEstimator()._transfer_param_map_to_java(epm) + + java_estimator = self.getEstimator()._to_java() + java_evaluator = self.getEvaluator()._to_java() + return java_estimator, java_epms, java_evaluator + + +class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable): """ K-fold cross validation performs model selection by splitting the dataset into a set of @@ -263,8 +301,60 @@ def copy(self, extra=None): newCV.setEvaluator(self.getEvaluator().copy(extra)) return newCV + @since("2.2.0") + def write(self): + """Returns an MLWriter instance for this ML instance.""" + return JavaMLWriter(self) + + @since("2.2.0") + def save(self, path): + """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" + self.write().save(path) + + @classmethod + @since("2.2.0") + def read(cls): + """Returns an MLReader instance for this class.""" + return JavaMLReader(cls) -class CrossValidatorModel(Model, ValidatorParams): + @classmethod + @since("2.2.0") + def _from_java(cls, java_stage): + """ + Given a Java CrossValidator, create and return a Python wrapper of it. + Used for ML persistence. + """ + + estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage) + numFolds = java_stage.getNumFolds() + seed = java_stage.getSeed() + # Create a new instance of this stage. + py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, + numFolds=numFolds, seed=seed) + py_stage._resetUid(java_stage.uid()) + return py_stage + + @since("2.2.0") + def _to_java(self): + """ + Transfer this instance to a Java CrossValidator. Used for ML persistence. + + :return: Java object equivalent to this instance. 
+ """ + + estimator, epms, evaluator = super(CrossValidator, self)._to_java_impl() + + _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidator", self.uid) + _java_obj.setEstimatorParamMaps(epms) + _java_obj.setEvaluator(evaluator) + _java_obj.setEstimator(estimator) + _java_obj.setSeed(self.getSeed()) + _java_obj.setNumFolds(self.getNumFolds()) + + return _java_obj + + +class CrossValidatorModel(Model, ValidatorParams, MLReadable, MLWritable): """ CrossValidatorModel contains the model with the highest average cross-validation @@ -302,8 +392,61 @@ def copy(self, extra=None): avgMetrics = self.avgMetrics return CrossValidatorModel(bestModel, avgMetrics) + @since("2.2.0") + def write(self): + """Returns an MLWriter instance for this ML instance.""" + return JavaMLWriter(self) + + @since("2.2.0") + def save(self, path): + """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" + self.write().save(path) -class TrainValidationSplit(Estimator, ValidatorParams): + @classmethod + @since("2.2.0") + def read(cls): + """Returns an MLReader instance for this class.""" + return JavaMLReader(cls) + + @classmethod + @since("2.2.0") + def _from_java(cls, java_stage): + """ + Given a Java CrossValidatorModel, create and return a Python wrapper of it. + Used for ML persistence. + """ + + bestModel = JavaParams._from_java(java_stage.bestModel()) + # avgMetrics = JavaParams._from_java(java_stage.avgMetrics()) + estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage) + + py_stage = cls(bestModel=bestModel).setEstimator(estimator).setEstimatorParamMaps(epms).setEvaluator(evaluator) + py_stage._resetUid(java_stage.uid()) + return py_stage + + @since("2.2.0") + def _to_java(self): + """ + Transfer this instance to a Java CrossValidatorModel. Used for ML persistence. + + :return: Java object equivalent to this instance. + """ + + sc = SparkContext._active_spark_context + + _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidatorModel", + self.uid, + self.bestModel._to_java(), + _py2java(sc, self.avgMetrics)) + estimator, epms, evaluator = super(CrossValidatorModel, self)._to_java_impl() + + _java_obj.set("evaluator", evaluator) + _java_obj.set("estimator", estimator) + _java_obj.set("estimatorParamMaps", epms) + return _java_obj + + +class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable): """ .. note:: Experimental @@ -418,8 +561,60 @@ def copy(self, extra=None): newTVS.setEvaluator(self.getEvaluator().copy(extra)) return newTVS + @since("2.2.0") + def write(self): + """Returns an MLWriter instance for this ML instance.""" + return JavaMLWriter(self) -class TrainValidationSplitModel(Model, ValidatorParams): + @since("2.2.0") + def save(self, path): + """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" + self.write().save(path) + + @classmethod + @since("2.2.0") + def read(cls): + """Returns an MLReader instance for this class.""" + return JavaMLReader(cls) + + @classmethod + def _from_java(cls, java_stage): + """ + Given a Java TrainValidationSplit, create and return a Python wrapper of it. + Used for ML persistence. + """ + + estimator, epms, evaluator = super(TrainValidationSplit, cls)._from_java_impl(java_stage) + trainRatio = java_stage.getTrainRatio() + seed = java_stage.getSeed() + # Create a new instance of this stage. 
+ py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, + trainRatio=trainRatio, seed=seed) + py_stage._resetUid(java_stage.uid()) + return py_stage + + def _to_java(self): + """ + Transfer this instance to a Java TrainValidationSplit. Used for ML persistence. + :return: Java object equivalent to this instance. + """ + + estimator, epms, evaluator = super(TrainValidationSplit, self)._to_java_impl() + + _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.TrainValidationSplit", + self.uid) + _java_obj.setEstimatorParamMaps(epms) + _java_obj.setEvaluator(evaluator) + _java_obj.setEstimator(estimator) + _java_obj.setTrainRatio(self.getTrainRatio()) + _java_obj.setSeed(self.getSeed()) + + return _java_obj + + + + +class TrainValidationSplitModel(Model, ValidatorParams, MLReadable, MLWritable): """ .. note:: Experimental @@ -456,6 +651,58 @@ def copy(self, extra=None): validationMetrics = list(self.validationMetrics) return TrainValidationSplitModel(bestModel, validationMetrics) + @since("2.0.0") + def write(self): + """Returns an MLWriter instance for this ML instance.""" + return JavaMLWriter(self) + + @since("2.0.0") + def save(self, path): + """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" + self.write().save(path) + + @classmethod + @since("2.0.0") + def read(cls): + """Returns an MLReader instance for this class.""" + return JavaMLReader(cls) + + @classmethod + def _from_java(cls, java_stage): + """ + Given a Java TrainValidationSplitModel, create and return a Python wrapper of it. + Used for ML persistence. + """ + + # Load information from java_stage to the instance. + bestModel = JavaParams._from_java(java_stage.bestModel()) + estimator, epms, evaluator = super(TrainValidationSplitModel, cls)._from_java_impl(java_stage) + # Create a new instance of this stage. + py_stage = cls(bestModel=bestModel).setEstimator(estimator).setEstimatorParamMaps(epms).setEvaluator(evaluator) + py_stage._resetUid(java_stage.uid()) + return py_stage + + def _to_java(self): + """ + Transfer this instance to a Java TrainValidationSplitModel. Used for ML persistence. + :return: Java object equivalent to this instance. 
+ """ + + sc = SparkContext._active_spark_context + + _java_obj = JavaParams._new_java_obj( + "org.apache.spark.ml.tuning.TrainValidationSplitModel", + self.uid, + self.bestModel._to_java(), + _py2java(sc, self.validationMetrics)) + estimator, epms, evaluator = super(TrainValidationSplitModel, self)._to_java_impl() + + _java_obj.set("evaluator", evaluator) + _java_obj.set("estimator", estimator) + _java_obj.set("estimatorParamMaps", epms) + return _java_obj + + if __name__ == "__main__": import doctest diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 80a0b31cd88d..56c142d89055 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -111,7 +111,15 @@ def _make_java_param_pair(self, param, value): sc = SparkContext._active_spark_context param = self._resolveParam(param) java_param = self._java_obj.getParam(param.name) - java_value = _py2java(sc, value) + if isinstance(value, Estimator) or isinstance(value, Model): + # used in the case of an estimator having another estimator as a parameter + # such as with the OneVsRest classifier + # the reason why this is not in _py2java in common.py is that importing Estimator and Model in common.py results + # in a circular import with inherit_doc + # print "IN THE ESTIMATOR CASE - estimator: {}, model: {}".format(isinstance(value, Estimator), isinstance(value, Model)) + java_value = value._to_java() + else: + java_value = _py2java(sc, value) return java_param.w(java_value) def _transfer_params_to_java(self): @@ -144,7 +152,9 @@ def _transfer_params_from_java(self): if self._java_obj.hasParam(param.name): java_param = self._java_obj.getParam(param.name) # SPARK-14931: Only check set params back to avoid default params mismatch. + # print "PARAM NAME IN _transfer_params_from_java:",param.name if self._java_obj.isSet(java_param): + # print "Entered if with param name:",param.name value = _java2py(sc, self._java_obj.getOrDefault(java_param)) self._set(**{param.name: value}) From a0dbf6cec804f0fd14bb31a935a8cddbb182bf5a Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 29 Jun 2017 15:36:52 -0700 Subject: [PATCH 03/12] Cleaned up python meta algorithm persistence code. CrossValidation and TrainValidationSplit now persist estimators in both Scala and Python. 
--- python/pyspark/ml/classification.py | 78 ++++------------------- python/pyspark/ml/common.py | 1 - python/pyspark/ml/tests.py | 22 +++---- python/pyspark/ml/tuning.py | 95 +++++++++++++++-------------- python/pyspark/ml/wrapper.py | 7 +-- 5 files changed, 74 insertions(+), 129 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index ae33dbe7a458..f278b7f374dc 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1519,24 +1519,8 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred classifier=None) """ super(OneVsRest, self).__init__() - - # self._java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", self.uid) - # print "JAVA OBJ:",self._java_obj - #self._setDefault(classifier=LogisticRegression()) - kwargs = self._input_kwargs self._set(**kwargs) - # if classifier is None: - # self.setClassifier(LogisticRegression()) - - - # super(LogisticRegression, self).__init__() - # self._java_obj = self._new_java_obj( - # "org.apache.spark.ml.classification.LogisticRegression", self.uid) - # self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto") - # kwargs = self._input_kwargs - # self.setParams(**kwargs) - # self._checkThresholdConsistency() @keyword_only @since("2.0.0") @@ -1583,9 +1567,6 @@ def trainSingleClass(index): multiclassLabeled.unpersist() return self._copyValues(OneVsRestModel(models=models)) - # ovr_model = OneVsRestModel() - # ovr_model.setModels(models=models) - # return self._copyValues(ovr_model) @since("2.0.0") def copy(self, extra=None): @@ -1604,22 +1585,6 @@ def copy(self, extra=None): newOvr.setClassifier(self.getClassifier().copy(extra)) return newOvr - # @since("2.0.0") - # def write(self): - # """Returns an MLWriter instance for this ML instance.""" - # return JavaMLWriter(self) - - # @since("2.0.0") - # def save(self, path): - # """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - # self.write().save(path) - - # @classmethod - # @since("2.0.0") - # def read(cls): - # """Returns an MLReader instance for this class.""" - # return JavaMLReader(cls) - @classmethod def _from_java(cls, java_stage): """ @@ -1656,14 +1621,12 @@ def _make_java_param_pair(self, param, value): sc = SparkContext._active_spark_context param = self._resolveParam(param) _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", - self.uid) + self.uid) java_param = _java_obj.getParam(param.name) if isinstance(value, Estimator) or isinstance(value, Model): # used in the case of an estimator having another estimator as a parameter - # such as with the OneVsRest classifier - # the reason why this is not in _py2java in common.py is that importing Estimator and Model in common.py results - # in a circular import with inherit_doc - # print "IN THE ESTIMATOR CASE - estimator: {}, model: {}".format(isinstance(value, Estimator), isinstance(value, Model)) + # the reason why this is not in _py2java in common.py is that importing + # Estimator and Model in common.py results in a circular import with inherit_doc java_value = value._to_java() else: java_value = _py2java(sc, value) @@ -1691,15 +1654,11 @@ def _transfer_param_map_from_java(self, javaParamMap): if self.hasParam(str(param.name())): if param.name() == "classifier": paramMap[self.getParam(param.name())] = JavaParams._from_java(pair.value()) - # print "PAIR VALUE:",pair.value() - # print "AFTER FROM 
JAVA:",JavaParams._from_java(pair.value()) else: paramMap[self.getParam(param.name())] = _java2py(sc, pair.value()) return paramMap - - class OneVsRestModel(Model, OneVsRestParams, JavaMLReadable, JavaMLWritable): """ .. note:: Experimental @@ -1711,18 +1670,19 @@ class OneVsRestModel(Model, OneVsRestParams, JavaMLReadable, JavaMLWritable): .. versionadded:: 2.0.0 """ - # this used to be __init__ ... can't do because JavaModel has an init that takes only 1 parameter - def __init__(self, models): # one fix was models=[] + + def __init__(self, models): super(OneVsRestModel, self).__init__() self.models = models java_models = [model._to_java() for model in self.models] sc = SparkContext._active_spark_context - java_models_array = JavaWrapper._new_java_array( - java_models, sc._gateway.jvm.org.apache.spark.ml.classification.ClassificationModel) + java_models_array = JavaWrapper._new_java_array(java_models, + sc._gateway.jvm.org.apache.spark.ml + .classification.ClassificationModel) metadata = JavaParams._new_java_obj("org.apache.spark.sql.types.Metadata") - self._java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRestModel", - self.uid, metadata.empty(), java_models_array) - + self._java_obj = \ + JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRestModel", + self.uid, metadata.empty(), java_models_array) def _transform(self, dataset): # determine the input columns: these need to be passed through @@ -1788,22 +1748,6 @@ def copy(self, extra=None): newModel.models = [model.copy(extra) for model in self.models] return newModel - # @since("2.0.0") - # def write(self): - # """Returns an MLWriter instance for this ML instance.""" - # return JavaMLWriter(self) - - # @since("2.0.0") - # def save(self, path): - # """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - # self.write().save(path) - - # @classmethod - # @since("2.0.0") - # def read(cls): - # """Returns an MLReader instance for this class.""" - # return JavaMLReader(cls) - @classmethod def _from_java(cls, java_stage): """ diff --git a/python/pyspark/ml/common.py b/python/pyspark/ml/common.py index af09204fd8ff..387c5d7309de 100644 --- a/python/pyspark/ml/common.py +++ b/python/pyspark/ml/common.py @@ -26,7 +26,6 @@ from py4j.java_collections import JavaArray, JavaList from pyspark import RDD, SparkContext -# from pyspark.ml import Estimator, Model from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.sql import DataFrame, SQLContext diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index b0bc24a4e6a9..b7fd9e377416 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -49,7 +49,8 @@ from pyspark.ml.classification import * from pyspark.ml.clustering import * from pyspark.ml.common import _java2py, _py2java -from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator +from pyspark.ml.evaluation import BinaryClassificationEvaluator, \ + MulticlassClassificationEvaluator, RegressionEvaluator from pyspark.ml.feature import * from pyspark.ml.fpm import FPGrowth, FPGrowthModel from pyspark.ml.linalg import DenseMatrix, DenseMatrix, DenseVector, Matrices, MatrixUDT, \ @@ -694,7 +695,7 @@ def test_save_load_simple_estimator(self): lr = LogisticRegression() grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() evaluator = BinaryClassificationEvaluator() - + # test save/load of CrossValidator cv = CrossValidator(estimator=lr, 
estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) @@ -720,13 +721,13 @@ def test_save_load_nested_stimator(self): (Vectors.dense([0.6]), 1.0), (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) - + ova = OneVsRest(classifier=LogisticRegression()) lr1 = LogisticRegression().setMaxIter(100) lr2 = LogisticRegression().setMaxIter(150) grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() evaluator = MulticlassClassificationEvaluator() - + # test save/load of CrossValidator cv = CrossValidator(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) @@ -738,8 +739,7 @@ def test_save_load_nested_stimator(self): originalParamMap = cv.getEstimatorParamMaps() loadedParamMap = loadedCV.getEstimatorParamMaps() - for i,param in enumerate(loadedParamMap): - # print "PARAM:",param + for i, param in enumerate(loadedParamMap): for p in param: if p.name == "classifier": self.assertEqual(param[p].uid, originalParamMap[i][p].uid) @@ -878,7 +878,7 @@ def test_save_load_nested_estimator(self): lr2 = LogisticRegression().setMaxIter(150) grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build() evaluator = MulticlassClassificationEvaluator() - + tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator) tvsModel = tvs.fit(dataset) tvsPath = temp_path + "/tvs" @@ -886,10 +886,10 @@ def test_save_load_nested_estimator(self): loadedTvs = TrainValidationSplit.load(tvsPath) self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid) self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid) - + originalParamMap = tvs.getEstimatorParamMaps() loadedParamMap = loadedTvs.getEstimatorParamMaps() - for i,param in enumerate(loadedParamMap): + for i, param in enumerate(loadedParamMap): for p in param: if p.name == "classifier": self.assertEqual(param[p].uid, originalParamMap[i][p].uid) @@ -1488,8 +1488,8 @@ def check_params(self, py_stage): py_has_default = py_stage.hasDefault(p) java_has_default = java_stage.hasDefault(java_param) self.assertEqual(py_has_default, java_has_default, - "Default value mismatch of param %s for Params %s, py has default: %s java has default: %s" - % (p.name, str(py_stage), py_has_default, java_has_default)) + "Default value mismatch of param %s for Params %s" + % (p.name, str(py_stage))) if py_has_default: if p.name == "seed": return # Random seeds between Spark and PySpark are different diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 013b7b5b2f72..d1e54dd4abd7 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -141,36 +141,36 @@ def getEvaluator(self): return self.getOrDefault(self.evaluator) def getEvaluator(self): - """ - Gets the value of evaluator or its default value. - """ - return self.getOrDefault(self.evaluator) - + """ + Gets the value of evaluator or its default value. + """ + return self.getOrDefault(self.evaluator) + @classmethod def _from_java_impl(cls, java_stage): """ Return Python estimator, estimatorParamMaps, and evaluator from a Java ValidatorParams. """ - + # Load information from java_stage to the instance. estimator = JavaParams._from_java(java_stage.getEstimator()) evaluator = JavaParams._from_java(java_stage.getEvaluator()) epms = [estimator._transfer_param_map_from_java(epm) for epm in java_stage.getEstimatorParamMaps()] return estimator, epms, evaluator - + def _to_java_impl(self): """ Return Java estimator, estimatorParamMaps, and evaluator from this Python instance. 
""" - + gateway = SparkContext._gateway cls = SparkContext._jvm.org.apache.spark.ml.param.ParamMap - + java_epms = gateway.new_array(cls, len(self.getEstimatorParamMaps())) for idx, epm in enumerate(self.getEstimatorParamMaps()): java_epms[idx] = self.getEstimator()._transfer_param_map_to_java(epm) - + java_estimator = self.getEstimator()._to_java() java_evaluator = self.getEvaluator()._to_java() return java_estimator, java_epms, java_evaluator @@ -301,47 +301,47 @@ def copy(self, extra=None): newCV.setEvaluator(self.getEvaluator().copy(extra)) return newCV - @since("2.2.0") + @since("2.3.0") def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.2.0") + @since("2.3.0") def save(self, path): """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" self.write().save(path) @classmethod - @since("2.2.0") + @since("2.3.0") def read(cls): """Returns an MLReader instance for this class.""" return JavaMLReader(cls) @classmethod - @since("2.2.0") + @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java CrossValidator, create and return a Python wrapper of it. Used for ML persistence. """ - + estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage) numFolds = java_stage.getNumFolds() seed = java_stage.getSeed() # Create a new instance of this stage. py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, - numFolds=numFolds, seed=seed) + numFolds=numFolds, seed=seed) py_stage._resetUid(java_stage.uid()) return py_stage - @since("2.2.0") + @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java CrossValidator. Used for ML persistence. :return: Java object equivalent to this instance. """ - + estimator, epms, evaluator = super(CrossValidator, self)._to_java_impl() _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidator", self.uid) @@ -392,24 +392,24 @@ def copy(self, extra=None): avgMetrics = self.avgMetrics return CrossValidatorModel(bestModel, avgMetrics) - @since("2.2.0") + @since("2.3.0") def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.2.0") + @since("2.3.0") def save(self, path): """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" self.write().save(path) @classmethod - @since("2.2.0") + @since("2.3.0") def read(cls): """Returns an MLReader instance for this class.""" return JavaMLReader(cls) @classmethod - @since("2.2.0") + @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java CrossValidatorModel, create and return a Python wrapper of it. @@ -417,27 +417,28 @@ def _from_java(cls, java_stage): """ bestModel = JavaParams._from_java(java_stage.bestModel()) - # avgMetrics = JavaParams._from_java(java_stage.avgMetrics()) estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage) - - py_stage = cls(bestModel=bestModel).setEstimator(estimator).setEstimatorParamMaps(epms).setEvaluator(evaluator) + + py_stage = cls(bestModel=bestModel).setEstimator(estimator) + py_stage = py_stage.setEstimatorParamMaps(epms).setEvaluator(evaluator) + py_stage._resetUid(java_stage.uid()) return py_stage - @since("2.2.0") + @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java CrossValidatorModel. Used for ML persistence. :return: Java object equivalent to this instance. 
""" - - sc = SparkContext._active_spark_context + sc = SparkContext._active_spark_context + # TODO: persist average metrics as well _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.CrossValidatorModel", - self.uid, - self.bestModel._to_java(), - _py2java(sc, self.avgMetrics)) + self.uid, + self.bestModel._to_java(), + _py2java(sc, [])) estimator, epms, evaluator = super(CrossValidatorModel, self)._to_java_impl() _java_obj.set("evaluator", evaluator) @@ -561,23 +562,24 @@ def copy(self, extra=None): newTVS.setEvaluator(self.getEvaluator().copy(extra)) return newTVS - @since("2.2.0") + @since("2.3.0") def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.2.0") + @since("2.3.0") def save(self, path): """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" self.write().save(path) @classmethod - @since("2.2.0") + @since("2.3.0") def read(cls): """Returns an MLReader instance for this class.""" return JavaMLReader(cls) @classmethod + @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java TrainValidationSplit, create and return a Python wrapper of it. @@ -593,6 +595,7 @@ def _from_java(cls, java_stage): py_stage._resetUid(java_stage.uid()) return py_stage + @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java TrainValidationSplit. Used for ML persistence. @@ -602,7 +605,7 @@ def _to_java(self): estimator, epms, evaluator = super(TrainValidationSplit, self)._to_java_impl() _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.tuning.TrainValidationSplit", - self.uid) + self.uid) _java_obj.setEstimatorParamMaps(epms) _java_obj.setEvaluator(evaluator) _java_obj.setEstimator(estimator) @@ -612,8 +615,6 @@ def _to_java(self): return _java_obj - - class TrainValidationSplitModel(Model, ValidatorParams, MLReadable, MLWritable): """ .. note:: Experimental @@ -651,23 +652,24 @@ def copy(self, extra=None): validationMetrics = list(self.validationMetrics) return TrainValidationSplitModel(bestModel, validationMetrics) - @since("2.0.0") + @since("2.3.0") def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.0.0") + @since("2.3.0") def save(self, path): """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" self.write().save(path) @classmethod - @since("2.0.0") + @since("2.3.0") def read(cls): """Returns an MLReader instance for this class.""" return JavaMLReader(cls) @classmethod + @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java TrainValidationSplitModel, create and return a Python wrapper of it. @@ -676,12 +678,16 @@ def _from_java(cls, java_stage): # Load information from java_stage to the instance. bestModel = JavaParams._from_java(java_stage.bestModel()) - estimator, epms, evaluator = super(TrainValidationSplitModel, cls)._from_java_impl(java_stage) + estimator, epms, evaluator = super(TrainValidationSplitModel, + cls)._from_java_impl(java_stage) # Create a new instance of this stage. - py_stage = cls(bestModel=bestModel).setEstimator(estimator).setEstimatorParamMaps(epms).setEvaluator(evaluator) + py_stage = cls(bestModel=bestModel).setEstimator(estimator) + py_stage = py_stage.setEstimatorParamMaps(epms).setEvaluator(evaluator) + py_stage._resetUid(java_stage.uid()) return py_stage + @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java TrainValidationSplitModel. Used for ML persistence. 
@@ -689,12 +695,12 @@ def _to_java(self): """ sc = SparkContext._active_spark_context - + # TODO: persst validation metrics as well _java_obj = JavaParams._new_java_obj( "org.apache.spark.ml.tuning.TrainValidationSplitModel", self.uid, self.bestModel._to_java(), - _py2java(sc, self.validationMetrics)) + _py2java(sc, [])) estimator, epms, evaluator = super(TrainValidationSplitModel, self)._to_java_impl() _java_obj.set("evaluator", evaluator) @@ -703,7 +709,6 @@ def _to_java(self): return _java_obj - if __name__ == "__main__": import doctest diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 56c142d89055..7f739fbc3c23 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -114,9 +114,8 @@ def _make_java_param_pair(self, param, value): if isinstance(value, Estimator) or isinstance(value, Model): # used in the case of an estimator having another estimator as a parameter # such as with the OneVsRest classifier - # the reason why this is not in _py2java in common.py is that importing Estimator and Model in common.py results - # in a circular import with inherit_doc - # print "IN THE ESTIMATOR CASE - estimator: {}, model: {}".format(isinstance(value, Estimator), isinstance(value, Model)) + # the reason why this is not in _py2java in common.py is that importing + # Estimator and Model in common.py results in a circular import with inherit_doc java_value = value._to_java() else: java_value = _py2java(sc, value) @@ -152,9 +151,7 @@ def _transfer_params_from_java(self): if self._java_obj.hasParam(param.name): java_param = self._java_obj.getParam(param.name) # SPARK-14931: Only check set params back to avoid default params mismatch. - # print "PARAM NAME IN _transfer_params_from_java:",param.name if self._java_obj.isSet(java_param): - # print "Entered if with param name:",param.name value = _java2py(sc, self._java_obj.getOrDefault(java_param)) self._set(**{param.name: value}) From 253f39e26bed2e4271ef027e4315e6d11b693ee5 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 29 Jun 2017 15:45:05 -0700 Subject: [PATCH 04/12] Small style fix. --- python/pyspark/ml/tests.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index b7fd9e377416..5d2ea99ab469 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1497,8 +1497,6 @@ def check_params(self, py_stage): _java2py(self.sc, java_stage.clear(java_param).getOrDefault(java_param)) py_stage._clear(p) py_default = py_stage.getOrDefault(p) - if p.name == "classifier": - print "py default: {} java default: {}".format(py_default, java_default) if isinstance(py_stage, pyspark.ml.feature.Imputer) and p.name == "missingValue": # SPARK-15040 - default value for Imputer param 'missingValue' is NaN, # and NaN != NaN, so handle it specially here From 89be87e7e0892ef5c24689b14a07a69bcef5b28a Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 6 Jul 2017 13:40:00 -0700 Subject: [PATCH 05/12] Attempt at removing two of the duplicated persistence functions in OneVsRest. Does not work because the make java param pair function in wrapper.py does not recognize the uid set in self._java_obj in the OneVsRest constructor. 
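For context, the conversion this refactor tries to route through wrapper.py looks roughly like the sketch below (hypothetical internal usage, assuming an active SparkContext; _make_java_param_pair is the private helper shown in this diff, not public API):

    from pyspark.ml.classification import LogisticRegression, OneVsRest

    ova = OneVsRest(classifier=LogisticRegression())
    lr = LogisticRegression(maxIter=5)
    # A nested estimator value has to be converted with value._to_java();
    # the generic _py2java(sc, value) path cannot do that, so OneVsRest
    # currently needs either a populated self._java_obj or its own
    # _make_java_param_pair override.
    java_pair = ova._make_java_param_pair(ova.classifier, lr)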
--- .../spark/ml/classification/OneVsRest.scala | 1 + .../spark/ml/tuning/CrossValidatorSuite.scala | 32 +++++++---------- .../ml/tuning/TrainValidationSplitSuite.scala | 22 ++++-------- python/pyspark/ml/classification.py | 16 ++++++--- python/pyspark/ml/tests.py | 15 +++++--- python/pyspark/ml/tuning.py | 34 ------------------- python/pyspark/ml/wrapper.py | 2 +- 7 files changed, 43 insertions(+), 79 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 7cbcccf2720a..2f6021fd4223 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -62,6 +62,7 @@ private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTra * @group param */ val classifier: Param[ClassifierType] = new Param(this, "classifier", "base binary classifier") + setDefault(classifier -> new LogisticRegression) /** @group getParam */ def getClassifier: ClassifierType = $(classifier) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 2d74ff2e8ff8..2f0b18c4dcc2 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -153,7 +153,8 @@ class CrossValidatorSuite s" LogisticRegression but found ${other.getClass.getName}") } - CrossValidatorSuite.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) + ValidatorParamsSuiteHelpers + .compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) } test("read/write: CrossValidator with nested estimator") { @@ -190,7 +191,7 @@ class CrossValidatorSuite classifier match { case lr: LogisticRegression => assert(ova.getClassifier.asInstanceOf[LogisticRegression].getMaxIter - === lr.asInstanceOf[LogisticRegression].getMaxIter) + === lr.getMaxIter) case _ => throw new AssertionError(s"Loaded CrossValidator expected estimator of type" + s" LogisticREgression but found ${classifier.getClass.getName}") @@ -200,6 +201,9 @@ class CrossValidatorSuite throw new AssertionError(s"Loaded CrossValidator expected estimator of type" + s" OneVsRest but found ${other.getClass.getName}") } + + ValidatorParamsSuiteHelpers + .compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) } test("read/write: CrossValidator with complex estimator") { @@ -239,7 +243,8 @@ class CrossValidatorSuite assert(cv2.getEvaluator.isInstanceOf[BinaryClassificationEvaluator]) assert(cv.getEvaluator.uid === cv2.getEvaluator.uid) - CrossValidatorSuite.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) + ValidatorParamsSuiteHelpers + .compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) cv2.getEstimator match { case pipeline2: Pipeline => @@ -258,7 +263,8 @@ class CrossValidatorSuite assert(lrcv.uid === lrcv2.uid) assert(lrcv2.getEvaluator.isInstanceOf[BinaryClassificationEvaluator]) assert(lrEvaluator.uid === lrcv2.getEvaluator.uid) - CrossValidatorSuite.compareParamMaps(lrParamMaps, lrcv2.getEstimatorParamMaps) + ValidatorParamsSuiteHelpers + .compareParamMaps(lrParamMaps, lrcv2.getEstimatorParamMaps) case other => throw new AssertionError("Loaded Pipeline expected stages (HashingTF, CrossValidator)" + " but found: " + other.map(_.getClass.getName).mkString(", ")) @@ -324,7 +330,8 @@ 
class CrossValidatorSuite s" LogisticRegression but found ${other.getClass.getName}") } - CrossValidatorSuite.compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) + ValidatorParamsSuiteHelpers + .compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) cv2.bestModel match { case lrModel2: LogisticRegressionModel => @@ -342,21 +349,6 @@ class CrossValidatorSuite object CrossValidatorSuite extends SparkFunSuite { - /** - * Assert sequences of estimatorParamMaps are identical. - * Params must be simple types comparable with `===`. - */ - def compareParamMaps(pMaps: Array[ParamMap], pMaps2: Array[ParamMap]): Unit = { - assert(pMaps.length === pMaps2.length) - pMaps.zip(pMaps2).foreach { case (pMap, pMap2) => - assert(pMap.size === pMap2.size) - pMap.toSeq.foreach { case ParamPair(p, v) => - assert(pMap2.contains(p)) - assert(pMap2(p) === v) - } - } - } - abstract class MyModel extends Model[MyModel] class MyEstimator(override val uid: String) extends Estimator[MyModel] with HasInputCol { diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index 3a09396e79e7..ef86a5da5193 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -135,7 +135,7 @@ class TrainValidationSplitSuite assert(tvs.getTrainRatio === tvs2.getTrainRatio) assert(tvs.getSeed === tvs2.getSeed) - TrainValidationSplitSuite + ValidatorParamsSuiteHelpers .compareParamMaps(tvs.getEstimatorParamMaps, tvs2.getEstimatorParamMaps) tvs2.getEstimator match { @@ -177,7 +177,7 @@ class TrainValidationSplitSuite classifier match { case lr: LogisticRegression => assert(ova.getClassifier.asInstanceOf[LogisticRegression].getMaxIter - === lr.asInstanceOf[LogisticRegression].getMaxIter) + === lr.getMaxIter) case _ => throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" + s" LogisticREgression but found ${classifier.getClass.getName}") @@ -187,6 +187,9 @@ class TrainValidationSplitSuite throw new AssertionError(s"Loaded TrainValidationSplit expected estimator of type" + s" OneVsRest but found ${other.getClass.getName}") } + + ValidatorParamsSuiteHelpers + .compareParamMaps(tvs.getEstimatorParamMaps, tvs2.getEstimatorParamMaps) } test("read/write: TrainValidationSplitModel") { @@ -214,20 +217,7 @@ class TrainValidationSplitSuite } object TrainValidationSplitSuite extends SparkFunSuite{ - /** - * Assert sequences of estimatorParamMaps are identical. - * Params must be simple types comparable with `===`. 
- */ - def compareParamMaps(pMaps: Array[ParamMap], pMaps2: Array[ParamMap]): Unit = { - assert(pMaps.length === pMaps2.length) - pMaps.zip(pMaps2).foreach { case (pMap, pMap2) => - assert(pMap.size === pMap2.size) - pMap.toSeq.foreach { case ParamPair(p, v) => - assert(pMap2.contains(p)) - assert(pMap2(p) === v) - } - } - } + abstract class MyModel extends Model[MyModel] class MyEstimator(override val uid: String) extends Estimator[MyModel] with HasInputCol { diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index f278b7f374dc..def58c0d303a 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1468,7 +1468,7 @@ def getClassifier(self): @inherit_doc -class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): +class OneVsRest(Estimator, OneVsRestParams, JavaParams, JavaMLReadable, JavaMLWritable): """ .. note:: Experimental @@ -1522,6 +1522,10 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred kwargs = self._input_kwargs self._set(**kwargs) + self._setDefault(classifier=LogisticRegression()) + self._java_obj = self._new_java_obj("org.apache.spark.ml.classification.OneVsRest", + self.uid) + @keyword_only @since("2.0.0") def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): @@ -1616,14 +1620,17 @@ def _to_java(self): def _make_java_param_pair(self, param, value): """ - Makes a Java parm pair. + Makes a Java param pair. """ sc = SparkContext._active_spark_context param = self._resolveParam(param) + print "self uid in function:",self.uid + print "JAVA OBJ in func:",self._java_obj _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", self.uid) - java_param = _java_obj.getParam(param.name) - if isinstance(value, Estimator) or isinstance(value, Model): + print "Created java obj:",_java_obj + java_param = self._java_obj.getParam(param.name) + if isinstance(value, JavaParams): # used in the case of an estimator having another estimator as a parameter # the reason why this is not in _py2java in common.py is that importing # Estimator and Model in common.py results in a circular import with inherit_doc @@ -1679,6 +1686,7 @@ def __init__(self, models): java_models_array = JavaWrapper._new_java_array(java_models, sc._gateway.jvm.org.apache.spark.ml .classification.ClassificationModel) + # TODO: need to set metadata metadata = JavaParams._new_java_obj("org.apache.spark.sql.types.Metadata") self._java_obj = \ JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRestModel", diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 5d2ea99ab469..3c6a61514974 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -712,7 +712,7 @@ def test_save_load_simple_estimator(self): loadedModel = CrossValidatorModel.load(cvModelPath) self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid) - def test_save_load_nested_stimator(self): + def test_save_load_nested_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( [(Vectors.dense([0.0]), 0.0), @@ -1506,9 +1506,16 @@ def check_params(self, py_stage): "param %s for Params %s" % (str(java_default), str(py_default), p.name, str(py_stage))) return - self.assertEqual(java_default, py_default, - "Java default %s != python default %s of param %s for Params %s" - % (str(java_default), str(py_default), p.name, str(py_stage))) + if p.name == "classifier": + for param in 
py_default.extractParamMap(): + self.assertEqual(py_default.getOrDefault(param), java_default.getOrDefault(param), + "Java default %s != python default %s of classifier param" + % (str(py_default.getOrDefault(param)), + str(java_default.getOrDefaultparam))) + else: + self.assertEqual(java_default, py_default, + "Java default %s != python default %s of param %s for Params %s" + % (str(java_default), str(py_default), p.name, str(py_stage))) def test_java_params(self): import pyspark.ml.feature diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index d1e54dd4abd7..00c348aa9f7d 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -140,12 +140,6 @@ def getEvaluator(self): """ return self.getOrDefault(self.evaluator) - def getEvaluator(self): - """ - Gets the value of evaluator or its default value. - """ - return self.getOrDefault(self.evaluator) - @classmethod def _from_java_impl(cls, java_stage): """ @@ -306,11 +300,6 @@ def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.3.0") - def save(self, path): - """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - self.write().save(path) - @classmethod @since("2.3.0") def read(cls): @@ -318,7 +307,6 @@ def read(cls): return JavaMLReader(cls) @classmethod - @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java CrossValidator, create and return a Python wrapper of it. @@ -334,7 +322,6 @@ def _from_java(cls, java_stage): py_stage._resetUid(java_stage.uid()) return py_stage - @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java CrossValidator. Used for ML persistence. @@ -397,11 +384,6 @@ def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.3.0") - def save(self, path): - """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - self.write().save(path) - @classmethod @since("2.3.0") def read(cls): @@ -409,7 +391,6 @@ def read(cls): return JavaMLReader(cls) @classmethod - @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java CrossValidatorModel, create and return a Python wrapper of it. @@ -425,7 +406,6 @@ def _from_java(cls, java_stage): py_stage._resetUid(java_stage.uid()) return py_stage - @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java CrossValidatorModel. Used for ML persistence. @@ -567,11 +547,6 @@ def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.3.0") - def save(self, path): - """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - self.write().save(path) - @classmethod @since("2.3.0") def read(cls): @@ -579,7 +554,6 @@ def read(cls): return JavaMLReader(cls) @classmethod - @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java TrainValidationSplit, create and return a Python wrapper of it. @@ -595,7 +569,6 @@ def _from_java(cls, java_stage): py_stage._resetUid(java_stage.uid()) return py_stage - @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java TrainValidationSplit. Used for ML persistence. 
@@ -657,11 +630,6 @@ def write(self): """Returns an MLWriter instance for this ML instance.""" return JavaMLWriter(self) - @since("2.3.0") - def save(self, path): - """Save this ML instance to the given path, a shortcut of `write().save(path)`.""" - self.write().save(path) - @classmethod @since("2.3.0") def read(cls): @@ -669,7 +637,6 @@ def read(cls): return JavaMLReader(cls) @classmethod - @since("2.3.0") def _from_java(cls, java_stage): """ Given a Java TrainValidationSplitModel, create and return a Python wrapper of it. @@ -687,7 +654,6 @@ def _from_java(cls, java_stage): py_stage._resetUid(java_stage.uid()) return py_stage - @since("2.3.0") def _to_java(self): """ Transfer this instance to a Java TrainValidationSplitModel. Used for ML persistence. diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 7f739fbc3c23..85c79daa721c 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -106,7 +106,7 @@ def __del__(self): def _make_java_param_pair(self, param, value): """ - Makes a Java parm pair. + Makes a Java param pair. """ sc = SparkContext._active_spark_context param = self._resolveParam(param) From 44342b26d6fac2e24eb0accd6c6064420f329398 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 6 Jul 2017 13:57:00 -0700 Subject: [PATCH 06/12] Made changes based on pull request comments. --- .../apache/spark/ml/classification/OneVsRest.scala | 1 - python/pyspark/ml/classification.py | 11 ++--------- python/pyspark/ml/tests.py | 13 +++---------- python/pyspark/ml/wrapper.py | 9 +-------- 4 files changed, 6 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 2f6021fd4223..7cbcccf2720a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -62,7 +62,6 @@ private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTra * @group param */ val classifier: Param[ClassifierType] = new Param(this, "classifier", "base binary classifier") - setDefault(classifier -> new LogisticRegression) /** @group getParam */ def getClassifier: ClassifierType = $(classifier) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index def58c0d303a..5c05442f02e2 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1468,7 +1468,7 @@ def getClassifier(self): @inherit_doc -class OneVsRest(Estimator, OneVsRestParams, JavaParams, JavaMLReadable, JavaMLWritable): +class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable): """ .. 
note:: Experimental @@ -1522,10 +1522,6 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred kwargs = self._input_kwargs self._set(**kwargs) - self._setDefault(classifier=LogisticRegression()) - self._java_obj = self._new_java_obj("org.apache.spark.ml.classification.OneVsRest", - self.uid) - @keyword_only @since("2.0.0") def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): @@ -1624,12 +1620,9 @@ def _make_java_param_pair(self, param, value): """ sc = SparkContext._active_spark_context param = self._resolveParam(param) - print "self uid in function:",self.uid - print "JAVA OBJ in func:",self._java_obj _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", self.uid) - print "Created java obj:",_java_obj - java_param = self._java_obj.getParam(param.name) + java_param = _java_obj.getParam(param.name) if isinstance(value, JavaParams): # used in the case of an estimator having another estimator as a parameter # the reason why this is not in _py2java in common.py is that importing diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 3c6a61514974..52bc15f48479 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1506,16 +1506,9 @@ def check_params(self, py_stage): "param %s for Params %s" % (str(java_default), str(py_default), p.name, str(py_stage))) return - if p.name == "classifier": - for param in py_default.extractParamMap(): - self.assertEqual(py_default.getOrDefault(param), java_default.getOrDefault(param), - "Java default %s != python default %s of classifier param" - % (str(py_default.getOrDefault(param)), - str(java_default.getOrDefaultparam))) - else: - self.assertEqual(java_default, py_default, - "Java default %s != python default %s of param %s for Params %s" - % (str(java_default), str(py_default), p.name, str(py_stage))) + self.assertEqual(java_default, py_default, + "Java default %s != python default %s of param %s for Params %s" + % (str(java_default), str(py_default), p.name, str(py_stage))) def test_java_params(self): import pyspark.ml.feature diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 85c79daa721c..ee6301ef19a4 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -111,14 +111,7 @@ def _make_java_param_pair(self, param, value): sc = SparkContext._active_spark_context param = self._resolveParam(param) java_param = self._java_obj.getParam(param.name) - if isinstance(value, Estimator) or isinstance(value, Model): - # used in the case of an estimator having another estimator as a parameter - # such as with the OneVsRest classifier - # the reason why this is not in _py2java in common.py is that importing - # Estimator and Model in common.py results in a circular import with inherit_doc - java_value = value._to_java() - else: - java_value = _py2java(sc, value) + java_value = _py2java(sc, value) return java_param.w(java_value) def _transfer_params_to_java(self): From 823593dec754dafc85c55b60db564216b6a934df Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 6 Jul 2017 14:04:57 -0700 Subject: [PATCH 07/12] Added ValidatorParamsSuiteHelpers file. 
--- .../tuning/ValidatorParamsSuiteHelpers.scala | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala new file mode 100644 index 000000000000..a55750c92fce --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tuning + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.param.{ParamMap, ParamPair, Params} + +object ValidatorParamsSuiteHelpers extends SparkFunSuite { + /** + * Assert sequences of estimatorParamMaps are identical. + * If the values for a parameter are not directly comparable with === + * and are instead Params types themselves then their corresponding paramMaps + * are compared against each other. 
+ */ + def compareParamMaps(pMaps: Array[ParamMap], pMaps2: Array[ParamMap]): Unit = { + assert(pMaps.length === pMaps2.length) + pMaps.zip(pMaps2).foreach { case (pMap, pMap2) => + assert(pMap.size === pMap2.size) + pMap.toSeq.foreach { case ParamPair(p, v) => + assert(pMap2.contains(p)) + val otherParam = pMap2(p) + v match { + case estimator: Params => + otherParam match { + case estimator2: Params => + val estimatorParamMap = Array(estimator.extractParamMap()) + val estimatorParamMap2 = Array(estimator2.extractParamMap()) + compareParamMaps(estimatorParamMap, estimatorParamMap2) + case other => + throw new AssertionError(s"Expected parameter of type Params but" + + s" found ${otherParam.getClass.getName}") + } + case _ => + assert(otherParam === v) + } + } + } + } +} From f169aa5f62b83474b10877ac646f28c88ca8e5bf Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Wed, 12 Jul 2017 11:24:25 -0700 Subject: [PATCH 08/12] Fixed backwards compatibility issue --- .../scala/org/apache/spark/ml/tuning/ValidatorParams.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index cab6f9215007..438a67fc11db 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -198,7 +198,8 @@ private[ml] object ValidatorParams { val paramPairs = pMap.map { case pInfo: Map[String, String] => val est = uidToParams(pInfo("parent")) val param = est.getParam(pInfo("name")) - if (pInfo("isJson").toBoolean.booleanValue()) { + if (!pInfo.contains("isJson") || + (pInfo.contains("isJson") && pInfo("isJson").toBoolean.booleanValue())) { val value = param.jsonDecode(pInfo("value")) param -> value } else { From 7601df729432b50ffe3bf1b61de36a07ab86c621 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Wed, 12 Jul 2017 15:26:14 -0700 Subject: [PATCH 09/12] Fixed small style change --- .../scala/org/apache/spark/ml/tuning/ValidatorParams.scala | 3 ++- .../scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index 438a67fc11db..e5d02053b15d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -198,8 +198,9 @@ private[ml] object ValidatorParams { val paramPairs = pMap.map { case pInfo: Map[String, String] => val est = uidToParams(pInfo("parent")) val param = est.getParam(pInfo("name")) + // [Spark-21221] introduced the isJson field if (!pInfo.contains("isJson") || - (pInfo.contains("isJson") && pInfo("isJson").toBoolean.booleanValue())) { + (pInfo.contains("isJson") && pInfo("isJson").toBoolean.booleanValue())) { val value = param.jsonDecode(pInfo("value")) param -> value } else { diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala index 27d606cb05dc..4da95e74434e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala @@ -55,7 +55,6 @@ trait DefaultReadWriteTest extends TempDirectory { self: Suite => instance.write.overwrite().save(path) val 
loader = instance.getClass.getMethod("read").invoke(null).asInstanceOf[MLReader[T]] val newInstance = loader.load(path) - assert(newInstance.uid === instance.uid) if (testParams) { instance.params.foreach { p => From 231ec55e1c85215553ca72d2fabebc29a0a09b64 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 13 Jul 2017 18:15:32 -0700 Subject: [PATCH 10/12] Changed saving of nested estimators to store the path of the estimator as a relative path instead of absolute path. --- .../spark/ml/tuning/ValidatorParams.scala | 10 +++--- .../spark/ml/tuning/CrossValidatorSuite.scala | 23 ++++++++++-- .../ml/tuning/TrainValidationSplitSuite.scala | 22 +++++++++++- .../tuning/ValidatorParamsSuiteHelpers.scala | 35 ++++++++++++++++++- 4 files changed, 82 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index e5d02053b15d..0ab6eed95938 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -132,11 +132,12 @@ private[ml] object ValidatorParams { paramMap.toSeq.map { case ParamPair(p, v) => v match { case writeableObj: DefaultParamsWritable => - val paramPath = new Path(path, "epm_" + p.name + numParamsNotJson).toString + val relativePath = "epm_" + p.name + numParamsNotJson + val paramPath = new Path(path, relativePath).toString numParamsNotJson += 1 writeableObj.save(paramPath) Map("parent" -> p.parent, "name" -> p.name, - "value" -> compact(render(JString(paramPath))), + "value" -> compact(render(JString(relativePath))), "isJson" -> compact(render(JBool(false)))) case _: MLWritable => throw new NotImplementedError("ValidatorParams.saveImpl does not handle parameters " + @@ -204,8 +205,9 @@ private[ml] object ValidatorParams { val value = param.jsonDecode(pInfo("value")) param -> value } else { - val path = param.jsonDecode(pInfo("value")).toString - val value = DefaultParamsReader.loadParamsInstance[MLWritable](path, sc) + val relativePath = param.jsonDecode(pInfo("value")).toString + val value = DefaultParamsReader + .loadParamsInstance[MLWritable](new Path(path, relativePath).toString, sc) param -> value } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 2f0b18c4dcc2..2791ea715ace 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -23,8 +23,8 @@ import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressio import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator} import org.apache.spark.ml.feature.HashingTF -import org.apache.spark.ml.linalg.{DenseMatrix, Vectors} -import org.apache.spark.ml.param.{ParamMap, ParamPair} +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared.HasInputCol import org.apache.spark.ml.regression.LinearRegression import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} @@ -206,6 +206,25 @@ class CrossValidatorSuite .compareParamMaps(cv.getEstimatorParamMaps, cv2.getEstimatorParamMaps) } + test("read/write: Persistence of nested 
estimator works if parent directory changes") { + val ova = new OneVsRest().setClassifier(new LogisticRegression) + val evaluator = new MulticlassClassificationEvaluator() + .setMetricName("accuracy") + val classifier1 = new LogisticRegression().setRegParam(2.0) + val classifier2 = new LogisticRegression().setRegParam(3.0) + // params that are not JSON serializable must inherit from Params + val paramMaps = new ParamGridBuilder() + .addGrid(ova.classifier, Array(classifier1, classifier2)) + .build() + val cv = new CrossValidator() + .setEstimator(ova) + .setEvaluator(evaluator) + .setNumFolds(20) + .setEstimatorParamMaps(paramMaps) + + ValidatorParamsSuiteHelpers.testFileMove(cv) + } + test("read/write: CrossValidator with complex estimator") { // workflow: CrossValidator[Pipeline[HashingTF, CrossValidator[LogisticRegression]]] val lrEvaluator = new BinaryClassificationEvaluator() diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index ef86a5da5193..71a1776a2cdd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressio import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator} import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.ml.param.{ParamMap, ParamPair} +import org.apache.spark.ml.param.{ParamMap} import org.apache.spark.ml.param.shared.HasInputCol import org.apache.spark.ml.regression.LinearRegression import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} @@ -192,6 +192,26 @@ class TrainValidationSplitSuite .compareParamMaps(tvs.getEstimatorParamMaps, tvs2.getEstimatorParamMaps) } + test("read/write: Persistence of nested estimator works if parent directory changes") { + val ova = new OneVsRest() + .setClassifier(new LogisticRegression) + val evaluator = new BinaryClassificationEvaluator() + .setMetricName("areaUnderPR") // not default metric + val classifier1 = new LogisticRegression().setRegParam(2.0) + val classifier2 = new LogisticRegression().setRegParam(3.0) + val paramMaps = new ParamGridBuilder() + .addGrid(ova.classifier, Array(classifier1, classifier2)) + .build() + val tvs = new TrainValidationSplit() + .setEstimator(ova) + .setEvaluator(evaluator) + .setTrainRatio(0.5) + .setEstimatorParamMaps(paramMaps) + .setSeed(42L) + + ValidatorParamsSuiteHelpers.testFileMove(tvs) + } + test("read/write: TrainValidationSplitModel") { val lr = new LogisticRegression() .setThreshold(0.6) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala index a55750c92fce..0fdfeddcf1a4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala @@ -17,10 +17,14 @@ package org.apache.spark.ml.tuning +import java.io.File +import java.nio.file.{Files, StandardCopyOption} + import org.apache.spark.SparkFunSuite import org.apache.spark.ml.param.{ParamMap, ParamPair, Params} +import org.apache.spark.ml.util.{DefaultReadWriteTest, Identifiable, MLReader, 
MLWritable} -object ValidatorParamsSuiteHelpers extends SparkFunSuite { +object ValidatorParamsSuiteHelpers extends SparkFunSuite with DefaultReadWriteTest { /** * Assert sequences of estimatorParamMaps are identical. * If the values for a parameter are not directly comparable with === @@ -51,4 +55,33 @@ object ValidatorParamsSuiteHelpers extends SparkFunSuite { } } } + + /** + * When nested estimators (ex. OneVsRest) are saved within meta-algorithms such as + * CrossValidator and TrainValidationSplit, relative paths should be used to store + * the path of the estimator so that if the parent directory changes, loading the + * model still works. + */ + def testFileMove[T <: Params with MLWritable](instance: T): Unit = { + val uid = instance.uid + val subdirName = Identifiable.randomUID("test") + + val subdir = new File(tempDir, subdirName) + val subDirWithUid = new File(subdir, uid) + + instance.save(subDirWithUid.getPath) + + val newSubdirName = Identifiable.randomUID("test_moved") + val newSubdir = new File(tempDir, newSubdirName) + val newSubdirWithUid = new File(newSubdir, uid) + + Files.createDirectory(newSubdir.toPath) + Files.createDirectory(newSubdirWithUid.toPath) + Files.move(subDirWithUid.toPath, newSubdirWithUid.toPath, StandardCopyOption.ATOMIC_MOVE) + + val loader = instance.getClass.getMethod("read") + .invoke(null).asInstanceOf[MLReader[T]] + val newInstance = loader.load(newSubdirWithUid.getPath) + assert(uid == newInstance.uid) + } } From a6bd1972cfcebe66b67f263ed82fc533f5287e9b Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Fri, 14 Jul 2017 14:10:16 -0700 Subject: [PATCH 11/12] Fixed indentation --- .../apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala index 0fdfeddcf1a4..cd0ff5b52c46 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala @@ -80,7 +80,7 @@ object ValidatorParamsSuiteHelpers extends SparkFunSuite with DefaultReadWriteTe Files.move(subDirWithUid.toPath, newSubdirWithUid.toPath, StandardCopyOption.ATOMIC_MOVE) val loader = instance.getClass.getMethod("read") - .invoke(null).asInstanceOf[MLReader[T]] + .invoke(null).asInstanceOf[MLReader[T]] val newInstance = loader.load(newSubdirWithUid.getPath) assert(uid == newInstance.uid) } From 6a7162dfbefbc900cc103f6fd7d7df5510cf2154 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Fri, 14 Jul 2017 14:13:04 -0700 Subject: [PATCH 12/12] Better indentation fix --- .../apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala index cd0ff5b52c46..1df673cf4016 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/ValidatorParamsSuiteHelpers.scala @@ -79,8 +79,7 @@ object ValidatorParamsSuiteHelpers extends SparkFunSuite with DefaultReadWriteTe Files.createDirectory(newSubdirWithUid.toPath) Files.move(subDirWithUid.toPath, newSubdirWithUid.toPath, StandardCopyOption.ATOMIC_MOVE) - val loader = instance.getClass.getMethod("read") - 
.invoke(null).asInstanceOf[MLReader[T]] + val loader = instance.getClass.getMethod("read").invoke(null).asInstanceOf[MLReader[T]] val newInstance = loader.load(newSubdirWithUid.getPath) assert(uid == newInstance.uid) }
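
Reviewer note: a minimal end-to-end sketch of the behavior this series enables, mirroring the new PySpark tests above. It assumes a build with these patches applied (the 2.3.0-targeted APIs used in the diffs), an active SparkSession bound to `spark`, and writable demo paths; the dataset and paths are placeholders, not part of the patch.

    from pyspark.ml.classification import LogisticRegression, OneVsRest
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder

    # Toy dataset; any DataFrame with "features" and "label" columns works.
    dataset = spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    # Nested estimator: the classifier param of OneVsRest is itself an Estimator,
    # so it cannot be JSON-encoded and is persisted via the isJson=false path
    # added in ValidatorParams, using a path relative to the validator's directory.
    ova = OneVsRest(classifier=LogisticRegression())
    grid = (ParamGridBuilder()
            .addGrid(ova.classifier, [LogisticRegression(maxIter=100),
                                      LogisticRegression(maxIter=150)])
            .build())
    cv = CrossValidator(estimator=ova,
                        estimatorParamMaps=grid,
                        evaluator=MulticlassClassificationEvaluator())

    cv.save("/tmp/cv_ovr_demo")                 # placeholder path
    loaded = CrossValidator.load("/tmp/cv_ovr_demo")
    assert loaded.getEstimator().uid == cv.getEstimator().uid

    # Fitted models round-trip as well.
    cvModel = cv.fit(dataset)
    cvModel.save("/tmp/cv_ovr_model_demo")      # placeholder path
    CrossValidatorModel.load("/tmp/cv_ovr_model_demo")

The same pattern applies to TrainValidationSplit; only the tuning class and its params (trainRatio instead of numFolds) change.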