File tree Expand file tree Collapse file tree 1 file changed +11
-11
lines changed
sql/core/src/test/scala/org/apache/spark/sql/streaming Expand file tree Collapse file tree 1 file changed +11
-11
lines changed Original file line number Diff line number Diff line change @@ -633,18 +633,18 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
633633 // Try to find the index of the source to which data was added. Either get the index
634634 // from the current active query or the original input logical plan.
635635 val sourceIndex =
636- queryToUse.flatMap { query =>
637- findSourceIndex(query.logicalPlan)
638- }.orElse {
639- findSourceIndex(stream.logicalPlan)
640- }.orElse {
641- queryToUse.flatMap { q =>
642- findSourceIndex(q.lastExecution.logical)
636+ queryToUse.flatMap { query =>
637+ findSourceIndex(query.logicalPlan)
638+ }.orElse {
639+ findSourceIndex(stream.logicalPlan)
640+ }.orElse {
641+ queryToUse.flatMap { q =>
642+ findSourceIndex(q.lastExecution.logical)
643+ }
644+ }.getOrElse {
645+ throw new IllegalArgumentException (
646+ " Could not find index of the source to which data was added" )
643647 }
644- }.getOrElse {
645- throw new IllegalArgumentException (
646- " Could not find index of the source to which data was added" )
647- }
648648
649649 // Store the expected offset of added data to wait for it later
650650 awaiting.put(sourceIndex, offset)
You can’t perform that action at this time.
0 commit comments