[SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser #21415
Conversation
# Conflicts:
#   sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
#   sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
The difference between this PR and #21296 is that the

jenkins, retest this, please

Test build #91061 has finished for PR 21415 at commit

Test build #91071 has finished for PR 21415 at commit

retest this please

Test build #91079 has finished for PR 21415 at commit

jenkins, retest this, please

Test build #91087 has finished for PR 21415 at commit

jenkins, retest this, please

Test build #91096 has finished for PR 21415 at commit

jenkins, retest this, please

Test build #91110 has finished for PR 21415 at commit
      defaultTimeZoneId: String,
-     defaultColumnNameOfCorruptRecord: String = "") = {
+     defaultColumnNameOfCorruptRecord: String = "",
+     columnPruning: Boolean = false) = {
Let's not set a default value for columnPruning. We might lose the pruning opportunity if we call this constructor.
The constructor with columnPruning disabled is called in the CSV writer and 30 times from test suites like UnivocityParserSuite and CSVInferSchemaSuite, where pruning is not needed.
We might lose the pruning opportunity if we call this constructor.
OK, I will enable it by default.
Always enabling it is also not right. Can we remove the default?
Removed.
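To illustrate what removing the default means for callers, here is a hypothetical stand-in (CsvOptionsLike and the argument values below are made up for illustration, not Spark's actual CSVOptions):

```scala
// Illustrative stand-in for the discussed change: columnPruning has no default,
// so every call site must choose explicitly.
class CsvOptionsLike(
    parameters: Map[String, String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String,
    columnPruning: Boolean)

// A reading path would enable pruning, while the writer and schema-inference
// paths (per the comment above) would pass false.
val forRead  = new CsvOptionsLike(Map.empty, "UTC", "_corrupt_record", columnPruning = true)
val forWrite = new CsvOptionsLike(Map.empty, "UTC", "_corrupt_record", columnPruning = false)
```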
val idf = spark.read
  .schema(schema)
  .csv(path.getCanonicalPath)
  .select('f15, 'f10, 'f5)
Could you add an extreme test case? Try count(1) on CSV files; that means zero columns are required.
Added an assert for count(). In CSVSuite, there are already a few tests with count() over malformed CSV files.
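Roughly, the extreme case looks like the sketch below (the expected count is an illustrative value, not the exact assertion added to CSVSuite):

```scala
// count() requires zero columns from the CSV files, so the parser can skip all of them.
val cnt = spark.read
  .schema(schema)
  .csv(path.getCanonicalPath)
  .count()
assert(cnt == 2)  // illustrative expected number of rows
```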
Benchmark                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------
Select 1000 columns                 76910 / 78065          0.0       76909.8       1.0X
Select 100 columns                  28625 / 32884          0.0       28625.1       2.7X
Select one column                   22498 / 22669          0.0       22497.8       3.4X
count(1) too?
Sure, added count().
Test build #91111 has finished for PR 21415 at commit

Test build #91126 has finished for PR 21415 at commit
LGTM. Thanks! Merged to master.
What changes were proposed in this pull request?
The uniVocity parser allows specifying only the required column names or indexes for parsing, for example:
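A minimal sketch of that uniVocity API (the column indexes, field names, and input line below are illustrative, not taken from this PR):

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
// Ask uniVocity to materialize only the listed column positions;
// all other columns in each input line are skipped by the parser itself.
settings.selectIndexes(Integer.valueOf(0), Integer.valueOf(4), Integer.valueOf(9))
// Selecting by field name is also possible:
// settings.selectFields("f0", "f4", "f9")

val parser = new CsvParser(settings)
// The returned tokens contain only the selected columns.
val tokens = parser.parseLine("v0,v1,v2,v3,v4,v5,v6,v7,v8,v9")
```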
In this PR, I propose to extract the indexes of the required columns from the required schema and pass them to the CSV parser. Benchmarks on files with 1000 columns show the following improvements: selecting 100 columns is about 2.7x faster and selecting a single column is about 3.4x faster than selecting all 1000 columns (see the benchmark table in the review thread above).
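A rough sketch of the idea, using a hypothetical helper (requiredIndexes is illustrative, not the exact code in this PR): map each field of the required schema to its position in the full CSV schema and hand those positions to the parser settings.

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical helper: positions of the required columns within the full CSV schema.
def requiredIndexes(dataSchema: StructType, requiredSchema: StructType): Array[Integer] =
  requiredSchema.map(field => Integer.valueOf(dataSchema.fieldIndex(field.name))).toArray

// The indexes can then be passed to the uniVocity settings, e.g.:
// settings.selectIndexes(requiredIndexes(dataSchema, requiredSchema): _*)
```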
Note: Compared to the current implementation, the changes can return a different result for malformed rows in the DROPMALFORMED and FAILFAST modes if only a subset of all columns is requested. To keep the previous behavior, set spark.sql.csv.parser.columnPruning.enabled to false.
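For example, a minimal way to restore the previous behavior (assuming an active SparkSession named spark):

```scala
// Disable CSV column pruning so malformed-row handling matches the old behavior.
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", false)
```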
How was this patch tested?

It was tested by a new test that selects 3 columns out of 15, by existing tests, and by new benchmarks.