Ensure clean worker shutdown and avoid zombie processes
phoerious committed Feb 22, 2022
1 parent f088fb2 commit 6923256
Showing 2 changed files with 130 additions and 44 deletions.
38 changes: 25 additions & 13 deletions sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -51,6 +51,27 @@
_LOGGER = logging.getLogger(__name__)


def kill_process_gracefully(proc, timeout=5):
  """
  Kill a worker process gracefully by sending a SIGTERM and waiting for it to
  finish. A SIGKILL will be sent if the process has not finished after
  ``timeout`` seconds.
  """
  def _kill():
    proc.terminate()
    t = time.time()
    while time.time() < t + timeout:
      time.sleep(0.01)
      if proc.poll() is not None:
        return
    _LOGGER.warning("Worker process not responding, sending SIGKILL.")
    proc.kill()
    proc.wait()  # Avoid zombies

  kill_thread = threading.Thread(target=_kill)
  kill_thread.start()
  kill_thread.join()
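
A minimal, self-contained usage sketch of the helper above (not part of this change; the SIGTERM-ignoring child is a stand-in for a worker, and the example assumes a POSIX system):

import subprocess

# A child that ignores SIGTERM, to exercise the SIGKILL fallback path.
stubborn = subprocess.Popen(
    ["python", "-c",
     "import signal, time; signal.signal(signal.SIGTERM, signal.SIG_IGN); time.sleep(60)"])
kill_process_gracefully(stubborn, timeout=2)  # SIGTERM, wait up to 2s, then SIGKILL and reap
assert stubborn.poll() == -9                  # killed by SIGKILL, already reaped (no zombie)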


class BeamFnExternalWorkerPoolServicer(
beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):

@@ -94,8 +115,9 @@ def start(

    # Register to kill the subprocesses on exit.
    def kill_worker_processes():
+     _LOGGER.error("CLEANING UP WORKERS!!!!")
      for worker_process in worker_pool._worker_processes.values():
-       worker_process.kill()
+       kill_process_gracefully(worker_process)

    atexit.register(kill_worker_processes)

@@ -172,19 +194,9 @@ def StopWorker(self,
    worker_process = self._worker_processes.pop(
        stop_worker_request.worker_id, None)
    if worker_process:
-
-      def kill_worker_process():
-        try:
-          worker_process.kill()
-        except OSError:
-          # ignore already terminated process
-          return
-
      _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
-      # communicate is necessary to avoid zombie process
-      # time box communicate (it has no timeout parameter in Py2)
-      threading.Timer(1, kill_worker_process).start()
-      worker_process.communicate()
+      kill_process_gracefully(worker_process)

    return beam_fn_api_pb2.StopWorkerResponse()
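
For background on the zombie problem this hunk addresses, a short self-contained sketch (assumes a POSIX system with a `sleep` binary): a terminated child stays in the process table until the parent reaps it, which is why the old code called communicate() and the new helper ends with proc.wait().

import subprocess
import time

child = subprocess.Popen(["sleep", "60"])
child.terminate()     # send SIGTERM
time.sleep(0.1)       # give the child a moment to exit
# Until wait() (or poll()/communicate()) is called, the exited child remains
# in the process table as <defunct>. wait() reaps it and returns its status.
print(child.wait())   # typically -15: terminated by SIGTERM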


136 changes: 105 additions & 31 deletions sdks/python/container/boot.go
@@ -32,6 +32,7 @@ import (
"strings"
"syscall"
"sync"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -74,8 +75,8 @@

func main() {
	if err := mainError(); err != nil {
		log.Print(err)
		os.Exit(1)
	}
}

@@ -151,46 +152,42 @@ func mainError() error {
	//
	// No log.Fatalf() from here on, otherwise deferred cleanups will not be called!

+	// Trap signals, so we can clean up properly.
+	signalChannel := make(chan os.Signal, 1)
+	signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	venvDir, err := setupVenv(filepath.Join(*semiPersistDir, "beam-venv"), *id)
	if err != nil {
		return fmt.Errorf("Failed to initialize Python venv.")
	}
	cleanupFunc := func() {
		log.Printf("Cleaning up temporary venv ...")
		os.RemoveAll(venvDir)
	}
	defer cleanupFunc()
-	signalChannel := make(chan os.Signal, 1)
-	signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)
-	go func() {
-		log.Printf("Received signal: ", (<-signalChannel).String())
-		cleanupFunc()
-		os.Exit(1)
-	}()

	dir := filepath.Join(*semiPersistDir, "staged")

	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
	if err != nil {
		return fmt.Errorf("Failed to retrieve staged files: %v", err)
	}

	// TODO(herohde): the packages to install should be specified explicitly. It
	// would also be possible to install the SDK in the Dockerfile.
	fileNames := make([]string, len(files))
	requirementsFiles := []string{requirementsFile}
	for i, v := range files {
		name, _ := artifact.MustExtractFilePayload(v)
		log.Printf("Found artifact: %s", name)
		fileNames[i] = name

		if v.RoleUrn == artifact.URNPipRequirementsFile {
			requirementsFiles = append(requirementsFiles, name)
		}
	}

	if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
		return fmt.Errorf("Failed to install required packages: %v", setupErr)
	}

	// (3) Invoke python
@@ -214,44 +211,121 @@ func mainError() error {
		}
	}

+	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+	// Keep track of child PIDs for clean shutdown without zombies
+	childPids := struct {
+		v        []int
+		canceled bool
+		mu       sync.Mutex
+	}{v: make([]int, 0, len(workerIds))}
+
+	// Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+	go func() {
+		log.Printf("Received signal: %v", <-signalChannel)
+		childPids.mu.Lock()
+		childPids.canceled = true
+		for _, pid := range childPids.v {
+			syscall.Kill(-pid, syscall.SIGTERM)
+			go func() {
+				// This goroutine will be canceled if the main process exits before the 5 seconds
+				// have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker did not respond, killing it.")
+				syscall.Kill(-pid, syscall.SIGKILL)
+			}()
+		}
+		childPids.mu.Unlock()
+	}()

	args := []string{
		"-m",
		sdkHarnessEntrypoint,
	}

-	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
	var wg sync.WaitGroup
	wg.Add(len(workerIds))
	for _, workerId := range workerIds {
		go func(workerId string) {
			defer wg.Done()

+			childPids.mu.Lock()
+			if childPids.canceled {
+				childPids.mu.Unlock()
+				return
+			}
			log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
-			log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+			cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+			childPids.v = append(childPids.v, cmd.Process.Pid)
+			childPids.mu.Unlock()
+
+			log.Printf("Python (worker %v) exited: %v", workerId, cmd.Wait())
		}(workerId)
	}
	wg.Wait()

	return nil
}

// Start a command object in a new process group with the given arguments with
// additional environment variables. It attaches stdio to the child process.
// Returns the process handle.
func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.Cmd {
	cmd := exec.Command(prog, args...)
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if env != nil {
		cmd.Env = os.Environ()
		for k, v := range env {
			cmd.Env = append(cmd.Env, k+"="+v)
		}
	}

	// Create process group so we can clean up the whole subtree later without creating zombies
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
	cmd.Start()
	return cmd
}
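
The same start-the-child-in-its-own-process-group pattern, sketched in Python for illustration only (it is not part of this change and assumes a POSIX system): the child is started with start_new_session=True so the whole group can be signalled at once, with SIGKILL as a fallback and a final wait() to avoid zombies.

import os
import signal
import subprocess
import time

def start_in_new_group(args, env=None):
  # start_new_session=True runs the child after setsid(), so it leads a new
  # process group that can be signalled as a unit with os.killpg().
  return subprocess.Popen(args, env=env, start_new_session=True)

def stop_group(proc, timeout=5):
  pgid = os.getpgid(proc.pid)
  os.killpg(pgid, signal.SIGTERM)   # graceful: signal the whole group
  deadline = time.time() + timeout
  while time.time() < deadline:
    if proc.poll() is not None:
      return
    time.sleep(0.01)
  os.killpg(pgid, signal.SIGKILL)   # forceful fallback after the timeout
  proc.wait()                       # reap the direct child to avoid a zombie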

// setupVenv initialize a local Python venv and set the corresponding env variables
func setupVenv(baseDir, workerId string) (string, error) {
	log.Printf("Initializing temporary Python venv ...")

-	if err := os.MkdirAll(baseDir, 0750); err != nil {
-		return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+	dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId)
+	if _, err := os.Stat(dir); !os.IsNotExist(err) {
+		// Probably leftovers from a previous run
+		log.Printf("Cleaning up previous venv ...")
+		if err := os.RemoveAll(dir); err != nil {
+			return "", err
+		}
	}
-	dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
-	if err != nil {
-		return "", fmt.Errorf("Failed Python venv directory: %s", err)
+	if err := os.MkdirAll(dir, 0750); err != nil {
+		return "", fmt.Errorf("Failed to create Python venv directory: %s", err)
	}
-	args := []string{"-m", "venv", "--system-site-packages", dir}
-	if err := execx.Execute("python", args...); err != nil {
-		return "", err
+	if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+		return "", fmt.Errorf("Python venv initialization failed: %s", err)
	}

	os.Setenv("VIRTUAL_ENV", dir)
	os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
	return dir, nil

+	// log.Printf("Initializing temporary Python venv ...")
+	//
+	// if err := os.MkdirAll(baseDir, 0750); err != nil {
+	// 	return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+	// }
+	// dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
+	// if err != nil {
+	// 	return "", fmt.Errorf("Failed Python venv directory: %s", err)
+	// }
+	// if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+	// 	return "", fmt.Errorf("Python venv initialization failed: %s", err)
+	// }
+	// os.Setenv("VIRTUAL_ENV", dir)
+	// os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
+	// return dir, nil
}
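
For reference, the same per-worker venv bootstrap sketched in Python (illustrative only, not part of this change; the helper name and directory layout mirror the Go code above and are otherwise assumptions):

import os
import shutil
import subprocess

def setup_venv(base_dir, worker_id):
  # Recreate a per-worker venv under base_dir, mirroring what boot.go does.
  venv_dir = os.path.join(base_dir, "beam-venv-worker-" + worker_id)
  if os.path.exists(venv_dir):
    shutil.rmtree(venv_dir)  # probably leftovers from a previous run
  os.makedirs(venv_dir, mode=0o750)
  subprocess.check_call(["python", "-m", "venv", "--system-site-packages", venv_dir])
  os.environ["VIRTUAL_ENV"] = venv_dir
  os.environ["PATH"] = os.path.join(venv_dir, "bin") + ":" + os.environ["PATH"]
  return venv_dir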

// setupAcceptableWheelSpecs setup wheel specs according to installed python version
