-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deduplicate identical environments in a pipeline. #22308
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -82,6 +82,7 @@ | |||
from apache_beam.options.pipeline_options import TypeOptions | ||||
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator | ||||
from apache_beam.portability import common_urns | ||||
from apache_beam.portability.api import beam_runner_api_pb2 | ||||
from apache_beam.runners import PipelineRunner | ||||
from apache_beam.runners import create_runner | ||||
from apache_beam.transforms import ParDo | ||||
|
@@ -100,7 +101,6 @@ | |||
|
||||
if TYPE_CHECKING: | ||||
from types import TracebackType | ||||
from apache_beam.portability.api import beam_runner_api_pb2 | ||||
from apache_beam.runners.pipeline_context import PipelineContext | ||||
from apache_beam.runners.runner import PipelineResult | ||||
from apache_beam.transforms import environments | ||||
|
@@ -851,7 +851,6 @@ def to_runner_api( | |||
|
||||
"""For internal use only; no backwards-compatibility guarantees.""" | ||||
from apache_beam.runners import pipeline_context | ||||
from apache_beam.portability.api import beam_runner_api_pb2 | ||||
if context is None: | ||||
context = pipeline_context.PipelineContext( | ||||
use_fake_coders=use_fake_coders, | ||||
|
@@ -919,11 +918,49 @@ def visit_transform(self, transform_node): | |||
requirements=context.requirements()) | ||||
proto.components.transforms[root_transform_id].unique_name = ( | ||||
root_transform_id) | ||||
self.merge_compatible_environments(proto) | ||||
if return_context: | ||||
return proto, context # type: ignore # too complicated for now | ||||
else: | ||||
return proto | ||||
|
||||
@staticmethod | ||||
def merge_compatible_environments(proto): | ||||
"""Tries to minimize the number of distinct environments by merging | ||||
those that are compatible (currently defined as identical). | ||||
|
||||
Mutates proto as contexts may have references to proto.components. | ||||
""" | ||||
env_map = {} | ||||
canonical_env = {} | ||||
files_by_hash = {} | ||||
for env_id, env in proto.components.environments.items(): | ||||
# First deduplicate any file dependencies by their hash. | ||||
for dep in env.dependencies: | ||||
if dep.type_urn == common_urns.artifact_types.FILE.urn: | ||||
file_payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( | ||||
dep.type_payload) | ||||
if file_payload.sha256: | ||||
if file_payload.sha256 in files_by_hash: | ||||
file_payload.path = files_by_hash[file_payload.sha256] | ||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FYI, we de-duplicate artifacts again before staging:
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah. We could probably clean that up a lot once Runner v1 goes away. |
||||
dep.type_payload = file_payload.SerializeToString() | ||||
else: | ||||
files_by_hash[file_payload.sha256] = file_payload.path | ||||
# Next check if we've ever seen this environment before. | ||||
normalized = env.SerializeToString(deterministic=True) | ||||
if normalized in canonical_env: | ||||
env_map[env_id] = canonical_env[normalized] | ||||
else: | ||||
canonical_env[normalized] = env_id | ||||
for old_env, new_env in env_map.items(): | ||||
for transform in proto.components.transforms.values(): | ||||
if transform.environment_id == old_env: | ||||
transform.environment_id = new_env | ||||
for windowing_strategy in proto.components.windowing_strategies.values(): | ||||
if windowing_strategy.environment_id == old_env: | ||||
windowing_strategy.environment_id = new_env | ||||
del proto.components.environments[old_env] | ||||
|
||||
@staticmethod | ||||
def from_runner_api( | ||||
proto, # type: beam_runner_api_pb2.Pipeline | ||||
|
@@ -1270,8 +1307,6 @@ def to_runner_api(self, context): | |||
# are properly propagated. | ||||
return self.transform.to_runner_api_transform(context, self.full_label) | ||||
|
||||
from apache_beam.portability.api import beam_runner_api_pb2 | ||||
|
||||
def transform_to_runner_api( | ||||
transform, # type: Optional[ptransform.PTransform] | ||||
context # type: PipelineContext | ||||
|
@@ -1331,7 +1366,6 @@ def from_runner_api( | |||
|
||||
if common_urns.primitives.PAR_DO.urn == proto.spec.urn: | ||||
# Preserving side input tags. | ||||
from apache_beam.portability.api import beam_runner_api_pb2 | ||||
pardo_payload = ( | ||||
proto_utils.parse_Bytes( | ||||
proto.spec.payload, beam_runner_api_pb2.ParDoPayload)) | ||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
from apache_beam.pipeline import PipelineVisitor | ||
from apache_beam.pipeline import PTransformOverride | ||
from apache_beam.portability import common_urns | ||
from apache_beam.portability.api import beam_runner_api_pb2 | ||
from apache_beam.pvalue import AsSingleton | ||
from apache_beam.pvalue import TaggedOutput | ||
from apache_beam.runners.dataflow.native_io.iobase import NativeSource | ||
|
@@ -1021,7 +1022,6 @@ def display_data(self): # type: () -> dict | |
|
||
p = beam.Pipeline() | ||
p | MyPTransform() # pylint: disable=expression-not-assigned | ||
from apache_beam.portability.api import beam_runner_api_pb2 | ||
|
||
proto_pipeline = Pipeline.to_runner_api(p, use_fake_coders=True) | ||
my_transform, = [ | ||
|
@@ -1291,6 +1291,56 @@ def CompositeTransform(pcoll): | |
count += 1 | ||
assert count == 2 | ||
|
||
def test_environments_are_deduplicated(self): | ||
def file_artifact(path, hash, staged_name): | ||
return beam_runner_api_pb2.ArtifactInformation( | ||
type_urn=common_urns.artifact_types.FILE.urn, | ||
type_payload=beam_runner_api_pb2.ArtifactFilePayload( | ||
path=path, sha256=hash).SerializeToString(), | ||
role_urn=common_urns.artifact_roles.STAGING_TO.urn, | ||
role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload( | ||
staged_name=staged_name).SerializeToString(), | ||
) | ||
|
||
proto = beam_runner_api_pb2.Pipeline( | ||
components=beam_runner_api_pb2.Components( | ||
transforms={ | ||
'transform1': beam_runner_api_pb2.PTransform( | ||
environment_id='e1'), | ||
'transform2': beam_runner_api_pb2.PTransform( | ||
environment_id='e2'), | ||
'transform3': beam_runner_api_pb2.PTransform( | ||
environment_id='e3'), | ||
'transform4': beam_runner_api_pb2.PTransform( | ||
environment_id='e4'), | ||
}, | ||
environments={ | ||
'e1': beam_runner_api_pb2.Environment( | ||
dependencies=[file_artifact('a1', 'x', 'a')]), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add a test for multiple artifacts? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
'e2': beam_runner_api_pb2.Environment( | ||
dependencies=[file_artifact('a2', 'x', 'a')]), | ||
'e3': beam_runner_api_pb2.Environment( | ||
dependencies=[file_artifact('a3', 'y', 'a')]), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Probably also should add a negative test for the case where artifacts are the same but other properties (for example, capabilities) are different. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
'e4': beam_runner_api_pb2.Environment( | ||
dependencies=[file_artifact('a4', 'y', 'b')]), | ||
})) | ||
Pipeline.merge_compatible_environments(proto) | ||
|
||
# These environments are equivalent. | ||
self.assertEqual( | ||
proto.components.transforms['transform1'].environment_id, | ||
proto.components.transforms['transform2'].environment_id) | ||
|
||
# These are not. | ||
self.assertNotEqual( | ||
proto.components.transforms['transform1'].environment_id, | ||
proto.components.transforms['transform3'].environment_id) | ||
self.assertNotEqual( | ||
proto.components.transforms['transform4'].environment_id, | ||
proto.components.transforms['transform3'].environment_id) | ||
|
||
self.assertEqual(len(proto.components.environments), 3) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we should sort dependencies before de-duping?
Otherwise different environments with the same normalized set of files might end up picking different files based on the order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not really well defined whether dependencies are ordered (e.g. the order of jar files matters for class resolution) so I'm preserving the order here just in case.