diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java index 16d06492cf..a99370908b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java @@ -55,6 +55,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_SCHEMA; @@ -148,7 +149,7 @@ void testRebalanceWithRemoteLog() throws Exception { RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager(); RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb); - RemoteLogManifest manifest = remoteLogTablet.currentManifest(); + RemoteLogManifest manifest = waitForRemoteLogManifestReady(remoteLogTablet); assertThat(manifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH); assertThat(manifest.getTableBucket()).isEqualTo(tb); int remoteLogSize = manifest.getRemoteLogSegmentList().size(); @@ -219,7 +220,10 @@ void testRebalanceWithRemoteLog() throws Exception { RemoteLogManifest newManifest = leaderRlt.currentManifest(); assertThat(newManifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH); assertThat(newManifest.getTableBucket()).isEqualTo(tb); - assertThat(newManifest.getRemoteLogSegmentList().size()).isEqualTo(remoteLogSize); + // remoteLogSize is captured when async copy may not finish all segments yet, + // so the new leader's manifest can contain more segments than remoteLogSize. + assertThat(newManifest.getRemoteLogSegmentList().size()) + .isGreaterThanOrEqualTo(remoteLogSize); } private void fromCoordinatorContext( @@ -314,4 +318,21 @@ private void produceRecordsAndWaitRemoteLogCopy( FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote( new TableBucket(tb.getTableId(), 0)); } + + /** + * Wait until the in-memory remote log manifest is ready with at least one segment. {@link + * FlussClusterExtension#waitUntilSomeLogSegmentsCopyToRemote} only ensures ZK has manifest + * handle, but the in-memory manifest may not be updated yet. + */ + private RemoteLogManifest waitForRemoteLogManifestReady(RemoteLogTablet remoteLogTablet) { + AtomicReference manifestRef = new AtomicReference<>(); + retry( + Duration.ofMinutes(2), + () -> { + RemoteLogManifest m = remoteLogTablet.currentManifest(); + assertThat(m.getRemoteLogSegmentList().size()).isGreaterThan(0); + manifestRef.set(m); + }); + return manifestRef.get(); + } }