diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 3e60c50ada59..163511b7ffa3 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -145,18 +145,9 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def setupJob(jobContext: JobContext): Unit = {
-    // Setup IDs
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
-    val taskId = new TaskID(jobId, TaskType.MAP, 0)
-    val taskAttemptId = new TaskAttemptID(taskId, 0)
-
-    // Set up the configuration object
-    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
-    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
-    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
-    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
-    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
-
+    // Create a dummy [[TaskAttemptContextImpl]] with configuration to get [[OutputCommitter]]
+    // instance on Spark driver. Note that the job/task/attempt id doesn't matter here.
+    val taskAttemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 12f4d658b186..fcf73e8d7ae6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -47,11 +47,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   OrcColumnVector(DataType type, ColumnVector vector) {
     super(type);
 
-    if (type instanceof TimestampType) {
-      isTimestamp = true;
-    } else {
-      isTimestamp = false;
-    }
+    isTimestamp = type instanceof TimestampType;
 
     baseData = vector;
     if (vector instanceof LongColumnVector) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index fe3d31ae8e74..de0d65a1e090 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -571,7 +571,7 @@ private int readIntLittleEndian() throws IOException {
     int ch3 = in.read();
     int ch2 = in.read();
     int ch1 = in.read();
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
   }
 
   /**
@@ -592,7 +592,7 @@ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
         int ch3 = in.read();
         int ch2 = in.read();
         int ch1 = in.read();
-        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+        return (ch1 << 16) + (ch2 << 8) + (ch3);
       }
       case 4: {
         return readIntLittleEndian();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index af20764f9a96..265a84b39a42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -188,7 +188,7 @@ private[sql] object SQLUtils extends Logging {
     dataType match {
       case 's' =>
         // Read StructType for DataFrame
-        val fields = SerDe.readList(dis, jvmObjectTracker = null).asInstanceOf[Array[Object]]
+        val fields = SerDe.readList(dis, jvmObjectTracker = null)
         Row.fromSeq(fields)
       case _ => null
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 5172f32ec7b9..6373584b10e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -410,12 +410,10 @@ object ViewHelper {
     }
 
     // Detect cyclic references from subqueries.
-    plan.expressions.foreach { expr =>
-      expr match {
-        case s: SubqueryExpression =>
-          checkCyclicViewReference(s.plan, path, viewIdent)
-        case _ => // Do nothing.
-      }
+    plan.expressions.foreach {
+      case s: SubqueryExpression =>
+        checkCyclicViewReference(s.plan, path, viewIdent)
+      case _ => // Do nothing.
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 401597f96721..681bb1df6bba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -244,18 +244,17 @@ object FileFormatWriter extends Logging {
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
     val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
-    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
-    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
     // Set up the attempt context required to use in the output committer.
     val taskAttemptContext: TaskAttemptContext = {
+      val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+      val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
       // Set up the configuration object
       val hadoopConf = description.serializableHadoopConf.value
       hadoopConf.set("mapreduce.job.id", jobId.toString)
       hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
       hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
       hadoopConf.setBoolean("mapreduce.task.ismap", true)
-      hadoopConf.setInt("mapreduce.task.partition", 0)
       new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
     }
 
@@ -378,7 +377,7 @@ object FileFormatWriter extends Logging {
         dataSchema = description.dataColumns.toStructType,
         context = taskAttemptContext)
 
-      statsTrackers.map(_.newFile(currentPath))
+      statsTrackers.foreach(_.newFile(currentPath))
     }
 
     override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
@@ -429,10 +428,10 @@ object FileFormatWriter extends Logging {
      committer: FileCommitProtocol) extends ExecuteWriteTask {
 
     /** Flag saying whether or not the data to be written out is partitioned. */
-    val isPartitioned = desc.partitionColumns.nonEmpty
+    private val isPartitioned = desc.partitionColumns.nonEmpty
 
     /** Flag saying whether or not the data to be written out is bucketed. */
-    val isBucketed = desc.bucketIdExpression.isDefined
+    private val isBucketed = desc.bucketIdExpression.isDefined
 
     assert(isPartitioned || isBucketed,
       s"""DynamicPartitionWriteTask should be used for writing out data that's either
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index d254af400a7c..2c4d0bcf103f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -170,7 +170,7 @@ class SQLAppStatusListener(
       .filter { case (id, _) => metricIds.contains(id) }
      .groupBy(_._1)
      .map { case (id, values) =>
-        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
      }
 
     // Check the execution again for whether the aggregated metrics data has been calculated.
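
Side note on the first hunk (not part of the patch itself): on the driver, setupJob only needs a TaskAttemptContext so that setupCommitter can obtain an OutputCommitter from the configuration, so placeholder ids suffice. A minimal standalone sketch of that pattern, assuming only the Hadoop mapreduce client API on the classpath (the object name is hypothetical, for illustration only):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object DummyAttemptContextSketch {
  def main(args: Array[String]): Unit = {
    // The ids are placeholders: the context exists only to carry the configuration
    // to the committer factory, never to identify a real running task attempt.
    val taskAttemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val taskAttemptContext = new TaskAttemptContextImpl(new Configuration(), taskAttemptId)
    println(taskAttemptContext.getTaskAttemptID)
  }
}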