
Add support for asynchronous writing for parquet #11730

Merged
11 commits merged into NVIDIA:branch-24.12 on Nov 25, 2024

Conversation

@jihoonson (Collaborator) commented Nov 18, 2024

Description

This PR is the first piece of work for #11342. It adds the new configurations explained below. Please see the configuration docs in the PR for more details.

  • spark.rapids.sql.asyncWrite.queryOutput.enabled: enables the async write. Only Parquet is supported currently; ORC support is planned next. This option is off by default and may be enabled by default in the future once it has been tested more in production.
  • spark.rapids.sql.queryOutput.holdGpuInTask: holds the GPU semaphore between processing batches during the output write. Enabling this option is recommended when the async output write is enabled. It defaults to the value of spark.rapids.sql.asyncWrite.queryOutput.enabled.
  • spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes: the maximum number of in-flight bytes per executor that can be buffered for async writes.
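For illustration only (this snippet is not part of the PR), the options above could be set on a SparkSession as in the sketch below; the application name and the 2 GiB in-flight cap are arbitrary example values, not documented defaults.

import org.apache.spark.sql.SparkSession

// Illustrative sketch only: enable the async Parquet write, hold the GPU
// semaphore between batches, and cap in-flight host memory at an example 2 GiB.
val spark = SparkSession.builder()
  .appName("async-parquet-write-example")
  .config("spark.rapids.sql.asyncWrite.queryOutput.enabled", "true")
  .config("spark.rapids.sql.queryOutput.holdGpuInTask", "true")
  .config("spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes",
    (2L * 1024 * 1024 * 1024).toString)
  .getOrCreate()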

Performance test results

[Chart: query time, spilled bytes, retry count (batchSize=100M)]

[Chart: retry count, retry block time (batchSize=100M)]

The charts above show some performance test results of the async output writing. The test setup was:

  • The NDS dataset at sf=100 was used for testing. The generated dataset was stored in the Parquet format in Google Cloud Storage.
  • The query below was used for testing. The output size was about 37.9 GB.
select s.* from store s, store_sales ss
where s.s_store_sk <= ss.ss_store_sk and s.s_store_sk + 41 > ss.ss_store_sk;
  • 2 workers were used for testing. The executor and task counts were set to 32.
  • The GPU parallelism was set to 4.

The results show that async writing + holding the GPU between batches improved the query time by about 11% compared to sync writing + releasing the GPU between batches (the current behavior). This was because of the lower memory pressure, and thus fewer spills and retries. Interestingly, the retry block time increased with async writing + holding the GPU. This seems to be because the async write reduced the memory pressure, so many tasks were able to proceed further and even finish without throwing out-of-memory errors. As a result, the tasks blocked by memory allocation failures had to wait longer until the running tasks finished their work and released memory.

Future work

  • Use a condition variable in TrafficController instead of polling every 100 ms.
  • I did not add any integration tests in this PR. I'm planning to add some in a follow-up.
  • Add the debug metrics suggested in #11730 (review).
  • The ORC format will be supported in a follow-up.
  • The default of spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes is currently a fixed value. Besides not being smart, this could cause OOM errors if it conflicts with other memory settings. It would be nice if the default could be computed from the executor memory settings; this can be done in a follow-up as well (a hypothetical sketch follows below).
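For illustration, one hypothetical shape such a computed default could take; the method name, fraction, and floor are made up for this sketch and are not proposed in the PR.

// Hypothetical sketch: derive the in-flight cap from the executor's host memory
// budget instead of using a fixed constant. Numbers are illustrative only.
def defaultMaxInFlightBytes(executorHostMemoryBytes: Long): Long = {
  val fraction = 0.2                  // illustrative fraction of host memory
  val floorBytes = 128L * 1024 * 1024 // never go below 128 MiB
  math.max((executorHostMemoryBytes * fraction).toLong, floorBytes)
}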

Signed-off-by: Jihoon Son <ghoonson@gmail.com>
@jihoonson: build

@jihoonson: build

@jihoonson: build

@jihoonson: build

@jihoonson: build

@abellina (Collaborator)

Will do another pass today.

@abellina self-requested a review on November 21, 2024 at 17:00
@throws[IOException]
override def close(): Unit = {
  if (!closed) {
    Seq[AutoCloseable](

Collaborator:

I am not 100% sure how this works. It seems you are building a sequence of AutoCloseable but the lambdas are not AutoCloseable. delegate is, but that's it. What magic do you speak of?

Collaborator Author (@jihoonson):

Good question. Java allows using a lambda to implement a FunctionalInterface. I'm not a Scala expert, but I would guess Scala does the same.
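For illustration only (not the PR's code), a minimal standalone sketch of the single-abstract-method (SAM) conversion being described: in Scala 2.12+, a () => Unit literal is adapted to AutoCloseable because AutoCloseable has exactly one abstract method, close().

object SamConversionDemo extends App {
  val writer = new java.io.StringWriter() // already an AutoCloseable, like `delegate`
  // Each function literal becomes an AutoCloseable whose close() runs the lambda body.
  val resources: Seq[AutoCloseable] = Seq(
    () => println("flush"),
    writer,
    () => println("mark closed")
  )
  resources.foreach(_.close())
}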

Collaborator:

OK, I discussed it with @revans2 a bit, and he thought that what is likely happening is that these are single-method lambdas, and AutoCloseable wants an implementation for a single method, so each lambda becomes:

Seq(
  new AutoCloseable {
    override def close(): Unit = {
      flush()
    }
  },
  new AutoCloseable {
    override def close(): Unit = {
      executor.shutdownNow(10, TimeUnit.SECONDS)
    }
  },
  delegate,
  new AutoCloseable {
    override def close(): Unit = {
      closed = true
    }
  }).safeClose()

Note that safeClose will iterate through all of these and call close in order. If any throw, they get added as suppressed exceptions and are thrown when the whole thing finishes. The effect will be that AsyncOutputStream will be closed, and we'll get an exception. I think that's all OK and it should take the task down (I don't know if the executor will come down... but possibly it should?)

The main feedback is to add comments around here on how this works. You could also consider making an interface out of this, so it's clear what the intent is.
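For reference, a minimal standalone sketch of the safeClose semantics described above (illustrative only, not the plugin's actual implementation): close every resource in order, attach later failures as suppressed exceptions, and rethrow once all closes have been attempted.

def safeCloseAll(resources: Seq[AutoCloseable]): Unit = {
  var first: Throwable = null
  resources.foreach { r =>
    try r.close()
    catch {
      case t: Throwable =>
        if (first == null) first = t else first.addSuppressed(t)
    }
  }
  if (first != null) throw first
}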

Collaborator Author (@jihoonson), Nov 22, 2024:

> I think that's all OK and it should take the task down (I don't know if the executor will come down... but possibly it should?)

Yes I think it should. executor.shutdownNow() interrupts all threads. So, unless some delegate implementation does not respect InterruptedException, all tasks should be cancelled.

> The main feedback is to add comments around here on how this works. You could also consider making an interface out of this, so it's clear what the intent is.

I'm not sure I understand your comment correctly. I tried to explain the error propagation mechanism in the scaladoc here. Do you think we need additional comments for this close function, or comments for safeClose()? Also, what kind of interface are you suggesting?

var numBytesScheduled: Long = 0
// This is thread-safe as it is updated by the background thread and can be read by
// any threads.
val numBytesWritten: AtomicLong = new AtomicLong(0)

Collaborator:

Have you considered just making it a @volatile Long?

Collaborator Author (@jihoonson):

I can, but it doesn't seem that it will make a big difference?

Collaborator:

volatile is designed for exactly this kind of scenario, and the reads are as cheap as regular reads.

Collaborator Author (@jihoonson):

Yes it is, but I don't think it will make any performance difference in this case. I prefer AtomicLong unless it's performance critical, as I have seen many times that people get confused by volatile and end up making the concurrency model overly complex as the code evolves.

Collaborator:

not a blocker, you considered it
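For reference, a minimal sketch (not the PR's code) of the two alternatives discussed in this thread, for a counter that a single background thread updates and other threads read; the class and method names are made up.

import java.util.concurrent.atomic.AtomicLong

// Option A: @volatile guarantees visibility; the unsynchronized += is safe here
// only because a single background thread performs the read-modify-write.
class VolatileBytesWritten {
  @volatile var numBytesWritten: Long = 0L
  def add(n: Long): Unit = numBytesWritten += n
}

// Option B: AtomicLong stays correct even if more writer threads are added later.
class AtomicBytesWritten {
  val numBytesWritten: AtomicLong = new AtomicLong(0)
  def add(n: Long): Unit = numBytesWritten.addAndGet(n)
}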

…ncOutputStream.scala


Simplify case arm

Co-authored-by: Gera Shegalov <gshegalov@nvidia.com>
@jihoonson: build

@sameerz added the performance label (a performance related task/issue) on Nov 22, 2024
conf("spark.rapids.sql.queryOutput.holdGpuInTask")
.doc("Option to hold GPU semaphore between batch processing during the final output write. " +
"This option could degrade query performance if it is enabled without the async query " +
"output write. It is recommended to consider enabling this option only when " +

Collaborator:

If it is recommended this way, is it even needed to allow it to be set when ENABLE_ASYNC_OUTPUT_WRITE is off?

Collaborator Author (@jihoonson):

Good question. I think the answer is no, but I am not 100% sure at this point. We need to do more experiments and tests to confirm it, and we can adjust this config later.

def blockUntilRunnable[T](task: Task[T]): Unit = synchronized {
  if (numTasks > 0) {
    while (!throttle.canAccept(task)) {
      wait(100)

Collaborator:

why 100?

Collaborator Author (@jihoonson):

I just felt that 100 ms is short enough to react quickly to an interrupt. Though now I wonder why I did this at all... I should probably use a condition variable instead. Mind if I fix this in a follow-up?

Collaborator:

follow up is fine
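For reference, a minimal sketch of the condition-variable approach mentioned in this thread as a follow-up; this is illustrative only, and the class, field, and method names are hypothetical rather than the PR's TrafficController.

// Hypothetical sketch: block until the in-flight budget can accept the task,
// waking only when another task releases bytes, instead of polling every 100 ms.
class ThrottleGate(maxInFlightBytes: Long) {
  private var inFlightBytes: Long = 0L

  def acquire(bytes: Long): Unit = synchronized {
    while (inFlightBytes + bytes > maxInFlightBytes) {
      wait() // releases the monitor; woken by notifyAll() in release(), or interrupted
    }
    inFlightBytes += bytes
  }

  def release(bytes: Long): Unit = synchronized {
    inFlightBytes -= bytes
    notifyAll() // wake any writers blocked in acquire()
  }
}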

@revans2 (Collaborator) left a comment:

I really would like to see some kind of metric for how long the task was throttled on I/O. I am fine if this is a follow-on issue, and if it is a debug metric until we decide what we want to do with this long term.

Also what is the plan? Are you going to look at ORC too? Are you going to work on shuffle next? What about parquet reads? At what point are we going to decide what to do with this?

@jihoonson (Collaborator Author) commented Nov 22, 2024

> I really would like to see some kind of metric for how long the task was throttled on I/O. I am fine if this is a follow-on issue, and if it is a debug metric until we decide what we want to do with this long term.

Good suggestion. I will do it as a follow-up.

> Also what is the plan? Are you going to look at ORC too? Are you going to work on shuffle next? What about parquet reads? At what point are we going to decide what to do with this?

I'm going to look at ORC, assuming that ORC will be easy, and then work on shuffle. I'm not planning anything on the reader side yet.

@jihoonson: build

@jihoonson: build

@revans2 (Collaborator) commented Nov 22, 2024

So I did a little testing of my own. I included the latest patch to fix issues with max device bytes and I ran two different queries on a GPU limited to 16 GiB of memory. The queries are close to being disk bound. The disk I was writing to for both shuffle and the final output is a Samsung SSD 970 EVO Plus 2TB. For this I typically would see the disk go at 1.1 GB/sec, but I did see it as high as 1.7 GB/sec for short periods of time.

Note also that the box has enough memory to store the entire shuffle data in the page cache so reads are much faster than they might be on other systems. I also have 32 GiB of pinned memory for spill.

The two queries I ran are

spark.time(spark.range(1, 500000000L * 16, 1, 16).selectExpr("id", "id % 500 as a").orderBy("a", "id").write.mode("overwrite").parquet("/data/tmp/TEST_OUTPUT"))

which writes the output after a sort. Sort requires that all of a task's data be read into memory before doing more processing.

and

spark.time(spark.range(1, 500000000L * 16, 1, 16).selectExpr("id", "id % 500 as a").repartition(col("a")).write.mode("overwrite").parquet("/data/tmp/TEST_OUTPUT"))

which just partitions the data fairly evenly before writing it out.

For the first query I see very different end-to-end run times.

              async write   sync write
hold sem      205,025 ms    210,707 ms
release sem   199,275 ms    258,736 ms

Doing the async write here is a huge win. I think that is because it makes the GPU stop spilling to disk. The metrics show that we spent 41 seconds spilling to or reading from disk. Because spill ends up being synchronous, I suspect the other thread ended up blocked waiting on spill while this was happening.

The async write while releasing the semaphore ended up spilling almost 32 GiB to host memory and was right on the edge of going to disk, but it only took 2 seconds to spill to host memory. I think the reason it is the fastest is that we were able to read the shuffle data from main memory and overlap some processing without overloading the spill memory.

The sync write while holding the semaphore did spill, but very little. I think we ended up not overlapping the processing and the read as much, and that hurt the overall time.

For the second query I got

              async write   sync write
hold sem      169,803 ms    190,977 ms
release sem   113,861 ms    118,714 ms

This shows that trying to read shuffle data with the semaphore held is bad. We have to decompress the data, and paying for that while holding the semaphore is costly. In the previous example we almost always did that, because sort does not release the semaphore when reading data. That was to reduce spill, but I am not so sure that is the best choice in all cases.

Here all of the data is being read one batch at a time and nothing ever ends up spilling.

I think this shows why releasing the semaphore is not a clear answer. We need something that can reason about the possible outcomes of different situations and make a good decision, but that is not simple to do. For example, if we have a lot of remote shuffle, it might be worth it to read the remote data, decompress it, and write it out to disk, unless the disk is overloaded with other shuffle data being written. It just isn't simple.

@jihoonson (Collaborator Author)

@revans2 thanks for sharing your test results! Yes, I agree that we should do more tests and experiments before we change anything about releasing the semaphore.

@jihoonson: build

@abellina (Collaborator) left a comment:

I had 1 nit, but I am good with this.

  assertResult(0)(throttle.getTotalHostMemoryBytes)
}

test("tasks submission fails if total weight exceeds maxWeight") {

Collaborator:

I'd use the hostMemoryBytes and maxInFlight terminology instead of "weight". I had to go to the class to realize that weight was in bytes.

Collaborator Author (@jihoonson):

Oops, I have renamed it in the production code, but forgot to do that for the tests. I will fix them in a follow-up.

@jihoonson merged commit 6b90b2f into NVIDIA:branch-24.12 on Nov 25, 2024
49 checks passed
@jihoonson (Collaborator Author)

Thanks all for the review!
