From e2bf5d6dbe31b85a74975b076aee04606621f725 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 9 Aug 2024 13:32:43 -0700 Subject: [PATCH] Add basic testing for yaml join docs. As we're using the fake SQL and not checking the outputs, this is not a substitute for real testing, but at the very least it ensures the joins are syntactically correct. --- sdks/python/apache_beam/yaml/readme_test.py | 40 ++++++++++++++++--- sdks/python/apache_beam/yaml/yaml_join.py | 3 +- .../en/documentation/sdks/yaml-join.md | 2 +- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/yaml/readme_test.py b/sdks/python/apache_beam/yaml/readme_test.py index efce92490a359..82e0a1b246f2c 100644 --- a/sdks/python/apache_beam/yaml/readme_test.py +++ b/sdks/python/apache_beam/yaml/readme_test.py @@ -55,6 +55,8 @@ def expand(self, inputs): def guess_name_and_type(expr): expr = expr.strip().replace('`', '') + if expr.endswith('*'): + return 'unknown', str parts = expr.split() if len(parts) >= 2 and parts[-2].lower() == 'as': name = parts[-1] @@ -87,7 +89,7 @@ def guess_name_and_type(expr): return name, typ if m.group(1) == '*': - return inputs['PCOLLECTION'] | beam.Filter(lambda _: True) + return next(iter(inputs.values())) | beam.Filter(lambda _: True) else: output_schema = [ guess_name_and_type(expr) for expr in m.group(1).split(',') @@ -280,17 +282,40 @@ def parse_test_methods(markdown_lines): else: if code_lines: if code_lines[0].startswith('- type:'): - is_chain = not any('input:' in line for line in code_lines) + specs = yaml.load('\n'.join(code_lines), Loader=SafeLoader) + is_chain = not any('input' in spec for spec in specs) + if is_chain: + undefined_inputs = set(['input']) + else: + + def extract_inputs(input_spec): + if not input_spec: + return set() + elif isinstance(input_spec, str): + return set([input_spec.split('.')[0]]) + elif isinstance(input_spec, list): + return set.union(*[extract_inputs(v) for v in input_spec]) + elif 
isinstance(input_spec, dict): + return set.union( + *[extract_inputs(v) for v in input_spec.values()]) + else: + raise ValueError("Misformed inputs: " + input_spec) + + def extract_name(input_spec): + return input_spec.get('name', input_spec.get('type')) + + undefined_inputs = set.union( + *[extract_inputs(spec.get('input')) for spec in specs]) - set( + extract_name(spec) for spec in specs) # Treat this as a fragment of a larger pipeline. # pylint: disable=not-an-iterable code_lines = [ 'pipeline:', ' type: chain' if is_chain else '', ' transforms:', - ' - type: ReadFromCsv', - ' name: input', - ' config:', - ' path: whatever', + ] + [ + ' - {type: ReadFromCsv, name: "%s", config: {path: x}}' % + undefined_input for undefined_input in undefined_inputs ] + [' ' + line for line in code_lines] if code_lines[0] == 'pipeline:': yaml_pipeline = '\n'.join(code_lines) @@ -329,6 +354,9 @@ def createTestSuite(name, path): InlinePythonTest = createTestSuite( 'InlinePythonTest', os.path.join(YAML_DOCS_DIR, 'yaml-inline-python.md')) +JoinTest = createTestSuite( + 'JoinTest', os.path.join(YAML_DOCS_DIR, 'yaml-join.md')) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--render_dir', default=None) diff --git a/sdks/python/apache_beam/yaml/yaml_join.py b/sdks/python/apache_beam/yaml/yaml_join.py index 04a24642c2317..5124ef56b49c1 100644 --- a/sdks/python/apache_beam/yaml/yaml_join.py +++ b/sdks/python/apache_beam/yaml/yaml_join.py @@ -173,8 +173,9 @@ def _is_connected(edge_list, expected_node_count): def _SqlJoinTransform( pcolls, sql_transform_constructor, - type: Union[str, Dict[str, List]], + *, equalities: Union[str, List[Dict[str, str]]], + type: Union[str, Dict[str, List]] = 'inner', fields: Optional[Dict[str, Any]] = None): """Joins two or more inputs using a specified condition. 
diff --git a/website/www/site/content/en/documentation/sdks/yaml-join.md b/website/www/site/content/en/documentation/sdks/yaml-join.md index d207926ff995b..6645d7a945a34 100644 --- a/website/www/site/content/en/documentation/sdks/yaml-join.md +++ b/website/www/site/content/en/documentation/sdks/yaml-join.md @@ -48,7 +48,7 @@ inputs, one can use the following shorthand syntax: input2: Second Input input3: Third Input config: - equalities: col + equalities: col1 ``` ## Join Types