Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 55 additions & 4 deletions R/pkg/inst/worker/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(
port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)

# Waits indefinitely for a socket connecion by default.
selectTimeout <- NULL

# Exit code that children send to the parent to indicate they exited.
exitCode <- 1

while (TRUE) {
ready <- socketSelect(list(inputCon))
ready <- socketSelect(list(inputCon), timeout = selectTimeout)

# Note that the children should be terminated in the parent. If each child terminates
# itself, it appears that the resource is not released properly, that causes an unexpected
# termination of this daemon due to, for example, running out of file descriptors
# (see SPARK-21093). Therefore, the current implementation tries to retrieve children
# that are exited (but not terminated) and then sends a kill signal to terminate them properly
# in the parent.
#
# There are two paths that it attempts to send a signal to terminate the children in the parent.
#
# 1. Every second if any socket connection is not available and if there are child workers
# running.
# 2. Right after a socket connection is available.
#
# In other words, the parent attempts to send the signal to the children every second if
# any worker is running or right before launching other worker children from the following
# new socket connection.

# Only the process IDs of children sent data to the parent are returned below. The children
# send a custom exit code to the parent after being exited and the parent tries
# to terminate them only if they sent the exit code.
children <- parallel:::selectChildren(timeout = 0)

if (is.integer(children)) {
lapply(children, function(child) {
# This data should be raw bytes if any data was sent from this child.
# Otherwise, this returns the PID.
data <- parallel:::readChild(child)
if (is.raw(data)) {
# This checks if the data from this child is the exit code that indicates an exited child.
if (unserialize(data) == exitCode) {
# If so, we terminate this child.
tools::pskill(child, tools::SIGUSR1)
}
}
})
} else if (is.null(children)) {
# If it is NULL, there are no children. Waits indefinitely for a socket connecion.
selectTimeout <- NULL
}

if (ready) {
port <- SparkR:::readInt(inputCon)
# There is a small chance that it could be interrupted by signal, retry one time
Expand All @@ -44,12 +91,16 @@ while (TRUE) {
}
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
# Reach here because this is a child process.
close(inputCon)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comment here to say "# reach here because this is a child process"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

Sys.setenv(SPARKR_WORKER_PORT = port)
try(source(script))
# Set SIGUSR1 so that child can exit
tools::pskill(Sys.getpid(), tools::SIGUSR1)
parallel:::mcexit(0L)
# Note that this mcexit does not fully terminate this child. So, this writes back
# a custom exit code so that the parent can read and terminate this child.
parallel:::mcexit(0L, send = exitCode)
} else {
# Forking succeeded and we need to check if they finished their jobs every second.
selectTimeout <- 1
}
}
}