From d2cadcc8a91521ee56cb99ccfb163b5026f9a71a Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 6 Feb 2023 12:58:38 +0800
Subject: [PATCH] fix

---
 .../spark/sql/kafka010/KafkaOffsetReaderConsumer.scala      |  5 ++---
 .../spark/deploy/k8s/KubernetesVolumeUtils.scala            |  2 +-
 .../spark/sql/catalyst/expressions/Expression.scala         |  2 +-
 .../sql/execution/datasources/orc/OrcFileFormat.scala       | 10 ++++------
 .../datasources/parquet/ParquetFileFormat.scala             | 10 ++++------
 .../AsyncProgressTrackingMicroBatchExecution.scala          |  6 ++----
 6 files changed, 14 insertions(+), 21 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index 10c7488de8968..a1f7f71d5f302 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -435,11 +435,10 @@ private[kafka010] class KafkaOffsetReaderConsumer(
 
     // Calculate offset ranges
     val offsetRangesBase = untilPartitionOffsets.keySet.map { tp =>
-      val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
+      val fromOffset = fromPartitionOffsets.getOrElse(tp,
         // This should not happen since topicPartitions contains all partitions not in
         // fromPartitionOffsets
-        throw new IllegalStateException(s"$tp doesn't have a from offset")
-      }
+        throw new IllegalStateException(s"$tp doesn't have a from offset"))
       val untilOffset = untilPartitionOffsets(tp)
       KafkaOffsetRange(tp, fromOffset, untilOffset, None)
     }.toSeq
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
index b2eacca042794..18fda708d9bbb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -38,7 +38,7 @@ private[spark] object KubernetesVolumeUtils {
       KubernetesVolumeSpec(
         volumeName = volumeName,
         mountPath = properties(pathKey),
-        mountSubPath = properties.get(subPathKey).getOrElse(""),
+        mountSubPath = properties.getOrElse(subPathKey, ""),
         mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
         volumeConf = parseVolumeSpecificConf(properties, volumeType, volumeName))
     }.toSeq
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 7d5169ca8ef92..37ec4802993fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -615,7 +615,7 @@ trait SupportQueryContext extends Expression with Serializable {
 
   def initQueryContext(): Option[SQLQueryContext]
 
-  def getContextOrNull(): SQLQueryContext = queryContext.getOrElse(null)
+  def getContextOrNull(): SQLQueryContext = queryContext.orNull
 
   def getContextOrNullCode(ctx: CodegenContext, withErrorContext: Boolean = true): String = {
     if (withErrorContext && queryContext.isDefined) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index cb18566e848fa..a669adb29e762 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -140,12 +140,10 @@ class OrcFileFormat
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
     val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled &&
-      options.get(FileFormat.OPTION_RETURNING_BATCH)
-        .getOrElse {
-          throw new IllegalArgumentException(
-            "OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
-              "To workaround this issue, set spark.sql.orc.enableVectorizedReader=false.")
-        }
+      options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
+        throw new IllegalArgumentException(
+          "OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
+            "To workaround this issue, set spark.sql.orc.enableVectorizedReader=false."))
       .equals("true")
     if (enableVectorizedReader) {
       // If the passed option said that we are to return batches, we need to also be able to
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index afa00aa6f3737..c0f38aef1fe82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -182,12 +182,10 @@ class ParquetFileFormat
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
     val returningBatch = sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
-      options.get(FileFormat.OPTION_RETURNING_BATCH)
-        .getOrElse {
-          throw new IllegalArgumentException(
-            "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
-              "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")
-        }
+      options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
+        throw new IllegalArgumentException(
+          "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " +
+            "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false."))
       .equals("true")
     if (returningBatch) {
       // If the passed option said that we are to return batches, we need to also be able to
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index 2040881b852b6..f7c7aab65e201 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -232,10 +232,8 @@ class AsyncProgressTrackingMicroBatchExecution(
   private def validateAndGetTrigger(): TriggerExecutor = {
     // validate that the pipeline is using a supported sink
     if (!extraOptions
-      .get(
-        ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK
-      )
-      .getOrElse("false")
+      .getOrElse(
+        ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK, "false")
       .toBoolean) {
       try {
         plan.sink.name() match {
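
Note (not part of the patch): below is a minimal, self-contained Scala sketch of the pattern this diff applies, using illustrative names only (GetOrElseSketch and the "returning_batch" key are not from the Spark sources touched above). It shows why map.get(key).getOrElse(default) can collapse to map.getOrElse(key, default) without changing behavior: getOrElse takes its default by name, so a throw passed as the default is still only evaluated when the key is absent. It also shows that Option.getOrElse(null) has the dedicated spelling Option.orNull.

// Illustrative sketch only; names are hypothetical, not from the patched files.
object GetOrElseSketch {
  def main(args: Array[String]): Unit = {
    val options = Map("returning_batch" -> "true")

    // Old style: two calls, with an Option in between.
    val oldStyle = options.get("returning_batch").getOrElse {
      throw new IllegalArgumentException("returning_batch should always be set")
    }

    // New style: one call. The default is a by-name parameter, so the throw
    // is only evaluated if the key is missing.
    val newStyle = options.getOrElse("returning_batch",
      throw new IllegalArgumentException("returning_batch should always be set"))

    assert(oldStyle == newStyle) // both are "true"; neither path throws here

    // Option.getOrElse(null) is spelled Option.orNull.
    val queryContext: Option[String] = None
    assert(queryContext.orNull == null)
  }
}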