-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsetup.rs
More file actions
130 lines (110 loc) · 3.95 KB
/
setup.rs
File metadata and controls
130 lines (110 loc) · 3.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#![allow(clippy::unwrap_used, clippy::expect_used)]
use durable::{Durable, DurableBuilder, MIGRATOR, WorkerOptions};
use sqlx::{AssertSqlSafe, PgPool};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Counter for unique queue names across benchmark iterations.
/// Monotonically increasing within a single process; each `BenchContext`
/// takes the next value (via `fetch_add`) to form a `bench_<n>` queue name.
/// NOTE(review): uniqueness is per-process only — concurrent benchmark
/// processes against the same database could collide; confirm if that matters.
static QUEUE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Context for running a single benchmark iteration.
/// Provides isolated database state via unique queue names.
pub struct BenchContext {
    /// Postgres pool shared by the client and by raw SQL helpers below.
    pub pool: PgPool,
    /// Durable client bound to this context's queue.
    pub client: Durable,
    /// Unique queue name (`bench_<n>`) isolating this iteration's state.
    pub queue_name: String,
}
impl BenchContext {
    /// Build a benchmark context; `f` may register tasks on the builder
    /// before the client is constructed.
    ///
    /// Connects to `DATABASE_URL` (falling back to a local test database),
    /// runs migrations, and creates a freshly named queue for isolation.
    pub async fn with_builder(f: impl FnOnce(DurableBuilder) -> DurableBuilder) -> Self {
        // Fall back to the local test instance when DATABASE_URL is unset.
        let db_url = std::env::var("DATABASE_URL")
            .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5436/test".to_string());

        let pool = PgPool::connect(&db_url)
            .await
            .expect("Failed to connect to database");

        // Migrations are idempotent, so running them per context is safe.
        MIGRATOR.run(&pool).await.expect("Failed to run migrations");

        // Each iteration gets its own queue so no state leaks between runs.
        let id = QUEUE_COUNTER.fetch_add(1, Ordering::SeqCst);
        let queue_name = format!("bench_{}", id);

        let base = DurableBuilder::new()
            .pool(pool.clone())
            .queue_name(&queue_name);
        let client = f(base)
            .build()
            .await
            .expect("Failed to create Durable client");

        client
            .create_queue(None)
            .await
            .expect("Failed to create queue");

        Self {
            pool,
            client,
            queue_name,
        }
    }

    /// Produce another builder bound to this context's pool and queue —
    /// e.g. to spin up additional workers with their own task registrations.
    #[allow(dead_code)]
    pub fn new_builder(&self) -> DurableBuilder {
        let builder = DurableBuilder::new().pool(self.pool.clone());
        builder.queue_name(&self.queue_name)
    }

    /// Tear down: drop the queue created for this benchmark run.
    pub async fn cleanup(self) {
        let dropped = self.client.drop_queue(None).await;
        dropped.expect("Failed to drop queue");
    }
}
/// Poll until at least `expected_count` tasks in `queue` reach a terminal
/// state ('completed', 'failed', or 'cancelled'), or until `timeout_secs`
/// elapses. Returns `true` on success, `false` on timeout.
pub async fn wait_for_tasks_complete(
    pool: &PgPool,
    queue: &str,
    expected_count: usize,
    timeout_secs: u64,
) -> bool {
    let deadline = Duration::from_secs(timeout_secs);
    let started = std::time::Instant::now();

    loop {
        // The table name is interpolated (not a bind parameter),
        // hence the explicit AssertSqlSafe wrapper.
        let sql = AssertSqlSafe(format!(
            "SELECT COUNT(*) FROM durable.t_{} WHERE state IN ('completed', 'failed', 'cancelled')",
            queue
        ));
        let row: (i64,) = sqlx::query_as(sql)
            .fetch_one(pool)
            .await
            .expect("Failed to count tasks");

        if row.0 as usize >= expected_count {
            return true;
        }
        if started.elapsed() > deadline {
            return false;
        }
        // Short sleep keeps wait latency low without hammering the database.
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
/// Remove all terminal-state tasks ('completed', 'failed', 'cancelled')
/// from `queue` so the next iteration starts from a clean table.
#[allow(dead_code)]
pub async fn clear_completed_tasks(pool: &PgPool, queue: &str) {
    // Table name is interpolated, not bound — wrapped in AssertSqlSafe.
    let sql = format!(
        "DELETE FROM durable.t_{} WHERE state IN ('completed', 'failed', 'cancelled')",
        queue
    );
    sqlx::query(AssertSqlSafe(sql))
        .execute(pool)
        .await
        .expect("Failed to clear completed tasks");
}
/// Default worker options optimized for benchmarking.
///
/// Uses an aggressive 1 ms poll interval so task-pickup latency does not
/// dominate the measured timings.
pub fn bench_worker_options(concurrency: usize, claim_timeout: Duration) -> WorkerOptions {
    // Very fast polling for accurate timing.
    let poll_interval = Duration::from_millis(1);
    WorkerOptions {
        worker_id: None, // let the library assign an id
        concurrency,
        poll_interval,
        claim_timeout,
        batch_size: None, // use default (= concurrency)
        fatal_on_lease_timeout: false,
    }
}