diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py index 02423f517eea..cf0e04af163a 100644 --- a/sdks/python/apache_beam/dataframe/io.py +++ b/sdks/python/apache_beam/dataframe/io.py @@ -515,7 +515,7 @@ def seekable(self): @property def closed(self): - return False + return getattr(self._underlying, 'closed', False) def __iter__(self): # For pandas is_file_like. @@ -584,7 +584,18 @@ def _read(self, size=-1): return res def flush(self): - self._underlying.flush() + if not self.closed: + try: + self._underlying.flush() + except ValueError: + pass + + def close(self): + if not self.closed and hasattr(self._underlying, 'close'): + try: + self._underlying.close() + except (OSError, ValueError): + pass class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider): diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py index 313d955b4550..4cd502d1b8d7 100644 --- a/sdks/python/apache_beam/dataframe/io_test.py +++ b/sdks/python/apache_beam/dataframe/io_test.py @@ -296,6 +296,51 @@ def test_truncating_filehandle_iter(self): self._run_truncating_file_handle_iter_test('aaa b cccccccccccccccccccc') self._run_truncating_file_handle_iter_test('aaa b ccccccccccccccccc ') + def test_truncating_filehandle_flush_on_closed_stream(self): + class ClosedFlushingStream(StringIO): + def flush(self): + if self.closed: + raise ValueError("I/O operation on closed file.") + super().flush() + + s = 'a b c' + tracker = restriction_trackers.OffsetRestrictionTracker( + restriction_trackers.OffsetRange(0, len(s))) + underlying = ClosedFlushingStream(s) + handle = io._TruncatingFileHandle( + underlying, tracker, splitter=io._DelimSplitter(' ', 10)) + + # Verify that calling flush() when the underlying stream is closed + # succeeds without raising ValueError. + underlying.close() + handle.flush() + handle.close() + + def test_truncating_filehandle_exception_suppression(self): + class FaultyStream(StringIO): + @property + def closed(self): + return False + + def flush(self): + raise ValueError("Simulated flush error") + + def close(self): + raise OSError("Simulated close error") + + s = 'a b c' + tracker = restriction_trackers.OffsetRestrictionTracker( + restriction_trackers.OffsetRange(0, len(s))) + underlying = FaultyStream(s) + handle = io._TruncatingFileHandle( + underlying, tracker, splitter=io._DelimSplitter(' ', 10)) + + # Verify that ValueError raised during flush() is safely suppressed. + handle.flush() + + # Verify that OSError raised during close() is safely suppressed. + handle.close() + @parameterized.expand([ ('defaults', {}), ('header', dict(header=1)),