[SPARK-26744][SQL]Support schema validation in FileDataSourceV2 framework #23714
Conversation
@@ -490,9 +490,6 @@ case class DataSource(
      outputColumnNames: Seq[String],
      physicalPlan: SparkPlan): BaseRelation = {
    val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
    if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
@gatorsmile I think the validation here duplicates the `supportDataType` check, so I removed it. I can revert it if someone has a good reason for keeping it.
is this method only called by file sources?
   * Returns whether this format supports the given [[DataType]] in write path.
   * By default all data types are supported.
   */
  def supportDataType(dataType: DataType): Boolean = true
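For context, a minimal sketch of the kind of per-type rule a concrete source might plug into this hook (illustrative only — a nested-type restriction in the spirit of a CSV-like text format, not the exact rule of any built-in source):

```scala
import org.apache.spark.sql.types._

// Illustrative rule only: reject nested and interval types, accept everything else.
// A concrete source would supply something like this as its supportDataType override.
def supportDataType(dataType: DataType): Boolean = dataType match {
  case _: ArrayType | _: MapType | _: StructType => false  // no nested columns
  case _: CalendarIntervalType => false                     // no interval columns
  case _ => true                                            // atomic types are fine
}
```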
Hi, @gengliangwang and @cloud-fan.
In DSv2, I guess it would be more natural to have a Java interface for this validation API. What do you think about that?
cc @rdblue since this is DSv2.
Per the discussion, this is an implementation in the file source, not an API. It's internal, so we don't need Java here.
@gengliangwang, can you be clearer about what you are proposing to add to DSv2? I don't think that simply porting an API from v1 is sufficient justification to add it to v2, because v1 has so many problems. I'd like to see a description on the JIRA issue that states exactly what is added and how it changes behavior. Until then, please consider this a -1.
Test build #101965 has finished for PR 23714 at commit
@rdblue @dongjoon-hyun Sorry about the confusion. This PR is for an internal API in file source V2 only (an abstract class). I have updated the PR description.
hello,
@gengliangwang, why are you proposing to add this API that applies only to internal sources? Why not design this to work with all sources? I think you also need to be clearer about what you're trying to commit. What does this do? It sounds like it probably validates that a file format can store a type. For example, can ORC support DECIMAL(44, 6)? That is generally useful. Why should it be a side API for internal sources? In short:
@AsmaZgo I think you can see how the plan changes in the log by setting
@rdblue Thanks for the suggestion. Overall this is a nice-to-have feature. It is simple to validate the schema without the API, and it seems like overkill to make it a DS V2 API.
@rdblue @gengliangwang I don't think this needs an API change. This is just a schema validation feature, which can be done in any DS v2 source. Schema validation needs to know the user-specified schema when reading, or the schema of the input data when writing, both of which are available in the current DS v2 APIs. It looks to me that this PR just re-implements the schema validation feature in the file source v2 framework.
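A rough sketch of the point being made here — once a source has the schema in hand, the check itself is only a few lines. The `validateSchema` helper and its parameters below are illustrative, not an API proposed by this PR; it mirrors the check the PR adds inside the file sources (note that constructing `AnalysisException` directly is generally only done inside Spark's own sql module, as the PR's code does):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{DataType, StructType}

// Illustrative helper: given a schema (the user-specified schema on read, or the
// input data schema on write) and a per-type rule, fail fast on the first
// unsupported field.
def validateSchema(formatName: String, schema: StructType)
    (supportsDataType: DataType => Boolean): Unit = {
  schema.foreach { field =>
    if (!supportsDataType(field.dataType)) {
      throw new AnalysisException(
        s"$formatName data source does not support ${field.dataType.catalogString} data type.")
    }
  }
}
```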
retest this please.
Test build #102188 has finished for PR 23714 at commit
@cloud-fan, should this validation be done for all sources, or just for file sources for some reason? I would also like to know exactly what validation is proposed in this PR. It hasn't been written up, and I think a summary of the proposed changes is required before we commit them.
Every source can do schema validation if needed; the DS v2 API already allows you to do so. To avoid duplicated code, this PR proposes to add the entry point for schema validation in the base class of the file source. @gengliangwang, can you post the details of the schema validation for these file sources?
@rdblue you can treat this PR as implementing schema validation for file sources. We only do it for file sources because, for now, they are the only built-in DS v2 implementation in Spark.
The supported data types of file sources:
For details please read #21667. This is a very simple abstraction.
    OrcDataSourceV2.supportDataType(dataType)
  }

  override def toString: String = "ORC"
There is much more useful information in this class than just the file format name. This should use a `formatName` method instead, so that `toString` can be used to show the object itself when debugging or logging.
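A sketch of the suggested shape (the `toString` body here is just an assumption for illustration, not the final code in this PR):

```scala
// Short, user-facing name for error messages and SQL output.
def formatName: String = "ORC"

// toString stays free to describe the object itself for debugging/logging.
override def toString: String = s"${getClass.getSimpleName}(readSchema=$readSchema)"
```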
+1
   * Returns whether this format supports the given [[DataType]] in write path.
   * By default all data types are supported.
   */
  def supportDataType(dataType: DataType): Boolean = true
This name is awkward. Similar methods or traits use "supports" instead of "support". I think this one should as well.
I proposed changing the name to "supports..." in a previous PR: #23639
See the opposing comment here: #23639 (review)
This one is different. We are in a new class, and there is no `supportXXX` method in this class that we need to follow.
    schema.foreach { field =>
      if (!supportDataType(field.dataType)) {
        throw new AnalysisException(
          s"$this data source does not support ${field.dataType.catalogString} data type.")
Should use `formatName` instead of `$this` (`toString`).
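i.e., something along these lines (a sketch of the suggested message, substituting `formatName` into the hunk quoted above):

```scala
throw new AnalysisException(
  s"$formatName data source does not support ${field.dataType.catalogString} data type.")
```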
+1
abstract class FileScan(
    sparkSession: SparkSession,
-   fileIndex: PartitioningAwareFileIndex) extends Scan with Batch {
+   fileIndex: PartitioningAwareFileIndex,
+   readSchema: StructType) extends Scan with Batch {
@gengliangwang, why validate the read schema here in FileScan instead of in the scan builder?
In the PR description:
the table schema is determined in `TableProvider.getTable`. The actual read schema can be a subset of the table schema. This PR proposes to validate the actual read schema in `FileScan`.
@cloud-fan, thanks for the clarifications, particularly the updated description. I don't think we need to add type validation to v2 yet. This is something that could be done in that API, but I'm not sure it is a good idea to standardize it, because that would make assumptions about why types are not supported. For example, using a capability-based API for types like

@gengliangwang, I flagged a couple of review items to address. In addition, I would recommend taking more care when answering questions. It is concerning that I wasn't able to get a concise answer from you about what you're proposing to change. Of course I can go down a rabbit hole of trying to find out what your intent is by reading code and other pull requests. But it is much easier for everyone if you clearly state what you're proposing and why.
@rdblue For example, when Spark tries writing data that contains an array-type column to the CSV source:
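A minimal sketch of that situation (the path and exact error text are illustrative; the message format follows the `supportDataType` check shown above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// "tags" is an array<string> column, which CSV cannot represent.
val df = Seq((1, Seq("a", "b")), (2, Seq("c"))).toDF("id", "tags")

// With the validation in place, the write fails early with an AnalysisException like:
//   CSV data source does not support array<string> data type.
df.write.csv("/tmp/csv_with_array")
```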
@@ -527,10 +524,6 @@ case class DataSource(
   * Returns a logical plan to write the given [[LogicalPlan]] out to this [[DataSource]].
   */
  def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
ditto
Test build #102348 has finished for PR 23714 at commit
Test build #102350 has finished for PR 23714 at commit
Test build #102349 has finished for PR 23714 at commit
retest this please.
+1
Test build #102357 has finished for PR 23714 at commit
@@ -530,7 +527,6 @@ case class DataSource(
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
    }
unnecessary change
Test build #102376 has finished for PR 23714 at commit
thanks, merging to master!
…ework

## What changes were proposed in this pull request?

The file source has a schema validation feature, which validates 2 schemas:
1. the user-specified schema when reading.
2. the schema of input data when writing.

If a file source doesn't support the schema, we can fail the query earlier.

This PR is to implement the same feature in the `FileDataSourceV2` framework. Comparing to `FileFormat`, `FileDataSourceV2` has multiple layers. The API is added in two places:
1. Read path: the table schema is determined in `TableProvider.getTable`. The actual read schema can be a subset of the table schema. This PR proposes to validate the actual read schema in `FileScan`.
2. Write path: validate the actual output schema in `FileWriteBuilder`.

## How was this patch tested?

Unit test

Closes apache#23714 from gengliangwang/schemaValidationV2.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
The file source has a schema validation feature, which validates 2 schemas:
1. the user-specified schema when reading.
2. the schema of input data when writing.

If a file source doesn't support the schema, we can fail the query earlier.
This PR is to implement the same feature in the `FileDataSourceV2` framework. Comparing to `FileFormat`, `FileDataSourceV2` has multiple layers. The API is added in two places:
1. Read path: the table schema is determined in `TableProvider.getTable`. The actual read schema can be a subset of the table schema. This PR proposes to validate the actual read schema in `FileScan`.
2. Write path: validate the actual output schema in `FileWriteBuilder`.

How was this patch tested?
Unit test
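For reference, a hypothetical sketch of the shape such a unit test can take (class and suite names are illustrative, not the actual suites added by this PR; it reuses Spark's standard test helpers):

```scala
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.test.SharedSparkSession

// Illustrative test sketch: an unsupported type in the write schema should fail
// the query early with a clear AnalysisException.
class CsvSchemaValidationSuite extends QueryTest with SharedSparkSession {
  import testImplicits._

  test("write fails early for unsupported data type") {
    withTempPath { dir =>
      val df = Seq((1, Seq("a", "b"))).toDF("id", "tags")  // array<string> column
      val e = intercept[AnalysisException] {
        df.write.csv(dir.getCanonicalPath)
      }
      assert(e.getMessage.contains("does not support array<string> data type"))
    }
  }
}
```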