diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
index bc29f40bd7344..3ce350c1fcb61 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -41,7 +41,7 @@ class SplitInfo(
     hashCode = hashCode * 31 + hostLocation.hashCode
     hashCode = hashCode * 31 + path.hashCode
     // ignore overflow ? It is hashcode anyway !
-    hashCode = hashCode * 31 + (length & 0x7fffffff).toInt
+    hashCode = hashCode * 31 + length.toInt
     hashCode
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 505a5a6169204..39ff76278e1bf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -186,7 +186,7 @@ abstract class InMemoryBaseTable(
         )
         var dataTypeHashCode = 0
         valueTypePairs.foreach(dataTypeHashCode += _._2.hashCode())
-        ((valueHashCode + 31 * dataTypeHashCode) & Integer.MAX_VALUE) % numBuckets
+        Math.abs(valueHashCode + 31 * dataTypeHashCode) % numBuckets
       case NamedTransform("truncate", Seq(ref: NamedReference, length: Literal[_])) =>
         extractor(ref.fieldNames, cleanedSchema, row) match {
           case (str: UTF8String, StringType) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 91749ddd794fb..02852bae73e89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -245,7 +245,7 @@ object FileFormatWriter extends Logging {
           jobTrackerID = jobTrackerID,
           sparkStageId = taskContext.stageId(),
           sparkPartitionId = taskContext.partitionId(),
-          sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
+          sparkAttemptNumber = Math.abs(taskContext.taskAttemptId().toInt),
           committer,
           iterator = iter,
           concurrentOutputWriterSpec = concurrentOutputWriterSpec)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
index c6c34b7fcea3f..add12f140000c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
@@ -94,7 +94,7 @@ case class WriteFilesExec(
     rddWithNonEmptyPartitions.mapPartitionsInternal { iterator =>
       val sparkStageId = TaskContext.get().stageId()
       val sparkPartitionId = TaskContext.get().partitionId()
-      val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
+      val sparkAttemptNumber = Math.abs(TaskContext.get().taskAttemptId().toInt)
 
       val ret = FileFormatWriter.executeTask(
         description,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
index f18424b4bcb86..dade0d3fcd255 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriterFactory.scala
@@ -38,7 +38,7 @@ case class FileWriterFactory (
   @transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)
 
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
-    val taskAttemptContext = createTaskAttemptContext(partitionId, realTaskId.toInt & Int.MaxValue)
+    val taskAttemptContext = createTaskAttemptContext(partitionId, Math.abs(realTaskId.toInt))
     committer.setupTask(taskAttemptContext)
     if (description.partitionColumns.isEmpty) {
       new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 10a32441b6cd9..0d11cb6e3ec10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -122,7 +122,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       Seq(TransformExpression(BucketFunction, Seq(attr("ts")), Some(32))))
 
     // Has exactly one partition.
-    val partitionValues = Seq(31).map(v => InternalRow.fromSeq(Seq(v)))
+    val partitionValues = Seq(1).map(v => InternalRow.fromSeq(Seq(v)))
     checkQueryPlan(df, distribution, physical.KeyGroupedPartitioning(distribution.clustering,
       1, partitionValues, partitionValues))
   }
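Note (reviewer sketch, not part of the patch): the diff swaps the masking idiom x & Int.MaxValue / (x & 0x7fffffff) for Math.abs. The two are not equivalent at the boundary: masking always clears the sign bit, while Math.abs(Int.MinValue) returns Int.MinValue, which is still negative, because its absolute value does not fit in a 32-bit Int. A minimal standalone Scala check, using a hypothetical Long attempt id whose lower 32 bits truncate to Int.MinValue:

object MaskVsAbs {
  def main(args: Array[String]): Unit = {
    // Hypothetical attempt id (2^31); not taken from the patch.
    val attemptId = 2147483648L
    val truncated = attemptId.toInt            // -2147483648, i.e. Int.MinValue
    println(truncated & Int.MaxValue)          // 0: sign bit cleared, result is never negative
    println(Math.abs(truncated))               // -2147483648: abs of Int.MinValue overflows back to itself
  }
}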