
Fix AwsBaseWaiterTrigger losing error details on deferred task failure#64085

Open
shivaam wants to merge 5 commits intoapache:mainfrom
shivaam:glue-deferred-fix-clean-pr

Conversation


@shivaam shivaam commented Mar 23, 2026

Note: This PR is blocked until all underlying operators are updated to handle the new error trigger event. Otherwise, these operators will fail silently.

When a deferred AWS task hits a terminal failure state, async_wait() raises
AirflowException with the error details. But AwsBaseWaiterTrigger.run() did
not catch it — the exception propagated to the triggerer framework which replaced
it with a generic "Trigger failure" message. execute_complete() was never called,
so operators and on_failure_callback lost all error context.

Fix: Catch AirflowException in run() and yield
TriggerEvent(status="error", message=str(e)), routing failures through
execute_complete() where operators handle non-success events.

Also adds "JobRun.ErrorMessage" to GlueJobCompleteTrigger.status_queries so the
actual Glue error text is included alongside JobRunState.
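The shape of the fix can be sketched as follows. This is a minimal illustration, not the provider's actual code: `AirflowException`, `TriggerEvent`, and `async_wait` below are simplified stand-ins for the real Airflow classes, and the real event also carries the trigger's return key.

```python
import asyncio
from typing import Any, AsyncIterator


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class TriggerEvent:
    """Stand-in for airflow.triggers.base.TriggerEvent."""

    def __init__(self, payload: dict[str, Any]) -> None:
        self.payload = payload


async def async_wait() -> None:
    # Simulates the waiter hitting a terminal failure state.
    raise AirflowException("AWS Glue job failed.: FAILED - ...")


class WaiterTriggerSketch:
    """Sketch of AwsBaseWaiterTrigger.run() after the fix."""

    async def run(self) -> AsyncIterator[TriggerEvent]:
        try:
            await async_wait()
        except AirflowException as e:
            # Route the failure through execute_complete() instead of
            # letting the triggerer replace it with "Trigger failure".
            yield TriggerEvent({"status": "error", "message": str(e)})
            return
        yield TriggerEvent({"status": "success"})


async def main() -> list[dict[str, Any]]:
    return [event.payload async for event in WaiterTriggerSketch().run()]


events = asyncio.run(main())
print(events)
```

With this shape, the error event reaches `execute_complete()` carrying the original exception text rather than being lost in the triggerer.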

related: #63706
Closes: #64095

Tested on EC2 with real Glue job

Ran a Glue job designed to fail with GlueJobOperator(deferrable=True).

Before (bug): on_failure_callback receives TaskDeferralError("Trigger failure") — no details.

After (fix):

[2026-03-20 22:53:41] INFO - Status of AWS Glue job is: RUNNING
[2026-03-20 22:53:52] INFO - Status of AWS Glue job is: RUNNING
[2026-03-20 22:54:02] INFO - Status of AWS Glue job is: RUNNING
[2026-03-20 22:54:12] INFO - Status of AWS Glue job is: RUNNING
[2026-03-20 22:54:22] INFO - Trigger fired event result=TriggerEvent<{'status': 'error',
    'message': 'AWS Glue job failed.: FAILED - RuntimeError: GLUE_TEST: Intentional failure
    to test error propagation in deferrable mode\nWaiter job_complete failed: Waiter
    encountered a terminal failure state: For expression "JobRun.JobRunState" we matched
    expected path: "FAILED"',
    'run_id': 'jr_a485d84c34f952e1c8d9d3199455d2d6eda07529034ceb7e1b554514d367f417'}>
[2026-03-20 22:54:25] ERROR - Task failed with exception
AirflowException: Error in glue job: {'run_id': 'jr_a485d84c...', 'status': 'error',
    'message': 'AWS Glue job failed.: FAILED - RuntimeError: GLUE_TEST: Intentional failure
    to test error propagation in deferrable mode...'}
File ".../providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py", line 345
    in execute_complete
  • Trigger yields TriggerEvent with status="error" and full error message
  • execute_complete() is called (stack trace shows glue.py:345) and raises with the details
  • on_failure_callback receives the same AirflowException with actual Glue error

Changes in this PR

Core fix: AwsBaseWaiterTrigger.run() error handling

  • triggers/base.py — Catch AirflowException from async_wait() and yield TriggerEvent(status="error", message=str(e)) instead of letting it propagate
  • triggers/glue.py — Add "JobRun.ErrorMessage" to GlueJobCompleteTrigger.status_queries
  • test_base.py, test_glue.py — Tests for both changes

Neptune operator fix (pre-existing bugs found during audit)

  • operators/neptune.py — Both NeptuneStartDbClusterOperator and NeptuneStopDbClusterOperator had:
    • No error status validation — silently succeeded on failure
    • Wrong XCom key (cluster_id → should be db_cluster_id) — downstream tasks got empty string
    • Debug self.log.info(event) left in production code
    • Now uses validate_execute_complete_event() consistent with all other operators
  • test_neptune.py — Updated trigger test to expect TriggerEvent instead of raised exception

Operator compatibility audit — feedback requested

This change means execute_complete() will now be called with status="error" events
where previously it was never called on failure. I audited all 59 execute_complete
methods across AWS operators and sensors:

  • 47 are safe — use validated_event["status"] != "success" with raise
  • 4 not affected — their triggers override run() with own error handling
  • 1 fixed in this PR — Neptune (see above)
  • 7 remaining — would silently succeed on error events:
Operator                                                Issue
DmsDeleteReplicationConfigOperator                      No status check
DmsStartReplicationOperator                             No status check
DmsStopReplicationOperator                              No status check
EmrServerlessStopApplicationOperator                    == "success" with no else/raise
EmrServerlessDeleteApplicationOperator                  == "success" with no else/raise
EmrServerlessStopApplicationOperator.stop_application   Deferred callback with same issue
MwaaTaskSensor                                          return None unconditionally

Merging the base trigger fix without addressing them would change their failure mode from "Trigger
failure" crash to silent success — which is worse.

Reviewers: Does the TriggerEvent(status="error") approach look right? If so, I'll
add commits to fix all 7 remaining operators in this PR before merge.


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Claude Opus 4.6)

Generated-by: Claude Code (Claude Opus 4.6) following the guidelines

Affects all AWS triggers inheriting run() from the base class.
@boring-cyborg boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels Mar 23, 2026
        self.status_queries,
    )
except AirflowException as e:
    yield TriggerEvent({"status": "error", "message": str(e), self.return_key: self.return_value})

@o-nikolas
Does the TriggerEvent(status="error") approach look right? If so, I'll
add commits to fix all 8 operators in this PR before we can merge.


You don't see many try/except/else blocks. I like it.

@shivaam shivaam marked this pull request as ready for review March 23, 2026 01:37
@shivaam shivaam requested a review from o-nikolas as a code owner March 23, 2026 01:37
shivaam added 2 commits March 22, 2026 21:33
…rEvent

test_run_fail in TestNeptuneClusterInstancesAvailableTrigger expected
AirflowException to propagate from run(). With the base trigger fix,
the exception is now caught and yielded as TriggerEvent(status="error").
Update the test to assert on the event payload instead.
Both NeptuneStartDbClusterOperator and NeptuneStopDbClusterOperator
silently succeeded when the trigger reported a failure — no status
validation, so the task was marked success and downstream tasks would
start against a cluster that wasn't actually running.

Also fixes wrong XCom key (cluster_id → db_cluster_id) that caused
empty cluster ID in return value, and removes debug self.log.info(event).
@vincbeck vincbeck left a comment

I like this approach, it also makes the code cleaner


eladkal commented Mar 23, 2026

Just to clarify this fix also closes #64095 right?

@ferruzzi ferruzzi left a comment

I like the plan and think you can proceed with the others.


shivaam added 2 commits March 23, 2026 18:44
…methods

These operators/sensors had missing or incomplete error handling in their
execute_complete callbacks. Previously invisible because errors never
reached execute_complete (the triggerer crashed with "Trigger failure"
before the event was delivered). Now that the base trigger routes errors
through TriggerEvent, these methods must handle non-success events.

DMS: Add validate_execute_complete_event + status check to all 3
execute_complete and 2 retry_execution methods. Add FailureMessages
to DmsReplicationCompleteTrigger status_queries.

EMR Serverless: Change == "success" (no else) to != "success" with raise
in stop_application, and both execute_complete methods.

MWAA: Replace return None with validate + status check in both
MwaaDagRunSensor and MwaaTaskSensor execute_complete methods.

shivaam commented Mar 24, 2026

Just to clarify this fix also closes #64095 right?

Yes, it will close that issue as well

@eladkal eladkal left a comment

Setting temporary request changes to avoid merge

Note: This PR is blocked until all underlying operators are updated to handle the new error trigger event. Otherwise, these operators will fail silently.


Labels

area:providers, provider:amazon (AWS/Amazon - related issues)


Development

Successfully merging this pull request may close these issues.

GlueJobOperator in deferred mode does not return error message

4 participants