Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1507,25 +1507,28 @@ def _check_fn_use_yield_and_return(fn):
source_code = _get_function_body_without_inners(fn)
has_yield = False
has_return = False
return_none_warning = (
"No iterator is returned by the process method in %s.",
fn.__self__.__class__)
has_return_none = False
for line in source_code.split("\n"):
lstripped_line = line.lstrip()
if lstripped_line.startswith("yield ") or lstripped_line.startswith(
"yield("):
has_yield = True
if lstripped_line.startswith("return ") or lstripped_line.startswith(
elif lstripped_line.rstrip() == "return":
has_return = True
elif lstripped_line.startswith("return ") or lstripped_line.startswith(
"return("):
if lstripped_line.rstrip() == "return None" or lstripped_line.rstrip(
) == "return(None)":
has_return_none = True
has_return = True
if lstripped_line.startswith(
"return None") or lstripped_line.rstrip() == "return":
_LOGGER.warning(return_none_warning)
if has_yield and has_return:
return True

if not has_yield and not has_return:
_LOGGER.warning(return_none_warning)
if has_return_none:
_LOGGER.warning(
"Process method returned None (element won't be emitted): %s."
" Check if intended.",
fn.__self__.__class__)

return False
except Exception as e:
Expand Down
16 changes: 8 additions & 8 deletions sdks/python/apache_beam/transforms/core_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from apache_beam.typehints import row_type
from apache_beam.typehints import typehints

RETURN_NONE_PARTIAL_WARNING = "No iterator is returned"
RETURN_NONE_PARTIAL_WARNING = "Process method returned None"


class TestDoFn1(beam.DoFn):
Expand Down Expand Up @@ -121,9 +121,11 @@ def process(self, element):


class TestDoFn12(beam.DoFn):
"""test process returning None (return statement without a value)"""
"""test process returning None in a filter pattern"""
def process(self, element):
return
if element == 0:
return
return element


class TestDoFnStateful(beam.DoFn):
Expand Down Expand Up @@ -194,14 +196,12 @@ def test_dofn_with_explicit_return_none(self):
def test_dofn_with_implicit_return_none_missing_return_and_yield(self):
with self._caplog.at_level(logging.WARNING):
beam.ParDo(TestDoFn11())
assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text
assert str(TestDoFn11) in self._caplog.text
assert RETURN_NONE_PARTIAL_WARNING not in self._caplog.text

def test_dofn_with_implicit_return_none_return_without_value(self):
def test_dofn_with_implicit_return_none_and_value(self):
with self._caplog.at_level(logging.WARNING):
beam.ParDo(TestDoFn12())
assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text
assert str(TestDoFn12) in self._caplog.text
assert RETURN_NONE_PARTIAL_WARNING not in self._caplog.text


class PartitionTest(unittest.TestCase):
Expand Down
Loading