Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Adding telemetry for the dataset metadata. This one is specially for … #1917

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,7 @@ abstract class CognitiveServicesBaseNoHandler(val uid: String) extends Transform
}

override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame](
getInternalTransformer(dataset.schema).transform(dataset)
logTransform[DataFrame](getInternalTransformer(dataset.schema).transform(dataset), dataset.columns.length
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class SimpleDetectAnomalies(override val uid: String) extends AnomalyDetectorBas
getErrorCol,
s"$getOutputCol.1"
).withColumnRenamed("1", getOutputCol)
})
}, dataset.columns.length)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class SimpleFitMultivariateAnomaly(override val uid: String) extends Estimator[S
.setModelId(modelId)
.setIntermediateSaveDir(getIntermediateSaveDir)
.setDiagnosticsInfo(modelInfo("diagnosticsInfo").convertTo[DiagnosticsInfo])
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): SimpleFitMultivariateAnomaly = defaultCopy(extra)
Expand Down Expand Up @@ -589,7 +589,7 @@ class SimpleDetectMultivariateAnomaly(override val uid: String) extends Model[Si

//scalastyle:off method.length
override def transform(dataset: Dataset[_]): DataFrame =
logTransform[DataFrame] {
logTransform[DataFrame] ({

// check model status first
MADUtils.checkModelStatus(getUrl, getModelId, getSubscriptionKey)
Expand Down Expand Up @@ -635,7 +635,7 @@ class SimpleDetectMultivariateAnomaly(override val uid: String) extends Model[Si
df.join(finalDF, df(getTimestampCol) === finalDF("resultTimestamp"), "left")
.drop("resultTimestamp")
.sort(col(getTimestampCol).asc)
}
}, dataset.columns.length)
//scalastyle:on method.length

override def copy(extra: ParamMap): SimpleDetectMultivariateAnomaly = defaultCopy(extra)
Expand Down Expand Up @@ -720,7 +720,7 @@ class DetectLastMultivariateAnomaly(override val uid: String) extends CognitiveS
col(s"$getOutputCol.results.timestamp")(0)).otherwise(null))
.drop(columnNames: _*)

})
}, dataset.columns.length)
}
// scalastyle:on null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class FormOntologyLearner(override val uid: String) extends Estimator[FormOntolo
.setInputCol(getInputCol)
.setOutputCol(getOutputCol)
.setOntology(mergedSchema)
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): Estimator[FormOntologyTransformer] = defaultCopy(extra)
Expand Down Expand Up @@ -121,7 +121,7 @@ class FormOntologyTransformer(override val uid: String) extends Model[FormOntolo

dataset.toDF()
.withColumn(getOutputCol, convertToOntologyUDF(col(getInputCol)))
})
}, dataset.columns.length)
}

override def transformSchema(schema: StructType): StructType = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class OpenAIPrompt(override val uid: String) extends Transformer
} else {
results
}
})
}, dataset.columns.length)
}

private def openAICompletion: OpenAICompletion = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class AddDocuments(override val uid: String) extends CognitiveServicesBase(uid)
s"/indexes/$getIndexName/docs/index?api-version=${AzureSearchAPIConstants.DefaultAPIVersion}")
}
super.transform(dataset)
})
}, dataset.columns.length)
}

override def prepareEntity: Row => Option[AbstractHttpEntity] = row =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ abstract class SpeechSDKBase extends Transformer
isUriAudio
))(enc)
.drop(dynamicParamColName)
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): this.type = defaultCopy(extra)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class DocumentTranslator(override val uid: String) extends CognitiveServicesBase
override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
getInternalTransformer(dataset.schema).transform(dataset)
})
}, dataset.columns.length)
}

override def responseDataType: DataType = TranslationStatusResponse.schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class FindBestModel(override val uid: String) extends Estimator[BestModel]
.setBestModelMetrics(evaluator.transform(bestScoredDf))
.setRocCurve(evaluator.rocCurve)
.setAllModelMetrics(allModelMetrics)
})
}, dataset.columns.length)
}

// Choose a random model as we don't know which one will be chosen yet - all will transform schema in same way
Expand Down Expand Up @@ -189,7 +189,7 @@ class BestModel(val uid: String) extends Model[BestModel]

override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame](
getBestModel.transform(dataset)
getBestModel.transform(dataset), dataset.columns.length
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class TuneHyperparameters(override val uid: String) extends Estimator[TuneHyperp
// Compute best model fit on dataset
val bestModel = getModels(bestIndex % numModels).fit(dataset, paramsPerRun(bestIndex)).asInstanceOf[Model[_]]
new TuneHyperparametersModel(uid).setBestModel(bestModel).setBestMetric(bestMetric)
})
}, dataset.columns.length)
}
//scalastyle:on method.length

Expand Down Expand Up @@ -247,7 +247,7 @@ class TuneHyperparametersModel(val uid: String)

override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame](
getBestModel.transform(dataset)
getBestModel.transform(dataset), dataset.columns.length
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class DoubleMLEstimator(override val uid: String)
}
val dmlModel = this.copyValues(new DoubleMLModel(uid)).setRawTreatmentEffects(ates.toArray)
dmlModel
})
}, dataset.columns.length)
}

//scalastyle:off method.length
Expand Down Expand Up @@ -358,7 +358,7 @@ class DoubleMLModel(val uid: String)
override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
dataset.toDF()
})
}, dataset.columns.length)
}

@DeveloperApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ResidualTransformer(override val uid: String) extends Transformer
s"Prediction column $getPredictedCol must be of type Vector or NumericType, but is $predictedColDataType" +
s", please use 'setPredictedCol' to set the correct prediction column")
}
})
}, dataset.columns.length)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class ICETransformer(override val uid: String) extends Transformer


def transform(ds: Dataset[_]): DataFrame = {
logTransform {
logTransform ({
transformSchema(ds.schema)
val df = ds.toDF
val idCol = DatasetExtensions.findUnusedColumnName("idCol", df)
Expand Down Expand Up @@ -250,7 +250,7 @@ class ICETransformer(override val uid: String) extends Transformer
case `featureKind` =>
dependenceDfs.reduce(_ union _).orderBy(desc(getDependenceNameCol))
}
}
}, ds.columns.length)
}

private def collectCategoricalValues[_](df: DataFrame, feature: ICECategoricalFeature): Array[_] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ abstract class KernelSHAPBase(override val uid: String)
with Wrappable
with SynapseMLLogging {

override def transform(instances: Dataset[_]): DataFrame = logTransform {
override def transform(instances: Dataset[_]): DataFrame = logTransform ({
import instances.sparkSession.implicits._
this.validateSchema(instances.schema)

Expand Down Expand Up @@ -91,7 +91,7 @@ abstract class KernelSHAPBase(override val uid: String)
}.toDF(idCol, this.getOutputCol, this.getMetricsCol)

preprocessed.join(fitted, Seq(idCol), "inner").drop(idCol)
}
}, instances.columns.length)

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ abstract class LIMEBase(override val uid: String)
weightUdf
}

final override def transform(instances: Dataset[_]): DataFrame = logTransform {
final override def transform(instances: Dataset[_]): DataFrame = logTransform ({
import instances.sparkSession.implicits._
this.validateSchema(instances.schema)
val regularization = this.getRegularization
Expand Down Expand Up @@ -200,7 +200,7 @@ abstract class LIMEBase(override val uid: String)
}.toDF(idCol, this.getOutputCol, this.getMetricsCol)

preprocessed.join(fitted, Seq(idCol), "inner").drop(idCol)
}
}, instances.columns.length)

override def copy(extra: ParamMap): Transformer = this.defaultCopy(extra)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class AggregateBalanceMeasure(override val uid: String)

df.unpersist
calculateAggregateMeasures(featureStats, featureProbCol)
})
}, dataset.columns.length)
}

private def calculateAggregateMeasures(featureStats: DataFrame, featureProbCol: String): DataFrame = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class DistributionBalanceMeasure(override val uid: String)

df.unpersist
calculateDistributionMeasures(featureStats, featureProbCol, featureCountCol, numRows)
})
}, dataset.columns.length)
}

private def calculateDistributionMeasures(featureStats: DataFrame,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class FeatureBalanceMeasure(override val uid: String)

df.unpersist
calculateParity(associationMetricsDf, featureValueCol)
})
}, dataset.columns.length)
}

private def calculateParity(associationMetricsDf: DataFrame, featureValueCol: String): DataFrame = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CleanMissingData(override val uid: String) extends Estimator[CleanMissingD
.setFillValues(fillValues.toArray)
.setInputCols(getInputCols)
.setOutputCols(getOutputCols)
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): Estimator[CleanMissingDataModel] = defaultCopy(extra)
Expand Down Expand Up @@ -179,7 +179,7 @@ class CleanMissingDataModel(val uid: String)
}).toList
val addedCols = dataset.select(datasetCols ::: datasetInputCols: _*)
addedCols.na.fill(getColsToFill.zip(getFillValues).toMap)
})
}, dataset.columns.length)
}

override def transformSchema(schema: StructType): StructType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CountSelector(override val uid: String) extends Estimator[CountSelectorMod
.setIndices(slotsToKeep)
.setInputCol(getInputCol)
.setOutputCol(getOutputCol)
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): this.type = defaultCopy(extra)
Expand Down Expand Up @@ -77,7 +77,8 @@ class CountSelectorModel(val uid: String) extends Model[CountSelectorModel]

override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame](
getModel.transform(dataset)
getModel.transform(dataset),
dataset.columns.length
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class DataConversion(override val uid: String) extends Transformer
df
}
res
})
}, dataset.columns.length)
}
//scalastyle:on cyclomatic.complexity

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class Featurize(override val uid: String) extends Estimator[PipelineModel]
)

new Pipeline().setStages(Seq(encoders, casters, imputers, featurizers, va).flatten.toArray).fit(dataset)
})
}, dataset.columns.length)
}
//scalastyle:on cyclomatic.complexity
//scalastyle:on method.length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class IndexToValue(val uid: String) extends Transformer
case _ => throw new Exception("Unsupported type " + dataType.toString)
}
dataset.withColumn(getOutputCol, getLevel(dataset(getInputCol)).as(getOutputCol))
})
}, dataset.columns.length)
}

private class Default[T] {var value: T = _ }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ValueIndexer(override val uid: String) extends Estimator[ValueIndexerModel
.setOutputCol(getOutputCol)
.setLevels(castSortLevels)
.setDataType(dataType)
})
}, dataset.columns.length)
}

private def sortLevels[T: TypeTag](levels: Array[_])
Expand Down Expand Up @@ -198,7 +198,7 @@ class ValueIndexerModel(val uid: String)
mmlStyle = false)
val inputColIndex = getIndex(dataset(getInputCol))
dataset.withColumn(getOutputCol, inputColIndex.as(getOutputCol, metadata))
})
}, dataset.columns.length)
}
//scalastyle:on cyclomatic.complexity

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class MultiNGram(override val uid: String)
Row.fromSeq(row.toSeq :+ mergedNGrams)
}(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
.drop(intermediateOutputCols: _*)
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): MultiNGram =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class PageSplitter(override val uid: String)

override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame](
dataset.toDF().withColumn(getOutputCol, UDFUtils.oldUdf(split _, ArrayType(StringType))(col(getInputCol)))
dataset.toDF().withColumn(getOutputCol, UDFUtils.oldUdf(split _, ArrayType(StringType))(col(getInputCol))),
dataset.columns.length
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ class TextFeaturizer(override val uid: String)

val stages = chainedModels ++ Seq(new DropColumns().setCols(colsToDrop.toArray))
new Pipeline().setStages(stages.toArray).fit(dataset).setParent(this)
})
}, dataset.columns.length)
}
//scalastyle:on method.length

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SuperpixelTransformer(val uid: String) extends Transformer
dataset.schema(getInputCol).dataType, getCellSize, getModifier)

dataset.toDF().withColumn(getOutputCol, getSuperPixels(col(getInputCol)))
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class UnrollImage(val uid: String) extends Transformer
assert(ImageSchemaUtils.isImage(df.schema(getInputCol)), "input column should have Image type")
val unrollUDF = udf(unroll _)
df.withColumn(getOutputCol, unrollUDF(df(getInputCol)))
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
Expand Down Expand Up @@ -238,7 +238,7 @@ class UnrollBinaryImage(val uid: String) extends Transformer
}, VectorType)

df.withColumn($(outputCol), unrollUDF(df($(inputCol))))
})
}, dataset.columns.length)
}

override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class HTTPTransformer(val uid: String)
}
}
}(enc)
})
}, dataset.columns.length)
}

def copy(extra: ParamMap): HTTPTransformer = defaultCopy(extra)
Expand Down
Loading