Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow-core/docs/authoring-and-scheduling/deferring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,14 @@ Triggers can have two options: they can either send execution back to the worker
async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
yield TaskSuccessEvent()
yield TaskSuccessEvent(xcoms={"wait_duration_s": self.duration.total_seconds()})
else:
yield TriggerEvent({"duration": self.duration})

In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator.

Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs refer to TaskFailureEvent, but the actual event class is TaskFailedEvent (see airflow/triggers/base.py). This mismatch can confuse users copying the example; please update the name here (and elsewhere in this section if applicable).

Suggested change
Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailureEvent``. This XCom will be pushed when the task instance is marked as success or failure.
Note also that in case of direct exit, an XCom can be produced and passed with the ``TaskSuccessEvent`` or ``TaskFailedEvent``. This XCom will be pushed when the task instance is marked as success or failure.

Copilot uses AI. Check for mistakes.

.. note::
Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the Dag parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases.

Expand Down
14 changes: 14 additions & 0 deletions airflow-core/src/airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE
handle_event_submit(event, task_instance=task_instance, session=session)

# Send an event to assets
if event.xcoms:
log.warning(
"Trigger event %i contains XCom values, which cannot be sent to assets or callbacks. XCom values: %s",
trigger_id,
event.xcoms,
Comment on lines +276 to +280
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning logs the full event.xcoms payload. Since XComs can contain sensitive or large data, this can leak secrets into logs and create noisy log volume. Consider logging only the presence/keys (or a count), and only emitting this warning when the trigger actually has asset/callback associations to notify (otherwise this will warn even for task-only triggers).

Copilot uses AI. Check for mistakes.
)
trigger = session.scalars(select(cls).where(cls.id == trigger_id)).one_or_none()
if trigger is None:
# Already deleted for some reason
Expand Down Expand Up @@ -460,6 +466,9 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
as well as its state to scheduled. It also adds the event's payload
into the kwargs for the task.

If the event includes XCom values, they are pushed to the task instance
before the task is rescheduled.

:param task_instance: The task instance to handle the submit event for.
:param session: The session to be used for the database callback sink.
"""
Expand All @@ -486,6 +495,11 @@ def handle_event_submit(event: TriggerEvent, *, task_instance: TaskInstance, ses
# re-serialize the entire dict using serde to ensure consistent structure
task_instance.next_kwargs = serialize(next_kwargs)

# Push XCom values if provided by the trigger
if event.xcoms:
for key, value in event.xcoms.items():
task_instance.xcom_push(key=key, value=value, session=session)

Comment on lines +498 to +502
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushing XComs one-by-one can be expensive: task_instance.xcom_push() calls XComModel.set() which performs a DagRun.id lookup plus delete/insert for each key. If event.xcoms can contain multiple keys, consider batching this (e.g., resolve dag_run_id once and insert multiple rows in one operation) to avoid repeated queries and write amplification.

Copilot uses AI. Check for mistakes.
# Remove ourselves as its trigger
task_instance.trigger_id = None

Expand Down
19 changes: 17 additions & 2 deletions airflow-core/src/airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,23 @@ class TriggerEvent(BaseModel):
Must be natively JSON-serializable, or registered with the airflow serialization code.
"""

def __init__(self, payload, **kwargs):
super().__init__(payload=payload, **kwargs)
xcoms: dict[str, JsonValue] | None = None
"""
Optional XCom values to persist when the event is processed.

If provided, these XCom key-value pairs will be pushed to the task instance
before the task is rescheduled. This allows triggers to write XCom values
directly without requiring the task to resume on a worker first.

Keys are XCom keys and values must be JSON-serializable.

Note: XCom only applies to task instances, not to assets or callbacks. If a trigger event
with XCom values is sent to an asset or callback, the XCom values will be ignored and a
warning will be logged.
"""

def __init__(self, payload, *, xcoms: dict[str, JsonValue] | None = None, **kwargs):
super().__init__(payload=payload, xcoms=xcoms, **kwargs)

def __repr__(self) -> str:
return f"TriggerEvent<{self.payload!r}>"
Expand Down
65 changes: 65 additions & 0 deletions airflow-core/tests/unit/models/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,3 +919,68 @@ def test_kwargs_not_encrypted():

assert trigger.kwargs["param1"] == "value1"
assert trigger.kwargs["param2"] == "value2"


def test_submit_event_with_xcoms(session, create_task_instance):
"""
Tests that events with xcoms submitted to a trigger push XCom values
to the task instance before rescheduling it.
"""
trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
session.add(trigger)
task_instance = create_task_instance(
session=session, logical_date=timezone.utcnow(), state=State.DEFERRED
)
task_instance.trigger_id = trigger.id
task_instance.next_kwargs = {}
session.commit()

event = TriggerEvent("payload", xcoms={"return_value": {"key": "value"}, "extra_key": 42})
Trigger.submit_event(trigger.id, event, session=session)
session.flush()

session.refresh(task_instance)
assert task_instance.state == State.SCHEDULED

xcoms = session.scalars(
XComModel.get_many(
dag_ids=[task_instance.dag_id],
task_ids=[task_instance.task_id],
run_id=task_instance.run_id,
)
).all()
actual_xcoms = {x.key: x.value for x in xcoms}
assert actual_xcoms == {
"return_value": json.dumps({"key": "value"}),
"extra_key": json.dumps(42),
}


def test_submit_event_without_xcoms_does_not_push(session, create_task_instance):
"""
Tests that events without xcoms don't push any XCom values.
"""
trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
session.add(trigger)
task_instance = create_task_instance(
session=session, logical_date=timezone.utcnow(), state=State.DEFERRED
)
task_instance.trigger_id = trigger.id
task_instance.next_kwargs = {}
session.commit()

event = TriggerEvent("payload")
Trigger.submit_event(trigger.id, event, session=session)
session.flush()

session.refresh(task_instance)
assert task_instance.state == State.SCHEDULED

xcoms = session.scalars(
XComModel.get_many(
dag_ids=[task_instance.dag_id],
task_ids=[task_instance.task_id],
run_id=task_instance.run_id,
)
).all()
assert len(xcoms) == 0
Loading