
Conversation

@maropu
Member

@maropu maropu commented Nov 28, 2016

What changes were proposed in this pull request?

A vectorized Parquet reader fails to read column data if the data schema and the partition schema overlap and the types inferred for the partition schema differ from those in the data schema. Example code to reproduce this bug is as follows:

scala> case class A(a: Long, b: Int)
scala> val as = Seq(A(1, 2))
scala> spark.createDataFrame(as).write.parquet("/data/a=1/")
scala> val df = spark.read.parquet("/data/")
scala> df.printSchema
root
 |-- a: long (nullable = true)
 |-- b: integer (nullable = true)
scala> df.collect
java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:283)
        at org.apache.spark.sql.execution.vectorized.ColumnarBatch$Row.getLong(ColumnarBatch.java:191)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

The root cause is that the logical layer (HadoopFsRelation) and the physical layer (VectorizedParquetRecordReader) make different assumptions about the partition schema: the logical layer trusts the data schema to infer the type of the overlapped partition columns, while the physical layer trusts the partition schema inferred from the path string. To fix this bug, this PR simply updates HadoopFsRelation.schema so that overlapped partition columns keep their position from the data schema and take their type from the partition schema.
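
For illustration, here is a minimal sketch of the schema merge described above (a simplification of the patch, assuming case-insensitive column resolution; not the exact code):

```
import org.apache.spark.sql.types.{StructField, StructType}

// Sketch only: overlapping columns keep their position from the data schema
// but take their data type from the partition schema; partition columns that
// do not appear in the data schema are appended at the end.
def mergedSchema(dataSchema: StructType, partitionSchema: StructType): StructType = {
  def colName(f: StructField): String = f.name.toLowerCase
  val partByName = partitionSchema.map(f => colName(f) -> f).toMap
  StructType(
    dataSchema.map(f => partByName.getOrElse(colName(f), f)) ++
      partitionSchema.filterNot(f => dataSchema.exists(d => colName(d) == colName(f))))
}
```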

How was this patch tested?

Add tests in ParquetPartitionDiscoverySuite

@maropu
Member Author

maropu commented Nov 28, 2016

This query passed in the released Spark 2.0.2, so it seems this regression is related to SPARK-18510.

@SparkQA

SparkQA commented Nov 28, 2016

Test build #69230 has finished for PR 16030 at commit 6bd8b4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Nov 28, 2016

@brkyvz @tdas Could you check this?

Contributor

@maropu I think you're doing the wrong thing. You have the value a=1 both in your data schema and in your partition directory, which is not allowed. I would say the correct behavior here would be to fail the read, because the value is both in the partition schema and the data schema, and it's possible that they may not be equal.
For example:

val df = Seq((1L, 2.0)).toDF("a", "b")
df.write.parquet(s"$path/a=2")
// this should fail because `a` is both part of the partitioning and schema
checkAnswer(spark.read.parquet(s"$path"), Seq(Row(1L, 2.0)))

@brkyvz
Contributor

brkyvz commented Nov 28, 2016

@maropu I wouldn't say this is a regression. I would say that this working in 2.0.2 was a bug in 2.0.2. If you want the column a to be interpreted as LongType instead of IntegerType, you should provide the schema with spark.read.schema(schemaWhereAIsLong).parquet("path").
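
For concreteness, the workaround might look roughly like this (schemaWhereAIsLong is spelled out here and the path is a placeholder):

```
import org.apache.spark.sql.types._

// Declare `a` as LongType explicitly so partition discovery does not
// re-infer it as IntegerType from the directory name `a=1`.
val schemaWhereAIsLong = new StructType()
  .add("a", LongType)
  .add("b", IntegerType)

val df = spark.read.schema(schemaWhereAIsLong).parquet("/data/")
```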

@brkyvz
Contributor

brkyvz commented Nov 28, 2016

Or, the thing we should fix here is to throw an exception if a partition column is also found as part of the dataSchema.

@maropu
Member Author

maropu commented Nov 28, 2016

@brkyvz Thanks for your comment! Okay, I'll fix it in that way.

@SparkQA

SparkQA commented Nov 29, 2016

Test build #69307 has started for PR 16030 at commit 43f028d.

@maropu
Member Author

maropu commented Nov 29, 2016

Jenkins, retest this please.

@SparkQA

SparkQA commented Nov 29, 2016

Test build #69312 has finished for PR 16030 at commit 43f028d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Nov 29, 2016

I'm looking into the failures.

@SparkQA

SparkQA commented Nov 29, 2016

Test build #69336 has finished for PR 16030 at commit 43b4eb0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Nov 30, 2016

@brkyvz How about this fix?

@liancheng
Contributor

liancheng commented Nov 30, 2016

@brkyvz @maropu Actually, we do allow users to create partitioned tables whose data schema contains (part of) the partition columns, and there are test cases for this use case.

This use case is mostly useful when you are trying to reorganize an existing dataset into a partitioned form. Say you have a JSON dataset containing all the tweets in 2016 and you'd like to partition it by date. By allowing the data schema to contain partitioned columns, you may simply put JSON files of the same date into the same directory. Otherwise, you'll have to run an ETL job to erase the date column from the dataset, which can be time-consuming.

As for the query @maropu mentioned in the PR description, the query itself is problematic, because it lacks a user-specified schema to override the data type of the partitioned column a. Ideally, partition discovery should be able to fill in the correct data type LongType, but it's impossible since the directory path doesn't really expose that information. That's why a user-specified schema is necessary.

This query works in 2.0.2 because of the bug @brkyvz fixed in PR #15951: Spark ignores the data types of partitioned columns specified in the user-specified schema. Now the bug is fixed and this query doesn't work as expected. (UPDATE: Made a mistake in this paragraph. Verified locally that this query doesn't work in 2.0.2 either.)

In short:

  1. This isn't a regression; the original query itself is problematic.

  2. For this PR, we can either just close it or try to provide a better error message in the read path (asking the user to provide a user-specified schema; a rough sketch follows this list) when:

    • A partitioned column p also appears in the data schema, and
    • The discovered data type of p is different from the data type specified in the data schema.

    An alternative is to override the discovered partition column data type using the one in the data schema if any. But I'd say this change is probably too risky at this moment for 2.1.
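
A rough sketch of the check described in option 2 above (illustrative only; the names and the error message are not from the actual codebase, and resolver stands for the session's column-name resolver):

```
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructType

// Illustrative sketch of read-path code: fail fast when a discovered partition
// column also exists in the data schema with a different data type, and ask
// the user to provide an explicit schema.
def assertNoConflictingPartitionColumns(
    dataSchema: StructType,
    partitionSchema: StructType,
    resolver: (String, String) => Boolean): Unit = {
  for (p <- partitionSchema; d <- dataSchema if resolver(d.name, p.name)) {
    if (d.dataType != p.dataType) {
      throw new AnalysisException(s"Partition column `${p.name}` was discovered as " +
        s"${p.dataType} but the data schema declares it as ${d.dataType}; " +
        "please provide a user-specified schema.")
    }
  }
}
```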

@liancheng
Contributor

liancheng commented Nov 30, 2016

@maropu I tried your snippet (with minor modifications). It works in 1.6.0 but not in 2.0.2:

case class A(a: Long, b: Int)
val as = Seq(A(1, 2))
val path = "/tmp/part"
sqlContext.createDataFrame(as).write.mode("overwrite").parquet(s"$path/a=1/")
val df = sqlContext.read.parquet(path)
df.printSchema()
df.collect()

For 2.0.2, it throws exactly the same NPE.

@brkyvz
Contributor

brkyvz commented Nov 30, 2016

I also made this query work on the 2.1 branch by setting spark.sql.parquet.enableVectorizedReader = false. It seems to be an issue with the VectorizedParquetReader.

@liancheng
Contributor

My hunch is that we somehow passed a wrong requested schema containing the partition column down to the vectorized Parquet reader. IIRC, we prune partition columns from the data schema when generating the requested schema for the underlying reader; since partition values are directly available in the directory path, there's no need to read and decode them from the physical file.
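
If that's the case, conceptually the requested schema handed to the reader would exclude columns satisfied by the directory path; something like the following (illustrative only, not the actual code path):

```
import org.apache.spark.sql.types.StructType

// Illustrative: prune partition columns from the schema requested from the
// underlying file reader, since their values come from the directory path.
def prunePartitionColumns(dataSchema: StructType, partitionSchema: StructType): StructType =
  StructType(dataSchema.filterNot(f => partitionSchema.fieldNames.contains(f.name)))
```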

@maropu
Member Author

maropu commented Dec 1, 2016

This is not a bug in VectorizedParquetReader; as @liancheng said, the root cause is that wrongly inferred types are passed into the reader in VectorizedParquetReader#initBatch: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L176

Yea, I know this functionality is helpful for skilled users; on the other hand, newbies could easily break query results via this interface, as @brkyvz said. Therefore, if we allow the data schema to contain (part of) the partition columns, IMO it'd be better to alert users to that risk via logWarning or something.

BTW, this is the original fix for this bug (sorry, I wrongly overrode and removed that commit): master...maropu:SPARK-18108-2. The fix in that commit fills in the correct data types when the data schema contains (part of) the partition columns.

@brkyvz
Contributor

brkyvz commented Dec 1, 2016

@maropu The reason I call it a bug in the VectorizedParquetReader is that all other data sources always replace the value read from the data with the partition value. The VectorizedReader also doesn't throw an exception if you try the following example:

case class A(a: Long, b: Int)
val as = Seq(A(1, 2))
val path = "/tmp/part"
sqlContext.createDataFrame(as).write.mode("overwrite").parquet(s"$path/a=1480617712537/")
val df = sqlContext.read.parquet(path)
df.printSchema()
df.collect()

returns

root
 |-- a: long (nullable = true)
 |-- b: integer (nullable = true)

df: org.apache.spark.sql.DataFrame = [a: bigint, b: int]
res7: Array[org.apache.spark.sql.Row] = Array([1480617712537,2])

@brkyvz
Contributor

brkyvz commented Dec 1, 2016

@maropu I think I found the simplest fix!
Would you mind changing:

val dataSchema = userSpecifiedSchema.map { schema =>

to something like:

val dataSchema = userSpecifiedSchema.orElse {
    format.inferSchema(
      sparkSession,
      caseInsensitiveOptions,
      tempFileIndex.allFiles())
}.getOrElse {
  throw new AnalysisException(
    s"Unable to infer schema for $format. It must be specified manually.")
}
val dataWithoutPartitions = dataSchema.filterNot { field =>
  partitionSchema.exists(p => equality(p.name, field.name))
}

What we were missing was removing the partition columns from the data schema when we infer the schema from the format.

@maropu
Member Author

maropu commented Dec 2, 2016

@brkyvz Thanks! Does the latest fix reflect your suggestion?

@SparkQA

SparkQA commented Dec 2, 2016

Test build #69528 has finished for PR 16030 at commit 7e068dd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

I would still keep this below the if (justPartitioning) area, because otherwise every time someone performs a df.write.mode("append").saveAsTable() we will perform schema inference.

Member Author

reverted

Contributor

this change is not necessary

Member Author

reverted

Contributor

I would keep the code here like I mentioned above, but keep your changes.

Member Author

okay, I reverted this.

Contributor

no need for this else if

Member Author

yea, removed this.

Contributor

Since this test is broken only when the vectorized reader is on, I would also keep the conf here as enabled, just in case anyone changes the default implementation later.
So I would wrap this as:

withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
  withTempPath { dir =>
    ...
  }
}

Member Author

fixed.

@brkyvz
Contributor

brkyvz commented Dec 2, 2016

@maropu I would still keep the changes I proposed below L180, as I commented before. We don't need to use the inferred data type as the partition type.

@maropu
Member Author

maropu commented Dec 15, 2016

@cloud-fan Does the latest fix satisfy what you suggested?

@maropu
Member Author

maropu commented Dec 15, 2016

@liancheng As for DataFrameReader.dataSchema() and DataFrameReader.partitionSchema(), did you mean we should add new interfaces there for users to set user-defined data and partition schemas, respectively? That makes me imagine a fix like this (maropu@039884e#diff-f70bda59304588cc3abfa3a9840653f4R78):

scala> import org.apache.spark.sql.types._
scala> val partitionSchema = new StructType().add("a", LongType)
scala> val df = Seq((1L, 0), (2L, 0)).toDF("a", "b")
scala> df.write.save("/Users/maropu/Desktop/data/a=1")
scala> spark.read.partitionSchema(partitionSchema).load("/Users/maropu/Desktop/data").printSchema
root
 |-- a: long (nullable = true)
 |-- b: integer (nullable = true)

Review comment on the diff hunk:

    StructType(dataSchema ++ partitionSchema.filterNot { column =>
      dataSchemaColumnNames.contains(column.name.toLowerCase)
    val equality = sparkSession.sessionState.conf.resolver
    val overriddenDataSchema = dataSchema.map { dataField =>
Contributor

how about

val getColName: (StructField => String) = if (conf.caseSensitive) _.name else _.name.toLowerCase
val overlappedPartCols = mutable.Map.empty[String, StructField]
for {
  dataField <- dataSchema
  partitionField <- partitionSchema
  if getColName(dataField) == getColName(partitionField)
} overlappedPartCols += getColName(partitionField) -> partitionField

StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
  partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))

Member Author

Why didn't you use sparkSession.sessionState.conf.resolver? Is there any reason I missed?
I just wrote this code in the same style as DataSource#getOrInferFileFormatSchema; is that a bad idea? Anyway, since the two patterns produce the same output, either is okay with me.

Contributor

Because the current code iterates dataSchema many times (depending on the number of partition columns), while my proposal only iterates it twice.

Member Author

I modified it a bit; how about this?

@SparkQA

SparkQA commented Dec 15, 2016

Test build #70177 has finished for PR 16030 at commit 5b23b89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 15, 2016

Test build #70178 has finished for PR 16030 at commit dc54b69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Review comment on the diff hunk:

    if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
    val overlappedPartCols = mutable.Map.empty[String, StructField]
    partitionSchema.foreach { partitionField =>
      dataSchema.find(getColName(_) == getColName(partitionField)).map { overlappedCol =>
Contributor

@cloud-fan cloud-fan Dec 15, 2016

a bit clearer:

if (dataSchema.find(getColName(_) == getColName(partitionField)).isDefined) {
  overlappedPartCols += getColName(partitionField) -> partitionField
}

Member Author

Fixed and thanks!

@cloud-fan
Contributor

LGTM, pending jenkins. Can you also update the PR title and description? thanks!

@maropu
Member Author

maropu commented Dec 15, 2016

okay

@SparkQA

SparkQA commented Dec 15, 2016

Test build #70186 has finished for PR 16030 at commit 0601ccd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Dec 15, 2016

@cloud-fan okay, I updated the desc.

@SparkQA

SparkQA commented Dec 15, 2016

Test build #70190 has finished for PR 16030 at commit 8c7d3b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

Can you also update the title? And the description has a mistake: "the logical layer trusts the data schema to infer the type the overlapped partition columns, and, on the other hand, the physical layer trusts partition schema which is inferred from path string."

@maropu maropu changed the title from "[SPARK-18108][SQL] Fix a bug to fail partition schema inference" to "[SPARK-18108][SQL] Fix a schema inconsistent bug that makes a parquet reader fail to read data" Dec 16, 2016
@maropu
Member Author

maropu commented Dec 16, 2016

Oh, I wrongly wrote it the opposite way... okay, fixed. cc: @cloud-fan

@cloud-fan
Contributor

thanks, merging to master/2.1!

asfgit pushed a commit that referenced this pull request Dec 16, 2016
… reader fail to read data

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #16030 from maropu/SPARK-18108.

(cherry picked from commit dc2a4d4)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@asfgit asfgit closed this in dc2a4d4 Dec 16, 2016
@maropu
Member Author

maropu commented Dec 16, 2016

Thanks! I found another weird behaviour related to this issue. Is this expected, or should we fix it?

scala> import org.apache.spark.sql.types._
scala> val schema = new StructType().add("a", LongType).add("b", IntegerType)
scala> case class A(a: Long, b: Int)
scala> val as = Seq(A(1, 2))
scala> spark.createDataFrame(as).write.parquet("/Users/maropu/Desktop/data/a=1/")
scala> val df = spark.read.schema(schema).parquet("/Users/maropu/Desktop/data/")
scala> df.printSchema
root
 |-- b: integer (nullable = true)
 |-- a: long (nullable = true)

scala> val df = spark.read.parquet("/Users/maropu/Desktop/data/")
scala> df.printSchema
root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)

@cloud-fan
Contributor

This was the behavior change your PR proposed before. I think it makes sense; you can send a PR to fix it in Spark 2.2.

@maropu
Member Author

maropu commented Dec 17, 2016

Okay! I'll file a JIRA later. Thanks!

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
… reader fail to read data

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes apache#16030 from maropu/SPARK-18108.
@maropu maropu deleted the SPARK-18108 branch July 5, 2017 11:44