From 7375ea6d2d5d525bf06c0acbb07d56c25962894a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 8 May 2026 20:43:39 -0400 Subject: [PATCH 1/4] Add a test to reproduce a flush error. --- sdks/python/apache_beam/dataframe/io_test.py | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py index 313d955b4550..1ddfa45c18f0 100644 --- a/sdks/python/apache_beam/dataframe/io_test.py +++ b/sdks/python/apache_beam/dataframe/io_test.py @@ -296,6 +296,27 @@ def test_truncating_filehandle_iter(self): self._run_truncating_file_handle_iter_test('aaa b cccccccccccccccccccc') self._run_truncating_file_handle_iter_test('aaa b ccccccccccccccccc ') + def test_truncating_filehandle_flush_on_closed_stream(self): + class ClosedFlushingStream(StringIO): + def flush(self): + if self.closed: + raise ValueError("I/O operation on closed file.") + super().flush() + + s = 'a b c' + tracker = restriction_trackers.OffsetRestrictionTracker( + restriction_trackers.OffsetRange(0, len(s))) + underlying = ClosedFlushingStream(s) + handle = io._TruncatingFileHandle( + underlying, tracker, splitter=io._DelimSplitter(' ', 10)) + + # Verify that calling flush() when the underlying stream is closed + # succeeds without raising ValueError. + underlying.close() + handle.flush() + + + @parameterized.expand([ ('defaults', {}), ('header', dict(header=1)), From 75ec31c961a25de93c28776c6eee0fd4944a305c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 8 May 2026 20:51:01 -0400 Subject: [PATCH 2/4] Fix close and flush exceptions on _TruncatingFileHandle --- sdks/python/apache_beam/dataframe/io.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index 02423f517eea..0fee0589ba44 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -515,7 +515,7 @@ def seekable(self): @property def closed(self): - return False + return getattr(self._underlying, 'closed', False) def __iter__(self): # For pandas is_file_like. @@ -584,7 +584,19 @@ def _read(self, size=-1): return res def flush(self): - self._underlying.flush() + if not self.closed: + try: + self._underlying.flush() + except ValueError: + pass + + def close(self): + if not self.closed: + try: + if hasattr(self._underlying, 'close'): + self._underlying.close() + except Exception: + pass class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider): From 7e4ee0af9c19c17e8c5739622b312080e26b0c7a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 8 May 2026 22:53:24 -0400 Subject: [PATCH 3/4] Address review comments. --- sdks/python/apache_beam/dataframe/io.py | 7 +++---- sdks/python/apache_beam/dataframe/io_test.py | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index 0fee0589ba44..cf0e04af163a 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -591,11 +591,10 @@ def flush(self): pass def close(self): - if not self.closed: + if not self.closed and hasattr(self._underlying, 'close'): try: - if hasattr(self._underlying, 'close'): - self._underlying.close() - except Exception: + self._underlying.close() + except (OSError, ValueError): pass diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py index 1ddfa45c18f0..8ef374985279 100644 --- a/sdks/python/apache_beam/dataframe/io_test.py +++ b/sdks/python/apache_beam/dataframe/io_test.py @@ -314,8 +314,7 @@ def flush(self): # succeeds without raising ValueError. underlying.close() handle.flush() - - + handle.close() @parameterized.expand([ ('defaults', {}), From 3e8f5806f763084c156ead63bdc433443da3abef Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 8 May 2026 23:38:50 -0400 Subject: [PATCH 4/4] Add another unit test to improve test coverage. --- sdks/python/apache_beam/dataframe/io_test.py | 25 ++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py index 8ef374985279..4cd502d1b8d7 100644 --- a/sdks/python/apache_beam/dataframe/io_test.py +++ b/sdks/python/apache_beam/dataframe/io_test.py @@ -316,6 +316,31 @@ def flush(self): handle.flush() handle.close() + def test_truncating_filehandle_exception_suppression(self): + class FaultyStream(StringIO): + @property + def closed(self): + return False + + def flush(self): + raise ValueError("Simulated flush error") + + def close(self): + raise OSError("Simulated close error") + + s = 'a b c' + tracker = restriction_trackers.OffsetRestrictionTracker( + restriction_trackers.OffsetRange(0, len(s))) + underlying = FaultyStream(s) + handle = io._TruncatingFileHandle( + underlying, tracker, splitter=io._DelimSplitter(' ', 10)) + + # Verify that ValueError raised during flush() is safely suppressed. + handle.flush() + + # Verify that OSError raised during close() is safely suppressed. + handle.close() + @parameterized.expand([ ('defaults', {}), ('header', dict(header=1)),