Skip to content

Commit 4b25ab8

Browse files
committed
feat: add tracing-flame to benches/any-pool
1 parent 0095644 commit 4b25ab8

5 files changed

Lines changed: 115 additions & 29 deletions

File tree

Cargo.lock

Lines changed: 32 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ criterion = { version = "0.7.0", features = ["async_tokio"] }
241241
libsqlite3-sys = { version = "0.30.1" }
242242

243243
tracing = "0.1.41"
244+
tracing-flame = "0.2.0"
244245
tracing-subscriber = "0.3.20"
245246

246247
# If this is an unconditional dev-dependency then Cargo will *always* try to build `libsqlite3-sys`,

benches/any/pool.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ use std::fmt::{Display, Formatter};
44
use std::thread;
55
use std::time::{Duration, Instant};
66
use tracing::Instrument;
7+
use tracing_flame::FlameLayer;
8+
use tracing_subscriber::{EnvFilter, Layer};
9+
use tracing_subscriber::layer::SubscriberExt;
10+
use tracing_subscriber::util::SubscriberInitExt;
711

812
#[derive(Debug)]
913
struct Input {
@@ -24,7 +28,27 @@ impl Display for Input {
2428

2529
fn bench_pool(c: &mut Criterion) {
2630
sqlx::any::install_default_drivers();
27-
tracing_subscriber::fmt::try_init().ok();
31+
32+
let _guard = if let Ok(path) = dotenvy::var("FLAMEGRAPH_OUT") {
33+
let (layer, guard) = FlameLayer::with_file(&path)
34+
.expect(&format!("error opening path {path:?} (`FLAMEGRAPH_OUT`)"));
35+
36+
tracing_subscriber::registry()
37+
.with(
38+
tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env())
39+
)
40+
.with(layer.with_threads_collapsed(true))
41+
.try_init()
42+
.ok();
43+
44+
tracing::info!("Writing flamegraph to {path:?}");
45+
46+
Some(guard)
47+
} else {
48+
tracing_subscriber::fmt::try_init().ok();
49+
50+
None
51+
};
2852

2953
let database_url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
3054

@@ -136,4 +160,4 @@ fn bench_pool_with(b: &mut Bencher, input: &Input, database_url: &str) {
136160
}
137161

138162
criterion_group!(benches, bench_pool,);
139-
criterion_main!(benches);
163+
criterion_main!(benches);

sqlx-core/src/pool/connection_set.rs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1313
use std::sync::Arc;
1414
use std::task::Poll;
1515
use std::time::Duration;
16+
use tracing::Instrument;
1617

1718
pub struct ConnectionSet<C> {
1819
global: Arc<Global>,
@@ -120,22 +121,37 @@ impl<C> ConnectionSet<C> {
120121
}
121122

122123
async fn acquire_inner(&self, pref: AcquirePreference) -> SlotGuard<C> {
123-
let preferred_slot = current_thread_id() % self.slots.len();
124-
125-
tracing::trace!(preferred_slot, ?pref, "acquire_inner");
124+
let preferred_slot = current_thread_id() % self.slots.len();
126125

127126
// Always try to lock the connection associated with our thread ID
128127
let mut acquire_preferred = pin!(self.slots[preferred_slot].acquire(pref));
129128

129+
let alternate_slot = (preferred_slot + 547usize.wrapping_mul(
130+
Arc::strong_count(&self.slots[preferred_slot].connection)
131+
)) % self.slots.len();
132+
133+
let mut acquire_alternate = pin!(self.slots[alternate_slot].acquire(pref));
134+
130135
let mut listen_global = pin!(self.global.listen(pref));
131136

132-
let mut yielded = false;
137+
let mut yielded_1 = false;
138+
let mut yielded_2 = false;
133139

134140
std::future::poll_fn(|cx| {
135141
if let Poll::Ready(locked) = acquire_preferred.as_mut().poll(cx) {
136142
return Poll::Ready(locked);
137143
}
138144

145+
if let Poll::Ready(locked) = acquire_alternate.as_mut().poll(cx) {
146+
return Poll::Ready(locked);
147+
}
148+
149+
// if !yielded_1 {
150+
// cx.waker().wake_by_ref();
151+
// yielded_1 = true;
152+
// return Poll::Pending;
153+
// }
154+
139155
if let Poll::Ready(slot) = listen_global.as_mut().poll(cx) {
140156
if let Some(locked) = self.slots[slot].try_acquire(pref) {
141157
return Poll::Ready(locked);
@@ -144,9 +160,9 @@ impl<C> ConnectionSet<C> {
144160
listen_global.as_mut().set(self.global.listen(pref));
145161
}
146162

147-
if !yielded {
163+
if !yielded_2 {
148164
cx.waker().wake_by_ref();
149-
yielded = true;
165+
yielded_2 = true;
150166
return Poll::Pending;
151167
}
152168

@@ -156,6 +172,12 @@ impl<C> ConnectionSet<C> {
156172

157173
Poll::Pending
158174
})
175+
.instrument(tracing::trace_span!(
176+
target: "sqlx::pool::connection_set",
177+
"acquire_inner",
178+
preferred_slot,
179+
?pref,
180+
))
159181
.await
160182
}
161183

@@ -174,14 +196,24 @@ impl<C> ConnectionSet<C> {
174196
}
175197

176198
fn try_acquire(&self, pref: AcquirePreference) -> Option<SlotGuard<C>> {
177-
let mut search_slot = current_thread_id() % self.slots.len();
199+
let preferred_slot = current_thread_id() % self.slots.len();
178200

179-
for _ in 0..self.slots.len() {
180-
if let Some(locked) = self.slots[search_slot].try_acquire(pref) {
181-
return Some(locked);
201+
let (slots_before, slots_after) = self.slots.split_at(preferred_slot);
202+
203+
let (preferred_slot, slots_after) = slots_after.split_first().unwrap();
204+
205+
if let Some(locked) = preferred_slot.try_acquire(pref) {
206+
return Some(locked);
207+
}
208+
209+
for slot in slots_before.iter().chain(slots_after).rev() {
210+
if self.global.locked_set[slot.index].load(Ordering::Relaxed) {
211+
continue;
182212
}
183213

184-
search_slot = self.next_slot(search_slot);
214+
if let Some(locked) = slot.try_acquire(pref) {
215+
return Some(locked);
216+
}
185217
}
186218

187219
None
@@ -398,6 +430,7 @@ impl<C> SlotGuard<C> {
398430
let connected = locked.is_some();
399431
self.slot.set_is_connected(connected);
400432
self.slot.locked.store(false, Ordering::Release);
433+
self.slot.global.locked_set[self.slot.index].store(false, Ordering::Relaxed);
401434
connected
402435
})
403436
}

sqlx-core/src/pool/inner.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{private_tracing_dynamic_event, rt};
2020
use event_listener::listener;
2121
use futures_util::future::{self};
2222
use std::time::{Duration, Instant};
23-
use tracing::Level;
23+
use tracing::{Instrument, Level};
2424

2525
const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5);
2626
const TEST_BEFORE_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(60);
@@ -181,7 +181,7 @@ impl<DB: Database> PoolInner<DB> {
181181
tracing::trace!("waiting for any connection");
182182

183183
let disconnected = match self.connections.acquire_any().await {
184-
Ok(conn) => match finish_acquire(self, conn).await {
184+
Ok(conn) => match self.finish_acquire(conn).await {
185185
Ok(conn) => return Ok(conn),
186186
Err(slot) => slot,
187187
},
@@ -199,14 +199,23 @@ impl<DB: Database> PoolInner<DB> {
199199
match race(&mut connect_task, self.connections.acquire_connected()).await {
200200
Ok(Ok(conn)) => return Ok(conn),
201201
Ok(Err(e)) => return Err(e),
202-
Err(conn) => match finish_acquire(self, conn).await {
202+
Err(conn) => match self.finish_acquire(conn).await {
203203
Ok(conn) => return Ok(conn),
204204
Err(_) => continue,
205205
},
206206
}
207207
}
208208
}
209209

210+
#[inline(always)]
211+
async fn finish_acquire(self: &Arc<Self>, conn: ConnectedSlot<ConnectionInner<DB>>) -> Result<PoolConnection<DB>, DisconnectedSlot<ConnectionInner<DB>>> {
212+
let span = tracing::trace_span!(target: "sqlx::pool", "finish_acquire", connection_id=?conn.id);
213+
214+
finish_acquire(self, conn)
215+
.instrument(span)
216+
.await
217+
}
218+
210219
pub(crate) async fn try_min_connections(
211220
self: &Arc<Self>,
212221
deadline: Option<Instant>,

0 commit comments

Comments
 (0)