[SPARK-52060][SQL] Make OneRowRelationExec node #50849


Closed

Conversation

richardc-db (Contributor):

What changes were proposed in this pull request?

Creates a new OneRowRelationExec node, which is more or less a copy of RDDScanExec.

A dedicated node makes it clearer when a one-row relation is used, e.g. for patterns like SELECT version().

Why are the changes needed?

This makes it clearer in the code that a one-row relation is used, and lets us avoid matching on the hard-coded "OneRowRelation" string when pattern matching.
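To illustrate the motivation, here is a minimal sketch using simplified, hypothetical stand-ins for Spark's physical plan classes (not the actual implementation): matching on a dedicated node type is more robust than comparing a hard-coded name string.

```scala
// Simplified, hypothetical stand-ins for Spark's physical plan nodes,
// only to illustrate the matching difference described above.
sealed trait SparkPlan
case class RDDScanExec(name: String) extends SparkPlan
case object OneRowRelationExec extends SparkPlan

def isOneRowRelation(plan: SparkPlan): Boolean = plan match {
  // Before this PR: fragile check against the hard-coded name string.
  case RDDScanExec(name) => name == "OneRowRelation"
  // After this PR: an explicit, type-safe match on the dedicated node.
  case OneRowRelationExec => true
}
```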

Does this PR introduce any user-facing change?

Yes, the physical plan will now contain OneRowRelationExec rather than RDDScanExec. The plan string should remain the same, however.

How was this patch tested?

Added unit tests.

Was this patch authored or co-authored using generative AI tooling?

@richardc-db richardc-db changed the title [SQL] Make OneRowRelationExec node [SPARK-52060][SQL] Make OneRowRelationExec node May 9, 2025
@github-actions github-actions bot added the SQL label May 9, 2025
override def simpleString(maxFields: Int): String = {
  s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}"
}
Contributor:
How is this different from the default implementation?

Contributor (author):

The default implementation returns Scan OneRowRelation, while the existing implementation (using RDDScanExec) returns Scan OneRowRelation[]. I figured we shouldn't change this on the off chance that someone is relying on it.
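For context, a rough sketch of where the trailing brackets come from. This mimics Spark's truncatedString helper with a simplified signature (the real one also takes a maxFields limit); it is not the real implementation:

```scala
// Simplified stand-in for Spark's truncatedString helper, shown only
// to explain the "[]" suffix in the plan string.
def truncatedString(seq: Seq[String], start: String, sep: String, end: String): String =
  seq.mkString(start, sep, end)

// OneRowRelation produces no output columns, so the bracketed list is empty:
val output = Seq.empty[String]
println(s"Scan OneRowRelation${truncatedString(output, "[", ",", "]")}")
// prints "Scan OneRowRelation[]"
```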

@richardc-db richardc-db requested a review from cloud-fan May 19, 2025 22:19
@richardc-db (author):

there are failures like

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.errors.QueryExecutionErrorsSuite, threads: rpc-boss-1245-1 (daemon=true) =====


[info] org.apache.spark.sql.errors.QueryExecutionErrorsSuite *** ABORTED *** (37 milliseconds)
[info]   java.lang.IllegalStateException: Shutdown hooks cannot be modified during shutdown.
[info]   at org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:212)

which I also see in other PRs, so I think the failures are unrelated.


private val emptyRow: InternalRow = InternalRow.empty

private val rdd = session.sparkContext.parallelize(Seq(emptyRow), 1)
Contributor:

I think we can do this

private val rdd = {
  val proj = UnsafeProjection.create(schema)
  val emptyRow = proj(InternalRow.empty)
  session.sparkContext.parallelize(Seq(emptyRow), 1)
}

Contributor:

then def doExecute() can just return this RDD

Contributor (author):

Hmm, sure, I implemented this. But because we still want to increment the number of output rows when a row is actually processed, I did not end up simply returning rdd from doExecute(). Let me know if you think there's a better way.


override def inputRDD: RDD[InternalRow] = rdd

override protected val createUnsafeProjection: Boolean = true

@richardc-db richardc-db requested a review from cloud-fan May 20, 2025 20:30
@cloud-fan:

@richardc-db can you do a rebase to re-trigger the CI? We fixed an OOM test issue recently.

@richardc-db richardc-db force-pushed the make_one_row_relation_node branch from c993f51 to eef16fe Compare May 22, 2025 15:14
@richardc-db (author):

Hmm, @cloud-fan, it seems like this causes a test in org.apache.spark.sql.CacheTableInKryoSuite to fail. The error is

java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.sql.catalyst.expressions.UnsafeRow[]

I'm guessing this is because the inputRDD now contains unsafe rows? The test passes after undoing the change described here... do you think we should undo this change and go back to adding an unsafe projection in doExecute, like so:

protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  rdd.mapPartitionsWithIndexInternal { (_, iter) =>
    val proj = UnsafeProjection.create(schema)
    iter.map { r =>
      numOutputRows += 1
      proj(r)
    }
  }
}

and setting createUnsafeProjection: Boolean = true

@cloud-fan:

@richardc-db let's change it back, didn't realize this serde issue...

@richardc-db (author):

@cloud-fan sounds good! I switched it back and it seems like tests pass now.


override val output: Seq[Attribute] = Nil

private val rdd: RDD[InternalRow] = session.sparkContext.parallelize(Seq(InternalRow.empty), 1)
@cloud-fan (May 26, 2025):

thinking about it more, I think we can avoid serializing any row:

session.sparkContext.parallelize(Nil, 1).mapPartitionsInternal { _ =>
  val proj = UnsafeProjection.create(Seq.empty[Expression])
  Iterator(proj.apply(InternalRow.empty))
}

Contributor:

Now the unsafe row is generated on the fly at the worker side, no serialization is needed.

Contributor (author):

Done, thanks for the help! I ended up doing:

private val rdd: RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  session
    .sparkContext
    .parallelize(Seq(InternalRow()), 1)
    .mapPartitionsInternal { _ =>
      val proj = UnsafeProjection.create(Seq.empty[Expression])
      Iterator(proj.apply(InternalRow.empty)).map { r =>
        numOutputRows += 1
        r
      }
    }
}

to ensure the metrics are filled properly.


@cloud-fan:

thanks, merging to master!

@cloud-fan cloud-fan closed this in f423885 May 28, 2025