@@ -68,6 +68,20 @@ case class FlatMapGroupsWithStateExec(
     val encSchemaAttribs = stateEncoder.schema.toAttributes
     if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
   }
+  // Get the serializer for the state, taking into account whether we need to save timestamps
+  private val stateSerializer = {
+    val encoderSerializer = stateEncoder.namedExpressions
+    if (isTimeoutEnabled) {
+      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+    } else {
+      encoderSerializer
+    }
+  }
+  // Get the deserializer for the state. Note that this must be done in the driver, as
+  // resolving and binding of deserializer expressions to the encoded type can be safely
+  // done only in the driver.
+  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+

   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] =
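Note on the serializer added above: when timeouts are enabled, a sentinel timestamp column is appended to the encoder's field expressions so every state row carries a uniform schema. A minimal self-contained sketch of that pattern, in which FieldExpr and the -1L sentinel are illustrative stand-ins rather than Spark's actual Catalyst types:

object StateSerializerSketch {
  // Stand-in for a Catalyst named expression (illustrative only).
  final case class FieldExpr(name: String)

  // Plays the role of GroupStateImpl.NO_TIMESTAMP: no timeout deadline set yet.
  val NO_TIMESTAMP = -1L

  def stateSerializer(encoderFields: Seq[FieldExpr], isTimeoutEnabled: Boolean): Seq[FieldExpr] =
    if (isTimeoutEnabled) encoderFields :+ FieldExpr(s"lit($NO_TIMESTAMP)")
    else encoderFields

  def main(args: Array[String]): Unit = {
    // With timeouts enabled, the state row gains one trailing timestamp column.
    println(stateSerializer(Seq(FieldExpr("count")), isTimeoutEnabled = true))
  }
}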
@@ -139,19 +153,9 @@ case class FlatMapGroupsWithStateExec(
     ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
   private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)

-  // Converter for translating state rows to Java objects
+  // Converters for translating state between rows and Java objects
   private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-    stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-  // Converter for translating state Java objects to rows
-  private val stateSerializer = {
-    val encoderSerializer = stateEncoder.namedExpressions
-    if (isTimeoutEnabled) {
-      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-    } else {
-      encoderSerializer
-    }
-  }
+    stateDeserializer, stateAttributes)
   private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)

   // Index of the additional metadata fields in the state row
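For intuition on the two converters, here is a hedged round-trip sketch against the Spark 2.x ExpressionEncoder API (toRow/fromRow as of this era; MyState is a made-up state class). The single up-front resolveAndBind() mirrors the driver-side stateDeserializer above:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class MyState(key: String, count: Long)

object EncoderRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val enc = ExpressionEncoder[MyState]()
    // Resolve and bind once (the operator above does this on the driver), then reuse.
    val bound = enc.resolveAndBind()
    val row = enc.toRow(MyState("a", 42L))   // object -> InternalRow
    val back = bound.fromRow(row)            // InternalRow -> object
    assert(back == MyState("a", 42L))
  }
}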
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

-  test("max files per trigger - incorrect values") {
+  testQuietly("max files per trigger - incorrect values") {
     val testTable = "maxFilesPerTrigger_test"
     withTable(testTable) {
       withTempDir { case src =>
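The test-to-testQuietly switches in this file (and in StreamSuite and StreamingQuerySuite further down) exist because these tests intentionally provoke errors, and running them with logging suppressed keeps the expected exception noise out of the build output. A sketch of what such a helper can look like; Spark's actual testQuietly lives in its shared test utilities and may differ:

import org.apache.log4j.{Level, Logger}
import org.scalatest.FunSuite

trait QuietTests { self: FunSuite =>
  // Run the test body with root logging silenced, restoring the old level after,
  // so intentionally triggered errors do not flood the test output.
  def testQuietly(name: String)(body: => Unit): Unit = test(name) {
    val rootLogger = Logger.getRootLogger
    val oldLevel = rootLogger.getLevel
    rootLogger.setLevel(Level.OFF)
    try body finally rootLogger.setLevel(oldLevel)
  }
}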
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

   import testImplicits._

-  test("file source stress test") {
+  testQuietly("file source stress test") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")

@@ -21,6 +21,8 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap

 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout

 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       assertNumStateRows(total = 1, updated = 2),

       StopStream,
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
-      AdvanceManualClock(10 * 1000),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),

       AddData(inputData, "c"),
-      AdvanceManualClock(1 * 1000),
+      AdvanceManualClock(11 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),

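Note on the timing change: with the synchronization added to StreamTest (next file), AddData blocks until the stream is idle at the current clock time, so the pre-add AdvanceManualClock(10 * 1000) is no longer needed. A single 11-second advance after adding "c" then fires one deterministic trigger in which "c" is counted and "b" times out (assuming the 10-second timeout configured earlier in this test). Trigger.ProcessingTime replaces the older ProcessingTime helper.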
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }

-  test("recover from a Spark v2.1 checkpoint") {
+  testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null

@@ -488,8 +488,27 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {

       case a: AddData =>
         try {
-          // Add data and get the source where it was added, and the expected offset of the
-          // added data.
+
+          // If the query is running with a manual clock, wait for the stream execution
+          // thread to start waiting for the clock to increment. This is needed so that
+          // data is added only when no trigger is active. It ensures that the data gets
+          // deterministically added to the next batch triggered after the manual clock
+          // is incremented by the following AdvanceManualClock. This avoids race
+          // conditions between the test thread and the stream execution thread in tests
+          // that use the manual clock.
+          if (currentStream != null &&
+              currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+            eventually("Error while synchronizing with manual clock before adding data") {
+              if (currentStream.isActive) {
+                assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+              }
+            }
+            if (!currentStream.isActive) {
+              failTest("Query terminated while synchronizing with manual clock")
+            }
+          }
+          // Add data
           val queryToUse = Option(currentStream).orElse(Option(lastStream))
           val (source, offset) = a.addData(queryToUse)

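The new comment above describes a handshake between the test thread and the stream execution thread. A self-contained sketch of the clock semantics it relies on (method names mirror StreamManualClock, but this is an illustrative reimplementation, not Spark's class):

// The stream thread blocks in waitTillTime until the test thread advances the
// clock; isStreamWaitingAt lets the test thread detect that the stream is idle
// before it injects data.
class ManualClockSketch(private var now: Long = 0L) {
  private var waiting = false

  def getTimeMillis(): Long = synchronized { now }

  def isStreamWaitingAt(time: Long): Boolean = synchronized { waiting && now == time }

  // Test thread (AdvanceManualClock) wakes the stream thread.
  def advance(ms: Long): Unit = synchronized { now += ms; notifyAll() }

  // Stream execution thread parks here between triggers.
  def waitTillTime(target: Long): Long = synchronized {
    waiting = true
    try { while (now < target) wait() } finally { waiting = false }
    now
  }
}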
@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }

-  test("StreamingQuery should be Serializable but cannot be used in executors") {
+  testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
     def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
       ds.writeStream
         .queryName(queryName)