diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71a49c2657c9..157be1b0f594 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1488,7 +1488,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("csv,orc")
+    .createWithDefault("csv,orc,text")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index b68618755258..be9cb8153f25 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -4,7 +4,7 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-org.apache.spark.sql.execution.datasources.text.TextFileFormat
+org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 60756e7b1da9..d8811c708a6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import java.io.OutputStream
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -143,30 +141,3 @@
     dataType == StringType
 }
 
-class TextOutputWriter(
-    path: String,
-    dataSchema: StructType,
-    lineSeparator: Array[Byte],
-    context: TaskAttemptContext)
-  extends OutputWriter {
-
-  private var outputStream: Option[OutputStream] = None
-
-  override def write(row: InternalRow): Unit = {
-    val os = outputStream.getOrElse {
-      val newStream = CodecStreams.createOutputStream(context, new Path(path))
-      outputStream = Some(newStream)
-      newStream
-    }
-
-    if (!row.isNullAt(0)) {
-      val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(os)
-    }
-    os.write(lineSeparator)
-  }
-
-  override def close(): Unit = {
-    outputStream.foreach(_.close())
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index e4e201995faa..ef132162750b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
 /**
  * Options for the Text data source.
  */
-private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
+class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
   extends Serializable {
 
   import TextOptions._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala
new file mode 100644
index 000000000000..faf6e573105f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.text
+
+import java.io.OutputStream
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+class TextOutputWriter(
+    path: String,
+    dataSchema: StructType,
+    lineSeparator: Array[Byte],
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private var outputStream: Option[OutputStream] = None
+
+  override def write(row: InternalRow): Unit = {
+    val os = outputStream.getOrElse {
+      val newStream = CodecStreams.createOutputStream(context, new Path(path))
+      outputStream = Some(newStream)
+      newStream
+    }
+
+    if (!row.isNullAt(0)) {
+      val utf8string = row.getUTF8String(0)
+      utf8string.writeTo(os)
+    }
+    os.write(lineSeparator)
+  }
+
+  override def close(): Unit = {
+    outputStream.foreach(_.close())
+  }
+}
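
TextOutputWriter moves into its own file unchanged: it still opens the output stream lazily on the first write(), so a task that never writes a row never opens a stream. A minimal usage sketch of the existing lineSep option that feeds the lineSeparator argument (the path below is illustrative, not part of this patch):

    // Hypothetical usage; "lineSep" is parsed by TextOptions into
    // lineSeparatorInWrite and handed to each TextOutputWriter instance.
    val df = spark.range(3).selectExpr("cast(id as string) as value")
    df.write.option("lineSep", "\r\n").mode("overwrite").text("/tmp/text-out")
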
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
new file mode 100644
index 000000000000..f6aa1e9c898b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TextDataSourceV2 extends FileDataSourceV2 {
+
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[TextFileFormat]
+
+  override def shortName(): String = "text"
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+  }
+}
+
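
With the DataSourceRegister entry switched to TextDataSourceV2, the short name "text" now resolves to this V2 implementation, while fallbackFileFormat keeps TextFileFormat available for the code paths that still go through V1. A hedged usage sketch (paths are illustrative):

    // Both forms resolve "text" via the META-INF services registration above.
    val a = spark.read.format("text").load("/tmp/input")
    val b = spark.read.text("/tmp/input")
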
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
new file mode 100644
index 000000000000..878888711188
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
+import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, HadoopFileWholeTextReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.text.TextOptions
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create Text readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
+ * @param readDataSchema Required schema in the batch scan.
+ * @param partitionSchema Schema of partitions.
+ * @param textOptions Options for reading a text file.
+ * */
+case class TextPartitionReaderFactory(
+    sqlConf: SQLConf,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    readDataSchema: StructType,
+    partitionSchema: StructType,
+    textOptions: TextOptions) extends FilePartitionReaderFactory {
+
+  override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
+    val confValue = broadcastedConf.value.value
+    val reader = if (!textOptions.wholeText) {
+      new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue)
+    } else {
+      new HadoopFileWholeTextReader(file, confValue)
+    }
+    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => reader.close()))
+    val iter = if (readDataSchema.isEmpty) {
+      val emptyUnsafeRow = new UnsafeRow(0)
+      reader.map(_ => emptyUnsafeRow)
+    } else {
+      val unsafeRowWriter = new UnsafeRowWriter(1)
+
+      reader.map { line =>
+        // Writes to an UnsafeRow directly
+        unsafeRowWriter.reset()
+        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+        unsafeRowWriter.getRow()
+      }
+    }
+    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
+    new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+      partitionSchema, file.partitionValues)
+  }
+}
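
The reader factory picks HadoopFileLinesReader or HadoopFileWholeTextReader based on TextOptions.wholeText and copies each line into a single-column UnsafeRow. A sketch of the existing option that drives the choice (paths are illustrative):

    // One row per line (default) versus one row per file.
    val lines = spark.read.text("/tmp/input")
    val files = spark.read.option("wholetext", "true").text("/tmp/input")
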
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
new file mode 100644
index 000000000000..202723db2742
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.text.TextOptions
+import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+case class TextScan(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+
+  private val optionsAsScala = options.asScala.toMap
+  private lazy val textOptions: TextOptions = new TextOptions(optionsAsScala)
+
+  override def isSplitable(path: Path): Boolean = {
+    super.isSplitable(path) && !textOptions.wholeText
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    assert(
+      readDataSchema.length <= 1,
+      "Text data source only produces a single data column named \"value\".")
+    val hadoopConf = {
+      val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+      // Hadoop Configurations are case sensitive.
+      sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    }
+    val broadcastedConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf))
+    TextPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, readDataSchema,
+      readPartitionSchema, textOptions)
+  }
+}
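
TextScan refuses to split files when wholeText is set and builds the broadcast Hadoop Configuration from the case-sensitive option map, so per-read options travel with the scan. Sketch only; the option key below is a placeholder, not something this patch defines:

    // Read options are merged into the broadcast Hadoop conf via newHadoopConfWithOptions;
    // "some.hadoop.property" is a hypothetical key used purely for illustration.
    val df = spark.read.option("some.hadoop.property", "value").text("/tmp/input")
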
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala
new file mode 100644
index 000000000000..fbe5e1688b83
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class TextScanBuilder(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    schema: StructType,
+    dataSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+
+  override def build(): Scan = {
+    TextScan(sparkSession, fileIndex, readDataSchema(), readPartitionSchema(), options)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
new file mode 100644
index 000000000000..b8cb61a6c646
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class TextTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat])
+  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder =
+    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+  override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
+    Some(StructType(Seq(StructField("value", StringType))))
+
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+    new TextWriteBuilder(options, paths, formatName, supportsDataType)
+
+  override def supportsDataType(dataType: DataType): Boolean = dataType == StringType
+
+  override def formatName: String = "Text"
+}
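
TextTable always infers the single column value: string and limits supported types to StringType via supportsDataType. Expected behaviour, sketched (error message paraphrased rather than quoted):

    spark.read.text("/tmp/input").printSchema()
    // root
    //  |-- value: string (nullable = true)

    // Writing a non-string column should fail the write-side schema checks.
    // spark.range(5).write.text("/tmp/out")  // AnalysisException: unsupported data type
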
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala
new file mode 100644
index 000000000000..c00dbc20be64
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.text.{TextOptions, TextOutputWriter}
+import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TextWriteBuilder(
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean)
+  extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+  private def verifySchema(schema: StructType): Unit = {
+    if (schema.size != 1) {
+      throw new AnalysisException(
+        s"Text data source supports only a single column, and you have ${schema.size} columns.")
+    }
+  }
+
+  override def prepareWrite(
+      sqlConf: SQLConf,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    verifySchema(dataSchema)
+
+    val textOptions = new TextOptions(options)
+    val conf = job.getConfiguration
+
+    textOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".txt" + CodecStreams.getCompressionExtension(context)
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 3402ed240f8b..c9ff4fc0777e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -930,7 +930,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         }
         assert(e.getMessage.contains(
           "The format of the existing table default.appendTextToJson is `JsonFileFormat`. " +
-          "It doesn't match the specified format `TextFileFormat`"))
+          "It doesn't match the specified format"))
       }
     }
 
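
Net effect of the patch: reads of the text source go through TextDataSourceV2, while the updated SQLConf default adds "text" to the sources whose V2 write path stays disabled, so writes keep falling back to the V1 TextFileFormat. A hedged end-to-end sketch:

    // Round trip under this patch's defaults: V2 read path, V1 write fallback.
    import org.apache.spark.sql.Encoders
    val input = spark.createDataset(Seq("a", "b", "c"))(Encoders.STRING)
    input.write.mode("overwrite").text("/tmp/roundtrip")
    assert(spark.read.text("/tmp/roundtrip").count() == 3)
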