From 3d137746a842368c88c391a31bc1e1c185b5d4ec Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 00:09:49 -0400 Subject: [PATCH 1/9] Add a test to reproduce hanging. --- .../apache_beam/transforms/async_dofn_test.py | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py b/sdks/python/apache_beam/transforms/async_dofn_test.py index 81c7b8e163ff..ec276f91af13 100644 --- a/sdks/python/apache_beam/transforms/async_dofn_test.py +++ b/sdks/python/apache_beam/transforms/async_dofn_test.py @@ -16,6 +16,7 @@ # import logging +import multiprocessing import random import time import unittest @@ -487,6 +488,46 @@ def add_item(i): self.check_output(results[i], expected_outputs['key' + str(i)]) self.assertEqual(bag_states['key' + str(i)].items, []) + @staticmethod + def _run_reset_state_deadlock_scenario(use_asyncio): + if use_asyncio: + return + + dofn = BasicDofn(sleep_time=0.5) + async_dofn = async_lib.AsyncWrapper(dofn, use_asyncio=False) + async_dofn.setup() + fake_bag_state = FakeBagState([]) + fake_timer = FakeTimer(0) + + # Start processing an item. This starts a worker thread sleeping for 0.5s. + async_dofn.process(('key1', 1), to_process=fake_bag_state, timer=fake_timer) + time.sleep(0.05) + + # Attempt to call reset_state(). If the fix is NOT applied, this will deadlock + # forever because reset_state() holds the lock while calling shutdown(wait=True), + # blocking the future's done callback from acquiring the lock. + async_lib.AsyncWrapper.reset_state() + + def test_reset_state_hang_reproduction(self): + # Run the deadlock scenario in a separate process so that if it hangs, + # we can terminate it without causing the main pytest process to hang at exit. + if self.use_asyncio: + return + + p = multiprocessing.Process( + target=AsyncTest._run_reset_state_deadlock_scenario, + args=(self.use_asyncio,)) + p.start() + p.join(timeout=3.0) + + if p.is_alive(): + p.terminate() + p.join() + self.fail("reset_state() deadlocked/hung waiting for active threads to finish") + else: + self.assertEqual(p.exitcode, 0) + if __name__ == '__main__': unittest.main() + From ed0c2d29b8d7b7f614e6b6165f76d170e5c4a3e3 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 00:33:18 -0400 Subject: [PATCH 2/9] Fix deadlock between shutdown in main thread and done callback in worker threads. --- sdks/python/apache_beam/transforms/async_dofn.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn.py b/sdks/python/apache_beam/transforms/async_dofn.py index 28568bd893c5..a0248bd70c86 100644 --- a/sdks/python/apache_beam/transforms/async_dofn.py +++ b/sdks/python/apache_beam/transforms/async_dofn.py @@ -165,9 +165,14 @@ def reset_state(): if AsyncWrapper._loop_started is not None: AsyncWrapper._loop_started.clear() - for pool in AsyncWrapper._pool.values(): - pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown( - wait=True, cancel_futures=True) + pools_to_shutdown = [ + pool.acquire(AsyncWrapper.initialize_pool(1)) + for pool in AsyncWrapper._pool.values() + ] + + for pool in pools_to_shutdown: + pool.shutdown(wait=True, cancel_futures=True) + with AsyncWrapper._lock: AsyncWrapper._pool = {} AsyncWrapper._processing_elements = {} @@ -268,7 +273,8 @@ async def _collect(result): def decrement_items_in_buffer(self, future): with AsyncWrapper._lock: - AsyncWrapper._items_in_buffer[self._uuid] -= 1 + if self._uuid in AsyncWrapper._items_in_buffer: + AsyncWrapper._items_in_buffer[self._uuid] -= 1 def schedule_if_room(self, element, ignore_buffer=False, *args, **kwargs): """Schedules an item to be processed asynchronously if there is room. From f737cc18a42c8873c3abff1e432736411ebdb8c2 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 00:53:32 -0400 Subject: [PATCH 3/9] Address review comments. --- sdks/python/apache_beam/transforms/async_dofn.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn.py b/sdks/python/apache_beam/transforms/async_dofn.py index a0248bd70c86..a6312340b234 100644 --- a/sdks/python/apache_beam/transforms/async_dofn.py +++ b/sdks/python/apache_beam/transforms/async_dofn.py @@ -165,10 +165,11 @@ def reset_state(): if AsyncWrapper._loop_started is not None: AsyncWrapper._loop_started.clear() - pools_to_shutdown = [ - pool.acquire(AsyncWrapper.initialize_pool(1)) - for pool in AsyncWrapper._pool.values() - ] + pools = list(AsyncWrapper._pool.values()) + + pools_to_shutdown = [ + pool.acquire(AsyncWrapper.initialize_pool(1)) for pool in pools + ] for pool in pools_to_shutdown: pool.shutdown(wait=True, cancel_futures=True) From d2fbefa99e9c9a3680fee44118eef19d6979a62b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 08:33:10 -0400 Subject: [PATCH 4/9] Fix format --- sdks/python/apache_beam/transforms/async_dofn_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py b/sdks/python/apache_beam/transforms/async_dofn_test.py index ec276f91af13..18ecb133a8a8 100644 --- a/sdks/python/apache_beam/transforms/async_dofn_test.py +++ b/sdks/python/apache_beam/transforms/async_dofn_test.py @@ -516,14 +516,15 @@ def test_reset_state_hang_reproduction(self): p = multiprocessing.Process( target=AsyncTest._run_reset_state_deadlock_scenario, - args=(self.use_asyncio,)) + args=(self.use_asyncio, )) p.start() p.join(timeout=3.0) if p.is_alive(): p.terminate() p.join() - self.fail("reset_state() deadlocked/hung waiting for active threads to finish") + self.fail( + "reset_state() deadlocked/hung waiting for active threads to finish") else: self.assertEqual(p.exitcode, 0) From 95e3b2452768dd5636502621985ac2f2a2c79f53 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 09:01:59 -0400 Subject: [PATCH 5/9] Modify the test to cover reset_state() hanging in asyncio mode. --- .../apache_beam/transforms/async_dofn_test.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py b/sdks/python/apache_beam/transforms/async_dofn_test.py index 18ecb133a8a8..67615009629c 100644 --- a/sdks/python/apache_beam/transforms/async_dofn_test.py +++ b/sdks/python/apache_beam/transforms/async_dofn_test.py @@ -490,30 +490,24 @@ def add_item(i): @staticmethod def _run_reset_state_deadlock_scenario(use_asyncio): - if use_asyncio: - return - dofn = BasicDofn(sleep_time=0.5) - async_dofn = async_lib.AsyncWrapper(dofn, use_asyncio=False) + async_dofn = async_lib.AsyncWrapper(dofn, use_asyncio=use_asyncio) async_dofn.setup() fake_bag_state = FakeBagState([]) fake_timer = FakeTimer(0) - # Start processing an item. This starts a worker thread sleeping for 0.5s. + # Start processing an item. This starts a worker thread/coroutine sleeping for 0.5s. async_dofn.process(('key1', 1), to_process=fake_bag_state, timer=fake_timer) time.sleep(0.05) # Attempt to call reset_state(). If the fix is NOT applied, this will deadlock - # forever because reset_state() holds the lock while calling shutdown(wait=True), + # forever because reset_state() holds the lock while waiting for active tasks/threads, # blocking the future's done callback from acquiring the lock. async_lib.AsyncWrapper.reset_state() def test_reset_state_hang_reproduction(self): # Run the deadlock scenario in a separate process so that if it hangs, # we can terminate it without causing the main pytest process to hang at exit. - if self.use_asyncio: - return - p = multiprocessing.Process( target=AsyncTest._run_reset_state_deadlock_scenario, args=(self.use_asyncio, )) @@ -524,7 +518,7 @@ def test_reset_state_hang_reproduction(self): p.terminate() p.join() self.fail( - "reset_state() deadlocked/hung waiting for active threads to finish") + "reset_state() deadlocked/hung waiting for active threads/tasks to finish") else: self.assertEqual(p.exitcode, 0) From 63901b0db706ddb000f4bc85081478a1cf097125 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 09:11:53 -0400 Subject: [PATCH 6/9] Fix the deadlock when asyncio is used. --- sdks/python/apache_beam/transforms/async_dofn.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn.py b/sdks/python/apache_beam/transforms/async_dofn.py index a6312340b234..ad3d5bc66469 100644 --- a/sdks/python/apache_beam/transforms/async_dofn.py +++ b/sdks/python/apache_beam/transforms/async_dofn.py @@ -153,12 +153,13 @@ def _run_event_loop(): @staticmethod def reset_state(): + event_loop_thread_to_join = None with AsyncWrapper._lock: if AsyncWrapper._event_loop: AsyncWrapper._event_loop.call_soon_threadsafe( AsyncWrapper._event_loop.stop) if AsyncWrapper._event_loop_thread: - AsyncWrapper._event_loop_thread.join() + event_loop_thread_to_join = AsyncWrapper._event_loop_thread AsyncWrapper._event_loop = None AsyncWrapper._event_loop_thread = None @@ -167,6 +168,17 @@ def reset_state(): pools = list(AsyncWrapper._pool.values()) + # We must join the asyncio event loop thread outside of the lock block. + # If joined inside the lock, the waiting thread holds the lock while blocking, + # preventing active coroutines' done callbacks from acquiring the lock on the + # event loop thread, resulting in a deadlock. + if event_loop_thread_to_join: + event_loop_thread_to_join.join() + + # We must acquire and shut down the thread pools outside of the lock block. + # If shutdown(wait=True) is called inside the lock, the caller blocks holding + # the lock, preventing active worker threads from acquiring the lock to run + # their done callbacks, resulting in a deadlock. pools_to_shutdown = [ pool.acquire(AsyncWrapper.initialize_pool(1)) for pool in pools ] From 738c81b9946eaa0e52497541647a42978c205240 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 09:14:48 -0400 Subject: [PATCH 7/9] Fix formatting. --- sdks/python/apache_beam/transforms/async_dofn_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py b/sdks/python/apache_beam/transforms/async_dofn_test.py index 67615009629c..ddb574a6908c 100644 --- a/sdks/python/apache_beam/transforms/async_dofn_test.py +++ b/sdks/python/apache_beam/transforms/async_dofn_test.py @@ -518,11 +518,11 @@ def test_reset_state_hang_reproduction(self): p.terminate() p.join() self.fail( - "reset_state() deadlocked/hung waiting for active threads/tasks to finish") + "reset_state() deadlocked/hung waiting for active threads/tasks to finish" + ) else: self.assertEqual(p.exitcode, 0) if __name__ == '__main__': unittest.main() - From 937071297c273ac7d9a7623690e64995685f1f9e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 16:08:32 -0400 Subject: [PATCH 8/9] Increase timeout to reduce false-positives. --- sdks/python/apache_beam/transforms/async_dofn_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py b/sdks/python/apache_beam/transforms/async_dofn_test.py index ddb574a6908c..a3117d2fea0b 100644 --- a/sdks/python/apache_beam/transforms/async_dofn_test.py +++ b/sdks/python/apache_beam/transforms/async_dofn_test.py @@ -512,7 +512,7 @@ def test_reset_state_hang_reproduction(self): target=AsyncTest._run_reset_state_deadlock_scenario, args=(self.use_asyncio, )) p.start() - p.join(timeout=3.0) + p.join(timeout=10.0) if p.is_alive(): p.terminate() From 11ffedc241c7d16ee8066e2c18ef3d60165ad90a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 9 May 2026 17:05:43 -0400 Subject: [PATCH 9/9] Revise test function names and some comments. --- .../apache_beam/transforms/async_dofn_test.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py b/sdks/python/apache_beam/transforms/async_dofn_test.py index a3117d2fea0b..39901d791fb9 100644 --- a/sdks/python/apache_beam/transforms/async_dofn_test.py +++ b/sdks/python/apache_beam/transforms/async_dofn_test.py @@ -489,7 +489,7 @@ def add_item(i): self.assertEqual(bag_states['key' + str(i)].items, []) @staticmethod - def _run_reset_state_deadlock_scenario(use_asyncio): + def _run_reset_state_concurrent_teardown(use_asyncio): dofn = BasicDofn(sleep_time=0.5) async_dofn = async_lib.AsyncWrapper(dofn, use_asyncio=use_asyncio) async_dofn.setup() @@ -500,16 +500,15 @@ def _run_reset_state_deadlock_scenario(use_asyncio): async_dofn.process(('key1', 1), to_process=fake_bag_state, timer=fake_timer) time.sleep(0.05) - # Attempt to call reset_state(). If the fix is NOT applied, this will deadlock - # forever because reset_state() holds the lock while waiting for active tasks/threads, - # blocking the future's done callback from acquiring the lock. + # Verify that calling reset_state() while background tasks are actively running + # completes cleanly without causing lock-ordering deadlocks. async_lib.AsyncWrapper.reset_state() - def test_reset_state_hang_reproduction(self): - # Run the deadlock scenario in a separate process so that if it hangs, - # we can terminate it without causing the main pytest process to hang at exit. + def test_reset_state_concurrent_teardown(self): + # Verify concurrent teardown safety in a separate process to prevent any potential + # regressions from freezing the main pytest process at exit. p = multiprocessing.Process( - target=AsyncTest._run_reset_state_deadlock_scenario, + target=AsyncTest._run_reset_state_concurrent_teardown, args=(self.use_asyncio, )) p.start() p.join(timeout=10.0)