Add XCom Push Support for Triggerer#64068

Draft
jscheffl wants to merge 4 commits into apache:main from jscheffl:feature/add-xcom-push-support-for-triggerer

@jscheffl
Contributor

In the devlist discussion https://lists.apache.org/thread/6znvd5rtqnxt5r4hys7qn64j5mflr9g1 that followed PR #63489, which proposes enqueueing tasks directly, it came up that KPO (KubernetesPodOperator) mainly returns to the worker to execute callbacks and retrieve XComs.

Callbacks would be hard to execute in the Triggerer, but XComs are a blocker for most users of KPO, so this PR enables returning XCom data from triggers in Airflow Core. With this, it would be possible to close and complete Pods with the standard KPO if this is added to the Airflow 3.2.0 scope.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Claude Opus 4.6



Member

@hussein-awala hussein-awala left a comment

For the long term, I think we should consider adding a supervisor to the triggerer — similar to how the task SDK works — so that triggers can interact with XCom, variables, and other components through a proper SDK interface. That said, the current approach of pushing XCom at the models layer during event processing is a pragmatic solution and looks good for now.

Could you please document the new approach? (LGTM once it's done)

@dabla
Contributor

dabla commented Mar 22, 2026

> For the long term, I think we should consider adding a supervisor to the triggerer, similar to how the task SDK works, so that triggers can interact with XCom, variables, and other components through a proper SDK interface. That said, the current approach of pushing XCom at the models layer during event processing is a pragmatic solution and looks good for now.
>
> Could you please document the new approach? (LGTM once it's done)

Like the idea!

@dabla
Contributor

dabla commented Mar 22, 2026

@jscheffl Why not push XComs directly via the trigger's task_instance? The XComs are currently passed via the TriggerEvent, which is fully persisted in the DB. If they were pushed directly via task_instance in the trigger, they would not be persisted with the event in the DB, which can easily grow with XComs.

@jscheffl
Contributor Author

> @jscheffl Why not push XComs directly via the trigger's task_instance? The XComs are currently passed via the TriggerEvent, which is fully persisted in the DB. If they were pushed directly via task_instance in the trigger, they would not be persisted with the event in the DB, which can easily grow with XComs.

That does not work: self.task_instance in the triggerer execution is of type airflow.executors.workloads.TaskInstance, which is a minimal stub with no xcom_push() implementation.

A manual XCom push does not work either, as there is no DB session in the triggerer code.

So yes, this proposal has the trade-off that the XCom is serialized once into the event and then a second time from the event back into XCom.
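
To make the double-serialization trade-off concrete, here is a minimal sketch. It assumes JSON as the serialization format, and the helper names `to_event_row` and `event_row_to_xcom_rows` are hypothetical; neither is Airflow code, and the real serde may differ.

```python
import json


def to_event_row(payload, xcoms):
    """First pass: the XCom values are serialized as part of the event."""
    return json.dumps({"payload": payload, "xcoms": xcoms})


def event_row_to_xcom_rows(event_row):
    """Second pass: decode the event, then serialize each value again for XCom."""
    event = json.loads(event_row)
    return {key: json.dumps(value) for key, value in (event["xcoms"] or {}).items()}


# The same value is encoded once inside the event and once more as an XCom row.
event_row = to_event_row("done", {"result": {"rows": 3}})
xcom_rows = event_row_to_xcom_rows(event_row)
```

The value therefore exists in serialized form in two places, which is the storage growth concern raised above.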

@hussein-awala
Member

A note on the API design: since xcoms is now on TriggerEvent (the base class), it's also available on events yielded by BaseEventTrigger subclasses (asset watchers). However, in submit_event(), the asset path only uses event.payload — any xcoms set on a watcher event would be silently ignored. This isn't a bug, but it could be confusing for users who set xcoms on a watcher trigger and expect them to land somewhere. It might be worth either:

  1. Adding a warning log in submit_event() when event.xcoms is set but there are no deferred task instances to push them to, or
  2. Documenting that xcoms only applies to task-deferred triggers, not asset watchers.
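
As a rough illustration of option 1, the check could look like the sketch below. The function `warn_on_unused_xcoms` and its parameters are hypothetical and stand in for whatever `submit_event()` actually has in scope; this is not the code in the PR.

```python
import logging

log = logging.getLogger("trigger_sketch")


def warn_on_unused_xcoms(trigger_id, xcoms, deferred_task_count):
    """Warn when an event carries XComs but no deferred task instance can receive them.

    Returns True if a warning was emitted, which keeps the sketch easy to test.
    """
    if xcoms and deferred_task_count == 0:
        log.warning(
            "Trigger event %s carries %d XCom key(s), but no deferred task instance will receive them",
            trigger_id,
            len(xcoms),
        )
        return True
    return False


# An asset watcher event with xcoms set and no deferred task instances.
warned = warn_on_unused_xcoms(trigger_id=7, xcoms={"result": 1}, deferred_task_count=0)
```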

@jscheffl jscheffl force-pushed the feature/add-xcom-push-support-for-triggerer branch from 254c9c0 to e9085ea Compare March 22, 2026 21:30
@uranusjr
Member

uranusjr commented Mar 23, 2026

At a quick glance, I think this won't work for custom XCom backends, since those values can't make it through the API call serde.

@dabla
Contributor

dabla commented Mar 23, 2026

> > @jscheffl Why not push XComs directly via the trigger's task_instance? The XComs are currently passed via the TriggerEvent, which is fully persisted in the DB. If they were pushed directly via task_instance in the trigger, they would not be persisted with the event in the DB, which can easily grow with XComs.
>
> That does not work: self.task_instance in the triggerer execution is of type airflow.executors.workloads.TaskInstance, which is a minimal stub with no xcom_push() implementation.
>
> A manual XCom push does not work either, as there is no DB session in the triggerer code.
>
> So yes, this proposal has the trade-off that the XCom is serialized once into the event and then a second time from the event back into XCom.

It could work if this PR (linked below) gets merged; we would just need to always convert the airflow.executors.workloads.TaskInstance to the RuntimeTaskInstance instead of only when start_from_trigger is enabled.

Here is the code that handles it: https://github.com/apache/airflow/pull/55068/changes#diff-e4cc497f1c786d142ce4c930f43e33b0bb4b53d375d274278fa82f4d5567608aR1005

@jscheffl
Contributor Author

> > > @jscheffl Why not push XComs directly via the trigger's task_instance? The XComs are currently passed via the TriggerEvent, which is fully persisted in the DB. If they were pushed directly via task_instance in the trigger, they would not be persisted with the event in the DB, which can easily grow with XComs.
> >
> > That does not work: self.task_instance in the triggerer execution is of type airflow.executors.workloads.TaskInstance, which is a minimal stub with no xcom_push() implementation.
> >
> > A manual XCom push does not work either, as there is no DB session in the triggerer code.
> >
> > So yes, this proposal has the trade-off that the XCom is serialized once into the event and then a second time from the event back into XCom.
>
> It could work if this PR (linked below) gets merged; we would just need to always convert the airflow.executors.workloads.TaskInstance to the RuntimeTaskInstance instead of only when start_from_trigger is enabled.
>
> Here is the code that handles it: https://github.com/apache/airflow/pull/55068/changes#diff-e4cc497f1c786d142ce4c930f43e33b0bb4b53d375d274278fa82f4d5567608aR1005

Oh, ah, yeah! That would make it easier.

However, considering the comments from @uranusjr, isn't the integration of a custom XCom backend also limited in the Triggerer? Usually no XCom backend implements async IO... so I am not sure whether, even with the other proposed PR, this adds a hairball of complexity to "just push XCom" from the Triggerer :-(

@uranusjr uranusjr modified the milestones: Airflow 3.2.0, Airflow 3.3.0 Mar 25, 2026
Member

@uranusjr uranusjr left a comment

Gonna move this to 3.3 for now, there are still multiple points to discuss and we’re too close to the split. Putting this red marker here to prevent someone from accidentally merging.

@jscheffl jscheffl marked this pull request as draft March 29, 2026 21:07
@kaxil kaxil requested a review from Copilot April 2, 2026 00:43
Contributor

Copilot AI left a comment

Pull request overview

This PR extends trigger events to optionally carry XCom key/value pairs so triggerers can persist XComs when resuming deferred task instances (reducing the need to bounce back to a worker purely to push XComs).

Changes:

  • Add an optional xcoms payload to TriggerEvent and wire it through event construction/serialization.
  • Persist event.xcoms into XCom when handling trigger events for deferred TaskInstances.
  • Add unit tests and update deferral documentation examples to demonstrate XCom-from-trigger behavior.
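
The flow described in these changes can be sketched with plain stand-in classes. `SketchTriggerEvent` and `SketchTaskInstance` are hypothetical names used only for illustration; they are not the Airflow `TriggerEvent` or `TaskInstance`, and the real signatures in the PR may differ.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class SketchTriggerEvent:
    """Hypothetical stand-in for an event that optionally carries XComs."""

    payload: Any
    xcoms: dict[str, Any] | None = None  # optional XCom key/value pairs


class SketchTaskInstance:
    """Hypothetical stand-in for a deferred task instance; stores pushes in memory."""

    def __init__(self) -> None:
        self.pushed: dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.pushed[key] = value


def handle_event(event: SketchTriggerEvent, task_instance: SketchTaskInstance) -> None:
    """Persist every XCom carried by the event; do nothing if there are none."""
    if event.xcoms:
        for key, value in event.xcoms.items():
            task_instance.xcom_push(key=key, value=value)


ti = SketchTaskInstance()
handle_event(SketchTriggerEvent(payload="done", xcoms={"return_value": 42}), ti)

ti_without_xcoms = SketchTaskInstance()
handle_event(SketchTriggerEvent(payload="done"), ti_without_xcoms)
```

Keeping `xcoms` optional with a default of `None` means existing triggers that only set a payload behave as before.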

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| `airflow-core/tests/unit/models/test_trigger.py` | Adds unit tests asserting XComs are written (or not) when submitting events with/without xcoms. |
| `airflow-core/src/airflow/triggers/base.py` | Introduces TriggerEvent.xcoms as an optional JSON-compatible dict and updates constructor signature. |
| `airflow-core/src/airflow/models/trigger.py` | Pushes XComs during handle_event_submit() and logs a warning for XComs on asset/callback notification path. |
| `airflow-core/docs/authoring-and-scheduling/deferring.rst` | Updates example to show passing XComs via TaskSuccessEvent. |

Comment on lines +276 to +280
if event.xcoms:
    log.warning(
        "Trigger event %i contains XCom values, which cannot be sent to assets or callbacks. XCom values: %s",
        trigger_id,
        event.xcoms,

Copilot AI Apr 2, 2026

The warning logs the full event.xcoms payload. Since XComs can contain sensitive or large data, this can leak secrets into logs and create noisy log volume. Consider logging only the presence/keys (or a count), and only emitting this warning when the trigger actually has asset/callback associations to notify (otherwise this will warn even for task-only triggers).
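
One possible shape for this, as a sketch only: log a summary of the keys instead of the values. The helper `summarize_xcoms` is a hypothetical name and not part of the PR.

```python
import logging

log = logging.getLogger("trigger_sketch")


def summarize_xcoms(xcoms):
    """Return a log-safe summary with the key count and sorted key names, never the values."""
    keys = sorted(xcoms or {})
    return f"{len(keys)} key(s): {', '.join(keys)}"


# Only key names reach the log; the values (possibly secrets) do not.
summary = summarize_xcoms({"token": "s3cr3t", "count": 3})
log.warning("Trigger event %i contains XCom values that will not be delivered (%s)", 12, summary)
```

Key names can still be revealing in some setups, so logging only the count is the more conservative variant.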

Comment on lines +498 to +502
# Push XCom values if provided by the trigger
if event.xcoms:
    for key, value in event.xcoms.items():
        task_instance.xcom_push(key=key, value=value, session=session)


Copilot AI Apr 2, 2026

Pushing XComs one-by-one can be expensive: task_instance.xcom_push() calls XComModel.set() which performs a DagRun.id lookup plus delete/insert for each key. If event.xcoms can contain multiple keys, consider batching this (e.g., resolve dag_run_id once and insert multiple rows in one operation) to avoid repeated queries and write amplification.
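
The batching idea can be illustrated outside Airflow with the standard-library `sqlite3` module. The table `xcom_sketch` and the function `push_xcoms_batched` are hypothetical; this is not the XComModel schema, and it assumes the dag run id has already been resolved once by the caller.

```python
import json
import sqlite3


def push_xcoms_batched(conn, dag_run_id, task_id, xcoms):
    """Replace all given XCom keys for one task in a single transaction."""
    rows = [(dag_run_id, task_id, key, json.dumps(value)) for key, value in xcoms.items()]
    with conn:  # commits on success, rolls back on error
        # Delete any existing rows for these keys, then insert the new values in bulk.
        conn.executemany(
            "DELETE FROM xcom_sketch WHERE dag_run_id = ? AND task_id = ? AND key = ?",
            [row[:3] for row in rows],
        )
        conn.executemany(
            "INSERT INTO xcom_sketch (dag_run_id, task_id, key, value) VALUES (?, ?, ?, ?)",
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom_sketch ("
    "dag_run_id INTEGER, task_id TEXT, key TEXT, value TEXT, "
    "PRIMARY KEY (dag_run_id, task_id, key))"
)
push_xcoms_batched(conn, 1, "wait_for_pod", {"a": 1, "b": [1, 2]})
push_xcoms_batched(conn, 1, "wait_for_pod", {"a": 2})  # overwrites key "a" only
stored = conn.execute("SELECT key, value FROM xcom_sketch ORDER BY key").fetchall()
```

Whether this pays off in practice depends on how many keys a trigger typically sends; for a single `return_value` the per-key path costs about the same.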


In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator.

Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.

Copilot AI Apr 2, 2026

The docs refer to TaskFailureEvent, but the actual event class is TaskFailedEvent (see airflow/triggers/base.py). This mismatch can confuse users copying the example; please update the name here (and elsewhere in this section if applicable).

Suggested change
- Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.
+ Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailedEvent``. This XCom will be pushed when the task instance is marked as success or failure.
