
add cloud reading for orc #2828

Merged: 13 commits merged into NVIDIA:branch-21.08 from wbo4958:orc-cloud-reading on Jul 6, 2021
Conversation

@wbo4958 (Collaborator) commented Jun 28, 2021

This PR adds the cloud reading logic for the ORC file format; the implementation closely follows what we did for the Parquet file format.

I ran a round of performance tests on 100 non-partitioned ORC files, 1.3 GB in total:

Run    PERFILE    Cloud
1st    34.318     8.645
2nd    29.99      7.829
3rd    28.563     7.852
4th    25.099     7.601

Cloud reading is about 3x faster than PERFILE on these 100 ORC files.

I can't compare performance on a larger set of ORC files because of #2850.

This PR does not fix #2850; I will fix it in a separate PR.
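
For context, a minimal sketch of how one might switch between the two read paths at the session level. The config key and values below are assumptions modeled on the existing Parquet reader-type options, not something this PR confirms:

    // Hypothetical configuration sketch (key and values are assumptions):
    // PERFILE creates one reader per file; the cloud path reads files with
    // multiple threads, which is what this PR adds for ORC.
    spark.conf.set("spark.rapids.sql.format.orc.reader.type", "PERFILE")
    spark.conf.set("spark.rapids.sql.format.orc.reader.type", "MULTITHREADED")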

@wbo4958 changed the title from "[WIP] add cloud reading for orc" to "add cloud reading for orc" on Jun 29, 2021
@wbo4958 (Collaborator, Author) commented Jun 29, 2021

build

@wbo4958 wbo4958 self-assigned this Jun 29, 2021
@wbo4958 wbo4958 requested a review from tgravescs June 29, 2021 13:17
* @param conf configuration
* @return cloud reading PartitionReader
*/
def buildBaseColumnarReaderForCloud(files: Array[PartitionedFile], conf: Configuration):
Collaborator:

NIT:

Suggested change

    def buildBaseColumnarReaderForCloud(files: Array[PartitionedFile], conf: Configuration):

    def buildBaseColumnarReaderForCloud(
        files: Array[PartitionedFile],
        conf: Configuration): PartitionReader[ColumnarBatch]

* @param conf the configuration
* @return coalescing reading PartitionReader
*/
def buildBaseColumnarReaderForCoalescing(files: Array[PartitionedFile], conf: Configuration):
Collaborator:

NIT:

Suggested change

    def buildBaseColumnarReaderForCoalescing(files: Array[PartitionedFile], conf: Configuration):

    def buildBaseColumnarReaderForCoalescing(
        files: Array[PartitionedFile],
        conf: Configuration): PartitionReader[ColumnarBatch]
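
For readers following along, a rough sketch of how a multi-file factory might dispatch between these two builders; the wrapper method and the flag are hypothetical, only the two builder signatures come from this PR:

    // Hypothetical dispatch sketch: per task, pick the cloud (multithreaded)
    // reader or the coalescing reader. Only buildBaseColumnarReaderForCloud and
    // buildBaseColumnarReaderForCoalescing are real; the rest is illustrative.
    def buildColumnarReader(
        files: Array[PartitionedFile],
        conf: Configuration,
        useMultiThreadedRead: Boolean): PartitionReader[ColumnarBatch] = {
      if (useMultiThreadedRead) {
        buildBaseColumnarReaderForCloud(files, conf)
      } else {
        buildBaseColumnarReaderForCoalescing(files, conf)
      }
    }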

@@ -549,6 +554,7 @@ case class GpuFileSourceScanExec(
None,
queryUsesInputFile)(rapidsConf)
}

Collaborator:

Unnecessary change?

allMetrics,
queryUsesInputFile)

val factory = fsRelation.fileFormat match {
Collaborator:

It seems the creation code is exactly the same for both Parquet and ORC, so it could be simplified as:

    val factory = fsRelation.fileFormat match {
      case _: ParquetFileFormat | _: OrcFileFormat =>
          GpuParquetMultiFilePartitionReaderFactory(
            sqlConf,
            broadcastedHadoopConf,
            relation.dataSchema,
            requiredSchema,
            relation.partitionSchema,
            pushedDownFilters.toArray,
            rapidsConf,
            allMetrics,
            queryUsesInputFile)
      case _ =>
        // never reach here
        throw new RuntimeException(s"File format ${fsRelation.fileFormat} is not supported yet")
    }

Collaborator (Author):
It's different: the ORC arm uses GpuOrcMultiFilePartitionReaderFactory.
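
To make the author's point concrete, a sketch of the shape being defended; the ORC factory's argument list is assumed to mirror the Parquet one quoted above, which this conversation does not confirm:

    // Sketch (assumed): the two formats need different factories, so the match
    // arms cannot be collapsed. Only the factory names come from this thread.
    val factory = fsRelation.fileFormat match {
      case _: ParquetFileFormat =>
        GpuParquetMultiFilePartitionReaderFactory(sqlConf, broadcastedHadoopConf,
          relation.dataSchema, requiredSchema, relation.partitionSchema,
          pushedDownFilters.toArray, rapidsConf, allMetrics, queryUsesInputFile)
      case _: OrcFileFormat =>
        GpuOrcMultiFilePartitionReaderFactory(sqlConf, broadcastedHadoopConf,
          relation.dataSchema, requiredSchema, relation.partitionSchema,
          pushedDownFilters.toArray, rapidsConf, allMetrics, queryUsesInputFile)
      case _ =>
        // never reach here
        throw new RuntimeException(s"File format ${fsRelation.fileFormat} is not supported yet")
    }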

private val isParquetFileFormat: Boolean = relation.fileFormat.isInstanceOf[ParquetFileFormat]
private val isPerFileReadEnabled = rapidsConf.isParquetPerFileReadEnabled || !isParquetFileFormat
// CSV should be always using PERFILE read type
val isPerFileReadEnabled = rapidsConf.isParquetPerFileReadEnabled ||
Collaborator:

Missing private?

Suggested change

    val isPerFileReadEnabled = rapidsConf.isParquetPerFileReadEnabled ||

    private val isPerFileReadEnabled = rapidsConf.isParquetPerFileReadEnabled ||

Collaborator (Author):
thx

@@ -82,6 +83,16 @@ abstract class GpuOrcScanBase(
new SerializableConfiguration(hadoopConf))
GpuOrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
Collaborator:

Forgot to remove lines 84-85?

Collaborator (Author):
Good finding.

queryUsesInputFile: Boolean)
extends MultiFilePartitionReaderFactoryBase(sqlConf, broadcastedConf, rapidsConf) {

private val fileHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, filters)
@firestarman (Collaborator) commented Jun 30, 2021
It would be better to mark members as transient if they will not be used on executors, to avoid unnecessary and unexpected serialization.

Collaborator (Author):
The fileHandler will be used on the executor side.
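
For illustration, a minimal, self-contained sketch of the serialization behavior being discussed; the class and field names here are made up and are not from this PR:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Hypothetical example: a @transient field is skipped during serialization,
    // so shipping the object to executors does not drag driver-only state along.
    class ExampleFactory(val usedOnExecutors: String) extends Serializable {
      @transient lazy val driverOnlyCache: Map[String, Int] = Map("warm" -> 1)
    }

    // Round-tripping through Java serialization drops driverOnlyCache; being a
    // lazy val, it would simply be recomputed if touched on the executor side.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(new ExampleFactory("hadoop-conf"))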

*
* @return the file format short name
*/
override def getFileFormatShortName: String = "ORC"
Collaborator:

NIT:

Suggested change

    override def getFileFormatShortName: String = "ORC"

    override final def getFileFormatShortName: String = "ORC"

Collaborator (Author):
Done

}
}

Collaborator:

Empty change?

} else {
val table = readToTable(currentStripes)
try {
table.map(GpuColumnVector.from(_, readDataSchema.toArray.map(_.dataType)))
Collaborator:

NIT: There is already an API, extractTypes, to convert the schema to an array of DataType.

Suggested change

    table.map(GpuColumnVector.from(_, readDataSchema.toArray.map(_.dataType)))

    table.map(GpuColumnVector.from(_, extractTypes(readDataSchema)))

@wbo4958 (Collaborator, Author) commented Jun 30, 2021

build

@tgravescs (Collaborator) commented:

Did you also validate that the data read was the same as PERFILE, i.e. that we read it all correctly? Were the tests run on partitioned or non-partitioned data?

@wbo4958 (Collaborator, Author) commented Jun 30, 2021

> Did you also validate that the data read was the same as PERFILE, i.e. that we read it all correctly? Were the tests run on partitioned or non-partitioned data?

You mean I should validate that the results of PERFILE and Cloud reading match when doing the performance test?

The performance test used non-partitioned files. I will run it again with partitioned files.

@wbo4958 (Collaborator, Author) commented Jul 2, 2021

Hi @tgravescs

I did another round of performance tests on 153 partitioned ORC files, 1.3 GB in total.

The file paths look like ./partition_id_1=1/partition_id_2=2/part-00000-tid-2957597971096272323-dac313b4-e4c9-4626-8275-1fe9918ff0e0-1-6.c000.snappy.orc

Run    PERFILE    Cloud
1st    80.157     8.32
2nd    51.526     7.442
3rd    45.901     7.263
4th    35.775     7.162

I also compared the results collected back to the driver for both PERFILE and MULTITHREADED reading. Because of the driver's memory limitation, I only tested 41 partitioned ORC files, 316 MB in total, and the locally sorted results are the same.

I also compared the results of reading on the CPU versus MULTITHREADED reading, and the locally sorted results are the same.
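
A minimal sketch of this kind of check; the reader-type config key is an assumption modeled on the Parquet option and the path is a placeholder, so this is not the author's actual script:

    // Hypothetical validation sketch: read the same ORC data with two reader
    // types and compare locally sorted rows (config key and path are assumed).
    def readSorted(readerType: String): Array[String] = {
      spark.conf.set("spark.rapids.sql.format.orc.reader.type", readerType)
      spark.read.orc("/data/partitioned_orc").collect().map(_.toString).sorted
    }
    assert(readSorted("PERFILE").sameElements(readSorted("MULTITHREADED")))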

@wbo4958 (Collaborator, Author) commented Jul 2, 2021

build

@wbo4958 wbo4958 requested a review from tgravescs July 2, 2021 07:43
@tgravescs (Collaborator) commented:

> You mean I should validate that the results of PERFILE and Cloud reading match when doing the performance test?

Sorry for my delay, I was out of office. Yes, I mean make sure the results coming back are the same as the CPU-side results and that we didn't drop or corrupt any data. It sounds like your latest results verify this on a smaller set of data. You could also have written the output to files and validated those, but what you did sounds sufficient.

@tgravescs added the "feature request" label Jul 6, 2021
@wbo4958 (Collaborator, Author) commented Jul 6, 2021

Thx Tom

@wbo4958 wbo4958 merged commit 91b9a12 into NVIDIA:branch-21.08 Jul 6, 2021
@wbo4958 wbo4958 deleted the orc-cloud-reading branch July 6, 2021 21:24
Labels: feature request (New feature or request)

Merging this pull request may close the following issue:

[BUG] "java.io.InterruptedIOException: getFileStatus on s3a://xxx" for ORC reading in Databricks 8.2 runtime