diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/CognitiveServiceBase.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/CognitiveServiceBase.scala index bffe542717..6b78f7657c 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/CognitiveServiceBase.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/CognitiveServiceBase.scala @@ -494,8 +494,7 @@ abstract class CognitiveServicesBaseNoHandler(val uid: String) extends Transform } override def transform(dataset: Dataset[_]): DataFrame = { - logTransform[DataFrame]( - getInternalTransformer(dataset.schema).transform(dataset) + logTransform[DataFrame](getInternalTransformer(dataset.schema).transform(dataset), dataset.columns.length ) } diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/AnomalyDetection.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/AnomalyDetection.scala index 6add1426c7..cf60fbac63 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/AnomalyDetection.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/AnomalyDetection.scala @@ -265,7 +265,7 @@ class SimpleDetectAnomalies(override val uid: String) extends AnomalyDetectorBas getErrorCol, s"$getOutputCol.1" ).withColumnRenamed("1", getOutputCol) - }) + }, dataset.columns.length) } diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/MultivariateAnomalyDetection.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/MultivariateAnomalyDetection.scala index 933ff8f4b4..af91385e3f 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/MultivariateAnomalyDetection.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/anomaly/MultivariateAnomalyDetection.scala @@ -526,7 +526,7 @@ class SimpleFitMultivariateAnomaly(override val uid: String) extends Estimator[S .setModelId(modelId) .setIntermediateSaveDir(getIntermediateSaveDir) .setDiagnosticsInfo(modelInfo("diagnosticsInfo").convertTo[DiagnosticsInfo]) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): SimpleFitMultivariateAnomaly = defaultCopy(extra) @@ -589,7 +589,7 @@ class SimpleDetectMultivariateAnomaly(override val uid: String) extends Model[Si //scalastyle:off method.length override def transform(dataset: Dataset[_]): DataFrame = - logTransform[DataFrame] { + logTransform[DataFrame] ({ // check model status first MADUtils.checkModelStatus(getUrl, getModelId, getSubscriptionKey) @@ -635,7 +635,7 @@ class SimpleDetectMultivariateAnomaly(override val uid: String) extends Model[Si df.join(finalDF, df(getTimestampCol) === finalDF("resultTimestamp"), "left") .drop("resultTimestamp") .sort(col(getTimestampCol).asc) - } + }, dataset.columns.length) //scalastyle:on method.length override def copy(extra: ParamMap): SimpleDetectMultivariateAnomaly = defaultCopy(extra) @@ -720,7 +720,7 @@ class DetectLastMultivariateAnomaly(override val uid: String) extends CognitiveS col(s"$getOutputCol.results.timestamp")(0)).otherwise(null)) .drop(columnNames: _*) - }) + }, dataset.columns.length) } // scalastyle:on null diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/form/FormOntologyLearner.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/form/FormOntologyLearner.scala index e654ff058e..4236b0f5c4 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/form/FormOntologyLearner.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/form/FormOntologyLearner.scala @@ -73,7 +73,7 @@ class FormOntologyLearner(override val uid: String) extends Estimator[FormOntolo .setInputCol(getInputCol) .setOutputCol(getOutputCol) .setOntology(mergedSchema) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Estimator[FormOntologyTransformer] = defaultCopy(extra) @@ -121,7 +121,7 @@ class FormOntologyTransformer(override val uid: String) extends Model[FormOntolo dataset.toDF() .withColumn(getOutputCol, convertToOntologyUDF(col(getInputCol))) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/openai/OpenAIPrompt.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/openai/OpenAIPrompt.scala index 894ef1c93a..b671f13604 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/openai/OpenAIPrompt.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/openai/OpenAIPrompt.scala @@ -108,7 +108,7 @@ class OpenAIPrompt(override val uid: String) extends Transformer } else { results } - }) + }, dataset.columns.length) } private def openAICompletion: OpenAICompletion = { diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/search/AzureSearch.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/search/AzureSearch.scala index d687224681..d4db72e3f3 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/search/AzureSearch.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/search/AzureSearch.scala @@ -133,7 +133,7 @@ class AddDocuments(override val uid: String) extends CognitiveServicesBase(uid) s"/indexes/$getIndexName/docs/index?api-version=${AzureSearchAPIConstants.DefaultAPIVersion}") } super.transform(dataset) - }) + }, dataset.columns.length) } override def prepareEntity: Row => Option[AbstractHttpEntity] = row => diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/speech/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/speech/SpeechToTextSDK.scala index 1447cd6abd..3796fb6f49 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/speech/SpeechToTextSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/speech/SpeechToTextSDK.scala @@ -418,7 +418,7 @@ abstract class SpeechSDKBase extends Transformer isUriAudio ))(enc) .drop(dynamicParamColName) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): this.type = defaultCopy(extra) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/translate/DocumentTranslator.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/translate/DocumentTranslator.scala index 3a3ec5aefc..510f516e40 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/translate/DocumentTranslator.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/translate/DocumentTranslator.scala @@ -164,7 +164,7 @@ class DocumentTranslator(override val uid: String) extends CognitiveServicesBase override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]({ getInternalTransformer(dataset.schema).transform(dataset) - }) + }, dataset.columns.length) } override def responseDataType: DataType = TranslationStatusResponse.schema diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/FindBestModel.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/FindBestModel.scala index d380bba7c1..c67d351aeb 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/FindBestModel.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/FindBestModel.scala @@ -114,7 +114,7 @@ class FindBestModel(override val uid: String) extends Estimator[BestModel] .setBestModelMetrics(evaluator.transform(bestScoredDf)) .setRocCurve(evaluator.rocCurve) .setAllModelMetrics(allModelMetrics) - }) + }, dataset.columns.length) } // Choose a random model as we don't know which one will be chosen yet - all will transform schema in same way @@ -189,7 +189,7 @@ class BestModel(val uid: String) extends Model[BestModel] override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - getBestModel.transform(dataset) + getBestModel.transform(dataset), dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala index 9f16852b9a..df5d95fd58 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala @@ -215,7 +215,7 @@ class TuneHyperparameters(override val uid: String) extends Estimator[TuneHyperp // Compute best model fit on dataset val bestModel = getModels(bestIndex % numModels).fit(dataset, paramsPerRun(bestIndex)).asInstanceOf[Model[_]] new TuneHyperparametersModel(uid).setBestModel(bestModel).setBestMetric(bestMetric) - }) + }, dataset.columns.length) } //scalastyle:on method.length @@ -247,7 +247,7 @@ class TuneHyperparametersModel(val uid: String) override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - getBestModel.transform(dataset) + getBestModel.transform(dataset), dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/DoubleMLEstimator.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/DoubleMLEstimator.scala index f509c418f7..cf30172da6 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/DoubleMLEstimator.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/DoubleMLEstimator.scala @@ -133,7 +133,7 @@ class DoubleMLEstimator(override val uid: String) } val dmlModel = this.copyValues(new DoubleMLModel(uid)).setRawTreatmentEffects(ates.toArray) dmlModel - }) + }, dataset.columns.length) } //scalastyle:off method.length @@ -358,7 +358,7 @@ class DoubleMLModel(val uid: String) override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]({ dataset.toDF() - }) + }, dataset.columns.length) } @DeveloperApi diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/ResidualTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/ResidualTransformer.scala index cea917e5b5..a9ce76384f 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/ResidualTransformer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/ResidualTransformer.scala @@ -87,7 +87,7 @@ class ResidualTransformer(override val uid: String) extends Transformer s"Prediction column $getPredictedCol must be of type Vector or NumericType, but is $predictedColDataType" + s", please use 'setPredictedCol' to set the correct prediction column") } - }) + }, dataset.columns.length) } } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/ICEExplainer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/ICEExplainer.scala index 0945f6d873..4d3eeb90a5 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/ICEExplainer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/ICEExplainer.scala @@ -209,7 +209,7 @@ class ICETransformer(override val uid: String) extends Transformer def transform(ds: Dataset[_]): DataFrame = { - logTransform { + logTransform ({ transformSchema(ds.schema) val df = ds.toDF val idCol = DatasetExtensions.findUnusedColumnName("idCol", df) @@ -250,7 +250,7 @@ class ICETransformer(override val uid: String) extends Transformer case `featureKind` => dependenceDfs.reduce(_ union _).orderBy(desc(getDependenceNameCol)) } - } + }, ds.columns.length) } private def collectCategoricalValues[_](df: DataFrame, feature: ICECategoricalFeature): Array[_] = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/KernelSHAPBase.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/KernelSHAPBase.scala index e78ab9971d..5ebb43b1e4 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/KernelSHAPBase.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/KernelSHAPBase.scala @@ -40,7 +40,7 @@ abstract class KernelSHAPBase(override val uid: String) with Wrappable with SynapseMLLogging { - override def transform(instances: Dataset[_]): DataFrame = logTransform { + override def transform(instances: Dataset[_]): DataFrame = logTransform ({ import instances.sparkSession.implicits._ this.validateSchema(instances.schema) @@ -91,7 +91,7 @@ abstract class KernelSHAPBase(override val uid: String) }.toDF(idCol, this.getOutputCol, this.getMetricsCol) preprocessed.join(fitted, Seq(idCol), "inner").drop(idCol) - } + }, instances.columns.length) override def copy(extra: ParamMap): Transformer = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala index 8bb74611d6..483b058f17 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala @@ -152,7 +152,7 @@ abstract class LIMEBase(override val uid: String) weightUdf } - final override def transform(instances: Dataset[_]): DataFrame = logTransform { + final override def transform(instances: Dataset[_]): DataFrame = logTransform ({ import instances.sparkSession.implicits._ this.validateSchema(instances.schema) val regularization = this.getRegularization @@ -200,7 +200,7 @@ abstract class LIMEBase(override val uid: String) }.toDF(idCol, this.getOutputCol, this.getMetricsCol) preprocessed.join(fitted, Seq(idCol), "inner").drop(idCol) - } + }, instances.columns.length) override def copy(extra: ParamMap): Transformer = this.defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/AggregateBalanceMeasure.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/AggregateBalanceMeasure.scala index 0d5d905d05..ccf2a8b582 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/AggregateBalanceMeasure.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/AggregateBalanceMeasure.scala @@ -87,7 +87,7 @@ class AggregateBalanceMeasure(override val uid: String) df.unpersist calculateAggregateMeasures(featureStats, featureProbCol) - }) + }, dataset.columns.length) } private def calculateAggregateMeasures(featureStats: DataFrame, featureProbCol: String): DataFrame = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/DistributionBalanceMeasure.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/DistributionBalanceMeasure.scala index df5258805b..a2933dd4e0 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/DistributionBalanceMeasure.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/DistributionBalanceMeasure.scala @@ -124,7 +124,7 @@ class DistributionBalanceMeasure(override val uid: String) df.unpersist calculateDistributionMeasures(featureStats, featureProbCol, featureCountCol, numRows) - }) + }, dataset.columns.length) } private def calculateDistributionMeasures(featureStats: DataFrame, diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/FeatureBalanceMeasure.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/FeatureBalanceMeasure.scala index 8e14a2fed7..b847dddbcc 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/FeatureBalanceMeasure.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/exploratory/FeatureBalanceMeasure.scala @@ -128,7 +128,7 @@ class FeatureBalanceMeasure(override val uid: String) df.unpersist calculateParity(associationMetricsDf, featureValueCol) - }) + }, dataset.columns.length) } private def calculateParity(associationMetricsDf: DataFrame, featureValueCol: String): DataFrame = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CleanMissingData.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CleanMissingData.scala index 909794e0a8..d534be90a1 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CleanMissingData.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CleanMissingData.scala @@ -84,7 +84,7 @@ class CleanMissingData(override val uid: String) extends Estimator[CleanMissingD .setFillValues(fillValues.toArray) .setInputCols(getInputCols) .setOutputCols(getOutputCols) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Estimator[CleanMissingDataModel] = defaultCopy(extra) @@ -179,7 +179,7 @@ class CleanMissingDataModel(val uid: String) }).toList val addedCols = dataset.select(datasetCols ::: datasetInputCols: _*) addedCols.na.fill(getColsToFill.zip(getFillValues).toMap) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CountSelector.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CountSelector.scala index fc2841b845..ac2276674d 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CountSelector.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/CountSelector.scala @@ -41,7 +41,7 @@ class CountSelector(override val uid: String) extends Estimator[CountSelectorMod .setIndices(slotsToKeep) .setInputCol(getInputCol) .setOutputCol(getOutputCol) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): this.type = defaultCopy(extra) @@ -77,7 +77,8 @@ class CountSelectorModel(val uid: String) extends Model[CountSelectorModel] override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - getModel.transform(dataset) + getModel.transform(dataset), + dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/DataConversion.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/DataConversion.scala index 0fc1316e89..56fd8950a8 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/DataConversion.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/DataConversion.scala @@ -101,7 +101,7 @@ class DataConversion(override val uid: String) extends Transformer df } res - }) + }, dataset.columns.length) } //scalastyle:on cyclomatic.complexity diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/Featurize.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/Featurize.scala index 6b6ce8bb11..ac1bbf706c 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/Featurize.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/Featurize.scala @@ -225,7 +225,7 @@ class Featurize(override val uid: String) extends Estimator[PipelineModel] ) new Pipeline().setStages(Seq(encoders, casters, imputers, featurizers, va).flatten.toArray).fit(dataset) - }) + }, dataset.columns.length) } //scalastyle:on cyclomatic.complexity //scalastyle:on method.length diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/IndexToValue.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/IndexToValue.scala index b5625700fe..4e8a0ae847 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/IndexToValue.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/IndexToValue.scala @@ -50,7 +50,7 @@ class IndexToValue(val uid: String) extends Transformer case _ => throw new Exception("Unsupported type " + dataType.toString) } dataset.withColumn(getOutputCol, getLevel(dataset(getInputCol)).as(getOutputCol)) - }) + }, dataset.columns.length) } private class Default[T] {var value: T = _ } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/ValueIndexer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/ValueIndexer.scala index b34b2613a8..be6df541d8 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/ValueIndexer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/ValueIndexer.scala @@ -86,7 +86,7 @@ class ValueIndexer(override val uid: String) extends Estimator[ValueIndexerModel .setOutputCol(getOutputCol) .setLevels(castSortLevels) .setDataType(dataType) - }) + }, dataset.columns.length) } private def sortLevels[T: TypeTag](levels: Array[_]) @@ -198,7 +198,7 @@ class ValueIndexerModel(val uid: String) mmlStyle = false) val inputColIndex = getIndex(dataset(getInputCol)) dataset.withColumn(getOutputCol, inputColIndex.as(getOutputCol, metadata)) - }) + }, dataset.columns.length) } //scalastyle:on cyclomatic.complexity diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala index 2637360b74..1c2cb7e198 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala @@ -58,7 +58,7 @@ class MultiNGram(override val uid: String) Row.fromSeq(row.toSeq :+ mergedNGrams) }(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType)))) .drop(intermediateOutputCols: _*) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): MultiNGram = diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/PageSplitter.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/PageSplitter.scala index 77991ffd59..bd4a5d9634 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/PageSplitter.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/PageSplitter.scala @@ -94,7 +94,8 @@ class PageSplitter(override val uid: String) override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - dataset.toDF().withColumn(getOutputCol, UDFUtils.oldUdf(split _, ArrayType(StringType))(col(getInputCol))) + dataset.toDF().withColumn(getOutputCol, UDFUtils.oldUdf(split _, ArrayType(StringType))(col(getInputCol))), + dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/TextFeaturizer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/TextFeaturizer.scala index fdb8cc18a6..ded629426c 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/TextFeaturizer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/TextFeaturizer.scala @@ -342,7 +342,7 @@ class TextFeaturizer(override val uid: String) val stages = chainedModels ++ Seq(new DropColumns().setCols(colsToDrop.toArray)) new Pipeline().setStages(stages.toArray).fit(dataset).setParent(this) - }) + }, dataset.columns.length) } //scalastyle:on method.length diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/image/SuperpixelTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/image/SuperpixelTransformer.scala index fe6526974f..b7351ef8c5 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/image/SuperpixelTransformer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/image/SuperpixelTransformer.scala @@ -49,7 +49,7 @@ class SuperpixelTransformer(val uid: String) extends Transformer dataset.schema(getInputCol).dataType, getCellSize, getModifier) dataset.toDF().withColumn(getOutputCol, getSuperPixels(col(getInputCol))) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Transformer = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/image/UnrollImage.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/image/UnrollImage.scala index ceee15703e..ccc67d5f6f 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/image/UnrollImage.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/image/UnrollImage.scala @@ -182,7 +182,7 @@ class UnrollImage(val uid: String) extends Transformer assert(ImageSchemaUtils.isImage(df.schema(getInputCol)), "input column should have Image type") val unrollUDF = udf(unroll _) df.withColumn(getOutputCol, unrollUDF(df(getInputCol))) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Transformer = defaultCopy(extra) @@ -238,7 +238,7 @@ class UnrollBinaryImage(val uid: String) extends Transformer }, VectorType) df.withColumn($(outputCol), unrollUDF(df($(inputCol)))) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Transformer = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala index 37dfafb70c..674c9c9f23 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala @@ -137,7 +137,7 @@ class HTTPTransformer(val uid: String) } } }(enc) - }) + }, dataset.columns.length) } def copy(extra: ParamMap): HTTPTransformer = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/Parsers.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/Parsers.scala index 25530d6543..684e91034c 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/Parsers.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/Parsers.scala @@ -85,9 +85,8 @@ class JSONInputParser(val uid: String) extends HTTPInputParser HTTPSchema.to_http_request(urlCol, headersCol, methodCol, entityCol)) .drop(entityCol, urlCol, headersCol, methodCol) .withColumnRenamed(requestCol, getOutputCol) - }) + }, dataset.columns.length) } - } object CustomInputParser extends ComplexParamsReadable[CustomInputParser] with Serializable @@ -143,7 +142,7 @@ class CustomInputParser(val uid: String) extends HTTPInputParser with ComplexPar } } dataset.toDF().withColumn(getOutputCol, parseInputExpression) - }) + }, dataset.columns.length) } } @@ -197,7 +196,7 @@ class JSONOutputParser(val uid: String) extends HTTPOutputParser with ComplexPar .setInputCol(getOutputCol) .setOutputCol(getOutputCol) .transform(parsed)).getOrElse(parsed) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { @@ -218,7 +217,7 @@ class StringOutputParser(val uid: String) extends HTTPOutputParser with ComplexP logTransform[DataFrame]({ val stringEntityCol = HTTPSchema.entity_to_string(col(getInputCol + ".entity")) dataset.toDF.withColumn(getOutputCol, stringEntityCol) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { @@ -275,7 +274,7 @@ class CustomOutputParser(val uid: String) extends HTTPOutputParser with ComplexP } dataset.toDF() .withColumn(getOutputCol, parseOutputExpression) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/SimpleHTTPTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/SimpleHTTPTransformer.scala index 28b10f60ac..2ac095c142 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/SimpleHTTPTransformer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/SimpleHTTPTransformer.scala @@ -159,7 +159,8 @@ class SimpleHTTPTransformer(val uid: String) override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - makePipeline(dataset.schema).transform(dataset.toDF()) + makePipeline(dataset.schema).transform(dataset.toDF()), + dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/isolationforest/IsolationForest.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/isolationforest/IsolationForest.scala index 65f1879651..132f18dae9 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/isolationforest/IsolationForest.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/isolationforest/IsolationForest.scala @@ -27,13 +27,13 @@ class IsolationForest(override val uid: String, val that: IsolationForestSource) override def copy(extra: ParamMap): IsolationForest = defaultCopy(extra) - override def fit(data: Dataset[_]): IsolationForestModel = { - logFit { - val innerModel = copyValues(that).fit(data) + override def fit(dataset: Dataset[_]): IsolationForestModel = { + logFit ({ + val innerModel = copyValues(that).fit(dataset) new IsolationForestModel(uid) .setInnerModel(innerModel) .copy(innerModel.extractParamMap()) - } + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = @@ -59,7 +59,8 @@ class IsolationForestModel(override val uid: String) override def transform(data: Dataset[_]): DataFrame = { logTransform[DataFrame]( - getInnerModel.setPredictionCol(getPredictionCol).transform(data) + getInnerModel.setPredictionCol(getPredictionCol).transform(data), + data.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/logging/SynapseMLLogging.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/logging/SynapseMLLogging.scala index b829ccb283..a4919b1fb5 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/logging/SynapseMLLogging.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/logging/SynapseMLLogging.scala @@ -5,17 +5,19 @@ package com.microsoft.azure.synapse.ml.logging import com.microsoft.azure.synapse.ml.build.BuildInfo import org.apache.spark.internal.Logging -import spray.json.{DefaultJsonProtocol, RootJsonFormat} +import spray.json.{DefaultJsonProtocol, RootJsonFormat, NullOptions} import scala.collection.JavaConverters._ import scala.collection.mutable case class SynapseMLLogInfo(uid: String, className: String, method: String, - buildVersion: String) + buildVersion: String, + columns: Option[Int] = None) -object LogJsonProtocol extends DefaultJsonProtocol { - implicit val LogFormat: RootJsonFormat[SynapseMLLogInfo] = jsonFormat4(SynapseMLLogInfo) +object LogJsonProtocol extends DefaultJsonProtocol with NullOptions +{ + implicit val LogFormat: RootJsonFormat[SynapseMLLogInfo] = jsonFormat5(SynapseMLLogInfo) } import com.microsoft.azure.synapse.ml.logging.LogJsonProtocol._ @@ -44,8 +46,13 @@ trait SynapseMLLogging extends Logging { val uid: String - protected def logBase(methodName: String): Unit = { - logBase(SynapseMLLogInfo(uid, getClass.toString, methodName, BuildInfo.version)) + protected def logBase(methodName: String, columns: Option[Int]): Unit = { + logBase(SynapseMLLogInfo( + uid, + getClass.toString, + methodName, + BuildInfo.version, + columns)) } protected def logBase(info: SynapseMLLogInfo): Unit = { @@ -60,23 +67,22 @@ trait SynapseMLLogging extends Logging { } def logClass(): Unit = { - logBase("constructor") + logBase("constructor", None) } - def logFit[T](f: => T): T = { - logVerb("fit", f) + def logFit[T](f: => T, columns: Int): T = { + logVerb("fit", f, columns) } - def logTrain[T](f: => T): T = { - logVerb("train", f) + def logTrain[T](f: => T, columns: Int): T = { + logVerb("train", f, columns) } - def logTransform[T](f: => T): T = { - logVerb("transform", f) + def logTransform[T](f: => T, columns: Int): T = { + logVerb("transform", f, columns) } - - def logVerb[T](verb: String, f: => T): T = { - logBase(verb) + def logVerb[T](verb: String, f: => T, columns: Int = -1): T = { + logBase(verb, if(columns == -1) None else Some(columns)) try { f } catch { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/ConditionalKNN.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/ConditionalKNN.scala index 51fef5fd9d..a0aca204ff 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/ConditionalKNN.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/ConditionalKNN.scala @@ -45,7 +45,7 @@ class ConditionalKNN(override val uid: String) extends Estimator[ConditionalKNNM override def fit(dataset: Dataset[_]): ConditionalKNNModel = { logFit( - fitOptimized(dataset) + fitOptimized(dataset), dataset.columns.length ) } @@ -105,7 +105,7 @@ class ConditionalKNNModel(val uid: String) extends Model[ConditionalKNNModel] )) dataset.toDF().withColumn(getOutputCol, getNeighborUDF(col(getFeaturesCol), col(getConditionerCol))) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/KNN.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/KNN.scala index d2785b2810..e630b0414b 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/KNN.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/nn/KNN.scala @@ -60,7 +60,7 @@ class KNN(override val uid: String) extends Estimator[KNNModel] with KNNParams override def fit(dataset: Dataset[_]): KNNModel = { logFit( - fitOptimized(dataset) + fitOptimized(dataset), dataset.columns.length ) } @@ -112,7 +112,7 @@ class KNNModel(val uid: String) extends Model[KNNModel] )) dataset.toDF().withColumn(getOutputCol, getNeighborUDF(col(getFeaturesCol))) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingAdapter.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingAdapter.scala index a3c00e3b6e..37623faf37 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingAdapter.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingAdapter.scala @@ -93,7 +93,7 @@ class RankingAdapter(override val uid: String) .setItemCol(getItemCol) .setRatingCol(getRatingCol) .setLabelCol(getLabelCol) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { @@ -155,7 +155,7 @@ class RankingAdapterModel private[ml](val uid: String) .select(col(getUserCol), col("recommendations." + getItemCol).as("prediction")) .join(perUserActualItemsDF, getUserCol) .drop(getUserCol) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): RankingAdapterModel = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingTrainValidationSplit.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingTrainValidationSplit.scala index a4af3b50e0..17e5141a79 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingTrainValidationSplit.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingTrainValidationSplit.scala @@ -142,7 +142,7 @@ class RankingTrainValidationSplit(override val uid: String) extends Estimator[Ra .setBestModel(est.fit(dataset, epm(bestIndex))) .setValidationMetrics(metrics) .setParent(this)) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): RankingTrainValidationSplit = defaultCopy(extra) @@ -327,7 +327,7 @@ class RankingTrainValidationSplitModel( //sort to pass unit test getBestModel.transform(dataset).sort("prediction") - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RecommendationIndexer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RecommendationIndexer.scala index a76791289a..57a230384c 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RecommendationIndexer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RecommendationIndexer.scala @@ -42,7 +42,7 @@ class RecommendationIndexer(override val uid: String) .setUserOutputCol(getUserOutputCol) .setItemInputCol(getItemInputCol) .setItemOutputCol(getItemOutputCol) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Estimator[RecommendationIndexerModel] = defaultCopy(extra) @@ -59,7 +59,8 @@ class RecommendationIndexerModel(override val uid: String) extends Model[Recomme override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - getItemIndexModel.transform(getUserIndexModel.transform(dataset)) + getItemIndexModel.transform(getUserIndexModel.transform(dataset)), + dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala index 4e2a37755b..0addd6b66e 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala @@ -72,7 +72,7 @@ class SAR(override val uid: String) extends Estimator[SARModel] .setSupportThreshold(getSupportThreshold) .setItemCol(getItemCol) .setUserCol(getUserCol) - }) + }, dataset.columns.length) } /** diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala index 82bf4cc3fe..b92339cff1 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala @@ -145,7 +145,8 @@ class SARModel(override val uid: String) extends Model[SARModel] override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - transform($(rank), $(userDataFrame), $(itemDataFrame), dataset) + transform($(rank), $(userDataFrame), $(itemDataFrame), dataset), + dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Cacher.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Cacher.scala index bc6c98d0ed..6fabce6739 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Cacher.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Cacher.scala @@ -30,7 +30,7 @@ class Cacher(val uid: String) extends Transformer with Wrappable with DefaultPar } else { dataset.toDF } - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Transformer = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/ClassBalancer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/ClassBalancer.scala index dfe34b8715..401469c7ab 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/ClassBalancer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/ClassBalancer.scala @@ -53,7 +53,7 @@ class ClassBalancer(override val uid: String) extends Estimator[ClassBalancerMod .setWeights(weights) .setBroadcastJoin(getBroadcastJoin) .setParent(this) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Estimator[ClassBalancerModel] = defaultCopy(extra) @@ -94,7 +94,7 @@ class ClassBalancerModel(val uid: String) extends Model[ClassBalancerModel] getWeights } dataset.toDF().join(w, getInputCol) - }) + }, dataset.columns.length) } } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/DropColumns.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/DropColumns.scala index 8423e33972..818d06bc99 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/DropColumns.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/DropColumns.scala @@ -39,7 +39,7 @@ class DropColumns(val uid: String) extends Transformer with Wrappable with Defau override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]({ dataset.toDF().drop(getCols: _*) - }) + }, dataset.columns.length) } def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/EnsembleByKey.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/EnsembleByKey.scala index 5b98e9a689..9129af28c2 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/EnsembleByKey.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/EnsembleByKey.scala @@ -125,7 +125,7 @@ class EnsembleByKey(val uid: String) extends Transformer val needToDrop = getColNames.toSet & dataset.columns.toSet dataset.drop(needToDrop.toList: _*).toDF().join(aggregated, getKeys) } - }) + }, dataset.columns.length) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Explode.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Explode.scala index 6a47cd1038..682bdf6655 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Explode.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Explode.scala @@ -27,7 +27,7 @@ class Explode(val uid: String) extends Transformer logTransform[DataFrame]({ transformSchema(dataset.schema) dataset.toDF().withColumn(getOutputCol, explode(col(getInputCol))) - }) + }, dataset.columns.length) } def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala index 2262967a14..53e9efa5b2 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala @@ -52,7 +52,8 @@ class Lambda(val uid: String) extends Transformer with Wrappable with ComplexPar override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]( - getTransform(dataset) + getTransform(dataset), + dataset.columns.length ) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala index 0d4e0a498a..d32633edc4 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala @@ -46,7 +46,7 @@ trait MiniBatchBase extends Transformer with DefaultParamsWritable with Wrappabl } } } - }) + }, dataset.columns.length) } } @@ -237,7 +237,7 @@ class FlattenBatch(val uid: String) } } ) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): this.type = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MultiColumnAdapter.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MultiColumnAdapter.scala index ad2c4243cf..14a81859a5 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MultiColumnAdapter.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MultiColumnAdapter.scala @@ -108,7 +108,7 @@ class MultiColumnAdapter(override val uid: String) extends Estimator[PipelineMod logFit({ transformSchema(dataset.schema) new Pipeline(uid).setStages(getStages).fit(dataset) - }) + }, dataset.columns.length) } def copy(extra: ParamMap): this.type = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala index fe2ccb6d71..1b2cf34ca5 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala @@ -40,7 +40,7 @@ class PartitionConsolidator(val uid: String) Iterator() } }(RowEncoder(dataset.schema)) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Transformer = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/RenameColumn.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/RenameColumn.scala index 0ba80be44a..b2f5cb1119 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/RenameColumn.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/RenameColumn.scala @@ -31,7 +31,7 @@ class RenameColumn(val uid: String) extends Transformer with Wrappable with Defa logTransform[DataFrame]({ transformSchema(dataset.schema, logging = true) dataset.toDF().withColumnRenamed(getInputCol, getOutputCol) - }) + }, dataset.columns.length) } def validateAndTransformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala index 735a4d84c2..17f2931576 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala @@ -56,7 +56,7 @@ class Repartition(val uid: String) extends Transformer with Wrappable with Defau dataset.sqlContext.createDataFrame( dataset.rdd.repartition(getN).asInstanceOf[RDD[Row]], dataset.schema) - }) + }, dataset.columns.length) } def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SelectColumns.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SelectColumns.scala index 28da4e0541..4274ec0655 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SelectColumns.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SelectColumns.scala @@ -43,7 +43,7 @@ class SelectColumns(val uid: String) extends Transformer logTransform[DataFrame]({ verifySchema(dataset.schema) dataset.toDF().select(getCols.map(col): _*) - }) + }, dataset.columns.length) } def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala index afcb998009..7ae19a5fbc 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala @@ -70,7 +70,7 @@ class StratifiedRepartition(val uid: String) extends Transformer with Wrappable val rspdata = spdata.partitionBy(rangePartitioner).mapPartitions(keyToRow => keyToRow.map { case (key, row) => row }).persist() dataset.sqlContext.createDataFrame(rspdata, dataset.schema) - }) + }, dataset.columns.length) } private def getEqualLabelCount(labelToCount: Array[(Int, Long)], dataset: Dataset[_]): Map[Int, Double] = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SummarizeData.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SummarizeData.scala index 1a0890020f..e18d1ec2fb 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SummarizeData.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/SummarizeData.scala @@ -122,7 +122,7 @@ class SummarizeData(override val uid: String) val base = createJoinBase(df) subFrames.foldLeft(base) { (z, dfi) => z.join(dfi, SummarizeData.FeatureColumnName) } - }) + }, dataset.columns.length) } def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/TextPreprocessor.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/TextPreprocessor.scala index c2406c6ab4..c9fc5b45d1 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/TextPreprocessor.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/TextPreprocessor.scala @@ -140,7 +140,7 @@ class TextPreprocessor(val uid: String) extends Transformer val mapText: String => String = broadcastedTrie.value.mapText val textMapper = udf(mapText) dataset.withColumn(getOutputCol, textMapper(dataset(getInputCol)).as(getOutputCol)) - }) + }, dataset.columns.length) } def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Timer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Timer.scala index 4d223257da..fe6092c4f7 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Timer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Timer.scala @@ -84,7 +84,7 @@ class Timer(val uid: String) extends Estimator[TimerModel] val (model, message) = fitWithTime(dataset) log(message) model - }) + }, dataset.columns.length) } } @@ -128,7 +128,7 @@ class TimerModel(val uid: String) val (model, message) = transformWithTime(dataset) log(message) model - }) + }, dataset.columns.length) } } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UDFTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UDFTransformer.scala index 55a056c57b..7a964d4b7a 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UDFTransformer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UDFTransformer.scala @@ -97,7 +97,7 @@ class UDFTransformer(val uid: String) extends Transformer with Wrappable with Co } else { dataset.withColumn(getOutputCol, applyUDFOnCols(getInputCols.map(col): _*)) } - }) + }, dataset.columns.length) } def validateAndTransformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UnicodeNormalize.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UnicodeNormalize.scala index 310e05f8d3..546691540f 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UnicodeNormalize.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/UnicodeNormalize.scala @@ -67,7 +67,7 @@ class UnicodeNormalize(val uid: String) extends Transformer val textMapper = udf(f) dataset.withColumn(getOutputCol, textMapper(dataset(getInputCol)).as(getOutputCol)) - }) + }, dataset.columns.length) } def transformSchema(schema: StructType): StructType = { diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala index 3372484a10..767ec41423 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala @@ -167,7 +167,7 @@ class ComputeModelStatistics(override val uid: String) extends Transformer } else { throwOnInvalidScoringKind(scoreValueKind) } - }) + }, dataset.columns.length) } //scalastyle:on method.length //scalastyle:on cyclomatic.complexity diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala index c7ec055499..795b87135a 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala @@ -101,7 +101,7 @@ class ComputePerInstanceStatistics(override val uid: String) extends Transformer .withColumn(MetricConstants.L2LossMetric, l2LossFunc(dataset(labelColumnName), dataset(scoresColumnName))) } - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): Transformer = new ComputePerInstanceStatistics() diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainClassifier.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainClassifier.scala index 0ddfd1dd80..7847bc50ff 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainClassifier.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainClassifier.scala @@ -201,7 +201,7 @@ class TrainClassifier(override val uid: String) extends AutoTrainer[TrainedClass .setFeaturesCol(getFeaturesCol) levels.map(l => model.setLevels(l.toArray)).getOrElse(model) - }) + }, dataset.columns.length) } //scalastyle:on method.length //scalastyle:on cyclomatic.complexity @@ -361,7 +361,7 @@ class TrainedClassifierModel(val uid: String) else CategoricalUtilities.setLevels(scoredDataWithUpdatedScoredLevels, getLabelCol, getLevels) - }) + }, dataset.columns.length) } private def setMetadataForColumnName(sparkColumnName: String, diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainRegressor.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainRegressor.scala index 213afa730c..06b81f0270 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainRegressor.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/TrainRegressor.scala @@ -123,7 +123,7 @@ class TrainRegressor(override val uid: String) extends AutoTrainer[TrainedRegres .setLabelCol(labelColumn) .setModel(pipelineModel) .setFeaturesCol(getFeaturesCol) - }) + }, dataset.columns.length) } //scalastyle:on method.length //scalastyle:on cyclomatic.complexity @@ -172,7 +172,7 @@ class TrainedRegressorModel(val uid: String) cleanedScoredData, moduleName, getLabelCol, SchemaConstants.RegressionKind) SparkSchema.updateColumnMetadata(schematizedScoredDataWithLabel, moduleName, SchemaConstants.SparkPredictionColumn, SchemaConstants.RegressionKind) - }) + }, dataset.columns.length) } @DeveloperApi diff --git a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ImageFeaturizer.scala b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ImageFeaturizer.scala index 37a58db7ab..44223d3cfd 100644 --- a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ImageFeaturizer.scala +++ b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ImageFeaturizer.scala @@ -245,7 +245,7 @@ class ImageFeaturizer(val uid: String) extends Transformer with HasInputCol with } result.drop(tempCol) - }) + }, dataset.columns.length) } val convertOutputToVector: Seq[Float] => DenseVector = (raw: Seq[Float]) => { diff --git a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala index 59eb57d8bd..4792ba5668 100644 --- a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala +++ b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala @@ -208,7 +208,7 @@ class ONNXModel(override val uid: String) sliceModelAtOutputs(this, outputs) } - override def transform(dataset: Dataset[_]): DataFrame = logTransform { + override def transform(dataset: Dataset[_]): DataFrame = logTransform ({ val inputSchema = dataset.schema this.validateSchema(inputSchema) @@ -225,9 +225,9 @@ class ONNXModel(override val uid: String) // Due to potential slicing of model, we either use this model or a sliced one to do the actual transform actualModel.transformInner(dataset, inputSchema) - } + }, dataset.columns.length) - def transformInner(dataset: Dataset[_], inputSchema: StructType): DataFrame = logTransform { + def transformInner(dataset: Dataset[_], inputSchema: StructType): DataFrame = logTransform ({ val modelOutputSchema = getModelOutputSchema(inputSchema) implicit val enc: Encoder[Row] = RowEncoder( @@ -253,7 +253,7 @@ class ONNXModel(override val uid: String) val flattenedDF = new FlattenBatch().transform(outputDf) (softMaxTransform _ andThen argMaxTransform) (flattenedDF) - } + }, dataset.columns.length) private def softMaxTransform(input: DataFrame): DataFrame = { this.getSoftMaxDict.foldLeft(input) { diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala index 79731fa08e..75b17183d7 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala @@ -60,7 +60,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine } else { trainOneDataBatch(dataset, batchIndex = 0, 1) } - }) + }, dataset.columns.length) } def beforeTrainBatch(batchIndex: Int, dataset: Dataset[_], model: Option[TrainedModel]): Unit = { diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMClassifier.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMClassifier.scala index 01a28eeeca..5916d085f6 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMClassifier.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMClassifier.scala @@ -160,7 +160,7 @@ class LightGBMClassificationModel(override val uid: String) " since no output columns were set.") } outputData.toDF - }) + }, dataset.columns.length) } override protected def raw2probabilityInPlace(rawPrediction: Vector): Vector = { diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala index 6fbbdd7794..9dc62831c6 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala @@ -152,7 +152,7 @@ class LightGBMRankerModel(override val uid: String) outputData = outputData.withColumn(getFeaturesShapCol, featureShapUDF(col(getFeaturesCol))) } outputData.toDF - }) + }, dataset.columns.length) } override def predict(features: Vector): Double = { diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRegressor.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRegressor.scala index a53e351682..20c4871d26 100644 --- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRegressor.scala +++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRegressor.scala @@ -123,7 +123,7 @@ class LightGBMRegressionModel(override val uid: String) outputData = outputData.withColumn(getFeaturesShapCol, featureShapUDF(col(getFeaturesCol))) } outputData.toDF - }) + }, dataset.columns.length) } override def predict(features: Vector): Double = { diff --git a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageSetAugmenter.scala b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageSetAugmenter.scala index dc9e362944..5e023543e1 100644 --- a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageSetAugmenter.scala +++ b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageSetAugmenter.scala @@ -64,7 +64,7 @@ class ImageSetAugmenter(val uid: String) extends Transformer else Some(flipImages(df, getInputCol, getOutputCol, Flip.flipUpDown)) List(dfLR, dfUD).flatten(x => x).foldLeft(dfID) { case (dfl, tdr) => dfl.union(tdr) } - }) + }, dataset.columns.length) } diff --git a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageTransformer.scala b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageTransformer.scala index 5282f09d1c..e2c856933d 100644 --- a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageTransformer.scala +++ b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/ImageTransformer.scala @@ -682,7 +682,7 @@ class ImageTransformer(val uid: String) extends Transformer } else { df.withColumn(getOutputCol, convert(df(getInputCol))) } - }) + }, dataset.columns.length) } private def getDecodedImage(decodeMode: String)(r: Any): Option[(String, Mat)] = { diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VectorZipper.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VectorZipper.scala index 6ebed03e61..e473b9bb51 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VectorZipper.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VectorZipper.scala @@ -36,6 +36,6 @@ class VectorZipper(override val uid: String) extends Transformer logTransform[DataFrame]({ val inputCols = getInputCols dataset.withColumn(getOutputCol, array(inputCols.head, inputCols.tail: _*)) - }) + }, dataset.columns.length) } } diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitCSETransformer.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitCSETransformer.scala index fcb257a41f..6171f97075 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitCSETransformer.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitCSETransformer.scala @@ -157,7 +157,7 @@ class VowpalWabbitCSETransformer(override val uid: String) // optional stratification .groupBy(getMetricsStratificationCols.map(F.col): _*) .agg(metrics.head, metrics.drop(1): _*) - }) + }, dataset.columns.length) } private def perRewardSchema(f: T.StructField): T.StructField = { diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitClassifier.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitClassifier.scala index 304ca889a2..b4a27dc365 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitClassifier.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitClassifier.scala @@ -81,7 +81,7 @@ class VowpalWabbitClassifier(override val uid: String) } trainInternal(finalDataset, model) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): this.type = defaultCopy(extra) @@ -154,7 +154,7 @@ class VowpalWabbitClassificationModel(override val uid: String) col(vowpalWabbitPredictionCol).getField("prediction").cast(DoubleType)) .withColumn($(predictionCol), col($(rawPredictionCol))) } - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): this.type = defaultCopy(extra) diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitContextualBandit.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitContextualBandit.scala index ae35728f23..ac59ab1020 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitContextualBandit.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitContextualBandit.scala @@ -273,7 +273,7 @@ class VowpalWabbitContextualBandit(override val uid: String) .setPredictionCol(getPredictionCol) trainInternal(dataset, model) - }) + }, dataset.columns.length) } override def fit(dataset: Dataset[_], paramMaps: Seq[ParamMap]): Seq[VowpalWabbitContextualBanditModel] = { @@ -293,7 +293,7 @@ class VowpalWabbitContextualBandit(override val uid: String) } awaitFutures(modelFutures).map(model => model.setParent(this)) - }) + }, dataset.columns.length) } def parallelFit(dataset: Dataset[_], paramMaps: util.ArrayList[ParamMap]): @@ -361,7 +361,7 @@ class VowpalWabbitContextualBanditModel(override val uid: String) dataset.withColumn( $(predictionCol), predictUDF(struct(dataset.columns.map(dataset(_)): _*))) - }) + }, dataset.columns.length) } override def predict(features: Row): Double = { diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitDSJsonTransformer.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitDSJsonTransformer.scala index 45f89d59aa..7de7a2d0bd 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitDSJsonTransformer.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitDSJsonTransformer.scala @@ -75,7 +75,7 @@ class VowpalWabbitDSJsonTransformer(override val uid: String) dataset.toDF .withColumn(JsonColName, F.from_json(F.col(getDsJsonColumn), jsonSchema)) .select(outputFields: _ *) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitFeaturizer.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitFeaturizer.scala index 6cf97347be..72fb64da03 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitFeaturizer.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitFeaturizer.scala @@ -212,7 +212,7 @@ class VowpalWabbitFeaturizer(override val uid: String) extends Transformer val mode = udf(featurizeRow(featurizers)) dataset.toDF.withColumn(getOutputCol, mode.apply(struct(fieldSubset.map(f => col(f.name)): _*))) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): VowpalWabbitFeaturizer = defaultCopy(extra) diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala index c08d97c354..15bb171693 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala @@ -76,7 +76,7 @@ class VowpalWabbitGeneric(override val uid: String) .setInputCol(getInputCol) trainInternal(dataset.toDF, model) - }) + }, dataset.columns.length) } } @@ -100,7 +100,7 @@ class VowpalWabbitGenericModel(override val uid: String) override def transform(dataset: Dataset[_]): DataFrame = { // this is doing predict, but lightgbm also logs logTransform in the model... - logTransform { + logTransform ({ val df = dataset.toDF() val inputColIdx = df.schema.fieldIndex(getInputCol) @@ -116,7 +116,7 @@ class VowpalWabbitGenericModel(override val uid: String) }} })(rowEncoder) .toDF() - } + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitInteractions.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitInteractions.scala index f67a39f827..109506d457 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitInteractions.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitInteractions.scala @@ -74,7 +74,7 @@ class VowpalWabbitInteractions(override val uid: String) extends Transformer }) dataset.toDF.withColumn(getOutputCol, mode.apply(struct(fieldSubset.map(f => col(f.name)): _*))) - }) + }, dataset.columns.length) } override def transformSchema(schema: StructType): StructType = { diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitRegressor.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitRegressor.scala index 4747521ed3..2959380ee9 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitRegressor.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitRegressor.scala @@ -34,7 +34,7 @@ class VowpalWabbitRegressor(override val uid: String) .setPredictionCol(getPredictionCol) trainInternal(dataset, model) - }) + }, dataset.columns.length) } override def copy(extra: ParamMap): VowpalWabbitRegressor = defaultCopy(extra)