[SPARK-48484][SQL] Fix: V2Write use the same TaskAttemptId for different task attempts #46811
Conversation
also cc @yaooqinn @HyukjinKwon @cloud-fan @pan3793
yaooqinn left a comment
+1, LGTM.
Can we have a test case?
Please remove the [MINOR] tag and file a Jira ticket for this.
override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
-   val taskAttemptContext = createTaskAttemptContext(partitionId)
+   val taskAttemptContext = createTaskAttemptContext(partitionId, realTaskId.toInt & Int.MaxValue)
is it the same as math.abs?
Yep, it's the same as Math.abs(realTaskId.toInt).
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala, line 97 in 80addbb:
val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala, line 263 in 45ba922:
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
There are at least two other similar cases here; should we unify them as math.abs? Of course, that should be another PR. @cloud-fan
I don't think perf matters here, and math.abs is definitely more readable.
ok, we can unify the above three cases to use math.abs in a follow-up.
@cloud-fan If an overflow occurs, realTaskId.toInt & Int.MaxValue and math.abs are not equal:

scala> val realTaskId = Long.MaxValue
val realTaskId: Long = 9223372036854775807

scala> val a = realTaskId.toInt
val a: Int = -1

scala> val b = realTaskId.toInt & Int.MaxValue
val b: Int = 2147483647

scala> val c = math.abs(realTaskId.toInt)
val c: Int = 1

scala> val realTaskId = Int.MaxValue.toLong + 1
val realTaskId: Long = 2147483648

scala> val a = realTaskId.toInt
val a: Int = -2147483648

scala> val b = realTaskId.toInt & Int.MaxValue
val b: Int = 0

scala> val c = math.abs(realTaskId.toInt)
val c: Int = -2147483648

Meanwhile, when an overflow occurs, math.abs may still return a negative value, so I suggest we continue using & Int.MaxValue.
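For reference, the same point condensed into a self-contained check (a sketch; the assertions are mine, not from the thread):

```scala
// Masking with Int.MaxValue clears the sign bit, so the result is always
// in [0, Int.MaxValue]; math.abs(Int.MinValue) overflows and stays negative.
def masked(id: Long): Int = id.toInt & Int.MaxValue

assert(masked(Long.MaxValue) == Int.MaxValue)
assert(masked(Int.MaxValue.toLong + 1) == 0)           // sign bit cleared
assert(math.abs((Int.MaxValue.toLong + 1).toInt) < 0)  // abs(Int.MinValue) < 0
```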
Sure, I will add a follow-up for SPARK-42478 and a test suite for this PR.
No, SPARK-42478 is not part of the Spark 4.0 cycle; please use a new Jira ticket. @jackylee-ch
Created SPARK-48484 for this one @jackylee-ch
private[this] val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date)
@transient private lazy val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, 0)
override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
Not related to this PR, but why not name it taskAttemptId? Can realTaskId be something else?
can we use PrivateMethodTester in FileWriterFactorySuite to avoid expanding the scope of this function
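For illustration, a hedged sketch of what that could look like (the method symbol and its (Int, Int) signature are assumed from the diff above, and createTestWriterFactory is a hypothetical helper, not an existing API):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkFunSuite

class FileWriterFactorySuite extends SparkFunSuite with PrivateMethodTester {
  test("different task attempts get different TaskAttemptIDs") {
    // Symbol and signature assumed from the diff; adjust to the real factory.
    val createTaskAttemptContext =
      PrivateMethod[TaskAttemptContext](Symbol("createTaskAttemptContext"))
    val factory = createTestWriterFactory()
    // Same partition, different Spark task attempt ids.
    val ctx1 = factory invokePrivate createTaskAttemptContext(0, 1)
    val ctx2 = factory invokePrivate createTaskAttemptContext(0, 2)
    assert(ctx1.getTaskAttemptID != ctx2.getTaskAttemptID)
  }

  // Hypothetical: building a real FileWriterFactory needs a job description
  // and a committer; stubbed here so the sketch stays self-contained.
  private def createTestWriterFactory(): AnyRef = ???
}
```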
done
yikf left a comment
thanks @jackylee-ch, lgtm
Force-pushed from 1988368 to 74782eb
if we just check createTaskAttemptContext, do we really need to inherit from SharedSparkSession? Can we just inherit from SparkFunSuite?
We need a Configuration here, as it will be used in createTaskAttemptContext. It's OK to me if we just create a new Configuration.
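For example, a minimal sketch (assuming a plain Hadoop Configuration is all createTaskAttemptContext needs, so the suite can extend SparkFunSuite instead of SharedSparkSession):

```scala
import org.apache.hadoop.conf.Configuration

// A standalone Configuration, independent of any SparkSession;
// passing `false` skips loading the default XML resources.
val hadoopConf = new Configuration(false)
```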
Force-pushed from 74782eb to 3bb15e5
[SPARK-48484][SQL] Fix: V2Write use the same TaskAttemptId for different task attempts

### What changes were proposed in this pull request?
After #40064, we always get the same TaskAttemptId for different task attempts that have the same partitionId. This would lead different task attempts to write to the same directory.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46811 from jackylee-ch/fix_v2write_use_same_directories_for_different_task_attempts.

Lead-authored-by: jackylee-ch <lijunqing@baidu.com>
Co-authored-by: Kent Yao <yao@apache.org>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 67d11b1)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
Merged into master/3.5/3.4. Thanks @jackylee-ch @yaooqinn @cloud-fan @ulysses-you @yikf
What changes were proposed in this pull request?
After #40064, we always get the same TaskAttemptId for different task attempts that have the same partitionId. This would lead different task attempts to write to the same directory.
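To make the failure mode concrete, a hypothetical sketch (the function names are illustrative, not the actual Spark code):

```scala
// Before the fix (sketch): the Hadoop attempt number was derived from the
// partition alone, so every retry of a given partition got the same
// TaskAttemptId and wrote into the same temporary directory.
def attemptNumberBefore(partitionId: Int): Int = partitionId

// After the fix (sketch): the Spark task attempt id, which is unique per
// attempt, is folded in; the mask keeps the narrowed Int non-negative.
def attemptNumberAfter(realTaskId: Long): Int = realTaskId.toInt & Int.MaxValue
```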
Does this PR introduce any user-facing change?
No.
How was this patch tested?
GA
Was this patch authored or co-authored using generative AI tooling?
No.