51 changes: 48 additions & 3 deletions R/pkg/inst/worker/daemon.R
@@ -30,8 +30,50 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(
port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)

# Waits indefinitely for a socket connection by default.
selectTimeout <- NULL

while (TRUE) {
ready <- socketSelect(list(inputCon))
ready <- socketSelect(list(inputCon), timeout = selectTimeout)

# Note that the children should be terminated in the parent. If each child terminates
# itself, it appears that the resources are not released properly, which causes an unexpected
# termination of this daemon due to, for example, running out of file descriptors
# (see SPARK-21093). Therefore, the current implementation tries to retrieve children
# that have exited (but are not yet terminated) and then sends a kill signal to terminate them
# properly in the parent.
#
# There are two paths through which the parent attempts to send a signal to terminate the children.
#
# 1. Every second, if no socket connection is available and there are child workers
# running.
# 2. Right after a socket connection becomes available.
#
# In other words, the parent attempts to send the signal to the children every second while
# any worker is running, or right before launching other worker children for the next
# new socket connection.

# The process IDs of exited children are returned below.
children <- parallel:::selectChildren(timeout = 0)

if (is.integer(children)) {
lapply(children, function(child) {
# This should be the PID of an exited child. Otherwise, this returns raw bytes if any data
# was sent from this child.
pid <- parallel:::readChild(child)
if (is.integer(pid)) {
# This checks if the data from this child is a pid.
if (child == pid) {
# If so, we terminate this child.
tools::pskill(child, tools::SIGUSR1)
}
}
})
} else if (is.null(children)) {
# If it is NULL, there are no children. Waits indefinitely for a socket connection.
selectTimeout <- NULL
}

if (ready) {
port <- SparkR:::readInt(inputCon)
# There is a small chance that it could be interrupted by a signal; retry one time
Expand All @@ -44,12 +86,15 @@ while (TRUE) {
}
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
# We reach here because this is a child process.
close(inputCon)
Sys.setenv(SPARKR_WORKER_PORT = port)
try(source(script))
# Send SIGUSR1 to itself so that the child can exit.
tools::pskill(Sys.getpid(), tools::SIGUSR1)
# Note that this mcexit does not fully terminate this child.
parallel:::mcexit(0L)
} else {
# Forking succeeded; the parent needs to check every second whether the children have finished their jobs.
selectTimeout <- 1
}
}
}
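
For reference, the reap-in-parent pattern added in the hunk above can be exercised outside the daemon. The following is a minimal standalone sketch, not part of the patch: it forks a few children with parallel:::mcfork, has each child mimic the diff (signal itself with SIGUSR1 and call mcexit), and has the parent poll parallel:::selectChildren / parallel:::readChild and send SIGUSR1 to any child whose exit it observes. The child payload (Sys.sleep), the loop bounds, and the counter are hypothetical stand-ins; only the fork/reap calls mirror the diff.

# Minimal standalone sketch (not part of the patch) of the reap-in-parent pattern above.
# Assumes a POSIX platform and the parallel internals used in the diff.
n_children <- 2L
for (i in seq_len(n_children)) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    # Child process: hypothetical work, then the same exit sequence as in the diff.
    Sys.sleep(1)
    tools::pskill(Sys.getpid(), tools::SIGUSR1)
    # As the diff notes, mcexit alone does not fully terminate the child.
    parallel:::mcexit(0L)
  }
}

# Parent process: poll for exited children and terminate them properly.
reaped <- 0L
while (reaped < n_children) {
  children <- parallel:::selectChildren(timeout = 1)
  if (is.integer(children)) {
    for (child in children) {
      pid <- parallel:::readChild(child)
      # readChild returns the child's PID as an integer once it has exited,
      # or raw bytes if the child sent data instead.
      if (is.integer(pid) && child == pid) {
        tools::pskill(child, tools::SIGUSR1)
        reaped <- reaped + 1L
      }
    }
  }
}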
4 changes: 3 additions & 1 deletion R/run-tests.sh
@@ -24,7 +24,9 @@ LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))
PSTATUS=$((PIPESTATUS[0]))
echo "PIPESTATUS: "$PSTATUS
FAILED=$(($PSTATUS||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"

2 changes: 0 additions & 2 deletions appveyor.yml
@@ -28,9 +28,7 @@ only_commits:
files:
- appveyor.yml
- dev/appveyor-install-dependencies.ps1
- R/
- sql/core/src/main/scala/org/apache/spark/sql/api/r/
- core/src/main/scala/org/apache/spark/api/r/
- mllib/src/main/scala/org/apache/spark/ml/r/

cache:
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -290,6 +290,10 @@ private[r] class BufferedStreamThread(
var lineIdx = 0
override def run() {
for (line <- Source.fromInputStream(in).getLines) {
// scalastyle:off println
// Try to print everything just for easy debugging.
println(line)
// scalastyle:on println
synchronized {
lines(lineIdx) = line
lineIdx = (lineIdx + 1) % errBufferSize
8 changes: 4 additions & 4 deletions dev/run-tests.py
@@ -601,12 +601,12 @@ def main():
build_spark_assembly_sbt(hadoop_version)

# run the test suites
run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)
# run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)

modules_with_python_tests = [m for m in test_modules if m.python_test_goals]
if modules_with_python_tests:
run_python_tests(modules_with_python_tests, opts.parallelism)
run_python_packaging_tests()
# if modules_with_python_tests:
# run_python_tests(modules_with_python_tests, opts.parallelism)
# run_python_packaging_tests()
if any(m.should_run_r_tests for m in test_modules):
run_sparkr_tests()
