@@ -141,7 +141,7 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Map[String, String] = {
-    def computeNumFeatures(): Int = {
+    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
       val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
       val path = if (dataFiles.length == 1) {
         dataFiles.head.getPath.toUri.toString
@@ -156,10 +156,6 @@ class LibSVMFileFormat extends TextBasedFileFormat with DataSourceRegister {
       MLUtils.computeNumFeatures(parsed)
     }
 
-    val numFeatures = options.get("numFeatures").filter(_.toInt > 0).getOrElse {
-      computeNumFeatures()
-    }
-
     new CaseInsensitiveMap(options + ("numFeatures" -> numFeatures.toString))
   }
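For context on the LibSVM change above: the refactor folds the old computeNumFeatures() helper into the "numFeatures" option lookup, so the data scan only runs when the option is absent or non-positive. The following is a minimal, self-contained sketch of that resolution pattern, not the Spark code itself; the by-name inferFromData parameter is a hypothetical stand-in for the MLUtils.parseLibSVMFile / MLUtils.computeNumFeatures pair, and the value is simplified to Int (the real code keeps the raw option value and calls .toString at the end).

object NumFeaturesResolution {
  // Resolve the number of features: prefer an explicit positive "numFeatures"
  // option, otherwise fall back to inferring it from the data.
  def resolveNumFeatures(options: Map[String, String])(inferFromData: => Int): Int =
    options.get("numFeatures")
      .map(_.toInt)
      .filter(_ > 0)                // ignore zero or negative values
      .getOrElse(inferFromData)     // the expensive scan runs only on this branch

  def main(args: Array[String]): Unit = {
    // Explicit option wins; the fallback thunk is never evaluated.
    println(resolveNumFeatures(Map("numFeatures" -> "780"))(sys.error("unreachable")))
    // No option set: fall back to the (here trivial) inference.
    println(resolveNumFeatures(Map.empty)(42))
  }
}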
@@ -136,7 +136,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    this.sparkSchema = new CatalystSchemaConverter(configuration).convert(requestedSchema);
+    this.sparkSchema = new ParquetSchemaConverter(configuration).convert(requestedSchema);
     this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
@@ -196,7 +196,7 @@ protected void initialize(String path, List<String> columns) throws IOException
       }
       this.requestedSchema = builder.named("spark_schema");
     }
-    this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
+    this.sparkSchema = new ParquetSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
@@ -259,7 +259,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
           for (int i = rowId; i < rowId + num; ++i) {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
             Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
+            column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
           }
         } else {
           throw new NotImplementedException();
@@ -280,12 +280,12 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
         if (DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
             Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v));
+            column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
             Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
+            column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
           }
         } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
@@ -375,7 +375,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i,
             // Read 12 bytes for INT96
-            CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+            ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
         } else {
           column.putNull(rowId + i);
         }
@@ -394,7 +394,7 @@ private void readFixedLenByteArrayBatch(int rowId, int num,
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putInt(rowId + i,
-            (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+            (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
         } else {
           column.putNull(rowId + i);
         }
@@ -403,7 +403,7 @@ private void readFixedLenByteArrayBatch(int rowId, int num,
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i,
-            CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+            ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
         } else {
           column.putNull(rowId + i);
         }
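The "// Read 12 bytes for INT96" branch above depends on ParquetRowConverter.binaryToSQLTimestamp turning Parquet's INT96 timestamp layout (8 little-endian bytes of nanoseconds within the day, followed by 4 little-endian bytes of Julian day) into Spark SQL's microseconds-since-the-Unix-epoch long. Below is a self-contained sketch of that conversion as standalone Scala; it assumes the standard INT96 layout and is an illustration, not a quote of the Spark implementation.

import java.nio.{ByteBuffer, ByteOrder}

object Int96TimestampSketch {
  private val JulianDayOfEpoch = 2440588L                 // 1970-01-01 as a Julian day number
  private val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000  // 86,400,000,000

  /** Convert a 12-byte INT96 value to microseconds since the Unix epoch. */
  def binaryToSQLTimestampMicros(int96: Array[Byte]): Long = {
    require(int96.length == 12, "INT96 timestamps are exactly 12 bytes")
    val buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN)
    val nanosOfDay = buf.getLong        // first 8 bytes
    val julianDay = buf.getInt.toLong   // last 4 bytes
    (julianDay - JulianDayOfEpoch) * MicrosPerDay + nanosOfDay / 1000
  }

  def main(args: Array[String]): Unit = {
    // Midnight 1970-01-01 encoded as INT96: zero nanos, Julian day 2440588.
    val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(0L).putInt(2440588)
    println(binaryToSQLTimestampMicros(buf.array())) // prints 0
  }
}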
@@ -99,13 +99,13 @@ private[sql] class ParquetFileFormat
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
 
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -288,19 +288,19 @@ private[sql] class ParquetFileFormat
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(
-      CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
     hadoopConf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       requiredSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
@@ -369,10 +369,10 @@ private[sql] class ParquetFileFormat
       val reader = pushed match {
         case Some(filter) =>
           new ParquetRecordReader[InternalRow](
-            new CatalystReadSupport,
+            new ParquetReadSupport,
             FilterCompat.get(filter, null))
         case _ =>
-          new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+          new ParquetRecordReader[InternalRow](new ParquetReadSupport)
       }
       reader.initialize(split, hadoopAttemptContext)
       reader
@@ -433,14 +433,14 @@ private[sql] class ParquetOutputWriterFactory(
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
 
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(
       StructType.metadataKeyForOptionalField,
       dataSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -590,7 +590,7 @@ private[sql] object ParquetFileFormat extends Logging {
       assumeBinaryIsString: Boolean,
       assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
     val conf = job.getConfiguration
-    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
 
     // Try to push down filters when filter push-down is enabled.
     if (parquetFilterPushDown) {
@@ -603,14 +603,14 @@ private[sql] object ParquetFileFormat extends Logging {
         .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
     }
 
-    conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
+    conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
       val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
-      CatalystSchemaConverter.checkFieldNames(requestedSchema).json
+      ParquetSchemaConverter.checkFieldNames(requestedSchema).json
     })
 
     conf.set(
-      CatalystWriteSupport.SPARK_ROW_SCHEMA,
-      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+      ParquetWriteSupport.SPARK_ROW_SCHEMA,
+      ParquetSchemaConverter.checkFieldNames(dataSchema).json)
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
@@ -639,7 +639,7 @@ private[sql] object ParquetFileFormat extends Logging {
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
 
     def parseParquetSchema(schema: MessageType): StructType = {
-      val converter = new CatalystSchemaConverter(
+      val converter = new ParquetSchemaConverter(
         sparkSession.sessionState.conf.isParquetBinaryAsString,
         sparkSession.sessionState.conf.isParquetBinaryAsString,
         sparkSession.sessionState.conf.writeLegacyParquetFormat)
@@ -653,7 +653,7 @@ private[sql] object ParquetFileFormat extends Logging {
       val serializedSchema = metadata
         .getKeyValueMetaData
         .asScala.toMap
-        .get(CatalystReadSupport.SPARK_METADATA_KEY)
+        .get(ParquetReadSupport.SPARK_METADATA_KEY)
       if (serializedSchema.isEmpty) {
         // Falls back to Parquet schema if no Spark SQL schema found.
         Some(parseParquetSchema(metadata.getSchema))
@@ -820,7 +820,7 @@ private[sql] object ParquetFileFormat extends Logging {
 
     // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
     val converter =
-      new CatalystSchemaConverter(
+      new ParquetSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
         writeLegacyParquetFormat = writeLegacyParquetFormat)
@@ -864,12 +864,12 @@ private[sql] object ParquetFileFormat extends Logging {
    * a [[StructType]] converted from the [[MessageType]] stored in this footer.
    */
   def readSchemaFromFooter(
-      footer: Footer, converter: CatalystSchemaConverter): StructType = {
+      footer: Footer, converter: ParquetSchemaConverter): StructType = {
     val fileMetaData = footer.getParquetMetadata.getFileMetaData
     fileMetaData
       .getKeyValueMetaData
       .asScala.toMap
-      .get(CatalystReadSupport.SPARK_METADATA_KEY)
+      .get(ParquetReadSupport.SPARK_METADATA_KEY)
       .flatMap(deserializeSchemaString)
       .getOrElse(converter.convert(fileMetaData.getSchema))
   }
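The readSchemaFromFooter change above keeps the existing lookup order: prefer the Spark SQL schema JSON stored in the footer's key-value metadata under ParquetReadSupport.SPARK_METADATA_KEY ("org.apache.spark.sql.parquet.row.metadata"), and only fall back to converting the raw Parquet schema when that entry is missing or cannot be deserialized. Here is a simplified sketch of that fallback using plain Scala types; the String schema and Map-based metadata are stand-ins for StructType and Parquet's Footer/FileMetaData classes.

object FooterSchemaSketch {
  // The key Spark writes its Catalyst schema under in Parquet footers (visible in the diff above).
  val SparkMetadataKey = "org.apache.spark.sql.parquet.row.metadata"

  def readSchemaFromFooter(
      keyValueMetaData: Map[String, String],
      deserialize: String => Option[String],
      convertParquetSchema: () => String): String = {
    keyValueMetaData
      .get(SparkMetadataKey)              // serialized Spark SQL schema, if the writer stored one
      .flatMap(deserialize)               // tolerate corrupt or unparseable entries
      .getOrElse(convertParquetSchema())  // otherwise derive it from the Parquet schema itself
  }

  def main(args: Array[String]): Unit = {
    val footer = Map(SparkMetadataKey -> """{"type":"struct","fields":[]}""")
    println(readSchemaFromFooter(footer, s => Some(s), () => "converted-from-parquet"))
    println(readSchemaFromFooter(Map.empty, s => Some(s), () => "converted-from-parquet"))
  }
}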
@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   /**
@@ -58,13 +58,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
   override def init(context: InitContext): ReadContext = {
     catalystRequestedSchema = {
       val conf = context.getConfiguration
-      val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+      val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
     }
 
     val parquetRequestedSchema =
-      CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
 
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
@@ -92,13 +92,13 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
        """.stripMargin
     }
 
-    new CatalystRecordMaterializer(
+    new ParquetRecordMaterializer(
       parquetRequestedSchema,
-      CatalystReadSupport.expandUDT(catalystRequestedSchema))
+      ParquetReadSupport.expandUDT(catalystRequestedSchema))
   }
 }
 
-private[parquet] object CatalystReadSupport {
+private[parquet] object ParquetReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
 
   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
@@ -112,7 +112,7 @@ private[parquet] object CatalystReadSupport {
     Types
       .buildMessage()
       .addFields(clippedParquetFields: _*)
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+      .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
@@ -265,7 +265,7 @@ private[parquet] object CatalystReadSupport {
   private def clipParquetGroupFields(
       parquetRecord: GroupType, structType: StructType): Seq[Type] = {
     val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
-    val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
+    val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
     structType.map { f =>
       parquetFieldMap
         .get(f.name)
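The clipParquetGroupFields change above sits inside schema clipping: before a file is read, its Parquet schema is pruned to just the fields the Catalyst requested schema mentions, so nested column pruning works. A toy sketch of that idea on simplified recursive types follows; the case classes are illustrative, not the Parquet Type/GroupType API, and the sketch omits what the real code also does for requested fields missing from the file (it synthesizes Parquet types for them via the toParquet converter so they read back as nulls).

object SchemaClippingSketch {
  sealed trait PType { def name: String }
  final case class PPrimitive(name: String) extends PType
  final case class PGroup(name: String, fields: Seq[PType]) extends PType

  /** Keep only the Parquet fields the requested schema asks for, recursing into groups. */
  def clip(parquet: PGroup, requested: PGroup): PGroup = {
    val byName = parquet.fields.map(f => f.name -> f).toMap
    val clipped = requested.fields.flatMap {
      case g: PGroup     => byName.get(g.name).collect { case pg: PGroup => clip(pg, g) }
      case p: PPrimitive => byName.get(p.name)
    }
    PGroup(parquet.name, clipped)
  }

  def main(args: Array[String]): Unit = {
    val fileSchema = PGroup("spark_schema", Seq(
      PPrimitive("id"),
      PGroup("address", Seq(PPrimitive("city"), PPrimitive("zip"))),
      PPrimitive("unused")))
    val requested = PGroup("spark_schema", Seq(
      PPrimitive("id"),
      PGroup("address", Seq(PPrimitive("city")))))
    // Drops "unused" and "address.zip", keeping only what was requested.
    println(clip(fileSchema, requested))
  }
}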
@@ -29,11 +29,11 @@ import org.apache.spark.sql.types.StructType
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
  */
-private[parquet] class CatalystRecordMaterializer(
+private[parquet] class ParquetRecordMaterializer(
     parquetSchema: MessageType, catalystSchema: StructType)
   extends RecordMaterializer[InternalRow] {
 
-  private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
+  private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
 
   override def getCurrentRecord: InternalRow = rootConverter.currentRecord