File: OrcColumnarBatchReader.java
@@ -30,9 +30,9 @@
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcInputFormat;
import org.apache.orc.storage.ql.exec.vector.*;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.*;
@@ -48,8 +48,8 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
// The capacity of vectorized batch.
private int capacity;

// Vectorized ORC Row Batch
private VectorizedRowBatch batch;
// Vectorized ORC Row Batch wrap.
private VectorizedRowBatchWrap wrap;

/**
* The column IDs of the physical ORC file schema which are required by this reader.
@@ -146,8 +146,8 @@ public void initBatch(
int[] requestedDataColIds,
int[] requestedPartitionColIds,
InternalRow partitionValues) {
batch = orcSchema.createRowBatch(capacity);
assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
wrap = new VectorizedRowBatchWrap(orcSchema.createRowBatch(capacity));
assert(!wrap.batch().selectedInUse); // `selectedInUse` should be initialized with `false`.
assert(requiredFields.length == requestedDataColIds.length);
assert(requiredFields.length == requestedPartitionColIds.length);
// If a required column is also partition column, use partition value and don't read from file.
@@ -180,7 +180,7 @@ public void initBatch(
missingCol.setIsConstant();
orcVectorWrappers[i] = missingCol;
} else {
orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
orcVectorWrappers[i] = new OrcColumnVector(dt, wrap.batch().cols[colId]);
}
}
}
@@ -193,8 +193,8 @@ public void initBatch(
* by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
*/
private boolean nextBatch() throws IOException {
recordReader.nextBatch(batch);
int batchSize = batch.size;
recordReader.nextBatch(wrap.batch());
int batchSize = wrap.batch().size;
if (batchSize == 0) {
return false;
}
File: OrcDeserializer.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.io._
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
@@ -109,14 +108,13 @@ class OrcDeserializer(
updater.set(ordinal, bytes)

case DateType => (ordinal, value) =>
updater.setInt(ordinal, DateTimeUtils.fromJavaDate(value.asInstanceOf[DateWritable].get))
updater.setInt(ordinal, DateTimeUtils.fromJavaDate(OrcShimUtils.getSqlDate(value)))

case TimestampType => (ordinal, value) =>
updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))

case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
val v = Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
val v = OrcShimUtils.getDecimal(value)
v.changePrecision(precision, scale)
updater.set(ordinal, v)

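With this change the deserializer only sees OrcShimUtils; the Hive-flavoured DateWritable and HiveDecimalWritable stay behind the shim. A rough, hedged sketch of what the two helpers return follows; the demo object, main method and literal values are invented for illustration, and it sits in the same package only because OrcShimUtils is private[sql]:

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

// Illustrative only, not part of the change.
object OrcShimUtilsDemo {
  def main(args: Array[String]): Unit = {
    // getSqlDate unwraps a DateWritable (days since epoch) into java.sql.Date.
    println(OrcShimUtils.getSqlDate(new DateWritable(17897)))

    // getDecimal converts a HiveDecimalWritable into Spark's Decimal,
    // keeping the source precision and scale: here (12.345, 5, 3).
    val writable = new HiveDecimalWritable(HiveDecimal.create(new java.math.BigDecimal("12.345")))
    val d = OrcShimUtils.getDecimal(writable)
    println((d, d.precision, d.scale))
  }
}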
File: OrcFilters.scala
@@ -23,7 +23,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable

import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._

/**
@@ -56,27 +56,7 @@ import org.apache.spark.sql.types._
* builder methods mentioned above can only be found in test code, where all tested filters are
* known to be convertible.
*/
private[sql] object OrcFilters {
private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
filters match {
case Seq() => None
case Seq(filter) => Some(filter)
case Seq(filter1, filter2) => Some(And(filter1, filter2))
case _ => // length > 2
val (left, right) = filters.splitAt(filters.length / 2)
Some(And(buildTree(left).get, buildTree(right).get))
}
}

// Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
// in order to distinguish predicate pushdown for nested columns.
private def quoteAttributeNameIfNeeded(name: String) : String = {
if (!name.contains("`") && name.contains(".")) {
s"`$name`"
} else {
name
}
}
private[sql] object OrcFilters extends OrcFiltersBase {

/**
* Create ORC filter as a SearchArgument instance.
@@ -101,16 +81,6 @@ private[sql] object OrcFilters {
} yield filter
}

/**
* Return true if this is a searchable type in ORC.
* Both CharType and VarcharType are cleaned at AstBuilder.
*/
private def isSearchableType(dataType: DataType) = dataType match {
case BinaryType => false
case _: AtomicType => true
case _ => false
}

/**
* Get PredicateLeafType which is corresponding to the given DataType.
*/
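OrcFilters keeps its createFilter entry point; only the shared helpers move into OrcFiltersBase. A hedged usage sketch of that unchanged entry point; the demo object, schema and filters are invented, and it lives in the same package because the object is private[sql]:

package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.sources.{EqualTo, IsNotNull}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative only, not part of the change.
object OrcFiltersDemo {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
    // Both predicates are convertible, so Some(searchArgument) equivalent to
    // (a IS NOT NULL) AND (a = 1) comes back; if nothing can be converted, the result is None.
    println(OrcFilters.createFilter(schema, Seq(IsNotNull("a"), EqualTo("a", 1))))
  }
}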
File: OrcFiltersBase.scala (new file)
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.types.{AtomicType, BinaryType, DataType}

/**
* Methods that can be shared when upgrading the built-in Hive.
*/
trait OrcFiltersBase {

private[sql] def buildTree(filters: Seq[Filter]): Option[Filter] = {
filters match {
case Seq() => None
case Seq(filter) => Some(filter)
case Seq(filter1, filter2) => Some(And(filter1, filter2))
case _ => // length > 2
val (left, right) = filters.splitAt(filters.length / 2)
Some(And(buildTree(left).get, buildTree(right).get))
}
}

// Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
// in order to distinguish predicate pushdown for nested columns.
protected def quoteAttributeNameIfNeeded(name: String) : String = {
if (!name.contains("`") && name.contains(".")) {
s"`$name`"
} else {
name
}
}

/**
* Return true if this is a searchable type in ORC.
* Both CharType and VarcharType are cleaned at AstBuilder.
*/
protected def isSearchableType(dataType: DataType) = dataType match {
case BinaryType => false
case _: AtomicType => true
case _ => false
}
}
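To make the shared helpers concrete, here is a hedged sketch of what they produce. The demo object is invented and only mixes in the trait; it sits in the same package because buildTree is private[sql]:

package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}

// Illustrative only, not part of the change.
object OrcFiltersBaseDemo extends OrcFiltersBase {
  def main(args: Array[String]): Unit = {
    val filters: Seq[Filter] = Seq(IsNotNull("a"), EqualTo("a", 1), GreaterThan("b", 0))
    // splitAt(3 / 2 = 1) puts IsNotNull(a) on the left and the other two on the right,
    // so the tree is And(IsNotNull(a), And(EqualTo(a,1), GreaterThan(b,0))).
    println(buildTree(filters))

    // ORC 1.5+ needs dotted names quoted to distinguish nested columns:
    println(quoteAttributeNameIfNeeded("a.b"))  // `a.b`
    println(quoteAttributeNameIfNeeded("a"))    // a
  }
}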
File: OrcSerializer.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.orc
import org.apache.hadoop.io._
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -139,14 +137,7 @@ class OrcSerializer(dataSchema: StructType) {
new BytesWritable(getter.getBinary(ordinal))

case DateType =>
if (reuseObj) {
val result = new DateWritable()
(getter, ordinal) =>
result.set(getter.getInt(ordinal))
result
} else {
(getter, ordinal) => new DateWritable(getter.getInt(ordinal))
}
OrcShimUtils.getDateWritable(reuseObj)

// The following cases are already expensive, reusing object or not doesn't matter.

@@ -156,9 +147,8 @@
result.setNanos(ts.getNanos)
result

case DecimalType.Fixed(precision, scale) => (getter, ordinal) =>
val d = getter.getDecimal(ordinal, precision, scale)
new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
case DecimalType.Fixed(precision, scale) =>
OrcShimUtils.getHiveDecimalWritable(precision, scale)

case st: StructType => (getter, ordinal) =>
val result = createOrcValue(st).asInstanceOf[OrcStruct]
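The DateType branch above now delegates to OrcShimUtils.getDateWritable(reuseObj), which keeps the original reuse contract: with reuseObj = true a single DateWritable is mutated and returned for every row, so the caller must consume it before the next call. A hedged sketch of that contract; the demo object and values are invented, and it sits in the same package because OrcShimUtils is private[sql]:

package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.catalyst.InternalRow

// Illustrative only, not part of the change.
object DateWritableReuseDemo {
  def main(args: Array[String]): Unit = {
    val row = InternalRow(17897)                 // arbitrary days-since-epoch value
    val reusing = OrcShimUtils.getDateWritable(reuseObj = true)
    val fresh   = OrcShimUtils.getDateWritable(reuseObj = false)

    assert(reusing(row, 0) eq reusing(row, 0))   // same instance, mutated per row
    assert(fresh(row, 0) ne fresh(row, 0))       // a new DateWritable per row
  }
}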
File: OrcShimUtils.scala (new file)
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.orc

import java.sql.Date

import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.Decimal

/**
* Various utilities for ORC used to upgrade the built-in Hive.
*/
private[sql] object OrcShimUtils {
Review comment (Member): Not a big deal, but just leaving a note for the future: private[sql] should be removed if this is under the execution package, per SPARK-16964.

class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}

private[sql] type Operator = OrcOperator
private[sql] type SearchArgument = OrcSearchArgument
Review comment (Member Author): Add these two type aliases to avoid copying OrcV1FilterSuite for Hive 2.3.4.

def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get
Review comment (Member Author): These helpers avoid having to copy OrcDeserializer and OrcSerializer for Hive 2.3.4.

def getDecimal(value: Any): Decimal = {
val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
}

def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
if (reuseObj) {
val result = new DateWritable()
(getter, ordinal) =>
result.set(getter.getInt(ordinal))
result
} else {
(getter: SpecializedGetters, ordinal: Int) =>
new DateWritable(getter.getInt(ordinal))
}
}

def getHiveDecimalWritable(precision: Int, scale: Int):
(SpecializedGetters, Int) => HiveDecimalWritable = {
(getter, ordinal) =>
val d = getter.getDecimal(ordinal, precision, scale)
new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
}
}
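Everything that depends on the org.apache.orc.storage classes is now concentrated in this one object, so the Hive 2.3 build only needs to supply a counterpart with the same API. As a hedged sketch only, since the Hive-profile file is not part of this diff, that counterpart would presumably import the same class names from Hive's own packages and keep the method bodies identical:

package org.apache.spark.sql.execution.datasources.orc

import java.sql.Date

// Assumed Hive 2.3 package names; they are expected to mirror org.apache.orc.storage.* one for one.
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}

private[sql] object OrcShimUtils {
  class VectorizedRowBatchWrap(val batch: VectorizedRowBatch) {}

  private[sql] type Operator = OrcOperator
  private[sql] type SearchArgument = OrcSearchArgument

  def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get

  // getDecimal, getDateWritable and getHiveDecimalWritable would keep the same
  // bodies as the version above; only the imports differ.
}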
File: OrcFilterSuite.scala
@@ -89,34 +89,6 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
checkFilterPredicate(df, predicate, checkLogicalOperator)
}

Review comment (Member Author, on the removed checkNoFilterPredicate): Moved to the super class.
protected def checkNoFilterPredicate
(predicate: Predicate, noneSupported: Boolean = false)
(implicit df: DataFrame): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))

query.queryExecution.optimizedPlan match {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
if (noneSupported) {
assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
} else {
assert(pushedFilters.nonEmpty, "No filter is pushed down")
val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
}

case _ =>
throw new AnalysisException("Can not match OrcTable in the query.")
}
}

test("filter pushdown - integer") {
withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
File: OrcTest.scala
@@ -25,7 +25,11 @@ import scala.reflect.runtime.universe.TypeTag
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION

@@ -104,4 +108,32 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfterAll
assert(actual < numRows)
}
}

protected def checkNoFilterPredicate
(predicate: Predicate, noneSupported: Boolean = false)
(implicit df: DataFrame): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
val query = df
.select(output.map(e => Column(e)): _*)
Review comment (Member): No big deal, but .map(Column)?
.where(Column(predicate))

query.queryExecution.optimizedPlan match {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
if (noneSupported) {
assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
} else {
assert(pushedFilters.nonEmpty, "No filter is pushed down")
val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
}

case _ =>
throw new AnalysisException("Can not match OrcTable in the query.")
}
}
}