From 1de001d375d06ec681a2ac4eb3a62f01310af21d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 30 Mar 2015 19:05:26 +0800 Subject: [PATCH 1/3] Replace special characters '(' and ')' of Parquet schema. --- .../org/apache/spark/sql/parquet/ParquetTypes.scala | 12 ++++++++---- .../org/apache/spark/sql/hive/parquetSuites.scala | 12 ++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index da668f068613b..d1bdaafd9983a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -390,10 +390,11 @@ private[parquet] object ParquetTypesConverter extends Logging { def convertFromAttributes(attributes: Seq[Attribute], toThriftSchemaNames: Boolean = false): MessageType = { - val fields = attributes.map( - attribute => + val fields = attributes.map { old_attribute => + val attribute = old_attribute.withName(old_attribute.name.replaceAll("\\((.*)\\)", "[$1]")) fromDataType(attribute.dataType, attribute.name, attribute.nullable, - toThriftSchemaNames = toThriftSchemaNames)) + toThriftSchemaNames = toThriftSchemaNames) + } new MessageType("root", fields) } @@ -405,7 +406,10 @@ private[parquet] object ParquetTypesConverter extends Logging { } def convertToString(schema: Seq[Attribute]): String = { - StructType.fromAttributes(schema).json + val replaced_schema = schema.map { attribute => + attribute.withName(attribute.name.replaceAll("\\((.*)\\)", "[$1]")) + } + StructType.fromAttributes(replaced_schema).json } def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 432d65a874518..4adc32b663d77 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -578,6 +578,18 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { sql("DROP TABLE alwaysNullable") } + + test("Aggregation attribute names including special chars '(' and ')' should be replaced") { + val tempDir = Utils.createTempDir() + val filePath = new File(tempDir, "testParquet").getCanonicalPath + + val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str") + val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int") + df2.saveAsParquetFile(filePath) + val df3 = parquetFile(filePath) + checkAnswer(df3, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil) + assert(df3.columns === Array("str", "MAX[int]")) + } } class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase { From 463dff4ab0c0321e12310be00fa05788beb5a3d8 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 31 Mar 2015 18:23:40 +0800 Subject: [PATCH 2/3] Instead of replacing special chars, showing error message to user to suggest using Alias. --- .../spark/sql/parquet/ParquetTypes.scala | 22 ++++++++++++------- .../apache/spark/sql/hive/parquetSuites.scala | 14 +++++++----- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index d1bdaafd9983a..a86983bfcfd65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -390,11 +390,11 @@ private[parquet] object ParquetTypesConverter extends Logging { def convertFromAttributes(attributes: Seq[Attribute], toThriftSchemaNames: Boolean = false): MessageType = { - val fields = attributes.map { old_attribute => - val attribute = old_attribute.withName(old_attribute.name.replaceAll("\\((.*)\\)", "[$1]")) + checkSpecialCharacters(attributes) + val fields = attributes.map( + attribute => fromDataType(attribute.dataType, attribute.name, attribute.nullable, - toThriftSchemaNames = toThriftSchemaNames) - } + toThriftSchemaNames = toThriftSchemaNames)) new MessageType("root", fields) } @@ -405,11 +405,17 @@ private[parquet] object ParquetTypesConverter extends Logging { } } - def convertToString(schema: Seq[Attribute]): String = { - val replaced_schema = schema.map { attribute => - attribute.withName(attribute.name.replaceAll("\\((.*)\\)", "[$1]")) + def checkSpecialCharacters(schema: Seq[Attribute]) = { + // ,;{}()\n\t= and space character are special characters in Parquet schema + if (schema.exists(_.name.matches(".*[ ,;{}()\n\t=].*"))) { + sys.error("""Attribute name can not contain any character among " ,;{}()\n\t=". + | Use Alias to rename attribute name""".stripMargin) } - StructType.fromAttributes(replaced_schema).json + } + + def convertToString(schema: Seq[Attribute]): String = { + checkSpecialCharacters(schema) + StructType.fromAttributes(schema).json } def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 4adc32b663d77..f7f99f47de65e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -579,16 +579,20 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { sql("DROP TABLE alwaysNullable") } - test("Aggregation attribute names including special chars '(' and ')' should be replaced") { + test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") { val tempDir = Utils.createTempDir() val filePath = new File(tempDir, "testParquet").getCanonicalPath + val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str") val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int") - df2.saveAsParquetFile(filePath) - val df3 = parquetFile(filePath) - checkAnswer(df3, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil) - assert(df3.columns === Array("str", "MAX[int]")) + intercept[RuntimeException](df2.saveAsParquetFile(filePath)) + + val df3 = df2.toDF("str", "max_int") + df3.saveAsParquetFile(filePath2) + val df4 = parquetFile(filePath2) + checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil) + assert(df4.columns === Array("str", "max_int")) } } From 2d705427fc69bbee483efb64bc0dc8b06f17bd4d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 4 Apr 2015 11:28:10 +0800 Subject: [PATCH 3/3] Address comment. --- .../org/apache/spark/sql/parquet/ParquetTypes.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index a86983bfcfd65..60e1bec4db8e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -405,11 +405,15 @@ private[parquet] object ParquetTypesConverter extends Logging { } } - def checkSpecialCharacters(schema: Seq[Attribute]) = { + private def checkSpecialCharacters(schema: Seq[Attribute]) = { // ,;{}()\n\t= and space character are special characters in Parquet schema - if (schema.exists(_.name.matches(".*[ ,;{}()\n\t=].*"))) { - sys.error("""Attribute name can not contain any character among " ,;{}()\n\t=". - | Use Alias to rename attribute name""".stripMargin) + schema.map(_.name).foreach { name => + if (name.matches(".*[ ,;{}()\n\t=].*")) { + sys.error( + s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=". + |Please use alias to rename it. + """.stripMargin.split("\n").mkString(" ")) + } } }