diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
index ad9c838992fa..4a7462096db1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
@@ -39,11 +39,7 @@
  *    pruning), etc. Names of these interfaces start with `SupportsPushDown`.
  * 2. Information Reporting. E.g., statistics reporting, ordering reporting, etc.
  *    Names of these interfaces start with `SupportsReporting`.
- * 3. Special scans. E.g, columnar scan, unsafe row scan, etc.
- *    Names of these interfaces start with `SupportsScan`. Note that a reader should only
- *    implement at most one of the special scans, if more than one special scans are implemented,
- *    only one of them would be respected, according to the priority list from high to low:
- *    {@link SupportsScanColumnarBatch}, {@link SupportsDeprecatedScanRow}.
+ * 3. Columnar scan if implements {@link SupportsScanColumnarBatch}.
  *
  * If an exception was throw when applying any of these query optimizations, the action will fail
  * and no Spark job will be submitted.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
index 7cf382e52f67..f3ff7f5cc0f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java
@@ -28,8 +28,7 @@
  *
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.catalyst.InternalRow}
  * for normal data source readers, {@link org.apache.spark.sql.vectorized.ColumnarBatch} for data
- * source readers that mix in {@link SupportsScanColumnarBatch}, or {@link org.apache.spark.sql.Row}
- * for data source readers that mix in {@link SupportsDeprecatedScanRow}.
+ * source readers that mix in {@link SupportsScanColumnarBatch}.
*/ @InterfaceStability.Evolving public interface InputPartitionReader extends Closeable { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index b030b9a929b0..c8494f97f176 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -20,9 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.SinglePartition @@ -31,7 +29,6 @@ import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader -import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -75,13 +72,8 @@ case class DataSourceV2ScanExec( case _ => super.outputPartitioning } - private lazy val partitions: Seq[InputPartition[InternalRow]] = reader match { - case r: SupportsDeprecatedScanRow => - r.planRowInputPartitions().asScala.map { - new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[InternalRow] - } - case _ => - reader.planInputPartitions().asScala + private lazy val partitions: Seq[InputPartition[InternalRow]] = { + reader.planInputPartitions().asScala } private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match { @@ -131,27 +123,3 @@ case class DataSourceV2ScanExec( } } } - -class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType) - extends InputPartition[InternalRow] { - - override def preferredLocations: Array[String] = partition.preferredLocations - - override def createPartitionReader: InputPartitionReader[InternalRow] = { - new RowToUnsafeInputPartitionReader( - partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind()) - } -} - -class RowToUnsafeInputPartitionReader( - val rowReader: InputPartitionReader[Row], - encoder: ExpressionEncoder[Row]) - - extends InputPartitionReader[InternalRow] { - - override def next: Boolean = rowReader.next - - override def get: UnsafeRow = encoder.toRow(rowReader.get).asInstanceOf[UnsafeRow] - - override def close(): Unit = rowReader.close() -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala index 1ffa1d02f143..554a0b0573f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.streaming.continuous import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow -import 
org.apache.spark.sql.execution.datasources.v2.RowToUnsafeInputPartitionReader import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader import org.apache.spark.util.NextIterator @@ -104,8 +102,6 @@ object ContinuousDataSourceRDD { reader: InputPartitionReader[InternalRow]): ContinuousInputPartitionReader[_] = { reader match { case r: ContinuousInputPartitionReader[InternalRow] => r - case wrapped: RowToUnsafeInputPartitionReader => - wrapped.rowReader.asInstanceOf[ContinuousInputPartitionReader[Row]] case _ => throw new IllegalStateException(s"Unknown continuous reader type ${reader.getClass}") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala index 55ce3ae38ee3..551e07c3db86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.json4s.DefaultFormats import org.json4s.jackson.Serialization -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair} import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider @@ -34,8 +34,7 @@ import org.apache.spark.sql.types.StructType case class RateStreamPartitionOffset( partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset -class RateStreamContinuousReader(options: DataSourceOptions) - extends ContinuousReader with SupportsDeprecatedScanRow { +class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousReader { implicit val defaultFormats: DefaultFormats = DefaultFormats val creationTime = System.currentTimeMillis() @@ -67,7 +66,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) override def getStartOffset(): Offset = offset - override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = { + override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = { val partitionStartMap = offset match { case off: RateStreamOffset => off.partitionToValueAndRunTimeMs case off => @@ -91,7 +90,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) i, numPartitions, perPartitionRate) - .asInstanceOf[InputPartition[Row]] + .asInstanceOf[InputPartition[InternalRow]] }.asJava } @@ -119,9 +118,10 @@ case class RateStreamContinuousInputPartition( partitionIndex: Int, increment: Long, rowsPerSecond: Double) - extends ContinuousInputPartition[Row] { + extends ContinuousInputPartition[InternalRow] { - override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[Row] = { + override def createContinuousReader( + offset: PartitionOffset): InputPartitionReader[InternalRow] = { val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset] require(rateStreamOffset.partition == partitionIndex, s"Expected partitionIndex: $partitionIndex, but got: ${rateStreamOffset.partition}") @@ -133,7 +133,7 @@ case class RateStreamContinuousInputPartition( rowsPerSecond) } - override def createPartitionReader(): InputPartitionReader[Row] = + override def createPartitionReader(): 
InputPartitionReader[InternalRow] = new RateStreamContinuousInputPartitionReader( startValue, startTimeMs, partitionIndex, increment, rowsPerSecond) } @@ -144,12 +144,12 @@ class RateStreamContinuousInputPartitionReader( partitionIndex: Int, increment: Long, rowsPerSecond: Double) - extends ContinuousInputPartitionReader[Row] { + extends ContinuousInputPartitionReader[InternalRow] { private var nextReadTime: Long = startTimeMs private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong private var currentValue = startValue - private var currentRow: Row = null + private var currentRow: InternalRow = null override def next(): Boolean = { currentValue += increment @@ -165,14 +165,14 @@ class RateStreamContinuousInputPartitionReader( return false } - currentRow = Row( - DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromMillis(nextReadTime)), + currentRow = InternalRow( + DateTimeUtils.fromMillis(nextReadTime), currentValue) true } - override def get: Row = currentRow + override def get: InternalRow = currentRow override def close(): Unit = {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala index e776ebc08e30..711f0941fe73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ -import scala.collection.SortedMap import scala.collection.mutable.ListBuffer import org.json4s.NoTypeHints @@ -31,11 +30,12 @@ import org.json4s.jackson.Serialization import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} -import org.apache.spark.sql.{Encoder, Row, SQLContext} +import org.apache.spark.sql.{Encoder, SQLContext} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream.GetRecord import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions} -import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.InputPartition import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset} import org.apache.spark.sql.types.StructType import org.apache.spark.util.RpcUtils @@ -49,8 +49,7 @@ import org.apache.spark.util.RpcUtils * the specified offset within the list, or null if that offset doesn't yet have a record. 
*/ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2) - extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport - with SupportsDeprecatedScanRow { + extends MemoryStreamBase[A](sqlContext) with ContinuousReader with ContinuousReadSupport { private implicit val formats = Serialization.formats(NoTypeHints) protected val logicalPlan = @@ -100,7 +99,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa ) } - override def planRowInputPartitions(): ju.List[InputPartition[Row]] = { + override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = { synchronized { val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id" endpointRef = @@ -109,7 +108,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa startOffset.partitionNums.map { case (part, index) => new ContinuousMemoryStreamInputPartition( - endpointName, part, index): InputPartition[Row] + endpointName, part, index): InputPartition[InternalRow] }.toList.asJava } } @@ -141,7 +140,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa val buf = records(part) val record = if (buf.size <= index) None else Some(buf(index)) - context.reply(record.map(Row(_))) + context.reply(record.map(r => encoder.toRow(r).copy())) } } } @@ -164,7 +163,7 @@ object ContinuousMemoryStream { class ContinuousMemoryStreamInputPartition( driverEndpointName: String, partition: Int, - startOffset: Int) extends InputPartition[Row] { + startOffset: Int) extends InputPartition[InternalRow] { override def createPartitionReader: ContinuousMemoryStreamInputPartitionReader = new ContinuousMemoryStreamInputPartitionReader(driverEndpointName, partition, startOffset) } @@ -177,14 +176,14 @@ class ContinuousMemoryStreamInputPartition( class ContinuousMemoryStreamInputPartitionReader( driverEndpointName: String, partition: Int, - startOffset: Int) extends ContinuousInputPartitionReader[Row] { + startOffset: Int) extends ContinuousInputPartitionReader[InternalRow] { private val endpoint = RpcUtils.makeDriverRef( driverEndpointName, SparkEnv.get.conf, SparkEnv.get.rpcEnv) private var currentOffset = startOffset - private var current: Option[Row] = None + private var current: Option[InternalRow] = None // Defense-in-depth against failing to propagate the task context. 
Since it's not inheritable, // we have to do a bit of error prone work to get it into every thread used by continuous @@ -204,15 +203,15 @@ class ContinuousMemoryStreamInputPartitionReader( true } - override def get(): Row = current.get + override def get(): InternalRow = current.get override def close(): Unit = {} override def getOffset: ContinuousMemoryStreamPartitionOffset = ContinuousMemoryStreamPartitionOffset(partition, currentOffset) - private def getRecord: Option[Row] = - endpoint.askSync[Option[Row]]( + private def getRecord: Option[InternalRow] = + endpoint.askSync[Option[InternalRow]]( GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala index 7a3452aa315c..9e0d95493216 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala @@ -29,6 +29,7 @@ import org.apache.commons.io.IOUtils import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.v2.DataSourceOptions @@ -38,7 +39,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.util.{ManualClock, SystemClock} class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: String) - extends MicroBatchReader with SupportsDeprecatedScanRow with Logging { + extends MicroBatchReader with Logging { import RateStreamProvider._ private[sources] val clock = { @@ -134,7 +135,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: LongOffset(json.toLong) } - override def planRowInputPartitions(): java.util.List[InputPartition[Row]] = { + override def planInputPartitions(): java.util.List[InputPartition[InternalRow]] = { val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L) val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L) assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)") @@ -169,7 +170,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation: (0 until numPartitions).map { p => new RateStreamMicroBatchInputPartition( p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue) - : InputPartition[Row] + : InputPartition[InternalRow] }.toList.asJava } @@ -188,9 +189,9 @@ class RateStreamMicroBatchInputPartition( rangeStart: Long, rangeEnd: Long, localStartTimeMs: Long, - relativeMsPerValue: Double) extends InputPartition[Row] { + relativeMsPerValue: Double) extends InputPartition[InternalRow] { - override def createPartitionReader(): InputPartitionReader[Row] = + override def createPartitionReader(): InputPartitionReader[InternalRow] = new RateStreamMicroBatchInputPartitionReader(partitionId, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue) } @@ -201,22 +202,18 @@ class RateStreamMicroBatchInputPartitionReader( rangeStart: Long, rangeEnd: Long, localStartTimeMs: Long, - relativeMsPerValue: Double) extends InputPartitionReader[Row] { + 
relativeMsPerValue: Double) extends InputPartitionReader[InternalRow] { private var count: Long = 0 override def next(): Boolean = { rangeStart + partitionId + numPartitions * count < rangeEnd } - override def get(): Row = { + override def get(): InternalRow = { val currValue = rangeStart + partitionId + numPartitions * count count += 1 val relative = math.round((currValue - rangeStart) * relativeMsPerValue) - Row( - DateTimeUtils.toJavaTimestamp( - DateTimeUtils.fromMillis(relative + localStartTimeMs)), - currValue - ) + InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), currValue) } override def close(): Unit = {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala index e3a2c007a9ce..9f53a1849b33 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.streaming.sources import java.io.{BufferedReader, InputStreamReader, IOException} import java.net.Socket -import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.{Calendar, List => JList, Locale, Optional} import java.util.concurrent.atomic.AtomicBoolean @@ -31,12 +30,15 @@ import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming.LongOffset import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport} -import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.unsafe.types.UTF8String object TextSocketMicroBatchReader { val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) @@ -50,8 +52,7 @@ object TextSocketMicroBatchReader { * debugging. This MicroBatchReader will *not* work in production applications due to multiple * reasons, including no support for fault recovery. */ -class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchReader - with SupportsDeprecatedScanRow with Logging { +class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchReader with Logging { private var startOffset: Offset = _ private var endOffset: Offset = _ @@ -70,7 +71,7 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR * Stored in a ListBuffer to facilitate removing committed batches. 
*/ @GuardedBy("this") - private val batches = new ListBuffer[(String, Timestamp)] + private val batches = new ListBuffer[(UTF8String, Long)] @GuardedBy("this") private var currentOffset: LongOffset = LongOffset(-1L) @@ -101,9 +102,9 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR return } TextSocketMicroBatchReader.this.synchronized { - val newData = (line, - Timestamp.valueOf( - TextSocketMicroBatchReader.DATE_FORMAT.format(Calendar.getInstance().getTime())) + val newData = ( + UTF8String.fromString(line), + DateTimeUtils.fromMillis(Calendar.getInstance().getTimeInMillis) ) currentOffset += 1 batches.append(newData) @@ -142,7 +143,7 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR } } - override def planRowInputPartitions(): JList[InputPartition[Row]] = { + override def planInputPartitions(): JList[InputPartition[InternalRow]] = { assert(startOffset != null && endOffset != null, "start offset and end offset should already be set before create read tasks.") @@ -164,16 +165,16 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR val spark = SparkSession.getActiveSession.get val numPartitions = spark.sparkContext.defaultParallelism - val slices = Array.fill(numPartitions)(new ListBuffer[(String, Timestamp)]) + val slices = Array.fill(numPartitions)(new ListBuffer[(UTF8String, Long)]) rawList.zipWithIndex.foreach { case (r, idx) => slices(idx % numPartitions).append(r) } (0 until numPartitions).map { i => val slice = slices(i) - new InputPartition[Row] { - override def createPartitionReader(): InputPartitionReader[Row] = - new InputPartitionReader[Row] { + new InputPartition[InternalRow] { + override def createPartitionReader(): InputPartitionReader[InternalRow] = + new InputPartitionReader[InternalRow] { private var currentIdx = -1 override def next(): Boolean = { @@ -181,8 +182,8 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR currentIdx < slice.size } - override def get(): Row = { - Row(slice(currentIdx)._1, slice(currentIdx)._2) + override def get(): InternalRow = { + InternalRow(slice(currentIdx)._1, slice(currentIdx)._2) } override def close(): Unit = {} diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java index c130b5f1e251..e4cead9df429 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -20,8 +20,8 @@ import java.io.IOException; import java.util.*; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.GreaterThan; import org.apache.spark.sql.sources.v2.DataSourceOptions; @@ -33,7 +33,7 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport { public class Reader implements DataSourceReader, SupportsPushDownRequiredColumns, - SupportsPushDownFilters, SupportsDeprecatedScanRow { + SupportsPushDownFilters { // Exposed for testing. 
public StructType requiredSchema = new StructType().add("i", "int").add("j", "int"); @@ -79,8 +79,8 @@ public Filter[] pushedFilters() { } @Override - public List> planRowInputPartitions() { - List> res = new ArrayList<>(); + public List> planInputPartitions() { + List> res = new ArrayList<>(); Integer lowerBound = null; for (Filter filter : filters) { @@ -107,8 +107,8 @@ public List> planRowInputPartitions() { } } - static class JavaAdvancedInputPartition implements InputPartition, - InputPartitionReader { + static class JavaAdvancedInputPartition implements InputPartition, + InputPartitionReader { private int start; private int end; private StructType requiredSchema; @@ -120,7 +120,7 @@ static class JavaAdvancedInputPartition implements InputPartition, } @Override - public InputPartitionReader createPartitionReader() { + public InputPartitionReader createPartitionReader() { return new JavaAdvancedInputPartition(start - 1, end, requiredSchema); } @@ -131,7 +131,7 @@ public boolean next() { } @Override - public Row get() { + public InternalRow get() { Object[] values = new Object[requiredSchema.size()]; for (int i = 0; i < values.length; i++) { if ("i".equals(requiredSchema.apply(i).name())) { @@ -140,7 +140,7 @@ public Row get() { values[i] = -start; } } - return new GenericRow(values); + return new GenericInternalRow(values); } @Override diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java index 35aafb532d80..2d21324f5ece 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java @@ -21,8 +21,8 @@ import java.util.Arrays; import java.util.List; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.ReadSupport; @@ -34,7 +34,7 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport { - class Reader implements DataSourceReader, SupportsReportPartitioning, SupportsDeprecatedScanRow { + class Reader implements DataSourceReader, SupportsReportPartitioning { private final StructType schema = new StructType().add("a", "int").add("b", "int"); @Override @@ -43,7 +43,7 @@ public StructType readSchema() { } @Override - public List> planRowInputPartitions() { + public List> planInputPartitions() { return java.util.Arrays.asList( new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}), new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2})); @@ -73,7 +73,9 @@ public boolean satisfy(Distribution distribution) { } } - static class SpecificInputPartition implements InputPartition, InputPartitionReader { + static class SpecificInputPartition implements InputPartition, + InputPartitionReader { + private int[] i; private int[] j; private int current = -1; @@ -91,8 +93,8 @@ public boolean next() throws IOException { } @Override - public Row get() { - return new GenericRow(new Object[] {i[current], j[current]}); + public InternalRow get() { + return new GenericInternalRow(new Object[] {i[current], j[current]}); } @Override @@ -101,7 +103,7 @@ public void close() 
throws IOException { } @Override - public InputPartitionReader createPartitionReader() { + public InputPartitionReader createPartitionReader() { return this; } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java index 6dee94c34e21..ca5abd24abe8 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java @@ -19,18 +19,17 @@ import java.util.List; -import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.ReadSupportWithSchema; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow; import org.apache.spark.sql.types.StructType; public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema { - class Reader implements DataSourceReader, SupportsDeprecatedScanRow { + class Reader implements DataSourceReader { private final StructType schema; Reader(StructType schema) { @@ -43,7 +42,7 @@ public StructType readSchema() { } @Override - public List> planRowInputPartitions() { + public List> planInputPartitions() { return java.util.Collections.emptyList(); } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java index 5c2f351975c7..274dc3745bcf 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -20,20 +20,19 @@ import java.io.IOException; import java.util.List; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.ReadSupport; import org.apache.spark.sql.sources.v2.reader.InputPartitionReader; import org.apache.spark.sql.sources.v2.reader.InputPartition; import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.reader.SupportsDeprecatedScanRow; import org.apache.spark.sql.types.StructType; public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport { - class Reader implements DataSourceReader, SupportsDeprecatedScanRow { + class Reader implements DataSourceReader { private final StructType schema = new StructType().add("i", "int").add("j", "int"); @Override @@ -42,14 +41,16 @@ public StructType readSchema() { } @Override - public List> planRowInputPartitions() { + public List> planInputPartitions() { return java.util.Arrays.asList( new JavaSimpleInputPartition(0, 5), new JavaSimpleInputPartition(5, 10)); } } - static class JavaSimpleInputPartition implements InputPartition, InputPartitionReader { + static class JavaSimpleInputPartition implements InputPartition, + InputPartitionReader { + private int start; private int 
end; @@ -59,7 +60,7 @@ static class JavaSimpleInputPartition implements InputPartition, InputParti } @Override - public InputPartitionReader createPartitionReader() { + public InputPartitionReader createPartitionReader() { return new JavaSimpleInputPartition(start - 1, end); } @@ -70,8 +71,8 @@ public boolean next() { } @Override - public Row get() { - return new GenericRow(new Object[] {start, -start}); + public InternalRow get() { + return new GenericInternalRow(new Object[] {start, -start}); } @Override diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java deleted file mode 100644 index 25b89c7fd36a..000000000000 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.sql.sources.v2; - -import java.io.IOException; -import java.util.List; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.UnsafeRow; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.ReadSupport; -import org.apache.spark.sql.sources.v2.reader.*; -import org.apache.spark.sql.types.StructType; - -public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport { - - class Reader implements DataSourceReader { - private final StructType schema = new StructType().add("i", "int").add("j", "int"); - - @Override - public StructType readSchema() { - return schema; - } - - @Override - public List> planInputPartitions() { - return java.util.Arrays.asList( - new JavaUnsafeRowInputPartition(0, 5), - new JavaUnsafeRowInputPartition(5, 10)); - } - } - - static class JavaUnsafeRowInputPartition - implements InputPartition, InputPartitionReader { - private int start; - private int end; - private UnsafeRow row; - - JavaUnsafeRowInputPartition(int start, int end) { - this.start = start; - this.end = end; - this.row = new UnsafeRow(2); - row.pointTo(new byte[8 * 3], 8 * 3); - } - - @Override - public InputPartitionReader createPartitionReader() { - return new JavaUnsafeRowInputPartition(start - 1, end); - } - - @Override - public boolean next() { - start += 1; - return start < end; - } - - @Override - public UnsafeRow get() { - row.setInt(0, start); - row.setInt(1, -start); - return row; - } - - @Override - public void close() throws IOException { - - } - } - - @Override - public DataSourceReader createReader(DataSourceOptions options) { - return new Reader(); - } -} diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index 260a0376daeb..7e53da1f312c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -23,7 +23,8 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ @@ -146,10 +147,10 @@ class RateSourceSuite extends StreamTest { val startOffset = LongOffset(0L) val endOffset = LongOffset(1L) reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)) - val tasks = reader.planRowInputPartitions() + val tasks = reader.planInputPartitions() assert(tasks.size == 1) val dataReader = tasks.get(0).createPartitionReader() - val data = ArrayBuffer[Row]() + val data = ArrayBuffer[InternalRow]() while (dataReader.next()) { data.append(dataReader.get()) } @@ -165,13 +166,13 @@ class RateSourceSuite extends StreamTest { val startOffset = LongOffset(0L) val endOffset = LongOffset(1L) reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset)) - val tasks = reader.planRowInputPartitions() + val tasks = reader.planInputPartitions() assert(tasks.size == 11) val readData = tasks.asScala .map(_.createPartitionReader()) .flatMap { reader => - val buf = scala.collection.mutable.ListBuffer[Row]() + val buf = scala.collection.mutable.ListBuffer[InternalRow]() while (reader.next()) buf.append(reader.get()) buf } @@ -311,10 +312,10 @@ class RateSourceSuite extends StreamTest { val reader = new RateStreamContinuousReader( new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava)) reader.setStartOffset(Optional.empty()) - val tasks = reader.planRowInputPartitions() + val tasks = reader.planInputPartitions() assert(tasks.size == 2) - val data = scala.collection.mutable.ListBuffer[Row]() + val data = scala.collection.mutable.ListBuffer[InternalRow]() tasks.asScala.foreach { case t: RateStreamContinuousInputPartition => val startTimeMs = reader.getStartOffset() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index d73eebbc84b7..c7da13721989 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -121,17 +121,6 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } - test("unsafe row scan implementation") { - Seq(classOf[UnsafeRowDataSourceV2], classOf[JavaUnsafeRowDataSourceV2]).foreach { cls => - withClue(cls.getName) { - val df = spark.read.format(cls.getName).load() - checkAnswer(df, (0 until 10).map(i => Row(i, -i))) - checkAnswer(df.select('j), (0 until 10).map(i => Row(-i))) - checkAnswer(df.filter('i > 5), (6 until 10).map(i => Row(i, -i))) - } - } - } - test("columnar batch scan implementation") { Seq(classOf[BatchDataSourceV2], 
classOf[JavaBatchDataSourceV2]).foreach { cls => withClue(cls.getName) { @@ -345,10 +334,10 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport { - class Reader extends DataSourceReader with SupportsDeprecatedScanRow { + class Reader extends DataSourceReader { override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int") - override def planRowInputPartitions(): JList[InputPartition[Row]] = { + override def planInputPartitions(): JList[InputPartition[InternalRow]] = { java.util.Arrays.asList(new SimpleInputPartition(0, 5)) } } @@ -358,10 +347,10 @@ class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport { class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport { - class Reader extends DataSourceReader with SupportsDeprecatedScanRow { + class Reader extends DataSourceReader { override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int") - override def planRowInputPartitions(): JList[InputPartition[Row]] = { + override def planInputPartitions(): JList[InputPartition[InternalRow]] = { java.util.Arrays.asList(new SimpleInputPartition(0, 5), new SimpleInputPartition(5, 10)) } } @@ -370,11 +359,11 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport { } class SimpleInputPartition(start: Int, end: Int) - extends InputPartition[Row] - with InputPartitionReader[Row] { + extends InputPartition[InternalRow] + with InputPartitionReader[InternalRow] { private var current = start - 1 - override def createPartitionReader(): InputPartitionReader[Row] = + override def createPartitionReader(): InputPartitionReader[InternalRow] = new SimpleInputPartition(start, end) override def next(): Boolean = { @@ -382,7 +371,7 @@ class SimpleInputPartition(start: Int, end: Int) current < end } - override def get(): Row = Row(current, -current) + override def get(): InternalRow = InternalRow(current, -current) override def close(): Unit = {} } @@ -391,7 +380,7 @@ class SimpleInputPartition(start: Int, end: Int) class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport { - class Reader extends DataSourceReader with SupportsDeprecatedScanRow + class Reader extends DataSourceReader with SupportsPushDownRequiredColumns with SupportsPushDownFilters { var requiredSchema = new StructType().add("i", "int").add("j", "int") @@ -416,12 +405,12 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport { requiredSchema } - override def planRowInputPartitions(): JList[InputPartition[Row]] = { - val lowerBound = filters.collect { + override def planInputPartitions(): JList[InputPartition[InternalRow]] = { + val lowerBound = filters.collectFirst { case GreaterThan("i", v: Int) => v - }.headOption + } - val res = new ArrayList[InputPartition[Row]] + val res = new ArrayList[InputPartition[InternalRow]] if (lowerBound.isEmpty) { res.add(new AdvancedInputPartition(0, 5, requiredSchema)) @@ -441,11 +430,11 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport { } class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType) - extends InputPartition[Row] with InputPartitionReader[Row] { + extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] { private var current = start - 1 - override def createPartitionReader(): InputPartitionReader[Row] = { + override def createPartitionReader(): InputPartitionReader[InternalRow] = { new AdvancedInputPartition(start, end, requiredSchema) } @@ -456,57 +445,20 @@ 
class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType) current < end } - override def get(): Row = { + override def get(): InternalRow = { val values = requiredSchema.map(_.name).map { case "i" => current case "j" => -current } - Row.fromSeq(values) + InternalRow.fromSeq(values) } } -class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport { - - class Reader extends DataSourceReader { - override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int") - - override def planInputPartitions(): JList[InputPartition[InternalRow]] = { - java.util.Arrays.asList(new UnsafeRowInputPartitionReader(0, 5), - new UnsafeRowInputPartitionReader(5, 10)) - } - } - - override def createReader(options: DataSourceOptions): DataSourceReader = new Reader -} - -class UnsafeRowInputPartitionReader(start: Int, end: Int) - extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] { - - private val row = new UnsafeRow(2) - row.pointTo(new Array[Byte](8 * 3), 8 * 3) - - private var current = start - 1 - - override def createPartitionReader(): InputPartitionReader[InternalRow] = this - - override def next(): Boolean = { - current += 1 - current < end - } - override def get(): UnsafeRow = { - row.setInt(0, current) - row.setInt(1, -current) - row - } - - override def close(): Unit = {} -} - class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema { - class Reader(val readSchema: StructType) extends DataSourceReader with SupportsDeprecatedScanRow { - override def planRowInputPartitions(): JList[InputPartition[Row]] = + class Reader(val readSchema: StructType) extends DataSourceReader { + override def planInputPartitions(): JList[InputPartition[InternalRow]] = java.util.Collections.emptyList() } @@ -569,11 +521,10 @@ class BatchInputPartitionReader(start: Int, end: Int) class PartitionAwareDataSource extends DataSourceV2 with ReadSupport { - class Reader extends DataSourceReader with SupportsReportPartitioning - with SupportsDeprecatedScanRow { + class Reader extends DataSourceReader with SupportsReportPartitioning { override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int") - override def planRowInputPartitions(): JList[InputPartition[Row]] = { + override def planInputPartitions(): JList[InputPartition[InternalRow]] = { // Note that we don't have same value of column `a` across partitions. 
java.util.Arrays.asList( new SpecificInputPartitionReader(Array(1, 1, 3), Array(4, 4, 6)), @@ -596,20 +547,20 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport { } class SpecificInputPartitionReader(i: Array[Int], j: Array[Int]) - extends InputPartition[Row] - with InputPartitionReader[Row] { + extends InputPartition[InternalRow] + with InputPartitionReader[InternalRow] { assert(i.length == j.length) private var current = -1 - override def createPartitionReader(): InputPartitionReader[Row] = this + override def createPartitionReader(): InputPartitionReader[InternalRow] = this override def next(): Boolean = { current += 1 current < i.length } - override def get(): Row = Row(i(current), j(current)) + override def get(): InternalRow = InternalRow(i(current), j(current)) override def close(): Unit = {} } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 98d7eedbcb9c..183d0399d3bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path} import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader} import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -42,11 +42,10 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS private val schema = new StructType().add("i", "long").add("j", "long") - class Reader(path: String, conf: Configuration) extends DataSourceReader - with SupportsDeprecatedScanRow { + class Reader(path: String, conf: Configuration) extends DataSourceReader { override def readSchema(): StructType = schema - override def planRowInputPartitions(): JList[InputPartition[Row]] = { + override def planInputPartitions(): JList[InputPartition[InternalRow]] = { val dataPath = new Path(path) val fs = dataPath.getFileSystem(conf) if (fs.exists(dataPath)) { @@ -57,7 +56,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS val serializableConf = new SerializableConfiguration(conf) new SimpleCSVInputPartitionReader( f.getPath.toUri.toString, - serializableConf): InputPartition[Row] + serializableConf): InputPartition[InternalRow] }.toList.asJava } else { Collections.emptyList() @@ -158,13 +157,13 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS } class SimpleCSVInputPartitionReader(path: String, conf: SerializableConfiguration) - extends InputPartition[Row] with InputPartitionReader[Row] { + extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] { @transient private var lines: Iterator[String] = _ @transient private var currentLine: String = _ @transient private var inputStream: FSDataInputStream = _ - override def createPartitionReader(): InputPartitionReader[Row] = { + override def createPartitionReader(): InputPartitionReader[InternalRow] = { val filePath = new Path(path) val fs = 
filePath.getFileSystem(conf.value) inputStream = fs.open(filePath) @@ -182,7 +181,7 @@ class SimpleCSVInputPartitionReader(path: String, conf: SerializableConfiguratio } } - override def get(): Row = Row(currentLine.split(",").map(_.trim.toLong): _*) + override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toLong): _*) override def close(): Unit = { inputStream.close() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 7c012158bd75..52b833a19c23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -19,22 +19,22 @@ package org.apache.spark.sql.streaming.sources import java.util.Optional -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper} import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport} -import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsDeprecatedScanRow} +import org.apache.spark.sql.sources.v2.reader.InputPartition import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset} import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger} import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils -case class FakeReader() extends MicroBatchReader with ContinuousReader - with SupportsDeprecatedScanRow { +case class FakeReader() extends MicroBatchReader with ContinuousReader { def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {} def getStartOffset: Offset = RateStreamOffset(Map()) def getEndOffset: Offset = RateStreamOffset(Map()) @@ -45,7 +45,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map()) def setStartOffset(start: Optional[Offset]): Unit = {} - def planRowInputPartitions(): java.util.ArrayList[InputPartition[Row]] = { + def planInputPartitions(): java.util.ArrayList[InputPartition[InternalRow]] = { throw new IllegalStateException("fake source - cannot actually read") } }
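
For context, after this patch a source plans InternalRow partitions directly via planInputPartitions(); there is no Row-based path left to wrap. The sketch below is illustrative only (it is not part of the patch; it closely mirrors SimpleDataSourceV2/SimpleInputPartition in DataSourceV2Suite, and the class names are made up) and shows the shape of a minimal reader against the updated API:

import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// Illustrative only: a trivial batch source that emits (i, -i) rows as InternalRow,
// using the post-patch planInputPartitions() contract.
class ExampleInternalRowSource extends DataSourceV2 with ReadSupport {

  class Reader extends DataSourceReader {
    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")

    // Two partitions covering [0, 5) and [5, 10).
    override def planInputPartitions(): JList[InputPartition[InternalRow]] =
      JArrays.asList(new ExamplePartition(0, 5), new ExamplePartition(5, 10))
  }

  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}

// For brevity the partition doubles as its own reader, as the test sources in this patch do.
class ExamplePartition(start: Int, end: Int)
  extends InputPartition[InternalRow] with InputPartitionReader[InternalRow] {

  private var current = start - 1

  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new ExamplePartition(start, end)

  override def next(): Boolean = {
    current += 1
    current < end
  }

  // Rows are produced directly in the internal format; no Row -> UnsafeRow conversion is needed.
  override def get(): InternalRow = InternalRow(current, -current)

  override def close(): Unit = {}
}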