diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 164684454869..aa893ba8110e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -73,8 +73,14 @@ object ExpressionEncoder {
    * Given a set of N encoders, constructs a new encoder that produce objects as items in an
    * N-tuple. Note that these encoders should be unresolved so that information about
    * name/positional binding is preserved.
+   * When `useNullSafeDeserializer` is true, the deserialization result for a child will be null if
+   * the input is null. It is false by default as most deserializers handle null input properly and
+   * don't require an extra null check. Some of them are null-tolerant, such as the deserializer for
+   * `Option[T]`, and we must not set it to true in this case.
    */
-  def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+  def tuple(
+      encoders: Seq[ExpressionEncoder[_]],
+      useNullSafeDeserializer: Boolean = false): ExpressionEncoder[_] = {
     if (encoders.length > 22) {
       throw QueryExecutionErrors.elementsOfTupleExceedLimitError()
     }
@@ -119,7 +125,7 @@ object ExpressionEncoder {
         case GetColumnByOrdinal(0, _) => input
       }
 
-      if (enc.objSerializer.nullable) {
+      if (useNullSafeDeserializer && enc.objSerializer.nullable) {
         nullSafe(input, childDeserializer)
       } else {
         childDeserializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c2e51b574df7..c29fd968fc19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1246,7 +1246,9 @@ class Dataset[T] private[sql](
         JoinHint.NONE)).analyzed.asInstanceOf[Join]
 
     implicit val tuple2Encoder: Encoder[(T, U)] =
-      ExpressionEncoder.tuple(this.exprEnc, other.exprEnc)
+      ExpressionEncoder
+        .tuple(Seq(this.exprEnc, other.exprEnc), useNullSafeDeserializer = true)
+        .asInstanceOf[Encoder[(T, U)]]
 
     withTypedPlan(JoinWith.typedJoinWith(
       joined,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 91057dcc98e0..16a493b52909 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2576,6 +2576,18 @@ class DatasetSuite extends QueryTest
     assert(result == expected)
   }
 
+  test("SPARK-47385: Tuple encoder with Option inputs") {
+    implicit val enc: Encoder[(SingleData, Option[SingleData])] =
+      Encoders.tuple(Encoders.product[SingleData], Encoders.product[Option[SingleData]])
+
+    val input = Seq(
+      (SingleData(1), Some(SingleData(1))),
+      (SingleData(2), None)
+    )
+    val ds = spark.createDataFrame(input).as[(SingleData, Option[SingleData])]
+    checkDataset(ds, input: _*)
+  }
+
   test("SPARK-43124: Show does not trigger job execution on CommandResults") {
     withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
       withTable("t1") {
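
Below is a minimal, self-contained sketch (not part of the patch) of the two behaviors the change separates. It assumes a local SparkSession; the standalone SingleData here mirrors the case class the new DatasetSuite test relies on, and the object name Spark47385Sketch is hypothetical. Encoders.tuple keeps the default useNullSafeDeserializer = false so a null input still reaches the null-tolerant Option deserializer and becomes None, while Dataset.joinWith opts in so an unmatched outer-join side deserializes to null:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Mirrors the case class used by the new test; defined at top level so
// Spark can derive a product encoder for it.
case class SingleData(id: Int)

object Spark47385Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Encoders.tuple path: useNullSafeDeserializer stays false, so the null
    // struct in the second column reaches the Option deserializer and comes
    // back as None rather than a null tuple element.
    val enc: Encoder[(SingleData, Option[SingleData])] =
      Encoders.tuple(Encoders.product[SingleData], Encoders.product[Option[SingleData]])
    val withOption = spark.createDataset(Seq(
      (SingleData(1), Some(SingleData(1))),
      (SingleData(2), None)))(enc)
    withOption.collect().foreach(println)
    // Expected: (SingleData(1),Some(SingleData(1))) and (SingleData(2),None)

    // joinWith path: Dataset.joinWith now passes useNullSafeDeserializer = true,
    // so the unmatched right side of an outer join deserializes to null instead
    // of an object reconstructed from an all-null row.
    val left = Seq(SingleData(1), SingleData(2)).toDS()
    val right = Seq(SingleData(1)).toDS()
    val joined = left.joinWith(right, left("id") === right("id"), "left_outer")
    joined.collect().foreach(println)
    // Expected: (SingleData(1),SingleData(1)) and (SingleData(2),null)

    spark.stop()
  }
}

Keeping the flag opt-in and false by default, as the new scaladoc explains, confines the extra null check to joinWith, the one caller that genuinely feeds null child rows into the tuple deserializer.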