From 6923256d832c3e548ef095e7f3f41086a1536a51 Mon Sep 17 00:00:00 2001
From: Janek Bevendorff
Date: Mon, 21 Feb 2022 18:55:44 +0100
Subject: [PATCH] Ensure clean worker shutdown and avoid zombie processes

---
 .../runners/worker/worker_pool_main.py |  38 +++--
 sdks/python/container/boot.go          | 136 ++++++++++++++----
 2 files changed, 130 insertions(+), 44 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index eb5cdd935161b..6ed7bbec9e358 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -51,6 +51,27 @@
 _LOGGER = logging.getLogger(__name__)
 
 
+def kill_process_gracefully(proc, timeout=5):
+  """
+  Kill a worker process gracefully by sending a SIGTERM and waiting for it to finish.
+  A SIGKILL will be sent if the process has not finished after ``timeout`` seconds.
+  """
+  def _kill():
+    proc.terminate()
+    t = time.time()
+    while time.time() < t + timeout:
+      time.sleep(0.01)
+      if proc.poll() is not None:
+        return
+    _LOGGER.warning("Worker process not responding, sending SIGKILL.")
+    proc.kill()
+    proc.wait()  # Avoid zombies
+
+  kill_thread = threading.Thread(target=_kill)
+  kill_thread.start()
+  kill_thread.join()
+
+
 class BeamFnExternalWorkerPoolServicer(
     beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
 
@@ -94,8 +115,9 @@ def start(
 
     # Register to kill the subprocesses on exit.
     def kill_worker_processes():
+      _LOGGER.info("Cleaning up worker processes.")
       for worker_process in worker_pool._worker_processes.values():
-        worker_process.kill()
+        kill_process_gracefully(worker_process)
 
     atexit.register(kill_worker_processes)
 
@@ -172,19 +194,9 @@ def StopWorker(self,
     worker_process = self._worker_processes.pop(
         stop_worker_request.worker_id, None)
     if worker_process:
-
-      def kill_worker_process():
-        try:
-          worker_process.kill()
-        except OSError:
-          # ignore already terminated process
-          return
-
       _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
-      # communicate is necessary to avoid zombie process
-      # time box communicate (it has no timeout parameter in Py2)
-      threading.Timer(1, kill_worker_process).start()
-      worker_process.communicate()
+      kill_process_gracefully(worker_process)
+
     return beam_fn_api_pb2.StopWorkerResponse()
 
 
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index d6cba7b11a976..a1401047c6f70 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -32,6 +32,7 @@ import (
 	"strings"
 	"syscall"
 	"sync"
+	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -74,8 +75,8 @@ const (
 
 func main() {
 	if err := mainError(); err != nil {
-		log.Print(err)
-		os.Exit(1)
+		log.Print(err)
+		os.Exit(1)
 	}
 }
 
@@ -151,28 +152,24 @@ func mainError() error {
 	//
 	// No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
 
+	// Trap signals, so we can clean up properly.
+	signalChannel := make(chan os.Signal, 1)
+	signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
+
 	venvDir, err := setupVenv(filepath.Join(*semiPersistDir, "beam-venv"), *id)
 	if err != nil {
-		return fmt.Errorf("Failed to initialize Python venv.")
+		return fmt.Errorf("Failed to initialize Python venv.")
 	}
 	cleanupFunc := func() {
-		log.Printf("Cleaning up temporary venv ...")
-		os.RemoveAll(venvDir)
+		log.Printf("Cleaning up temporary venv ...")
+		os.RemoveAll(venvDir)
 	}
 	defer cleanupFunc()
 
-	signalChannel := make(chan os.Signal, 1)
-	signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)
-	go func() {
-		log.Printf("Received signal: ", (<-signalChannel).String())
-		cleanupFunc()
-		os.Exit(1)
-	}()
 	dir := filepath.Join(*semiPersistDir, "staged")
-
 	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
 	if err != nil {
-		return fmt.Errorf("Failed to retrieve staged files: %v", err)
+		return fmt.Errorf("Failed to retrieve staged files: %v", err)
 	}
 
 	// TODO(herohde): the packages to install should be specified explicitly. It
@@ -180,17 +177,17 @@ func mainError() error {
 	fileNames := make([]string, len(files))
 	requirementsFiles := []string{requirementsFile}
 	for i, v := range files {
-		name, _ := artifact.MustExtractFilePayload(v)
-		log.Printf("Found artifact: %s", name)
-		fileNames[i] = name
+		name, _ := artifact.MustExtractFilePayload(v)
+		log.Printf("Found artifact: %s", name)
+		fileNames[i] = name
 
-		if v.RoleUrn == artifact.URNPipRequirementsFile {
-			requirementsFiles = append(requirementsFiles, name)
-		}
+		if v.RoleUrn == artifact.URNPipRequirementsFile {
+			requirementsFiles = append(requirementsFiles, name)
+		}
 	}
 
 	if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
-		return fmt.Errorf("Failed to install required packages: %v", setupErr)
+		return fmt.Errorf("Failed to install required packages: %v", setupErr)
 	}
 
 	// (3) Invoke python
@@ -214,19 +211,55 @@ func mainError() error {
 		}
 	}
 
+	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+	// Keep track of child PIDs for clean shutdown without zombies
+	childPids := struct {
+		v        []int
+		canceled bool
+		mu       sync.Mutex
+	} {v: make([]int, 0, len(workerIds))}
+
+	// Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+	go func() {
+		log.Printf("Received signal: %v", <-signalChannel)
+		childPids.mu.Lock()
+		childPids.canceled = true
+		for _, pid := range childPids.v {
+			syscall.Kill(-pid, syscall.SIGTERM)
+			go func(pid int) {
+				// This goroutine will be canceled if the main process exits before the 5 seconds
+				// have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker did not respond, killing it.")
+				syscall.Kill(-pid, syscall.SIGKILL)
+			}(pid)
+		}
+		childPids.mu.Unlock()
+	}()
+
 	args := []string{
 		"-m",
 		sdkHarnessEntrypoint,
 	}
 
-	workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
 	var wg sync.WaitGroup
 	wg.Add(len(workerIds))
 	for _, workerId := range workerIds {
 		go func(workerId string) {
 			defer wg.Done()
+
+			childPids.mu.Lock()
+			if childPids.canceled {
+				childPids.mu.Unlock()
+				return
+			}
 			log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
-			log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+			cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+			childPids.v = append(childPids.v, cmd.Process.Pid)
+			childPids.mu.Unlock()
+
+			log.Printf("Python (worker %v) exited: %v", workerId, cmd.Wait())
 		}(workerId)
 	}
 	wg.Wait()
@@ -234,24 +267,65 @@ func mainError() error {
 	return nil
 }
 
+// Start a command object in a new process group with the given arguments with
+// additional environment variables. It attaches stdio to the child process.
+// Returns the process handle.
+func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.Cmd {
+	cmd := exec.Command(prog, args...)
+	cmd.Stdin = os.Stdin
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	if env != nil {
+		cmd.Env = os.Environ()
+		for k, v := range env {
+			cmd.Env = append(cmd.Env, k+"="+v)
+		}
+	}
+
+	// Create process group so we can clean up the whole subtree later without creating zombies
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
+	cmd.Start()
+	return cmd
+}
+
 // setupVenv initialize a local Python venv and set the corresponding env variables
 func setupVenv(baseDir, workerId string) (string, error) {
 	log.Printf("Initializing temporary Python venv ...")
 
-	if err := os.MkdirAll(baseDir, 0750); err != nil {
-		return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+	dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId)
+	if _, err := os.Stat(dir); !os.IsNotExist(err) {
+		// Probably leftovers from a previous run
+		log.Printf("Cleaning up previous venv ...")
+		if err := os.RemoveAll(dir); err != nil {
+			return "", err
+		}
 	}
-	dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
-	if err != nil {
-		return "", fmt.Errorf("Failed Python venv directory: %s", err)
+	if err := os.MkdirAll(dir, 0750); err != nil {
+		return "", fmt.Errorf("Failed to create Python venv directory: %s", err)
 	}
-	args := []string{"-m", "venv", "--system-site-packages", dir}
-	if err := execx.Execute("python", args...); err != nil {
-		return "", err
+	if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+		return "", fmt.Errorf("Python venv initialization failed: %s", err)
 	}
+
 	os.Setenv("VIRTUAL_ENV", dir)
 	os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
 	return dir, nil
+
+//	log.Printf("Initializing temporary Python venv ...")
+//
+//	if err := os.MkdirAll(baseDir, 0750); err != nil {
+//		return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+//	}
+//	dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
+//	if err != nil {
+//		return "", fmt.Errorf("Failed Python venv directory: %s", err)
+//	}
+//	if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+//		return "", fmt.Errorf("Python venv initialization failed: %s", err)
+//	}
+//	os.Setenv("VIRTUAL_ENV", dir)
+//	os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
+//	return dir, nil
 }
 
 // setupAcceptableWheelSpecs setup wheel specs according to installed python version
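
The patch relies on two POSIX mechanisms that are easy to exercise in isolation: starting each worker in its own process group (Setpgid) so that a signal sent to the negative PID reaches the whole subtree, and escalating from SIGTERM to SIGKILL after a grace period before reaping the child with Wait(). The standalone sketch below demonstrates that pattern outside the Beam container; the child command ("sleep", "60") and the 5-second grace period are illustrative assumptions rather than values taken from this patch, and it assumes a Unix-like system, since Setpgid is not available on Windows.

// Standalone sketch of the shutdown pattern used in boot.go above: run a child
// in its own process group, forward SIGTERM to the whole group on shutdown, and
// escalate to SIGKILL after a grace period. Illustrative only; not part of the patch.
package main

import (
	"log"
	"os"
	"os/exec"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Start the child in a new process group so signals can address the whole subtree.
	cmd := exec.Command("sleep", "60") // assumed placeholder workload
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
	if err := cmd.Start(); err != nil {
		log.Fatalf("failed to start child: %v", err)
	}
	pid := cmd.Process.Pid

	// Reap the child in the background; Wait() is what prevents a zombie.
	done := make(chan error, 1)
	go func() { done <- cmd.Wait() }()

	// Block until SIGINT/SIGTERM, then shut the process group down gracefully.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs

	// A negative PID addresses the entire process group, not just the direct child.
	syscall.Kill(-pid, syscall.SIGTERM)
	select {
	case err := <-done:
		log.Printf("child exited after SIGTERM: %v", err)
	case <-time.After(5 * time.Second): // assumed grace period for the demo
		log.Printf("child did not exit in time, sending SIGKILL")
		syscall.Kill(-pid, syscall.SIGKILL)
		log.Printf("child exited after SIGKILL: %v", <-done)
	}
}

Signalling -pid rather than pid is what lets the boot program take down grandchildren spawned by the Python worker, and the final Wait() is what keeps the terminated child from lingering as a zombie.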