hussein-awala
left a comment
For the long term, I think we should consider adding a supervisor to the triggerer — similar to how the task SDK works — so that triggers can interact with XCom, variables, and other components through a proper SDK interface. That said, the current approach of pushing XCom at the models layer during event processing is a pragmatic solution and looks good for now.
Could you please document the new approach? (LGTM once it's done)
Like the idea!
@jscheffl Why not push XComs directly via the task_instance of the trigger? Right now the XComs are passed via the TriggerEvent, which is fully persisted in the DB. If they were pushed directly via task_instance on the trigger, they would not be persisted with the event in the DB, which can easily grow large with XComs.
Does not work, the Also, a manual XCom push does not work because there is no DB session in the triggerer code. So yes, this proposal has the trade-off that the XCom is serialized once into the event and then a second time from the event back to XCom.
A note on the API design: since xcoms is now on
254c9c0 to e9085ea
At a quick glance I think this won't work for custom XCom backends, since those values can't make it through the API call serde.
I could if this PR would get merged, we would just need to always convert the Here is the code that handles it: https://github.com/apache/airflow/pull/55068/changes#diff-e4cc497f1c786d142ce4c930f43e33b0bb4b53d375d274278fa82f4d5567608aR1005
Oh, ah, yeah! That would make it easier. Still, taking the comments from @uranusjr into account, isn't the integration of a custom XCom backend also limited in the Triggerer? Usually no XCom backend implements its IO in async... so even with the other proposed PR, I am not sure whether this adds a hairball of complexity to "just push XCom" from the Triggerer :-(
uranusjr
left a comment
Gonna move this to 3.3 for now, there are still multiple points to discuss and we’re too close to the split. Putting this red marker here to prevent someone from accidentally merging.
Pull request overview
This PR extends trigger events to optionally carry XCom key/value pairs so triggerers can persist XComs when resuming deferred task instances (reducing the need to bounce back to a worker purely to push XComs).
Changes:
- Add an optional `xcoms` payload to `TriggerEvent` and wire it through event construction/serialization.
- Persist `event.xcoms` into XCom when handling trigger events for deferred `TaskInstance`s.
- Add unit tests and update deferral documentation examples to demonstrate XCom-from-trigger behavior.
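The flow these changes describe can be sketched with stand-in classes. This is only an illustration under stated assumptions, not Airflow's implementation: `SketchTriggerEvent`, `FakeTaskInstance`, and `handle_event` are hypothetical names, and an in-memory dict stands in for the XCom table.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class SketchTriggerEvent:
    """Hypothetical stand-in for a trigger event with an optional xcoms payload."""

    payload: Any
    xcoms: Optional[Dict[str, Any]] = None


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a deferred task instance."""

    task_id: str
    pushed: Dict[str, Any] = field(default_factory=dict)

    def xcom_push(self, key: str, value: Any) -> None:
        # A real task instance would write to the metadata DB; here we only record it.
        self.pushed[key] = value


def handle_event(ti: FakeTaskInstance, event: SketchTriggerEvent) -> None:
    """Persist any XComs carried by the event for the given task instance."""
    if event.xcoms:
        for key, value in event.xcoms.items():
            ti.xcom_push(key=key, value=value)


ti = FakeTaskInstance(task_id="wait_for_pod")
handle_event(ti, SketchTriggerEvent(payload={"status": "done"}, xcoms={"return_value": 42}))
# An event without xcoms leaves the stored values untouched.
handle_event(ti, SketchTriggerEvent(payload={"status": "done"}))
```

Because `xcoms` defaults to `None`, triggers that never set it behave as before in this sketch.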
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| airflow-core/tests/unit/models/test_trigger.py | Adds unit tests asserting XComs are written (or not) when submitting events with/without xcoms. |
| airflow-core/src/airflow/triggers/base.py | Introduces TriggerEvent.xcoms as an optional JSON-compatible dict and updates constructor signature. |
| airflow-core/src/airflow/models/trigger.py | Pushes XComs during handle_event_submit() and logs a warning for XComs on asset/callback notification path. |
| airflow-core/docs/authoring-and-scheduling/deferring.rst | Updates example to show passing XComs via TaskSuccessEvent. |
    if event.xcoms:
        log.warning(
            "Trigger event %i contains XCom values, which cannot be sent to assets or callbacks. XCom values: %s",
            trigger_id,
            event.xcoms,
The warning logs the full event.xcoms payload. Since XComs can contain sensitive or large data, this can leak secrets into logs and create noisy log volume. Consider logging only the presence/keys (or a count), and only emitting this warning when the trigger actually has asset/callback associations to notify (otherwise this will warn even for task-only triggers).
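One possible shape for this suggestion, as a rough sketch: log only the count and keys, and skip the warning when there is nothing to notify. The helper names `summarize_xcoms` and `warn_dropped_xcoms` and the `has_non_task_targets` flag are assumptions made for illustration; they do not appear in the PR.

```python
import logging
from typing import Any, Mapping, Optional

log = logging.getLogger(__name__)


def summarize_xcoms(xcoms: Optional[Mapping[str, Any]]) -> str:
    """Describe an XCom payload by count and keys only, never by value."""
    if not xcoms:
        return "no XCom values"
    keys = ", ".join(sorted(xcoms))
    return f"{len(xcoms)} XCom value(s) with keys: {keys}"


def warn_dropped_xcoms(
    trigger_id: int,
    xcoms: Optional[Mapping[str, Any]],
    has_non_task_targets: bool,
) -> bool:
    """Warn only if XComs exist and assets or callbacks are actually notified.

    Returns True when a warning was emitted. `has_non_task_targets` is a
    hypothetical flag the caller would have to compute.
    """
    if not xcoms or not has_non_task_targets:
        return False
    log.warning(
        "Trigger event %d contains %s; these cannot be sent to assets or callbacks.",
        trigger_id,
        summarize_xcoms(xcoms),
    )
    return True
```

Logging keys rather than values keeps secrets and large payloads out of the logs while still telling the operator which XComs were dropped.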
    # Push XCom values if provided by the trigger
    if event.xcoms:
        for key, value in event.xcoms.items():
            task_instance.xcom_push(key=key, value=value, session=session)
Pushing XComs one-by-one can be expensive: task_instance.xcom_push() calls XComModel.set() which performs a DagRun.id lookup plus delete/insert for each key. If event.xcoms can contain multiple keys, consider batching this (e.g., resolve dag_run_id once and insert multiple rows in one operation) to avoid repeated queries and write amplification.
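The batching idea can be illustrated outside Airflow with the standard-library `sqlite3` module. The schema, table names, and `push_xcoms_batched` below are hypothetical and much simpler than Airflow's real models; the sketch only shows the pattern of resolving the run id once and writing all keys in one operation.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, run_id TEXT);
    CREATE TABLE xcom (
        dag_run_id INTEGER, task_id TEXT, key TEXT, value TEXT,
        PRIMARY KEY (dag_run_id, task_id, key)
    );
    INSERT INTO dag_run (id, dag_id, run_id) VALUES (7, 'example_dag', 'manual_1');
    """
)


def push_xcoms_batched(conn, dag_id, run_id, task_id, xcoms):
    """Resolve the run id once, then write all keys with a single executemany call."""
    row = conn.execute(
        "SELECT id FROM dag_run WHERE dag_id = ? AND run_id = ?", (dag_id, run_id)
    ).fetchone()
    if row is None:
        raise ValueError(f"No dag run found for {dag_id}/{run_id}")
    dag_run_id = row[0]
    rows = [(dag_run_id, task_id, key, json.dumps(value)) for key, value in xcoms.items()]
    with conn:  # one transaction for the whole batch
        # INSERT OR REPLACE overwrites an existing key, similar to delete-then-insert.
        conn.executemany(
            "INSERT OR REPLACE INTO xcom (dag_run_id, task_id, key, value) VALUES (?, ?, ?, ?)",
            rows,
        )
    return len(rows)


written = push_xcoms_batched(
    conn, "example_dag", "manual_1", "wait_for_pod", {"return_value": 42, "pod_name": "p-1"}
)
```

Compared with a per-key push, this issues one lookup and one batched write regardless of how many keys the event carries.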
In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator.

Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.
The docs refer to TaskFailureEvent, but the actual event class is TaskFailedEvent (see airflow/triggers/base.py). This mismatch can confuse users copying the example; please update the name here (and elsewhere in this section if applicable).
Suggested change:
- Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.
+ Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailedEvent``. This XCom will be pushed when the task instance is marked as success or failure.
In the devlist discussion https://lists.apache.org/thread/6znvd5rtqnxt5r4hys7qn64j5mflr9g1, which followed PR #63489 proposing to enqueue tasks directly, it came up that KPO returns to a worker mainly to execute callbacks and retrieve XComs.
Callbacks would be hard to execute in the Triggerer, but XComs are a blocker for most users of KPO, so this PR enables returning XCom data from triggerers in Airflow Core. With this, it would be possible to close and complete Pods with the standard KPO if this is added to the Airflow 3.2.0 scope.
Was generative AI tooling used to co-author this PR?
Claude Opus 4.6