From c0e1b6a0cf2bc9ccfe9493964543dcea2b2120f1 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 15 Jul 2021 00:50:50 -0700 Subject: [PATCH 1/7] Add negation filter Signed-off-by: Niranjan Artal --- tools/README.md | 4 +++ .../spark/sql/rapids/tool/AppFilterImpl.scala | 33 ++++++++++++++----- .../qualification/QualificationSuite.scala | 21 ++++++++++++ 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/tools/README.md b/tools/README.md index c389ba1b31f..3054e829707 100644 --- a/tools/README.md +++ b/tools/README.md @@ -191,6 +191,10 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* -a, --application-name Filter event logs whose application name matches exactly with input string i.e no regular expressions supported. + --application-name ~ Filter event logs based on the complement + of a selection criterion. i.e Select all + event logs except the ones which have + application name as the input string. -f, --filter-criteria Filter newest or oldest N eventlogs for processing.eg: 100-newest (for processing newest 100 event logs). eg: 100-oldest (for diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala index 51a1f633be8..250d927df38 100644 --- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala +++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala @@ -71,14 +71,11 @@ class AppFilterImpl( val filterAppName = appArgs.applicationName.getOrElse("") if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) { - val filtered = apps.filter { app => - val appNameOpt = app.appInfo.map(_.appName) - if (appNameOpt.isDefined) { - appNameOpt.get.equals(filterAppName) - } else { - // in complete log file - false - } + val checkNegation = filterAppName(0) + val filtered = if (checkNegation.equals('~')) { + apps.filterNot(app => filterAppsNegate(app, filterAppName)) + } else { + apps.filter(app => filterApps(app, filterAppName)) } filtered.map(_.eventlog).toSeq } else { @@ -86,6 +83,26 @@ class AppFilterImpl( } } + def filterApps(app: AppFilterReturnParameters, filterAppName: String): Boolean = { + val appNameOpt = app.appInfo.map(_.appName) + if (appNameOpt.isDefined) { + appNameOpt.get.equals(filterAppName) + } else { + // in complete log file + false + } + } + + def filterAppsNegate(app: AppFilterReturnParameters, filterAppName: String): Boolean = { + val appNameOpt = app.appInfo.map(_.appName) + if (appNameOpt.isDefined) { + appNameOpt.get.equals(filterAppName.substring(1)) + } else { + // in complete log file + false + } + } + case class AppFilterReturnParameters( appInfo: Option[ApplicationStartInfo], eventlog: EventLogInfo) diff --git a/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 92a1a6467ea..957b8362574 100644 --- a/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -263,6 +263,27 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging { assert(result.length == 2) // 2 out of 3 have "Spark shell" as appName. } + test("test appName filter - Negation") { + val appName = "~Spark shell" + val appArgs = new QualificationArgs(Array( + "--application-name", + appName, + s"$logDir/rdd_only_eventlog", + s"$logDir/empty_eventlog", + s"$logDir/udf_dataset_eventlog" + )) + + val eventLogInfo = EventLogPathProcessor.processAllPaths(appArgs.filterCriteria.toOption, + appArgs.matchEventLogs.toOption, appArgs.eventlog(), + sparkSession.sparkContext.hadoopConfiguration) + + val appFilter = new AppFilterImpl(1000, sparkSession.sparkContext.hadoopConfiguration, + Some(84000), 2) + val result = appFilter.filterEventLogs(eventLogInfo, appArgs) + assert(eventLogInfo.length == 3) + assert(result.length == 1) // 1 out of 3 does not has "Spark shell" as appName. + } + test("test udf event logs") { val logFiles = Array( s"$logDir/dataset_eventlog", From 86d9dabb103f37173b0d518e898b348702e674ef Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 15 Jul 2021 11:12:37 -0700 Subject: [PATCH 2/7] addressed review comments Signed-off-by: Niranjan Artal --- tools/README.md | 4 ++-- .../org/apache/spark/sql/rapids/tool/AppFilterImpl.scala | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tools/README.md b/tools/README.md index 3054e829707..3076d96a664 100644 --- a/tools/README.md +++ b/tools/README.md @@ -189,8 +189,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* -a, --application-name Filter event logs whose application name - matches exactly with input string i.e no - regular expressions supported. + matches exactly or is a substring of input + string. Regular expressions not supported. --application-name ~ Filter event logs based on the complement of a selection criterion. i.e Select all event logs except the ones which have diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala index 250d927df38..2ce995d7a76 100644 --- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala +++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala @@ -37,6 +37,8 @@ class AppFilterImpl( // default is 24 hours private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L) + private val negate = '~' + private val threadFactory = new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build() logInfo(s"Threadpool size is $nThreads") @@ -71,8 +73,7 @@ class AppFilterImpl( val filterAppName = appArgs.applicationName.getOrElse("") if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) { - val checkNegation = filterAppName(0) - val filtered = if (checkNegation.equals('~')) { + val filtered = if (filterAppName(0).equals(negate)) { apps.filterNot(app => filterAppsNegate(app, filterAppName)) } else { apps.filter(app => filterApps(app, filterAppName)) @@ -86,7 +87,7 @@ class AppFilterImpl( def filterApps(app: AppFilterReturnParameters, filterAppName: String): Boolean = { val appNameOpt = app.appInfo.map(_.appName) if (appNameOpt.isDefined) { - appNameOpt.get.equals(filterAppName) + appNameOpt.get.contains(filterAppName) } else { // in complete log file false @@ -96,7 +97,7 @@ class AppFilterImpl( def filterAppsNegate(app: AppFilterReturnParameters, filterAppName: String): Boolean = { val appNameOpt = app.appInfo.map(_.appName) if (appNameOpt.isDefined) { - appNameOpt.get.equals(filterAppName.substring(1)) + appNameOpt.get.contains(filterAppName.substring(1)) } else { // in complete log file false From 350965d2acae9dfb924ebe24c2917442fd0f2300 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 15 Jul 2021 15:07:52 -0700 Subject: [PATCH 3/7] addressed review comments Signed-off-by: Niranjan Artal --- .../spark/sql/rapids/tool/AppFilterImpl.scala | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala index 2ce995d7a76..c5676e58ee5 100644 --- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala +++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala @@ -37,7 +37,7 @@ class AppFilterImpl( // default is 24 hours private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L) - private val negate = '~' + private val NEGATE = "~" private val threadFactory = new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build() @@ -73,10 +73,11 @@ class AppFilterImpl( val filterAppName = appArgs.applicationName.getOrElse("") if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) { - val filtered = if (filterAppName(0).equals(negate)) { - apps.filterNot(app => filterAppsNegate(app, filterAppName)) + val filtered = if (filterAppName.startsWith(NEGATE)) { + // remove ~ before passing it into the containsAppName function + apps.filterNot(app => containsAppName(app, filterAppName.substring(1))) } else { - apps.filter(app => filterApps(app, filterAppName)) + apps.filter(app => containsAppName(app, filterAppName)) } filtered.map(_.eventlog).toSeq } else { @@ -84,7 +85,7 @@ class AppFilterImpl( } } - def filterApps(app: AppFilterReturnParameters, filterAppName: String): Boolean = { + private def containsAppName(app: AppFilterReturnParameters, filterAppName: String): Boolean = { val appNameOpt = app.appInfo.map(_.appName) if (appNameOpt.isDefined) { appNameOpt.get.contains(filterAppName) @@ -94,16 +95,6 @@ class AppFilterImpl( } } - def filterAppsNegate(app: AppFilterReturnParameters, filterAppName: String): Boolean = { - val appNameOpt = app.appInfo.map(_.appName) - if (appNameOpt.isDefined) { - appNameOpt.get.contains(filterAppName.substring(1)) - } else { - // in complete log file - false - } - } - case class AppFilterReturnParameters( appInfo: Option[ApplicationStartInfo], eventlog: EventLogInfo) From 19ed8aced8a2700b4b291fa927589299c7aaaf2f Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 15 Jul 2021 15:20:03 -0700 Subject: [PATCH 4/7] Update QualificationArgs Signed-off-by: Niranjan Artal --- .../rapids/tool/qualification/QualificationArgs.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index bcb54a641d9..f4833e92fd0 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -45,8 +45,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* "eg: 100-oldest (for processing oldest 100 event logs)") val applicationName: ScallopOption[String] = opt[String](required = false, - descr = "Filter event logs whose application name matches exactly with input string" + - "i.e no regular expressions supported.") + descr = "Filter event logs whose application name matches " + + "exactly or is a substring of input string. Regular expressions not supported." + + "For complement of eventlogs, use ~EVENTLOGNAME. Filter event logs based on the " + + "complement of a selection criterion. i.e Select all event logs except the ones which " + + "have application name as the input string.") val matchEventLogs: ScallopOption[String] = opt[String](required = false, descr = "Filter event logs whose filenames contain the input string") From 5eeb562f68e521390252fe3444fb40d89eec4241 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 15 Jul 2021 15:23:31 -0700 Subject: [PATCH 5/7] fix description Signed-off-by: Niranjan Artal --- .../spark/rapids/tool/qualification/QualificationArgs.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index f4833e92fd0..de92a6fed9f 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -47,7 +47,7 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[String](required = false, descr = "Filter event logs whose application name matches " + "exactly or is a substring of input string. Regular expressions not supported." + - "For complement of eventlogs, use ~EVENTLOGNAME. Filter event logs based on the " + + "For complement of eventlogs, use ~APPLICATION_NAME. Filter event logs based on the " + "complement of a selection criterion. i.e Select all event logs except the ones which " + "have application name as the input string.") val matchEventLogs: ScallopOption[String] = From 8372674411f55a5c877e9216431024d1b9fffb04 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 15 Jul 2021 15:25:19 -0700 Subject: [PATCH 6/7] Update description Signed-off-by: Niranjan Artal --- .../spark/rapids/tool/qualification/QualificationArgs.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index de92a6fed9f..e4e509c59ee 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -47,9 +47,9 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[String](required = false, descr = "Filter event logs whose application name matches " + "exactly or is a substring of input string. Regular expressions not supported." + - "For complement of eventlogs, use ~APPLICATION_NAME. Filter event logs based on the " + - "complement of a selection criterion. i.e Select all event logs except the ones which " + - "have application name as the input string.") + "For filtering based on complement of application name, use ~APPLICATION_NAME. Filter " + + "event logs based on the complement of a selection criterion. i.e Select all event " + + "logs except the ones which have application name as the input string.") val matchEventLogs: ScallopOption[String] = opt[String](required = false, descr = "Filter event logs whose filenames contain the input string") From 9ac153999cb3c2548a1071b3d7622733a300e773 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Thu, 15 Jul 2021 16:15:10 -0700 Subject: [PATCH 7/7] address review comments Signed-off-by: Niranjan Artal --- .../spark/rapids/tool/qualification/QualificationArgs.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index e4e509c59ee..f2e5bad5dcc 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -47,9 +47,8 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[String](required = false, descr = "Filter event logs whose application name matches " + "exactly or is a substring of input string. Regular expressions not supported." + - "For filtering based on complement of application name, use ~APPLICATION_NAME. Filter " + - "event logs based on the complement of a selection criterion. i.e Select all event " + - "logs except the ones which have application name as the input string.") + "For filtering based on complement of application name, use ~APPLICATION_NAME. i.e " + + "Select all event logs except the ones which have application name as the input string.") val matchEventLogs: ScallopOption[String] = opt[String](required = false, descr = "Filter event logs whose filenames contain the input string")