Skip to content

Commit

Permalink
More tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Jul 20, 2022
1 parent b7e18d4 commit 508a991
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1305,24 +1305,43 @@ def file_artifact(path, hash, staged_name):
proto = beam_runner_api_pb2.Pipeline(
components=beam_runner_api_pb2.Components(
transforms={
'transform1': beam_runner_api_pb2.PTransform(
environment_id='e1'),
'transform2': beam_runner_api_pb2.PTransform(
environment_id='e2'),
'transform3': beam_runner_api_pb2.PTransform(
environment_id='e3'),
'transform4': beam_runner_api_pb2.PTransform(
environment_id='e4'),
f'transform{ix}': beam_runner_api_pb2.PTransform(
environment_id=f'e{ix}')
for ix in range(8)
},
environments={
# Same hash and destination.
'e1': beam_runner_api_pb2.Environment(
dependencies=[file_artifact('a1', 'x', 'a')]),
dependencies=[file_artifact('a1', 'x', 'dest')]),
'e2': beam_runner_api_pb2.Environment(
dependencies=[file_artifact('a2', 'x', 'a')]),
dependencies=[file_artifact('a2', 'x', 'dest')]),
# Different hash.
'e3': beam_runner_api_pb2.Environment(
dependencies=[file_artifact('a3', 'y', 'a')]),
dependencies=[file_artifact('a3', 'y', 'dest')]),
# Different destination.
'e4': beam_runner_api_pb2.Environment(
dependencies=[file_artifact('a4', 'y', 'b')]),
dependencies=[file_artifact('a4', 'y', 'dest2')]),
# Multiple files with same hash and destinations.
'e5': beam_runner_api_pb2.Environment(
dependencies=[
file_artifact('a1', 'x', 'dest'),
file_artifact('b1', 'xb', 'destB')
]),
'e6': beam_runner_api_pb2.Environment(
dependencies=[
file_artifact('a2', 'x', 'dest'),
file_artifact('b2', 'xb', 'destB')
]),
# Overlapping, but not identical, files.
'e7': beam_runner_api_pb2.Environment(
dependencies=[
file_artifact('a1', 'x', 'dest'),
file_artifact('b2', 'y', 'destB')
]),
# Same files as first, but differing other properties.
'e0': beam_runner_api_pb2.Environment(
resource_hints={'hint': b'value'},
dependencies=[file_artifact('a1', 'x', 'dest')]),
}))
Pipeline.merge_compatible_environments(proto)

Expand All @@ -1331,15 +1350,25 @@ def file_artifact(path, hash, staged_name):
proto.components.transforms['transform1'].environment_id,
proto.components.transforms['transform2'].environment_id)

self.assertEqual(
proto.components.transforms['transform5'].environment_id,
proto.components.transforms['transform6'].environment_id)

# These are not.
self.assertNotEqual(
proto.components.transforms['transform1'].environment_id,
proto.components.transforms['transform3'].environment_id)
self.assertNotEqual(
proto.components.transforms['transform4'].environment_id,
proto.components.transforms['transform3'].environment_id)
self.assertNotEqual(
proto.components.transforms['transform6'].environment_id,
proto.components.transforms['transform7'].environment_id)
self.assertNotEqual(
proto.components.transforms['transform1'].environment_id,
proto.components.transforms['transform0'].environment_id)

self.assertEqual(len(proto.components.environments), 3)
self.assertEqual(len(proto.components.environments), 6)


if __name__ == '__main__':
Expand Down

0 comments on commit 508a991

Please sign in to comment.