From 7206483623563d3b71a2722f54c0cb7547080f34 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 16 Sep 2018 21:12:58 +0200 Subject: [PATCH 1/6] Added an expression test --- .../sql/catalyst}/csv/CSVInferSchema.scala | 30 +++++++++------ .../catalyst/expressions/csvExpressions.scala | 37 +++++++++++++++++++ .../expressions/CsvExpressionsSuite.scala | 4 ++ .../datasources/csv/CSVDataSource.scala | 4 +- .../datasources/csv/CSVInferSchemaSuite.scala | 3 +- 5 files changed, 63 insertions(+), 15 deletions(-) rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/csv/CSVInferSchema.scala (93%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala similarity index 93% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index ba6c0dde7ee3d..6dd52e0d05494 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -15,19 +15,18 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.execution.datasources.csv +package org.apache.spark.sql.catalyst.csv import java.math.BigDecimal -import scala.util.control.Exception._ +import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion -import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ -private[csv] object CSVInferSchema { +object CSVInferSchema { /** * Similar to the JSON schema inference @@ -44,13 +43,7 @@ private[csv] object CSVInferSchema { val rootTypes: Array[DataType] = tokenRDD.aggregate(startType)(inferRowType(options), mergeRowTypes) - header.zip(rootTypes).map { case (thisHeader, rootType) => - val dType = rootType match { - case _: NullType => StringType - case other => other - } - StructField(thisHeader, dType, nullable = true) - } + toStructFields(rootTypes, header, options) } else { // By default fields are assumed to be StringType header.map(fieldName => StructField(fieldName, StringType, nullable = true)) @@ -59,7 +52,20 @@ private[csv] object CSVInferSchema { StructType(fields) } - private def inferRowType(options: CSVOptions) + def toStructFields( + fieldTypes: Array[DataType], + header: Array[String], + options: CSVOptions): Array[StructField] = { + header.zip(fieldTypes).map { case (thisHeader, rootType) => + val dType = rootType match { + case _: NullType => StringType + case other => other + } + StructField(thisHeader, dType, nullable = true) + } + } + + def inferRowType(options: CSVOptions) (rowSoFar: Array[DataType], next: Array[String]): Array[DataType] = { var i = 0 while (i < math.min(rowSoFar.length, next.length)) { // May have columns on right missing. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 008f4a4147ed5..1244495cbbe8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -17,13 +17,19 @@ package org.apache.spark.sql.catalyst.expressions +import com.fasterxml.jackson.core.JsonFactory +import com.univocity.parsers.csv.CsvParser + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.csv._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils /** * Converts a CSV input string to a [[StructType]] with the specified schema. @@ -107,3 +113,34 @@ case class CsvToStructs( override def inputTypes: Seq[AbstractDataType] = StringType :: Nil } + +/** + * A function infers schema of CSV string. 
+ */ +@ExpressionDescription( + usage = "_FUNC_(csv[, options]) - Returns schema in the DDL format of CSV string.", + examples = """ + Examples: + > SELECT _FUNC_('1,abc'); + struct<_c0:int,_c1:string> + """, + since = "3.0.0") +case class SchemaOfCsv(child: Expression) + extends UnaryExpression with String2StringExpression with CodegenFallback { + + override def convert(v: UTF8String): UTF8String = { + val parsedOptions = new CSVOptions(Map.empty, true, "UTC") + val parser = new CsvParser(parsedOptions.asParserSettings) + val row = parser.parseLine(v.toString) + + if (row != null) { + val header = row.zipWithIndex.map { case (_, index) => s"_c$index" } + val startType: Array[DataType] = Array.fill[DataType](header.length)(NullType) + val fieldTypes = CSVInferSchema.inferRowType(parsedOptions)(startType, row) + val st = StructType(CSVInferSchema.toStructFields(fieldTypes, header, parsedOptions)) + UTF8String.fromString(st.catalogString) + } else { + null + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index 670b219d9439d..d3d0022d226a3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -131,4 +131,8 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P val schemaToCompare = csvSchema.asNullable assert(schemaToCompare == schema) } + + test("infer schema of CSV strings") { + checkEvaluation(SchemaOfCsv(Literal.create("1,abc")), "struct<_c0:int,_c1:string>") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index a972f11b1037d..5d1a2d997ab50 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -27,14 +27,14 @@ import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat - import org.apache.spark.TaskContext + import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.internal.Logging import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser} +import org.apache.spark.sql.catalyst.csv.{CSVInferSchema, CSVOptions, UnivocityParser} import org.apache.spark.sql.catalyst.csv.CSVUtils.filterCommentAndEmpty import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index 6b64f2ffa98dd..b4dec15d795af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.csv.CSVOptions + +import org.apache.spark.sql.catalyst.csv.{CSVInferSchema, CSVOptions} import org.apache.spark.sql.types._ class CSVInferSchemaSuite extends SparkFunSuite { From 3239886c5c812dda29a575edc505af85345b9433 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 21 Sep 2018 15:42:30 +0200 Subject: [PATCH 2/6] 
Fix imports --- .../spark/sql/execution/datasources/csv/CSVDataSource.scala | 2 +- .../sql/execution/datasources/csv/CSVInferSchemaSuite.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 5d1a2d997ab50..09e3c5461a2ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -27,8 +27,8 @@ import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.TaskContext +import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.internal.Logging import org.apache.spark.rdd.{BinaryFileRDD, RDD} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index b4dec15d795af..e8fccfc98fc05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.SparkFunSuite - import org.apache.spark.sql.catalyst.csv.{CSVInferSchema, CSVOptions} import org.apache.spark.sql.types._ From 3dc94a9082648a321eda61e4aa7b0bf965505e67 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 21 Sep 2018 17:03:39 +0200 Subject: [PATCH 3/6] Support options --- .../catalyst/expressions/csvExpressions.scala | 17 +++++++++++------ 
.../expressions/CsvExpressionsSuite.scala | 9 ++++++++- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala index 1244495cbbe8d..0e6c2cfeaa99f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala @@ -114,9 +114,6 @@ case class CsvToStructs( override def inputTypes: Seq[AbstractDataType] = StringType :: Nil } -/** - * A function infers schema of CSV string. - */ @ExpressionDescription( usage = "_FUNC_(csv[, options]) - Returns schema in the DDL format of CSV string.", examples = """ @@ -124,12 +121,20 @@ case class CsvToStructs( > SELECT _FUNC_('1,abc'); struct<_c0:int,_c1:string> """, - since = "3.0.0") -case class SchemaOfCsv(child: Expression) + since = "2.5.0") +case class SchemaOfCsv( + child: Expression, + options: Map[String, String]) extends UnaryExpression with String2StringExpression with CodegenFallback { + def this(child: Expression) = this(child, Map.empty[String, String]) + + def this(child: Expression, options: Expression) = this( + child = child, + options = ExprUtils.convertToMapData(options)) + override def convert(v: UTF8String): UTF8String = { - val parsedOptions = new CSVOptions(Map.empty, true, "UTC") + val parsedOptions = new CSVOptions(options, true, "UTC") val parser = new CsvParser(parsedOptions.asParserSettings) val row = parser.parseLine(v.toString) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index d3d0022d226a3..aee699178a75b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -133,6 +133,13 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P } test("infer schema of CSV strings") { - checkEvaluation(SchemaOfCsv(Literal.create("1,abc")), "struct<_c0:int,_c1:string>") + checkEvaluation(new SchemaOfCsv(Literal.create("1,abc")), "struct<_c0:int,_c1:string>") + } + + test("infer schema of CSV strings by using options") { + checkEvaluation( + new SchemaOfCsv(Literal.create("1|abc"), + CreateMap(Seq(Literal.create("delimiter"), Literal.create("|")))), + "struct<_c0:int,_c1:string>") } } From 7acdd70924af783e2d6e26365640708b103ab0aa Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 21 Sep 2018 18:05:55 +0200 Subject: [PATCH 4/6] Register schema_of_csv and adding SQL tests --- .../catalyst/analysis/FunctionRegistry.scala | 3 ++- .../sql-tests/inputs/csv-functions.sql | 4 ++++ .../sql-tests/results/csv-functions.sql.out | 18 +++++++++++++++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 08311126f25b0..b728fa33c7f3e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -532,7 +532,8 @@ object FunctionRegistry { castAlias("string", StringType), // csv - expression[CsvToStructs]("from_csv") + expression[CsvToStructs]("from_csv"), + expression[SchemaOfCsv]("schema_of_csv") ) val builtin: SimpleFunctionRegistry = { diff --git a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql index c77568914cc51..b5577396ee336 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql +++ 
b/sql/core/src/test/resources/sql-tests/inputs/csv-functions.sql @@ -10,3 +10,7 @@ select from_csv('1', 'a InvalidType'); select from_csv('1', 'a INT', named_struct('mode', 'PERMISSIVE')); select from_csv('1', 'a INT', map('mode', 1)); select from_csv(); + +-- infer schema of CSV literal +select schema_of_csv('1,abc'); +select schema_of_csv('1|abc', map('delimiter', '|')); diff --git a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out index 4f74bbebd22ee..c427ef22cd3f7 100644 --- a/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/csv-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 10 +-- Number of queries: 12 -- !query 0 @@ -102,3 +102,19 @@ struct<> -- !query 9 output org.apache.spark.sql.AnalysisException Invalid number of arguments for function from_csv. Expected: one of 2 and 3; Found: 0; line 1 pos 7 + + +-- !query 10 +select schema_of_csv('1,abc') +-- !query 10 schema +struct +-- !query 10 output +struct<_c0:int,_c1:string> + + +-- !query 11 +select schema_of_csv('1|abc', map('delimiter', '|')) +-- !query 11 schema +struct +-- !query 11 output +struct<_c0:int,_c1:string> From 22cfb46c7677fa02c2a6d8fce36d7a1f0732131b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 21 Sep 2018 19:27:33 +0200 Subject: [PATCH 5/6] Adding schema_of_csv and tests --- .../org/apache/spark/sql/functions.scala | 25 +++++++++++++++++++ .../apache/spark/sql/CsvFunctionsSuite.scala | 8 ++++++ 2 files changed, 33 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index c40f0ebf32cf8..11d526a29b69d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3843,6 +3843,31 @@ object
functions { withExpr(new CsvToStructs(e.expr, lit(schema).expr, options.asScala.toMap)) } + /** + * Parses a column containing a CSV string and infers its schema. + * + * @param e a string column containing CSV data. + * + * @group collection_funcs + * @since 2.5.0 + */ + def schema_of_csv(e: Column): Column = withExpr(new SchemaOfCsv(e.expr)) + + /** + * Parses a column containing a CSV string and infers its schema using options. + * + * @param e a string column containing CSV data. + * @param options options to control how the CSV is parsed. accepts the same options as the + * CSV data source. See [[DataFrameReader#csv]]. + * @return a column with string literal containing schema in DDL format. + * + * @group collection_funcs + * @since 2.5.0 + */ + def schema_of_csv(e: Column, options: java.util.Map[String, String]): Column = { + withExpr(SchemaOfCsv(e.expr, options.asScala.toMap)) + } + // scalastyle:off line.size.limit // scalastyle:off parameter.number diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 1b4bc78632011..e89718124300a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import collection.JavaConverters._ + import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -71,4 +73,10 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext { df1.selectExpr("from_csv(value, 'a INT')"), Row(Row(1)) :: Nil) } + + test("infers schemas using options") { + val df = spark.range(1) + .select(schema_of_csv(lit("0.1 1"), Map("sep" -> " ").asJava)) + checkAnswer(df, Seq(Row("struct<_c0:double,_c1:int>"))) + } } From a2322d1ae21f9f89a9ac09dd418bc4f72b50f711 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Fri, 21 Sep 2018 19:54:43
+0200 Subject: [PATCH 6/6] Support schema_of_csv in PySpark --- python/pyspark/sql/functions.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 8a666859d520f..60cc9d70a6feb 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2348,6 +2348,27 @@ def schema_of_json(col): return Column(jc) +@ignore_unicode_prefix +@since(2.5) +def schema_of_csv(col, options={}): + """ + Parses a column containing a CSV string and infers its schema in DDL format. + + :param col: string column in CSV format + :param options: options to control parsing. accepts the same options as the CSV datasource + + >>> from pyspark.sql.types import * + >>> data = [(1, '1|a')] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(schema_of_csv(df.value, {'sep':'|'}).alias("csv")).collect() + [Row(csv=u'struct<_c0:int,_c1:string>')] + """ + + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.schema_of_csv(_to_java_column(col), options) + return Column(jc) + + @since(1.5) def size(col): """