diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala index ee1fc9b14ceaa..176a6cf852914 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala @@ -83,11 +83,7 @@ private[r] object GeneralizedLinearRegressionWrapper .setStringIndexerOrderType(stringIndexerOrderType) checkDataColumns(rFormula, data) val rFormulaModel = rFormula.fit(data) - // get labels and feature names from output schema - val schema = rFormulaModel.transform(data).schema - val featureAttrs = AttributeGroup.fromStructField(schema(rFormula.getFeaturesCol)) - .attributes.get - val features = featureAttrs.map(_.name.get) + // assemble and fit the pipeline val glr = new GeneralizedLinearRegression() .setFamily(family) @@ -113,37 +109,16 @@ private[r] object GeneralizedLinearRegressionWrapper val summary = glm.summary val rFeatures: Array[String] = if (glm.getFitIntercept) { - Array("(Intercept)") ++ features + Array("(Intercept)") ++ summary.featureNames } else { - features + summary.featureNames } val rCoefficients: Array[Double] = if (summary.isNormalSolver) { - val rCoefficientStandardErrors = if (glm.getFitIntercept) { - Array(summary.coefficientStandardErrors.last) ++ - summary.coefficientStandardErrors.dropRight(1) - } else { - summary.coefficientStandardErrors - } - - val rTValues = if (glm.getFitIntercept) { - Array(summary.tValues.last) ++ summary.tValues.dropRight(1) - } else { - summary.tValues - } - - val rPValues = if (glm.getFitIntercept) { - Array(summary.pValues.last) ++ summary.pValues.dropRight(1) - } else { - summary.pValues - } - - if (glm.getFitIntercept) { - Array(glm.intercept) ++ glm.coefficients.toArray ++ - rCoefficientStandardErrors ++ rTValues ++ rPValues - } else { - glm.coefficients.toArray ++ 
rCoefficientStandardErrors ++ rTValues ++ rPValues - } + summary.coefficientsWithStatistics.map(_._2) ++ + summary.coefficientsWithStatistics.map(_._3) ++ + summary.coefficientsWithStatistics.map(_._4) ++ + summary.coefficientsWithStatistics.map(_._5) } else { if (glm.getFitIntercept) { Array(glm.intercept) ++ glm.coefficients.toArray diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index c600b87bdc64a..beca5956a2d94 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -20,12 +20,14 @@ package org.apache.spark.ml.regression import java.util.Locale import breeze.stats.{distributions => dist} +import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.internal.Logging import org.apache.spark.ml.PredictorParams +import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.optim._ @@ -37,7 +39,6 @@ import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, DoubleType, StructType} - /** * Params for Generalized Linear Regression. */ @@ -141,6 +142,7 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam /** * Param for offset column name. If this is not set or empty, we treat all instance offsets * as 0.0. The feature specified as offset has a constant coefficient of 1.0. 
+ * * @group param */ @Since("2.3.0") @@ -1204,6 +1206,21 @@ class GeneralizedLinearRegressionSummary private[regression] ( @Since("2.2.0") lazy val numInstances: Long = predictions.count() + + /** + * Names of the features. If the features column has no attributes defined, default names + * are generated from the features column name with a numbered suffix "_0", "_1", and so on. + */ + private[ml] lazy val featureNames: Array[String] = { + val featureAttrs = AttributeGroup.fromStructField( + dataset.schema(model.getFeaturesCol)).attributes + if (featureAttrs.isDefined) { + featureAttrs.get.map(_.name.get) + } else { + Array.tabulate[String](origModel.numFeatures)((x: Int) => model.getFeaturesCol + "_" + x) + } + } + /** The numeric rank of the fitted linear model. */ @Since("2.0.0") lazy val rank: Long = if (model.getFitIntercept) { @@ -1458,4 +1475,96 @@ class GeneralizedLinearRegressionTrainingSummary private[regression] ( "No p-value available for this GeneralizedLinearRegressionModel") } } + + /** + * Coefficients with statistics: feature name, coefficients, standard error, tValue and pValue. 
+ */ + private[ml] lazy val coefficientsWithStatistics: Array[ + (String, Double, Double, Double, Double)] = { + var featureNamesLocal = featureNames + var coefficientsArray = model.coefficients.toArray + var index = Array.range(0, coefficientsArray.length) + if (model.getFitIntercept) { + featureNamesLocal = featureNamesLocal :+ "(Intercept)" + coefficientsArray = coefficientsArray :+ model.intercept + // Reorder so that intercept comes first + index = (coefficientsArray.length - 1) +: index + } + index.map { i => + (featureNamesLocal(i), coefficientsArray(i), coefficientStandardErrors(i), + tValues(i), pValues(i)) + } + } + + override def toString: String = { + if (isNormalSolver) { + + def round(x: Double): String = { + BigDecimal(x).setScale(4, BigDecimal.RoundingMode.HALF_UP).toString + } + + val colNames = Array("Feature", "Estimate", "Std Error", "T Value", "P Value") + + val data = coefficientsWithStatistics.map { row => + val strRow = row.productIterator.map { cell => + val str = cell match { + case s: String => s + case n: Double => round(n) + } + // Truncate if length > 20 + if (str.length > 20) { + str.substring(0, 17) + "..." 
+ } else { + str + } + } + strRow.toArray + } + + // Compute the width of each column + val colWidths = colNames.map(_.length) + data.foreach { strRow => + strRow.zipWithIndex.foreach { case (cell: String, i: Int) => + colWidths(i) = math.max(colWidths(i), cell.length) + } + } + + val sb = new StringBuilder + + // Output coefficients with statistics + sb.append("Coefficients:\n") + colNames.zipWithIndex.map { case (colName: String, i: Int) => + StringUtils.leftPad(colName, colWidths(i)) + }.addString(sb, "", " ", "\n") + + data.foreach { case strRow: Array[String] => + strRow.zipWithIndex.map { case (cell: String, i: Int) => + StringUtils.leftPad(cell.toString, colWidths(i)) + }.addString(sb, "", " ", "\n") + } + + sb.append("\n") + sb.append(s"(Dispersion parameter for ${family.name} family taken to be " + + s"${round(dispersion)})") + + sb.append("\n") + val nd = s"Null deviance: ${round(nullDeviance)} on $degreesOfFreedom degrees of freedom" + val rd = s"Residual deviance: ${round(deviance)} on $residualDegreeOfFreedom degrees of " + + "freedom" + val l = math.max(nd.length, rd.length) + sb.append(StringUtils.leftPad(nd, l)) + sb.append("\n") + sb.append(StringUtils.leftPad(rd, l)) + + if (family.name != "tweedie") { + sb.append("\n") + sb.append(s"AIC: " + round(aic)) + } + + sb.toString() + } else { + throw new UnsupportedOperationException( + "No summary available for this GeneralizedLinearRegressionModel") + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index a47bd17f47bb1..df7dee869d058 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.spark.SparkFunSuite import 
org.apache.spark.ml.classification.LogisticRegressionSuite._ import org.apache.spark.ml.feature.{Instance, OffsetInstance} -import org.apache.spark.ml.feature.LabeledPoint +import org.apache.spark.ml.feature.{LabeledPoint, RFormula} import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors} import org.apache.spark.ml.param.{ParamMap, ParamsSuite} import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} @@ -1524,6 +1524,87 @@ class GeneralizedLinearRegressionSuite .fit(datasetGaussianIdentity.as[LabeledPoint]) } + test("glm summary: feature name") { + // dataset1 with no attribute + val dataset1 = Seq( + Instance(2.0, 1.0, Vectors.dense(0.0, 5.0)), + Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)), + Instance(2.0, 5.0, Vectors.dense(2.0, 3.0)) + ).toDF() + + // dataset2 with attribute + val datasetTmp = Seq( + (2.0, 1.0, 0.0, 5.0), + (8.0, 2.0, 1.0, 7.0), + (3.0, 3.0, 2.0, 11.0), + (9.0, 4.0, 3.0, 13.0), + (2.0, 5.0, 2.0, 3.0) + ).toDF("y", "w", "x1", "x2") + val formula = new RFormula().setFormula("y ~ x1 + x2") + val dataset2 = formula.fit(datasetTmp).transform(datasetTmp) + + val expectedFeature = Seq(Array("features_0", "features_1"), Array("x1", "x2")) + + var idx = 0 + for (dataset <- Seq(dataset1, dataset2)) { + val model = new GeneralizedLinearRegression().fit(dataset) + model.summary.featureNames.zip(expectedFeature(idx)) + .foreach{ x => assert(x._1 === x._2) } + idx += 1 + } + } + + test("glm summary: coefficient with statistics") { + /* + R code: + + A <- matrix(c(0, 1, 2, 3, 2, 5, 7, 11, 13, 3), 5, 2) + b <- c(2, 8, 3, 9, 2) + df <- as.data.frame(cbind(A, b)) + model <- glm(formula = "b ~ .", data = df) + summary(model) + + Coefficients: + Estimate Std. 
Error t value Pr(>|t|) + (Intercept) 0.7903 4.0129 0.197 0.862 + V1 0.2258 2.1153 0.107 0.925 + V2 0.4677 0.5815 0.804 0.506 + */ + val dataset = Seq( + Instance(2.0, 1.0, Vectors.dense(0.0, 5.0)), + Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)), + Instance(2.0, 5.0, Vectors.dense(2.0, 3.0)) + ).toDF() + + val expectedFeature = Seq(Array("features_0", "features_1"), + Array("(Intercept)", "features_0", "features_1")) + val expectedEstimate = Seq(Vectors.dense(0.2884, 0.538), + Vectors.dense(0.7903, 0.2258, 0.4677)) + val expectedStdError = Seq(Vectors.dense(1.724, 0.3787), + Vectors.dense(4.0129, 2.1153, 0.5815)) + + var idx = 0 + for (fitIntercept <- Seq(false, true)) { + val trainer = new GeneralizedLinearRegression() + .setFamily("gaussian") + .setFitIntercept(fitIntercept) + val model = trainer.fit(dataset) + val coefficientsWithStatistics = model.summary.coefficientsWithStatistics + + coefficientsWithStatistics.map(_._1).zip(expectedFeature(idx)).foreach { x => + assert(x._1 === x._2, "Feature name mismatch in coefficientsWithStatistics") } + assert(Vectors.dense(coefficientsWithStatistics.map(_._2)) ~= expectedEstimate(idx) + absTol 1E-3, "Coefficients mismatch in coefficientsWithStatistics") + assert(Vectors.dense(coefficientsWithStatistics.map(_._3)) ~= expectedStdError(idx) + absTol 1E-3, "Standard error mismatch in coefficientsWithStatistics") + idx += 1 + } + } + test("generalized linear regression: regularization parameter") { /* R code: