@@ -21,7 +21,7 @@ import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
@@ -191,6 +191,21 @@ class SparkHadoopUtil extends Logging {
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}

/**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing the [[FileStatus]] of
* that file.
*/
def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
def recurse(path: Path) = {
val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
}

val baseStatus = fs.getFileStatus(basePath)
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}
}

object SparkHadoopUtil {
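For reference, a minimal usage sketch of the new `listLeafStatuses` helper. This is an internal utility, and the path, `Configuration`, and size total below are illustrative only:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// Collect all leaf (file) statuses under a hypothetical Parquet directory and sum their sizes.
val hadoopConf = new Configuration()
val basePath = new Path("/tmp/parquet-table")
val fs = basePath.getFileSystem(hadoopConf)

val leafFiles = SparkHadoopUtil.get.listLeafStatuses(fs, basePath)
val totalBytes = leafFiles.map(_.getLen).sum
```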
9 changes: 7 additions & 2 deletions python/pyspark/sql.py
@@ -1471,7 +1471,7 @@ def registerRDDAsTable(self, rdd, tableName):
else:
raise ValueError("Can only register DataFrame as table")

def parquetFile(self, path):
def parquetFile(self, *paths):
"""Loads a Parquet file, returning the result as a L{DataFrame}.

>>> import tempfile, shutil
@@ -1483,7 +1483,12 @@ def parquetFile(self, path):
>>> sorted(df.collect()) == sorted(df2.collect())
True
"""
jdf = self._ssql_ctx.parquetFile(path)
gateway = self._sc._gateway
jpath = paths[0]
jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths) - 1)
for i in range(1, len(paths)):
jpaths[i - 1] = paths[i]
jdf = self._ssql_ctx.parquetFile(jpath, jpaths)
return DataFrame(jdf, self)

def jsonFile(self, path, schema=None, samplingRatio=1.0):
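The Python change above splits the first path from the rest because the Scala counterpart (shown later in this diff) takes `(path: String, paths: String*)`, and `@scala.annotation.varargs` exposes that to Java and Py4J as an overload taking `(String, String[])`. A self-contained sketch of that bridging, with illustrative names only:

```scala
import scala.annotation.varargs

object VarargsBridgeSketch {
  // @varargs makes the compiler emit a Java-friendly overload combine(String, String[]),
  // which is what Py4J invokes when pyspark passes jpath plus a java String[] of jpaths.
  @varargs
  def combine(path: String, paths: String*): List[String] = (path +: paths).toList

  def main(args: Array[String]): Unit = {
    println(combine("a", "b", "c"))  // List(a, b, c)
  }
}
```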
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.{BinaryType, BooleanType}

object InterpretedPredicate {
def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
@@ -175,7 +175,10 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
null
} else {
val r = right.eval(input)
if (r == null) null else l == r
if (r == null) null
else if (left.dataType != BinaryType) l == r
else BinaryType.ordering.compare(
l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0

Contributor Author:
This fixes SPARK-5509. Hit this bug while testing Parquet filters for the new data source implementation.

Contributor:
By the way, this is really expensive. I'd use something like this: http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/primitives/UnsignedBytes.html

If you don't want to change it as part of this PR, file a jira ticket to track it.

Contributor Author:
Filed SPARK-5553 to track this. I'd like to make sure equality comparison for binary types works properly in this PR. Also, we're already using Ordering to compare binary values in LessThan, GreaterThan, etc., so at least this isn't a performance regression.

}
}
}
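For context on the `EqualTo` change above: on the JVM, `==` between two `Array[Byte]` values is reference equality, so structurally identical binary values would otherwise compare as unequal (SPARK-5509). A small sketch, assuming Guava is on the classpath for the comparator suggested in the review thread:

```scala
import com.google.common.primitives.UnsignedBytes

object BinaryEqualitySketch extends App {
  val a = Array[Byte](1, 2, 3)
  val b = Array[Byte](1, 2, 3)

  println(a == b)                         // false: array equality is reference equality
  println(java.util.Arrays.equals(a, b))  // true: element-wise comparison
  // Lexicographic unsigned comparison, as suggested in the review above:
  println(UnsignedBytes.lexicographicalComparator().compare(a, b) == 0)  // true
}
```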
@@ -19,6 +19,7 @@ package org.apache.spark.sql.types

import java.sql.Timestamp

import scala.collection.mutable.ArrayBuffer
import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
@@ -29,6 +30,7 @@ import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.ScalaReflectionLock
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
@@ -159,7 +161,6 @@ object DataType {
case failure: NoSuccess =>
throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
}

}

protected[types] def buildFormattedString(
@@ -754,6 +755,57 @@ object StructType {
def apply(fields: java.util.List[StructField]): StructType = {
StructType(fields.toArray.asInstanceOf[Array[StructField]])
}

private[sql] def merge(left: DataType, right: DataType): DataType =
(left, right) match {
case (ArrayType(leftElementType, leftContainsNull),
ArrayType(rightElementType, rightContainsNull)) =>
ArrayType(
merge(leftElementType, rightElementType),
leftContainsNull || rightContainsNull)

case (MapType(leftKeyType, leftValueType, leftContainsNull),
MapType(rightKeyType, rightValueType, rightContainsNull)) =>
MapType(
merge(leftKeyType, rightKeyType),
merge(leftValueType, rightValueType),
leftContainsNull || rightContainsNull)

case (StructType(leftFields), StructType(rightFields)) =>
val newFields = ArrayBuffer.empty[StructField]

leftFields.foreach {
case leftField @ StructField(leftName, leftType, leftNullable, _) =>
rightFields
.find(_.name == leftName)
.map { case rightField @ StructField(_, rightType, rightNullable, _) =>
leftField.copy(
dataType = merge(leftType, rightType),
nullable = leftNullable || rightNullable)
}
.orElse(Some(leftField))
.foreach(newFields += _)
}

rightFields
.filterNot(f => leftFields.map(_.name).contains(f.name))
.foreach(newFields += _)

StructType(newFields)

case (DecimalType.Fixed(leftPrecision, leftScale),
DecimalType.Fixed(rightPrecision, rightScale)) =>
DecimalType(leftPrecision.max(rightPrecision), leftScale.max(rightScale))

case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
if leftUdt.userClass == rightUdt.userClass => leftUdt

case (leftType, rightType) if leftType == rightType =>
leftType

case _ =>
throw new SparkException(s"Failed to merge incompatible data types $left and $right")
}
}


@@ -890,6 +942,20 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField]
val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
s"struct<${fieldTypes.mkString(",")}>"
}

/**
* Merges with another schema (`StructType`). For a struct field A from `this` and a struct field
* B from `that`,
*
* 1. If A and B have the same name and data type, they are merged into a field C with the same name
* and data type. C is nullable if and only if either A or B is nullable.
* 2. If A doesn't exist in `that`, it's included in the result schema.
* 3. If B doesn't exist in `this`, it's also included in the result schema.
* 4. Otherwise, `this` and `that` are considered conflicting schemas and an exception is thrown.
*/
private[sql] def merge(that: StructType): StructType =
StructType.merge(this, that).asInstanceOf[StructType]
}


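A sketch of the merge semantics documented above. Since `merge` is `private[sql]`, this is illustrative rather than a public API call, and the field names are made up:

```scala
import org.apache.spark.sql.types._

val left = StructType(Seq(
  StructField("a", IntegerType, nullable = false),
  StructField("b", StringType, nullable = true)))

val right = StructType(Seq(
  StructField("a", IntegerType, nullable = true),
  StructField("c", DoubleType, nullable = false)))

// left.merge(right) yields, per the rules above:
//   StructField("a", IntegerType, nullable = true)   -- present in both; nullable because either side is
//   StructField("b", StringType, nullable = true)    -- only in `left`
//   StructField("c", DoubleType, nullable = false)   -- only in `right`
// Merging "a" as IntegerType on one side with, say, StringType on the other would instead
// throw a SparkException ("Failed to merge incompatible data types ...").
```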
@@ -295,7 +295,11 @@ private[sql] class DataFrameImpl protected[sql](
}

override def saveAsParquetFile(path: String): Unit = {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
if (sqlContext.conf.parquetUseDataSourceApi) {
save("org.apache.spark.sql.parquet", "path" -> path)
} else {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
}
}

override def saveAsTable(tableName: String): Unit = {
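A hedged round-trip sketch of the new write path; the local master and the `/tmp` paths are placeholders, and the input is assumed to be existing Parquet data:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SaveAsParquetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-as-parquet-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Placeholder input; any DataFrame would do here.
    val df = sqlContext.parquetFile("/tmp/parquet-input")

    // With spark.sql.parquet.useDataSourceApi enabled (the default, see SQLConf below),
    // this routes through the data source API; otherwise it falls back to WriteToFile.
    df.saveAsParquetFile("/tmp/parquet-output")

    sc.stop()
  }
}
```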
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -37,6 +37,7 @@ private[spark] object SQLConf {
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"

val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
Expand Down Expand Up @@ -105,6 +106,10 @@ private[sql] class SQLConf extends Serializable {
private[spark] def parquetFilterPushDown =
getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean

/** When true, uses the Parquet implementation based on the data source API. */
private[spark] def parquetUseDataSourceApi =
getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean

/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean

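The new flag can be flipped per session through `SQLContext.setConf`; a minimal sketch, assuming an existing `sqlContext`:

```scala
// Fall back to the original ParquetRelation code path.
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

// Switch back to the data-source-based implementation (the default).
sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "true")
```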
20 changes: 13 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql
import java.beans.Introspector
import java.util.Properties

import scala.collection.immutable
import scala.collection.JavaConversions._
import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.spark.{SparkContext, Partition}
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -36,11 +35,12 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.json._
import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation, _}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}

/**
* :: AlphaComponent ::
@@ -303,8 +303,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
def parquetFile(path: String): DataFrame =
DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
@scala.annotation.varargs
def parquetFile(path: String, paths: String*): DataFrame =

Contributor:
Why not just `path: String*`?

Contributor Author:
This is per @rxin's comment, which I think makes sense.

if (conf.parquetUseDataSourceApi) {
baseRelationToDataFrame(parquet.ParquetRelation2(path +: paths, Map.empty)(this))
} else {
DataFrame(this, parquet.ParquetRelation(
(path +: paths).mkString(","), Some(sparkContext.hadoopConfiguration), this))
}

/**
* Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
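A usage sketch of the varargs `parquetFile` overload added above; the paths are placeholders and `sqlContext` is assumed to be an existing `SQLContext`:

```scala
// Both directories are read into a single DataFrame; with
// spark.sql.parquet.useDataSourceApi enabled this goes through ParquetRelation2.
val events = sqlContext.parquetFile("/data/events/2015-01-31", "/data/events/2015-02-01")
events.printSchema()
```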
@@ -17,18 +17,17 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.parquet._
import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.{SQLContext, Strategy, execution}

private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SQLContext#SparkPlanner =>
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType


private[sql] class DefaultSource
extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider {
extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {

/** Returns a new base relation with the parameters. */
override def createRelation(
@@ -159,7 +159,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
val attributesSize = attributes.size
if (attributesSize > record.size) {
throw new IndexOutOfBoundsException(
s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
}

var index = 0
@@ -325,7 +325,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
val attributesSize = attributes.size
if (attributesSize > record.size) {
throw new IndexOutOfBoundsException(
s"Trying to write more fields than contained in row (${attributesSize}>${record.size})")
s"Trying to write more fields than contained in row ($attributesSize > ${record.size})")
}

var index = 0
@@ -348,10 +348,7 @@
index: Int): Unit = {
ctype match {
case StringType => writer.addBinary(
Binary.fromByteArray(
record(index).asInstanceOf[String].getBytes("utf-8")
)
)
Binary.fromByteArray(record(index).asInstanceOf[String].getBytes("utf-8")))
case BinaryType => writer.addBinary(
Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
case IntegerType => writer.addInteger(record.getInt(index))
@@ -19,24 +19,23 @@ package org.apache.spark.sql.parquet

import java.io.IOException

import scala.collection.mutable.ArrayBuffer
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job

import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import parquet.hadoop.util.ContextUtil
import parquet.schema.{Type => ParquetType, Types => ParquetTypes, PrimitiveType => ParquetPrimitiveType, MessageType}
import parquet.schema.{GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns, DecimalMetadata}
import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
import parquet.schema.Type.Repetition
import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types._
import org.apache.spark.{Logging, SparkException}

// Implicits
import scala.collection.JavaConversions._
@@ -285,7 +284,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
ctype: DataType,
name: String,
nullable: Boolean = true,
inArray: Boolean = false,
inArray: Boolean = false,
toThriftSchemaNames: Boolean = false): ParquetType = {
val repetition =
if (inArray) {
@@ -340,7 +339,7 @@
}
case StructType(structFields) => {
val fields = structFields.map {
field => fromDataType(field.dataType, field.name, field.nullable,
field => fromDataType(field.dataType, field.name, field.nullable,
inArray = false, toThriftSchemaNames)
}
new ParquetGroupType(repetition, name, fields.toSeq)