From 784877b37e4bfbd04669a359cdb5fc1ebf509dce Mon Sep 17 00:00:00 2001
From: Janek Bevendorff
Date: Mon, 31 Jan 2022 13:01:40 +0100
Subject: [PATCH 1/9] Install pipeline dependencies to temporary venv

Fixes BEAM-12792

The change allows users to submit multiple pipelines to the same SDK
container without dependency conflicts. Dependencies in --setup_only
mode are still installed globally.
---
 CHANGES.md                       |   1 +
 sdks/python/container/Dockerfile |   2 +-
 sdks/python/container/boot.go    | 166 +++++++++++++++----------------
 3 files changed, 83 insertions(+), 86 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 824b7982c4d1..1f04dae0b6e9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -363,6 +363,7 @@
 * Added support for cloudpickle as a pickling library for Python SDK ([BEAM-8123](https://issues.apache.org/jira/browse/BEAM-8123)). To use cloudpickle, set pipeline option: --pickler_lib=cloudpickle
 * Added option to specify triggering frequency when streaming to BigQuery (Python) ([BEAM-12865](https://issues.apache.org/jira/browse/BEAM-12865)).
 * Added option to enable caching uploaded artifacts across job runs for Python Dataflow jobs ([BEAM-13459](https://issues.apache.org/jira/browse/BEAM-13459)). To enable, set pipeline option: --enable_artifact_caching, this will be enabled by default in a future release.
+* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).

 ## Breaking Changes

diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
index a301db74ee08..fe5238d22ce3 100644
--- a/sdks/python/container/Dockerfile
+++ b/sdks/python/container/Dockerfile
@@ -67,7 +67,7 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa

 ####
 # Install Apache Beam SDK. Use --no-deps and pip check to verify that all
-# necessary dependencies are specified in base_image_requiremetns.txt.
+# necessary dependencies are specified in base_image_requirements.txt.
 ####
 COPY target/apache-beam.tar.gz /opt/apache/beam/tars/
 RUN pip install --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp]
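The mechanism the commit message relies on is worth spelling out: a venv does not need its shell "activate" script. Exporting VIRTUAL_ENV and putting the venv's bin/ directory at the front of PATH is all it takes for subsequent "python" and "pip" invocations to resolve inside the venv, which is exactly what the setupVenv function below does. A minimal standalone sketch of that mechanism (illustration only, not patch code; it assumes a python3 binary on PATH):

    package main

    import (
        "fmt"
        "os"
        "os/exec"
        "path/filepath"
    )

    func main() {
        // A throwaway directory standing in for the per-worker venv.
        dir, err := os.MkdirTemp("", "beam-venv-demo-")
        if err != nil {
            panic(err)
        }
        defer os.RemoveAll(dir)

        // --system-site-packages keeps packages preinstalled in the base image importable.
        if out, err := exec.Command("python3", "-m", "venv", "--system-site-packages", dir).CombinedOutput(); err != nil {
            panic(fmt.Sprintf("venv creation failed: %v\n%s", err, out))
        }

        // "Activation" is nothing more than these two environment variables.
        os.Setenv("VIRTUAL_ENV", dir)
        os.Setenv("PATH", filepath.Join(dir, "bin")+":"+os.Getenv("PATH"))

        // sys.prefix now points at the venv, not the system installation.
        out, err := exec.Command("python", "-c", "import sys; print(sys.prefix)").Output()
        if err != nil {
            panic(err)
        }
        fmt.Printf("python now runs from: %s", out)
    }

Because child processes inherit the environment, every subsequent pip install and the SDK harness itself land in, and read from, the disposable directory.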
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 8e4cf772c0e0..d6cba7b11a97 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -25,12 +25,13 @@ import (
     "io/ioutil"
     "log"
     "os"
+    "os/signal"
     "os/exec"
     "path/filepath"
     "regexp"
     "strings"
+    "syscall"
     "sync"
-    "time"

     "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
     pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -39,7 +40,6 @@ import (
     "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
     "github.com/golang/protobuf/jsonpb"
     "github.com/golang/protobuf/proto"
-    "github.com/nightlyone/lockfile"
 )

 var (
@@ -73,13 +73,20 @@ const (
 )

 func main() {
+    if err := mainError(); err != nil {
+        log.Print(err)
+        os.Exit(1)
+    }
+}
+
+func mainError() error {
     flag.Parse()

     if *setupOnly {
         if err := processArtifactsInSetupOnlyMode(); err != nil {
-            log.Fatalf("Setup unsuccessful with error: %v", err)
+            return fmt.Errorf("Setup unsuccessful with error: %v", err)
         }
-        return
+        return nil
     }

     if *workerPool == true {
@@ -92,21 +99,21 @@ func main() {
             "--container_executable=/opt/apache/beam/boot",
         }
         log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
-        log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
+        return fmt.Errorf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
     }

     if *id == "" {
-        log.Fatal("No id provided.")
+        return fmt.Errorf("No id provided.")
     }
     if *provisionEndpoint == "" {
-        log.Fatal("No provision endpoint provided.")
+        return fmt.Errorf("No provision endpoint provided.")
     }

     ctx := grpcx.WriteWorkerID(context.Background(), *id)

     info, err := provision.Info(ctx, *provisionEndpoint)
     if err != nil {
-        log.Fatalf("Failed to obtain provisioning information: %v", err)
+        return fmt.Errorf("Failed to obtain provisioning information: %v", err)
     }
     log.Printf("Provision info:\n%v", info)
@@ -122,13 +129,13 @@ func main() {
     }

     if *loggingEndpoint == "" {
-        log.Fatal("No logging endpoint provided.")
+        return fmt.Errorf("No logging endpoint provided.")
     }
     if *artifactEndpoint == "" {
-        log.Fatal("No artifact endpoint provided.")
+        return fmt.Errorf("No artifact endpoint provided.")
     }
     if *controlEndpoint == "" {
-        log.Fatal("No control endpoint provided.")
+        return fmt.Errorf("No control endpoint provided.")
     }

     log.Printf("Initializing python harness: %v", strings.Join(os.Args, " "))
@@ -137,46 +144,53 @@ func main() {

     options, err := provision.ProtoToJSON(info.GetPipelineOptions())
     if err != nil {
-        log.Fatalf("Failed to convert pipeline options: %v", err)
+        return fmt.Errorf("Failed to convert pipeline options: %v", err)
     }

     // (2) Retrieve and install the staged packages.
     //
-    // Guard from concurrent artifact retrieval and installation,
-    // when called by child processes in a worker pool.
+    // No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
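The rewritten comment above is the crux of the refactor: log.Fatalf calls os.Exit, and os.Exit terminates the process without running deferred functions, so a venv cleanup registered with defer would be silently skipped. A two-line illustration (my example, not patch code):

    package main

    import (
        "fmt"
        "log"
    )

    func main() {
        defer fmt.Println("cleanup") // never printed: log.Fatalf calls os.Exit(1),
        log.Fatalf("boom")           // which exits without unwinding defers
    }

Returning the error to main() instead lets the defer run before the process exits non-zero, which is why main() has been split into a thin wrapper around mainError().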
-    materializeArtifactsFunc := func() {
-        dir := filepath.Join(*semiPersistDir, "staged")
+    venvDir, err := setupVenv(filepath.Join(*semiPersistDir, "beam-venv"), *id)
+    if err != nil {
+        return fmt.Errorf("Failed to initialize Python venv.")
+    }
+    cleanupFunc := func() {
+        log.Printf("Cleaning up temporary venv ...")
+        os.RemoveAll(venvDir)
+    }
+    defer cleanupFunc()
+    signalChannel := make(chan os.Signal, 1)
+    signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)
+    go func() {
+        log.Printf("Received signal: %v", (<-signalChannel).String())
+        cleanupFunc()
+        os.Exit(1)
+    }()
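Deferred functions are just as powerless against signals: SIGTERM's default action kills the process without any Go code running. That is why the patch additionally installs a handler that performs the same cleanup and then exits. The pattern in isolation (a sketch, not the patch's exact code):

    package main

    import (
        "log"
        "os"
        "os/signal"
        "syscall"
    )

    func main() {
        cleanup := func() { log.Print("cleaning up") }
        defer cleanup() // covers the normal return path only

        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
        go func() {
            log.Printf("received signal: %v", <-sigs)
            cleanup() // covers the kill path, where defers never run
            os.Exit(1)
        }()

        select {} // stands in for the real work
    }

Patch 2 below reworks this handler so that the trapped signal is also forwarded to the Python child processes.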
-        files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
-        if err != nil {
-            log.Fatalf("Failed to retrieve staged files: %v", err)
-        }
+    dir := filepath.Join(*semiPersistDir, "staged")

-        // TODO(herohde): the packages to install should be specified explicitly. It
-        // would also be possible to install the SDK in the Dockerfile.
-        fileNames := make([]string, len(files))
-        requirementsFiles := []string{requirementsFile}
-        for i, v := range files {
-            name, _ := artifact.MustExtractFilePayload(v)
-            log.Printf("Found artifact: %s", name)
-            fileNames[i] = name
-
-            if v.RoleUrn == artifact.URNPipRequirementsFile {
-                requirementsFiles = append(requirementsFiles, name)
-            }
-        }
+    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
+    if err != nil {
+        return fmt.Errorf("Failed to retrieve staged files: %v", err)
+    }

-        if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
-            log.Fatalf("Failed to install required packages: %v", setupErr)
-        }
+    // TODO(herohde): the packages to install should be specified explicitly. It
+    // would also be possible to install the SDK in the Dockerfile.
+    fileNames := make([]string, len(files))
+    requirementsFiles := []string{requirementsFile}
+    for i, v := range files {
+        name, _ := artifact.MustExtractFilePayload(v)
+        log.Printf("Found artifact: %s", name)
+        fileNames[i] = name
+
+        if v.RoleUrn == artifact.URNPipRequirementsFile {
+            requirementsFiles = append(requirementsFiles, name)
+        }
     }

-    workerPoolId := os.Getenv(workerPoolIdEnv)
-    if workerPoolId != "" {
-        multiProcessExactlyOnce(materializeArtifactsFunc, "beam.install.complete."+workerPoolId)
-    } else {
-        materializeArtifactsFunc()
+    if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
+        return fmt.Errorf("Failed to install required packages: %v", setupErr)
     }

     // (3) Invoke python
@@ -210,14 +224,37 @@ func main() {
     wg.Add(len(workerIds))
     for _, workerId := range workerIds {
         go func(workerId string) {
-            log.Printf("Executing: python %v", strings.Join(args, " "))
-            log.Fatalf("Python exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+            defer wg.Done()
+            log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
+            log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
         }(workerId)
     }
     wg.Wait()
+
+    return nil
+}
+
+// setupVenv initialize a local Python venv and set the corresponding env variables
+func setupVenv(baseDir, workerId string) (string, error) {
+    log.Printf("Initializing temporary Python venv ...")
+
+    if err := os.MkdirAll(baseDir, 0750); err != nil {
+        return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+    }
+    dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
+    if err != nil {
+        return "", fmt.Errorf("Failed Python venv directory: %s", err)
+    }
+    args := []string{"-m", "venv", "--system-site-packages", dir}
+    if err := execx.Execute("python", args...); err != nil {
+        return "", err
+    }
+    os.Setenv("VIRTUAL_ENV", dir)
+    os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
+    return dir, nil
 }

-// setup wheel specs according to installed python version
+// setupAcceptableWheelSpecs sets up wheel specs according to the installed python version
 func setupAcceptableWheelSpecs() error {
     cmd := exec.Command("python", "-V")
     stdoutStderr, err := cmd.CombinedOutput()
@@ -282,47 +319,6 @@ func joinPaths(dir string, paths ...string) []string {
     return ret
 }

-// Call the given function exactly once across multiple worker processes.
-// The need for multiple processes is specific to the Python SDK due to the GIL.
-// Should another SDK require it, this could be separated out as shared utility.
-func multiProcessExactlyOnce(actionFunc func(), completeFileName string) {
-    installCompleteFile := filepath.Join(os.TempDir(), completeFileName)
-
-    // skip if install already complete, no need to lock
-    _, err := os.Stat(installCompleteFile)
-    if err == nil {
-        return
-    }
-
-    lock, err := lockfile.New(filepath.Join(os.TempDir(), completeFileName+".lck"))
-    if err != nil {
-        log.Fatalf("Cannot init artifact retrieval lock: %v", err)
-    }
-
-    for err = lock.TryLock(); err != nil; err = lock.TryLock() {
-        if _, ok := err.(lockfile.TemporaryError); ok {
-            time.Sleep(5 * time.Second)
-            log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
-        } else {
-            log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
-        }
-    }
-    defer lock.Unlock()
-
-    // skip if install already complete
-    _, err = os.Stat(installCompleteFile)
-    if err == nil {
-        return
-    }
-
-    // do the real work
-    actionFunc()
-
-    // mark install complete
-    os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
-
-}
-
 // processArtifactsInSetupOnlyMode installs the dependencies found in artifacts
 // when flag --setup_only and --artifacts exist. The setup mode will only
 // process the provided artifacts and skip the actual worker program start up.
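The "(3) Invoke python" hunk above preserves Beam's one-interpreter-per-worker model: because of the GIL, sibling workers are separate Python processes distinguished only by the WORKER_ID environment variable. Reduced to its skeleton (hypothetical worker IDs; the real list is *id plus info.GetSiblingWorkerIds()):

    package main

    import (
        "log"
        "os"
        "os/exec"
        "sync"
    )

    func main() {
        workerIds := []string{"1", "2", "3"} // hypothetical stand-ins

        var wg sync.WaitGroup
        wg.Add(len(workerIds))
        for _, workerId := range workerIds {
            go func(workerId string) {
                defer wg.Done()
                cmd := exec.Command("python", "-c", "import os; print(os.environ['WORKER_ID'])")
                cmd.Env = append(os.Environ(), "WORKER_ID="+workerId)
                cmd.Stdout = os.Stdout
                if err := cmd.Run(); err != nil {
                    log.Printf("worker %v exited: %v", workerId, err)
                }
            }(workerId)
        }
        wg.Wait() // without this, main would return and orphan the workers
    }

Note that workerId is passed into the goroutine as an argument, so each closure is bound to its own value; patch 5 later has to retrofit the same idiom for the PID list.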
From 839a79d7e8febba2a537b82e6c6602c69a5f62fc Mon Sep 17 00:00:00 2001
From: Janek Bevendorff
Date: Mon, 21 Feb 2022 18:55:44 +0100
Subject: [PATCH 2/9] Ensure clean worker shutdown and avoid zombie processes

---
 .../runners/worker/worker_pool_main.py |  36 +++--
 sdks/python/container/boot.go          | 129 +++++++++++++-----
 2 files changed, 118 insertions(+), 47 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index eb5cdd935161..7e81b1fa6d72 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -51,6 +51,26 @@
 _LOGGER = logging.getLogger(__name__)


+def kill_process_gracefully(proc, timeout=10):
+  """
+  Kill a worker process gracefully by sending a SIGTERM and waiting for
+  it to finish. A SIGKILL will be sent if the process has not finished
+  after ``timeout`` seconds.
+  """
+  def _kill():
+    proc.terminate()
+    try:
+      proc.wait(timeout=timeout)
+    except subprocess.TimeoutExpired:
+      _LOGGER.warning('Worker process did not respond, killing it.')
+      proc.kill()
+      proc.wait()  # Avoid zombies
+
+  kill_thread = threading.Thread(target=_kill)
+  kill_thread.start()
+  kill_thread.join()
+
+
 class BeamFnExternalWorkerPoolServicer(
     beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):

@@ -95,7 +115,7 @@ def start(
     # Register to kill the subprocesses on exit.
     def kill_worker_processes():
       for worker_process in worker_pool._worker_processes.values():
-        worker_process.kill()
+        kill_process_gracefully(worker_process)

     atexit.register(kill_worker_processes)

@@ -172,19 +192,9 @@ def StopWorker(self,
     worker_process = self._worker_processes.pop(
         stop_worker_request.worker_id, None)
     if worker_process:
-
-      def kill_worker_process():
-        try:
-          worker_process.kill()
-        except OSError:
-          # ignore already terminated process
-          return
-
       _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
-      # communicate is necessary to avoid zombie process
-      # time box communicate (it has no timeout parameter in Py2)
-      threading.Timer(1, kill_worker_process).start()
-      worker_process.communicate()
+      kill_process_gracefully(worker_process)
+
     return beam_fn_api_pb2.StopWorkerResponse()
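kill_process_gracefully encodes a two-phase shutdown: SIGTERM first, a bounded wait, then SIGKILL, with a final wait() so the kernel can reap the child and no zombie is left behind. The same escalation expressed in Go terms (a sketch assuming a plain os/exec child, not Beam code):

    package main

    import (
        "log"
        "os/exec"
        "syscall"
        "time"
    )

    // Ask nicely with SIGTERM, wait up to timeout, then force with SIGKILL.
    // Wait is what reaps the child and prevents zombies.
    func killGracefully(cmd *exec.Cmd, timeout time.Duration) {
        cmd.Process.Signal(syscall.SIGTERM)

        done := make(chan error, 1)
        go func() { done <- cmd.Wait() }()
        select {
        case <-done:
        case <-time.After(timeout):
            log.Print("worker did not respond, killing it")
            cmd.Process.Kill()
            <-done
        }
    }

    func main() {
        cmd := exec.Command("sleep", "60") // stand-in for a worker process
        if err := cmd.Start(); err != nil {
            log.Fatal(err)
        }
        killGracefully(cmd, 10*time.Second)
    }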
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index d6cba7b11a97..2187bfff0d15 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -25,13 +25,14 @@ import (
     "io/ioutil"
     "log"
     "os"
-    "os/signal"
     "os/exec"
+    "os/signal"
     "path/filepath"
     "regexp"
     "strings"
     "syscall"
     "sync"
+    "time"

     "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
     pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -74,8 +75,8 @@ const (

 func main() {
     if err := mainError(); err != nil {
-        log.Print(err)
-        os.Exit(1)
+        log.Print(err)
+        os.Exit(1)
     }
 }

@@ -151,28 +152,24 @@ func mainError() error {
     //
     // No log.Fatalf() from here on, otherwise deferred cleanups will not be called!

+    // Trap signals, so we can clean up properly.
+    signalChannel := make(chan os.Signal, 1)
+    signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
+
     venvDir, err := setupVenv(filepath.Join(*semiPersistDir, "beam-venv"), *id)
     if err != nil {
-        return fmt.Errorf("Failed to initialize Python venv.")
+        return fmt.Errorf("Failed to initialize Python venv.")
     }
     cleanupFunc := func() {
-        log.Printf("Cleaning up temporary venv ...")
-        os.RemoveAll(venvDir)
+        log.Printf("Cleaning up temporary venv ...")
+        os.RemoveAll(venvDir)
     }
     defer cleanupFunc()
-    signalChannel := make(chan os.Signal, 1)
-    signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)
-    go func() {
-        log.Printf("Received signal: %v", (<-signalChannel).String())
-        cleanupFunc()
-        os.Exit(1)
-    }()

     dir := filepath.Join(*semiPersistDir, "staged")
-
     files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
     if err != nil {
-        return fmt.Errorf("Failed to retrieve staged files: %v", err)
+        return fmt.Errorf("Failed to retrieve staged files: %v", err)
     }

     // TODO(herohde): the packages to install should be specified explicitly. It
@@ -180,17 +177,17 @@ func mainError() error {
     fileNames := make([]string, len(files))
     requirementsFiles := []string{requirementsFile}
     for i, v := range files {
-        name, _ := artifact.MustExtractFilePayload(v)
-        log.Printf("Found artifact: %s", name)
-        fileNames[i] = name
+        name, _ := artifact.MustExtractFilePayload(v)
+        log.Printf("Found artifact: %s", name)
+        fileNames[i] = name

-        if v.RoleUrn == artifact.URNPipRequirementsFile {
-            requirementsFiles = append(requirementsFiles, name)
-        }
+        if v.RoleUrn == artifact.URNPipRequirementsFile {
+            requirementsFiles = append(requirementsFiles, name)
+        }
     }

     if setupErr := installSetupPackages(fileNames, dir, requirementsFiles); setupErr != nil {
-        return fmt.Errorf("Failed to install required packages: %v", setupErr)
+        return fmt.Errorf("Failed to install required packages: %v", setupErr)
     }

     // (3) Invoke python
@@ -214,41 +211,105 @@ func mainError() error {
         }
     }

+    workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+    // Keep track of child PIDs for clean shutdown without zombies
+    childPids := struct {
+        v        []int
+        canceled bool
+        mu       sync.Mutex
+    }{v: make([]int, 0, len(workerIds))}
+
+    // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+    go func() {
+        log.Printf("Received signal: %v", <-signalChannel)
+        childPids.mu.Lock()
+        childPids.canceled = true
+        for _, pid := range childPids.v {
+            syscall.Kill(-pid, syscall.SIGTERM)
+            go func() {
+                // This goroutine will be canceled if the main process exits before the 5 seconds
+                // have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+                time.Sleep(5 * time.Second)
+                log.Printf("Worker process did not respond, killing it.")
+                syscall.Kill(-pid, syscall.SIGKILL)
+            }()
+        }
+        childPids.mu.Unlock()
+    }()
+
     args := []string{
         "-m",
         sdkHarnessEntrypoint,
     }

-    workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
     var wg sync.WaitGroup
     wg.Add(len(workerIds))
     for _, workerId := range workerIds {
         go func(workerId string) {
             defer wg.Done()
+
+            childPids.mu.Lock()
+            if childPids.canceled {
+                childPids.mu.Unlock()
+                return
+            }
             log.Printf("Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
-            log.Printf("Python (worker %v) exited with code: %v", workerId, execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "python", args...))
+            cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, "python", args...)
+            childPids.v = append(childPids.v, cmd.Process.Pid)
+            childPids.mu.Unlock()
+
+            if err := cmd.Wait(); err != nil {
+                log.Printf("Python (worker %v) exited: %v", workerId, err)
+            } else {
+                log.Printf("Python (worker %v) exited.", workerId)
+            }
         }(workerId)
     }
     wg.Wait()
-
     return nil
 }

+// Start a command object in a new process group with the given arguments with
+// additional environment variables. It attaches stdio to the child process.
+// Returns the process handle.
+func StartCommandEnv(env map[string]string, prog string, args ...string) *exec.Cmd {
+    cmd := exec.Command(prog, args...)
+    cmd.Stdin = os.Stdin
+    cmd.Stdout = os.Stdout
+    cmd.Stderr = os.Stderr
+    if env != nil {
+        cmd.Env = os.Environ()
+        for k, v := range env {
+            cmd.Env = append(cmd.Env, k+"="+v)
+        }
+    }
+
+    // Create process group so we can clean up the whole subtree later without creating zombies
+    cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
+    cmd.Start()
+    return cmd
+}
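Setpgid is what makes the escalation above reliable: each worker gets its own process group, and signalling the negative PID reaches everything the worker spawned, not just the direct child. A self-contained demonstration (Unix-only; sh and sleep stand in for the Python worker):

    package main

    import (
        "log"
        "os/exec"
        "syscall"
        "time"
    )

    func main() {
        // The shell keeps running and sleep becomes a grandchild; killing the
        // shell alone would orphan the sleep.
        cmd := exec.Command("sh", "-c", "sleep 60 & wait")
        cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
        if err := cmd.Start(); err != nil {
            log.Fatal(err)
        }

        time.Sleep(100 * time.Millisecond)              // let the group establish
        syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM) // negative pid = whole group
        cmd.Wait()                                      // reap the direct child
    }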
+
-// setupVenv initialize a local Python venv and set the corresponding env variables
+// setupVenv initializes a local Python venv and sets the corresponding env variables
 func setupVenv(baseDir, workerId string) (string, error) {
     log.Printf("Initializing temporary Python venv ...")

-    if err := os.MkdirAll(baseDir, 0750); err != nil {
-        return "", fmt.Errorf("Failed to create venv base directory: %s", err)
+    dir := filepath.Join(baseDir, "beam-venv-worker-" + workerId)
+    if _, err := os.Stat(dir); !os.IsNotExist(err) {
+        // Probably leftovers from a previous run
+        log.Printf("Cleaning up previous venv ...")
+        if err := os.RemoveAll(dir); err != nil {
+            return "", err
+        }
     }
-    dir, err := ioutil.TempDir(baseDir, fmt.Sprintf("beam-venv-%s-", workerId))
-    if err != nil {
-        return "", fmt.Errorf("Failed Python venv directory: %s", err)
+    if err := os.MkdirAll(dir, 0750); err != nil {
+        return "", fmt.Errorf("Failed to create Python venv directory: %s", err)
     }
-    args := []string{"-m", "venv", "--system-site-packages", dir}
-    if err := execx.Execute("python", args...); err != nil {
-        return "", err
+    if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
+        return "", fmt.Errorf("Python venv initialization failed: %s", err)
     }
+
     os.Setenv("VIRTUAL_ENV", dir)
     os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))
     return dir, nil
- args := []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir} - if err := execx.Execute(pip, args...); err != nil { - fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.") + args := []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir} + if err := execx.Execute("python", args...); err != nil { + fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.") } // The second install round opens up the search for packages on PyPI and // also installs dependencies. The key is that if all the packages have // been installed in the first round then this command will be a no-op. - args = []string{"install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--find-links", dir} - return execx.Execute(pip, args...) + args = []string{"-m", "pip", "install", "-r", filepath.Join(dir, name), "--disable-pip-version-check", "--find-links", dir} + return execx.Execute("python", args...) } } return nil @@ -88,19 +76,19 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e // installed version will match the package specified, the package itself // will not be reinstalled, but its dependencies will now be resolved and // installed if necessary. This achieves our goal outlined above. - args := []string{"install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps", + args := []string{"-m", "pip", "install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps", filepath.Join(dir, packageSpec)} - err := execx.Execute(pip, args...) + err := execx.Execute("python", args...) if err != nil { return err } - args = []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} - return execx.Execute(pip, args...) + args = []string{"-m", "pip", "install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} + return execx.Execute("python", args...) } // Case when we do not perform a forced reinstall. - args := []string{"install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} - return execx.Execute(pip, args...) + args := []string{"-m", "pip", "install", "--disable-pip-version-check", filepath.Join(dir, packageSpec)} + return execx.Execute("python", args...) 
From c1f7b26a939dfe1a16d26b2565542a351d178818 Mon Sep 17 00:00:00 2001
From: Janek Bevendorff
Date: Wed, 21 Sep 2022 18:10:27 +0200
Subject: [PATCH 4/9] Move venv to /opt/apache/beam-venv

---
 sdks/python/container/boot.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 8153e507b534..377163306647 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -156,7 +156,7 @@ func mainError() error {
     signalChannel := make(chan os.Signal, 1)
     signal.Notify(signalChannel, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

-    venvDir, err := setupVenv(filepath.Join(*semiPersistDir, "beam-venv"), *id)
+    venvDir, err := setupVenv("/opt/apache/beam-venv", *id)
     if err != nil {
         return fmt.Errorf("Failed to initialize Python venv.")
     }

From 99a11a9ce2cb07ceb68e5b77302869a8ffbe357e Mon Sep 17 00:00:00 2001
From: Janek Bevendorff
Date: Thu, 22 Sep 2022 09:48:58 +0200
Subject: [PATCH 5/9] Fix PID loop variable capture

---
 sdks/python/container/boot.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 377163306647..7ba743ca5013 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -226,14 +226,15 @@ func mainError() error {
         childPids.mu.Lock()
         childPids.canceled = true
         for _, pid := range childPids.v {
-            syscall.Kill(-pid, syscall.SIGTERM)
-            go func() {
+            go func(pid int) {
                 // This goroutine will be canceled if the main process exits before the 5 seconds
                 // have elapsed, i.e., as soon as all subprocesses have returned from Wait().
                 time.Sleep(5 * time.Second)
-                log.Printf("Worker process did not respond, killing it.")
-                syscall.Kill(-pid, syscall.SIGKILL)
-            }()
+                if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
+                    log.Printf("Worker process %v did not respond, killed it.", pid)
+                }
+            }(pid)
+            syscall.Kill(-pid, syscall.SIGTERM)
         }
         childPids.mu.Unlock()
     }()
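The bug patch 5 fixes is the classic loop-variable capture gotcha (only fixed at the language level in Go 1.22): every goroutine closed over the single range variable pid, so timers firing five seconds later could all observe the last PID in the slice and SIGKILL the wrong process group. Side by side (illustration only):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        pids := []int{101, 102, 103}
        var wg sync.WaitGroup

        for _, pid := range pids {
            wg.Add(1)
            go func() { // buggy before Go 1.22: may print 103 three times
                defer wg.Done()
                fmt.Println("would kill", pid)
            }()
        }
        wg.Wait()

        for _, pid := range pids {
            wg.Add(1)
            go func(pid int) { // fixed: the value is passed in, as in the patch
                defer wg.Done()
                fmt.Println("would kill", pid)
            }(pid)
        }
        wg.Wait()
    }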
provided.") } ctx := grpcx.WriteWorkerID(context.Background(), *id) info, err := provision.Info(ctx, *provisionEndpoint) if err != nil { - return fmt.Errorf("Failed to obtain provisioning information: %v", err) + log.Fatalf("Failed to obtain provisioning information: %v", err) } log.Printf("Provision info:\n%v", info) @@ -130,13 +128,13 @@ func mainError() error { } if *loggingEndpoint == "" { - return fmt.Errorf("No logging endpoint provided.") + log.Fatalf("No logging endpoint provided.") } if *artifactEndpoint == "" { - return fmt.Errorf("No artifact endpoint provided.") + log.Fatalf("No artifact endpoint provided.") } if *controlEndpoint == "" { - return fmt.Errorf("No control endpoint provided.") + log.Fatalf("No control endpoint provided.") } log.Printf("Initializing python harness: %v", strings.Join(os.Args, " ")) @@ -145,7 +143,7 @@ func mainError() error { options, err := provision.ProtoToJSON(info.GetPipelineOptions()) if err != nil { - return fmt.Errorf("Failed to convert pipeline options: %v", err) + log.Fatalf("Failed to convert pipeline options: %v", err) } // (2) Retrieve and install the staged packages. From 66c1b2126fdf2dc116fd56ec2dbc7dad0b8bc0a5 Mon Sep 17 00:00:00 2001 From: Janek Bevendorff Date: Tue, 8 Nov 2022 10:04:32 +0100 Subject: [PATCH 7/9] Move unscoped setup lines to main() function --- sdks/python/container/boot.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index b70bbb6b7bd0..8b0abfc37159 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -75,17 +75,10 @@ const ( func main() { flag.Parse() - if err := launchSDKProcess(); err != nil { - log.Fatal(err) - } -} -func launchSDKProcess() error { if *setupOnly { - if err := processArtifactsInSetupOnlyMode(); err != nil { - log.Fatalf("Setup unsuccessful with error: %v", err) - } - return nil + processArtifactsInSetupOnlyMode() + os.Exit(0) } if *workerPool == true { @@ -108,6 +101,12 @@ func launchSDKProcess() error { log.Fatalf("No provision endpoint provided.") } + if err := launchSDKProcess(); err != nil { + log.Fatal(err) + } +} + +func launchSDKProcess() error { ctx := grpcx.WriteWorkerID(context.Background(), *id) info, err := provision.Info(ctx, *provisionEndpoint) @@ -384,7 +383,7 @@ func joinPaths(dir string, paths ...string) []string { // process the provided artifacts and skip the actual worker program start up. // The mode is useful for building new images with dependencies pre-installed so // that the installation can be skipped at the pipeline runtime. 
From 5a91e67d8ae7248c35017b87bfb6542c5d14c44d Mon Sep 17 00:00:00 2001
From: Janek Bevendorff
Date: Thu, 10 Nov 2022 10:20:24 +0100
Subject: [PATCH 9/9] Move changelog entry to "Unreleased"

---
 CHANGES.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1f04dae0b6e9..8f6e1e08c078 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
   than requiring them to be passed separately via the `--extra_package` option
   (Python) ([#23684](https://github.com/apache/beam/pull/23684)).
 * Pipeline Resource Hints now supported via `--resource_hints` flag (Go) ([#23990](https://github.com/apache/beam/pull/23990)).
+* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).

 ## Breaking Changes

@@ -363,7 +364,6 @@
 * Added support for cloudpickle as a pickling library for Python SDK ([BEAM-8123](https://issues.apache.org/jira/browse/BEAM-8123)). To use cloudpickle, set pipeline option: --pickler_lib=cloudpickle
 * Added option to specify triggering frequency when streaming to BigQuery (Python) ([BEAM-12865](https://issues.apache.org/jira/browse/BEAM-12865)).
 * Added option to enable caching uploaded artifacts across job runs for Python Dataflow jobs ([BEAM-13459](https://issues.apache.org/jira/browse/BEAM-13459)). To enable, set pipeline option: --enable_artifact_caching, this will be enabled by default in a future release.
-* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).

 ## Breaking Changes