Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ This is the official Ruby SDK for the Model Context Protocol (MCP), implementing

**MCP::Server** (`lib/mcp/server.rb`):

- Main server class handling JSON-RPC requests
- Main server class handling JSON-RPC requests and holding shared configuration (tools, prompts, resources, handlers, capabilities)
- Implements MCP protocol methods: initialize, ping, tools/list, tools/call, prompts/list, prompts/get, resources/list, resources/read
- Supports custom method registration via `define_custom_method`
- Handles instrumentation, exception reporting, and notifications
- Uses JsonRpcHandler for request processing

**MCP::ServerSession** (`lib/mcp/server_session.rb`):

- Per-connection state: client info, logging level
- Created by the transport layer for each client connection
- Delegates request handling to the shared `Server`

**MCP::Client** (`lib/mcp/client.rb`):

- Client interface for communicating with MCP servers
Expand Down Expand Up @@ -95,6 +101,14 @@ This is the official Ruby SDK for the Model Context Protocol (MCP), implementing
- Protocol version 2025-03-26+ supports tool annotations (destructive_hint, idempotent_hint, etc.)
- Validation is configurable via `configuration.validate_tool_call_arguments`

**Session Architecture**:

- `Server` holds shared configuration (tools, prompts, resources, handlers)
- `ServerSession` holds per-connection state (client info, logging level)
- Both `StdioTransport` and `StreamableHTTPTransport` create a `ServerSession` per connection, making the session model transparent across transports
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

- Session-scoped notifications (`notify_progress`, `notify_log_message`) are sent only to the originating client via `ServerSession`
- Server-wide notifications (`notify_tools_list_changed`, etc.) broadcast to all sessions via `Server`

**Context Passing**:

- `server_context` hash passed through tool/prompt calls for request-specific data
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ The server provides the following notification methods:
- `notify_progress` - Send a progress notification for long-running operations
- `notify_log_message` - Send a structured logging notification message

#### Session Scoping

When using Streamable HTTP transport with multiple clients, each client connection gets its own session. Notifications are scoped as follows:

- **`report_progress`** and **`notify_log_message`** called via `server_context` inside a tool handler are automatically sent only to the requesting client.
No extra configuration is needed.
- **`notify_tools_list_changed`**, **`notify_prompts_list_changed`**, and **`notify_resources_list_changed`** are always broadcast to all connected clients,
as they represent server-wide state changes. These should be called on the `server` instance directly.

#### Notification Format

Notifications follow the JSON-RPC 2.0 specification and use these method names:
Expand Down
1 change: 1 addition & 0 deletions lib/mcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module MCP
autoload :Resource, "mcp/resource"
autoload :ResourceTemplate, "mcp/resource_template"
autoload :Server, "mcp/server"
autoload :ServerSession, "mcp/server_session"
autoload :Tool, "mcp/tool"

class << self
Expand Down
6 changes: 3 additions & 3 deletions lib/mcp/progress.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

module MCP
class Progress
def initialize(server:, progress_token:)
@server = server
def initialize(notification_target:, progress_token:)
@notification_target = notification_target
@progress_token = progress_token
end

def report(progress, total: nil, message: nil)
return unless @progress_token

@server.notify_progress(
@notification_target.notify_progress(
progress_token: @progress_token,
progress: progress,
total: total,
Expand Down
60 changes: 45 additions & 15 deletions lib/mcp/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,29 @@ def initialize(
@transport = transport
end

def handle(request)
# Processes a parsed JSON-RPC request and returns the response as a Hash.
#
# @param request [Hash] A parsed JSON-RPC request.
# @param session [ServerSession, nil] Per-connection session. Passed by
# `ServerSession#handle` for session-scoped notification delivery.
# When `nil`, notifications broadcast to all sessions.
# @return [Hash, nil] The JSON-RPC response, or `nil` for notifications.
def handle(request, session: nil)
JsonRpcHandler.handle(request) do |method|
handle_request(request, method)
handle_request(request, method, session: session)
end
end

def handle_json(request)
# Processes a JSON-RPC request string and returns the response as a JSON string.
#
# @param request [String] A JSON-RPC request as a JSON string.
# @param session [ServerSession, nil] Per-connection session. Passed by
# `ServerSession#handle_json` for session-scoped notification delivery.
# When `nil`, notifications broadcast to all sessions.
# @return [String, nil] The JSON-RPC response as JSON, or `nil` for notifications.
def handle_json(request, session: nil)
JsonRpcHandler.handle_json(request) do |method|
handle_request(request, method)
handle_request(request, method, session: session)
end
end

Expand Down Expand Up @@ -279,11 +293,12 @@ def schema_contains_ref?(schema)
end
end

def handle_request(request, method)
def handle_request(request, method, session: nil)
handler = @handlers[method]
unless handler
instrument_call("unsupported_method") do
add_instrumentation_data(client: @client) if @client
client = session&.client || @client
add_instrumentation_data(client: client) if client
end
return
end
Expand All @@ -293,6 +308,8 @@ def handle_request(request, method)
->(params) {
instrument_call(method) do
result = case method
when Methods::INITIALIZE
init(params, session: session)
when Methods::TOOLS_LIST
{ tools: @handlers[Methods::TOOLS_LIST].call(params) }
when Methods::PROMPTS_LIST
Expand All @@ -303,10 +320,15 @@ def handle_request(request, method)
{ contents: @handlers[Methods::RESOURCES_READ].call(params) }
when Methods::RESOURCES_TEMPLATES_LIST
{ resourceTemplates: @handlers[Methods::RESOURCES_TEMPLATES_LIST].call(params) }
when Methods::TOOLS_CALL
call_tool(params, session: session)
when Methods::LOGGING_SET_LEVEL
configure_logging_level(params, session: session)
else
@handlers[method].call(params)
end
add_instrumentation_data(client: @client) if @client
client = session&.client || @client
add_instrumentation_data(client: client) if client

result
rescue => e
Expand Down Expand Up @@ -342,8 +364,14 @@ def server_info
}.compact
end

def init(params)
@client = params[:clientInfo] if params
def init(params, session: nil)
if params
if session
session.store_client_info(client: params[:clientInfo], capabilities: params[:capabilities])
else
@client = params[:clientInfo]
end
end

protocol_version = params[:protocolVersion] if params
negotiated_version = if Configuration::SUPPORTED_STABLE_PROTOCOL_VERSIONS.include?(protocol_version)
Expand Down Expand Up @@ -371,7 +399,7 @@ def init(params)
}.compact
end

def configure_logging_level(request)
def configure_logging_level(request, session: nil)
if capabilities[:logging].nil?
raise RequestHandlerError.new("Server does not support logging", request, error_type: :internal_error)
end
Expand All @@ -381,6 +409,7 @@ def configure_logging_level(request)
raise RequestHandlerError.new("Invalid log level #{request[:level]}", request, error_type: :invalid_params)
end

session&.configure_logging(logging_message_notification)
@logging_message_notification = logging_message_notification

{}
Expand All @@ -390,7 +419,7 @@ def list_tools(request)
@tools.values.map(&:to_h)
end

def call_tool(request)
def call_tool(request, session: nil)
tool_name = request[:name]

tool = tools[tool_name]
Expand Down Expand Up @@ -422,7 +451,7 @@ def call_tool(request)

progress_token = request.dig(:_meta, :progressToken)

call_tool_with_args(tool, arguments, server_context_with_meta(request), progress_token: progress_token)
call_tool_with_args(tool, arguments, server_context_with_meta(request), progress_token: progress_token, session: session)
rescue RequestHandlerError
raise
rescue => e
Expand Down Expand Up @@ -491,12 +520,13 @@ def accepts_server_context?(method_object)
parameters.any? { |type, name| type == :keyrest || name == :server_context }
end

def call_tool_with_args(tool, arguments, context, progress_token: nil)
def call_tool_with_args(tool, arguments, context, progress_token: nil, session: nil)
args = arguments&.transform_keys(&:to_sym) || {}

if accepts_server_context?(tool.method(:call))
progress = Progress.new(server: self, progress_token: progress_token)
server_context = ServerContext.new(context, progress: progress)
notification_target = session || self
progress = Progress.new(notification_target: notification_target, progress_token: progress_token)
server_context = ServerContext.new(context, progress: progress, notification_target: notification_target)
tool.call(**args, server_context: server_context).to_h
else
tool.call(**args).to_h
Expand Down
8 changes: 5 additions & 3 deletions lib/mcp/server/transports/stdio_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ class StdioTransport < Transport
STATUS_INTERRUPTED = Signal.list["INT"] + 128

def initialize(server)
@server = server
super(server)
@open = false
@session = nil
$stdin.set_encoding("UTF-8")
$stdout.set_encoding("UTF-8")
super
end

def open
@open = true
@session = ServerSession.new(server: @server, transport: self)
while @open && (line = $stdin.gets)
handle_json_request(line.strip)
response = @session.handle_json(line.strip)
send_response(response) if response
end
rescue Interrupt
warn("\nExiting...")
Expand Down
33 changes: 24 additions & 9 deletions lib/mcp/server/transports/streamable_http_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Transports
class StreamableHTTPTransport < Transport
def initialize(server, stateless: false)
super(server)
# { session_id => { stream: stream_object }
# Maps `session_id` to `{ stream: stream_object, server_session: ServerSession }`.
@sessions = {}
@mutex = Mutex.new

Expand Down Expand Up @@ -228,18 +228,25 @@ def response?(body)

def handle_initialization(body_string, body)
session_id = nil
server_session = nil

unless @stateless
session_id = SecureRandom.uuid
server_session = ServerSession.new(server: @server, transport: self, session_id: session_id)

@mutex.synchronize do
@sessions[session_id] = {
stream: nil,
server_session: server_session,
}
end
end

response = @server.handle_json(body_string)
response = if server_session
server_session.handle_json(body_string)
else
@server.handle_json(body_string)
end

headers = {
"Content-Type" => "application/json",
Expand All @@ -255,16 +262,24 @@ def handle_accepted
end

def handle_regular_request(body_string, session_id)
unless @stateless
if session_id && !session_exists?(session_id)
return session_not_found_response
server_session = nil
stream = nil

if session_id && !@stateless
@mutex.synchronize do
session = @sessions[session_id]
return session_not_found_response unless session

server_session = session[:server_session]
stream = session[:stream]
end
end

response = @server.handle_json(body_string)

# Stream can be nil since stateless mode doesn't retain streams
stream = get_session_stream(session_id) if session_id
response = if server_session
server_session.handle_json(body_string)
else
@server.handle_json(body_string)
end

if stream
send_response_to_stream(stream, response, session_id)
Expand Down
28 changes: 27 additions & 1 deletion lib/mcp/server_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,41 @@

module MCP
class ServerContext
def initialize(context, progress:)
def initialize(context, progress:, notification_target:)
@context = context
@progress = progress
@notification_target = notification_target
end

# Reports progress for the current tool operation.
# The notification is automatically scoped to the originating session.
#
# @param progress [Numeric] Current progress value.
# @param total [Numeric, nil] Total expected value.
# @param message [String, nil] Human-readable status message.
def report_progress(progress, total: nil, message: nil)
@progress.report(progress, total: total, message: message)
end

# Sends a progress notification scoped to the originating session.
#
# @param progress_token [String, Integer] The token identifying the operation.
# @param progress [Numeric] Current progress value.
# @param total [Numeric, nil] Total expected value.
# @param message [String, nil] Human-readable status message.
def notify_progress(progress_token:, progress:, total: nil, message: nil)
@notification_target.notify_progress(progress_token: progress_token, progress: progress, total: total, message: message)
end

# Sends a log message notification scoped to the originating session.
#
# @param data [Object] The log data to send.
# @param level [String] Log level (e.g., `"debug"`, `"info"`, `"error"`).
# @param logger [String, nil] Logger name.
def notify_log_message(data:, level:, logger: nil)
@notification_target.notify_log_message(data: data, level: level, logger: logger)
end

def method_missing(name, ...)
if @context.respond_to?(name)
@context.public_send(name, ...)
Expand Down
Loading