From 604987d44ce875c9b1b371d4295ac580a2bfa684 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 1 Sep 2023 17:29:37 -0700 Subject: [PATCH 1/4] Better errors when inputs are omitted. It's not always possible to know if a transform consumes inputs, or can act as a root transform (and in fact some may be able to do both depending on their configuration), but when a transform expecting inputs doesn't get them the error can be quite obscure. This adds best-effort checking and a better error in that case. We also allow explicitly setting empty inputs to work around this error (which is where most of the complexity of this change lies). Importantly, sources (no matter their name) are not required to have inputs. --- sdks/python/apache_beam/yaml/yaml_provider.py | 47 +++++++++-- .../python/apache_beam/yaml/yaml_transform.py | 82 +++++++++++++++---- .../yaml/yaml_transform_scope_test.py | 31 +------ .../apache_beam/yaml/yaml_transform_test.py | 48 +++++++++++ .../yaml/yaml_transform_unit_test.py | 42 +++++----- 5 files changed, 177 insertions(+), 73 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index aa5aa7183318a..5f22de35ed1cc 100644 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -61,6 +61,16 @@ def provided_transforms(self) -> Iterable[str]: """Returns a list of transform type names this provider can handle.""" raise NotImplementedError(type(self)) + def requires_inputs(self, typ: str, args: Mapping[str, Any]) -> bool: + """Returns whether this transform requires inputs. + + Specifically, if this returns True and inputs are not provided then an error + will be thrown. + + This is best-effort, primarily for better and earlier error messages. 
+ """ + return not typ.startswith('Read') + def create_transform( self, typ: str, @@ -117,20 +127,29 @@ def __init__(self, urns, service): def provided_transforms(self): return self._urns.keys() - def create_transform(self, type, args, yaml_create_transform): - if callable(self._service): - self._service = self._service() + def schema_transforms(self): if self._schema_transforms is None: try: - self._schema_transforms = [ - config.identifier + self._schema_transforms = { + config.identifier: config for config in external.SchemaAwareExternalTransform.discover( self._service) - ] + } except Exception: self._schema_transforms = [] + return self._schema_transforms + + def requires_inputs(self, typ, args): + if self._urns[typ] in self.schema_transforms(): + return bool(self.schema_transforms()[self._urns[typ]].inputs) + else: + return super().requires_inputs(typ, args) + + def create_transform(self, type, args, yaml_create_transform): + if callable(self._service): + self._service = self._service() urn = self._urns[type] - if urn in self._schema_transforms: + if urn in self.schema_transforms(): return external.SchemaAwareExternalTransform(urn, self._service, **args) else: return type >> self.create_external_transform(urn, args) @@ -311,8 +330,9 @@ def fn_takes_side_inputs(fn): class InlineProvider(Provider): - def __init__(self, transform_factories): + def __init__(self, transform_factories, no_input_transforms=()): self._transform_factories = transform_factories + self._no_input_transforms = set(no_input_transforms) def available(self): return True @@ -326,6 +346,14 @@ def create_transform(self, type, args, yaml_create_transform): def to_json(self): return {'type': "InlineProvider"} + def requires_inputs(self, typ, args): + if typ in self._no_input_transforms: + return False + elif hasattr(self._transform_factories[typ], '_yaml_requires_inputs'): + return self._transform_factories[typ]._yaml_requires_inputs + else: + return super().requires_inputs(typ, args) + + class 
MetaInlineProvider(InlineProvider): def create_transform(self, type, args, yaml_create_transform): @@ -460,7 +488,8 @@ def _parse_window_spec(spec): 'WindowInto': WindowInto, 'GroupByKey': beam.GroupByKey, }, - **ios)) + **ios), + no_input_transforms=('Create', )) class PypiExpansionService: diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 70cbf0b7cee33..6a1976a634d15 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -73,6 +73,28 @@ def only_element(xs): return x +# These allow a user to explicitly pass no input to a transform (i.e. use it +# as a root transform) without an error even if the transform is not known to +# handle it. +def explicitly_empty(): + return {'__explicitly_empty__': None} + + +def is_explicitly_empty(io): + return io == explicitly_empty() + + +def is_empty(io): + return not io or is_explicitly_empty(io) + + +def empty_if_explicitly_empty(io): + if is_explicitly_empty(io): + return {} + else: + return io + + class SafeLineLoader(SafeLoader): """A yaml loader that attaches line information to mappings and strings.""" class TaggedString(str): @@ -244,6 +266,12 @@ def provider_score(p): raise ValueError( 'Config for transform at %s must be a mapping.' 
% identify_object(spec)) + + if (not input_pcolls and not is_explicitly_empty(spec.get('input', {})) and + provider.requires_inputs(spec['type'], config)): + raise ValueError( + f'Missing inputs for transform at {identify_object(spec)}') + try: # pylint: disable=undefined-loop-variable ptransform = provider.create_transform( @@ -319,7 +347,7 @@ def expand_leaf_transform(spec, scope): spec = normalize_inputs_outputs(spec) inputs_dict = { key: scope.get_pcollection(value) - for (key, value) in spec['input'].items() + for (key, value) in empty_if_explicitly_empty(spec['input']).items() } input_type = spec.get('input_type', 'default') if input_type == 'list': @@ -359,10 +387,10 @@ def expand_composite_transform(spec, scope): spec = normalize_inputs_outputs(normalize_source_sink(spec)) inner_scope = Scope( - scope.root, { + scope.root, + { key: scope.get_pcollection(value) - for key, - value in spec['input'].items() + for (key, value) in empty_if_explicitly_empty(spec['input']).items() }, spec['transforms'], yaml_provider.merge_providers( @@ -387,8 +415,7 @@ def expand(inputs): _LOGGER.info("Expanding %s ", identify_object(spec)) return ({ key: scope.get_pcollection(value) - for key, - value in spec['input'].items() + for (key, value) in empty_if_explicitly_empty(spec['input']).items() } or scope.root) | scope.unique_name(spec, None) >> CompositePTransform() @@ -413,12 +440,25 @@ def is_not_output_of_last_transform(new_transforms, value): composite_spec = normalize_inputs_outputs(spec) new_transforms = [] for ix, transform in enumerate(composite_spec['transforms']): - if any(io in transform for io in ('input', 'output', 'input', 'output')): - raise ValueError( - f'Transform {identify_object(transform)} is part of a chain, ' - 'must have implicit inputs and outputs.') + if any(io in transform for io in ('input', 'output')): + if (ix == 0 and 'input' in transform and 'output' not in transform and + is_explicitly_empty(transform['input'])): + # This is OK as source clause 
sets an explicitly empty input. + pass + else: + raise ValueError( + f'Transform {identify_object(transform)} is part of a chain, ' + 'must have implicit inputs and outputs.') if ix == 0: - transform['input'] = {key: key for key in composite_spec['input'].keys()} + if is_explicitly_empty(transform.get('input', None)): + pass + elif is_explicitly_empty(composite_spec['input']): + transform['input'] = composite_spec['input'] + else: + transform['input'] = { + key: key + for key in composite_spec['input'].keys() + } else: transform['input'] = new_transforms[-1]['__uuid__'] new_transforms.append(transform) @@ -470,6 +510,8 @@ def normalize_source_sink(spec): spec = dict(spec) spec['transforms'] = list(spec.get('transforms', [])) if 'source' in spec: + if 'input' not in spec['source']: + spec['source']['input'] = explicitly_empty() spec['transforms'].insert(0, spec.pop('source')) if 'sink' in spec: spec['transforms'].append(spec.pop('sink')) @@ -483,6 +525,13 @@ def preprocess_source_sink(spec): return spec +def tag_explicit_inputs(spec): + if 'input' in spec and not SafeLineLoader.strip_metadata(spec['input']): + return dict(spec, input=explicitly_empty()) + else: + return spec + + def normalize_inputs_outputs(spec): spec = dict(spec) @@ -522,7 +571,7 @@ def push_windowing_to_roots(spec): scope = LightweightScope(spec['transforms']) consumed_outputs_by_transform = collections.defaultdict(set) for transform in spec['transforms']: - for _, input_ref in transform['input'].items(): + for _, input_ref in empty_if_explicitly_empty(transform['input']).items(): try: transform_id, output = scope.get_transform_id_and_output_name(input_ref) consumed_outputs_by_transform[transform_id].add(output) @@ -531,7 +580,7 @@ def push_windowing_to_roots(spec): pass for transform in spec['transforms']: - if not transform['input'] and 'windowing' not in transform: + if is_empty(transform['input']) and 'windowing' not in transform: transform['windowing'] = spec['windowing'] 
transform['__consumed_outputs'] = consumed_outputs_by_transform[ transform['__uuid__']] @@ -558,7 +607,7 @@ def preprocess_windowing(spec): spec = push_windowing_to_roots(spec) windowing = spec.pop('windowing') - if spec['input']: + if not is_empty(spec['input']): # Apply the windowing to all inputs by wrapping it in a transform that # first applies windowing and then applies the original transform. original_inputs = spec['input'] @@ -687,7 +736,7 @@ def ensure_errors_consumed(spec): raise ValueError( f'Missing output in error_handling of {identify_object(t)}') to_handle[t['__uuid__'], config['error_handling']['output']] = t - for _, input in t['input'].items(): + for _, input in empty_if_explicitly_empty(t['input']).items(): if input not in spec['input']: consumed.add(scope.get_transform_id_and_output_name(input)) for error_pcoll, t in to_handle.items(): @@ -724,7 +773,7 @@ def preprocess(spec, verbose=False): def apply(phase, spec): spec = phase(spec) - if spec['type'] in {'composite', 'chain'}: + if spec['type'] in {'composite', 'chain'} and 'transforms' in spec: spec = dict( spec, transforms=[apply(phase, t) for t in spec['transforms']]) return spec @@ -733,6 +782,7 @@ def apply(phase, spec): ensure_transforms_have_types, preprocess_source_sink, preprocess_chain, + tag_explicit_inputs, normalize_inputs_outputs, preprocess_flattened_inputs, ensure_errors_consumed, diff --git a/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py index a22e4f851a1f1..4162b793cd2ed 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py @@ -86,40 +86,13 @@ def test_create_ptransform(self): spec = ''' transforms: - type: PyMap + input: something config: fn: "lambda x: x*x" ''' scope, spec = self.get_scope_by_spec(p, spec) - result = scope.create_ptransform(spec['transforms'][0], []) - self.assertIsInstance(result, beam.transforms.ParDo) - 
self.assertEqual(result.label, 'Map(lambda x: x*x)') - - result_annotations = {**result.annotations()} - target_annotations = { - 'yaml_type': 'PyMap', - 'yaml_args': '{"fn": "lambda x: x*x"}', - 'yaml_provider': '{"type": "InlineProvider"}' - } - - # Check if target_annotations is a subset of result_annotations - self.assertDictEqual( - result_annotations, { - **result_annotations, **target_annotations - }) - - def test_create_ptransform_with_inputs(self): - with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( - pickle_library='cloudpickle')) as p: - spec = ''' - transforms: - - type: PyMap - config: - fn: "lambda x: x*x" - ''' - scope, spec = self.get_scope_by_spec(p, spec) - - result = scope.create_ptransform(spec['transforms'][0], []) + result = scope.create_ptransform(spec['transforms'][0], ['something']) self.assertIsInstance(result, beam.transforms.ParDo) self.assertEqual(result.label, 'Map(lambda x: x*x)') diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 9a540e3551ffd..8eaea72d0683f 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -229,8 +229,56 @@ def test_name_is_ambiguous(self): output: AnotherFilter ''') + def test_empty_inputs_throws_error(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + with self.assertRaisesRegex( + ValueError, + r'Missing inputs for transform at "EmptyInputOkButYamlDoesntKnow" at line .*' + ): + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: PyTransform + name: EmptyInputOkButYamlDoesntKnow + config: + constructor: apache_beam.Impulse + ''') + + def test_empty_inputs_ok_in_source(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + # Does not throw an error like it does above. 
+ _ = p | YamlTransform( + ''' + type: composite + source: + type: PyTransform + name: EmptyInputOkButYamlDoesntKnow + config: + constructor: apache_beam.Impulse + ''') + + def test_empty_inputs_ok_if_explicit(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + # Does not throw an error like it does above. + _ = p | YamlTransform( + ''' + type: composite + transforms: + - type: PyTransform + name: EmptyInputOkButYamlDoesntKnow + input: {} + config: + constructor: apache_beam.Impulse + ''') + class CreateTimestamped(beam.PTransform): + _yaml_requires_inputs = False + def __init__(self, elements): self._elements = elements diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py index d10056fea5b0b..92c2a8dd48e67 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -186,7 +186,7 @@ def test_expand_composite_transform_with_name(self): - type: Create config: elements: [0,1,2] - output: + output: Create ''' scope, spec = self.get_scope_by_spec(p, spec) @@ -204,7 +204,7 @@ def test_expand_composite_transform_with_name_input(self): input: input config: fn: 'lambda x: x*x' - output: + output: PyMap ''' elements = p | beam.Create(range(3)) @@ -222,7 +222,7 @@ def test_expand_composite_transform_root(self): - type: Create config: elements: [0,1,2] - output: + output: Create ''' scope, spec = self.get_scope_by_spec(p, spec) @@ -315,7 +315,7 @@ def test_chain_as_composite_with_outputs_override(self): def test_chain_as_composite_with_input(self): spec = ''' type: chain - input: + input: elements transforms: - type: PyMap @@ -346,6 +346,7 @@ def test_normalize_source_sink(self): expected = ''' transforms: - type: Create + input: {'__explicitly_empty__': null} config: elements: [0,1,2] - type: PyMap @@ -367,7 +368,7 @@ def 
test_normalize_source_sink_only_source(self): - type: PyMap config: fn: 'lambda x: x*x' - + ''' spec = yaml.load(spec, Loader=SafeLineLoader) result = normalize_source_sink(spec) @@ -375,6 +376,7 @@ def test_normalize_source_sink_only_source(self): expected = ''' transforms: - type: Create + input: {'__explicitly_empty__': null} config: elements: [0,1,2] - type: PyMap @@ -444,6 +446,7 @@ def test_preprocess_source_sink_composite(self): type: composite transforms: - type: Create + input: {'__explicitly_empty__': null} config: elements: [0,1,2] - type: PyMap @@ -471,6 +474,7 @@ def test_preprocess_source_sink_chain(self): type: chain transforms: - type: Create + input: {'__explicitly_empty__': null} config: elements: [0,1,2] - type: PyMap @@ -499,10 +503,10 @@ def test_normalize_inputs_outputs(self): expected = ''' type: PyMap - input: + input: input: [Create1, Create2] fn: 'lambda x: x*x' - output: + output: output: Squared ''' self.assertYaml(expected, result) @@ -512,7 +516,7 @@ def test_normalize_inputs_outputs_dict(self): type: PyMap input: [Create1, Create2] fn: 'lambda x: x*x' - output: + output: out1: Squared1 out2: Squared2 ''' @@ -521,10 +525,10 @@ def test_normalize_inputs_outputs_dict(self): expected = ''' type: PyMap - input: + input: input: [Create1, Create2] fn: 'lambda x: x*x' - output: + output: out1: Squared1 out2: Squared2 ''' @@ -610,13 +614,13 @@ def test_push_windowing_to_roots(self): windowing: type: fixed size: 2 - __consumed_outputs: + __consumed_outputs: - null input: {} output: {} - type: PyMap fn: 'lambda x: x*x' - input: + input: input: Create output: {} windowing: @@ -646,7 +650,7 @@ def test_preprocess_windowing_custom_type(self): input: Create transforms: - type: SumGlobally - input: + input: input: {result['transforms'][1]['__uuid__']} output: {{}} - type: WindowInto @@ -696,10 +700,10 @@ def test_preprocess_windowing_composite_with_windowing_outer(self): input: {} output: {} - type: SumGlobally - input: + input: input: Create output: 
{} - output: + output: output: SumGlobally ''' self.assertYaml(expected, result) @@ -736,13 +740,13 @@ def test_preprocess_windowing_composite_with_windowing_on_input(self): input: {} output: {} - type: SumGlobally - input: + input: input: Create windowing: type: fixed size: 4 output: {} - output: + output: output: SumGlobally ''' self.assertYaml(expected, result) @@ -805,7 +809,7 @@ def test_preprocess_flattened_inputs_implicit(self): input1: Create2 - type: PyMap fn: 'lambda x: x*x' - input: + input: input: {result['transforms'][0]['__uuid__']} output: {{}} output: CreateTimestamped @@ -839,7 +843,7 @@ def test_preprocess_flattened_inputs_explicit_flatten(self): output: {} - type: PyMap fn: 'lambda x: x*x' - input: + input: input: Flatten output: {} output: CreateTimestamped From 66c5c4a27e64f28f6528a503b40fb9902f3a262d Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 5 Sep 2023 10:01:16 -0700 Subject: [PATCH 2/4] lint --- sdks/python/apache_beam/yaml/yaml_transform_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 8eaea72d0683f..96f4111e8b454 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -232,10 +232,9 @@ def test_name_is_ambiguous(self): def test_empty_inputs_throws_error(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: - with self.assertRaisesRegex( - ValueError, - r'Missing inputs for transform at "EmptyInputOkButYamlDoesntKnow" at line .*' - ): + with self.assertRaisesRegex(ValueError, + 'Missing inputs for transform at ' + '"EmptyInputOkButYamlDoesntKnow" at line .*'): _ = p | YamlTransform( ''' type: composite From 16a8ccbd0c011b14a3295d80cafe75fcb2b6435a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 8 Sep 2023 10:52:19 -0700 Subject: [PATCH 3/4] 
post-merge update for new provider --- sdks/python/apache_beam/yaml/yaml_provider.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 701a9c91a3424..e4eaa3b77a2fe 100644 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -613,6 +613,9 @@ def available(self) -> bool: def provided_transforms(self) -> Iterable[str]: return self._transforms.keys() + def requires_inputs(self, typ, args): + return self._underlying_provider.requires_inputs(typ, args) + def create_transform( self, typ: str, From f0a9c6252c2767054502e13c489a0ceeda92a0ee Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 8 Sep 2023 11:08:59 -0700 Subject: [PATCH 4/4] another post-merge adaptation --- sdks/python/apache_beam/yaml/yaml_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 47b2f69f6702a..da9bf526cd596 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -208,7 +208,7 @@ def followers(self, transform_name): # TODO(yaml): Also trace through outputs and composites. for transform in self._transforms: if transform['type'] != 'composite': - for input in transform.get('input').values(): + for input in empty_if_explicitly_empty(transform['input']).values(): transform_id, _ = self.get_transform_id_and_output_name(input) self._all_followers[transform_id].append(transform['__uuid__']) return self._all_followers[self.get_transform_id(transform_name)]