[BEAM-12792] Install pipeline dependencies to temporary venv #16658

Merged on Nov 10, 2022 (9 commits)
1 change: 1 addition & 0 deletions CHANGES.md
@@ -67,6 +67,7 @@
   than requiring them to be passed separately via the `--extra_package` option
   (Python) ([#23684](https://github.com/apache/beam/pull/23684)).
 * Pipeline Resource Hints now supported via `--resource_hints` flag (Go) ([#23990](https://github.com/apache/beam/pull/23990)).
+* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).

 ## Breaking Changes

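The changelog entry above describes installing pipeline dependencies into a temporary venv so that the container's base environment stays untouched between pipelines. The following is only a minimal sketch of that idea using the standard-library `venv` module; it is not the code from this PR, and the helper names `make_temp_venv` and `remove_temp_venv` as well as the `beam-venv-` prefix are hypothetical.

```python
import os
import shutil
import tempfile
import venv


def make_temp_venv(system_site_packages=True):
    """Create a throwaway virtual environment and return its directory.

    Assumption: system_site_packages=True lets the venv see packages that are
    already installed in the base environment, so only extra dependencies
    would need to be installed into the venv itself.
    """
    env_dir = tempfile.mkdtemp(prefix="beam-venv-")  # hypothetical prefix
    # with_pip=False keeps creation fast and avoids bootstrapping pip here.
    venv.create(
        env_dir, system_site_packages=system_site_packages, with_pip=False)
    return env_dir


def remove_temp_venv(env_dir):
    """Delete the venv; the base environment is left unchanged."""
    shutil.rmtree(env_dir, ignore_errors=True)


if __name__ == "__main__":
    env_dir = make_temp_venv()
    print(os.path.isfile(os.path.join(env_dir, "pyvenv.cfg")))
    remove_temp_venv(env_dir)
```

Because everything installed for one pipeline would live under `env_dir`, deleting that directory is enough to return the container to its original state.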
36 changes: 23 additions & 13 deletions sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -51,6 +51,26 @@
 _LOGGER = logging.getLogger(__name__)


+def kill_process_gracefully(proc, timeout=10):
+  """
+  Kill a worker process gracefully by sending a SIGTERM and waiting for
+  it to finish. A SIGKILL will be sent if the process has not finished
+  after ``timeout`` seconds.
+  """
+  def _kill():
+    proc.terminate()
+    try:
+      proc.wait(timeout=timeout)
+    except subprocess.TimeoutExpired:
+      _LOGGER.warning('Worker process did not respond, killing it.')
+      proc.kill()
+      proc.wait()  # Avoid zombies
+
+  kill_thread = threading.Thread(target=_kill)
+  kill_thread.start()
+  kill_thread.join()
+
+
 class BeamFnExternalWorkerPoolServicer(
     beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):

@@ -95,7 +115,7 @@ def start(
     # Register to kill the subprocesses on exit.
     def kill_worker_processes():
       for worker_process in worker_pool._worker_processes.values():
-        worker_process.kill()
+        kill_process_gracefully(worker_process)

     atexit.register(kill_worker_processes)

@@ -172,19 +192,9 @@ def StopWorker(self,
     worker_process = self._worker_processes.pop(
         stop_worker_request.worker_id, None)
     if worker_process:
-
-      def kill_worker_process():
-        try:
-          worker_process.kill()
-        except OSError:
-          # ignore already terminated process
-          return
-
       _LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
-      # communicate is necessary to avoid zombie process
-      # time box communicate (it has no timeout parameter in Py2)
-      threading.Timer(1, kill_worker_process).start()
-      worker_process.communicate()
+      kill_process_gracefully(worker_process)

     return beam_fn_api_pb2.StopWorkerResponse()

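The terminate-then-kill pattern that `kill_process_gracefully` introduces in the diff above can be tried in isolation. The sketch below is not the PR's code: the name `terminate_then_kill` is hypothetical, and it calls the steps directly rather than through a helper thread (the diff starts its thread and joins it immediately, so the caller blocks in either case).

```python
import subprocess
import sys


def terminate_then_kill(proc, timeout=10):
    """Ask a child process to exit, then force it if it does not comply.

    Sends SIGTERM via Popen.terminate(), waits up to `timeout` seconds, and
    falls back to SIGKILL via Popen.kill() if the child is still running.
    Returns the child's exit code.
    """
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()  # Reap the child so it does not linger as a zombie.
    return proc.returncode


if __name__ == "__main__":
    # Start a child that would otherwise sleep for a minute.
    child = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"])
    exit_code = terminate_then_kill(child, timeout=5)
    # On POSIX a negative exit code means the child was ended by a signal.
    print("child exited with", exit_code)
```

Compared with the removed `worker_process.kill()` calls, this ordering gives the worker a chance to run its own shutdown logic before it is forcibly stopped.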
2 changes: 1 addition & 1 deletion sdks/python/container/Dockerfile
@@ -67,7 +67,7 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa

 ####
 # Install Apache Beam SDK. Use --no-deps and pip check to verify that all
-# necessary dependencies are specified in base_image_requiremetns.txt.
+# necessary dependencies are specified in base_image_requirements.txt.
 ####
 COPY target/apache-beam.tar.gz /opt/apache/beam/tars/
 RUN pip install --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp]