Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for asynchronous writing for parquet #11730

Merged
merged 11 commits into from
Nov 25, 2024
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,11 +25,13 @@ import com.nvidia.spark.Retryable
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.io.async.{AsyncOutputStream, TrafficController}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.{ColumnarWriteTaskStatsTracker, GpuWriteTaskStatsTracker}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -70,21 +72,31 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
abstract class ColumnarOutputWriter(context: TaskAttemptContext,
dataSchema: StructType,
rangeName: String,
includeRetry: Boolean) extends HostBufferConsumer {
includeRetry: Boolean,
holdGpuBetweenBatches: Boolean = false) extends HostBufferConsumer with Logging {

protected val tableWriter: TableWriter

protected val conf: Configuration = context.getConfiguration

// This is implemented as a method to make it easier to subclass
// ColumnarOutputWriter in the tests, and override this behavior.
protected def getOutputStream: FSDataOutputStream = {
private val trafficController: Option[TrafficController] = TrafficController.getInstance

private def openOutputStream(): OutputStream = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(conf)
fs.create(hadoopPath, false)
}

protected val outputStream: FSDataOutputStream = getOutputStream
// This is implemented as a method to make it easier to subclass
// ColumnarOutputWriter in the tests, and override this behavior.
protected def getOutputStream: OutputStream = {
trafficController.map(controller => {
logWarning("Async output write enabled")
new AsyncOutputStream(() => openOutputStream(), controller)
}).getOrElse(openOutputStream())
}

protected val outputStream: OutputStream = getOutputStream

private[this] val tempBuffer = new Array[Byte](128 * 1024)
private[this] var anythingWritten = false
Expand Down Expand Up @@ -166,7 +178,11 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
}
// we successfully buffered to host memory, release the semaphore and write
// the buffered data to the FS
GpuSemaphore.releaseIfNecessary(TaskContext.get)
if (!holdGpuBetweenBatches) {
logDebug("Releasing semaphore between batches")
GpuSemaphore.releaseIfNecessary(TaskContext.get)
}

writeBufferedData()
updateStatistics(writeStartTime, gpuTime, statsTrackers)
spillableBatch.numRows()
Expand Down Expand Up @@ -202,6 +218,10 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
// buffer an empty batch on close() to work around issues in cuDF
// where corrupt files can be written if nothing is encoded via the writer.
anythingWritten = true

// tableWriter.write() serializes the table into the HostMemoryBuffer, and buffers it
// by calling handleBuffer() on the ColumnarOutputWriter. It may not write to the
// output stream just yet.
tableWriter.write(table)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,19 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.")
}

val asyncOutputWriteEnabled = RapidsConf.ENABLE_ASYNC_OUTPUT_WRITE.get(sqlConf)
// holdGpuBetweenBatches is on by default if asyncOutputWriteEnabled is on
val holdGpuBetweenBatches = RapidsConf.ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK.get(sqlConf)
.getOrElse(asyncOutputWriteEnabled)

new ColumnarOutputWriterFactory {
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): ColumnarOutputWriter = {
new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString,
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled)
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled,
holdGpuBetweenBatches)
}

override def getFileExtension(context: TaskAttemptContext): String = {
Expand All @@ -299,8 +305,9 @@ class GpuParquetWriter(
dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode,
context: TaskAttemptContext,
parquetFieldIdEnabled: Boolean)
extends ColumnarOutputWriter(context, dataSchema, "Parquet", true) {
parquetFieldIdEnabled: Boolean,
holdGpuBetweenBatches: Boolean)
extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, holdGpuBetweenBatches) {
override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = {
val cols = GpuColumnVector.extractBases(batch)
cols.foreach { col =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.nvidia.spark.DFUDFPlugin
import com.nvidia.spark.rapids.RapidsConf.AllowMultipleJars
import com.nvidia.spark.rapids.RapidsPluginUtils.buildInfoEvent
import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
import com.nvidia.spark.rapids.io.async.TrafficController
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import org.apache.commons.lang3.exception.ExceptionUtils
Expand Down Expand Up @@ -554,6 +555,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraExecutorPlugins.foreach(_.init(pluginContext, extraConf))
GpuSemaphore.initialize()
FileCache.init(pluginContext)
TrafficController.initialize(conf)
} catch {
// Exceptions in executor plugin can cause a single thread to die but the executor process
// sticks around without any useful info until it hearbeat times out. Print what happened
Expand Down Expand Up @@ -656,6 +658,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraExecutorPlugins.foreach(_.shutdown())
FileCache.shutdown()
GpuCoreDumpHandler.shutdown()
TrafficController.shutdown()
}

override def onTaskFailed(failureReason: TaskFailedReason): Unit = {
Expand Down
35 changes: 35 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2395,6 +2395,36 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(false)

val ENABLE_ASYNC_OUTPUT_WRITE =
conf("spark.rapids.sql.asyncWrite.queryOutput.enabled")
.doc("Option to turn on the async query output write. During the final output write, the " +
"task first copies the output to the host memory, and then writes it into the storage. " +
"When this option is enabled, the task will asynchronously write the output in the host " +
"memory to the storage. Only the Parquet format is supported currently.")
.internal()
.booleanConf
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
.createWithDefault(false)

val ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK =
conf("spark.rapids.sql.queryOutput.holdGpuInTask")
.doc("Option to hold GPU semaphore between batch processing during the final output write. " +
"This option could degrade query performance if it is enabled without the async query " +
"output write. It is recommended to consider enabling this option only when " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it is recommended this way is it even needed to allow it to be set when ENABLE_ASYNC_OUTPUT_WRITE is off?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I think the answer is no, but am not 100% sure at this point. I think we need to do more experiments and tests to make sure of it and make some adjustment for this config later.

s"${ENABLE_ASYNC_OUTPUT_WRITE.key} is set. This option is off by default when the async " +
"query output write is disabled; otherwise, it is on.")
.internal()
.booleanConf
.createOptional

val ASYNC_WRITE_MAX_IN_FLIGHT_HOST_MEMORY_BYTES =
conf("spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes")
.doc("Maximum number of host memory bytes per executor that can be in-flight for async " +
"query output write. Tasks may be blocked if the total host memory bytes in-flight " +
"exceeds this value.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(2L * 1024 * 1024 * 1024)

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

Expand Down Expand Up @@ -2650,6 +2680,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isFoldableNonLitAllowed: Boolean = get(FOLDABLE_NON_LIT_ALLOWED)

lazy val asyncWriteMaxInFlightHostMemoryBytes: Long =
get(ASYNC_WRITE_MAX_IN_FLIGHT_HOST_MEMORY_BYTES)

/**
* Convert a string value to the injection configuration OomInjection.
*
Expand Down Expand Up @@ -3233,6 +3266,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val caseWhenFuseEnabled: Boolean = get(CASE_WHEN_FUSE)

lazy val isAsyncOutputWriteEnabled: Boolean = get(ENABLE_ASYNC_OUTPUT_WRITE)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.io.async

import java.io.{IOException, OutputStream}
import java.util.concurrent.{Callable, TimeUnit}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.rapids.execution.TrampolineUtil

/**
* OutputStream that performs writes asynchronously. Writes are scheduled on a background thread
* and executed in the order they were scheduled. This class is not thread-safe and should only be
* used by a single thread.
*/
class AsyncOutputStream(openFn: Callable[OutputStream], trafficController: TrafficController)
extends OutputStream {

private var closed = false

private val executor = new ThrottlingExecutor(
TrampolineUtil.newDaemonCachedThreadPool("AsyncOutputStream", 1, 1),
trafficController)

// Open the underlying stream asynchronously as soon as the AsyncOutputStream is constructed,
// so that the open can be done in parallel with other operations. This could help with
// performance if the open is slow.
private val openFuture = executor.submit(openFn, 0)
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
// Let's give it enough time to open the stream. Something bad should have happened if it
// takes more than 5 minutes to open a stream.
private val openTimeoutMin = 5

private lazy val delegate: OutputStream = {
openFuture.get(openTimeoutMin, TimeUnit.MINUTES)
}

class Metrics {
var numBytesScheduled: Long = 0
// This is thread-safe as it is updated by the background thread and can be read by
// any threads.
val numBytesWritten: AtomicLong = new AtomicLong(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you considered just making it @volatile Long

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can, but it doesn't seem that it will make a big difference?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

volatile are designed for this kind of scenario, and the reads are as cheap as the a regular reads

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is, but I don't think it will make any performance difference in this case. I prefer AtomicLong unless it's performance critical as I have seen many times that many people are confused with volatile and end up making the concurrency model super complex as the code evolves.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a blocker, you considered it

}

val metrics = new Metrics

/**
* The last error that occurred in the background thread, or None if no error occurred.
* Once it is set, all subsequent writes that are already scheduled will fail and no new
* writes will be accepted.
*
* This is thread-safe as it is set by the background thread and can be read by any threads.
*/
val lastError: AtomicReference[Option[Throwable]] =
new AtomicReference[Option[Throwable]](None)

@throws[IOException]
private def throwIfError(): Unit = {
lastError.get() match {
case Some(t: IOException) => throw t
case Some(t) => throw new IOException(t)
case None =>
}
}

@throws[IOException]
private def ensureOpen(): Unit = {
if (closed) {
throw new IOException("Stream closed")
}
}

private def scheduleWrite(fn: () => Unit, bytesToWrite: Int): Unit = {
throwIfError()
ensureOpen()

metrics.numBytesScheduled += bytesToWrite
executor.submit(() => {
throwIfError()
ensureOpen()

try {
fn()
metrics.numBytesWritten.addAndGet(bytesToWrite)
} catch {
case t: Throwable =>
// Update the error state
lastError.set(Some(t))
}
}, bytesToWrite)
}

override def write(b: Int): Unit = {
scheduleWrite(() => delegate.write(b), 1)
}

override def write(b: Array[Byte]): Unit = {
scheduleWrite(() => delegate.write(b), b.length)
}

/**
* Schedules a write of the given bytes to the underlying stream. The write is executed
* asynchronously on a background thread. The method returns immediately, and the write may not
* have completed when the method returns.
*
* If an error has occurred in the background thread and [[lastError]] has been set, this function
* will throw an IOException immediately.
*
* If an error has occurred in the background thread while executing a previous write after the
* current write has been scheduled, the current write will fail with the same error.
*/
@throws[IOException]
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
scheduleWrite(() => delegate.write(b, off, len), len)
}

/**
* Flushes all pending writes to the underlying stream. This method blocks until all pending
* writes have been completed. If an error has occurred in the background thread, this method
* will throw an IOException.
*
* If an error has occurred in the background thread and [[lastError]] has been set, this function
* will throw an IOException immediately.
*
* If an error has occurred in the background thread while executing a previous task after the
* current flush has been scheduled, the current flush will fail with the same error.
*/
@throws[IOException]
override def flush(): Unit = {
throwIfError()
ensureOpen()

val f = executor.submit(() => {
throwIfError()
ensureOpen()

delegate.flush()
}, 0)

f.get()
jihoonson marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Closes the underlying stream and releases any resources associated with it. All pending writes
* are flushed before closing the stream. This method blocks until all pending writes have been
* completed.
*
* If an error has occurred while flushing, this function will throw an IOException.
*
* If an error has occurred while executing a previous task before this function is called,
* this function will throw the same error. All resources and the underlying stream are still
* guaranteed to be closed.
*/
@throws[IOException]
override def close(): Unit = {
if (!closed) {
Seq[AutoCloseable](
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% sure how this works. It seems you are building a sequence of AutoCloseable but the lambdas are not AutoCloseable. delegate is, but that's it. What magic do you speak of?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. So Java allows using the lambda function to create a FunctionalInterface. I'm not a Scala expert, but guess Scala does the same.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok discussed with @revans2 a bit it and he thought what is likely happening is yes that these are single method lambdas, and AutoCloseable wants an impl for a single method, so each lambda becomes:

Seq(
new AutoCloseable {
  override def close(): Unit = {
    flush()
  }
},
new AutoCloseable {
  override def close(): Unit = {
    executor.shutdownNow(10, TimeUnit.SECONDS)
  }
},
delegate,
new AutoCloseable {
  override def close(): Unit = {
    closed = true
  }
}).safeClose()

Note that safeClose will iterate through all of these, and call close in order. If any throw, they get added as suppressed exceptions and thrown when the whole thing finishes. The effect will be that AsyncOutputStream will be closed, and we'll get an exception. I think that's all OK and it should take the task down (I don't know if the executor will come down.. but possibly it should?)

The main feedback is to add comments around here on how this works. You could also consider making an interface out of this, so it's clear what the intent is.

Copy link
Collaborator Author

@jihoonson jihoonson Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's all OK and it should take the task down (I don't know if the executor will come down.. but possibly it should?)

Yes I think it should. executor.shutdownNow() interrupts all threads. So, unless some delegate implementation does not respect InterruptedException, all tasks should be cancelled.

The main feedback is to add comments around here on how this works. You could also consider making an interface out of this, so it's clear what the intent is.

I'm not sure if I understand your comments correctly. So, I tried to explain the error propagation mechanism in this scaladoc here. Do you think we need additional comments for this close function? Or do you think we need comments for safeClose()? Also what kind of interface are you suggesting?

() => {
// Wait for all pending writes to complete
// This will throw an exception if one of the writes fails
flush()
},
() => {
// Give the executor a chance to shutdown gracefully.
executor.shutdownNow(10, TimeUnit.SECONDS)
},
delegate,
() => closed = true).safeClose()
}
}
}
Loading