Skip to content

Commit b9c2d54

Browse files
committed
fix tests
1 parent f5666d8 commit b9c2d54

2 files changed

Lines changed: 11 additions & 20 deletions

File tree

lib/ConnectionPool.cc

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,8 @@ bool ConnectionPool::close() {
5555
return false;
5656
}
5757

58-
std::vector<ClientConnectionPtr> connectionsToClose;
59-
// ClientConnection::close() will remove the connection from the pool, which is not allowed when iterating
60-
// over a map, so we store the connections to close in a vector first and don't iterate the pool when
61-
// closing the connections.
62-
std::unique_lock<std::recursive_mutex> lock(mutex_);
63-
connectionsToClose.reserve(pool_.size());
64-
for (auto&& kv : pool_) {
65-
connectionsToClose.emplace_back(kv.second);
66-
}
67-
pool_.clear();
68-
lock.unlock();
69-
70-
for (auto&& cnx : connectionsToClose) {
58+
for (auto&& kv : releaseConnections()) {
59+
auto& cnx = kv.second;
7160
if (cnx) {
7261
// Close with a fatal error to not let client retry
7362
auto& future = cnx->close(ResultAlreadyClosed);
@@ -96,14 +85,9 @@ bool ConnectionPool::close() {
9685
}
9786

9887
void ConnectionPool::closeAllConnectionsForNewCluster() {
99-
std::unique_lock<std::recursive_mutex> lock(mutex_);
100-
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
101-
auto& cnx = cnxIt->second;
102-
if (cnx) {
103-
cnx->close(ResultDisconnected, true);
104-
}
88+
for (auto&& kv : releaseConnections()) {
89+
kv.second->close(ResultDisconnected, true);
10590
}
106-
pool_.clear();
10791
}
10892

10993
static const std::string getKey(const std::string& logicalAddress, const std::string& physicalAddress,

lib/ConnectionPool.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ class PULSAR_PUBLIC ConnectionPool {
110110
std::uniform_int_distribution<> randomDistribution_;
111111
std::mt19937 randomEngine_;
112112

113+
auto releaseConnections() {
114+
decltype(pool_) pool;
115+
std::lock_guard lock{mutex_};
116+
pool.swap(pool_);
117+
return pool;
118+
}
119+
113120
friend class PulsarFriend;
114121
};
115122
} // namespace pulsar

0 commit comments

Comments
 (0)