Skip to content

Commit 656e751

Browse files
committed
fix(pool): tweaks and fixes
1 parent 6ac7b2c commit 656e751

4 files changed

Lines changed: 25 additions & 15 deletions

File tree

sqlx-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ pin-project-lite = "0.2.14"
9696

9797
[dev-dependencies]
9898
sqlx = { workspace = true, features = ["postgres", "sqlite", "mysql", "migrate", "macros", "time", "uuid"] }
99-
tokio = { version = "1", features = ["rt"] }
99+
tokio = { version = "1", features = ["rt", "sync"] }
100100

101101
[lints]
102102
workspace = true

sqlx-core/src/pool/connect.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::pool::PoolConnection;
66
use crate::rt::JoinHandle;
77
use crate::Error;
88
use ease_off::EaseOff;
9-
use event_listener::Event;
9+
use event_listener::{listener, Event};
1010
use std::fmt::{Display, Formatter};
1111
use std::future::Future;
1212
use std::ptr;
@@ -50,7 +50,7 @@ use std::io;
5050
/// let database_url = database_url.clone();
5151
/// async move {
5252
/// println!(
53-
/// "opening connection {}, attempt {}; elapsed time: {}",
53+
/// "opening connection {}, attempt {}; elapsed time: {:?}",
5454
/// meta.pool_size,
5555
/// meta.num_attempts + 1,
5656
/// meta.start.elapsed()
@@ -96,10 +96,10 @@ use std::io;
9696
///
9797
/// let pool = PgPoolOptions::new()
9898
/// .connect_with_connector(move |meta: PoolConnectMetadata| {
99-
/// let connect_opts_ = connect_opts.clone();
99+
/// let connect_opts = connect_opts_.clone();
100100
/// async move {
101101
/// println!(
102-
/// "opening connection {}, attempt {}; elapsed time: {}",
102+
/// "opening connection {}, attempt {}; elapsed time: {:?}",
103103
/// meta.pool_size,
104104
/// meta.num_attempts + 1,
105105
/// meta.start.elapsed()
@@ -318,7 +318,8 @@ impl ConnectionCounter {
318318

319319
pub async fn drain(&self) {
320320
while self.count.load(Ordering::Acquire) > 0 {
321-
self.connect_available.listen().await;
321+
listener!(self.connect_available => permit_released);
322+
permit_released.await;
322323
}
323324
}
324325

@@ -386,13 +387,14 @@ impl ConnectionCounter {
386387
return acquired;
387388
}
388389

389-
self.connect_available.listen().await;
390-
391390
if attempt == 2 {
392391
tracing::warn!(
393392
"unable to acquire a connect permit after sleeping; this may indicate a bug"
394393
);
395394
}
395+
396+
listener!(self.connect_available => connect_available);
397+
connect_available.await;
396398
}
397399

398400
panic!("BUG: was never able to acquire a connection despite waking many times")

sqlx-core/src/pool/idle.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use futures_util::FutureExt;
88
use std::sync::atomic::{AtomicUsize, Ordering};
99
use std::sync::Arc;
1010

11+
use event_listener::listener;
12+
1113
pub struct IdleQueue<DB: Database> {
1214
queue: ArrayQueue<Idle<DB>>,
1315
// Keep a separate count because `ArrayQueue::len()` loops until the head and tail pointers
@@ -36,7 +38,8 @@ impl<DB: Database> IdleQueue<DB> {
3638

3739
for attempt in 1usize.. {
3840
if should_wait {
39-
self.release_event.listen().await;
41+
listener!(self.release_event => release_event);
42+
release_event.await;
4043
}
4144

4245
if let Some(conn) = self.try_acquire(pool) {

sqlx-core/src/pool/inner.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::rt::JoinHandle;
1818
use crate::{private_tracing_dynamic_event, rt};
1919
use either::Either;
2020
use futures_util::future::{self, OptionFuture};
21-
use futures_util::{select, FutureExt};
21+
use futures_util::{FutureExt};
2222
use std::time::{Duration, Instant};
2323
use tracing::Level;
2424

@@ -78,14 +78,19 @@ impl<DB: Database> PoolInner<DB> {
7878

7979
// Keep clearing the idle queue as connections are released until the count reaches zero.
8080
async move {
81-
let mut drained = pin!(self.counter.drain()).fuse();
81+
let mut drained = pin!(self.counter.drain());
8282

8383
loop {
84-
select! {
85-
idle = self.idle.acquire(self) => {
84+
let mut acquire_idle = pin!(self.idle.acquire(self));
85+
86+
// Not using `futures::select!{}` here because it requires a proc-macro dep,
87+
// and frankly it's a little broken.
88+
match future::select(drained.as_mut(), acquire_idle.as_mut()).await {
89+
// *not* `either::Either`; they rolled their own
90+
future::Either::Left(_) => break,
91+
future::Either::Right((idle, _)) => {
8692
idle.close().await;
87-
},
88-
() = drained.as_mut() => break,
93+
}
8994
}
9095
}
9196
}

0 commit comments

Comments
 (0)