Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Upgrade the univariate anomaly detection version to v1.1-preview #1440

Merged
merged 12 commits into from
Mar 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,31 @@ abstract class AnomalyDetectorBase(override val uid: String) extends CognitiveSe

def setPeriodCol(v: String): this.type = setVectorParam(period, v)

val imputeMode = new ServiceParam[String](this, "imputeMode",
"""
|Optional argument, impute mode of a time series.
|Possible values: auto, previous, linear, fixed, zero, notFill
""".stripMargin.replace("\n", " ").replace("\r", " "),
{ _ => true },
isRequired = false
)

def setImputeMode(v: String): this.type = setScalarParam(imputeMode, v)

def setImputeModeCol(v: String): this.type = setVectorParam(imputeMode, v)

val imputeFixedValue = new ServiceParam[Double](this, "imputeFixedValue",
"""
|Optional argument, fixed value to use when imputeMode is set to "fixed"
""".stripMargin.replace("\n", " ").replace("\r", " "),
{ _ => true },
isRequired = false
)

def setImputeFixedValue(v: Double): this.type = setScalarParam(imputeFixedValue, v)

def setImputeFixedValueCol(v: String): this.type = setVectorParam(imputeFixedValue, v)

val series = new ServiceParam[Seq[TimeSeriesPoint]](this, "series",
"""
|Time series data points. Points should be sorted by timestamp in ascending order
Expand All @@ -109,7 +134,9 @@ abstract class AnomalyDetectorBase(override val uid: String) extends CognitiveSe
getValueOpt(row, maxAnomalyRatio),
getValueOpt(row, sensitivity),
getValueOpt(row, customInterval),
getValueOpt(row, period)
getValueOpt(row, period),
getValueOpt(row, imputeMode),
getValueOpt(row, imputeFixedValue)
).toJson.compactPrint))
}
}
Expand All @@ -125,7 +152,7 @@ class DetectLastAnomaly(override val uid: String) extends AnomalyDetectorBase(ui

def setSeriesCol(v: String): this.type = setVectorParam(series, v)

def urlPath: String = "/anomalydetector/v1.0/timeseries/last/detect"
def urlPath: String = "/anomalydetector/v1.1-preview.1/timeseries/last/detect"

override def responseDataType: DataType = ADLastResponse.schema

Expand All @@ -142,7 +169,7 @@ class DetectAnomalies(override val uid: String) extends AnomalyDetectorBase(uid)

def setSeriesCol(v: String): this.type = setVectorParam(series, v)

def urlPath: String = "/anomalydetector/v1.0/timeseries/entire/detect"
def urlPath: String = "/anomalydetector/v1.1-preview.1/timeseries/entire/detect"

override def responseDataType: DataType = ADEntireResponse.schema

Expand Down Expand Up @@ -238,7 +265,7 @@ class SimpleDetectAnomalies(override val uid: String) extends AnomalyDetectorBas

}

def urlPath: String = "/anomalydetector/v1.0/timeseries/entire/detect"
def urlPath: String = "/anomalydetector/v1.1-preview.1/timeseries/entire/detect"

override def responseDataType: DataType = ADEntireResponse.schema

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ case class ADRequest(series: Seq[TimeSeriesPoint],
maxAnomalyRatio: Option[Double],
sensitivity: Option[Int],
customInterval: Option[Int],
period: Option[Int])
period: Option[Int],
imputeMode: Option[String],
imputeFixedValue: Option[Double])

object ADRequest extends SparkBindings[ADRequest]

Expand All @@ -27,7 +29,8 @@ case class ADLastResponse(isAnomaly: Boolean,
expectedValue: Double,
upperMargin: Double,
lowerMargin: Double,
suggestedWindow: Int)
suggestedWindow: Int,
severity: Double)

object ADLastResponse extends SparkBindings[ADLastResponse]

Expand All @@ -37,7 +40,8 @@ case class ADSingleResponse(isAnomaly: Boolean,
period: Int,
expectedValue: Double,
upperMargin: Double,
lowerMargin: Double)
lowerMargin: Double,
severity: Double)

object ADSingleResponse extends SparkBindings[ADSingleResponse]

Expand All @@ -47,13 +51,14 @@ case class ADEntireResponse(isAnomaly: Seq[Boolean],
period: Int,
expectedValues: Seq[Double],
upperMargins: Seq[Double],
lowerMargins: Seq[Double]) {
lowerMargins: Seq[Double],
severity: Seq[Double]) {

def explode: Seq[ADSingleResponse] = {
isAnomaly.indices.map { i =>
ADSingleResponse(
isAnomaly(i), isPositiveAnomaly(i), isNegativeAnomaly(i),
period, expectedValues(i), upperMargins(i), lowerMargins(i)
period, expectedValues(i), upperMargins(i), lowerMargins(i), severity(i)
)
}
}
Expand All @@ -63,5 +68,5 @@ object ADEntireResponse extends SparkBindings[ADEntireResponse]

object AnomalyDetectorProtocol {
implicit val TspEnc: RootJsonFormat[TimeSeriesPoint] = jsonFormat2(TimeSeriesPoint.apply)
implicit val AdreqEnc: RootJsonFormat[ADRequest] = jsonFormat6(ADRequest.apply)
implicit val AdreqEnc: RootJsonFormat[ADRequest] = jsonFormat8(ADRequest.apply)
}