@@ -145,18 +145,9 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def setupJob(jobContext: JobContext): Unit = {
-    // Setup IDs
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
-    val taskId = new TaskID(jobId, TaskType.MAP, 0)
-    val taskAttemptId = new TaskAttemptID(taskId, 0)
-
-    // Set up the configuration object
-    jobContext.getConfiguration.set("mapreduce.job.id", jobId.toString)
-    jobContext.getConfiguration.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
-    jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString)
-    jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
-    jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)
-
+    // Create a dummy [[TaskAttemptContextImpl]] with configuration to get [[OutputCommitter]]
+    // instance on Spark driver. Note that the job/task/attempt id doesn't matter here.
+    val taskAttemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
     committer = setupCommitter(taskAttemptContext)
     committer.setupJob(jobContext)
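The replacement obtains the driver-side OutputCommitter from a placeholder TaskAttemptID built on an empty JobID. A minimal standalone sketch (not part of the diff; the object name is hypothetical, and only the Hadoop MapReduce client classes are assumed on the classpath) of why arbitrary IDs are enough here:

import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

// Constructing a TaskAttemptID only builds an identifier object; nothing is
// registered with Hadoop, so any placeholder value works when the context is
// used solely to instantiate an OutputCommitter on the driver.
object DummyTaskAttemptIdSketch {
  def main(args: Array[String]): Unit = {
    val taskAttemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    // Prints a placeholder id along the lines of "attempt__0000_m_000000_0".
    println(taskAttemptId)
  }
}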
@@ -47,11 +47,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   OrcColumnVector(DataType type, ColumnVector vector) {
     super(type);
 
-    if (type instanceof TimestampType) {
-      isTimestamp = true;
-    } else {
-      isTimestamp = false;
-    }
+    isTimestamp = type instanceof TimestampType;
 
     baseData = vector;
     if (vector instanceof LongColumnVector) {
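The branch above only chooses between the literals true and false, so the rewritten line stores the result of the type test directly. A throwaway Scala sketch (names hypothetical, not Spark code) of the same simplification:

object BooleanAssignSketch {
  def main(args: Array[String]): Unit = {
    val value: Any = java.sql.Timestamp.valueOf("2018-01-01 00:00:00")

    // Before-style: branch just to pick a Boolean literal.
    val verbose = if (value.isInstanceOf[java.sql.Timestamp]) true else false

    // After-style: the type test already yields the Boolean.
    val concise = value.isInstanceOf[java.sql.Timestamp]

    println(verbose == concise) // true
  }
}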
@@ -571,7 +571,7 @@ private int readIntLittleEndian() throws IOException {
     int ch3 = in.read();
     int ch2 = in.read();
     int ch1 = in.read();
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
   }
 
   /**
@@ -592,7 +592,7 @@ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
         int ch3 = in.read();
         int ch2 = in.read();
         int ch1 = in.read();
-        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+        return (ch1 << 16) + (ch2 << 8) + (ch3);
       }
       case 4: {
         return readIntLittleEndian();
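Both hunks drop a shift by zero bits, which leaves the assembled value unchanged. A self-contained Scala sketch (hypothetical helper, not the Parquet reader itself; the corresponding ch4 read sits just above the displayed context of the first hunk) of the little-endian assembly:

object LittleEndianSketch {
  // Mirrors the byte order of readIntLittleEndian: the first byte read is the
  // least significant, so it lands in bits 0-7 and needs no shift at all.
  def readIntLittleEndian(bytes: Array[Int]): Int = {
    val ch4 = bytes(0) // read first, least significant
    val ch3 = bytes(1)
    val ch2 = bytes(2)
    val ch1 = bytes(3) // read last, most significant
    (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4
  }

  def main(args: Array[String]): Unit = {
    // 0x04030201 is stored little-endian as the byte sequence 0x01, 0x02, 0x03, 0x04.
    println(f"0x${readIntLittleEndian(Array(0x01, 0x02, 0x03, 0x04))}%08x") // 0x04030201
  }
}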
@@ -188,7 +188,7 @@ private[sql] object SQLUtils extends Logging {
     dataType match {
       case 's' =>
         // Read StructType for DataFrame
-        val fields = SerDe.readList(dis, jvmObjectTracker = null).asInstanceOf[Array[Object]]
+        val fields = SerDe.readList(dis, jvmObjectTracker = null)
         Row.fromSeq(fields)
       case _ => null
     }
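The cast is dropped, presumably because SerDe.readList is already typed to return Array[Object]. A small sketch (object name hypothetical; assumes spark-sql on the classpath) showing that Row.fromSeq accepts such an array without a cast:

import org.apache.spark.sql.Row

object RowFromArraySketch {
  def main(args: Array[String]): Unit = {
    val fields: Array[Object] = Array("a", Integer.valueOf(1))
    // The Array[Object] is implicitly wrapped as a Seq[Any], so no cast is needed.
    val row = Row.fromSeq(fields)
    println(row) // [a,1]
  }
}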
@@ -410,12 +410,10 @@ object ViewHelper {
     }
 
     // Detect cyclic references from subqueries.
-    plan.expressions.foreach { expr =>
-      expr match {
-        case s: SubqueryExpression =>
-          checkCyclicViewReference(s.plan, path, viewIdent)
-        case _ => // Do nothing.
-      }
+    plan.expressions.foreach {
+      case s: SubqueryExpression =>
+        checkCyclicViewReference(s.plan, path, viewIdent)
+      case _ => // Do nothing.
     }
   }
 }
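The rewrite relies on a block of case clauses being an anonymous pattern-matching function, so the explicit expr => expr match { ... } wrapper adds nothing. A standalone sketch (names hypothetical) of the equivalence:

object ForeachCaseSketch {
  def main(args: Array[String]): Unit = {
    val items: Seq[Any] = Seq(1, "two", 3)

    // Verbose form, as before the change: wrap a match in an explicit lambda.
    items.foreach { item =>
      item match {
        case n: Int => println(s"int: $n")
        case _ => // Do nothing.
      }
    }

    // Equivalent shorter form, as after the change: pass the case clauses directly.
    items.foreach {
      case n: Int => println(s"int: $n")
      case _ => // Do nothing.
    }
  }
}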
@@ -244,18 +244,17 @@ object FileFormatWriter extends Logging {
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
     val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
-    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
-    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
-
     // Set up the attempt context required to use in the output committer.
     val taskAttemptContext: TaskAttemptContext = {
+      val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+      val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
       // Set up the configuration object
       val hadoopConf = description.serializableHadoopConf.value
       hadoopConf.set("mapreduce.job.id", jobId.toString)
       hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
       hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
       hadoopConf.setBoolean("mapreduce.task.ismap", true)
       hadoopConf.setInt("mapreduce.task.partition", 0)
 
       new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
     }
@@ -378,7 +377,7 @@
         dataSchema = description.dataColumns.toStructType,
         context = taskAttemptContext)
 
-      statsTrackers.map(_.newFile(currentPath))
+      statsTrackers.foreach(_.newFile(currentPath))
     }
 
     override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
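newFile is invoked only for its side effect, so foreach states that intent and avoids building a result collection that is immediately discarded. A minimal sketch (names hypothetical; plain strings stand in for the stats trackers) of the difference:

object MapVsForeachSketch {
  def main(args: Array[String]): Unit = {
    val trackers = Seq("a", "b", "c")

    // map performs the side effect but also allocates a Seq[Unit] nobody reads.
    val discarded: Seq[Unit] = trackers.map(t => println(s"newFile for $t"))

    // foreach performs the same side effect and simply returns Unit.
    trackers.foreach(t => println(s"newFile for $t"))
  }
}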
@@ -429,10 +428,10 @@
       committer: FileCommitProtocol) extends ExecuteWriteTask {
 
     /** Flag saying whether or not the data to be written out is partitioned. */
-    val isPartitioned = desc.partitionColumns.nonEmpty
+    private val isPartitioned = desc.partitionColumns.nonEmpty
 
     /** Flag saying whether or not the data to be written out is bucketed. */
-    val isBucketed = desc.bucketIdExpression.isDefined
+    private val isBucketed = desc.bucketIdExpression.isDefined
 
     assert(isPartitioned || isBucketed,
       s"""DynamicPartitionWriteTask should be used for writing out data that's either
@@ -170,7 +170,7 @@ class SQLAppStatusListener(
       .filter { case (id, _) => metricIds.contains(id) }
       .groupBy(_._1)
       .map { case (id, values) =>
-        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
       }
 
     // Check the execution again for whether the aggregated metrics data has been calculated.