From 1264b4c50134e89c325c892501f59ca61e7667c5 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Tue, 4 Oct 2016 17:23:18 -0700
Subject: [PATCH 01/10] add spark.logit

---
 R/pkg/NAMESPACE | 3 +-
 R/pkg/R/generics.R | 4 +
 R/pkg/R/mllib.R | 191 +++++++++++++++++-
 R/pkg/inst/tests/testthat/test_mllib.R | 11 +
 .../ml/r/LogisticRegressionWrapper.scala | 162 +++++++++++++++
 5 files changed, 368 insertions(+), 3 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 87181851714e0..cb9ab0fcaa141 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -43,7 +43,8 @@ exportMethods("glm",
               "spark.isoreg",
               "spark.gaussianMixture",
               "spark.als",
-              "spark.kstest")
+              "spark.kstest",
+              "spark.logit")
 
 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5549cd7cac516..877c4ca972406 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1371,6 +1371,10 @@ setGeneric("spark.gaussianMixture",
              standardGeneric("spark.gaussianMixture")
            })
 
+#' @rdname spark.logit
+#' @export
+setGeneric("spark.logit", function(data, formula, ...) { standardGeneric("spark.logit") })
+
 #' @param object a fitted ML model object.
 #' @param path the directory where the model is saved.
 #' @param ... additional argument(s) passed to the method.
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index b901307f8f409..26a7252afdc42 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -95,6 +95,13 @@ setClass("ALSModel", representation(jobj = "jobj"))
 #' @note KSTest since 2.1.0
 setClass("KSTest", representation(jobj = "jobj"))
 
+#' S4 class that represents a LogisticRegressionModel
+#'
+#' @param jobj a Java object reference to the backing Scala LogisticRegressionModel
+#' @export
+#' @note LogisticRegressionModel since 2.1.0
+setClass("LogisticRegressionModel", representation(jobj = "jobj"))
+
 #' Saves the MLlib model to the input path
 #'
 #' Saves the MLlib model to the input path. For more information, see the specific
@@ -105,7 +112,7 @@ setClass("KSTest", representation(jobj = "jobj"))
 #' @seealso \link{spark.glm}, \link{glm},
 #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans},
 #' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}
-#' @seealso \link{read.ml}
+#' @seealso \link{spark.logit}, \link{read.ml}
 NULL
 
 #' Makes predictions from a MLlib model
@@ -117,7 +124,7 @@ NULL
 #' @export
 #' @seealso \link{spark.glm}, \link{glm},
 #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans},
-#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}
+#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}, \link{spark.logit}
 NULL
 
 write_internal <- function(object, path, overwrite = FALSE) {
@@ -647,6 +654,169 @@ setMethod("predict", signature(object = "KMeansModel"),
             predict_internal(object, newData)
           })
 
+#' Logistic Regression Model
+#'
+#' Fits a logistic regression model against a Spark DataFrame. It supports "binomial": Binary logistic regression
+#' with pivoting; "multinomial": Multinomial logistic (softmax) regression without pivoting, similar to glmnet.
+#' Users can print the model, make predictions with the produced model, and save the model to the input path.
+#'
+#' @param data SparkDataFrame for training
+#' @param formula A symbolic description of the model to be fitted. 
Currently only a few formula +#' operators are supported, including '~', '.', ':', '+', and '-'. +#' @param regParam the regularization parameter. Default is 0.0. +#' @param elasticNetParam the ElasticNet mixing parameter. For alpha = 0, the penalty is an L2 penalty. +#' For alpha = 1, it is an L1 penalty. For 0 < alpha < 1, the penalty is a combination +#' of L1 and L2. Default is 0.0 which is an L2 penalty. +#' @param maxIter maximum iteration number. +#' @param tol convergence tolerance of iterations. +#' @param fitIntercept whether to fit an intercept term. Default is TRUE. +#' @param family the name of family which is a description of the label distribution to be used in the model. +#' Supported options: +#' - "auto": Automatically select the family based on the number of classes: +#' If numClasses == 1 || numClasses == 2, set to "binomial". +#' Else, set to "multinomial". +#' - "binomial": Binary logistic regression with pivoting. +#' - "multinomial": Multinomial logistic (softmax) regression without pivoting. +#' Default is "auto". +#' @param standardization whether to standardize the training features before fitting the model. The coefficients +#' of models will be always returned on the original scale, so it will be transparent for +#' users. Note that with/without standardization, the models should be always converged +#' to the same solution when no regularization is applied. Default is TRUE, same as glmnet. +#' @param threshold in binary classification, in range [0, 1]. If the estimated probability of class label 1 +#' is > threshold, then predict 1, else 0. A high threshold encourages the model to predict 0 +#' more often; a low threshold encourages the model to predict 1 more often. Note: Setting this with +#' threshold p is equivalent to setting thresholds (Array(1-p, p)). When threshold is set, any user-set +#' value for thresholds will be cleared. If both threshold and thresholds are set, then they must be +#' equivalent. Default is 0.5. +#' @param thresholds in multiclass (or binary) classification to adjust the probability of predicting each class. +#' Array must have length equal to the number of classes, with values > 0, excepting that at most one +#' value may be 0. The class with largest value p/t is predicted, where p is the original probability +#' of that class and t is the class's threshold. Note: When thresholds is set, any user-set +#' value for threshold will be cleared. If both threshold and thresholds are set, then they must be +#' equivalent. Default is NULL. +#' @param weightCol The weight column name. +#' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions of features or the number of partitions +#' are large, this param could be adjusted to a larger size. Default is 2. +#' @param ... additional arguments passed to the method. 
+#' @return \code{spark.logit} returns a fitted logistic regression model
+#' @rdname spark.logit
+#' @aliases spark.logit,SparkDataFrame,formula-method
+#' @name spark.logit
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' data <- list(list(7.0, 0.0), list(5.0, 1.0), list(3.0, 2.0),
+#'              list(5.0, 3.0), list(1.0, 4.0))
+#' df <- createDataFrame(data, c("label", "feature"))
+#' # save fitted model to input path
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#'
+#' # can also read back the saved model and print
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.logit since 2.1.0
+setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
+                   tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE,
+                   threshold = 0.5, thresholds = NULL, weightCol = NULL, aggregationDepth = 2) {
+            formula <- paste0(deparse(formula), collapse = "")
+
+            if (is.null(weightCol)) {
+              weightCol <- ""
+            }
+
+            if (!is.null(thresholds)) {
+              thresholds <- as.array(thresholds)
+            }
+
+            jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit",
+                                data@sdf, formula, as.numeric(regParam), as.numeric(elasticNetParam),
+                                as.integer(maxIter), as.numeric(tol), as.logical(fitIntercept), as.character(family),
+                                as.logical(standardization), as.numeric(threshold), thresholds,
+                                as.character(weightCol), as.integer(aggregationDepth))
+            new("LogisticRegressionModel", jobj = jobj)
+          })
+
+# Predicted values based on an LogisticRegressionModel
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns the predicted values based on an LogisticRegressionModel.
+#' @rdname spark.logit
+#' @export
+#' @note predict(LogisticRegressionModel) since 2.1.0
+setMethod("predict", signature(object = "LogisticRegressionModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+# Get the summary of an LogisticRegressionModel
+
+#' @return \code{summary} returns the Binary Logistic regression results of a given model as lists. Note that
+#' Multinomial logistic regression summary is not available now. 
+#' @rdname spark.logit +#' @aliases spark.logit,SparkDataFrame,formula-method +#' @export +#' @note summary(LogisticRegressionModel) since 2.1.0 +setMethod("summary", signature(object = "LogisticRegressionModel"), + function(object) { + jobj <- object@jobj + is.loaded <- callJMethod(jobj, "isLoaded") + + roc <- if (is.loaded) { + NULL + } else { + dataFrame(callJMethod(jobj, "roc")) + } + + areaUnderROC <- if (is.loaded) { + NULL + } else { + callJMethod(jobj, "areaUnderROC") + } + + pr <- if (is.loaded) { + NULL + } else { + dataFrame(callJMethod(jobj, "pr")) + } + + fMeasureByThreshold <- if (is.loaded) { + NULL + } else { + callJMethod(jobj, "fMeasureByThreshold") + } + + precisionByThreshold <- if (is.loaded) { + NULL + } else { + callJMethod(jobj, "precisionByThreshold") + } + + recallByThreshold <- if (is.loaded) { + NULL + } else { + callJMethod(jobj, "recallByThreshold") + } + + totalIterations <- if (is.loaded) { + NULL + } else { + callJMethod(jobj, "totalIterations") + } + + objectiveHistory <- if (is.loaded) { + NULL + } else { + callJMethod(jobj, "objectiveHistory") + } + list(roc = roc, areaUnderROC = areaUnderROC, pr = pr, fMeasureByThreshold = fMeasureByThreshold, + precisionByThreshold= precisionByThreshold, recallByThreshold = recallByThreshold, + totalIterations = totalIterations, objectiveHistory = objectiveHistory) + }) + #' Multilayer Perceptron Classification Model #' #' \code{spark.mlp} fits a multi-layer perceptron neural network model against a SparkDataFrame. @@ -882,6 +1052,21 @@ setMethod("write.ml", signature(object = "IsotonicRegressionModel", path = "char write_internal(object, path, overwrite) }) +# Save fitted LogisticRegressionModel to the input path + +#' @param path The directory where the model is saved +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. +#' +#' @rdname spark.logit +#' @aliases write.ml,LogisticRegressionModel,character-method +#' @export +#' @note write.ml(LogisticRegression, character) since 2.1.0 +setMethod("write.ml", signature(object = "LogisticRegressionModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) + # Save fitted MLlib model to the input path #' @param path the directory where the model is saved. 
@@ -932,6 +1117,8 @@ read.ml <- function(path) { new("GaussianMixtureModel", jobj = jobj) } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) { new("ALSModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LogisticRegressionWrapper")) { + new("LogisticRegressionModel", jobj = jobj) } else { stop("Unsupported model: ", jobj) } diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index c99315726a22c..c04b99d8b66a7 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -587,6 +587,17 @@ test_that("spark.isotonicRegression", { unlink(modelPath) }) +test_that("spark.logit", { + label <- c(1.0, 1.0, 1.0, 0.0, 0.0) + feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) + binary_data <- as.data.frame(cbind(label, feature)) + binary_df <- suppressWarnings(createDataFrame(binary_data)) + + blr_model <- spark.logit(binary_df, label ~ feature, threshold = 1.0) + blr_predict <- collect(select(predict(blr_model, binary_df), "prediction")) + expect_equal(blr_predict$prediction, c(0, 0, 0, 0, 0)) +}) + test_that("spark.gaussianMixture", { # R code to reproduce the result. # nolint start diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala new file mode 100644 index 0000000000000..ee84c8808739e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.r + +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression, LogisticRegressionModel} +import org.apache.spark.ml.feature.RFormula +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} + +private[r] class LogisticRegressionWrapper private ( + val pipeline: PipelineModel, + val features: Array[String], + val isLoaded: Boolean = false) extends MLWritable { + + private val logisticRegressionModel: LogisticRegressionModel = + pipeline.stages(1).asInstanceOf[LogisticRegressionModel] + + lazy val totalIterations: Int = logisticRegressionModel.summary.totalIterations + + lazy val objectiveHistory: Array[Double] = logisticRegressionModel.summary.objectiveHistory + + lazy val roc: DataFrame = + logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary].roc + + lazy val areaUnderROC: Double = + logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary].areaUnderROC + + lazy val pr: DataFrame = + logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary].pr + + lazy val fMeasureByThreshold: DataFrame = + logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary] + .fMeasureByThreshold + + lazy val precisionByThreshold: DataFrame = + logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary] + .precisionByThreshold + + lazy val recallByThreshold: DataFrame = + logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary] + .recallByThreshold + + def transform(dataset: Dataset[_]): DataFrame = { + pipeline.transform(dataset).drop(logisticRegressionModel.getFeaturesCol) + } + + override def write: MLWriter = new LogisticRegressionWrapper.LogisticRegressionWrapperWriter(this) +} + +private[r] object LogisticRegressionWrapper + extends MLReadable[LogisticRegressionWrapper] { + + def fit( // scalastyle:ignore + data: DataFrame, + formula: String, + regParam: Double, + elasticNetParam: Double, + maxIter: Int, + tol: Double, + fitIntercept: Boolean, + family: String, + standardization: Boolean, + threshold: Double, + thresholds: Array[Double], + weightCol: String, + aggregationDepth: Int + ): LogisticRegressionWrapper = { + + val rFormula = new RFormula() + .setFormula(formula) + .setFeaturesCol("features") + RWrapperUtils.checkDataColumns(rFormula, data) + val rFormulaModel = rFormula.fit(data) + + // get feature names from output schema + val schema = rFormulaModel.transform(data).schema + val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol)) + .attributes.get + val features = featureAttrs.map(_.name.get) + + // assemble and fit the pipeline + val logisticRegression = new LogisticRegression() + .setRegParam(regParam) + .setElasticNetParam(elasticNetParam) + .setMaxIter(maxIter) + .setTol(tol) + .setFitIntercept(fitIntercept) + .setFamily(family) + .setStandardization(standardization) + .setThreshold(threshold) + .setWeightCol(weightCol) + .setAggregationDepth(aggregationDepth) + .setFeaturesCol(rFormula.getFeaturesCol) + + if (thresholds != null) { + logisticRegression.setThresholds(thresholds) + } + + val pipeline = new Pipeline() + .setStages(Array(rFormulaModel, logisticRegression)) + .fit(data) + + new 
LogisticRegressionWrapper(pipeline, features) + } + + override def read: MLReader[LogisticRegressionWrapper] = new LogisticRegressionWrapperReader + + override def load(path: String): LogisticRegressionWrapper = super.load(path) + + class LogisticRegressionWrapperWriter(instance: LogisticRegressionWrapper) extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadata = ("class" -> instance.getClass.getName) ~ + ("features" -> instance.features.toSeq) + val rMetadataJson: String = compact(render(rMetadata)) + sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + + instance.pipeline.save(pipelinePath) + } + } + + class LogisticRegressionWrapperReader extends MLReader[LogisticRegressionWrapper] { + + override def load(path: String): LogisticRegressionWrapper = { + implicit val format = DefaultFormats + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadata = parse(rMetadataStr) + val features = (rMetadata \ "features").extract[Array[String]] + + val pipeline = PipelineModel.load(pipelinePath) + new LogisticRegressionWrapper(pipeline, features, true) + } + } +} \ No newline at end of file From e264d6d683696e4fc2a4b1f3851ecc50c9474c0c Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Wed, 5 Oct 2016 16:36:54 -0700 Subject: [PATCH 02/10] add unit tests --- R/pkg/R/mllib.R | 35 ++++++++++++---- R/pkg/inst/tests/testthat/test_mllib.R | 42 +++++++++++++++++++ .../org/apache/spark/ml/r/RWrappers.scala | 2 + 3 files changed, 72 insertions(+), 7 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 26a7252afdc42..078665f4ee848 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -706,16 +706,37 @@ setMethod("predict", signature(object = "KMeansModel"), #' @examples #' \dontrun{ #' sparkR.session() -#' data <- list(list(7.0, 0.0), list(5.0, 1.0), list(3.0, 2.0), -#' list(5.0, 3.0), list(1.0, 4.0)) -#' df <- createDataFrame(data, c("label", "feature")) +#' # binary logistic regression +#' label <- c(1.0, 1.0, 1.0, 0.0, 0.0) +#' feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) +#' binary_data <- as.data.frame(cbind(label, feature)) +#' binary_df <- suppressWarnings(createDataFrame(binary_data)) +#' blr_model <- spark.logit(binary_df, label ~ feature, threshold = 1.0) +#' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction")) +#' +#' # summary of binary logistic regression +#' blr_summary <- summary(blr_model) +#' blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure")) #' # save fitted model to input path #' path <- "path/to/model" -#' write.ml(model, path) +#' write.ml(blr_model, path) #' #' # can also read back the saved model and print #' savedModel <- read.ml(path) #' summary(savedModel) +#' +#' # multinomial logistic regression +#' +#' label <- c(0.0, 1.0, 2.0, 0.0, 0.0) +#' feature1 <- c(4.845940, 5.64480, 7.430381, 6.464263, 5.555667) +#' feature2 <- c(2.941319, 2.614812, 2.162451, 3.339474, 2.970987) +#' feature3 <- c(1.322733, 1.348044, 3.861237, 9.686976, 3.447130) +#' feature4 <- c(1.3246388, 0.5510444, 0.9225810, 1.2147881, 1.6020842) +#' data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4)) +#' df <- suppressWarnings(createDataFrame(data)) +#' +#' model <- spark.logit(df, label 
~ ., family = "multinomial", thresholds=c(0, 1, 1)) +#' predict1 <- collect(select(predict(model, df), "prediction")) #' } #' @note spark.logit since 2.1.0 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"), @@ -786,19 +807,19 @@ setMethod("summary", signature(object = "LogisticRegressionModel"), fMeasureByThreshold <- if (is.loaded) { NULL } else { - callJMethod(jobj, "fMeasureByThreshold") + dataFrame(callJMethod(jobj, "fMeasureByThreshold")) } precisionByThreshold <- if (is.loaded) { NULL } else { - callJMethod(jobj, "precisionByThreshold") + dataFrame(callJMethod(jobj, "precisionByThreshold")) } recallByThreshold <- if (is.loaded) { NULL } else { - callJMethod(jobj, "recallByThreshold") + dataFrame(callJMethod(jobj, "recallByThreshold")) } totalIterations <- if (is.loaded) { diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index c04b99d8b66a7..31da77f04d184 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -588,6 +588,7 @@ test_that("spark.isotonicRegression", { }) test_that("spark.logit", { + # test binary logistic regression label <- c(1.0, 1.0, 1.0, 0.0, 0.0) feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) binary_data <- as.data.frame(cbind(label, feature)) @@ -596,6 +597,47 @@ test_that("spark.logit", { blr_model <- spark.logit(binary_df, label ~ feature, threshold = 1.0) blr_predict <- collect(select(predict(blr_model, binary_df), "prediction")) expect_equal(blr_predict$prediction, c(0, 0, 0, 0, 0)) + blr_model1 <- spark.logit(binary_df, label ~ feature, threshold = 0.0) + blr_predict1 <- collect(select(predict(blr_model1, binary_df), "prediction")) + expect_equal(blr_predict1$prediction, c(1, 1, 1, 1, 1)) + + # test summary of binary logistic regression + blr_summary <- summary(blr_model) + blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure")) + expect_equal(blr_fmeasure$threshold, c(0.8221347, 0.7884005, 0.6674709, 0.3785437, 0.3434487), + tolerance = 1e-4) + expect_equal(blr_fmeasure$'F-Measure', c(0.5000000, 0.8000000, 0.6666667, 0.8571429, 0.7500000), + tolerance = 1e-4) + blr_precision <- collect(select(blr_summary$precisionByThreshold, "threshold", "precision")) + expect_equal(blr_precision$precision, c(1.0000000, 1.0000000, 0.6666667, 0.7500000, 0.6000000), + tolerance = 1e-4) + blr_recall <- collect(select(blr_summary$recallByThreshold, "threshold", "recall")) + expect_equal(blr_recall$recall, c(0.3333333, 0.6666667, 0.6666667, 1.0000000, 1.0000000), + tolerance = 1e-4) + + # test model save and read + modelPath <- tempfile(pattern = "spark-logisticRegression", fileext = ".tmp") + write.ml(blr_model, modelPath) + expect_error(write.ml(blr_model, modelPath)) + write.ml(blr_model, modelPath, overwrite = TRUE) + blr_model2 <- read.ml(modelPath) + blr_predict2 <- collect(select(predict(blr_model2, binary_df), "prediction")) + expect_equal(blr_predict$prediction, blr_predict2$prediction) + + # test multinomial logistic regression + label <- c(0.0, 1.0, 2.0, 0.0, 0.0) + feature1 <- c(4.845940, 5.64480, 7.430381, 6.464263, 5.555667) + feature2 <- c(2.941319, 2.614812, 2.162451, 3.339474, 2.970987) + feature3 <- c(1.322733, 1.348044, 3.861237, 9.686976, 3.447130) + feature4 <- c(1.3246388, 0.5510444, 0.9225810, 1.2147881, 1.6020842) + data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4)) + df <- suppressWarnings(createDataFrame(data)) + + model <- spark.logit(df, label ~ ., 
family = "multinomial", thresholds=c(0, 1, 1)) + predict1 <- collect(select(predict(model, df), "prediction")) + expect_equal(predict1$prediction, c(0, 0, 0, 0, 0)) + # Summary of multinomial logistic regression is not implemented yet + expect_error(summary(model)) }) test_that("spark.gaussianMixture", { diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index d64de1b6abb63..1df3662a5822b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -54,6 +54,8 @@ private[r] object RWrappers extends MLReader[Object] { GaussianMixtureWrapper.load(path) case "org.apache.spark.ml.r.ALSWrapper" => ALSWrapper.load(path) + case "org.apache.spark.ml.r.LogisticRegressionWrapper" => + LogisticRegressionWrapper.load(path) case _ => throw new SparkException(s"SparkR read.ml does not support load $className") } From b341d771bfc238603a3068ec43c716a8e72b9181 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Wed, 5 Oct 2016 16:52:14 -0700 Subject: [PATCH 03/10] fix R style --- R/pkg/R/mllib.R | 11 +++++++---- R/pkg/inst/tests/testthat/test_mllib.R | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 078665f4ee848..1023f27b93d72 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -755,8 +755,9 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula") jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit", data@sdf, formula, as.numeric(regParam), as.numeric(elasticNetParam), - as.integer(maxIter), as.numeric(tol), as.logical(fitIntercept), as.character(family), - as.logical(standardization), as.numeric(threshold), thresholds, + as.integer(maxIter), as.numeric(tol), as.logical(fitIntercept), + as.character(family), as.logical(standardization), + as.numeric(threshold), thresholds, as.character(weightCol), as.integer(aggregationDepth)) new("LogisticRegressionModel", jobj = jobj) }) @@ -833,8 +834,10 @@ setMethod("summary", signature(object = "LogisticRegressionModel"), } else { callJMethod(jobj, "objectiveHistory") } - list(roc = roc, areaUnderROC = areaUnderROC, pr = pr, fMeasureByThreshold = fMeasureByThreshold, - precisionByThreshold= precisionByThreshold, recallByThreshold = recallByThreshold, + list(roc = roc, areaUnderROC = areaUnderROC, pr = pr, + fMeasureByThreshold = fMeasureByThreshold, + precisionByThreshold = precisionByThreshold, + recallByThreshold = recallByThreshold, totalIterations = totalIterations, objectiveHistory = objectiveHistory) }) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 31da77f04d184..06901908324a0 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -606,7 +606,7 @@ test_that("spark.logit", { blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure")) expect_equal(blr_fmeasure$threshold, c(0.8221347, 0.7884005, 0.6674709, 0.3785437, 0.3434487), tolerance = 1e-4) - expect_equal(blr_fmeasure$'F-Measure', c(0.5000000, 0.8000000, 0.6666667, 0.8571429, 0.7500000), + expect_equal(blr_fmeasure$"F-Measure", c(0.5000000, 0.8000000, 0.6666667, 0.8571429, 0.7500000), tolerance = 1e-4) blr_precision <- collect(select(blr_summary$precisionByThreshold, "threshold", "precision")) expect_equal(blr_precision$precision, c(1.0000000, 1.0000000, 0.6666667, 0.7500000, 0.6000000), @@ -633,7 +633,7 @@ 
test_that("spark.logit", { data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4)) df <- suppressWarnings(createDataFrame(data)) - model <- spark.logit(df, label ~ ., family = "multinomial", thresholds=c(0, 1, 1)) + model <- spark.logit(df, label ~., family = "multinomial", thresholds=c(0, 1, 1)) predict1 <- collect(select(predict(model, df), "prediction")) expect_equal(predict1$prediction, c(0, 0, 0, 0, 0)) # Summary of multinomial logistic regression is not implemented yet From 63a3ac2c0b58e1ec75ddcf5953b7392a0d076e32 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Wed, 5 Oct 2016 21:10:20 -0700 Subject: [PATCH 04/10] fix R style issue --- R/pkg/R/mllib.R | 5 +++-- R/pkg/inst/tests/testthat/test_mllib.R | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 1023f27b93d72..9e635c3a9e8e2 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -754,8 +754,9 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula") } jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit", - data@sdf, formula, as.numeric(regParam), as.numeric(elasticNetParam), - as.integer(maxIter), as.numeric(tol), as.logical(fitIntercept), + data@sdf, formula, as.numeric(regParam), + as.numeric(elasticNetParam), as.integer(maxIter), + as.numeric(tol), as.logical(fitIntercept), as.character(family), as.logical(standardization), as.numeric(threshold), thresholds, as.character(weightCol), as.integer(aggregationDepth)) diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 06901908324a0..552206ebc5d39 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -633,7 +633,7 @@ test_that("spark.logit", { data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4)) df <- suppressWarnings(createDataFrame(data)) - model <- spark.logit(df, label ~., family = "multinomial", thresholds=c(0, 1, 1)) + model <- spark.logit(df, label ~., family = "multinomial", thresholds = c(0, 1, 1)) predict1 <- collect(select(predict(model, df), "prediction")) expect_equal(predict1$prediction, c(0, 0, 0, 0, 0)) # Summary of multinomial logistic regression is not implemented yet From c9e10009ec05dd1a0b7f7dc519a01b55f9ad357e Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 6 Oct 2016 15:38:18 -0700 Subject: [PATCH 05/10] fix cran warning --- R/pkg/R/mllib.R | 1 + 1 file changed, 1 insertion(+) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 9e635c3a9e8e2..de33e4ea021bb 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -777,6 +777,7 @@ setMethod("predict", signature(object = "LogisticRegressionModel"), # Get the summary of an LogisticRegressionModel +#' @param object an LogisticRegressionModel fitted by \code{spark.logit} #' @return \code{summary} returns the Binary Logistic regression results of a given model as lists. Note that #' Multinomial logistic regression summary is not available now. 
 #' @rdname spark.logit
From 0b54f46d40d8f55fd8a3a36982f1866cf63f2612 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Fri, 7 Oct 2016 20:59:18 -0700
Subject: [PATCH 06/10] remove redundant function call

---
 R/pkg/inst/tests/testthat/test_mllib.R | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 552206ebc5d39..8faa7e0be43d2 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -592,7 +592,7 @@ test_that("spark.logit", {
   label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
   feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
   binary_data <- as.data.frame(cbind(label, feature))
-  binary_df <- suppressWarnings(createDataFrame(binary_data))
+  binary_df <- createDataFrame(binary_data)
 
   blr_model <- spark.logit(binary_df, label ~ feature, threshold = 1.0)
   blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
@@ -631,7 +631,7 @@ test_that("spark.logit", {
   feature3 <- c(1.322733, 1.348044, 3.861237, 9.686976, 3.447130)
   feature4 <- c(1.3246388, 0.5510444, 0.9225810, 1.2147881, 1.6020842)
   data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4))
-  df <- suppressWarnings(createDataFrame(data))
+  df <- createDataFrame(data)
 
   model <- spark.logit(df, label ~., family = "multinomial", thresholds = c(0, 1, 1))
   predict1 <- collect(select(predict(model, df), "prediction"))
From e2ca4963102273c065570bd15bbe6eb698678e23 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Wed, 12 Oct 2016 16:40:27 -0700
Subject: [PATCH 07/10] address review comments

---
 R/pkg/R/mllib.R | 70 +++++++------------
 .../ml/r/LogisticRegressionWrapper.scala | 5 +-
 2 files changed, 30 insertions(+), 45 deletions(-)

diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index de33e4ea021bb..61fe11ca90896 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -672,12 +672,14 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' @param fitIntercept whether to fit an intercept term. Default is TRUE.
 #' @param family the name of family which is a description of the label distribution to be used in the model.
 #' Supported options:
-#' - "auto": Automatically select the family based on the number of classes:
+#' \itemize{
+#' \item{"auto": Automatically select the family based on the number of classes:
 #' If numClasses == 1 || numClasses == 2, set to "binomial".
-#' Else, set to "multinomial".
-#' - "binomial": Binary logistic regression with pivoting.
-#' - "multinomial": Multinomial logistic (softmax) regression without pivoting.
-#' Default is "auto".
+#' Else, set to "multinomial".}
+#' \item{"binomial": Binary logistic regression with pivoting.}
+#' \item{"multinomial": Multinomial logistic (softmax) regression without pivoting.}
+#' }
+#' Default is "auto".
 #' @param standardization whether to standardize the training features before fitting the model. The coefficients
 #' of models will be always returned on the original scale, so it will be transparent for
 #' users. Note that with/without standardization, the models should be always converged
 #' to the same solution when no regularization is applied. Default is TRUE, same as glmnet.
 #' @param threshold in binary classification, in range [0, 1]. If the estimated probability of class label 1
 #' is > threshold, then predict 1, else 0. A high threshold encourages the model to predict 0
 #' more often; a low threshold encourages the model to predict 1 more often. 
Note: Setting this with -#' threshold p is equivalent to setting thresholds (Array(1-p, p)). When threshold is set, any user-set +#' threshold p is equivalent to setting thresholds c(1-p, p). When threshold is set, any user-set #' value for thresholds will be cleared. If both threshold and thresholds are set, then they must be #' equivalent. Default is 0.5. #' @param thresholds in multiclass (or binary) classification to adjust the probability of predicting each class. @@ -697,6 +699,7 @@ setMethod("predict", signature(object = "KMeansModel"), #' @param weightCol The weight column name. #' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions of features or the number of partitions #' are large, this param could be adjusted to a larger size. Default is 2. +#' @param probability column name for predicted class conditional probabilities. Default is "probability". #' @param ... additional arguments passed to the method. #' @return \code{spark.logit} returns a fitted logistic regression model #' @rdname spark.logit @@ -733,7 +736,7 @@ setMethod("predict", signature(object = "KMeansModel"), #' feature3 <- c(1.322733, 1.348044, 3.861237, 9.686976, 3.447130) #' feature4 <- c(1.3246388, 0.5510444, 0.9225810, 1.2147881, 1.6020842) #' data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4)) -#' df <- suppressWarnings(createDataFrame(data)) +#' df <- createDataFrame(data) #' #' model <- spark.logit(df, label ~ ., family = "multinomial", thresholds=c(0, 1, 1)) #' predict1 <- collect(select(predict(model, df), "prediction")) @@ -742,7 +745,8 @@ setMethod("predict", signature(object = "KMeansModel"), setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100, tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE, - threshold = 0.5, thresholds = NULL, weightCol = NULL, aggregationDepth = 2) { + threshold = 0.5, thresholds = NULL, weightCol = NULL, aggregationDepth = 2, + probability = "probability") { formula <- paste0(deparse(formula), collapse = "") if (is.null(weightCol)) { @@ -759,7 +763,8 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula") as.numeric(tol), as.logical(fitIntercept), as.character(family), as.logical(standardization), as.numeric(threshold), thresholds, - as.character(weightCol), as.integer(aggregationDepth)) + as.character(weightCol), as.integer(aggregationDepth), + as.character(probability)) new("LogisticRegressionModel", jobj = jobj) }) @@ -768,6 +773,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula") #' @param newData a SparkDataFrame for testing. #' @return \code{predict} returns the predicted values based on an LogisticRegressionModel. 
#' @rdname spark.logit +#' @aliases predict,LogisticRegressionModel,SparkDataFrame-method #' @export #' @note predict(LogisticRegressionModel) since 2.1.0 setMethod("predict", signature(object = "LogisticRegressionModel"), @@ -789,53 +795,31 @@ setMethod("summary", signature(object = "LogisticRegressionModel"), jobj <- object@jobj is.loaded <- callJMethod(jobj, "isLoaded") - roc <- if (is.loaded) { - NULL - } else { - dataFrame(callJMethod(jobj, "roc")) + if (is.loaded) { + stop("Loaded model doesn't have training summary.") } - areaUnderROC <- if (is.loaded) { - NULL - } else { - callJMethod(jobj, "areaUnderROC") - } + roc <- dataFrame(callJMethod(jobj, "roc")) - pr <- if (is.loaded) { - NULL - } else { - dataFrame(callJMethod(jobj, "pr")) - } + areaUnderROC <- callJMethod(jobj, "areaUnderROC") - fMeasureByThreshold <- if (is.loaded) { - NULL - } else { + pr <- dataFrame(callJMethod(jobj, "pr")) + + fMeasureByThreshold <- dataFrame(callJMethod(jobj, "fMeasureByThreshold")) - } - precisionByThreshold <- if (is.loaded) { - NULL - } else { + precisionByThreshold <- dataFrame(callJMethod(jobj, "precisionByThreshold")) - } - recallByThreshold <- if (is.loaded) { - NULL - } else { + recallByThreshold <- dataFrame(callJMethod(jobj, "recallByThreshold")) - } - totalIterations <- if (is.loaded) { - NULL - } else { + totalIterations <- callJMethod(jobj, "totalIterations") - } - objectiveHistory <- if (is.loaded) { - NULL - } else { + objectiveHistory <- callJMethod(jobj, "objectiveHistory") - } + list(roc = roc, areaUnderROC = areaUnderROC, pr = pr, fMeasureByThreshold = fMeasureByThreshold, precisionByThreshold = precisionByThreshold, diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala index ee84c8808739e..33b50dd6a156c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala @@ -85,12 +85,12 @@ private[r] object LogisticRegressionWrapper threshold: Double, thresholds: Array[Double], weightCol: String, - aggregationDepth: Int + aggregationDepth: Int, + probability: String ): LogisticRegressionWrapper = { val rFormula = new RFormula() .setFormula(formula) - .setFeaturesCol("features") RWrapperUtils.checkDataColumns(rFormula, data) val rFormulaModel = rFormula.fit(data) @@ -113,6 +113,7 @@ private[r] object LogisticRegressionWrapper .setWeightCol(weightCol) .setAggregationDepth(aggregationDepth) .setFeaturesCol(rFormula.getFeaturesCol) + .setProbabilityCol(probability) if (thresholds != null) { logisticRegression.setThresholds(thresholds) From 558dc203a4ac14dbfd740f5d10e5b5737b62263f Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 20 Oct 2016 14:38:01 -0700 Subject: [PATCH 08/10] address review comments --- R/pkg/R/mllib.R | 30 ++++++++----------- .../ml/r/LogisticRegressionWrapper.scala | 6 ++-- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 61fe11ca90896..775aee6907e93 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -684,22 +684,21 @@ setMethod("predict", signature(object = "KMeansModel"), #' of models will be always returned on the original scale, so it will be transparent for #' users. Note that with/without standardization, the models should be always converged #' to the same solution when no regularization is applied. Default is TRUE, same as glmnet. 
-#' @param threshold in binary classification, in range [0, 1]. If the estimated probability of class label 1
+#' @param thresholds in binary classification, in range [0, 1]. If the estimated probability of class label 1
 #' is > threshold, then predict 1, else 0. A high threshold encourages the model to predict 0
 #' more often; a low threshold encourages the model to predict 1 more often. Note: Setting this with
 #' threshold p is equivalent to setting thresholds c(1-p, p). When threshold is set, any user-set
 #' value for thresholds will be cleared. If both threshold and thresholds are set, then they must be
-#' equivalent. Default is 0.5.
-#' @param thresholds in multiclass (or binary) classification to adjust the probability of predicting each class.
-#' Array must have length equal to the number of classes, with values > 0, excepting that at most one
-#' value may be 0. The class with largest value p/t is predicted, where p is the original probability
-#' of that class and t is the class's threshold. Note: When thresholds is set, any user-set
-#' value for threshold will be cleared. If both threshold and thresholds are set, then they must be
-#' equivalent. Default is NULL.
+#' equivalent. In multiclass (or binary) classification, this adjusts the probability of
+#' predicting each class. Array must have length equal to the number of classes, with values > 0,
+#' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p
+#' is the original probability of that class and t is the class's threshold. Note: When thresholds
+#' is set, any user-set value for threshold will be cleared. If both threshold and thresholds are
+#' set, then they must be equivalent. Default is 0.5.
 #' @param weightCol The weight column name.
 #' @param aggregationDepth depth for treeAggregate (>= 2). If the dimensions of features or the number of partitions
 #' are large, this param could be adjusted to a larger size. Default is 2.
-#' @param probability column name for predicted class conditional probabilities. Default is "probability".
+#' @param probabilityCol column name for predicted class conditional probabilities. Default is "probability".
 #' @param ... additional arguments passed to the method. 
#' @return \code{spark.logit} returns a fitted logistic regression model #' @rdname spark.logit @@ -745,26 +744,21 @@ setMethod("predict", signature(object = "KMeansModel"), setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"), function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100, tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE, - threshold = 0.5, thresholds = NULL, weightCol = NULL, aggregationDepth = 2, - probability = "probability") { + thresholds = 0.5, weightCol = NULL, aggregationDepth = 2, + probabilityCol = "probability") { formula <- paste0(deparse(formula), collapse = "") if (is.null(weightCol)) { weightCol <- "" } - if (!is.null(thresholds)) { - thresholds <- as.array(thresholds) - } - jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit", data@sdf, formula, as.numeric(regParam), as.numeric(elasticNetParam), as.integer(maxIter), as.numeric(tol), as.logical(fitIntercept), as.character(family), as.logical(standardization), - as.numeric(threshold), thresholds, - as.character(weightCol), as.integer(aggregationDepth), - as.character(probability)) + as.array(thresholds), as.character(weightCol), + as.integer(aggregationDepth), as.character(probabilityCol)) new("LogisticRegressionModel", jobj = jobj) }) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala index 33b50dd6a156c..5bba39aee30a1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala @@ -82,7 +82,6 @@ private[r] object LogisticRegressionWrapper fitIntercept: Boolean, family: String, standardization: Boolean, - threshold: Double, thresholds: Array[Double], weightCol: String, aggregationDepth: Int, @@ -109,14 +108,15 @@ private[r] object LogisticRegressionWrapper .setFitIntercept(fitIntercept) .setFamily(family) .setStandardization(standardization) - .setThreshold(threshold) .setWeightCol(weightCol) .setAggregationDepth(aggregationDepth) .setFeaturesCol(rFormula.getFeaturesCol) .setProbabilityCol(probability) - if (thresholds != null) { + if (thresholds.length > 1) { logisticRegression.setThresholds(thresholds) + } else { + logisticRegression.setThreshold(thresholds(0)) } val pipeline = new Pipeline() From d0452ae1f92b9d72a620942be269f965c5810392 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Tue, 25 Oct 2016 13:08:50 -0700 Subject: [PATCH 09/10] address review comments --- R/pkg/R/mllib.R | 31 +++++++++---------- R/pkg/inst/tests/testthat/test_mllib.R | 6 ++-- .../ml/r/LogisticRegressionWrapper.scala | 26 ++++++---------- 3 files changed, 28 insertions(+), 35 deletions(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 775aee6907e93..ea56f167a78a1 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -664,8 +664,8 @@ setMethod("predict", signature(object = "KMeansModel"), #' @param formula A symbolic description of the model to be fitted. Currently only a few formula #' operators are supported, including '~', '.', ':', '+', and '-'. #' @param regParam the regularization parameter. Default is 0.0. -#' @param elasticNetParam the ElasticNet mixing parameter. For alpha = 0, the penalty is an L2 penalty. -#' For alpha = 1, it is an L1 penalty. For 0 < alpha < 1, the penalty is a combination +#' @param elasticNetParam the ElasticNet mixing parameter. For alpha = 0.0, the penalty is an L2 penalty. 
+#' For alpha = 1.0, it is an L1 penalty. For 0.0 < alpha < 1.0, the penalty is a combination
 #' of L1 and L2. Default is 0.0 which is an L2 penalty.
 #' @param maxIter maximum iteration number.
 #' @param tol convergence tolerance of iterations.
@@ -674,7 +674,7 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' Supported options:
 #' \itemize{
 #' \item{"auto": Automatically select the family based on the number of classes:
-#' If numClasses == 1 || numClasses == 2, set to "binomial".
+#' If number of classes == 1 || number of classes == 2, set to "binomial".
 #' Else, set to "multinomial".}
 #' \item{"binomial": Binary logistic regression with pivoting.}
@@ -712,8 +712,8 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
 #' feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
 #' binary_data <- as.data.frame(cbind(label, feature))
-#' binary_df <- suppressWarnings(createDataFrame(binary_data))
-#' blr_model <- spark.logit(binary_df, label ~ feature, threshold = 1.0)
+#' binary_df <- createDataFrame(binary_data)
+#' blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0)
 #' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
 #'
 #' # summary of binary logistic regression
@@ -723,9 +723,10 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' path <- "path/to/model"
 #' write.ml(blr_model, path)
 #'
-#' # can also read back the saved model and print
+#' # can also read back the saved model and predict
+#' # Note that summary does not work on a loaded model
 #' savedModel <- read.ml(path)
-#' summary(savedModel)
+#' blr_predict2 <- collect(select(predict(savedModel, binary_df), "prediction"))
 #'
 #' # multinomial logistic regression
 #'
@@ -737,6 +738,7 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' data <- as.data.frame(cbind(label, feature1, feature2, feature3, feature4))
 #' df <- createDataFrame(data)
 #'
+#' # Note that summary of multinomial logistic regression is not implemented yet
 #' model <- spark.logit(df, label ~ ., family = "multinomial", thresholds=c(0, 1, 1))
 #' predict1 <- collect(select(predict(model, df), "prediction"))
 #' }
@@ -799,20 +801,15 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 
         pr <- dataFrame(callJMethod(jobj, "pr"))
 
-        fMeasureByThreshold <-
-          dataFrame(callJMethod(jobj, "fMeasureByThreshold"))
+        fMeasureByThreshold <- dataFrame(callJMethod(jobj, "fMeasureByThreshold"))
 
-        precisionByThreshold <-
-          dataFrame(callJMethod(jobj, "precisionByThreshold"))
+        precisionByThreshold <- dataFrame(callJMethod(jobj, "precisionByThreshold"))
 
-        recallByThreshold <-
-          dataFrame(callJMethod(jobj, "recallByThreshold"))
+        recallByThreshold <- dataFrame(callJMethod(jobj, "recallByThreshold"))
 
-        totalIterations <-
-          callJMethod(jobj, "totalIterations")
+        totalIterations <- callJMethod(jobj, "totalIterations")
 
-        objectiveHistory <-
-          callJMethod(jobj, "objectiveHistory")
+        objectiveHistory <- callJMethod(jobj, "objectiveHistory")
 
         list(roc = roc, areaUnderROC = areaUnderROC, pr = pr,
              fMeasureByThreshold = fMeasureByThreshold,
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 8faa7e0be43d2..e85c118d3de1a 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -594,10 +594,10 @@ test_that("spark.logit", {
   binary_data <- as.data.frame(cbind(label, feature))
   binary_df <- 
createDataFrame(binary_data) - blr_model <- spark.logit(binary_df, label ~ feature, threshold = 1.0) + blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0) blr_predict <- collect(select(predict(blr_model, binary_df), "prediction")) expect_equal(blr_predict$prediction, c(0, 0, 0, 0, 0)) - blr_model1 <- spark.logit(binary_df, label ~ feature, threshold = 0.0) + blr_model1 <- spark.logit(binary_df, label ~ feature, thresholds = 0.0) blr_predict1 <- collect(select(predict(blr_model1, binary_df), "prediction")) expect_equal(blr_predict1$prediction, c(1, 1, 1, 1, 1)) @@ -623,6 +623,8 @@ test_that("spark.logit", { blr_model2 <- read.ml(modelPath) blr_predict2 <- collect(select(predict(blr_model2, binary_df), "prediction")) expect_equal(blr_predict$prediction, blr_predict2$prediction) + expect_error(summary(blr_model2)) + unlink(modelPath) # test multinomial logistic regression label <- c(0.0, 1.0, 2.0, 0.0, 0.0) diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala index 5bba39aee30a1..9b352c9863114 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LogisticRegressionWrapper.scala @@ -41,26 +41,20 @@ private[r] class LogisticRegressionWrapper private ( lazy val objectiveHistory: Array[Double] = logisticRegressionModel.summary.objectiveHistory - lazy val roc: DataFrame = - logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary].roc + lazy val blrSummary = + logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary] - lazy val areaUnderROC: Double = - logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary].areaUnderROC + lazy val roc: DataFrame = blrSummary.roc - lazy val pr: DataFrame = - logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary].pr + lazy val areaUnderROC: Double = blrSummary.areaUnderROC - lazy val fMeasureByThreshold: DataFrame = - logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary] - .fMeasureByThreshold + lazy val pr: DataFrame = blrSummary.pr - lazy val precisionByThreshold: DataFrame = - logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary] - .precisionByThreshold + lazy val fMeasureByThreshold: DataFrame = blrSummary.fMeasureByThreshold - lazy val recallByThreshold: DataFrame = - logisticRegressionModel.summary.asInstanceOf[BinaryLogisticRegressionSummary] - .recallByThreshold + lazy val precisionByThreshold: DataFrame = blrSummary.precisionByThreshold + + lazy val recallByThreshold: DataFrame = blrSummary.recallByThreshold def transform(dataset: Dataset[_]): DataFrame = { pipeline.transform(dataset).drop(logisticRegressionModel.getFeaturesCol) @@ -157,7 +151,7 @@ private[r] object LogisticRegressionWrapper val features = (rMetadata \ "features").extract[Array[String]] val pipeline = PipelineModel.load(pipelinePath) - new LogisticRegressionWrapper(pipeline, features, true) + new LogisticRegressionWrapper(pipeline, features, isLoaded = true) } } } \ No newline at end of file From 031cf9b8efa104de0bc288b5833ffc03b2fe7097 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Tue, 25 Oct 2016 14:11:11 -0700 Subject: [PATCH 10/10] fix aliases for summary --- R/pkg/R/mllib.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index ea56f167a78a1..26c268624273e 100644 --- a/R/pkg/R/mllib.R 
+++ b/R/pkg/R/mllib.R @@ -783,7 +783,7 @@ setMethod("predict", signature(object = "LogisticRegressionModel"), #' @return \code{summary} returns the Binary Logistic regression results of a given model as lists. Note that #' Multinomial logistic regression summary is not available now. #' @rdname spark.logit -#' @aliases spark.logit,SparkDataFrame,formula-method +#' @aliases summary,LogisticRegressionModel-method #' @export #' @note summary(LogisticRegressionModel) since 2.1.0 setMethod("summary", signature(object = "LogisticRegressionModel"),
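
Usage sketch: taken together, the ten patches above leave spark.logit with the
following user-facing workflow. The sketch is illustrative rather than lifted
from the patches: the input data, the model path, and the regParam/maxIter
settings are arbitrary choices of this note; spark.logit, predict, summary,
write.ml, and read.ml are the entry points the series adds, and the behavior
described in the comments follows the final state of the series.

# A minimal SparkR session sketch, assuming Spark built with these patches.
library(SparkR)
sparkR.session()

# Binary labels with one numeric feature (values are illustrative).
training <- createDataFrame(data.frame(
  label = c(1.0, 1.0, 1.0, 0.0, 0.0),
  feature = c(1.14, 0.92, -0.95, -1.11, 0.28)))

model <- spark.logit(training, label ~ feature, regParam = 0.01, maxIter = 50)

# The training summary is only available on a freshly fitted binomial model;
# per PATCH 07, calling summary() on a model loaded via read.ml() stops with
# "Loaded model doesn't have training summary."
stats <- summary(model)
head(stats$fMeasureByThreshold)  # SparkDataFrame of threshold vs. F-Measure

# predict() keeps the label column and adds prediction plus the class
# conditional probability column (named "probability" by default).
preds <- predict(model, training)
showDF(select(preds, "label", "prediction", "probability"))

# Round-trip through write.ml/read.ml; the restored model can still predict.
modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
write.ml(model, modelPath, overwrite = TRUE)
restored <- read.ml(modelPath)
head(predict(restored, training))

Note the design choice consolidated in PATCH 08: the single thresholds
argument covers both the binary case (a scalar, forwarded to setThreshold on
the Scala side) and the multiclass case (a vector, forwarded to setThresholds).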