Commit
add try catch to avoid system corrupt when using df.cache
fix issues in pom
ckeys committed Aug 18, 2022
1 parent 54b1f50 commit b6bea28
Showing 3 changed files with 150 additions and 148 deletions.
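The substantive change: the summary computation in SQLDataSummary.train runs against a cached DataFrame (repartitionDF), and the commit wraps that work in try/catch/finally so the cached blocks are always released, even when a column-name check or a metric computation throws. A minimal sketch of the pattern, using names from the diff below (the comment stands in for the elided body):

    val repartitionDF = df.repartition(df.schema.map(sc => col(sc.name)).toArray: _*).cache()
    try {
      // ... compute all summary metrics against repartitionDF ...
      res
    } finally {
      repartitionDF.unpersist() // runs on success and on failure
    }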
1 change: 1 addition & 0 deletions mlsql-mllib/pom.xml
@@ -15,6 +15,7 @@
       <artifactId>byzer-extension-tests-3.0_2.12</artifactId>
       <groupId>tech.mlsql</groupId>
       <version>${project.version}</version>
+      <scope>test</scope>
     </dependency>
   </dependencies>

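The added <scope>test</scope> keeps the byzer-extension-tests artifact off the compile and runtime classpaths: Maven exposes a test-scoped dependency only to test compilation and test execution, which is the right visibility for a test-support library.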

SQLDataSummary.scala
@@ -266,7 +266,7 @@ class SQLDataSummary(override val uid: String) extends SQLAlg with MllibFunction
    */
   def computePercentile(data: RDD[Double], tile: Double): Double = {
     // NIST method; data to be sorted in ascending order
-    val r = data.sortBy(x => x).cache()
+    val r = data.sortBy(x => x)
     val c = r.count()
     val res = if (c == 1) r.first()
     else {
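Dropping .cache() here trades memory for recomputation: each later action on r can re-run the sortBy. A hypothetical driver-side sketch of the consequence (not code from this file):

    val r = data.sortBy(x => x) // nothing is persisted on the executors
    val c = r.count()           // first action: runs the sort
    val v = r.first()           // second action: may run the sort again
    // Cost: the sort may execute more than once. Benefit: no cached blocks
    // are left behind if the job dies before any unpersist() could run.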
@@ -355,153 +355,157 @@ class SQLDataSummary(override val uid: String) extends SQLAlg with MllibFunction

(In the raw diff every line of this method body is touched, because the commit re-indents the body inside the new try block. The body is shown once below; only the genuinely new lines are marked +.)

     val modeFormat = Try(params.getOrElse(DataSummary.modeFormat, ModeValueFormat.empty)).getOrElse(ModeValueFormat.empty)
     var metrics = params.getOrElse(DataSummary.metrics, "").split(",").filter(!_.equalsIgnoreCase(""))
     val repartitionDF = df.repartition(df.schema.map(sc => col(sc.name)).toArray: _*).cache()

     val columns = repartitionDF.columns
+    try {
       columns.map(col => {
         if (col.contains(".") || col.contains("`")) {
           throw new RuntimeException(s"The column name : ${col} contains special symbols, like . or `, please rename it first!! ")
         }
       })

       numericCols = repartitionDF.schema.filter(sc => {
         sc.dataType.typeName match {
           case datatype: String => Array("integer", "short", "double", "float", "long").contains(datatype) || datatype.contains("decimal")
           case _ => false
         }
       }).map(sc => {
         sc.name
       }).toArray

       val datatype_schema = ("DataType" +: repartitionDF.schema.map(f => f.name)).map(t => {
         StructField(t, StringType)
       })

       // get the quantile number for the numeric columns
       val spark = repartitionDF.sparkSession
       val total_count = repartitionDF.count()

       var new_quantile_rows = approxSwitch match {
         case true => {
           repartitionDF.select(repartitionDF.schema.map(sc => {
             if (numericCols.contains(sc.name)) {
               col(sc.name)
             } else {
               lit(0.0).as(sc.name)
             }
           }
           ): _*).na.fill(0.0).stat.approxQuantile(repartitionDF.columns, Array(0.25, 0.5, 0.75), 0.05).transpose.map(_.map(String.valueOf(_)).toSeq).map(row =>
             "Q" +: row
           )
         }
         case false => {
           getQuantileNum(repartitionDF.schema, repartitionDF, numericCols).transpose.map(_.map(v =>
             String.valueOf(v) match {
               case "NaN" => ""
               case _ => v.formatted(s"%.${round_at}f")
             }
           ).toSeq).map(row =>
             "Q" +: row
           )
         }
       }
       new_quantile_rows = new_quantile_rows.updated(0, new_quantile_rows.head.updated(0, "%25"))
       new_quantile_rows = new_quantile_rows.updated(1, new_quantile_rows(1).updated(0, "median"))
       new_quantile_rows = new_quantile_rows.updated(2, new_quantile_rows(2).updated(0, "%75"))
       val quantile_df_tmp = new_quantile_rows.map(Row.fromSeq(_)).toSeq
       val quantile_df = spark.createDataFrame(spark.sparkContext.parallelize(quantile_df_tmp, 1), StructType(datatype_schema)).na.fill("")
       val new_quantile_df = quantile_df.select(quantile_df.schema.map(sc => {
         if (!numericCols.contains(sc.name) && !sc.name.equals("DataType")) {
           lit("").as(sc.name)
         } else {
           col(sc.name)
         }
       }): _*)

       val mode_df = repartitionDF.select(getModeNum(repartitionDF.schema, numericCols, repartitionDF, modeFormat): _*).select(lit("mode").alias("metric"), col("*"))
       val maxlength_df = repartitionDF.select(getMaxLength(repartitionDF.schema): _*).select(lit("maximumLength").alias("metric"), col("*"))
       val minlength_df = repartitionDF.select(getMinLength(repartitionDF.schema): _*).select(lit("minimumLength").alias("metric"), col("*"))

       val dfWithUniqueRatio = repartitionDF.select(countUniqueValueRatio(repartitionDF.schema): _*)
       val distinct_proportion_df = dfWithUniqueRatio.select(lit("uniqueValueRatio").alias(DataSummary.metricColumnName), col("*"))

       val is_primary_key_df = dfWithUniqueRatio.select(dfWithUniqueRatio.schema.map(sc => {
         when(col(sc.name).cast(DoubleType) >= 1, "1").otherwise("0 ").alias(sc.name)
       }).toArray: _*).select(lit("primaryKeyCandidate").alias(DataSummary.metricColumnName), col("*"))

       val nullCountDf = repartitionDF.select(nullValueCount(repartitionDF.schema): _*)
       val emptyCountDF = repartitionDF.select(emptyCount(repartitionDF.schema): _*)

       val null_value_proportion_df = nullCountDf.select(countColsNullNumber(nullCountDf.schema, total_count): _*).select(lit("nullValueRatio").alias(DataSummary.metricColumnName), col("*"))
       val empty_value_proportion_df = emptyCountDF.select(countColsEmptyNumber(emptyCountDF.columns, total_count): _*).select(lit("blankValueRatio").alias(DataSummary.metricColumnName), col("*"))
       val non_null_df = nullCountDf.join(emptyCountDF).select(repartitionDF.schema.map(sc => {
         lit(total_count) - col("left_" + sc.name) - col("right_" + sc.name)
       }).toArray: _*).select(lit("nonNullCount").alias(DataSummary.metricColumnName), col("*"))

       // val non_null_df = repartitionDF.select(countNonNullValue(repartitionDF.schema): _*).select(lit("nonNullCount").alias(DataSummary.metricColumnName), col("*"))

       val mean_df = repartitionDF.select(getMeanValue(repartitionDF.schema): _*).select(lit("mean").alias("metric"), col("*"))
       val stddev_df = repartitionDF.select(countColsStdDevNumber(repartitionDF.schema, numericCols): _*).select(lit("standardDeviation").alias(DataSummary.metricColumnName), col("*"))
       val stderr_df = repartitionDF.select(countColsStdErrNumber(repartitionDF.schema, numericCols): _*).select(lit("standardError").alias(DataSummary.metricColumnName), col("*"))

       val maxvalue_df = repartitionDF.select(getMaxNum(repartitionDF.schema, numericCols): _*).select(lit("max").alias(DataSummary.metricColumnName), col("*"))
       val minvalue_df = repartitionDF.select(getMinNum(repartitionDF.schema, numericCols): _*).select(lit("min").alias(DataSummary.metricColumnName), col("*"))
       val datatypelen_df = repartitionDF.select(getTypeLength(repartitionDF.schema): _*).select(lit("dataLength").alias(DataSummary.metricColumnName), col("*"))
       val datatype_sq = Seq("dataType" +: repartitionDF.schema.map(f => f.dataType.typeName match {
         case "null" => "unknown"
         case _ => f.dataType.typeName
       })).map(Row.fromSeq(_))

       val colunm_idx = Seq("ordinalPosition" +: repartitionDF.columns.map(col_name => String.valueOf(repartitionDF.columns.indexOf(col_name) + 1))).map(Row.fromSeq(_))
       var numeric_metric_df = mode_df
         .union(distinct_proportion_df)
         .union(null_value_proportion_df)
         .union(empty_value_proportion_df)
         .union(mean_df)
         .union(non_null_df)
         .union(stddev_df)
         .union(stderr_df)
         .union(maxvalue_df)
         .union(minvalue_df)
         .union(maxlength_df)
         .union(minlength_df)
       numeric_metric_df = roundNumericCols(numeric_metric_df, round_at)

       val schema = StructType(StructField("metrics", StringType, true) +: df.columns.map(StructField(_, StringType, true)).toSeq)
       val sc = spark.sparkContext.emptyRDD[Row]
       var res = spark.createDataFrame(sc, schema)
       val metric_values = metrics.map("'" + _.stripMargin + "'").mkString(",")

       res = res.union(numeric_metric_df)
         .union(is_primary_key_df)
         .union(datatypelen_df)
         .union(spark.createDataFrame(spark.sparkContext.parallelize(datatype_sq, 1), StructType(datatype_schema)))
         .union(spark.createDataFrame(spark.sparkContext.parallelize(colunm_idx, 1), StructType(datatype_schema)))
         .union(new_quantile_df)

       if (metrics == null || metrics.lengthCompare(0) == 0) {
         res = res.select(col("*"))
       } else {
         res = res.select(col("*")).where(s"${DataSummary.metrics} in (${metric_values})")
       }
       // res.summary()
       // Transpose
       import spark.implicits._
       val (header, data) = res.collect.map(_.toSeq.toArray).transpose match {
         case Array(h, t@_*) => {
           (h.map(_.toString), t.map(_.map(String.valueOf(_))))
         }
       }
       val rows = res.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
       val transposeSchema = StructType(
         StructField("columnName", StringType) +: header.map(StructField(_, StringType))
       )
       res = spark.createDataFrame(spark.sparkContext.parallelize(rows), transposeSchema)
       res
+    } catch {
+      case e: Exception => throw e
+    } finally {
+      repartitionDF.unpersist()
+    }
   }

   override def load(sparkSession: SparkSession, path: String, params: Map[String, String]): Any = {
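The unpersist guarantee the commit adds inline is often factored into a small loan-pattern helper. A hypothetical sketch (no such helper exists in this repo), shown only to illustrate the idiom:

    import org.apache.spark.sql.DataFrame

    // Cache df for the duration of f, then release it whether or not f throws.
    def withCached[T](df: DataFrame)(f: DataFrame => T): T = {
      val cached = df.cache()
      try f(cached) finally cached.unpersist()
    }

With such a helper, the body of train could be written as withCached(df.repartition(...)) { repartitionDF => ... } and the finally block would disappear from the call site.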
SQLDataSummaryTest.scala
@@ -46,7 +46,6 @@ class SQLDataSummaryTest extends FlatSpec with SparkOperationUtil with Matchers
     var seq_df = spark.createDataFrame(sseq).toDF("name", "favoriteNumber", "age", "mock_col1", "income", "date", "mock_col2", "alived", "extra", "extra1", "extra2","extra3")
     var seq_df1 = seq_df.select(seq_df("income").cast(DoubleType).alias("income1"), col("*"))
     val res = et.train(seq_df, "", Map("atRound" -> "2"))
-    et.train(seq_df, "", Map("atRound" -> "2","modeFormat"->"all")).show()
     res.show()
     val sseq2 = Seq(
       (336, 123, "plan1", "", "534", 1, Timestamp.valueOf(LocalDateTime.of(2021, 3, 8, 18, 0)), Timestamp.valueOf(LocalDateTime.of(2021, 3, 8, 18, 0)), 1, Double.NaN),
@@ -56,14 +55,12 @@
     var seq_df2 = spark.createDataFrame(sseq2).toDF("id", "dataset_id", "leftPlan_name", "right_plan_name", "plan_desc", "user_id", "ctime", "mtime", "alived", "plan_name_convert")
     val res1 = et.train(seq_df2, "", Map("atRound" -> "2"))
     res1.show()
-    val res11 = et.train(seq_df2, "", Map("atRound" -> "3","approxSwitch"->"true"))
-    res11.show()
     val r0 = res.collectAsList().get(0).toSeq
     println(r0.mkString(","))
-    assert(r0.mkString(",") === "name,,1.0,0.0,0.1667,,6,,,elena,AA,5,0,1,5,string,1,,,")
+    assert(r0.mkString(",") === "name,,1.0,0.0,0.1667,,5,,,elena,AA,5,0,1,5,string,1,,,")
     val r1 = res.collectAsList().get(1).toSeq
     println(r1.mkString(","))
-    assert(r1.mkString(",") === "favoriteNumber,57,0.5,0.0,0.0,37.83,6,29.69,12.12,57,-1,,,0,4,integer,2,-0.25,57.00,57.00")
+    assert(r1.mkString(",") === "favoriteNumber,57,0.5,0.0,0.0,37.83,6,29.69,12.12,57,-1,,,0 ,4,integer,2,-0.25,57.00,57.00")
     val r2 = res.collectAsList().get(2).toSeq
     println(r2.mkString(","))
     assert(r2.mkString(",") === "age,,1.0,0.0,0.0,34.67,6,17.77,7.26,57,10,,,1,4,integer,3,18.25,35.00,51.75")
@@ -72,16 +69,16 @@ class SQLDataSummaryTest extends FlatSpec with SparkOperationUtil with Matchers
     assert(r3.mkString(",") === "mock_col1,,1.0,0.0,0.0,128.33,6,23.17,9.46,160,100,,,1,8,long,4,107.50,125.00,152.50")
     val r4 = res.collectAsList().get(4).toSeq
     println(r4.mkString(","))
-    assert(r4.mkString(",") === "income,433000,0.8,0.0,0.1667,,6,,,533000,432000,6,0,0,6,string,5,,,")
+    assert(r4.mkString(",") === "income,433000,0.8,0.0,0.1667,,5,,,533000,432000,6,0,0 ,6,string,5,,,")
     val r5 = res.collectAsList().get(5).toSeq
     println(r5.mkString(","))
-    assert(r5.mkString(",") === "date,2021-03-08 18:00:00,0.1667,0.0,0.0,,6,,,2021-03-08 18:00:00,2021-03-08 18:00:00,,,0,8,timestamp,6,,,")
+    assert(r5.mkString(",") === "date,2021-03-08 18:00:00,0.1667,0.0,0.0,,6,,,2021-03-08 18:00:00,2021-03-08 18:00:00,,,0 ,8,timestamp,6,,,")
     val r6 = res.collectAsList().get(6).toSeq
     println(r6.mkString(","))
-    assert(r6.mkString(",") === "mock_col2,,1.0,0.3333,0.0,127.5,6,17.08,8.54,150.0,110.0,,,1,4,float,7,112.50,125.00,145.00")
+    assert(r6.mkString(",") === "mock_col2,,1.0,0.3333,0.0,127.5,4,17.08,8.54,150.0,110.0,,,1,4,float,7,112.50,125.00,145.00")
     val r7 = res.collectAsList().get(7).toSeq
     println(r7.mkString(","))
-    assert(r7.mkString(",") === "alived,true,0.3333,0.0,0.0,,6,,,true,false,,,0,1,boolean,8,,,")
+    assert(r7.mkString(",") === "alived,true,0.3333,0.0,0.0,,6,,,true,false,,,0 ,1,boolean,8,,,")
     }
   }
 }
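The updated expectations follow from the main diff above: nonNullCount is now computed as total_count minus the null count minus the blank count, so for name 6 rows − 1 blank (blankValueRatio 0.1667 ≈ 1/6) gives 5, and for mock_col2 6 − 2 nulls (nullValueRatio 0.3333) gives 4; the "0 " values with a trailing space mirror the otherwise("0 ") literal in is_primary_key_df.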
