Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ private[parquet] object ParquetTypesConverter extends Logging {

def convertFromAttributes(attributes: Seq[Attribute],
toThriftSchemaNames: Boolean = false): MessageType = {
checkSpecialCharacters(attributes)
val fields = attributes.map(
attribute =>
fromDataType(attribute.dataType, attribute.name, attribute.nullable,
Expand All @@ -404,7 +405,20 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
}

private def checkSpecialCharacters(schema: Seq[Attribute]) = {
  // ,;{}()\n\t= and space character are special characters in Parquet schema:
  // a field name containing any of them would corrupt the serialized schema,
  // so reject such names up front and ask the caller to alias the column.
  //
  // NOTE: the previous check, name.matches(".*[ ,;{}()\n\t=].*"), was regex
  // based; because `.` does not match line terminators by default, a name in
  // which the matched special character was surrounded by additional newlines
  // (e.g. "a\nb\nc") slipped through. A direct character scan has no such
  // blind spot and avoids recompiling a regex per attribute.
  val invalidChars = " ,;{}()\n\t="
  schema.map(_.name).foreach { name =>
    if (name.exists(c => invalidChars.contains(c))) {
      sys.error(
        s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=".
           |Please use alias to rename it.
         """.stripMargin.split("\n").mkString(" "))
    }
  }
}

def convertToString(schema: Seq[Attribute]): String = {
  // Serialize the schema as JSON, but only after verifying that no attribute
  // name contains characters Parquet cannot represent in a field name.
  checkSpecialCharacters(schema)
  val structType = StructType.fromAttributes(schema)
  structType.json
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,22 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {

sql("DROP TABLE alwaysNullable")
}

test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
  // A self-join followed by an aggregation produces generated column names
  // such as "MAX(y.int)" that contain characters Parquet forbids in field
  // names; writing such a frame must fail, and renaming must make it succeed.
  val baseDir = Utils.createTempDir()
  val rejectedPath = new File(baseDir, "testParquet").getCanonicalPath
  val acceptedPath = new File(baseDir, "testParquet2").getCanonicalPath

  val source = Seq(1, 2, 3).map(n => (n, n.toString)).toDF("int", "str")
  val aggregated =
    source.as('x).join(source.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")

  // The generated (illegal) attribute name must be rejected at write time.
  intercept[RuntimeException](aggregated.saveAsParquetFile(rejectedPath))

  // After aliasing the columns to legal identifiers the write succeeds and
  // the data round-trips intact, including the new column names.
  val renamed = aggregated.toDF("str", "max_int")
  renamed.saveAsParquetFile(acceptedPath)
  val reloaded = parquetFile(acceptedPath)
  checkAnswer(reloaded, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
  assert(reloaded.columns === Array("str", "max_int"))
}
}

class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
Expand Down