Skip to content

Commit

Permalink
Fail the pipeline when a mismatched Python or Beam version is detecte…
Browse files Browse the repository at this point in the history
…d. (#25313)
  • Loading branch information
tvalentyn committed Feb 8, 2023
1 parent a9e80d2 commit 88da381
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import platform
import unittest

import mock
import pytest

import apache_beam as beam
Expand Down Expand Up @@ -680,6 +681,21 @@ def test_track_pcoll_unbounded_flatten(self):
self.assertIs(pcoll2_unbounded.is_bounded, False)
self.assertIs(merged.is_bounded, False)

def test_incompatible_submission_and_runtime_envs_fail_pipeline(self):
  """Running a pipeline must fail when the SDK base version keeps changing.

  `sdk_base_version_capability` is patched so that each call returns a
  different value; the test then expects the pipeline run to raise a
  RuntimeError carrying the incompatibility message.
  """
  patch_target = (
      'apache_beam.transforms.environments.sdk_base_version_capability')
  expected_error = (
      'Pipeline construction environment and pipeline runtime '
      'environment are not compatible.')
  with mock.patch(patch_target) as base_version:
    # A list side_effect hands out one entry per call, so no two calls to the
    # patched function agree on the reported SDK version.
    fake_capabilities = [
        f"beam:version:sdk_base:apache/beam_python3.5_sdk:2.{i}.0"
        for i in range(100)
    ]
    base_version.side_effect = fake_capabilities
    with self.assertRaisesRegex(RuntimeError, expected_error):
      # Leaving the TestPipeline context is what executes the pipeline, so it
      # must stay nested inside both the patch and the assertion.
      with TestPipeline() as p:
        _ = p | Create([None])


class DoFnTest(unittest.TestCase):
def test_element(self):
Expand Down
23 changes: 23 additions & 0 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from apache_beam.runners.worker import statesampler
from apache_beam.transforms import TimeDomain
from apache_beam.transforms import core
from apache_beam.transforms import environments
from apache_beam.transforms import sideinputs
from apache_beam.transforms import userstate
from apache_beam.transforms import window
Expand Down Expand Up @@ -823,6 +824,27 @@ def only_element(iterable):
return element


def _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor):
  # type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None

  """Checks the descriptor's SDK version capabilities against this runtime.

  For every transform in the descriptor, the capabilities of the environment
  named by the transform's `environment_id` are inspected. Any capability that
  starts with `environments.SDK_VERSION_CAPABILITY_PREFIX` must equal the
  value returned by `environments.sdk_base_version_capability()`.

  Args:
    process_bundle_descriptor: descriptor whose transforms and environments
      are examined.

  Raises:
    RuntimeError: if an SDK version capability differs from the runtime one.
  """
  # Queried once up front so every comparison below uses the same value.
  runtime_capability = environments.sdk_base_version_capability()

  for transform in process_bundle_descriptor.transforms.values():
    transform_env = process_bundle_descriptor.environments[
        transform.environment_id]
    for capability in transform_env.capabilities:
      # Only SDK version capabilities take part in the compatibility check.
      if not capability.startswith(environments.SDK_VERSION_CAPABILITY_PREFIX):
        continue
      if capability == runtime_capability:
        continue
      raise RuntimeError(
          "Pipeline construction environment and pipeline runtime "
          "environment are not compatible. If you use a custom "
          "container image, check that the Python interpreter minor version "
          "and the Apache Beam version in your image match the versions "
          "used at pipeline construction time. "
          f"Submission environment: {capability}. "
          f"Runtime environment: {runtime_capability}.")

  # TODO: Consider warning on mismatches in versions of installed packages.


class BundleProcessor(object):
""" A class for processing bundles of elements. """

Expand All @@ -846,6 +868,7 @@ def __init__(self,
self.data_channel_factory = data_channel_factory
self.current_instruction_id = None # type: Optional[str]

_verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
# There is no guarantee that the runner only set
# timer_api_service_descriptor when having timers. So this field cannot be
# used as an indicator of timers.
Expand Down
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/transforms/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ def looks_like_json(s):


# Image-name prefix for Apache Beam containers.
# NOTE(review): presumably what is_apache_beam_container() matches against;
# its body is not visible here, so confirm.
APACHE_BEAM_DOCKER_IMAGE_PREFIX = 'apache/beam'

# Name prefix for Beam Java containers.
APACHE_BEAM_JAVA_CONTAINER_NAME_PREFIX = 'beam_java'
# Prefix of the capability string that carries the SDK base version; see
# sdk_base_version_capability(), which appends the default docker image name.
SDK_VERSION_CAPABILITY_PREFIX = 'beam:version:sdk_base:'


def is_apache_beam_container(container_image):
Expand Down Expand Up @@ -777,6 +777,11 @@ def python_sdk_docker_capabilities():
return python_sdk_capabilities() + [common_urns.protocols.SIBLING_WORKERS.urn]


def sdk_base_version_capability():
  """Returns the capability string identifying this SDK's base version.

  The string is `SDK_VERSION_CAPABILITY_PREFIX` followed by the value of
  `DockerEnvironment.default_docker_image()`.
  """
  default_image = DockerEnvironment.default_docker_image()
  return SDK_VERSION_CAPABILITY_PREFIX + default_image


def _python_sdk_capabilities_iter():
# type: () -> Iterator[str]
for urn_spec in common_urns.coders.__dict__.values():
Expand All @@ -786,7 +791,7 @@ def _python_sdk_capabilities_iter():
yield common_urns.protocols.HARNESS_MONITORING_INFOS.urn
yield common_urns.protocols.WORKER_STATUS.urn
yield python_urns.PACKED_COMBINE_FN
yield 'beam:version:sdk_base:' + DockerEnvironment.default_docker_image()
yield sdk_base_version_capability()
yield common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn
yield common_urns.primitives.TO_STRING.urn

Expand Down

0 comments on commit 88da381

Please sign in to comment.