diff --git a/tools/README.md b/tools/README.md
index 5abeaa6677c..771c21393a6 100644
--- a/tools/README.md
+++ b/tools/README.md
@@ -190,8 +190,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
        <eventlogs | eventlog directories ...>
 
 -a, --application-name  <arg>     Filter event logs whose application name
-                                   matches exactly with input string i.e no
-                                   regular expressions supported.
+                                   matches exactly or contains the input
+                                   string. Regular expressions not supported.
+     --application-name ~<arg>     Filter event logs based on the complement
+                                   of a selection criterion, i.e. select all
+                                   event logs except the ones whose
+                                   application name contains the input string.
 -f, --filter-criteria  <arg>      Filter newest or oldest N eventlogs for
                                     processing.eg: 100-newest (for processing
                                     newest 100 event logs). eg: 100-oldest (for
diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
index 38d09252897..81f2e64915b 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
@@ -47,8 +47,10 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
       "eg: 100-oldest (for processing oldest 100 event logs)")
 
   val applicationName: ScallopOption[String] = opt[String](required = false,
-    descr = "Filter event logs whose application name matches exactly with input string" +
-      "i.e no regular expressions supported.")
+    descr = "Filter event logs whose application name matches exactly or contains the " +
+      "input string. Regular expressions not supported. For filtering based on the " +
+      "complement of the application name, use ~APPLICATION_NAME, i.e. select all event " +
+      "logs except the ones whose application name contains the input string.")
 
   val startAppTime: ScallopOption[String] = opt[String](required = false,
     descr = "Filter event logs whose application start occurred within the past specified " +
diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
index f93106b5534..c1ad4ac11fc 100644
--- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
+++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala
@@ -38,6 +38,8 @@ class AppFilterImpl(
   // default is 24 hours
   private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L)
 
+  private val NEGATE = "~"
+
   private val threadFactory = new ThreadFactoryBuilder()
     .setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build()
   logInfo(s"Threadpool size is $nThreads")
@@ -72,14 +74,11 @@ class AppFilterImpl(
 
     val filterAppName = appArgs.applicationName.getOrElse("")
     if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) {
-      val filtered = apps.filter { app =>
-        val appNameOpt = app.appInfo.map(_.appName)
-        if (appNameOpt.isDefined) {
-          appNameOpt.get.equals(filterAppName)
-        } else {
-          // incomplete log file
-          false
-        }
+      val filtered = if (filterAppName.startsWith(NEGATE)) {
+        // remove ~ before passing it into the containsAppName function
+        apps.filterNot(app => containsAppName(app, filterAppName.substring(1)))
+      } else {
+        apps.filter(app => containsAppName(app, filterAppName))
       }
       filtered.map(_.eventlog).toSeq
     } else if (appArgs.startAppTime.isSupplied) {
@@ -98,6 +97,16 @@
     }
   }
 
+  private def containsAppName(app: AppFilterReturnParameters, filterAppName: String): Boolean = {
+    val appNameOpt = app.appInfo.map(_.appName)
+    if (appNameOpt.isDefined) {
+      appNameOpt.get.contains(filterAppName)
+    } else {
+      // incomplete log file
+      false
+    }
+  }
+
   case class AppFilterReturnParameters(
       appInfo: Option[ApplicationStartInfo],
       eventlog: EventLogInfo)
diff --git a/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
index e34ed691ea5..d6945bc0146 100644
--- a/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
+++ b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala
@@ -264,6 +264,27 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
     assert(result.length == 2) // 2 out of 3 have "Spark shell" as appName.
   }
 
+  test("test appName filter - Negation") {
+    val appName = "~Spark shell"
+    val appArgs = new QualificationArgs(Array(
+      "--application-name",
+      appName,
+      s"$logDir/rdd_only_eventlog",
+      s"$logDir/empty_eventlog",
+      s"$logDir/udf_dataset_eventlog"
+    ))
+
+    val eventLogInfo = EventLogPathProcessor.processAllPaths(appArgs.filterCriteria.toOption,
+      appArgs.matchEventLogs.toOption, appArgs.eventlog(),
+      sparkSession.sparkContext.hadoopConfiguration)
+
+    val appFilter = new AppFilterImpl(1000, sparkSession.sparkContext.hadoopConfiguration,
+      Some(84000), 2)
+    val result = appFilter.filterEventLogs(eventLogInfo, appArgs)
+    assert(eventLogInfo.length == 3)
+    assert(result.length == 1) // 1 out of 3 does not have "Spark shell" as appName.
+  }
+
   test("test udf event logs") {
     val logFiles = Array(
       s"$logDir/dataset_eventlog",
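
For reference, a minimal self-contained sketch of the filtering semantics the diff introduces: a leading `~` negates the substring match, mirroring the new `containsAppName` helper plus `filterNot` in `AppFilterImpl`. The `NegationFilterSketch` object and its plain-string inputs are illustrative stand-ins, not part of the tool's API.

```scala
// Illustrative stand-in for the new --application-name behavior:
// a filter string prefixed with "~" keeps only apps whose name does NOT contain the rest.
object NegationFilterSketch {
  private val NEGATE = "~"

  def filterByAppName(appNames: Seq[String], filter: String): Seq[String] = {
    if (filter.startsWith(NEGATE)) {
      // drop the leading ~ and keep apps whose name does NOT contain the remainder
      appNames.filterNot(_.contains(filter.substring(1)))
    } else {
      // plain case: keep apps whose name contains the filter string
      appNames.filter(_.contains(filter))
    }
  }

  def main(args: Array[String]): Unit = {
    val apps = Seq("Spark shell", "Spark shell", "NDS query run")
    println(filterByAppName(apps, "Spark shell"))  // List(Spark shell, Spark shell)
    println(filterByAppName(apps, "~Spark shell")) // List(NDS query run)
  }
}
```

On the command line this corresponds to passing, for example, `--application-name "~Spark shell"` to the qualification tool, which is exactly what the new test case in QualificationSuite exercises.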