From 3dd2e8e08e4d97fd06036f07c4314ad96b1e3151 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Fri, 8 May 2026 19:16:55 +0300 Subject: [PATCH] fix rrio teardown executor cleanup path --- .../apache/beam/io/requestresponse/Call.java | 55 ++++++++++--------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java index b515957459be..b318cac17379 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java @@ -270,34 +270,35 @@ public void teardown() throws UserCodeExecutionException { Sleeper sleeper = configuration.getSleeperSupplier().get(); backoffIfNeeded(backOff, sleeper); - - if (!configuration.getShouldRepeat()) { - incIfPresent(teardownCounter); - setupTeardown.teardown(); - return; - } - - Repeater repeater = - Repeater.builder() - .setBackOff(backOff) - .setSleeper(sleeper) - .setThrowableFunction( - ignored -> { - incIfPresent(teardownCounter); - setupTeardown.teardown(); - return null; - }) - .build() - .withBackoffCounter(backoffCounter) - .withSleeperCounter(sleeperCounter); - - repeater.apply(null); - - checkStateNotNull(executor).shutdown(); try { - boolean ignored = executor.awaitTermination(3L, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - // Ignore the interrupt during teardown. + if (!configuration.getShouldRepeat()) { + incIfPresent(teardownCounter); + setupTeardown.teardown(); + return; + } + + Repeater repeater = + Repeater.builder() + .setBackOff(backOff) + .setSleeper(sleeper) + .setThrowableFunction( + ignored -> { + incIfPresent(teardownCounter); + setupTeardown.teardown(); + return null; + }) + .build() + .withBackoffCounter(backoffCounter) + .withSleeperCounter(sleeperCounter); + + repeater.apply(null); + } finally { + checkStateNotNull(executor).shutdown(); + try { + boolean ignored = executor.awaitTermination(3L, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + // Ignore the interrupt during teardown. + } } }