Skip to content

Commit

Permalink
PythonRunner Changes [databricks] (#10274)
Browse files Browse the repository at this point in the history
* Download Maven from apache.org archives (#10225)

Fixes #10224 

Replace broken install using apt by downloading Maven from apache.org.

Signed-off-by: Gera Shegalov <gera@apache.org>

* Fix a hang for Pandas UDFs on DB 13.3[databricks] (#9833)

fix #9493
fix #9844

The python runner uses two separate threads to write and read data with Python processes, 
however on DB13.3, it becomes single-threaded, which means reading and writing run on the same thread.
Now the first reading is always ahead of the first writing. But the original BatchQueue will wait
on the first reading until the first writing is done. Then it will wait forever.

Change made:

- Update the BatchQueue to support asking for a batch instead of waiting unitl one is inserted into the queue. 
   This can eliminate the order requirement of reading and writing.
- Introduce a new class named BatchProducer to work with the new BatchQueue to support rows number
   peek on demand for the reading.
- Apply this new BatchQueue to relevant plans.
- Update the Python runners to support writing one batch one time for the singled-threaded model.
- Found an issue about PythonUDAF and RunningWindoFunctionExec, it may be a bug specific to DB 13.3,
   and add a test (test_window_aggregate_udf_on_cpu) for it.
- Other small refactors
---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Fix a potential data corruption for Pandas UDF (#9942)

This PR moves the BatchQueue into the DataProducer to share the same lock as the output iterator
returned by asIterator,  and make the batch movement from the input iterator to the batch queue be
an atomic operation to eliminate the race when appending the batches to the queue.

* Do some refactor for the Python UDF code to try to reduce duplicate code. (#9902)

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Fixed 330db Shims to Adopt the PythonRunner Changes [databricks] (#10232)

This PR removes the old 330db shims in favor of the new Shims, similar to the one in 341db. 

**Tests:**
Ran udf_test.py on Databricks 11.3 and they all passed. 

fixes #10228 

---------

Signed-off-by: raza jafri <rjafri@nvidia.com>

---------

Signed-off-by: Gera Shegalov <gera@apache.org>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: raza jafri <rjafri@nvidia.com>
Co-authored-by: Gera Shegalov <gera@apache.org>
Co-authored-by: Liangcai Li <firestarmanllc@gmail.com>
  • Loading branch information
3 people authored Jan 26, 2024
1 parent e5048c1 commit dacc6fe
Show file tree
Hide file tree
Showing 41 changed files with 1,512 additions and 1,520 deletions.
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ def is_spark_330_or_later():
def is_spark_340_or_later():
return spark_version() >= "3.4.0"

def is_spark_341():
return spark_version() == "3.4.1"

def is_spark_350_or_later():
return spark_version() >= "3.5.0"

Expand Down
17 changes: 7 additions & 10 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest

from conftest import is_at_least_precommit_run, is_not_utc
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_340_or_later
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341

from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version

Expand Down Expand Up @@ -43,12 +43,6 @@
import pyarrow
from typing import Iterator, Tuple


if is_databricks_runtime() and is_spark_340_or_later():
# Databricks 13.3 does not use separate reader/writer threads for Python UDFs
# which can lead to hangs. Skipping these tests until the Python UDF handling is updated.
pytestmark = pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9493")

arrow_udf_conf = {
'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.WindowInPandasExec': 'true',
Expand Down Expand Up @@ -182,7 +176,10 @@ def group_size_udf(to_process: pd.Series) -> int:

low_upper_win = Window.partitionBy('a').orderBy('b').rowsBetween(-3, 3)

udf_windows = [no_part_win, unbounded_win, cur_follow_win, pre_cur_win, low_upper_win]
running_win_param = pytest.param(pre_cur_win, marks=pytest.mark.xfail(
condition=is_databricks_runtime() and is_spark_341(),
reason='DB13.3 wrongly uses RunningWindowFunctionExec to evaluate a PythonUDAF and it will fail even on CPU'))
udf_windows = [no_part_win, unbounded_win, cur_follow_win, running_win_param, low_upper_win]
window_ids = ['No_Partition', 'Unbounded', 'Unbounded_Following', 'Unbounded_Preceding',
'Lower_Upper']

Expand Down Expand Up @@ -338,8 +335,8 @@ def create_df(spark, data_gen, left_length, right_length):
@ignore_order
@pytest.mark.parametrize('data_gen', [ShortGen(nullable=False)], ids=idfn)
def test_cogroup_apply_udf(data_gen):
def asof_join(l, r):
return pd.merge_asof(l, r, on='a', by='b')
def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
return pd.merge_ordered(left, right)

def do_it(spark):
left, right = create_df(spark, data_gen, 500, 500)
Expand Down
10 changes: 8 additions & 2 deletions jenkins/databricks/build.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,13 @@ declare -A artifacts
initialize()
{
# install rsync to be used for copying onto the databricks nodes
sudo apt install -y maven rsync
sudo apt install -y rsync

if [[ ! -d $HOME/apache-maven-3.6.3 ]]; then
wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz -P /tmp
tar xf /tmp/apache-maven-3.6.3-bin.tar.gz -C $HOME
sudo ln -s $HOME/apache-maven-3.6.3/bin/mvn /usr/local/bin/mvn
fi

# Archive file location of the plugin repository
SPARKSRCTGZ=${SPARKSRCTGZ:-''}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ private static StructType structFromTypes(DataType[] format) {
return new StructType(fields);
}

private static StructType structFromAttributes(List<Attribute> format) {
public static StructType structFromAttributes(List<Attribute> format) {
StructField[] fields = new StructField[format.size()];
int i = 0;
for (Attribute attribute: format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq

import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}

/**
* Utility class with methods for calculating various metrics about GPU memory usage
* prior to allocation.
* prior to allocation, along with some operations with batches.
*/
object GpuBatchUtils {

Expand Down Expand Up @@ -175,4 +179,37 @@ object GpuBatchUtils {
bytes
}
}

/**
* Concatenate the input batches into a single one.
* The caller is responsible for closing the returned batch.
*
* @param spillBatches the batches to be concatenated, will be closed after the call
* returns.
* @return the concatenated SpillableColumnarBatch or None if the input is empty.
*/
def concatSpillBatchesAndClose(
spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = {
val retBatch = if (spillBatches.length >= 2) {
// two or more batches, concatenate them
val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ =>
withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches =>
val batchTypes = GpuColumnVector.extractTypes(batches.head)
withResource(batches.safeMap(GpuColumnVector.from)) { tables =>
(Table.concatenate(tables: _*), batchTypes)
}
}
}
// Make the concatenated table spillable.
withResource(concatTable) { _ =>
SpillableColumnarBatch(GpuColumnVector.from(concatTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
} else if (spillBatches.length == 1) {
// only one batch
spillBatches.head
} else null

Option(retBatch)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package org.apache.spark.sql.rapids.execution
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, RmmRapidsRetryIterator, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

Expand All @@ -41,27 +40,7 @@ object GpuSubPartitionHashJoin {
*/
def concatSpillBatchesAndClose(
spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = {
val retBatch = if (spillBatches.length >= 2) {
// two or more batches, concatenate them
val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ =>
withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches =>
val batchTypes = GpuColumnVector.extractTypes(batches.head)
withResource(batches.safeMap(GpuColumnVector.from)) { tables =>
(Table.concatenate(tables: _*), batchTypes)
}
}
}
// Make the concatenated table spillable.
withResource(concatTable) { _ =>
SpillableColumnarBatch(GpuColumnVector.from(concatTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
} else if (spillBatches.length == 1) {
// only one batch
spillBatches.head
} else null

Option(retBatch)
GpuBatchUtils.concatSpillBatchesAndClose(spillBatches)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import ai.rapids.cudf
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuPythonArrowOutput
import org.apache.spark.sql.rapids.execution.python.shims.GpuBasePythonRunner
import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -195,7 +196,7 @@ private[python] object BatchGroupUtils {
def executePython[IN](
pyInputIterator: Iterator[IN],
output: Seq[Attribute],
pyRunner: GpuPythonRunnerBase[IN],
pyRunner: GpuBasePythonRunner[IN],
outputRows: GpuMetric,
outputBatches: GpuMetric): Iterator[ColumnarBatch] = {
val context = TaskContext.get()
Expand Down Expand Up @@ -394,38 +395,72 @@ private[python] object BatchGroupedIterator {
class CombiningIterator(
inputBatchQueue: BatchQueue,
pythonOutputIter: Iterator[ColumnarBatch],
pythonArrowReader: GpuPythonArrowOutput,
pythonArrowReader: GpuArrowOutput,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric) extends Iterator[ColumnarBatch] {

// For `hasNext` we are waiting on the queue to have something inserted into it
// instead of waiting for a result to be ready from Python. The reason for this
// is to let us know the target number of rows in the batch that we want when reading.
// It is a bit hacked up but it works. In the future when we support spilling we should
// store the number of rows separate from the batch. That way we can get the target batch
// size out without needing to grab the GpuSemaphore which we cannot do if we might block
// on a read operation.
override def hasNext: Boolean = inputBatchQueue.hasNext || pythonOutputIter.hasNext
// This is only for the input.
private var pendingInput: Option[SpillableColumnarBatch] = None
Option(TaskContext.get()).foreach(onTaskCompletion(_)(pendingInput.foreach(_.close())))

// The Python output should line up row for row so we only look at the Python output
// iterator and no need to check the `inputPending` who will be consumed when draining
// the Python output.
override def hasNext: Boolean = pythonOutputIter.hasNext

override def next(): ColumnarBatch = {
val numRows = inputBatchQueue.peekBatchSize
val numRows = inputBatchQueue.peekBatchNumRows()
// Updates the expected batch size for next read
pythonArrowReader.setMinReadTargetBatchSize(numRows)
pythonArrowReader.setMinReadTargetNumRows(numRows)
// Reads next batch from Python and combines it with the input batch by the left side.
withResource(pythonOutputIter.next()) { cbFromPython =>
assert(cbFromPython.numRows() == numRows)
withResource(inputBatchQueue.remove()) { origBatch =>
// Here may get a batch has a larger rows number than the current input batch.
assert(cbFromPython.numRows() >= numRows,
s"Expects >=$numRows rows but got ${cbFromPython.numRows()} from the Python worker")
withResource(concatInputBatch(cbFromPython.numRows())) { concated =>
numOutputBatches += 1
numOutputRows += numRows
combine(origBatch, cbFromPython)
GpuColumnVector.combineColumns(concated, cbFromPython)
}
}
}

private def combine(lBatch: ColumnarBatch, rBatch: ColumnarBatch): ColumnarBatch = {
val lColumns = GpuColumnVector.extractColumns(lBatch).map(_.incRefCount())
val rColumns = GpuColumnVector.extractColumns(rBatch).map(_.incRefCount())
new ColumnarBatch(lColumns ++ rColumns, lBatch.numRows())
private def concatInputBatch(targetNumRows: Int): ColumnarBatch = {
withResource(mutable.ArrayBuffer[SpillableColumnarBatch]()) { buf =>
var curNumRows = pendingInput.map(_.numRows()).getOrElse(0)
pendingInput.foreach(buf.append(_))
pendingInput = None
while (curNumRows < targetNumRows) {
val scb = inputBatchQueue.remove()
if (scb != null) {
buf.append(scb)
curNumRows = curNumRows + scb.numRows()
}
}
assert(buf.nonEmpty, "The input queue is empty")

if (curNumRows > targetNumRows) {
// Need to split the last batch
val Array(first, second) = withRetryNoSplit(buf.remove(buf.size - 1)) { lastScb =>
val splitIdx = lastScb.numRows() - (curNumRows - targetNumRows)
withResource(lastScb.getColumnarBatch()) { lastCb =>
val batchTypes = GpuColumnVector.extractTypes(lastCb)
withResource(GpuColumnVector.from(lastCb)) { table =>
table.contiguousSplit(splitIdx).safeMap(
SpillableColumnarBatch(_, batchTypes, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
}
}
}
buf.append(first)
pendingInput = Some(second)
}

val ret = GpuBatchUtils.concatSpillBatchesAndClose(buf.toSeq)
// "ret" should be non empty because we checked the buf is not empty ahead.
withResource(ret.get) { concatedScb =>
concatedScb.getColumnarBatch()
}
} // end of withResource(mutable.ArrayBuffer)
}

}
Expand Down Expand Up @@ -560,3 +595,4 @@ class CoGroupedIterator(
keyOrdering.compare(leftKeyRow, rightKeyRow)
}
}

Loading

0 comments on commit dacc6fe

Please sign in to comment.