
Conversation


@dependabot dependabot bot commented on behalf of github Oct 13, 2022

Bumps protobuf-java from 3.21.1 to 3.21.7.

Commits
  • 54489e9 Updating changelog
  • 5fc03e1 Updating version.json and repo version numbers to: 21.7
  • a3888f5 Clean up TextFormat parser (#10673)
  • 3b5301c Refactoring Java parsing (21.x) (#10665)
  • bea6726 Merge pull request #10579 from protocolbuffers/21.x-202209142140
  • b1924e1 Update version.json to: 21.7-dev
  • 929e13d Merge pull request #10572 from deannagarcia/21.x
  • de7597e Update python/release.sh to handle delay between twine upload and pip install...
  • 480bd3b Merge pull request #10557 from protocolbuffers/21.x-202209132118
  • aa8c73d Updating changelog
  • Additional commits viewable in compare view

Dependabot compatibility score

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


Dependabot commands and options

You can trigger Dependabot actions by commenting on this PR:

  • @dependabot rebase will rebase this PR
  • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
  • @dependabot merge will merge this PR after your CI passes on it
  • @dependabot squash and merge will squash and merge this PR after your CI passes on it
  • @dependabot cancel merge will cancel a previously requested merge and block automerging
  • @dependabot reopen will reopen this PR if it is closed
  • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
  • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
  • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
  • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
  • @dependabot use these labels will set the current labels as the default for future PRs for this repo and language
  • @dependabot use these reviewers will set the current reviewers as the default for future PRs for this repo and language
  • @dependabot use these assignees will set the current assignees as the default for future PRs for this repo and language
  • @dependabot use this milestone will set the current milestone as the default for future PRs for this repo and language

You can disable automated security fix PRs for this repo from the Security Alerts page.

Bumps [protobuf-java](https://github.com/protocolbuffers/protobuf) from 3.21.1 to 3.21.7.
- [Release notes](https://github.com/protocolbuffers/protobuf/releases)
- [Changelog](https://github.com/protocolbuffers/protobuf/blob/main/generate_changelog.py)
- [Commits](protocolbuffers/protobuf@v3.21.1...v3.21.7)

---
updated-dependencies:
- dependency-name: com.google.protobuf:protobuf-java
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
@dependabot dependabot bot added the dependencies (Pull requests that update a dependency file) and java (Pull requests that update Java code) labels Oct 13, 2022
@wangyum wangyum closed this Oct 27, 2022

dependabot bot commented on behalf of github Oct 27, 2022

OK, I won't notify you again about this release, but will get in touch when a new version is available. If you'd rather skip all updates until the next major or minor version, let me know by commenting @dependabot ignore this major version or @dependabot ignore this minor version.

If you change your mind, just re-open this PR and I'll resolve any conflicts on it.

@dependabot dependabot bot deleted the dependabot/maven/connector/protobuf/com.google.protobuf-protobuf-java-3.21.7 branch October 27, 2022 08:07
pull bot pushed a commit that referenced this pull request Oct 3, 2023
…edExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process runs into this state, it stops launching tasks from the driver and reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at the Spark UI, we saw that an executor process had been hung for over an hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what was observed from the relevant container logs and a thread dump.

- A regular task was sent to the executor, and the executor reported back to the driver upon task completion.

```
    $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

    $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

    $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
    23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```

- Another task was sent to the executor but never launched, since the single-threaded dispatcher was stuck (presumably in the "infinite loop" explained later).

```
    $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

    $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

    $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
    >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```

- The thread dump shows that the dispatcher-Executor thread has the following stack trace.

```
    "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
    java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.put(HashMap.scala:126)
    at scala.collection.mutable.HashMap.update(HashMap.scala:131)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

### Relevant code paths

Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task runs on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner threads and the dispatcher thread are distinct, yet they read and write a common object, `taskResources`, a mutable hashmap with no thread safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.
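
For concreteness, below is a minimal sketch of the access pattern this paragraph describes. It is illustrative only: the field and method names (`taskResources`, `onLaunchTask`, `onTaskFinished`) and the value type are simplified stand-ins, not the exact Spark code.

```scala
import scala.collection.mutable

object SharedTaskResourcesSketch {
  // Written by the dispatcher thread when a LaunchTask message arrives,
  // read and removed by a TaskRunner thread when the task finishes,
  // with no synchronization between the two threads.
  val taskResources = mutable.HashMap.empty[Long, Map[String, String]]

  def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit =
    taskResources(taskId) = resources   // dispatcher thread

  def onTaskFinished(taskId: Long): Unit =
    taskResources.remove(taskId)        // TaskRunner thread, possibly concurrent
}
```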

### What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purposes, consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread safety (a hedged sketch of one thread-safe remedy follows the list):

- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">

- Thread 2 triggers a resize operation that results in B.next = A (note that the hashmap makes no ordering guarantees), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">

- After getting the CPU back, Thread 1 runs into an "infinite loop" when traversing the linked list in the last bucket, since in its view A.next = B and B.next = A.
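
One conventional remedy for this class of bug is to back the shared state with a concurrent map. The sketch below is hedged: it shows the general technique and is not necessarily the exact change merged for SPARK-45227.

```scala
import java.util.concurrent.ConcurrentHashMap

object ThreadSafeTaskResourcesSketch {
  // ConcurrentHashMap provides atomic put/remove, so concurrent access from the
  // dispatcher thread and TaskRunner threads cannot corrupt the bucket chains
  // and produce the self-referencing traversal described above.
  val taskResources = new ConcurrentHashMap[Long, Map[String, String]]()

  def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit =
    taskResources.put(taskId, resources)

  def onTaskFinished(taskId: Long): Unit =
    taskResources.remove(taskId)
}
```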

Closes apache#43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
wangyum pushed a commit that referenced this pull request Jun 26, 2024
…edExecutorBackend

Cherry-pick of the SPARK-45227 fix described in the Oct 3, 2023 entry above; the commit message is identical and is not repeated here.

Closes apache#43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
wangyum pushed a commit that referenced this pull request Nov 20, 2024
…edExecutorBackend

Cherry-pick of the SPARK-45227 fix described in the Oct 3, 2023 entry above; the commit message is identical and is not repeated here.

Closes apache#43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <xiongbo@amazon.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
pull bot pushed a commit that referenced this pull request Jan 23, 2025
…IN-subquery

### What changes were proposed in this pull request?

This PR adds code to `RewritePredicateSubquery#apply` to explicitly handle the case where an `Aggregate` node contains an aggregate expression in the left-hand operand of an IN-subquery expression. The explicit handler moves the IN-subquery expressions out of the `Aggregate` and into a parent `Project` node. The `Aggregate` will continue to perform the aggregations that were used as an operand to the IN-subquery expression, but will not include the IN-subquery expression itself. After pulling up IN-subquery expressions into a Project node, `RewritePredicateSubquery#apply` is called again to handle the `Project` as a `UnaryNode`. The `Join` will now be inserted between the `Project` and the `Aggregate` node, and the join condition will use an attribute rather than an aggregate expression, e.g.:
```
Project [col1#32, exists#42 AS (sum(col2) IN (listquery()))#40]
+- Join ExistenceJoin(exists#42), (sum(col2)#41L = c2#39L)
   :- Aggregate [col1#32], [col1#32, sum(col2#33) AS sum(col2)#41L]
   :  +- LocalRelation [col1#32, col2#33]
   +- LocalRelation [c2#39L]
```
`sum(col2)#41L` in the above join condition, despite how it looks, is the name of the attribute, not an aggregate expression.

### Why are the changes needed?

The following query fails:
```
create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);

select col1, sum(col2) in (select c2 from v1)
from v2 group by col1;
```
It fails with this error:
```
[INTERNAL_ERROR] Cannot generate code for expression: sum(input[1, int, false]) SQLSTATE: XX000
```
With SPARK_TESTING=1, it fails with this error:
```
[PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch RewriteSubquery generated an invalid plan: Special expressions are placed in the wrong plan:
Aggregate [col1#11], [col1#11, first(exists#20, false) AS (sum(col2) IN (listquery()))#19]
+- Join ExistenceJoin(exists#20), (sum(col2#12) = c2#18L)
   :- LocalRelation [col1#11, col2#12]
   +- LocalRelation [c2#18L]
```
The issue is that `RewritePredicateSubquery` builds a `Join` operator where the join condition contains an aggregate expression.

The bug is in the handler for `UnaryNode` in `RewritePredicateSubquery#apply`, which adds a `Join` below the `Aggregate` and assumes that the left-hand operand of the IN-subquery can be used in the join condition. This works fine for most cases, but not when the left-hand operand is an aggregate expression.

This PR moves the offending IN-subqueries to a `Project` node, with the aggregates replaced by attributes referring to the aggregate expressions. The resulting join condition now uses those attributes rather than the actual aggregate expressions.
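
For reference, here is a minimal reproduction sketch, assuming only a local SparkSession; the object and app names are chosen for illustration, and the SQL is the same as in the example above. With this fix the final query is expected to succeed instead of failing with INTERNAL_ERROR.

```scala
import org.apache.spark.sql.SparkSession

object InSubqueryAggregateRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("in-subquery-repro").getOrCreate()

    spark.sql("create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1)")
    spark.sql("create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1)")

    // Aggregate expression as the left-hand operand of an IN-subquery:
    // previously failed during code generation, now rewritten via a Project node.
    spark.sql("select col1, sum(col2) in (select c2 from v1) from v2 group by col1").show()

    spark.stop()
  }
}
```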

### Does this PR introduce _any_ user-facing change?

No, other than allowing this type of query to succeed.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48627 from bersprockets/aggregate_in_set_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
pull bot pushed a commit that referenced this pull request Nov 1, 2025
### What changes were proposed in this pull request?

This PR proposes to add a `doCanonicalize` function for DataSourceV2ScanRelation. The implementation is similar to [the one in BatchScanExec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150), as well as [the one in LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52).
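
As a rough illustration, such an override normalizes the expression IDs of the scan's output attributes so that two scans of the same source canonicalize to the same plan. The sketch below is modeled on `LogicalRelation.doCanonicalize` and is not necessarily the exact body of the merged change.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.QueryPlan

object CanonicalizeOutputSketch {
  // Inside DataSourceV2ScanRelation the override would look roughly like:
  //   override def doCanonicalize(): LogicalPlan =
  //     copy(output = output.map(QueryPlan.normalizeExpressions(_, output)))
  // The helper below isolates that normalization step: exprIds are rewritten to
  // positional ids relative to `output`, so equivalent scans compare equal.
  def canonicalizeOutput(output: Seq[AttributeReference]): Seq[AttributeReference] =
    output.map(QueryPlan.normalizeExpressions(_, output))
}
```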

### Why are the changes needed?

Query optimization rules such as MergeScalarSubqueries check whether two plans are identical by [comparing their canonicalized forms](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala#L219). For DSv2 physical plans, canonicalization descends the child hierarchy to BatchScanExec, which [has a doCanonicalize function](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L150); for logical plans, it descends to DataSourceV2ScanRelation, which does not have a doCanonicalize function. As a result, two logical plans that are semantically identical are not recognized as such.

Moreover, for reference, [DSv1 LogicalRelation](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala#L52) also has `doCanonicalize()`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new unit test is added to show that `MergeScalarSubqueries` works for DataSourceV2ScanRelation.

For the query
```sql
select (select max(i) from df) as max_i, (select min(i) from df) as min_i
```

Before introducing the canonicalization, the plan is
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- Project [i#0]
:  :     +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- Project [i#10]
:        +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
      :  :- Subquery subquery#2, [id=#32]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- Subquery subquery#4, [id=#33]
      :     +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                        +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                           +- *(1) Project [i#10]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
                  +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                     +- Project [i#10]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#32] AS max_i#3, Subquery subquery#4, [id=#33] AS min_i#5]
   :  :- Subquery subquery#2, [id=#32]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=58]
                     +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                        +- *(1) Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[max(i#0)], output=[max(i)#7])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=19]
               +- HashAggregate(keys=[], functions=[partial_max(i#0)], output=[max#14])
                  +- Project [i#0]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#33]
   :     +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=63]
                     +- *(1) HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                        +- *(1) Project [i#10]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            HashAggregate(keys=[], functions=[min(i#10)], output=[min(i)#9])
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
               +- HashAggregate(keys=[], functions=[partial_min(i#10)], output=[min#15])
                  +- Project [i#10]
                     +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```

After introducing the canonicalization, the plan is as follows, where you can see **ReusedSubquery**:
```
== Parsed Logical Plan ==
'Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- 'Project [unresolvedalias('max('i))]
:  :  +- 'UnresolvedRelation [df], [], false
:  +- 'Project [unresolvedalias('min('i))]
:     +- 'UnresolvedRelation [df], [], false
+- OneRowRelation

== Analyzed Logical Plan ==
max_i: int, min_i: int
Project [scalar-subquery#2 [] AS max_i#3, scalar-subquery#4 [] AS min_i#5]
:  :- Aggregate [max(i#0) AS max(i)#7]
:  :  +- SubqueryAlias df
:  :     +- View (`df`, [i#0, j#1])
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Aggregate [min(i#10) AS min(i)#9]
:     +- SubqueryAlias df
:        +- View (`df`, [i#10, j#11])
:           +- RelationV2[i#10, j#11] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Optimized Logical Plan ==
Project [scalar-subquery#2 [].max(i) AS max_i#3, scalar-subquery#4 [].min(i) AS min_i#5]
:  :- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:  :  +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:  :     +- Project [i#0]
:  :        +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
:  +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
:     +- Aggregate [max(i#0) AS max(i)#7, min(i#0) AS min(i)#9]
:        +- Project [i#0]
:           +- RelationV2[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5
+- OneRowRelation

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 0
   +- *(1) Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, ReusedSubquery Subquery subquery#2, [id=#40].min(i) AS min_i#5]
      :  :- Subquery subquery#2, [id=#40]
      :  :  +- AdaptiveSparkPlan isFinalPlan=true
            +- == Final Plan ==
               ResultQueryStage 1
               +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
                  +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                     +- ShuffleQueryStage 0
                        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                           +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                              +- *(1) Project [i#0]
                                 +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
            +- == Initial Plan ==
               Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                     +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                        +- Project [i#0]
                           +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
      :  +- ReusedSubquery Subquery subquery#2, [id=#40]
      +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
   Project [Subquery subquery#2, [id=#40].max(i) AS max_i#3, Subquery subquery#4, [id=#41].min(i) AS min_i#5]
   :  :- Subquery subquery#2, [id=#40]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            ResultQueryStage 1
            +- *(2) Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
               +- *(2) HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
                  +- ShuffleQueryStage 0
                     +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=71]
                        +- *(1) HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                           +- *(1) Project [i#0]
                              +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
         +- == Initial Plan ==
            Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
            +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=22]
                  +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
                     +- Project [i#0]
                        +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   :  +- Subquery subquery#4, [id=#41]
   :     +- AdaptiveSparkPlan isFinalPlan=false
   :        +- Project [named_struct(max(i), max(i)#7, min(i), min(i)#9) AS mergedValue#14]
   :           +- HashAggregate(keys=[], functions=[max(i#0), min(i#0)], output=[max(i)#7, min(i)#9])
   :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=37]
   :                 +- HashAggregate(keys=[], functions=[partial_max(i#0), partial_min(i#0)], output=[max#16, min#17])
   :                    +- Project [i#0]
   :                       +- BatchScan class org.apache.spark.sql.connector.SimpleDataSourceV2$$anon$5[i#0, j#1] class org.apache.spark.sql.connector.SimpleDataSourceV2$MyScanBuilder RuntimeFilters: []
   +- Scan OneRowRelation[]
```
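
For completeness, a hedged usage sketch of how such plans can be inspected from a spark-shell session; it assumes an active SparkSession `spark` and a temp view `df` backed by a DataSource V2 relation, as in the unit test described above.

```scala
// Extended explain prints the parsed, analyzed, optimized and physical plans,
// where the ReusedSubquery node should appear once the scans canonicalize identically.
spark.sql(
  "select (select max(i) from df) as max_i, (select min(i) from df) as min_i"
).explain(true)
```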

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52529 from yhuang-db/scan-canonicalization.

Authored-by: yhuang-db <itisyuchuan@gmail.com>
Signed-off-by: Peter Toth <peter.toth@gmail.com>