2 changes: 1 addition & 1 deletion examples/src/main/python/parquet_inputformat.py
@@ -51,7 +51,7 @@

parquet_rdd = sc.newAPIHadoopFile(
path,
'parquet.avro.AvroParquetInputFormat',
'org.apache.parquet.avro.AvroParquetInputFormat',
'java.lang.Void',
'org.apache.avro.generic.IndexedRecord',
valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter')
6 changes: 3 additions & 3 deletions pom.xml
@@ -137,7 +137,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>0.13.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
<parquet.version>1.6.0rc3</parquet.version>
<parquet.version>1.7.0</parquet.version>
<jblas.version>1.2.4</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
<orbit.version>3.0.0.v201112011016</orbit.version>
@@ -1069,13 +1069,13 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.deps.scope}</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.deps.scope}</scope>
4 changes: 2 additions & 2 deletions sql/core/pom.xml
@@ -61,11 +61,11 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</dependency>
<dependency>
@@ -21,9 +21,9 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.Log
import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
import org.apache.parquet.Log
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}

private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
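As a usage sketch (assumption: the committer is still selected through the `spark.sql.parquet.output.committer.class` key that the test changes below reset, and the class lives in `org.apache.spark.sql.parquet` like the rest of these files), switching between this committer and the renamed default would look like:

```scala
import org.apache.hadoop.conf.Configuration

// Illustrative only; in practice this is the Hadoop Configuration Spark hands
// to the Parquet output format (e.g. sc.hadoopConfiguration).
val configuration = new Configuration()
configuration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Restoring the stock committer under its relocated name:
configuration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.parquet.hadoop.ParquetOutputCommitter")
```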
@@ -23,9 +23,9 @@ import java.util.{TimeZone, Calendar}
import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}

import jodd.datetime.JDateTime
import parquet.column.Dictionary
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import parquet.schema.MessageType
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
@@ -21,11 +21,11 @@ import java.nio.ByteBuffer

import com.google.common.io.BaseEncoding
import org.apache.hadoop.conf.Configuration
import parquet.filter2.compat.FilterCompat
import parquet.filter2.compat.FilterCompat._
import parquet.filter2.predicate.FilterApi._
import parquet.filter2.predicate.{FilterApi, FilterPredicate}
import parquet.io.api.Binary
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.compat.FilterCompat._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.io.api.Binary

import org.apache.spark.SparkEnv
import org.apache.spark.sql.catalyst.expressions._
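For orientation, a minimal sketch of the relocated filter2 API these imports pull in — roughly the shape of predicate produced when an equality filter on an int column is pushed down (illustrative, not copied from this file):

```scala
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

// Predicate for `a = 10` against an int32 column named "a".
val predicate: FilterPredicate =
  FilterApi.eq(FilterApi.intColumn("a"), java.lang.Integer.valueOf(10))

// Wrapped in the compat layer that ParquetInputFormat consumes.
val filter: FilterCompat.Filter = FilterCompat.get(predicate)
```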
@@ -24,9 +24,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.FsAction
import org.apache.spark.sql.types.{StructType, DataType}
import parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import parquet.hadoop.metadata.CompressionCodecName
import parquet.schema.MessageType
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
@@ -107,7 +107,7 @@ private[sql] object ParquetRelation {
//
// Therefore we need to force the class to be loaded.
// This should really be resolved by Parquet.
Class.forName(classOf[parquet.Log].getName)
Class.forName(classOf[org.apache.parquet.Log].getName)

// Note: Logger.getLogger("parquet") has a default logger
// that appends to Console which needs to be cleared.
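A minimal sketch of what the forced class load enables, assuming `org.apache.parquet.Log` still installs a `java.util.logging` console handler on its package-level logger (as the comment above describes for the old `parquet` logger):

```scala
import java.util.logging.Logger

// Force Parquet's Log class to initialize so its static handler is installed...
Class.forName(classOf[org.apache.parquet.Log].getName)

// ...then drop the default console handler so Parquet stops writing to the console.
val parquetLogger = Logger.getLogger(classOf[org.apache.parquet.Log].getPackage.getName)
parquetLogger.getHandlers.foreach(parquetLogger.removeHandler)
parquetLogger.setUseParentHandlers(false)
```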
@@ -127,7 +127,7 @@ private[sql] object ParquetRelation {
type RowType = org.apache.spark.sql.catalyst.expressions.GenericMutableRow

// The compression type
type CompressionType = parquet.hadoop.metadata.CompressionCodecName
type CompressionType = org.apache.parquet.hadoop.metadata.CompressionCodecName

// The parquet compression short names
val shortParquetCompressionCodecNames = Map(
@@ -33,13 +33,13 @@ import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat => NewFileOutputFormat}
import parquet.hadoop._
import parquet.hadoop.api.ReadSupport.ReadContext
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
import parquet.hadoop.util.ContextUtil
import parquet.io.ParquetDecodingException
import parquet.schema.MessageType
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.hadoop.metadata.GlobalMetaData
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.io.ParquetDecodingException
import org.apache.parquet.schema.MessageType

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -78,7 +78,7 @@ private[sql] case class ParquetTableScan(
}.toArray

protected override def doExecute(): RDD[Row] = {
import parquet.filter2.compat.FilterCompat.FilterPredicateCompat
import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat

val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
@@ -136,7 +136,7 @@ private[sql] case class ParquetTableScan(
baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
val partValue = "([^=]+)=([^=]+)".r
val partValues =
split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
split.asInstanceOf[org.apache.parquet.hadoop.ParquetInputSplit]
.getPath
.toString
.split("/")
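To make the regex above concrete, a hypothetical split path and the partition values it yields (sketch only; the real code operates on the `ParquetInputSplit` shown here):

```scala
// "([^=]+)=([^=]+)" matches one `key=value` path segment at a time.
val partValue = "([^=]+)=([^=]+)".r
val path = "hdfs://nn/warehouse/table/year=2015/month=06/part-r-00001.parquet"
val partValues = path
  .split("/")
  .collect { case partValue(key, value) => (key, value) }
  .toMap
// partValues == Map("year" -> "2015", "month" -> "06")
```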
@@ -378,7 +378,7 @@ private[sql] case class InsertIntoParquetTable(
* to imported ones.
*/
private[parquet] class AppendingParquetOutputFormat(offset: Int)
extends parquet.hadoop.ParquetOutputFormat[Row] {
extends org.apache.parquet.hadoop.ParquetOutputFormat[Row] {
// override to accept existing directories as valid output directory
override def checkOutputSpecs(job: JobContext): Unit = {}
var committer: OutputCommitter = null
@@ -431,15 +431,15 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
* RecordFilter we want to use.
*/
private[parquet] class FilteringParquetRowInputFormat
extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
extends org.apache.parquet.hadoop.ParquetInputFormat[Row] with Logging {

private var fileStatuses = Map.empty[Path, FileStatus]

override def createRecordReader(
inputSplit: InputSplit,
taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = {

import parquet.filter2.compat.FilterCompat.NoOpFilter
import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter

val readSupport: ReadSupport[Row] = new RowReadSupport()

@@ -501,7 +501,7 @@ private[parquet] class FilteringParquetRowInputFormat
globalMetaData = new GlobalMetaData(globalMetaData.getSchema,
mergedMetadata, globalMetaData.getCreatedBy)

val readContext = getReadSupport(configuration).init(
val readContext = ParquetInputFormat.getReadSupportInstance(configuration).init(
new InitContext(configuration,
globalMetaData.getKeyValueMetaData,
globalMetaData.getSchema))
@@ -531,8 +531,8 @@ private[parquet] class FilteringParquetRowInputFormat
minSplitSize: JLong,
readContext: ReadContext): JList[ParquetInputSplit] = {

import parquet.filter2.compat.FilterCompat.Filter
import parquet.filter2.compat.RowGroupFilter
import org.apache.parquet.filter2.compat.FilterCompat.Filter
import org.apache.parquet.filter2.compat.RowGroupFilter

import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache

@@ -547,7 +547,7 @@ private[parquet] class FilteringParquetRowInputFormat
// https://github.com/apache/incubator-parquet-mr/pull/17
// is resolved
val generateSplits =
Class.forName("parquet.hadoop.ClientSideMetadataSplitStrategy")
Class.forName("org.apache.parquet.hadoop.ClientSideMetadataSplitStrategy")
.getDeclaredMethods.find(_.getName == "generateSplits").getOrElse(
sys.error(s"Failed to reflectively invoke ClientSideMetadataSplitStrategy.generateSplits"))
generateSplits.setAccessible(true)
@@ -612,7 +612,7 @@ private[parquet] class FilteringParquetRowInputFormat
// https://github.com/apache/incubator-parquet-mr/pull/17
// is resolved
val generateSplits =
Class.forName("parquet.hadoop.TaskSideMetadataSplitStrategy")
Class.forName("org.apache.parquet.hadoop.TaskSideMetadataSplitStrategy")
.getDeclaredMethods.find(_.getName == "generateTaskSideMDSplits").getOrElse(
sys.error(
s"Failed to reflectively invoke TaskSideMetadataSplitStrategy.generateTaskSideMDSplits"))
@@ -20,12 +20,12 @@ package org.apache.spark.sql.parquet
import java.util.{HashMap => JHashMap}

import org.apache.hadoop.conf.Configuration
import parquet.column.ParquetProperties
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.api.ReadSupport.ReadContext
import parquet.hadoop.api.{ReadSupport, WriteSupport}
import parquet.io.api._
import parquet.schema.MessageType
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.hadoop.api.{ReadSupport, WriteSupport}
import org.apache.parquet.io.api._
import org.apache.parquet.schema.MessageType

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
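For context, a small sketch of the relocated output-format knobs these imports refer to (assumption: the static helpers on `ParquetOutputFormat` are unchanged apart from the package move):

```scala
import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Writer settings travel through the job configuration.
val job = Job.getInstance()
ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY)
ParquetOutputFormat.setEnableDictionary(job, true)
```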
@@ -25,13 +25,13 @@ import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
import parquet.schema.Type.Repetition
import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
import org.apache.parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types._
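A short, illustrative sketch of the footer-reading path these imports serve (assumption: the `readFooter` overload taking a metadata filter is unchanged apart from the package move):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType

// Read a single file's footer and pull the Parquet schema out of it.
val conf = new Configuration()
val footer = ParquetFileReader.readFooter(
  conf, new Path("/tmp/example.parquet"), ParquetMetadataConverter.NO_FILTER)
val schema: MessageType = footer.getFileMetaData.getSchema
```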
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.filter2.predicate.FilterApi
import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
import org.apache.spark.broadcast.Broadcast
@@ -19,8 +19,8 @@ package org.apache.spark.sql.parquet.timestamp

import java.nio.{ByteBuffer, ByteOrder}

import parquet.Preconditions
import parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.Preconditions
import org.apache.parquet.io.api.{Binary, RecordConsumer}

private[parquet] class NanoTime extends Serializable {
private var julianDay = 0
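For reference, a sketch of the 12-byte INT96 layout this class models (assumption: nanoseconds-of-day in the first eight bytes, Julian day in the last four, little-endian, as suggested by the ByteBuffer/ByteOrder imports above):

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.parquet.io.api.Binary

// Hypothetical helper, not the class itself: pack a timestamp into the
// 12-byte binary that NanoTime hands to RecordConsumer.addBinary.
def toInt96(julianDay: Int, timeOfDayNanos: Long): Binary = {
  val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
  buf.putLong(timeOfDayNanos)
  buf.putInt(julianDay)
  Binary.fromByteArray(buf.array())
}
```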
@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
10 changes: 5 additions & 5 deletions sql/core/src/test/resources/log4j.properties
@@ -36,11 +36,11 @@ log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
log4j.appender.FA.Threshold = INFO

# Some packages are noisy for no good reason.
log4j.additivity.parquet.hadoop.ParquetRecordReader=false
log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF

log4j.additivity.parquet.hadoop.ParquetOutputCommitter=false
log4j.logger.parquet.hadoop.ParquetOutputCommitter=OFF
log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF

log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
@@ -52,5 +52,5 @@ log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF

# Parquet related logging
log4j.logger.parquet.hadoop=WARN
log4j.logger.org.apache.parquet.hadoop=WARN
log4j.logger.org.apache.spark.sql.parquet=INFO
@@ -18,8 +18,8 @@
package org.apache.spark.sql.parquet

import org.scalatest.BeforeAndAfterAll
import parquet.filter2.predicate.Operators._
import parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.Operators._
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
@@ -24,14 +24,14 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.scalatest.BeforeAndAfterAll
import parquet.example.data.simple.SimpleGroup
import parquet.example.data.{Group, GroupWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
import parquet.io.api.RecordConsumer
import parquet.schema.{MessageType, MessageTypeParser}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
@@ -400,7 +400,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
finally {
configuration.set("spark.sql.parquet.output.committer.class",
"parquet.hadoop.ParquetOutputCommitter")
"org.apache.parquet.hadoop.ParquetOutputCommitter")
}
}
}
@@ -20,7 +20,7 @@ package org.apache.spark.sql.parquet
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import parquet.schema.MessageTypeParser
import org.apache.parquet.schema.MessageTypeParser

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.ScalaReflection
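As a pointer for reviewers, the relocated parser this suite exercises — a minimal, hypothetical round-trip of a schema string (not taken from the suite itself):

```scala
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

// Parse a textual Parquet schema under the new org.apache.parquet package.
val schema: MessageType = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  required int32 id;
    |  optional binary name (UTF8);
    |}""".stripMargin)

assert(schema.getFieldCount == 2)
```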