-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patherror.rs
More file actions
482 lines (445 loc) · 16.2 KB
/
error.rs
File metadata and controls
482 lines (445 loc) · 16.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
use serde_json::Value as JsonValue;
use thiserror::Error;
use crate::error::suspend_handle::SuspendMarker;
/// Non-failure signals that stop a task's current execution.
///
/// A task that returns `Err(TaskError::Control(_))` has not failed: the worker
/// treats these values as deliberate control flow, so it neither marks the task
/// as failed nor triggers retries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ControlFlow {
    /// The task must pause now and be resumed later.
    ///
    /// Produced by [`TaskContext::sleep_for`](crate::TaskContext::sleep_for)
    /// and [`TaskContext::await_event`](crate::TaskContext::await_event)
    /// whenever the task has to wait.
    Suspend(SuspendMarker),
    /// The task has been cancelled.
    ///
    /// Recognised when a database operation reports error code AB001, which
    /// means the task was cancelled through
    /// [`Durable::cancel_task`](crate::Durable::cancel_task).
    Cancelled,
    /// The task's lease ran out, so its claim was lost.
    ///
    /// Recognised when a database operation reports error code AB002. This is
    /// modelled as control flow so that a run already failed by `claim_task`
    /// is not failed a second time, and so that the next claim sweep can fail
    /// the run if that has not happened yet.
    LeaseExpired,
}
pub mod suspend_handle {
    use crate::{TaskContext, TaskResult};

    // Internal marker type. Its only field is private, so code outside this
    // module cannot build one with a struct literal; `SuspendMarker::new` calls
    // `task_context.mark_suspended()` before handing a marker out. This helps
    // prevent `ControlFlow::Suspend` errors from being constructed without the
    // context having been marked as suspended first.
    #[derive(Clone, Copy, Debug, Eq, PartialEq)]
    pub struct SuspendMarker {
        _private: (),
    }

    impl SuspendMarker {
        /// Marks `task_context` as suspended and, if that succeeds, returns a marker.
        ///
        /// An error from `mark_suspended` is propagated and no marker is produced.
        pub fn new<S>(task_context: &mut TaskContext<S>) -> TaskResult<Self>
        where
            S: Clone + Send + Sync,
        {
            task_context.mark_suspended()?;
            let marker = Self { _private: () };
            Ok(marker)
        }
    }
}
/// Error type for task execution.
///
/// This enum distinguishes between control flow signals (suspension, cancellation)
/// and actual failures. The worker handles these differently:
///
/// - `Control(Suspend)` - Task is waiting; worker does nothing (scheduler will resume it)
/// - `Control(Cancelled)` - Task was cancelled; worker does nothing
/// - `Control(LeaseExpired)` - Task lost its lease; worker stops without failing the run
/// - All other variants - Actual errors; worker records failure and may retry
///
/// # Example
///
/// ```ignore
/// match ctx.await_event::<MyPayload>("my-event", Some(Duration::from_secs(30))).await {
///     Ok(payload) => { /* handle payload */ }
///     Err(TaskError::Timeout { step_name }) => {
///         println!("Timed out waiting for {}", step_name);
///     }
///     Err(TaskError::Control(ControlFlow::Cancelled)) => {
///         println!("Task was cancelled");
///     }
///     Err(e) => { /* handle other errors */ }
/// }
/// ```
#[derive(Debug, Error)]
pub enum TaskError {
    /// Control flow signal - not an actual error.
    ///
    /// The worker will not mark the task as failed or trigger retries.
    #[error("control flow: {0:?}")]
    Control(ControlFlow),
    /// The operation timed out.
    ///
    /// Returned by [`TaskContext::await_event`](crate::TaskContext::await_event) when
    /// a timeout is specified and the event doesn't arrive in time, or by
    /// [`TaskContext::join`](crate::TaskContext::join) when waiting for a child task.
    #[error("timed out waiting for '{step_name}'")]
    Timeout {
        /// The name of the step or event that timed out.
        step_name: String,
    },
    /// Database operation failed.
    ///
    /// This includes connection errors, query failures, and transaction issues.
    #[error("database error: {0}")]
    Database(sqlx::Error),
    /// JSON serialization or deserialization failed.
    #[error("serialization error: {0}")]
    Serialization(serde_json::Error),
    /// Error occurred while trying to spawn a subtask.
    #[error("failed to spawn subtask `{name}`: {error}")]
    SubtaskSpawnFailed {
        /// The name of the subtask that could not be spawned.
        name: String,
        /// The client error reported by the spawn attempt.
        error: DurableError,
    },
    /// Error occurred while trying to emit an event.
    #[error("failed to emit event `{event_name}`: {error}")]
    EmitEventFailed {
        /// The name of the event that could not be emitted.
        event_name: String,
        /// The client error reported by the emit attempt.
        error: DurableError,
    },
    /// A child task failed.
    ///
    /// Returned by [`TaskContext::join`](crate::TaskContext::join) when the child
    /// task completed with an error.
    #[error("child task failed at '{step_name}': {message}")]
    ChildFailed {
        /// The step name used when joining the child task.
        step_name: String,
        /// The error message from the child task.
        message: String,
    },
    /// A child task was cancelled.
    ///
    /// Returned by [`TaskContext::join`](crate::TaskContext::join) when the child
    /// task was cancelled before completion.
    #[error("child task was cancelled at '{step_name}'")]
    ChildCancelled {
        /// The step name used when joining the child task.
        step_name: String,
    },
    /// A validation error occurred.
    ///
    /// This includes errors like reserved step name prefixes, empty event names,
    /// reserved header prefixes, and unregistered task names.
    #[error("{message}")]
    Validation {
        /// A description of the validation error.
        message: String,
    },
    /// A user error from task code.
    ///
    /// This variant stores a serialized user error for persistence and retrieval.
    /// Created via [`TaskError::user()`] or [`TaskError::user_message()`].
    #[error("{message}")]
    User {
        /// The error message (extracted from "message" field or stringified data)
        message: String,
        /// Serialized error data for storage/retrieval
        error_data: JsonValue,
    },
    /// The user callback provided to `step` failed.
    ///
    /// We treat this as a non-deterministic error, and will retry the task.
    #[error("user step `{base_name}` failed: {error}")]
    Step {
        /// The base name of the step whose callback failed.
        base_name: String,
        /// The error returned by the user callback.
        error: anyhow::Error,
    },
    /// The task panicked.
    #[error("task panicked: {message}")]
    TaskPanicked {
        /// The error message from the task.
        message: String,
    },
}
impl TaskError {
pub fn retryable(&self) -> bool {
match self {
// These are non-deterministic errors, which might succeed on a retry
// (which will have the same checkpoint cache up to the point of the error)
TaskError::Timeout { .. } | TaskError::Database(_) | TaskError::Step { .. } => true,
// Everything else is considered to be a deterministic error, which will fail again
// on a retry
TaskError::SubtaskSpawnFailed { .. }
| TaskError::EmitEventFailed { .. }
| TaskError::Control(_)
| TaskError::Serialization(_)
| TaskError::ChildFailed { .. }
| TaskError::ChildCancelled { .. }
| TaskError::Validation { .. }
| TaskError::User { .. }
| TaskError::TaskPanicked { .. } => false,
}
}
}
/// Result type alias for task execution: shorthand for `Result<T, TaskError>`.
///
/// Use this as the return type for [`Task::run`](crate::Task::run) implementations.
/// The error side is always [`TaskError`], which carries both real failures and
/// control flow signals.
pub type TaskResult<T> = Result<T, TaskError>;
impl TaskError {
    /// Create a user error from arbitrary JSON data.
    ///
    /// If the JSON is an object with a "message" field holding a string, that
    /// string is used for display. Otherwise, the JSON is stringified for the
    /// display message.
    ///
    /// If `error_data` cannot be serialized to JSON at all, the stored
    /// `error_data` is `null` and the display message describes the
    /// serialization failure, so the problem is not hidden behind a bare
    /// `"null"` message.
    ///
    /// ```ignore
    /// // With structured data
    /// Err(TaskError::user(json!({"message": "Not found", "code": 404})))
    ///
    /// // With any serializable type
    /// Err(TaskError::user(MyError { code: 404, details: "..." }))
    /// ```
    pub fn user(error_data: impl serde::Serialize) -> Self {
        let error_data = match serde_json::to_value(&error_data) {
            Ok(value) => value,
            Err(err) => {
                // Serialization can fail (for example when the `Serialize` impl
                // itself reports an error). Keep the `null` payload, but say
                // what went wrong instead of displaying just "null".
                return TaskError::User {
                    message: format!("failed to serialize user error: {err}"),
                    error_data: serde_json::Value::Null,
                };
            }
        };

        // Prefer an explicit string "message" field; fall back to the whole
        // JSON document rendered as text.
        let message = match error_data.get("message").and_then(serde_json::Value::as_str) {
            Some(text) => text.to_owned(),
            None => error_data.to_string(),
        };

        TaskError::User {
            message,
            error_data,
        }
    }

    /// Create a user error from just a message string.
    ///
    /// The same text is used as the display message and stored as a JSON string
    /// in `error_data`.
    pub fn user_message(message: impl Into<String>) -> Self {
        let message = message.into();
        TaskError::User {
            error_data: serde_json::Value::String(message.clone()),
            message,
        }
    }
}
impl From<serde_json::Error> for TaskError {
fn from(err: serde_json::Error) -> Self {
TaskError::Serialization(err)
}
}
impl TaskError {
    /// Classifies a `sqlx::Error` as a control flow signal or a database failure.
    ///
    /// Cancellation is checked first, then lease expiry; any other error is
    /// wrapped unchanged in [`TaskError::Database`].
    //
    // Deliberately an inherent constructor and *not* `From<sqlx::Error> for TaskError`:
    // user code is not supposed to run database queries directly, so `?` must not
    // convert sqlx errors implicitly.
    pub(crate) fn from_sqlx_error(err: sqlx::Error) -> Self {
        if is_cancelled_error(&err) {
            return Self::Control(ControlFlow::Cancelled);
        }
        if is_lease_expired_error(&err) {
            return Self::Control(ControlFlow::LeaseExpired);
        }
        Self::Database(err)
    }
}
/// Check if a sqlx error indicates task cancellation (error code AB001)
pub fn is_cancelled_error(err: &sqlx::Error) -> bool {
if let sqlx::Error::Database(db_err) = err {
db_err.code().is_some_and(|c| c == "AB001")
} else {
false
}
}
/// Check if a sqlx error indicates lease expiration (error code AB002)
pub fn is_lease_expired_error(err: &sqlx::Error) -> bool {
if let sqlx::Error::Database(db_err) = err {
db_err.code().is_some_and(|c| c == "AB002")
} else {
false
}
}
/// Serialize a TaskError for storage in fail_run
pub fn serialize_task_error(err: &TaskError) -> JsonValue {
match err {
TaskError::Control(_) => {
// Control flow signals should never be serialized as failures
serde_json::json!({
"name": "ControlFlow",
"message": err.to_string(),
})
}
TaskError::Timeout { step_name } => {
serde_json::json!({
"name": "Timeout",
"message": err.to_string(),
"step_name": step_name,
})
}
TaskError::Database(e) => {
serde_json::json!({
"name": "Database",
"message": e.to_string(),
})
}
TaskError::Serialization(e) => {
serde_json::json!({
"name": "Serialization",
"message": e.to_string(),
})
}
TaskError::SubtaskSpawnFailed { name, error } => {
serde_json::json!({
"name": "SubtaskSpawnFailed",
"message": error.to_string(),
"subtask_name": name,
})
}
TaskError::EmitEventFailed { event_name, error } => {
serde_json::json!({
"name": "EmitEventFailed",
"message": error.to_string(),
"event_name": event_name,
})
}
TaskError::ChildFailed { step_name, message } => {
serde_json::json!({
"name": "ChildFailed",
"message": message,
"step_name": step_name,
})
}
TaskError::ChildCancelled { step_name } => {
serde_json::json!({
"name": "ChildCancelled",
"message": err.to_string(),
"step_name": step_name,
})
}
TaskError::Validation { message } => {
serde_json::json!({
"name": "Validation",
"message": message,
})
}
TaskError::User {
message,
error_data,
} => {
serde_json::json!({
"name": "User",
"message": message,
"error_data": error_data,
})
}
TaskError::Step { base_name, error } => {
serde_json::json!({
"name": "Step",
"base_name": base_name,
"message": error.to_string(),
})
}
TaskError::TaskPanicked { message } => {
serde_json::json!({
"name": "TaskPanicked",
"message": message,
})
}
}
}
/// Error type for Client API operations.
///
/// This enum covers all errors that can occur when using the [`Durable`](crate::Durable) client
/// to spawn tasks, manage queues, and emit events. Unlike [`TaskError`], which is used
/// during task execution and intentionally wraps `anyhow::Error` for flexibility,
/// `DurableError` provides typed variants that callers can match on.
///
/// # Example
///
/// ```ignore
/// match client.spawn::<MyTask>(params).await {
///     Ok(result) => println!("Spawned task {}", result.task_id),
///     Err(DurableError::Database(e)) => eprintln!("Database error: {}", e),
///     Err(DurableError::TaskNotRegistered { task_name }) => {
///         eprintln!("Task {} not registered", task_name);
///     }
///     Err(e) => eprintln!("Other error: {}", e),
/// }
/// ```
#[derive(Debug, Error)]
pub enum DurableError {
    /// Database operation failed.
    ///
    /// This includes connection errors, query failures, pool exhaustion, and transaction issues.
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),
    /// JSON serialization or deserialization failed.
    ///
    /// Occurs when task parameters or event payloads cannot be serialized to JSON.
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    /// Task is not registered with this client.
    ///
    /// Tasks must be registered via [`Durable::register`](crate::Durable::register) before spawning.
    #[error("Unknown task: '{task_name}'. Task must be registered before spawning.")]
    TaskNotRegistered {
        /// The name of the task that was not found in the registry.
        task_name: String,
    },
    /// Task is already registered with this client.
    ///
    /// Each task name must be unique within a client instance.
    #[error("task '{task_name}' is already registered. Each task name must be unique.")]
    TaskAlreadyRegistered {
        /// The name of the task that was already registered.
        task_name: String,
    },
    /// Task params validation failed.
    ///
    /// Returned when the task definition in the registry fails to validate the params
    /// (before we attempt to spawn the task in Postgres).
    #[error("invalid task parameters for '{task_name}': {message}")]
    InvalidTaskParams {
        /// The name of the task being spawned.
        task_name: String,
        /// The message describing why the params failed validation.
        message: String,
    },
    /// Header key uses a reserved prefix.
    ///
    /// User-provided headers cannot start with "durable::" as this prefix
    /// is reserved for internal use.
    #[error(
        "header key '{key}' uses reserved prefix 'durable::'. User headers cannot start with 'durable::'."
    )]
    ReservedHeaderPrefix {
        /// The header key that used the reserved prefix.
        key: String,
    },
    /// Event name validation failed.
    #[error("invalid event name: {reason}")]
    InvalidEventName {
        /// The reason the event name was invalid.
        reason: String,
    },
    /// Configuration validation failed.
    ///
    /// Returned when worker options contain invalid values.
    #[error("invalid configuration: {reason}")]
    InvalidConfiguration {
        /// The reason the configuration was invalid.
        reason: String,
    },
    /// Database returned an unrecognized task state.
    #[error("invalid task state: {state}")]
    InvalidState {
        /// The unrecognized state string.
        state: String,
    },
    /// Schedule name failed validation.
    #[error("invalid schedule name `{name}`: {reason}")]
    InvalidScheduleName {
        /// The invalid schedule name.
        name: String,
        /// Why the name is invalid.
        reason: String,
    },
    /// Schedule not found.
    #[error("schedule `{schedule_name}` not found in queue `{queue_name}`")]
    ScheduleNotFound {
        /// The schedule name that was not found.
        schedule_name: String,
        /// The queue name that was searched.
        queue_name: String,
    },
}
/// Result type alias for Client API operations: shorthand for `Result<T, DurableError>`.
///
/// Use this as the return type for [`Durable`](crate::Durable) method calls.
pub type DurableResult<T> = Result<T, DurableError>;