Skip to content

Commit 8cf2eba

Browse files
committed
Fix UT
1 parent f457f40 commit 8cf2eba

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
3333
import org.apache.spark.rpc.RpcEndpointRef
3434
import org.apache.spark.sql.catalyst.InternalRow
3535
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
36-
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
36+
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
3737
import org.apache.spark.sql.connector.read.InputPartition
3838
import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, ContinuousPartitionReaderFactory, ContinuousStream, Offset, PartitionOffset}
3939
import org.apache.spark.sql.execution.streaming.{Offset => _, _}
@@ -246,6 +246,8 @@ class TextSocketContinuousPartitionReader(
246246
private var currentOffset = startOffset
247247
private var current: Option[InternalRow] = None
248248

249+
private val projectWithoutTimestamp = UnsafeProjection.create(TextSocketReader.SCHEMA_REGULAR)
250+
249251
override def next(): Boolean = {
250252
try {
251253
current = getRecord
@@ -277,8 +279,7 @@ class TextSocketContinuousPartitionReader(
277279
if (includeTimestamp) {
278280
rec
279281
} else {
280-
InternalRow(rec.get(0, TextSocketReader.SCHEMA_TIMESTAMP)
281-
.asInstanceOf[(String, Timestamp)]._1)
282+
projectWithoutTimestamp(rec)
282283
}
283284
)
284285
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -318,7 +318,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
318318
for (i <- 0 until numRecords / 2) {
319319
r.next()
320320
offsets.append(r.getOffset().asInstanceOf[ContinuousRecordPartitionOffset].offset)
321-
data.append(r.get().get(0, DataTypes.StringType).asInstanceOf[String].toInt)
321+
data.append(r.get().getString(0).toInt)
322322
// commit the offsets in the middle and validate if processing continues
323323
if (i == 2) {
324324
commitOffset(t.partitionId, i + 1)
@@ -381,7 +381,10 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
381381
val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
382382
for (_ <- 0 until numRecords / 2) {
383383
r.next()
384-
assert(r.get().get(0, TextSocketReader.SCHEMA_TIMESTAMP).isInstanceOf[(_, _)])
384+
assert(r.get().numFields === 2)
385+
// just try to read the columns one by one - it would throw an error if the row is corrupted
386+
r.get().getString(0)
387+
r.get().getLong(1)
385388
}
386389
case _ => throw new IllegalStateException("Unexpected task type")
387390
}

0 commit comments

Comments
 (0)