chore: improve test coverage #1631

Merged: 1 commit, Aug 25, 2022
@@ -73,6 +73,11 @@ trait HasImageInput extends HasImageUrl
   override protected def inputFunc(schema: StructType): Row => Option[HttpRequestBase] = {
     val rowToUrl = prepareUrl
     val rowToEntity = prepareEntity;
+    if (get(imageUrl).orElse(get(imageBytes)).isEmpty) {
+      throw new IllegalArgumentException("Please set one of the" +
+        " imageUrl, imageUrlCol, imageBytes, imageBytesCol parameters.")
+    }
+
     { row: Row =>
       if (shouldSkip(row)) {
         None
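The guard above fails fast when neither image input is configured. A minimal
standalone sketch of the same idiom, with plain Options standing in for the
trait's service params (an assumption of this sketch, not the trait's API):

  // orElse chains the alternatives, so isEmpty holds only when neither
  // input was provided; the error surfaces once, while the request
  // function is being built, rather than once per row.
  def requireImageInput(imageUrl: Option[String], imageBytes: Option[Array[Byte]]): Unit = {
    if (imageUrl.orElse(imageBytes).isEmpty) {
      throw new IllegalArgumentException("Please set one of the" +
        " imageUrl, imageUrlCol, imageBytes, imageBytesCol parameters.")
    }
  }

This is the behavior the new "Null handling" test in AnalyzeImageSuite below
locks in: transforming with no image input configured throws immediately.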
@@ -390,7 +390,7 @@ class FitMultivariateAnomaly(override val uid: String) extends Estimator[DetectM
     }
   }

-  def getSlidingWindow: Option[Int] = get(slidingWindow)
+  def getSlidingWindow: Int = $(slidingWindow)

   val alignMode = new Param[String](this, "alignMode", "An optional field, indicates how " +
     "we align different variables into the same time-range which is required by the model.{Inner, Outer}")
@@ -403,7 +403,7 @@ class FitMultivariateAnomaly(override val uid: String) extends Estimator[DetectM
     }
   }

-  def getAlignMode: Option[String] = get(alignMode)
+  def getAlignMode: String = $(alignMode)

   val fillNAMethod = new Param[String](this, "fillNAMethod", "An optional field, indicates how missed " +
     "values will be filled with. Can not be set to NotFill, when alignMode is Outer.{Previous, Subsequent," +
@@ -417,7 +417,7 @@ class FitMultivariateAnomaly(override val uid: String) extends Estimator[DetectM
     }
   }

-  def getFillNAMethod: Option[String] = get(fillNAMethod)
+  def getFillNAMethod: String = $(fillNAMethod)

   val paddingValue = new IntParam(this, "paddingValue", "optional field, is only useful" +
     " if FillNAMethod is set to Fixed.")
@@ -439,10 +439,10 @@ class FitMultivariateAnomaly(override val uid: String) extends Estimator[DetectM
       source,
       getStartTime,
       getEndTime,
-      getSlidingWindow,
+      get(slidingWindow),
       Option(AlignPolicy(
-        getAlignMode,
-        getFillNAMethod,
+        get(alignMode),
+        get(fillNAMethod),
         get(paddingValue))),
       get(displayName)
     ).toJson.compactPrint, ContentType.APPLICATION_JSON))
@@ -480,7 +480,7 @@ class FitMultivariateAnomaly(override val uid: String) extends Estimator[DetectM
     })
   }

-  override def copy(extra: ParamMap): Estimator[DetectMultivariateAnomaly] = defaultCopy(extra)
+  override def copy(extra: ParamMap): FitMultivariateAnomaly = defaultCopy(extra)

   override def transformSchema(schema: StructType): StructType = {
     schema.add(getErrorCol, DMAError.schema)
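The getters above change shape: get(param) returns Option[T], while $(param)
is Spark's alias for getOrDefault(param) and returns the bare value, throwing
if the param has neither a value nor a default. A throwaway sketch of the two
access styles (the Holder class and its param are illustrative only):

  import org.apache.spark.ml.param.{Param, ParamMap, Params}
  import org.apache.spark.ml.util.Identifiable

  class Holder(override val uid: String) extends Params {
    def this() = this(Identifiable.randomUID("holder"))

    val alignMode = new Param[String](this, "alignMode", "how to align variables")

    def getAlignMode: String = $(alignMode)              // bare value, may throw
    def alignModeOption: Option[String] = get(alignMode) // None when unset

    override def copy(extra: ParamMap): Holder = defaultCopy(extra)
  }

That split is why the request body at @@ -439 moves the other way, to
get(slidingWindow), get(alignMode), and get(fillNAMethod): those fields are
optional in the service payload, so unset params should serialize as absent
rather than throw.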
@@ -7,10 +7,10 @@ import com.microsoft.azure.synapse.ml.Secrets
 import com.microsoft.azure.synapse.ml.cognitive._
 import com.microsoft.azure.synapse.ml.core.spark.FluentAPI._
 import com.microsoft.azure.synapse.ml.core.test.base.{Flaky, TestBase}
-import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, TransformerFuzzing}
+import com.microsoft.azure.synapse.ml.core.test.fuzzing.{GetterSetterFuzzing, TestObject, TransformerFuzzing}
 import org.apache.spark.ml.NamespaceInjections.pipelineModel
 import org.apache.spark.ml.util.MLReadable
-import org.apache.spark.sql.functions.typedLit
+import org.apache.spark.sql.functions.{col, typedLit}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.scalactic.Equality

@@ -87,7 +87,8 @@ class OCRSuite extends TransformerFuzzing[OCR] with CognitiveKey with Flaky with
   override def reader: MLReadable[_] = OCR
 }

-class AnalyzeImageSuite extends TransformerFuzzing[AnalyzeImage] with CognitiveKey with Flaky {
+class AnalyzeImageSuite extends TransformerFuzzing[AnalyzeImage]
+  with CognitiveKey with Flaky with GetterSetterFuzzing[AnalyzeImage] {

   import spark.implicits._

@@ -97,6 +98,12 @@ class AnalyzeImageSuite extends TransformerFuzzing[AnalyzeImage] with CognitiveK
     ("https://mmlspark.blob.core.windows.net/datasets/OCR/test3.png", "en")
   ).toDF("url", "language")

+  lazy val nullDf: DataFrame = Seq(
+    ("https://mmlspark.blob.core.windows.net/datasets/OCR/test1.jpg", "en"),
+    ("https://mmlspark.blob.core.windows.net/datasets/OCR/test2.png", null), //scalastyle:ignore null
+    (null, "en")
+  ).toDF("url", "language")
+
   def baseAI: AnalyzeImage = new AnalyzeImage()
     .setSubscriptionKey(cognitiveKey)
     .setLocation("eastus")
@@ -118,6 +125,13 @@ class AnalyzeImageSuite extends TransformerFuzzing[AnalyzeImage] with CognitiveK
   def bytesAI: AnalyzeImage = baseAI
     .setImageBytesCol("imageBytes")

+  test("Null handling") {
+    assertThrows[IllegalArgumentException] {
+      baseAI.transform(nullDf)
+    }
+    assert(ai.transform(nullDf).where(col("features").isNull).count() == 1)
+  }
+
   test("full parametrization") {
     val row = (Seq("Categories"), "en", Seq("Celebrities"),
       "https://mmlspark.blob.core.windows.net/datasets/OCR/test1.jpg")
@@ -7,9 +7,14 @@ import com.microsoft.azure.synapse.ml.Secrets
 import com.microsoft.azure.synapse.ml.cognitive._
 import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, TransformerFuzzing}
 import com.microsoft.azure.synapse.ml.stages.{FixedMiniBatchTransformer, FlattenBatch}
+import org.apache.spark.SparkException
 import org.apache.spark.ml.util.MLReadable
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{DataFrame, Row}

+import java.util.concurrent.TimeoutException
+
+
 trait TextEndpoint {
   lazy val textKey: String = sys.env.getOrElse("TEXT_API_KEY", Secrets.CognitiveApiKey)
   lazy val textApiLocation: String = sys.env.getOrElse("TEXT_API_LOCATION", "eastus")
@@ -327,6 +332,24 @@ class TextAnalyzeSuite extends TransformerFuzzing[TextAnalyze] with TextEndpoint
     assert(results(24).get.sentimentAnalysis.get.document.get.sentiment == "positive")
   }

+  test("Exceeded Retries Info") {
+    val badModel = model
+      .setPollingDelay(0)
+      .setInitialPollingDelay(0)
+      .setMaxPollingRetries(1)
+
+    val results = badModel
+      .setSuppressMaxRetriesException(true)
+      .transform(df.coalesce(1))
+    assert(results.where(!col("error").isNull).count() > 0)
+
+    assertThrows[SparkException] {
+      badModel.setSuppressMaxRetriesException(false)
+        .transform(df.coalesce(1))
+        .collect()
+    }
+  }
+
   override def testObjects(): Seq[TestObject[TextAnalyze]] =
     Seq(new TestObject[TextAnalyze](model, df))
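A hedged follow-up sketch to the test above: with suppression enabled, rows
that exhaust their polling retries carry diagnostics in the error column (the
column name is the one the test itself filters on), so they can be inspected
rather than failing the job:

  // Assumes the badModel and df values from the test above.
  val suppressed = badModel
    .setSuppressMaxRetriesException(true)
    .transform(df.coalesce(1))

  suppressed
    .where(!col("error").isNull)
    .select("error")
    .show(truncate = false)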
@@ -9,6 +9,7 @@ import com.microsoft.azure.synapse.ml.cognitive.split1.AnomalyKey
 import com.microsoft.azure.synapse.ml.core.test.base.TestBase
 import com.microsoft.azure.synapse.ml.core.test.benchmarks.DatasetUtils
 import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject}
+import org.apache.spark.ml.param.{Param, ParamPair}
 import org.apache.spark.ml.util.MLReadable
 import org.apache.spark.sql.DataFrame
 import spray.json.{DefaultJsonProtocol, _}
@@ -231,6 +232,11 @@ class FitMultivariateAnomalySuite extends EstimatorFuzzing[FitMultivariateAnomal

   }

+  override def getterSetterParamExamples(p: FitMultivariateAnomaly): Map[Param[_], Any] = Map(
+    (p.alignMode, "Inner"),
+    (p.fillNAMethod, "Zero")
+  )
+
   override def testSerialization(): Unit = {
     println("ignore the Serialization Fuzzing test because fitting process takes more than 3 minutes")
   }
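GetterSetterFuzzing needs a representative value for each param it exercises.
alignMode and fillNAMethod accept only fixed choice sets (per their doc
strings), so a generic string example such as "foo" from StringInfo would be
rejected; this override supplies values the validators accept. A minimal
sketch of how such an example map can drive a round-trip check (the fuzzer's
real mechanism is assumed, not shown):

  import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}

  // Apply each example through the public copy(ParamMap) API and require
  // that it reads back unchanged via get().
  def roundTrip(stage: Params, examples: Map[Param[_], Any]): Unit = {
    examples.foreach { case (p, v) =>
      val updated = stage.copy(ParamMap(ParamPair(p.asInstanceOf[Param[Any]], v)))
      assert(updated.get(p).contains(v), s"round trip failed for ${p.name}")
    }
  }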
@@ -5,49 +5,65 @@ package com.microsoft.azure.synapse.ml.codegen

 import com.microsoft.azure.synapse.ml.param._
 import org.apache.spark.internal.Logging
+import org.apache.spark.ml.PipelineStage
 import org.apache.spark.ml.param._

 import scala.reflect.ClassTag

 case class ParamInfo[T <: Param[_]: ClassTag](pyType: String,
-                                              pyTypeConverter: Option[String],
-                                              rTypeConverter: Option[String],
-                                              dotnetType: String) {
+                                              pyTypeConverter: Option[String],
+                                              rTypeConverter: Option[String],
+                                              dotnetType: String,
+                                              example: Any) {

-  def this(pyType: String, typeConverterArg: String, rTypeConverterArg: String, dotnetType: String) = {
-    this(pyType, Some(typeConverterArg), Some(rTypeConverterArg), dotnetType)
+  def this(pyType: String, typeConverterArg: String, rTypeConverterArg: String, dotnetType: String, example: Any) = {
+    this(pyType, Some(typeConverterArg), Some(rTypeConverterArg), dotnetType, example)
   }

-  def this(pyType: String, dotnetType: String) = {
-    this(pyType, None, None, dotnetType)
+  def this(pyType: String, dotnetType: String, example: Any) = {
+    this(pyType, None, None, dotnetType, example)
   }

 }

 object DefaultParamInfo extends Logging {

-  val BooleanInfo = new ParamInfo[BooleanParam]("bool", "TypeConverters.toBoolean", "as.logical", "bool")
-  val IntInfo = new ParamInfo[IntParam]("int", "TypeConverters.toInt", "as.integer", "int")
-  val LongInfo = new ParamInfo[LongParam]("long", None, Some("as.integer"), "long")
-  val FloatInfo = new ParamInfo[FloatParam]("float", "TypeConverters.toFloat", "as.double", "float")
-  val DoubleInfo = new ParamInfo[DoubleParam]("float", "TypeConverters.toFloat", "as.double", "double")
-  val StringInfo = new ParamInfo[Param[String]]("str", Some("TypeConverters.toString"), None, "string")
-  val StringArrayInfo = new ParamInfo[StringArrayParam]("list", "TypeConverters.toListString",
-    "as.array", "string[]")
-  val DoubleArrayInfo = new ParamInfo[DoubleArrayParam]("list", "TypeConverters.toListFloat",
-    "as.array", "double[]")
-  val IntArrayInfo = new ParamInfo[IntArrayParam]("list", "TypeConverters.toListInt",
-    "as.array", "int[]")
-  val ByteArrayInfo = new ParamInfo[ByteArrayParam]("list", "byte[]")
-  val DoubleArrayArrayInfo = new ParamInfo[DoubleArrayArrayParam]("object", "double[][]")
-  val StringStringMapInfo = new ParamInfo[StringStringMapParam]("dict", "Dictionary<string, string>")
-  val StringIntMapInfo = new ParamInfo[StringIntMapParam]("dict", "Dictionary<string, int>")
-  val ArrayMapInfo = new ParamInfo[ArrayMapParam]("object", "Dictionary<string, object>[]")
-  val TypedIntArrayInfo = new ParamInfo[TypedIntArrayParam]("object", "int[]")
-  val TypedDoubleArrayInfo = new ParamInfo[TypedDoubleArrayParam]("object", "double[]")
-  val UntypedArrayInfo = new ParamInfo[UntypedArrayParam]("object", "object[]")
-
-  val UnknownInfo = new ParamInfo[Param[_]]("object", "object")
+  val BooleanInfo = new ParamInfo[BooleanParam](
+    "bool", "TypeConverters.toBoolean", "as.logical", "bool", true)
+  val IntInfo = new ParamInfo[IntParam](
+    "int", "TypeConverters.toInt", "as.integer", "int", 1)
+  val LongInfo = new ParamInfo[LongParam](
+    "long", None, Some("as.integer"), "long", 1L)
+  val FloatInfo = new ParamInfo[FloatParam](
+    "float", "TypeConverters.toFloat", "as.double", "float", 1.0)
+  val DoubleInfo = new ParamInfo[DoubleParam](
+    "float", "TypeConverters.toFloat", "as.double", "double", 1.0)
+  val StringInfo = new ParamInfo[Param[String]](
+    "str", Some("TypeConverters.toString"), None, "string", "foo")
+  val StringArrayInfo = new ParamInfo[StringArrayParam](
+    "list", "TypeConverters.toListString", "as.array", "string[]", Array("foo", "bar"))
+  val DoubleArrayInfo = new ParamInfo[DoubleArrayParam](
+    "list", "TypeConverters.toListFloat", "as.array", "double[]", Array(1.0, 2.0))
+  val IntArrayInfo = new ParamInfo[IntArrayParam](
+    "list", "TypeConverters.toListInt", "as.array", "int[]", Array(1, 2))
+  val ByteArrayInfo = new ParamInfo[ByteArrayParam](
+    "list", "byte[]", Array(1.toByte, 0.toByte))
+  val DoubleArrayArrayInfo = new ParamInfo[DoubleArrayArrayParam](
+    "object", "double[][]", Array(Array(1.0, 2.0)))
+  val StringStringMapInfo = new ParamInfo[StringStringMapParam](
+    "dict", "Dictionary<string, string>", Map("foo" -> "bar"))
+  val StringIntMapInfo = new ParamInfo[StringIntMapParam](
+    "dict", "Dictionary<string, int>", Map("foo" -> 1))
+  val ArrayMapInfo = new ParamInfo[ArrayMapParam](
+    "object", "Dictionary<string, object>[]", Array(Map("foo" -> 1)))
+  val TypedIntArrayInfo = new ParamInfo[TypedIntArrayParam](
+    "object", "int[]", Array(1, 2))
+  val TypedDoubleArrayInfo = new ParamInfo[TypedDoubleArrayParam](
+    "object", "double[]", Array(1.0, 2.0))
+  val UntypedArrayInfo = new ParamInfo[UntypedArrayParam](
+    "object", "object[]", Array(1.0, 2.0))
+  val UnknownInfo = new ParamInfo[Param[_]](
+    "object", "object", null)

   //noinspection ScalaStyle
   def getGeneralParamInfo(dataType: Param[_]): ParamInfo[_] = {
@@ -75,4 +91,16 @@ object DefaultParamInfo extends Logging {
     }
   }

+  def defaultGetParamInfo(stage: Params, p: Param[_]): ParamInfo[_] = {
+    try {
+      stage.getClass.getMethod(p.name)
+        .getAnnotatedReturnType.getType.toString match {
+        case "org.apache.spark.ml.param.Param<java.lang.String>" => StringInfo
+        case _ => getGeneralParamInfo(p)
+      }
+    } catch {
+      case _: Exception => getGeneralParamInfo(p)
+    }
+  }
+
 }
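defaultGetParamInfo special-cases Param[String] through reflection because the
type argument is erased from the Param object itself; only the declared return
type of the stage's accessor method still names it. A self-contained
illustration (the object and param names are hypothetical):

  import org.apache.spark.ml.param.Param
  import org.apache.spark.ml.util.Identifiable

  object ErasureDemo {
    private val uid = Identifiable.randomUID("demo")

    // Both params share the same runtime class, so the String type
    // argument cannot be recovered from the instances alone.
    val stringParam: Param[String] = new Param[String](uid, "s", "a string param")
    val intParam: Param[Int] = new Param[Int](uid, "i", "an int param")

    def main(args: Array[String]): Unit = {
      println(stringParam.getClass == intParam.getClass) // true

      // The declared accessor type keeps the generic signature, which is
      // what defaultGetParamInfo matches on:
      println(getClass.getMethod("stringParam").getAnnotatedReturnType.getType)
      // org.apache.spark.ml.param.Param<java.lang.String>
    }
  }

The catch-all fallback to getGeneralParamInfo keeps the lookup safe for params
whose accessor is missing or named differently.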
@@ -46,16 +46,7 @@ trait BaseWrappable extends Params {
   }

   def getParamInfo(p: Param[_]): ParamInfo[_] = {
-    try {
-      thisStage.getClass.getMethod(p.name)
-        .getAnnotatedReturnType.getType.toString match {
-        case "org.apache.spark.ml.param.Param<java.lang.String>" => StringInfo
-        case _ => getGeneralParamInfo(p)
-      }
-    } catch {
-      case _: Exception => getGeneralParamInfo(p)
-    }
-
+    DefaultParamInfo.defaultGetParamInfo(thisStage, p)
   }

 }
@@ -98,7 +98,7 @@ class PageSplitter(override val uid: String)
     )
   }

-  override def copy(extra: ParamMap): MultiNGram =
+  override def copy(extra: ParamMap): PageSplitter =
     defaultCopy(extra)

   def transformSchema(schema: StructType): StructType = {
@@ -74,16 +74,16 @@ class RankingAdapter(override val uid: String)

   def this() = this(Identifiable.randomUID("RecommenderAdapter"))

-  /** @group getParam */
-  override def getUserCol: String = getRecommender.asInstanceOf[Estimator[_] with RecommendationParams].getUserCol
-
-  /** @group getParam */
-  override def getItemCol: String = getRecommender.asInstanceOf[Estimator[_] with RecommendationParams].getItemCol
-
-  /** @group getParam */
-  override def getRatingCol: String = getRecommender.asInstanceOf[Estimator[_] with RecommendationParams].getRatingCol
+  override def setRecommender(value: Estimator[_ <: Model[_]]): this.type = {
+    val v = value.asInstanceOf[Estimator[_ <: Model[_]] with RecommendationParams]
+    v.get(v.userCol).map(setUserCol)
+    v.get(v.ratingCol).map(setRatingCol)
+    v.get(v.itemCol).map(setItemCol)
+    super.setRecommender(v)
+  }

   def fit(dataset: Dataset[_]): RankingAdapterModel = {
+    transformSchema(dataset.schema)
     logFit({
       new RankingAdapterModel()
         .setRecommenderModel(getRecommender.fit(dataset))
@@ -96,6 +96,13 @@ class RankingAdapter(override val uid: String)
     })
   }

+  override def transformSchema(schema: StructType): StructType = {
+    // Trigger the updating of parameters
+    // in case python parameterizes the call
+    get(recommender).foreach(setRecommender)
+    super.transformSchema(schema)
+  }
+
   override def copy(extra: ParamMap): RankingAdapter = {
     defaultCopy(extra)
   }
@@ -138,8 +145,8 @@ class RankingAdapterModel private[ml](val uid: String)
     val recs = getMode match {
       case "allUsers" =>
         this.getRecommenderModel match {
-          case als: ALSModel => als.asInstanceOf[ALSModel].recommendForAllUsers(getK)
-          case sar: SARModel => sar.asInstanceOf[SARModel].recommendForAllUsers(getK)
+          case als: ALSModel => als.recommendForAllUsers(getK)
+          case sar: SARModel => sar.recommendForAllUsers(getK)
         }
       case "normal" => SparkHelpers.flatten(getRecommenderModel.transform(dataset), getK, getItemCol, getUserCol)
     }
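setRecommender now copies whichever column params are already set on the
wrapped estimator up to the adapter, and the new transformSchema re-runs that
hook because a generated Python wrapper can inject the recommender param
directly without ever calling the Scala setter; fit invokes
transformSchema(dataset.schema) first for the same reason. A compact sketch of
the propagation idiom, assuming a wrapper that mirrors a single column param
(the trait and names are illustrative):

  import org.apache.spark.ml.param.{Param, Params}

  // set() is protected in Params, so the propagation has to live inside
  // the wrapper itself, as it does in RankingAdapter.setRecommender.
  trait MirrorsUserCol extends Params {
    val userCol = new Param[String](this, "userCol", "user column name")

    def mirrorFrom(inner: Params, innerUserCol: Param[String]): this.type = {
      // Copy only when the inner stage actually has the param set;
      // get() returns None otherwise and foreach is a no-op.
      inner.get(innerUserCol).foreach(v => set(userCol, v))
      this
    }
  }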
@@ -80,5 +80,5 @@ class StratifiedRepartition(val uid: String) extends Transformer with Wrappable

   def transformSchema(schema: StructType): StructType = schema

-  def copy(extra: ParamMap): DropColumns = defaultCopy(extra)
+  def copy(extra: ParamMap): StratifiedRepartition = defaultCopy(extra)
 }