Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unshim ExclusiveModeGpuDiscoveryPlugin #3590

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dist/unshimmed-common-from-spark301.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ META-INF/DEPENDENCIES
META-INF/LICENSE
META-INF/NOTICE
META-INF/maven/**
com/nvidia/spark/ExclusiveModeGpuDiscoveryPlugin*
com/nvidia/spark/RapidsUDF*
com/nvidia/spark/SQLPlugin*
com/nvidia/spark/rapids/ExecutionPlanCaptureCallback*
com/nvidia/spark/rapids/PlanUtils*
com/nvidia/spark/rapids/GpuKryoRegistrator*
com/nvidia/spark/rapids/PlanUtils*
com/nvidia/spark/rapids/RapidsExecutorHeartbeatMsg*
com/nvidia/spark/rapids/RapidsExecutorStartupMsg*
com/nvidia/spark/rapids/RapidsExecutorUpdateMsg*
Expand All @@ -23,4 +24,4 @@ org/apache/spark/sql/rapids/ProxyRapidsShuffleInternalManagerBase*
org/apache/spark/sql/rapids/VisibleShuffleManager*
org/openucx/**
rapids/*.py
rapids4spark-version-info.properties
rapids4spark-version-info.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,14 +18,10 @@ package com.nvidia.spark

import java.util.Optional

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Cuda
import com.nvidia.spark.rapids.GpuDeviceManager
import com.nvidia.spark.rapids.ShimLoader

import org.apache.spark.SparkConf
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.resource.{ResourceInformation, ResourceRequest}

/**
Expand All @@ -39,44 +35,12 @@ import org.apache.spark.resource.{ResourceInformation, ResourceRequest}
* This plugin can be activated in spark with the configuration:
* `--conf spark.resources.discoveryPlugin=com.nvidia.spark.ExclusiveModeGpuDiscoveryPlugin`
*/
class ExclusiveModeGpuDiscoveryPlugin extends ResourceDiscoveryPlugin with Proxy {
  /**
   * Discovers GPU resources for Spark by delegating to the shim-provided
   * internal implementation (see `self`). Public entry point required by
   * Spark's `ResourceDiscoveryPlugin` contract.
   *
   * @param request   the resource being requested (resource name and amount)
   * @param sparkConf the Spark configuration in effect
   * @return the discovered `ResourceInformation`, or an empty `Optional` if
   *         the request is not handled by the delegate
   */
  override def discoverResource(
      request: ResourceRequest,
      sparkConf: SparkConf
  ): Optional[ResourceInformation] = self.discoverResource(request, sparkConf)

  // Lazily resolved so the shim classloader is consulted only on first use;
  // the concrete implementation lives behind the shim boundary.
  override lazy val self: ResourceDiscoveryPlugin =
    ShimLoader.newInternalExclusiveModeGpuDiscoveryPlugin()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import java.util.Optional

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Cuda

import org.apache.spark.SparkConf
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.resource.{ResourceInformation, ResourceRequest}

class InternalExclusiveModeGpuDiscoveryPlugin extends ResourceDiscoveryPlugin with Logging {
  /**
   * Attempts to acquire the requested number of GPUs in exclusive mode.
   *
   * Only requests for the "gpu" resource are handled; anything else yields
   * an empty `Optional`. Acquisition is retried once in case a device was
   * released between passes. If fewer GPUs than requested are acquired, a
   * warning is logged and the partial result is returned (Spark itself will
   * raise an error for an insufficient allocation).
   *
   * @param request   the resource request (name and amount)
   * @param sparkconf the Spark configuration (unused here)
   * @return information about the acquired GPU addresses
   */
  override def discoverResource(
      request: ResourceRequest,
      sparkconf: SparkConf
  ): Optional[ResourceInformation] = {
    val resourceName = request.id.resourceName
    if (!resourceName.equals("gpu")) {
      logInfo("ExclusiveModeGpuDiscoveryPlugin only handles gpu allocations, " +
        s"skipping $resourceName")
      Optional.empty()
    } else {
      val requested = request.amount
      val totalGpus: Int = Cuda.getDeviceCount
      logInfo(s"Running ExclusiveModeGpuDiscoveryPlugin to acquire $requested GPU(s), " +
        s"host has $totalGpus GPU(s)")
      // Make up to two passes over the candidate devices in case a GPU is
      // freed (or something transient happened) after the first attempt.
      val acquired = ArrayBuffer[String]()
      val remaining = ArrayBuffer.empty[Int] ++= (0 until totalGpus)
      var attemptsLeft = 2
      while (attemptsLeft > 0 && acquired.size < requested && remaining.nonEmpty) {
        var idx = 0
        val candidateCount = remaining.size
        while (idx < candidateCount && acquired.size < requested) {
          val candidate = remaining(idx)
          if (GpuDeviceManager.tryToSetGpuDeviceAndAcquire(candidate)) {
            acquired += candidate.toString
          }
          idx += 1
        }
        // Drop every address we now hold so the next pass only retries misses.
        remaining --= acquired.map(_.toInt)
        attemptsLeft -= 1
      }
      if (acquired.size < requested) {
        // log warning here, Spark will throw exception if we return not enough
        logWarning(s"ExclusiveModeGpuDiscoveryPlugin did not find enough gpus, " +
          s"requested: $requested found: ${acquired.size}")
      }
      Optional.of(new ResourceInformation("gpu", acquired.toArray))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._

gerashegalov marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.spark.{SPARK_BUILD_USER, SPARK_VERSION, SparkConf, SparkEnv}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin}
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -339,4 +340,7 @@ object ShimLoader extends Logging {
newInstanceOf("com.nvidia.spark.udf.LogicalPlanRules")
}

def newInternalExclusiveModeGpuDiscoveryPlugin(): ResourceDiscoveryPlugin = {
newInstanceOf("com.nvidia.spark.rapids.InternalExclusiveModeGpuDiscoveryPlugin")
}
}