Fix AwsBaseWaiterTrigger losing error details on deferred task failure #64085
shivaam wants to merge 5 commits into apache:main
Conversation
When a deferred AWS task hits a terminal failure state, async_wait() raises AirflowException with the error details. But AwsBaseWaiterTrigger.run() did not catch it — the exception propagated to the triggerer framework which replaced it with a generic "Trigger failure" message. execute_complete() was never called, so operators and on_failure_callbacks lost all error context. Fix: catch AirflowException in run() and yield TriggerEvent with status="error" and the full error message, routing failures through execute_complete() where every operator already handles non-success events. Also add "JobRun.ErrorMessage" to GlueJobCompleteTrigger.status_queries so the actual Glue error text is included alongside JobRunState. Affects all AWS triggers inheriting run() from the base class.
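The pattern described above can be sketched in a few lines. This is a minimal, self-contained illustration of the fix, not the provider's actual code: `DemoWaiterTrigger`, `async_wait`, and the stub `AirflowException`/`TriggerEvent` classes are simplified stand-ins for the real Airflow types.

```python
import asyncio
from typing import Any, AsyncIterator


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class TriggerEvent:
    """Minimal stand-in for airflow.triggers.base.TriggerEvent."""

    def __init__(self, payload: dict[str, Any]) -> None:
        self.payload = payload


async def async_wait() -> None:
    # Stand-in for the real waiter loop; simulates a terminal failure state.
    raise AirflowException("Waiter error: terminal failure in waiter response")


class DemoWaiterTrigger:
    # Hypothetical simplified trigger; the real base class carries more state.
    return_key = "value"
    return_value = None

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            await async_wait()
        except AirflowException as e:
            # The fix: surface the real error via an event instead of letting
            # the exception crash the triggerer with a generic "Trigger failure".
            yield TriggerEvent(
                {"status": "error", "message": str(e), self.return_key: self.return_value}
            )
        else:
            yield TriggerEvent({"status": "success", self.return_key: self.return_value})


async def first_event() -> dict[str, Any]:
    return (await DemoWaiterTrigger().run().__anext__()).payload


payload = asyncio.run(first_event())
print(payload)
```

With this shape, the error event is delivered to `execute_complete()` like any other event, so the operator decides how to fail rather than the triggerer framework.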
        self.status_queries,
    )
except AirflowException as e:
    yield TriggerEvent({"status": "error", "message": str(e), self.return_key: self.return_value})
@o-nikolas
Does the TriggerEvent(status="error") approach look right? If so, I'll
add commits to fix all 8 operators in this PR before we can merge.
You don't see many try/except/else blocks. I like it.
…rEvent

test_run_fail in TestNeptuneClusterInstancesAvailableTrigger expected AirflowException to propagate from run(). With the base trigger fix, the exception is now caught and yielded as TriggerEvent(status="error"). Update the test to assert on the event payload instead.
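The reshaped test might look roughly like this. It is a sketch of the new assertion style only: `FakeClusterTrigger` is a hypothetical stand-in for the Neptune trigger, and the stub exception/event classes replace the real Airflow imports.

```python
import asyncio


class AirflowException(Exception):
    pass


class TriggerEvent:
    def __init__(self, payload):
        self.payload = payload

    def __eq__(self, other):
        return isinstance(other, TriggerEvent) and self.payload == other.payload


class FakeClusterTrigger:
    """Hypothetical stand-in for the Neptune trigger under test."""

    async def run(self):
        try:
            # Simulate async_wait() hitting a terminal failure state.
            raise AirflowException("Waiter error: terminal failure in waiter")
        except AirflowException as e:
            yield TriggerEvent({"status": "error", "message": str(e)})


def test_run_fail():
    # Old shape: pytest.raises(AirflowException) wrapped around run().
    # New shape: consume the generator and assert on the yielded payload.
    event = asyncio.run(FakeClusterTrigger().run().__anext__())
    assert event == TriggerEvent(
        {"status": "error", "message": "Waiter error: terminal failure in waiter"}
    )


test_run_fail()
print("test_run_fail passed")
```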
Both NeptuneStartDbClusterOperator and NeptuneStopDbClusterOperator silently succeeded when the trigger reported a failure — no status validation, so the task was marked success and downstream tasks would start against a cluster that wasn't actually running. Also fixes wrong XCom key (cluster_id → db_cluster_id) that caused empty cluster ID in return value, and removes debug self.log.info(event).
Force-pushed c089cbd to 2bb3ebc
vincbeck
left a comment
I like this approach, it also makes the code cleaner
Just to clarify, this fix also closes #64095, right?
ferruzzi
left a comment
I like the plan and think you can proceed with the others.
…methods

These operators/sensors had missing or incomplete error handling in their execute_complete callbacks. Previously invisible because errors never reached execute_complete (the triggerer crashed with "Trigger failure" before the event was delivered). Now that the base trigger routes errors through TriggerEvent, these methods must handle non-success events.

DMS: Add validate_execute_complete_event + status check to all 3 execute_complete and 2 retry_execution methods. Add FailureMessages to DmsReplicationCompleteTrigger status_queries.
EMR Serverless: Change == "success" (no else) to != "success" with raise in stop_application and in both execute_complete methods.
MWAA: Replace return None with validate + status check in both MwaaDagRunSensor and MwaaTaskSensor execute_complete methods.
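The before/after shape of these callbacks can be illustrated side by side. This is a toy sketch, not the provider code: the function names and the `job_id` field are invented for the example, and the stub `AirflowException` replaces the real import.

```python
class AirflowException(Exception):
    pass


def execute_complete_old(event):
    # Buggy shape found in the audit: success is checked, but an error
    # event silently falls through and the task is marked success.
    if event["status"] == "success":
        return event.get("job_id")


def execute_complete_fixed(event):
    # Fixed shape: validate first and raise on anything but success, so the
    # new status="error" events fail the task with the real message.
    if event is None or event.get("status") != "success":
        raise AirflowException(f"Trigger error: {event}")
    return event.get("job_id")


error_event = {"status": "error", "message": "application failed"}
print("old:", execute_complete_old(error_event))  # old: None (silent success)
```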
Yes, it will close that issue as well
eladkal
left a comment
Setting temporary request changes to avoid merge
Note: This PR is blocked until all underlying operators are updated to handle the new error trigger event. Otherwise, these operators will fail silently.
When a deferred AWS task hits a terminal failure state, async_wait() raises AirflowException with the error details. But AwsBaseWaiterTrigger.run() did not catch it: the exception propagated to the triggerer framework, which replaced it with a generic "Trigger failure" message. execute_complete() was never called, so operators and on_failure_callback lost all error context.

Fix: catch AirflowException in run() and yield TriggerEvent(status="error", message=str(e)), routing failures through execute_complete(), where operators handle non-success events. Also adds "JobRun.ErrorMessage" to GlueJobCompleteTrigger.status_queries so the actual Glue error text is included alongside JobRunState.

related: #63706
Closes: #64095
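As I understand the status_queries mechanism, the queries are evaluated against the waiter's final response to build the failure message. A toy illustration of why adding JobRun.ErrorMessage matters; the `resolve` helper and the sample response are invented for the example, standing in for the provider's JMESPath-style lookup:

```python
# Toy dotted-path lookup standing in for the JMESPath-style queries the
# waiter failure message is built from; not the provider's real helper.
def resolve(path, response):
    value = response
    for part in path.split("."):
        value = value.get(part) if isinstance(value, dict) else None
    return value


# Hypothetical final waiter response for a failed Glue job run.
last_response = {
    "JobRun": {
        "JobRunState": "FAILED",
        "ErrorMessage": "Command failed with exit code 1",
    }
}

# With "JobRun.ErrorMessage" added to status_queries, the failure text
# carries the real Glue error, not just the terminal state.
status_queries = ["JobRun.JobRunState", "JobRun.ErrorMessage"]
details = ", ".join(str(resolve(q, last_response)) for q in status_queries)
print(details)  # FAILED, Command failed with exit code 1
```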
Tested on EC2 with a real Glue job

Ran a Glue job designed to fail with GlueJobOperator(deferrable=True).

Before (bug): on_failure_callback receives TaskDeferralError("Trigger failure"), no details.

After (fix):
- TriggerEvent with status="error" and the full error message
- execute_complete() is called (stack trace shows glue.py:345) and raises with the details
- on_failure_callback receives the same AirflowException with the actual Glue error

Changes in this PR
Core fix: AwsBaseWaiterTrigger.run() error handling
- triggers/base.py: catch AirflowException from async_wait() and yield TriggerEvent(status="error", message=str(e)) instead of letting it propagate
- triggers/glue.py: add "JobRun.ErrorMessage" to GlueJobCompleteTrigger.status_queries
- test_base.py, test_glue.py: tests for both changes

Neptune operator fix (pre-existing bugs found during audit)
- operators/neptune.py: both NeptuneStartDbClusterOperator and NeptuneStopDbClusterOperator had a wrong XCom key (cluster_id, should be db_cluster_id), so downstream tasks got an empty string, plus a debug self.log.info(event) left in production code; fixed with validate_execute_complete_event(), consistent with all other operators
- test_neptune.py: updated trigger test to expect TriggerEvent instead of a raised exception

Operator compatibility audit (feedback requested)
This change means execute_complete() will now be called with status="error" events where previously it was never called on failure. I audited all 59 execute_complete methods across AWS operators and sensors; the patterns found:
- check validated_event["status"] != "success" and raise (already safe)
- override run() with their own error handling (already safe)
- check == "success" with no else/raise (error events would silently succeed)
- return None unconditionally (error events would silently succeed)

Merging the base trigger fix without addressing them would change their failure mode from a "Trigger failure" crash to silent success, which is worse.
Reviewers: does the TriggerEvent(status="error") approach look right? If so, I'll add commits to fix all 7 remaining operators in this PR before merge.
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Claude Opus 4.6) following the guidelines