Skip to content

Commit 5d4a5dc

Browse files
committed
WIP refactor: replace sharding with single connection set (5)
1 parent 4b25ab8 commit 5d4a5dc

1 file changed

Lines changed: 52 additions & 63 deletions

File tree

sqlx-core/src/pool/connection_set.rs

Lines changed: 52 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::ext::future::race;
22
use crate::rt;
33
use crate::sync::{AsyncMutex, AsyncMutexGuardArc};
44
use event_listener::{listener, Event, EventListener, IntoNotification};
5-
use futures_core::Stream;
5+
use futures_util::future::{Fuse, FusedFuture};
66
use futures_util::stream::FuturesUnordered;
77
use futures_util::{FutureExt, StreamExt};
88
use std::cmp;
@@ -11,7 +11,7 @@ use std::ops::{Deref, DerefMut, RangeInclusive, RangeToInclusive};
1111
use std::pin::{pin, Pin};
1212
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1313
use std::sync::Arc;
14-
use std::task::Poll;
14+
use std::task::{ready, Poll};
1515
use std::time::Duration;
1616
use tracing::Instrument;
1717

@@ -121,97 +121,84 @@ impl<C> ConnectionSet<C> {
121121
}
122122

123123
async fn acquire_inner(&self, pref: AcquirePreference) -> SlotGuard<C> {
124-
let preferred_slot = current_thread_id() % self.slots.len();
125-
126-
// Always try to lock the connection associated with our thread ID
127-
let mut acquire_preferred = pin!(self.slots[preferred_slot].acquire(pref));
128-
129-
let alternate_slot = (preferred_slot + 547usize.wrapping_mul(
130-
Arc::strong_count(&self.slots[preferred_slot].connection)
131-
)) % self.slots.len();
132-
133-
let mut acquire_alternate = pin!(self.slots[alternate_slot].acquire(pref));
134-
135-
let mut listen_global = pin!(self.global.listen(pref));
124+
let span = tracing::trace_span!(
125+
target: "sqlx::pool::connection_set",
126+
"acquire_inner",
127+
preferred_slot = tracing::field::Empty,
128+
?pref,
129+
);
130+
131+
if self.slots.len() == 1 {
132+
span.record("alternate_slot", 0usize);
133+
return self.slots[0].acquire(pref).instrument(span).await;
134+
}
136135

137-
let mut yielded_1 = false;
138-
let mut yielded_2 = false;
136+
// Always try to lock the connection associated with our thread ID first
137+
let preferred_slot = current_thread_id() % self.slots.len();
138+
span.record("preferred_slot", preferred_slot);
139139

140-
std::future::poll_fn(|cx| {
141-
if let Poll::Ready(locked) = acquire_preferred.as_mut().poll(cx) {
142-
return Poll::Ready(locked);
143-
}
140+
// The number of tasks currently interested in this slot. Always at least 1.
141+
let search_offset = Arc::strong_count(&self.slots[preferred_slot].connection);
144142

145-
if let Poll::Ready(locked) = acquire_alternate.as_mut().poll(cx) {
146-
return Poll::Ready(locked);
143+
let acquire_global = pin!(async {
144+
if let Some(locked) = self.try_acquire(pref, preferred_slot.wrapping_add(search_offset))
145+
{
146+
return locked;
147147
}
148148

149-
// if !yielded_1 {
150-
// cx.waker().wake_by_ref();
151-
// yielded_1 = true;
152-
// return Poll::Pending;
153-
// }
149+
loop {
150+
let slot = self.global.listen(pref).await;
154151

155-
if let Poll::Ready(slot) = listen_global.as_mut().poll(cx) {
156-
if let Some(locked) = self.slots[slot].try_acquire(pref) {
157-
return Poll::Ready(locked);
152+
if let Some(locked) = self.try_acquire(pref, slot) {
153+
return locked;
158154
}
159-
160-
listen_global.as_mut().set(self.global.listen(pref));
161155
}
156+
});
162157

163-
if !yielded_2 {
164-
cx.waker().wake_by_ref();
165-
yielded_2 = true;
166-
return Poll::Pending;
167-
}
158+
let res = race(self.slots[preferred_slot].acquire(pref), acquire_global)
159+
.instrument(span.clone())
160+
.await;
168161

169-
if let Some(locked) = self.try_acquire(pref) {
170-
return Poll::Ready(locked);
162+
let _span = span.enter();
163+
match res {
164+
Ok(preferred) => {
165+
tracing::trace!("acquired from preferred_slot");
166+
preferred
171167
}
172-
173-
Poll::Pending
174-
})
175-
.instrument(tracing::trace_span!(
176-
target: "sqlx::pool::connection_set",
177-
"acquire_inner",
178-
preferred_slot,
179-
?pref,
180-
))
181-
.await
168+
Err(global) => {
169+
tracing::trace!(slot = global.slot.index, "acquired from acquire_global");
170+
global
171+
}
172+
}
182173
}
183174

184175
pub fn try_acquire_connected(&self) -> Option<ConnectedSlot<C>> {
185176
Some(
186-
self.try_acquire(AcquirePreference::Connected)?
177+
self.try_acquire(AcquirePreference::Connected, current_thread_id())?
187178
.assert_connected(),
188179
)
189180
}
190181

191182
pub fn try_acquire_disconnected(&self) -> Option<DisconnectedSlot<C>> {
192183
Some(
193-
self.try_acquire(AcquirePreference::Disconnected)?
184+
self.try_acquire(AcquirePreference::Disconnected, current_thread_id())?
194185
.assert_disconnected(),
195186
)
196187
}
197188

198-
fn try_acquire(&self, pref: AcquirePreference) -> Option<SlotGuard<C>> {
199-
let preferred_slot = current_thread_id() % self.slots.len();
200-
201-
let (slots_before, slots_after) = self.slots.split_at(preferred_slot);
189+
fn try_acquire(&self, pref: AcquirePreference, starting_slot: usize) -> Option<SlotGuard<C>> {
190+
let starting_slot = starting_slot % self.slots.len();
202191

203-
let (preferred_slot, slots_after) = slots_after.split_first().unwrap();
192+
let (slots_before, slots_after) = self.global.locked_set.split_at(starting_slot);
204193

205-
if let Some(locked) = preferred_slot.try_acquire(pref) {
206-
return Some(locked);
207-
}
208-
209-
for slot in slots_before.iter().chain(slots_after).rev() {
210-
if self.global.locked_set[slot.index].load(Ordering::Relaxed) {
194+
for (index, locked) in slots_after.iter().chain(slots_before).enumerate() {
195+
if locked.load(Ordering::Relaxed) {
211196
continue;
212197
}
213198

214-
if let Some(locked) = slot.try_acquire(pref) {
199+
let slot = (starting_slot + index) % self.slots.len();
200+
201+
if let Some(locked) = self.slots[slot].try_acquire(pref) {
215202
return Some(locked);
216203
}
217204
}
@@ -363,6 +350,7 @@ impl<C> Slot<C> {
363350
let locked = crate::sync::lock_arc(&self.connection).await;
364351

365352
self.locked.store(true, Ordering::Relaxed);
353+
self.global.locked_set[self.index].store(true, Ordering::Relaxed);
366354

367355
SlotGuard {
368356
slot: self.clone(),
@@ -374,6 +362,7 @@ impl<C> Slot<C> {
374362
let locked = crate::sync::try_lock_arc(&self.connection)?;
375363

376364
self.locked.store(true, Ordering::Relaxed);
365+
self.global.locked_set[self.index].store(true, Ordering::Relaxed);
377366

378367
Some(SlotGuard {
379368
slot: self.clone(),

0 commit comments

Comments
 (0)