@@ -141,7 +141,7 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Map[String, String] = {
def computeNumFeatures(): Int = {
val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
@rxin (Contributor, Author) commented on Jun 15, 2016:
cc @cloud-fan / @liancheng

I got rid of the nested function.
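
For reference, here is a minimal, self-contained sketch of the refactoring described in this comment, with a placeholder computation standing in for the real scan of the data files (illustrative only, not the actual LibSVM code):

```scala
object NestedFunctionRefactorSketch {
  def main(args: Array[String]): Unit = {
    val options = Map("numFeatures" -> "0") // hypothetical options map

    // Before: the default value came from a nested helper function.
    def computeNumFeatures(): Int = 42 // stands in for scanning the data files
    val before = options.get("numFeatures").map(_.toInt).filter(_ > 0)
      .getOrElse(computeNumFeatures())

    // After: the helper's body is inlined into the getOrElse default.
    val after = options.get("numFeatures").map(_.toInt).filter(_ > 0)
      .getOrElse {
        42 // the same computation, written inline
      }

    println(s"before = $before, after = $after")
  }
}
```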

val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
val path = if (dataFiles.length == 1) {
dataFiles.head.getPath.toUri.toString
@@ -156,10 +156,6 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
MLUtils.computeNumFeatures(parsed)
}

val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
computeNumFeatures()
}

new CaseInsensitiveMap(options + ("numFeatures" -> numFeatures.toString))
}

@@ -136,7 +136,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
this.requestedSchema = readContext.getRequestedSchema();
this.sparkSchema = new CatalystSchemaConverter(configuration).convert(requestedSchema);
this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
@@ -195,12 +195,12 @@ protected void initialize(String path, List<String> columns) throws IOException
}
builder.addFields(fileSchema.getType(s));
}
this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
this.requestedSchema = builder.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
} else {
this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
}
}
this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
@@ -259,7 +259,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
for (int i = rowId; i < rowId + num; ++i) {
// TODO: Convert dictionary of Binaries to dictionary of Longs
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
}
} else {
throw new NotImplementedException();
@@ -280,12 +280,12 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
if (DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v));
column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
}
} else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
@@ -375,7 +375,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
// Read 12 bytes for INT96
CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
} else {
column.putNull(rowId + i);
}
@@ -394,7 +394,7 @@ private void readFixedLenByteArrayBatch(int rowId, int num,
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putInt(rowId + i,
(int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
(int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
} else {
column.putNull(rowId + i);
}
@@ -403,7 +403,7 @@ private void readFixedLenByteArrayBatch(int rowId, int num,
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
} else {
column.putNull(rowId + i);
}
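
As a side note on the decimal branches above: binaryToUnscaledLong is used to turn a fixed-length binary value (Parquet stores small decimals as big-endian two's-complement bytes) into an unscaled long. A rough Scala sketch of that idea, offered as an assumption about the technique rather than Spark's actual implementation:

```scala
// Sketch only: interpret a big-endian two's-complement byte array as a Long.
def binaryToUnscaledLongSketch(bytes: Array[Byte]): Long = {
  require(bytes.nonEmpty && bytes.length <= 8, "this sketch handles at most 8 bytes")
  var unscaled = 0L
  bytes.foreach { b => unscaled = (unscaled << 8) | (b & 0xffL) }
  val bits = 8 * bytes.length
  (unscaled << (64 - bits)) >> (64 - bits) // sign-extend from the original width
}

// Example: binaryToUnscaledLongSketch(Array(0xff.toByte)) == -1L
```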
@@ -206,6 +206,20 @@ trait FileFormat {
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory

/**
* Returns an [[OutputWriterFactory]] for generating output writers that can write data.
* This method is currently used only by FileStreamSinkWriter to generate output writers that
* do not use output committers to write data. The OutputWriter generated by the returned
* [[OutputWriterFactory]] must implement the method `newWriter(path)`.
*/
def buildWriter(
sqlContext: SQLContext,
dataSchema: StructType,
options: Map[String, String]): OutputWriterFactory = {
// TODO: Remove this default implementation when the other formats have been ported
throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
}

/**
* Returns whether this format supports returning columnar batches or not.
*
@@ -293,19 +307,6 @@ trait FileFormat {
}
}

/**
* Returns an [[OutputWriterFactory]] for generating output writers that can write data.
* This method is currently used only by FileStreamSinkWriter to generate output writers that
* do not use output committers to write data. The OutputWriter generated by the returned
* [[OutputWriterFactory]] must implement the method `newWriter(path)`.
*/
def buildWriter(
sqlContext: SQLContext,
dataSchema: StructType,
options: Map[String, String]): OutputWriterFactory = {
// TODO: Remove this default implementation when the other formats have been ported
throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
}
}

/**
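
For context, the buildWriter hook that this diff moves keeps a throwing default until each format is ported. A minimal sketch of that pattern using simplified stand-in traits (illustrative only; Spark's real OutputWriter and OutputWriterFactory interfaces differ):

```scala
trait SketchOutputWriter {
  def write(row: String): Unit
  def close(): Unit
}

trait SketchOutputWriterFactory {
  // Mirrors the `newWriter(path)` contract mentioned in the Scaladoc above.
  def newWriter(path: String): SketchOutputWriter
}

trait SketchFileFormat {
  // Default: unsupported until a concrete format overrides the hook.
  def buildWriter(options: Map[String, String]): SketchOutputWriterFactory =
    throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
}

// A hypothetical format that opts in by overriding buildWriter.
class SketchTextFormat extends SketchFileFormat {
  override def buildWriter(options: Map[String, String]): SketchOutputWriterFactory =
    new SketchOutputWriterFactory {
      def newWriter(path: String): SketchOutputWriter = new SketchOutputWriter {
        def write(row: String): Unit = println(s"writing '$row' to $path")
        def close(): Unit = ()
      }
    }
}
```

The throwing default lets unported formats keep compiling and only fail if the streaming sink path actually calls buildWriter.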
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.minBytesForPrecision
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -82,8 +82,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
}
this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)

val messageType = new CatalystSchemaConverter(configuration).convert(schema)
val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
val messageType = new ParquetSchemaConverter(configuration).convert(schema)
val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava

logInfo(
s"""Initialized Parquet WriteSupport with Catalyst schema:
@@ -427,7 +427,7 @@ private[parquet] object CatalystWriteSupport {
val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"

def setSchema(schema: StructType, configuration: Configuration): Unit = {
schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName)
configuration.set(SPARK_ROW_SCHEMA, schema.json)
configuration.setIfUnset(
ParquetOutputFormat.WRITER_VERSION,
@@ -288,13 +288,13 @@ private[sql] class ParquetFileFormat
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
hadoopConf.set(
CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
hadoopConf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
ParquetSchemaConverter.checkFieldNames(requiredSchema).json)

// We want to clear this temporary metadata so that it is not saved into the Parquet file.
// This metadata is only useful for detecting optional columns when pushing down filters.
@@ -369,10 +369,10 @@ private[sql] class ParquetFileFormat
val reader = pushed match {
case Some(filter) =>
new ParquetRecordReader[InternalRow](
new CatalystReadSupport,
new ParquetReadSupport,
FilterCompat.get(filter, null))
case _ =>
new ParquetRecordReader[InternalRow](new CatalystReadSupport)
new ParquetRecordReader[InternalRow](new ParquetReadSupport)
}
reader.initialize(split, hadoopAttemptContext)
reader
@@ -590,7 +590,7 @@ private[sql] object ParquetFileFormat extends Logging {
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
val conf = job.getConfiguration
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)

// Try to push down filters when filter push-down is enabled.
if (parquetFilterPushDown) {
@@ -603,14 +603,14 @@ private[sql] object ParquetFileFormat extends Logging {
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}

conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
CatalystSchemaConverter.checkFieldNames(requestedSchema).json
ParquetSchemaConverter.checkFieldNames(requestedSchema).json
})

conf.set(
CatalystWriteSupport.SPARK_ROW_SCHEMA,
CatalystSchemaConverter.checkFieldNames(dataSchema).json)
ParquetSchemaConverter.checkFieldNames(dataSchema).json)

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
@@ -639,7 +639,7 @@ private[sql] object ParquetFileFormat extends Logging {
footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {

def parseParquetSchema(schema: MessageType): StructType = {
val converter = new CatalystSchemaConverter(
val converter = new ParquetSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
sparkSession.sessionState.conf.writeLegacyParquetFormat)
@@ -653,7 +653,7 @@ private[sql] object ParquetFileFormat extends Logging {
val serializedSchema = metadata
.getKeyValueMetaData
.asScala.toMap
.get(CatalystReadSupport.SPARK_METADATA_KEY)
.get(ParquetReadSupport.SPARK_METADATA_KEY)
if (serializedSchema.isEmpty) {
// Falls back to Parquet schema if no Spark SQL schema found.
Some(parseParquetSchema(metadata.getSchema))
@@ -820,7 +820,7 @@ private[sql] object ParquetFileFormat extends Logging {

// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter =
new CatalystSchemaConverter(
new ParquetSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -864,12 +864,12 @@ private[sql] object ParquetFileFormat extends Logging {
* a [[StructType]] converted from the [[MessageType]] stored in this footer.
*/
def readSchemaFromFooter(
footer: Footer, converter: CatalystSchemaConverter): StructType = {
footer: Footer, converter: ParquetSchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
.asScala.toMap
.get(CatalystReadSupport.SPARK_METADATA_KEY)
.get(ParquetReadSupport.SPARK_METADATA_KEY)
.flatMap(deserializeSchemaString)
.getOrElse(converter.convert(fileMetaData.getSchema))
}
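
The readSchemaFromFooter change above preserves the same lookup order: prefer the Spark schema serialized under SPARK_METADATA_KEY in the footer, and fall back to converting the Parquet MessageType only when it is absent. A small sketch of that fallback shape, with hypothetical helper parameters rather than the real Spark/parquet-mr types:

```scala
// Sketch only: stand-ins for the footer metadata and the two schema sources.
def schemaFromFooterSketch(
    footerMetadata: Map[String, String],
    deserializeSchemaString: String => Option[String], // parses a serialized Spark schema
    convertParquetSchema: () => String                 // converts the Parquet MessageType
  ): String = {
  footerMetadata
    .get("org.apache.spark.sql.parquet.row.metadata") // ParquetReadSupport.SPARK_METADATA_KEY
    .flatMap(deserializeSchemaString)
    .getOrElse(convertParquetSchema())
}
```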
@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
* Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
* to [[prepareForRead()]], but use a private `var` for simplicity.
*/
private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
private var catalystRequestedSchema: StructType = _

/**
Expand All @@ -58,13 +58,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
override def init(context: InitContext): ReadContext = {
catalystRequestedSchema = {
val conf = context.getConfiguration
val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
assert(schemaString != null, "Parquet requested schema not set.")
StructType.fromString(schemaString)
}

val parquetRequestedSchema =
CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)

new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
@@ -92,13 +92,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
""".stripMargin
}

new CatalystRecordMaterializer(
new ParquetRecordMaterializer(
parquetRequestedSchema,
CatalystReadSupport.expandUDT(catalystRequestedSchema))
ParquetReadSupport.expandUDT(catalystRequestedSchema))
}
}

private[parquet] object CatalystReadSupport {
private[parquet] object ParquetReadSupport {
val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"

val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
@@ -110,12 +110,12 @@ private[parquet] object CatalystReadSupport {
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
if (clippedParquetFields.isEmpty) {
CatalystSchemaConverter.EMPTY_MESSAGE
ParquetSchemaConverter.EMPTY_MESSAGE
} else {
Types
.buildMessage()
.addFields(clippedParquetFields: _*)
.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}
}

@@ -269,7 +269,7 @@ private[parquet] object CatalystReadSupport {
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)
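
The class-level comment above notes that ParquetReadSupport carries the requested schema from init() to prepareForRead() in a private var instead of round-tripping it through the ReadContext. A tiny sketch of that pattern with stand-in types (not the real parquet-mr ReadSupport API):

```scala
final case class SketchReadContext(requestedSchema: String)

class SketchReadSupport {
  // State remembered between the two callbacks, as described in the comment above.
  private var requestedSchema: String = _

  def init(requested: String): SketchReadContext = {
    requestedSchema = requested
    SketchReadContext(requestedSchema)
  }

  def prepareForRead(): String = {
    assert(requestedSchema != null, "init() must be called before prepareForRead()")
    s"record materializer for schema: $requestedSchema"
  }
}
```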
@@ -29,11 +29,11 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
*/
private[parquet] class CatalystRecordMaterializer(
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType)
extends RecordMaterializer[InternalRow] {

private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRecord
