diff --git a/CHANGES.md b/CHANGES.md index 0db7fddba4f1..c96fa3a11c93 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -95,6 +95,7 @@ * Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)). * Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)). +* `DoFn.process` returning a `str`, `bytes`, or `dict` (instead of an iterable wrapping one) now raises a clear `TypeError` rather than silently iterating per-character/byte/key (Python) ([#18712](https://github.com/apache/beam/issues/18712)). ## Security Fixes diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index a9c1b91d9d54..93c9e83875ec 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -150,6 +150,7 @@ cdef class _OutputHandler(OutputHandler): cdef object output_batch_converter cdef bint _process_batch_yields_elements cdef bint _process_yields_batches + cdef bint _check_user_dofn_output @cython.locals(windowed_value=WindowedValue, windowed_batch=WindowedBatch, diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index c22072dbf8b9..d1688e9182dc 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -1475,6 +1475,7 @@ def __init__( do_fn_signature.process_batch_method.method_value, '_beam_yields_elements', False), + check_user_dofn_output=not isinstance(fn, core.CallableWrapperDoFn), ) if do_fn_signature.is_stateful_dofn() and not user_state_context: @@ -1633,6 +1634,7 @@ def __init__( output_batch_converter, # type: Optional[BatchConverter] process_yields_batches, # type: bool process_batch_yields_elements, # type: bool + check_user_dofn_output=False, # type: bool ): """Initializes ``_OutputHandler``. @@ -1642,6 +1644,12 @@ def __init__( tagged_receivers: main receiver object. per_element_output_counter: per_element_output_counter of one work_item. could be none if experimental flag turn off + check_user_dofn_output: if True, validate that a user-class DoFn does not + return a str/bytes/dict (a common bug — see + https://github.com/apache/beam/issues/18712). + Skipped for callable-wrapped DoFns (Map/FlatMap) + where iterating a returned str/bytes/dict is a + legitimate flatten use case. """ self.window_fn = window_fn self.main_receivers = main_receivers @@ -1654,6 +1662,7 @@ def __init__( self.output_batch_converter = output_batch_converter self._process_yields_batches = process_yields_batches self._process_batch_yields_elements = process_batch_yields_elements + self._check_user_dofn_output = check_user_dofn_output def handle_process_outputs( self, windowed_input_element, results, watermark_estimator=None): @@ -1667,6 +1676,13 @@ def handle_process_outputs( if results is None: results = [] + if self._check_user_dofn_output and isinstance(results, (str, bytes, dict)): + object_type = type(results).__name__ + raise TypeError( + 'Returning a %s from a ParDo or FlatMap is discouraged. ' + 'Please use list(%r) if you really want this behavior.' % + (object_type, results)) + # TODO(https://github.com/apache/beam/issues/20404): Verify that the # results object is a valid iterable type if # performance_runtime_type_check is active, without harming performance diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py index cc4e8218e8af..6470c3a52b32 100644 --- a/sdks/python/apache_beam/runners/common_test.py +++ b/sdks/python/apache_beam/runners/common_test.py @@ -154,6 +154,47 @@ def process(self, element, mykey=DoFn.KeyParam): test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2])) (p | test_stream | beam.ParDo(DoFnProcessWithKeyparam())) + def test_dofn_returning_str_raises_clear_error(self): + """Regression test for https://github.com/apache/beam/issues/18712. + + A DoFn returning a str instead of an iterable wrapping one used to + silently iterate per-character. It should now raise a clear TypeError. + """ + class BadDoFn(DoFn): + def process(self, element): + return 'hello' + + # Use base Exception (matching existing convention in + # typecheck_test.py::test_do_fn_returning_non_iterable_throws_error) + # because the runner's _reraise_augmented wraps the TypeError before + # it surfaces to the test framework. + with self.assertRaisesRegex( + Exception, 'Returning a str from a ParDo or FlatMap is discouraged'): + with TestPipeline() as p: + _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) + + def test_dofn_returning_bytes_raises_clear_error(self): + """Regression test for https://github.com/apache/beam/issues/18712.""" + class BadDoFn(DoFn): + def process(self, element): + return b'hello' + + with self.assertRaisesRegex( + Exception, 'Returning a bytes from a ParDo or FlatMap is discouraged'): + with TestPipeline() as p: + _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) + + def test_dofn_returning_dict_raises_clear_error(self): + """Regression test for https://github.com/apache/beam/issues/18712.""" + class BadDoFn(DoFn): + def process(self, element): + return {'k': 'v'} + + with self.assertRaisesRegex( + Exception, 'Returning a dict from a ParDo or FlatMap is discouraged'): + with TestPipeline() as p: + _ = p | beam.Create([0]) | beam.ParDo(BadDoFn()) + def test_pardo_with_unbounded_per_element_dofn(self): class UnboundedDoFn(beam.DoFn): @beam.DoFn.unbounded_per_element()