diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 7ba8aa128c24..12b546da53d9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1507,25 +1507,28 @@ def _check_fn_use_yield_and_return(fn): source_code = _get_function_body_without_inners(fn) has_yield = False has_return = False - return_none_warning = ( - "No iterator is returned by the process method in %s.", - fn.__self__.__class__) + has_return_none = False for line in source_code.split("\n"): lstripped_line = line.lstrip() if lstripped_line.startswith("yield ") or lstripped_line.startswith( "yield("): has_yield = True - if lstripped_line.startswith("return ") or lstripped_line.startswith( + elif lstripped_line.rstrip() == "return": + has_return = True + elif lstripped_line.startswith("return ") or lstripped_line.startswith( "return("): + if lstripped_line.rstrip() == "return None" or lstripped_line.rstrip( + ) == "return(None)": + has_return_none = True has_return = True - if lstripped_line.startswith( - "return None") or lstripped_line.rstrip() == "return": - _LOGGER.warning(return_none_warning) if has_yield and has_return: return True - if not has_yield and not has_return: - _LOGGER.warning(return_none_warning) + if has_return_none: + _LOGGER.warning( + "Process method returned None (element won't be emitted): %s." + " Check if intended.", + fn.__self__.__class__) return False except Exception as e: diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 0d680c969c9b..80ab6a88afb4 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -40,7 +40,7 @@ from apache_beam.typehints import row_type from apache_beam.typehints import typehints -RETURN_NONE_PARTIAL_WARNING = "No iterator is returned" +RETURN_NONE_PARTIAL_WARNING = "Process method returned None" class TestDoFn1(beam.DoFn): @@ -121,9 +121,11 @@ def process(self, element): class TestDoFn12(beam.DoFn): - """test process returning None (return statement without a value)""" + """test process returning None in a filter pattern""" def process(self, element): - return + if element == 0: + return + return element class TestDoFnStateful(beam.DoFn): @@ -194,14 +196,12 @@ def test_dofn_with_explicit_return_none(self): def test_dofn_with_implicit_return_none_missing_return_and_yield(self): with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn11()) - assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text - assert str(TestDoFn11) in self._caplog.text + assert RETURN_NONE_PARTIAL_WARNING not in self._caplog.text - def test_dofn_with_implicit_return_none_return_without_value(self): + def test_dofn_with_implicit_return_none_and_value(self): with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn12()) - assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text - assert str(TestDoFn12) in self._caplog.text + assert RETURN_NONE_PARTIAL_WARNING not in self._caplog.text class PartitionTest(unittest.TestCase):