Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@

* Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)).
* Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)).
* `DoFn.process` returning a `str`, `bytes`, or `dict` (instead of an iterable wrapping one) now raises a clear `TypeError` rather than silently iterating per-character/byte/key (Python) ([#18712](https://github.com/apache/beam/issues/18712)).

## Security Fixes

Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ cdef class _OutputHandler(OutputHandler):
cdef object output_batch_converter
cdef bint _process_batch_yields_elements
cdef bint _process_yields_batches
cdef bint _check_user_dofn_output

@cython.locals(windowed_value=WindowedValue,
windowed_batch=WindowedBatch,
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ def __init__(
do_fn_signature.process_batch_method.method_value,
'_beam_yields_elements',
False),
check_user_dofn_output=not isinstance(fn, core.CallableWrapperDoFn),
)

if do_fn_signature.is_stateful_dofn() and not user_state_context:
Expand Down Expand Up @@ -1633,6 +1634,7 @@ def __init__(
output_batch_converter, # type: Optional[BatchConverter]
process_yields_batches, # type: bool
process_batch_yields_elements, # type: bool
check_user_dofn_output=False, # type: bool
):
"""Initializes ``_OutputHandler``.

Expand All @@ -1642,6 +1644,12 @@ def __init__(
tagged_receivers: main receiver object.
per_element_output_counter: per_element_output_counter of one work_item.
could be none if experimental flag turn off
check_user_dofn_output: if True, validate that a user-class DoFn does not
return a str/bytes/dict (a common bug — see
https://github.com/apache/beam/issues/18712).
Skipped for callable-wrapped DoFns (Map/FlatMap)
where iterating a returned str/bytes/dict is a
legitimate flatten use case.
"""
self.window_fn = window_fn
self.main_receivers = main_receivers
Expand All @@ -1654,6 +1662,7 @@ def __init__(
self.output_batch_converter = output_batch_converter
self._process_yields_batches = process_yields_batches
self._process_batch_yields_elements = process_batch_yields_elements
self._check_user_dofn_output = check_user_dofn_output

def handle_process_outputs(
self, windowed_input_element, results, watermark_estimator=None):
Expand All @@ -1667,6 +1676,13 @@ def handle_process_outputs(
if results is None:
results = []

if self._check_user_dofn_output and isinstance(results, (str, bytes, dict)):
object_type = type(results).__name__
raise TypeError(
'Returning a %s from a ParDo or FlatMap is discouraged. '
'Please use list(%r) if you really want this behavior.' %
(object_type, results))
Comment thread
chrisqiqiu marked this conversation as resolved.

# TODO(https://github.com/apache/beam/issues/20404): Verify that the
# results object is a valid iterable type if
# performance_runtime_type_check is active, without harming performance
Expand Down
41 changes: 41 additions & 0 deletions sdks/python/apache_beam/runners/common_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,47 @@ def process(self, element, mykey=DoFn.KeyParam):
test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
(p | test_stream | beam.ParDo(DoFnProcessWithKeyparam()))

def test_dofn_returning_str_raises_clear_error(self):
"""Regression test for https://github.com/apache/beam/issues/18712.

A DoFn returning a str instead of an iterable wrapping one used to
silently iterate per-character. It should now raise a clear TypeError.
"""
class BadDoFn(DoFn):
def process(self, element):
return 'hello'

# Use base Exception (matching existing convention in
# typecheck_test.py::test_do_fn_returning_non_iterable_throws_error)
# because the runner's _reraise_augmented wraps the TypeError before
# it surfaces to the test framework.
with self.assertRaisesRegex(
Exception, 'Returning a str from a ParDo or FlatMap is discouraged'):
Comment thread
chrisqiqiu marked this conversation as resolved.
with TestPipeline() as p:
_ = p | beam.Create([0]) | beam.ParDo(BadDoFn())

def test_dofn_returning_bytes_raises_clear_error(self):
"""Regression test for https://github.com/apache/beam/issues/18712."""
class BadDoFn(DoFn):
def process(self, element):
return b'hello'

with self.assertRaisesRegex(
Exception, 'Returning a bytes from a ParDo or FlatMap is discouraged'):
Comment thread
chrisqiqiu marked this conversation as resolved.
with TestPipeline() as p:
_ = p | beam.Create([0]) | beam.ParDo(BadDoFn())

def test_dofn_returning_dict_raises_clear_error(self):
"""Regression test for https://github.com/apache/beam/issues/18712."""
class BadDoFn(DoFn):
def process(self, element):
return {'k': 'v'}

with self.assertRaisesRegex(
Exception, 'Returning a dict from a ParDo or FlatMap is discouraged'):
Comment thread
chrisqiqiu marked this conversation as resolved.
with TestPipeline() as p:
_ = p | beam.Create([0]) | beam.ParDo(BadDoFn())

def test_pardo_with_unbounded_per_element_dofn(self):
class UnboundedDoFn(beam.DoFn):
@beam.DoFn.unbounded_per_element()
Expand Down
Loading