From f2c15a3241583b9d692cf48b36e62d8b84cbc5dd Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sun, 25 Jun 2017 11:05:57 -0700
Subject: [PATCH 1/8] [SPARK-21093][R] Terminate R's worker processes in the
 parent of R's daemon to prevent a leak

## What changes were proposed in this pull request?

`mcfork` in R appears to open a pipe before forking, but the existing logic does not properly close it when it is executed repeatedly. This leads to failures of further forks once the limit on the number of open files is reached. This hot path is hit in particular by `gapply`/`gapplyCollect`. For unknown reasons, this happens more easily on CentOS, and it can be reproduced on Mac too. All the details are described in https://issues.apache.org/jira/browse/SPARK-21093

This PR proposes to simply terminate R's worker processes in the parent of R's daemon to prevent the leak.

## How was this patch tested?

I ran the code below on both CentOS and Mac with that configuration disabled/enabled.

```r
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
...  # 30 times
```

Also, it now passes R tests on CentOS as below:

```
SparkSQL functions: Spark package found in SPARK_HOME: .../spark
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
..............................................................................................................................................................
....................................................................................................................................
```

Author: hyukjinkwon

Closes #18320 from HyukjinKwon/SPARK-21093.
---
 R/pkg/inst/worker/daemon.R | 59 +++++++++++++++++++++++++++++++++++---
 1 file changed, 55 insertions(+), 4 deletions(-)

diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 3a318b71ea06d..6e385b2a27622 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate them properly
+  # in the parent.
+  #
+  # There are two paths through which the parent attempts to send a signal to terminate the children.
+  #
+  # 1. Every second, if no socket connection is available and there are child workers
+  #    running.
+  # 2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running, or right before launching other worker children from the following
+  # new socket connection.
+
+  # Only the process IDs of children that sent data to the parent are returned below. The children
+  # send a custom exit code to the parent after exiting, and the parent tries
+  # to terminate them only if they sent the exit code.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This data should be raw bytes if any data was sent from this child.
+      # Otherwise, this returns the PID.
+      data <- parallel:::readChild(child)
+      if (is.raw(data)) {
+        # This checks if the data from this child is the exit code that indicates an exited child.
+        if (unserialize(data) == exitCode) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
+  }
+
   if (ready) {
     port <- SparkR:::readInt(inputCon)
     # There is a small chance that it could be interrupted by signal, retry one time
@@ -44,12 +91,16 @@ while (TRUE) {
     }
     p <- parallel:::mcfork()
     if (inherits(p, "masterProcess")) {
+      # Reach here because this is a child process.
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Set SIGUSR1 so that child can exit
-      tools::pskill(Sys.getpid(), tools::SIGUSR1)
-      parallel:::mcexit(0L)
+      # Note that this mcexit does not fully terminate this child. So, this writes back
+      # a custom exit code so that the parent can read it and terminate this child.
+      parallel:::mcexit(0L, send = exitCode)
+    } else {
+      # Forking succeeded and we need to check if they finished their jobs every second.
+      selectTimeout <- 1
     }
   }
 }
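To make the reaping mechanism in the patch above easier to follow in isolation, here is a minimal standalone R sketch of the same exit-code protocol. It is not Spark code: it assumes the behavior of the unexported `parallel` internals (`mcfork`, `mcexit`, `selectChildren`, `readChild`) that the daemon itself relies on, works only on Unix-like systems, and omits the daemon's socket handling entirely.

```r
# Minimal sketch of parent-side reaping (not Spark code; Unix-only).
# parallel::: internals are unexported and may change between R versions.

exitCode <- 1  # custom exit code a child sends back, mirroring the patch

p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child branch: do the work here, then report the custom exit code to the
  # parent. mcexit alone does not fully terminate the child (see SPARK-21093),
  # so the parent must send the actual kill signal.
  parallel:::mcexit(0L, send = exitCode)
}

# Parent branch: give the child a moment to exit; the real daemon instead
# polls every second via socketSelect(..., timeout = selectTimeout).
Sys.sleep(1)

# selectChildren() returns the PIDs of children that have pending data.
children <- parallel:::selectChildren(timeout = 0)
if (is.integer(children)) {
  for (child in children) {
    # readChild() returns raw bytes when the child sent data via mcexit(send =).
    data <- parallel:::readChild(child)
    if (is.raw(data) && unserialize(data) == exitCode) {
      # The child reported the exit code, so terminate it from the parent.
      tools::pskill(child, tools::SIGUSR1)
    }
  }
}
```

The key point is that `tools::pskill` runs in the parent: killing from the parent is what releases the pipe file descriptors that `mcfork` opened, which is exactly the leak described in the JIRA.
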
From 9e907cbaa6d6b65e09008181b61747ffcb67d5d0 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 29 Jun 2017 12:46:12 +0900
Subject: [PATCH 2/8] Disable Scala/Python tests for debugging and print
 everything

---
 core/src/main/scala/org/apache/spark/api/r/RRunner.scala | 4 ++++
 dev/run-tests.py                                         | 8 ++++----
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index 88118392003e8..9928969a3a6b5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -290,6 +290,10 @@ private[r] class BufferedStreamThread(
   var lineIdx = 0
   override def run() {
     for (line <- Source.fromInputStream(in).getLines) {
+      // scalastyle:off println
+      // Try to print everything just for easy debugging.
+      println(line)
+      // scalastyle:on println
       synchronized {
         lines(lineIdx) = line
         lineIdx = (lineIdx + 1) % errBufferSize
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 72d148d7ea0fb..efb15d141774d 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -601,12 +601,12 @@ def main():
         build_spark_assembly_sbt(hadoop_version)
 
     # run the test suites
-    run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
+    # run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
 
     modules_with_python_tests = [m for m in test_modules if m.python_test_goals]
-    if modules_with_python_tests:
-        run_python_tests(modules_with_python_tests, opts.parallelism)
-        run_python_packaging_tests()
+    # if modules_with_python_tests:
+    #     run_python_tests(modules_with_python_tests, opts.parallelism)
+    #     run_python_packaging_tests()
     if any(m.should_run_r_tests for m in test_modules):
         run_sparkr_tests()
From 5d5b39077d49225df2603217dea7e8d978a22a76 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 29 Jun 2017 12:51:18 +0900
Subject: [PATCH 3/8] Disable AppVeyor tests too in this PR

---
 appveyor.yml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/appveyor.yml b/appveyor.yml
index 43dad9bce60ac..dd24370b9b5ba 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -28,9 +28,7 @@ only_commits:
   files:
     - appveyor.yml
     - dev/appveyor-install-dependencies.ps1
-    - R/
     - sql/core/src/main/scala/org/apache/spark/sql/api/r/
-    - core/src/main/scala/org/apache/spark/api/r/
     - mllib/src/main/scala/org/apache/spark/ml/r/
 
 cache:

From 86bfa22d1f8d46e75dcc5f9085b7976365bc0e8f Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 29 Jun 2017 13:46:21 +0900
Subject: [PATCH 4/8] Print pipe status

---
 R/run-tests.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/R/run-tests.sh b/R/run-tests.sh
index 29764f48bd156..e2e260d2f5f94 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -24,6 +24,7 @@ LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
 SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+echo "PIPESTATUS: "$((PIPESTATUS[0]))
 FAILED=$((PIPESTATUS[0]||$FAILED))
 
 NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"

From 488c2871e4589f1a469cff2dba1e962173eaf910 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 29 Jun 2017 13:49:01 +0900
Subject: [PATCH 5/8] Should keep the status

Bash resets PIPESTATUS after every command, so the echo added in the
previous commit clobbered the exit status before FAILED could read it.
Capture it into PSTATUS first.

---
 R/run-tests.sh | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/R/run-tests.sh b/R/run-tests.sh
index e2e260d2f5f94..79c28e6e50129 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -24,8 +24,9 @@ LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
 SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
-echo "PIPESTATUS: "$((PIPESTATUS[0]))
-FAILED=$((PIPESTATUS[0]||$FAILED))
+PSTATUS=$((PIPESTATUS[0]))
+echo "PIPESTATUS: "$PSTATUS
+FAILED=$(($PSTATUS||$FAILED))
 
 NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"

From 466325d3fd353668583f3bde38ae490d9db0b189 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 29 Jun 2017 14:37:31 +0900
Subject: [PATCH 6/8] Revert the suspicious change (the last non-comment
 change) in the past PR

---
 R/pkg/inst/worker/daemon.R | 18 ++----------------
 1 file changed, 2 insertions(+), 16 deletions(-)

diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 6e385b2a27622..e907fe3238d59 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -33,9 +33,6 @@ inputCon <- socketConnection(
 # Waits indefinitely for a socket connection by default.
 selectTimeout <- NULL
 
-# Exit code that children send to the parent to indicate they exited.
-exitCode <- 1
-
 while (TRUE) {
   ready <- socketSelect(list(inputCon), timeout = selectTimeout)
 
@@ -62,18 +59,7 @@ while (TRUE) {
   children <- parallel:::selectChildren(timeout = 0)
 
   if (is.integer(children)) {
-    lapply(children, function(child) {
-      # This data should be raw bytes if any data was sent from this child.
-      # Otherwise, this returns the PID.
-      data <- parallel:::readChild(child)
-      if (is.raw(data)) {
-        # This checks if the data from this child is the exit code that indicates an exited child.
-        if (unserialize(data) == exitCode) {
-          # If so, we terminate this child.
-          tools::pskill(child, tools::SIGUSR1)
-        }
-      }
-    })
+    lapply(children, function(child) { tools::pskill(child, tools::SIGUSR1) })
   } else if (is.null(children)) {
     # If it is NULL, there are no children. Waits indefinitely for a socket connection.
     selectTimeout <- NULL
@@ -97,7 +83,7 @@ while (TRUE) {
       try(source(script))
       # Note that this mcexit does not fully terminate this child. So, this writes back
       # a custom exit code so that the parent can read it and terminate this child.
-      parallel:::mcexit(0L, send = exitCode)
+      parallel:::mcexit(0L)
     } else {
       # Forking succeeded and we need to check if they finished their jobs every second.
       selectTimeout <- 1
From 3f151658caaf069b1b6515e3b68cfd793f67420e Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 29 Jun 2017 16:42:46 +0900
Subject: [PATCH 7/8] Back to the original state

---
 R/pkg/inst/worker/daemon.R | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index e907fe3238d59..6e385b2a27622 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -33,6 +33,9 @@ inputCon <- socketConnection(
 # Waits indefinitely for a socket connection by default.
 selectTimeout <- NULL
 
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
   ready <- socketSelect(list(inputCon), timeout = selectTimeout)
 
@@ -59,7 +62,18 @@ while (TRUE) {
   children <- parallel:::selectChildren(timeout = 0)
 
   if (is.integer(children)) {
-    lapply(children, function(child) { tools::pskill(child, tools::SIGUSR1) })
+    lapply(children, function(child) {
+      # This data should be raw bytes if any data was sent from this child.
+      # Otherwise, this returns the PID.
+      data <- parallel:::readChild(child)
+      if (is.raw(data)) {
+        # This checks if the data from this child is the exit code that indicates an exited child.
+        if (unserialize(data) == exitCode) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
   } else if (is.null(children)) {
     # If it is NULL, there are no children. Waits indefinitely for a socket connection.
     selectTimeout <- NULL
@@ -83,7 +97,7 @@ while (TRUE) {
       try(source(script))
      # Note that this mcexit does not fully terminate this child. So, this writes back
       # a custom exit code so that the parent can read it and terminate this child.
-      parallel:::mcexit(0L)
+      parallel:::mcexit(0L, send = exitCode)
     } else {
       # Forking succeeded and we need to check if they finished their jobs every second.
       selectTimeout <- 1
From 0a7589c09f53dfc2094497d8d3e59d6407569417 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 29 Jun 2017 17:04:35 +0900
Subject: [PATCH 8/8] New approach

---
 R/pkg/inst/worker/daemon.R | 24 +++++++++---------------
 1 file changed, 9 insertions(+), 15 deletions(-)

diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 6e385b2a27622..1b4fd4727fcdf 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -33,9 +33,6 @@ inputCon <- socketConnection(
 # Waits indefinitely for a socket connection by default.
 selectTimeout <- NULL
 
-# Exit code that children send to the parent to indicate they exited.
-exitCode <- 1
-
 while (TRUE) {
   ready <- socketSelect(list(inputCon), timeout = selectTimeout)
 
@@ -56,19 +53,17 @@ while (TRUE) {
   # any worker is running, or right before launching other worker children from the following
   # new socket connection.
 
-  # Only the process IDs of children that sent data to the parent are returned below. The children
-  # send a custom exit code to the parent after exiting, and the parent tries
-  # to terminate them only if they sent the exit code.
+  # The process IDs of exited children are returned below.
   children <- parallel:::selectChildren(timeout = 0)
 
   if (is.integer(children)) {
     lapply(children, function(child) {
-      # This data should be raw bytes if any data was sent from this child.
-      # Otherwise, this returns the PID.
-      data <- parallel:::readChild(child)
-      if (is.raw(data)) {
-        # This checks if the data from this child is the exit code that indicates an exited child.
-        if (unserialize(data) == exitCode) {
+      # This should be the PIDs of exited children. Otherwise, this returns raw bytes if any data
+      # was sent from this child.
+      pid <- parallel:::readChild(child)
+      if (is.integer(pid)) {
+        # This checks if the data from this child is a PID.
+        if (child == pid) {
           # If so, we terminate this child.
           tools::pskill(child, tools::SIGUSR1)
         }
@@ -95,9 +90,8 @@ while (TRUE) {
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Note that this mcexit does not fully terminate this child. So, this writes back
-      # a custom exit code so that the parent can read it and terminate this child.
-      parallel:::mcexit(0L, send = exitCode)
+      # Note that this mcexit does not fully terminate this child.
+      parallel:::mcexit(0L)
     } else {
       # Forking succeeded and we need to check if they finished their jobs every second.
       selectTimeout <- 1
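The final approach drops the custom exit code entirely: per the comments in the patch, `parallel:::readChild` returns the child's PID as an integer once the child has exited, and raw bytes only when the child sent data. Here is a standalone sketch of that simpler protocol, under the same assumptions as the sketch after patch 1 (unexported, Unix-only `parallel` internals whose behavior is taken from this patch, not from documented API).

```r
# Sketch of the "new approach": detect exited children purely from readChild()'s
# return type (integer PID = exited; raw = data), then reap them in the parent.
# Not Spark code; relies on unexported, Unix-only parallel internals.

p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child branch: no custom exit code is sent anymore.
  parallel:::mcexit(0L)
}

# Parent branch: poll until the child shows up as exited, then kill it here.
repeat {
  children <- parallel:::selectChildren(timeout = 0)
  if (is.integer(children)) {
    reaped <- FALSE
    for (child in children) {
      pid <- parallel:::readChild(child)
      # An integer equal to the selected child's PID means that child exited.
      if (is.integer(pid) && pid == child) {
        tools::pskill(child, tools::SIGUSR1)
        reaped <- TRUE
      }
    }
    if (reaped) break
  }
  Sys.sleep(1)  # the daemon checks once per second while workers are running
}
```

Compared with the exit-code protocol from the earlier commits, this needs no `unserialize` and no agreed-upon sentinel value; the PID comparison is the whole handshake.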