Fix merge from 22.04 to 22.06 #5059

Merged · 5 commits · Mar 25, 2022
26 changes: 25 additions & 1 deletion integration_tests/src/main/python/orc_test.py
@@ -18,7 +18,7 @@
from data_gen import *
from marks import *
from pyspark.sql.types import *
from spark_session import with_cpu_session, is_before_spark_330
from spark_session import with_cpu_session, is_before_spark_320, is_before_spark_330
from parquet_test import _nested_pruning_schemas
from conftest import is_databricks_runtime

@@ -232,6 +232,30 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs):
        lambda spark : spark.read.orc(data_path),
        conf=all_confs)

# Set up an external table whose column names differ from the ORC file's field names
def setup_external_table_with_forced_positions(spark, table_name, data_path):
    rename_cols_query = "CREATE EXTERNAL TABLE `{}` (`col10` INT, `_c1` STRING, `col30` DOUBLE) STORED AS orc LOCATION '{}'".format(table_name, data_path)
    spark.sql(rename_cols_query).collect()

@pytest.mark.skipif(is_before_spark_320(), reason='ORC forced positional evolution is only supported from Spark 3.2')
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@pytest.mark.parametrize('forced_position', ["true", "false"])
@pytest.mark.parametrize('orc_impl', ["native", "hive"])
def test_orc_forced_position(spark_tmp_path, spark_tmp_table_factory, reader_confs, forced_position, orc_impl):
    orc_gens = [int_gen, string_gen, double_gen]
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
    data_path = spark_tmp_path + 'ORC_DATA'
    with_cpu_session(lambda spark : gen_df(spark, gen_list).write.orc(data_path))
    table_name = spark_tmp_table_factory.get()
    with_cpu_session(lambda spark : setup_external_table_with_forced_positions(spark, table_name, data_path))

    all_confs = copy_and_update(reader_confs, {
        'orc.force.positional.evolution': forced_position,
        'spark.sql.orc.impl': orc_impl})
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : spark.sql("SELECT * FROM {}".format(table_name)),
        conf=all_confs)
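Note: the behavior this test exercises can also be reproduced outside the test harness. Below is a minimal, hypothetical sketch (assuming a SparkSession `spark` with Hive support and a writable scratch path): the ORC file is written with default `_c*` field names, the external table declares different names, and forcing positional evolution lets the reader pair the columns up by index.

import spark.implicits._

// Hypothetical scratch location; any writable path works.
val dataPath = "/tmp/orc_forced_position_demo"

// The ORC file is written with physical field names _c0, _c1, _c2 ...
Seq((1, "a", 1.0), (2, "b", 2.0)).toDF("_c0", "_c1", "_c2")
  .write.mode("overwrite").orc(dataPath)

// ... but the external table renames two of the columns.
spark.sql(s"CREATE EXTERNAL TABLE demo_renamed (`col10` INT, `_c1` STRING, `col30` DOUBLE) " +
  s"STORED AS orc LOCATION '$dataPath'")

// Without forced positional evolution the renamed columns cannot be matched by name;
// with it forced they are matched by position and return the written values.
spark.conf.set("orc.force.positional.evolution", "true")
spark.sql("SELECT * FROM demo_renamed").show()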

# In this test we read the data, but only the key that the data was partitioned by
@pytest.mark.parametrize('v1_enabled_list', ["", "orc"])
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@@ -91,4 +91,9 @@ trait OrcShims311until320Base {
def typeDescriptionEqual(lhs: TypeDescription, rhs: TypeDescription): Boolean = {
lhs.equals(rhs)
}

// Forced positional evolution is only supported from Spark 3.2, so always report false here.
def forcePositionalEvolution(conf: Configuration): Boolean = {
false
}
}
@@ -127,4 +127,9 @@ object OrcShims {
def typeDescriptionEqual(lhs: TypeDescription, rhs: TypeDescription): Boolean = {
lhs.equals(rhs, false)
}

// Forced positional evolution is supported from Spark 3.2, so read the ORC setting from the configuration.
def forcePositionalEvolution(conf: Configuration): Boolean = {
OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
}
}
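Note: the Spark 3.2+ shim above just delegates to the ORC library's configuration enum. As a rough, standalone sketch (not plugin code), OrcConf.FORCE_POSITIONAL_EVOLUTION is backed by the plain key `orc.force.positional.evolution` and defaults to false:

import org.apache.hadoop.conf.Configuration
import org.apache.orc.OrcConf

val conf = new Configuration()

// Defaults to false when the key has not been set.
val before = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)  // false

// Setting the plain key is all the shim needs to see to change its answer.
conf.setBoolean("orc.force.positional.evolution", true)
val after = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)   // true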
59 changes: 45 additions & 14 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -813,7 +813,7 @@ private case class GpuOrcFileFilterHandler(
// After getting the necessary information from ORC reader, we must close the ORC reader
OrcShims.withReader(OrcFile.createReader(filePath, orcFileReaderOpts)) { orcReader =>
val resultedColPruneInfo = requestedColumnIds(isCaseSensitive, dataSchema,
readDataSchema, orcReader)
readDataSchema, orcReader, conf)
if (resultedColPruneInfo.isEmpty) {
// Be careful when the OrcPartitionReaderContext is null, we should change
// reader to EmptyPartitionReader for throwing exception
@@ -874,15 +874,20 @@
isCaseSensitive: Boolean,
dataSchema: StructType,
requiredSchema: StructType,
reader: Reader): Option[(Array[Int], Boolean)] = {
reader: Reader,
conf: Configuration): Option[(Array[Int], Boolean)] = {
val orcFieldNames = reader.getSchema.getFieldNames.asScala
if (orcFieldNames.isEmpty) {
// SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
None
} else {
if (orcFieldNames.forall(_.startsWith("_col"))) {
// This is a ORC file written by Hive, no field names in the physical schema, assume the
// physical schema maps to the data scheme by index.
if (OrcShims.forcePositionalEvolution(conf) || orcFieldNames.forall(_.startsWith("_col"))) {
// Either this is an ORC file written by an old version of Hive, so the physical schema
// has no real field names, or `orc.force.positional.evolution=true` is set (typically
// because columns were renamed and the physical schema no longer matches the data schema).
// In both cases we map the physical schema to the data schema by index.
assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
"no idea which columns were dropped, fail to read.")
@@ -1055,11 +1060,24 @@
} else {
CaseInsensitiveMap[TypeDescription](mapSensitive)
}
rSchema.getFieldNames.asScala.zip(rSchema.getChildren.asScala)
.foreach { case (rName, rChild) =>
val fChild = name2ChildMap(rName)
setMapping(fChild.getId)
updateMapping(rChild, fChild)
// When positional evolution is forced, match the top-level columns by position rather than by name
if (OrcShims.forcePositionalEvolution(conf)) {
val rChildren = rSchema.getChildren
val fChildren = fSchema.getChildren
if (rChildren != null) {
rChildren.asScala.zipWithIndex.foreach { case (rChild, id) =>
val fChild = fChildren.get(id)
setMapping(fChild.getId)
updateMapping(rChild, fChild)
}
}
} else {
rSchema.getFieldNames.asScala.zip(rSchema.getChildren.asScala)
.foreach { case (rName, rChild) =>
val fChild = name2ChildMap(rName)
setMapping(fChild.getId)
updateMapping(rChild, fChild)
}
}
} else {
val rChildren = rSchema.getChildren
@@ -1239,13 +1257,26 @@
}
val readerFieldNames = readSchema.getFieldNames.asScala
val readerChildren = readSchema.getChildren.asScala
val fileTypesWithIndex = for (
(ftMap, index) <- fileTypesMap.zipWithIndex) yield (index, ftMap)

val prunedReadSchema = TypeDescription.createStruct()
val prunedInclude = mutable.ArrayBuffer(include(readSchema.getId))
readerFieldNames.zip(readerChildren).foreach { case (readField, readType) =>
// Skip check for the missing names because a column with nulls will be added
// for each of them.
if (fileTypesMap.contains(readField)) {
val readerMap = readerFieldNames.zip(readerChildren).toList
readerMap.zipWithIndex.foreach { case ((readField, readType), idx) =>
// When positional evolution is forced, match the top-level columns by position rather than by name
if (OrcShims.forcePositionalEvolution(conf)) {
fileTypesWithIndex(idx) match {
case (_, fileReadType) => if (fileReadType == readType) {
val (newChild, childInclude) =
checkSchemaCompatibility(fileReadType, readType, isCaseAware, include)
prunedReadSchema.addField(readField, newChild)
prunedInclude ++= childInclude
}
}
} else if (fileTypesMap.contains(readField)) {
// Skip check for the missing names because a column with nulls will be added
// for each of them.
val (newChild, childInclude) = checkSchemaCompatibility(
fileTypesMap(readField), readType, isCaseAware, include)
prunedReadSchema.addField(readField, newChild)
@@ -43,7 +43,6 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
import org.apache.parquet.hadoop.metadata._
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, PrimitiveType, Type, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

@@ -370,8 +369,8 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte

val filePath = new Path(new URI(file.filePath))
//noinspection ScalaDeprecation
val inputFile = HadoopInputFile.fromPath(filePath, conf)
val footer = withResource(ParquetFileReader.open(inputFile))(_.getFooter)
val footer = ParquetFileReader.readFooter(conf, filePath,
ParquetMetadataConverter.range(file.start, file.start + file.length))
val fileSchema = footer.getFileMetaData.getSchema
val pushedFilters = if (enableParquetFilterPushDown) {
val parquetFilters = SparkShimImpl.getParquetFilters(fileSchema, pushDownDate,
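Note: the Parquet change above goes back to the classic range-filtered footer API. For reference, a self-contained sketch of that call with a hypothetical path (readFooter is deprecated in newer parquet-mr releases, hence the deprecation suppression above):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

val conf = new Configuration()
val path = new Path("/tmp/example.parquet")  // hypothetical file

// NO_FILTER parses the whole footer; range(start, end) restricts it to one split's row groups.
val footer = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER)
println(footer.getFileMetaData.getSchema)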
@@ -23,7 +23,7 @@ import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes, Literal, NullIntolerant, Predicate, RegExpExtract, RLike, StringSplit, StringToMap, SubstringIndex, TernaryExpression}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes, InputFileName, Literal, NullIntolerant, Predicate, RegExpExtract, RLike, StringSplit, StringToMap, SubstringIndex, TernaryExpression}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String
@@ -969,6 +969,12 @@ class GpuRegExpExtractMeta(
override def tagExprForGpu(): Unit = {
GpuRegExpUtils.tagForRegExpEnabled(this)

ShimLoader.getShimVersion match {
case _: DatabricksShimVersion if expr.subject.isInstanceOf[InputFileName] =>
willNotWorkOnGpu("avoiding Databricks Delta problem with regexp extract")
case _ =>
}

def countGroups(regexp: RegexAST): Int = {
regexp match {
case RegexGroup(_, term) => 1 + countGroups(term)
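Note: countGroups above walks the plugin's regex AST to count group nodes. For intuition only, the JVM regex engine exposes a closely related number, the count of capturing groups, via Matcher.groupCount() (sketch with a made-up pattern):

import java.util.regex.Pattern

// Three capturing groups: (a), (b(c)) and the nested (c).
val groups = Pattern.compile("(a)(b(c))").matcher("").groupCount()
println(groups)  // 3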
6 changes: 0 additions & 6 deletions tests/pom.xml
@@ -224,12 +224,6 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
@@ -16,15 +16,13 @@

package com.nvidia.spark.rapids

import java.io.{File, FilenameFilter}
import java.io.File
import java.nio.charset.StandardCharsets

import com.nvidia.spark.rapids.shims.SparkShimImpl
import org.apache.commons.io.filefilter.WildcardFileFilter
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.io.FileCommitProtocol
@@ -34,20 +32,22 @@ import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker
/**
* Tests for writing Parquet files with the GPU.
*/
@scala.annotation.nowarn(
"msg=method readFooters in class ParquetFileReader is deprecated"
)
class ParquetWriterSuite extends SparkQueryCompareTestSuite {
test("file metadata") {
val tempFile = File.createTempFile("stats", ".parquet")
try {
withGpuSparkSession(spark => {
val df = mixedDfWithNulls(spark)
df.write.mode("overwrite").parquet(tempFile.getAbsolutePath)
val filter: FilenameFilter = new WildcardFileFilter("*.parquet")
val inputFile = HadoopInputFile.fromPath(
new Path(tempFile.listFiles(filter)(0).getAbsolutePath),
spark.sparkContext.hadoopConfiguration)
val parquetMeta = withResource(ParquetFileReader.open(inputFile))(_.getFooter)

val fileMeta = parquetMeta.getFileMetaData
val footer = ParquetFileReader.readFooters(spark.sparkContext.hadoopConfiguration,
new Path(tempFile.getAbsolutePath)).get(0)

val parquetMeta = footer.getParquetMetadata
val fileMeta = footer.getParquetMetadata.getFileMetaData
val extra = fileMeta.getKeyValueMetaData
assert(extra.containsKey("org.apache.spark.version"))
assert(extra.containsKey("org.apache.spark.sql.parquet.row.metadata"))