-
Notifications
You must be signed in to change notification settings - Fork 16.9k
Add XCom Push Support for Triggerer #64068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -273,6 +273,12 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE | |
| handle_event_submit(event, task_instance=task_instance, session=session) | ||
|
|
||
| # Send an event to assets | ||
| if event.xcoms: | ||
| log.warning( | ||
| "Trigger event %i contains XCom values, which cannot be sent to assets or callbacks. XCom values: %s", | ||
| trigger_id, | ||
| event.xcoms, | ||
|
Comment on lines
+276
to
+280
|
||
| ) | ||
| trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one_or_none() | ||
| if trigger is None: | ||
| # Already deleted for some reason | ||
|
|
@@ -460,6 +466,9 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses | |
| as well as its state to scheduled. It also adds the event's payload | ||
| into the kwargs for the task. | ||
|
|
||
| If the event includes XCom values, they are pushed to the task instance | ||
| before the task is rescheduled. | ||
|
|
||
| :param task_instance: The task instance to handle the submit event for. | ||
| :param session: The session to be used for the database callback sink. | ||
| """ | ||
|
|
@@ -486,6 +495,11 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses | |
| # re-serialize the entire dict using serde to ensure consistent structure | ||
| task_instance.next_kwargs = serialize(next_kwargs) | ||
|
|
||
| # Push XCom values if provided by the trigger | ||
| if event.xcoms: | ||
| for key, value in event.xcoms.items(): | ||
| task_instance.xcom_push(key=key, value=value, session=session) | ||
|
|
||
|
Comment on lines
+498
to
+502
|
||
| # Remove ourselves as its trigger | ||
| task_instance.trigger_id = None | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docs refer to
TaskFailureEvent, but the actual event class isTaskFailedEvent(seeairflow/triggers/base.py). This mismatch can confuse users copying the example; please update the name here (and elsewhere in this section if applicable).