Skip to content

Commit 0c6a7c0

Browse files
authored
[Test][C++] Fix flaky ReaderTest.testAsyncRead (#545)
* fix * format * revert test --------- Co-authored-by: zhangzhibiao <zhangzhibiao@bigo.sg>
1 parent e290658 commit 0c6a7c0

1 file changed

Lines changed: 20 additions & 4 deletions

File tree

tests/ReaderTest.cc

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
#include <pulsar/Reader.h>
2222
#include <time.h>
2323

24+
#include <atomic>
25+
#include <functional>
2426
#include <future>
27+
#include <set>
2528
#include <string>
2629
#include <thread>
30+
#include <vector>
2731

2832
#include "HttpHelper.h"
2933
#include "PulsarFriend.h"
@@ -110,15 +114,27 @@ TEST_P(ReaderTest, testAsyncRead) {
110114
ASSERT_EQ(ResultOk, producer.send(msg));
111115
}
112116

117+
// readNextAsync callbacks may complete in any order (e.g. with partitioned topic); collect all 10 then
118+
// verify set
119+
std::string received[10];
120+
std::atomic<int> receivedCount{0};
113121
for (int i = 0; i < 10; i++) {
114-
reader.readNextAsync([i](Result result, const Message& msg) {
122+
reader.readNextAsync([&](Result result, const Message& msg) {
115123
ASSERT_EQ(ResultOk, result);
116-
std::string content = msg.getDataAsString();
117-
std::string expected = "my-message-" + std::to_string(i);
118-
ASSERT_EQ(expected, content);
124+
int idx = receivedCount.fetch_add(1);
125+
if (idx < 10) received[idx] = msg.getDataAsString();
119126
});
120127
}
121128

129+
waitUntil(
130+
std::chrono::seconds(5), [&]() { return receivedCount.load() == 10; }, 1000);
131+
ASSERT_EQ(10, receivedCount.load()) << "Expected 10 messages";
132+
133+
std::set<std::string> receivedSet(received, received + 10);
134+
for (int i = 0; i < 10; i++) {
135+
ASSERT_TRUE(receivedSet.count("my-message-" + std::to_string(i))) << "Missing my-message-" << i;
136+
}
137+
122138
waitUntil(
123139
std::chrono::seconds(5),
124140
[&]() {

0 commit comments

Comments
 (0)