diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R
index 3a318b71ea06d..1b4fd4727fcdf 100644
--- a/R/pkg/inst/worker/daemon.R
+++ b/R/pkg/inst/worker/daemon.R
@@ -30,8 +30,50 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate them
+  # properly in the parent.
+  #
+  # There are two paths by which the parent attempts to send a signal to terminate the children.
+  #
+  #   1. Every second if any socket connection is not available and if there are child workers
+  #      running.
+  #   2. Right after a socket connection is available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second while
+  # any worker is running, or right before launching other worker children from the following
+  # new socket connection.
+
+  # The process IDs of exited children are returned below.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This should be the PID of an exited child. Otherwise, this returns raw bytes if any data
+      # was sent from this child.
+      pid <- parallel:::readChild(child)
+      if (is.integer(pid)) {
+        # This checks if the data from this child is a PID.
+        if (child == pid) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
+  }
+
   if (ready) {
     port <- SparkR:::readInt(inputCon)
     # There is a small chance that it could be interrupted by signal, retry one time
@@ -44,12 +86,15 @@ while (TRUE) {
     }
     p <- parallel:::mcfork()
     if (inherits(p, "masterProcess")) {
+      # We reach here because this is a child process.
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Set SIGUSR1 so that child can exit
-      tools::pskill(Sys.getpid(), tools::SIGUSR1)
+      # Note that this mcexit does not fully terminate this child.
       parallel:::mcexit(0L)
+    } else {
+      # Forking succeeded; check every second whether the children have finished their jobs.
+      selectTimeout <- 1
     }
   }
 }
diff --git a/R/run-tests.sh b/R/run-tests.sh
index 29764f48bd156..79c28e6e50129 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -24,7 +24,9 @@
 LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
 SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
-FAILED=$((PIPESTATUS[0]||$FAILED))
+PSTATUS=$((PIPESTATUS[0]))
+echo "PIPESTATUS: "$PSTATUS
+FAILED=$(($PSTATUS||$FAILED))
 
 NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
diff --git a/appveyor.yml b/appveyor.yml
index 43dad9bce60ac..dd24370b9b5ba 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -28,9 +28,7 @@ only_commits:
   files:
     - appveyor.yml
     - dev/appveyor-install-dependencies.ps1
-    - R/
     - sql/core/src/main/scala/org/apache/spark/sql/api/r/
-    - core/src/main/scala/org/apache/spark/api/r/
     - mllib/src/main/scala/org/apache/spark/ml/r/
 
 cache:
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index 88118392003e8..9928969a3a6b5 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -290,6 +290,10 @@ private[r] class BufferedStreamThread(
   var lineIdx = 0
   override def run() {
     for (line <- Source.fromInputStream(in).getLines) {
+      // scalastyle:off println
+      // Try to print everything just for easy debugging.
+      println(line)
+      // scalastyle:on println
       synchronized {
         lines(lineIdx) = line
         lineIdx = (lineIdx + 1) % errBufferSize
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 72d148d7ea0fb..efb15d141774d 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -601,12 +601,12 @@ def main():
         build_spark_assembly_sbt(hadoop_version)
 
     # run the test suites
-    run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
+    # run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
 
     modules_with_python_tests = [m for m in test_modules if m.python_test_goals]
-    if modules_with_python_tests:
-        run_python_tests(modules_with_python_tests, opts.parallelism)
-        run_python_packaging_tests()
+    # if modules_with_python_tests:
+    #     run_python_tests(modules_with_python_tests, opts.parallelism)
+    #     run_python_packaging_tests()
     if any(m.should_run_r_tests for m in test_modules):
         run_sparkr_tests()
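Note (not part of the patch): below is a minimal, standalone R sketch of the parent-side reaping pattern that the new daemon.R comments describe. It forks one hypothetical child, polls parallel:::selectChildren() without blocking, reads back the PID of the exited child with parallel:::readChild(), and sends the kill signal from the parent with tools::pskill(). The sleeps and the single child are illustration only; mcfork/selectChildren/readChild are Unix-only internals of the parallel package, hence the ::: access.

# Illustrative sketch only; not the daemon itself.
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child process: simulate a finished worker. mcexit() notifies the parent but,
  # as the patch notes, does not fully terminate the child on its own.
  Sys.sleep(1)
  parallel:::mcexit(0L)
}

# Parent process: give the child time to finish (illustration only).
Sys.sleep(2)

# Non-blocking poll: returns an integer vector of PIDs with pending data,
# or NULL if there are no children.
children <- parallel:::selectChildren(timeout = 0)
if (is.integer(children)) {
  for (child in children) {
    # readChild() returns the child's PID (an integer) once it has exited,
    # or a raw vector if the child sent data instead.
    pid <- parallel:::readChild(child)
    if (is.integer(pid) && child == pid) {
      # Terminate the exited child from the parent, mirroring the patched daemon.R.
      tools::pskill(child, tools::SIGUSR1)
    }
  }
}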