diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index c4ac12a84cb6..6fb7c9f28a3c 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -766,6 +766,12 @@ def apply( 'streaming jobs.' % full_label) self.applied_labels.add(full_label) + if pvalueish is None: + full_label = self._current_transform().full_label + raise TypeCheckError( + f'Transform "{full_label}" was applied to the output of ' + f'an object of type None.') + pvalueish, inputs = transform._extract_input_pvalues(pvalueish) try: if not isinstance(inputs, dict): @@ -805,9 +811,6 @@ def apply( self._assert_not_applying_PDone(pvalueish, transform) pvalueish_result = self.runner.apply(transform, pvalueish, self._options) - if pvalueish_result is None: - pvalueish_result = pvalue.PDone(self) - pvalueish_result.producer = current if type_options is not None and type_options.pipeline_type_check: transform.type_check_outputs(pvalueish_result) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 0bbd14b6afc7..bbeedbfb72a6 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -172,9 +172,7 @@ class ParentTransform(PTransform): def expand(self, pcoll): return pcoll | DoNothingTransform() - with pytest.raises( - TypeCheckError, - match=r".*applied to the output.*ParentTransform/DoNothingTransform"): + with pytest.raises(TypeCheckError, match=r".*applied to the output"): with TestPipeline() as pipeline: _ = pipeline | ParentTransform() | beam.Map(lambda x: x + 1)