Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
749d102
feat: Add GetExtendedAgentCard Support to RequestHandlers
sokoliva Apr 1, 2026
80a6c9f
format
sokoliva Apr 1, 2026
5587296
Merge branch '1.0-dev' into GetExtendedAgentCard
sokoliva Apr 1, 2026
ae44764
bring back if inspect.iscoroutinefunction(function): check
sokoliva Apr 1, 2026
c2de945
Merge branch '1.0-dev' of https://github.com/a2aproject/a2a-python in…
sokoliva Apr 1, 2026
3169d76
fix validate function
sokoliva Apr 1, 2026
9b0b522
remove agent_card property
sokoliva Apr 2, 2026
89296dd
bring back aget_card
sokoliva Apr 2, 2026
ee10c1f
remove PushNotificationConfigNotFoundError
sokoliva Apr 2, 2026
628714c
remove normalization of rpc_url in create_jsonrpc_routes
sokoliva Apr 2, 2026
bcde236
bring back raise UnsupportedOperationError - remove in another PR
sokoliva Apr 2, 2026
21667bd
from a2a.utils.errors import UnsupportedOperationError
sokoliva Apr 2, 2026
90a49c6
add @validate_version(constants.PROTOCOL_VERSION_1_0) to handler in h…
sokoliva Apr 2, 2026
3296ae8
add GetExtendedCard method to compat request handler
sokoliva Apr 2, 2026
93720a9
Merge branch '1.0-dev' into GetExtendedAgentCard
sokoliva Apr 2, 2026
edf8ccb
move `validate` function to `request_handler` since it is only used i…
sokoliva Apr 2, 2026
e2361ad
improve default request handler docsting
sokoliva Apr 2, 2026
726b648
add get extended agent card test for the default request handler
sokoliva Apr 2, 2026
76b949d
Merge branch '1.0-dev' of https://github.com/a2aproject/a2a-python in…
sokoliva Apr 7, 2026
0cc8adc
Few minor fixes
sokoliva Apr 7, 2026
59de65f
fix GetAgentCard in gRPC
sokoliva Apr 7, 2026
9b4922f
Merge branch '1.0-dev' of https://github.com/a2aproject/a2a-python in…
sokoliva Apr 7, 2026
2542cd6
fix merge errors
sokoliva Apr 8, 2026
6b245fb
Merge branch '1.0-dev' into GetExtendedAgentCard
sokoliva Apr 8, 2026
6120254
add validate checks
sokoliva Apr 8, 2026
3697191
itk update
sokoliva Apr 8, 2026
5c5720e
Merge branch '1.0-dev' into GetExtendedAgentCard
sokoliva Apr 8, 2026
2683a77
update GetAgentCard method
sokoliva Apr 8, 2026
c1ee177
Merge branch 'GetExtendedAgentCard' of https://github.com/sokoliva/a2…
sokoliva Apr 8, 2026
396a216
remove logger
sokoliva Apr 8, 2026
9f8f587
fix error handling in default request handler
sokoliva Apr 8, 2026
967ada3
fix tests
sokoliva Apr 8, 2026
df8d006
fix error handling in request handler
sokoliva Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions itk/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,16 @@ async def main_async(http_port: int, grpc_port: int) -> None:
handler = DefaultRequestHandler(
agent_executor=V10AgentExecutor(),
task_store=task_store,
agent_card=agent_card,
queue_manager=InMemoryQueueManager(),
)

handler_extended = DefaultRequestHandler(
agent_executor=V10AgentExecutor(),
task_store=task_store,
agent_card=agent_card,
queue_manager=InMemoryQueueManager(),
extended_agent_card=agent_card,
)
Comment thread
sokoliva marked this conversation as resolved.

app = FastAPI()
Expand All @@ -301,9 +310,7 @@ async def main_async(http_port: int, grpc_port: int) -> None:
agent_card=agent_card, card_url='/.well-known/agent-card.json'
)
jsonrpc_routes = create_jsonrpc_routes(
agent_card=agent_card,
request_handler=handler,
extended_agent_card=agent_card,
request_handler=handler_extended,
rpc_url='/',
enable_v0_3_compat=True,
)
Expand All @@ -313,7 +320,6 @@ async def main_async(http_port: int, grpc_port: int) -> None:
)

rest_routes = create_rest_routes(
agent_card=agent_card,
request_handler=handler,
enable_v0_3_compat=True,
)
Expand All @@ -323,7 +329,7 @@ async def main_async(http_port: int, grpc_port: int) -> None:

compat_servicer = CompatGrpcHandler(agent_card, handler)
a2a_v0_3_pb2_grpc.add_A2AServiceServicer_to_server(compat_servicer, server)
servicer = GrpcHandler(agent_card, handler)
servicer = GrpcHandler(handler)
a2a_pb2_grpc.add_A2AServiceServicer_to_server(servicer, server)

server.add_insecure_port(f'127.0.0.1:{grpc_port}')
Expand Down
8 changes: 4 additions & 4 deletions samples/hello_world_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,17 @@ async def serve(

task_store = InMemoryTaskStore()
request_handler = DefaultRequestHandler(
agent_executor=SampleAgentExecutor(), task_store=task_store
agent_executor=SampleAgentExecutor(),
task_store=task_store,
agent_card=agent_card,
)

rest_routes = create_rest_routes(
agent_card=agent_card,
request_handler=request_handler,
path_prefix='/a2a/rest',
enable_v0_3_compat=True,
)
jsonrpc_routes = create_jsonrpc_routes(
agent_card=agent_card,
request_handler=request_handler,
rpc_url='/a2a/jsonrpc',
enable_v0_3_compat=True,
Expand All @@ -216,7 +216,7 @@ async def serve(

grpc_server = grpc.aio.server()
grpc_server.add_insecure_port(f'{host}:{grpc_port}')
servicer = GrpcHandler(agent_card, request_handler)
servicer = GrpcHandler(request_handler)
a2a_pb2_grpc.add_A2AServiceServicer_to_server(servicer, grpc_server)

compat_grpc_server = grpc.aio.server()
Expand Down
115 changes: 97 additions & 18 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging

from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import cast

from a2a.server.agent_execution import (
Expand Down Expand Up @@ -32,8 +32,10 @@
TaskStore,
)
from a2a.types.a2a_pb2 import (
AgentCard,
CancelTaskRequest,
DeleteTaskPushNotificationConfigRequest,
GetExtendedAgentCardRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigsRequest,
Expand All @@ -48,13 +50,16 @@
TaskState,
)
from a2a.utils.errors import (
ExtendedAgentCardNotConfiguredError,
InternalError,
InvalidParamsError,
PushNotificationConfigNotFoundError,
PushNotificationNotSupportedError,
TaskNotCancelableError,
TaskNotFoundError,
UnsupportedOperationError,
)
from a2a.utils.helpers import maybe_await, validate
from a2a.utils.task import (
apply_history_length,
validate_history_length,
Expand Down Expand Up @@ -89,27 +94,43 @@ def __init__( # noqa: PLR0913
self,
agent_executor: AgentExecutor,
task_store: TaskStore,
agent_card: AgentCard,
queue_manager: QueueManager | None = None,
push_config_store: PushNotificationConfigStore | None = None,
push_sender: PushNotificationSender | None = None,
request_context_builder: RequestContextBuilder | None = None,
extended_agent_card: AgentCard | None = None,
card_modifier: Callable[[AgentCard], Awaitable[AgentCard] | AgentCard]
| None = None,
extended_card_modifier: Callable[
[AgentCard, ServerCallContext], Awaitable[AgentCard] | AgentCard
]
| None = None,
) -> None:
"""Initializes the DefaultRequestHandler.

Args:
agent_executor: The `AgentExecutor` instance to run agent logic.
task_store: The `TaskStore` instance to manage task persistence.
agent_card: The AgentCard describing the agent's capabilities.
queue_manager: The `QueueManager` instance to manage event queues. Defaults to `InMemoryQueueManager`.
push_config_store: The `PushNotificationConfigStore` instance for managing push notification configurations. Defaults to None.
push_sender: The `PushNotificationSender` instance for sending push notifications. Defaults to None.
request_context_builder: The `RequestContextBuilder` instance used
to build request contexts. Defaults to `SimpleRequestContextBuilder`.
extended_agent_card: An optional, distinct AgentCard to be served at the authenticated extended card endpoint.
card_modifier: An optional callback to dynamically modify the public agent card before it is served.
extended_card_modifier: An optional callback to dynamically modify the extended agent card before it is served. It receives the call context.
"""
self.agent_executor = agent_executor
self.task_store = task_store
self._agent_card = agent_card
self._queue_manager = queue_manager or InMemoryQueueManager()
self._push_config_store = push_config_store
self._push_sender = push_sender
self.extended_agent_card = extended_agent_card
self.card_modifier = card_modifier
Comment thread
sokoliva marked this conversation as resolved.
Outdated
self.extended_card_modifier = extended_card_modifier
self._request_context_builder = (
request_context_builder
or SimpleRequestContextBuilder(
Expand All @@ -123,6 +144,11 @@ def __init__( # noqa: PLR0913
# asyncio tasks and to surface unexpected exceptions.
self._background_tasks = set()

@property
def agent_card(self) -> AgentCard:
"""The agent card to be served by default."""
return self._agent_card

@validate_request_params
async def on_get_task(
self,
Expand Down Expand Up @@ -397,6 +423,10 @@ async def push_notification_callback(event: Event) -> None:
return result

@validate_request_params
@validate(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
async def on_message_send_stream(
self,
params: SendMessageRequest,
Expand Down Expand Up @@ -486,6 +516,14 @@ async def _cleanup_producer(
self._running_agents.pop(task_id, None)

@validate_request_params
@validate(
lambda self: (
self.agent_card.capabilities.push_notifications
and self._push_config_store
),
error_message='Push notifications are not supported by the agent',
error_type=PushNotificationNotSupportedError,
)
async def on_create_task_push_notification_config(
self,
params: TaskPushNotificationConfig,
Expand All @@ -495,15 +533,12 @@ async def on_create_task_push_notification_config(

Requires a `PushNotifier` to be configured.
"""
if not self._push_config_store:
raise PushNotificationNotSupportedError

task_id = params.task_id
task: Task | None = await self.task_store.get(task_id, context)
if not task:
raise TaskNotFoundError

await self._push_config_store.set_info(
await self._push_config_store.set_info( # type: ignore[union-attr]
task_id,
params,
context,
Expand All @@ -512,6 +547,14 @@ async def on_create_task_push_notification_config(
return params

@validate_request_params
@validate(
lambda self: (
self.agent_card.capabilities.push_notifications
and self._push_config_store
),
error_message='Push notifications are not supported by the agent',
error_type=PushNotificationNotSupportedError,
)
async def on_get_task_push_notification_config(
self,
params: GetTaskPushNotificationConfigRequest,
Expand All @@ -521,26 +564,27 @@ async def on_get_task_push_notification_config(

Requires a `PushConfigStore` to be configured.
"""
if not self._push_config_store:
raise PushNotificationNotSupportedError

Comment thread
sokoliva marked this conversation as resolved.
task_id = params.task_id
config_id = params.id
task: Task | None = await self.task_store.get(task_id, context)
if not task:
raise TaskNotFoundError

push_notification_configs: list[TaskPushNotificationConfig] = (
await self._push_config_store.get_info(task_id, context) or []
await self._push_config_store.get_info(task_id, context) or [] # type: ignore[union-attr]
)

for config in push_notification_configs:
if config.id == config_id:
return config

raise InternalError(message='Push notification config not found')
raise PushNotificationConfigNotFoundError
Comment thread
sokoliva marked this conversation as resolved.
Outdated

@validate_request_params
@validate(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
async def on_subscribe_to_task(
self,
params: SubscribeToTaskRequest,
Expand Down Expand Up @@ -584,6 +628,14 @@ async def on_subscribe_to_task(
yield event

@validate_request_params
@validate(
lambda self: (
self.agent_card.capabilities.push_notifications
and self._push_config_store
),
error_message='Push notifications are not supported by the agent',
error_type=PushNotificationNotSupportedError,
)
async def on_list_task_push_notification_configs(
self,
params: ListTaskPushNotificationConfigsRequest,
Expand All @@ -593,15 +645,12 @@ async def on_list_task_push_notification_configs(

Requires a `PushConfigStore` to be configured.
"""
if not self._push_config_store:
raise PushNotificationNotSupportedError

task_id = params.task_id
task: Task | None = await self.task_store.get(task_id, context)
if not task:
raise TaskNotFoundError

push_notification_config_list = await self._push_config_store.get_info(
push_notification_config_list = await self._push_config_store.get_info( # type: ignore[union-attr]
task_id, context
)

Expand All @@ -610,6 +659,14 @@ async def on_list_task_push_notification_configs(
)

@validate_request_params
@validate(
lambda self: (
self.agent_card.capabilities.push_notifications
and self._push_config_store
),
error_message='Push notifications are not supported by the agent',
error_type=PushNotificationNotSupportedError,
)
async def on_delete_task_push_notification_config(
self,
params: DeleteTaskPushNotificationConfigRequest,
Expand All @@ -619,13 +676,35 @@ async def on_delete_task_push_notification_config(

Requires a `PushConfigStore` to be configured.
"""
if not self._push_config_store:
raise PushNotificationNotSupportedError

task_id = params.task_id
config_id = params.id
task: Task | None = await self.task_store.get(task_id, context)
if not task:
raise TaskNotFoundError

await self._push_config_store.delete_info(task_id, context, config_id)
await self._push_config_store.delete_info(task_id, context, config_id) # type: ignore[union-attr]

@validate_request_params
@validate(
lambda self: self.agent_card.capabilities.extended_agent_card,
error_message='The agent does not have an extended agent card configured',
error_type=ExtendedAgentCardNotConfiguredError,
)
Comment thread
sokoliva marked this conversation as resolved.
async def on_get_extended_agent_card(
self,
params: GetExtendedAgentCardRequest,
context: ServerCallContext,
) -> AgentCard:
"""Default handler for 'GetExtendedAgentCard'.

Requires `capabilities.extended_agent_card` to be true.
"""
card = self.extended_agent_card or self.agent_card

if self.extended_card_modifier:
return await maybe_await(self.extended_card_modifier(card, context))

if self.card_modifier:
return await maybe_await(self.card_modifier(card))

return card
Loading
Loading