Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions src/a2a/server/routes/jsonrpc_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
DefaultServerCallContextBuilder,
ServerCallContextBuilder,
)
from a2a.types import A2ARequest

Check failure on line 34 in src/a2a/server/routes/jsonrpc_dispatcher.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

ruff (F401)

src/a2a/server/routes/jsonrpc_dispatcher.py:34:23: F401 `a2a.types.A2ARequest` imported but unused help: Remove unused import: `a2a.types.A2ARequest`
from a2a.types.a2a_pb2 import (
AgentCard,
CancelTaskRequest,
Expand Down Expand Up @@ -349,7 +349,7 @@
else:
try:
raw_result = await self._process_non_streaming_request(
request_id, specific_request, call_context
specific_request, call_context
)
handler_result = JSONRPC20Response(
result=raw_result, _id=request_id
Expand Down Expand Up @@ -385,7 +385,7 @@
async def _process_streaming_request(
self,
request_id: str | int | None,
request_obj: A2ARequest,
request_obj: Any,
context: ServerCallContext,
) -> AsyncGenerator[dict[str, Any], None]:
"""Processes streaming requests (SendStreamingMessage or SubscribeToTask).
Expand All @@ -399,11 +399,11 @@
An `AsyncGenerator` object to stream results to the client.
"""
stream: AsyncGenerator | None = None
if isinstance(request_obj, SendMessageRequest):
if context.state.get('method') == 'SendStreamingMessage':
Comment thread
guglielmo-san marked this conversation as resolved.
Outdated
stream = self.request_handler.on_message_send_stream(
request_obj, context
)
elif isinstance(request_obj, SubscribeToTaskRequest):
elif context.state.get('method') == 'SubscribeToTask':
stream = self.request_handler.on_subscribe_to_task(
request_obj, context
)
Expand Down Expand Up @@ -538,55 +538,53 @@
@validate_version(constants.PROTOCOL_VERSION_1_0)
async def _process_non_streaming_request( # noqa: PLR0911
self,
request_id: str | int | None,
request_obj: A2ARequest,
request_obj: Any,
context: ServerCallContext,
) -> dict[str, Any] | None:
"""Processes non-streaming requests (message/send, tasks/get, tasks/cancel, tasks/pushNotificationConfig/*).
"""Processes non-streaming requests.

Args:
request_id: The ID of the request.
request_obj: The proto request message.
context: The ServerCallContext for the request.

Returns:
A dict containing the result or error.
"""
match request_obj:
case SendMessageRequest():
match context.state.get('method'):
case 'SendMessage':
return await self._handle_send_message(request_obj, context)
case CancelTaskRequest():
case 'CancelTask':
return await self._handle_cancel_task(request_obj, context)
case GetTaskRequest():
case 'GetTask':
return await self._handle_get_task(request_obj, context)
case ListTasksRequest():
case 'ListTasks':
return await self._handle_list_tasks(request_obj, context)
case TaskPushNotificationConfig():
case 'CreateTaskPushNotificationConfig':
return await self._handle_create_task_push_notification_config(
request_obj, context
)
case GetTaskPushNotificationConfigRequest():
case 'GetTaskPushNotificationConfig':
return await self._handle_get_task_push_notification_config(
request_obj, context
)
case ListTaskPushNotificationConfigsRequest():
case 'ListTaskPushNotificationConfigs':
return await self._handle_list_task_push_notification_configs(
request_obj, context
)
case DeleteTaskPushNotificationConfigRequest():
return await self._handle_delete_task_push_notification_config(
case 'DeleteTaskPushNotificationConfig':
await self._handle_delete_task_push_notification_config(
request_obj, context
)
case GetExtendedAgentCardRequest():
return None
case 'GetExtendedAgentCard':
return await self._handle_get_extended_agent_card(
request_obj, context
)
case _:
logger.error(
'Unhandled validated request type: %s', type(request_obj)
)
method = context.state.get('method')
Comment thread
guglielmo-san marked this conversation as resolved.
Outdated
logger.error('Unhandled method: %s', method)
raise UnsupportedOperationError(
message=f'Request type {type(request_obj).__name__} is unknown.'
message=f'Method {method} is not supported.'
)

def _create_response(
Expand Down
Loading
Loading