From 2bd71bd2ac59f17498fceff799dc2575f00ea16b Mon Sep 17 00:00:00 2001
From: Jicheng Lu <103353@smsassist.com>
Date: Tue, 7 Apr 2026 14:19:43 -0500
Subject: [PATCH 1/2] Add conversation stop-streaming support and thinking text
---
Directory.Packages.props | 6 +-
.../Conversations/Dtos/ChatResponseDto.cs | 4 +
.../IConversationCancellationService.cs | 33 ++
.../Conversations/ConversationPlugin.cs | 3 +
.../ConversationCancellationService.cs | 63 ++++
.../Conversation/ConversationController.cs | 103 +++---
.../Conversations/Request/NewMessageModel.cs | 9 +
.../ConverstionCancellationResponse.cs | 5 +
.../Providers/ChatCompletionProvider.cs | 120 ++++---
.../Providers/Chat/ChatCompletionProvider.cs | 120 ++++---
.../Hooks/ChatHubConversationHook.cs | 1 +
.../Observers/ChatHubObserver.cs | 1 +
.../Providers/Chat/ChatCompletionProvider.cs | 120 ++++---
.../Constants/Constants.cs | 1 +
.../Providers/Chat/ChatCompletionProvider.cs | 301 +++++++++++++-----
.../Providers/ChatCompletionProvider.cs | 38 ++-
.../Providers/Chat/ChatCompletionProvider.cs | 137 ++++----
.../Providers/ChatCompletionProvider.cs | 60 ++--
18 files changed, 762 insertions(+), 363 deletions(-)
create mode 100644 src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs
create mode 100644 src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs
create mode 100644 src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 041659c97..9d7ae8a8e 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -6,8 +6,8 @@
-
-
+
+
@@ -33,7 +33,7 @@
-
+
diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs
index 43077ba6a..277fa3fb9 100644
--- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs
+++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs
@@ -46,6 +46,10 @@ public class ChatResponseDto : InstructResult
[JsonPropertyName("is_streaming")]
public bool IsStreaming { get; set; }
+ [JsonPropertyName("meta_data")]
+ [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
+ public Dictionary<string, object>? MetaData { get; set; }
+
[JsonPropertyName("created_at")]
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs
new file mode 100644
index 000000000..6456a065f
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs
@@ -0,0 +1,33 @@
+using System.Threading;
+
+namespace BotSharp.Abstraction.Conversations;
+
+/// <summary>
+/// Service to manage cancellation tokens for streaming chat completions.
+/// Allows stopping an active streaming response by conversation ID.
+/// </summary>
+public interface IConversationCancellationService
+{
+ /// <summary>
+ /// Register a new cancellation token source for the given conversation.
+ /// Returns the CancellationToken to be used in streaming loops.
+ /// </summary>
+ CancellationToken RegisterConversation(string conversationId);
+
+ /// <summary>
+ /// Cancel an active streaming operation for the given conversation.
+ /// </summary>
+ /// <returns>True if the conversation was found and cancelled, false otherwise.</returns>
+ bool CancelStreaming(string conversationId);
+
+ /// <summary>
+ /// Remove the cancellation token source for the given conversation.
+ /// Should be called when streaming completes (either normally or via cancellation).
+ /// </summary>
+ void UnregisterConversation(string conversationId);
+
+ /// <summary>
+ /// Get the cancellation token for the given conversation if one is registered.
+ /// </summary>
+ CancellationToken GetToken(string conversationId);
+}
diff --git a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs
index 6de7da9d6..c891679c4 100644
--- a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs
+++ b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs
@@ -48,6 +48,9 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)
return settingService.Bind("GoogleApi");
});
+ // Streaming cancellation
+ services.AddSingleton<IConversationCancellationService, ConversationCancellationService>();
+
// Observer and observable
services.AddSingleton>>();
services.AddScoped>>();
diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs
new file mode 100644
index 000000000..962d53006
--- /dev/null
+++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs
@@ -0,0 +1,63 @@
+using System.Collections.Concurrent;
+
+namespace BotSharp.Core.Conversations.Services;
+
+public class ConversationCancellationService : IConversationCancellationService
+{
+ private readonly ConcurrentDictionary<string, CancellationTokenSource> _cancellationTokenSources = new();
+ private readonly ILogger<ConversationCancellationService> _logger;
+
+ public ConversationCancellationService(
+ ILogger<ConversationCancellationService> logger)
+ {
+ _logger = logger;
+ }
+
+ public CancellationToken RegisterConversation(string conversationId)
+ {
+ // Cancel any existing streaming for this conversation
+ if (_cancellationTokenSources.TryRemove(conversationId, out var existingCts))
+ {
+ existingCts.Cancel();
+ existingCts.Dispose();
+ _logger.LogWarning("Cancelled existing streaming session for conversation {ConversationId}", conversationId);
+ }
+
+ var cts = new CancellationTokenSource();
+ _cancellationTokenSources[conversationId] = cts;
+ _logger.LogInformation("Registered streaming cancellation for conversation {ConversationId}", conversationId);
+ return cts.Token;
+ }
+
+ public bool CancelStreaming(string conversationId)
+ {
+ if (_cancellationTokenSources.TryGetValue(conversationId, out var cts))
+ {
+ cts.Cancel();
+ _logger.LogInformation("Streaming cancelled for conversation {ConversationId}", conversationId);
+ return true;
+ }
+
+ _logger.LogWarning("No active streaming found for conversation {ConversationId}", conversationId);
+ return false;
+ }
+
+ public void UnregisterConversation(string conversationId)
+ {
+ if (_cancellationTokenSources.TryRemove(conversationId, out var cts))
+ {
+ cts.Dispose();
+ _logger.LogDebug("Unregistered streaming cancellation for conversation {ConversationId}", conversationId);
+ }
+ }
+
+ public CancellationToken GetToken(string conversationId)
+ {
+ if (_cancellationTokenSources.TryGetValue(conversationId, out var cts))
+ {
+ return cts.Token;
+ }
+
+ return CancellationToken.None;
+ }
+}
diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs
index 5315e9d15..30961ef84 100644
--- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs
+++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs
@@ -131,6 +131,7 @@ public async Task> GetDialogs(
Data = message.Data,
Sender = UserDto.FromUser(user),
Payload = message.Payload,
+ MetaData = message.MetaData,
HasMessageFiles = files.Any(x => x.MessageId.IsEqualTo(message.MessageId) && x.FileSource == FileSource.User)
});
}
@@ -146,6 +147,7 @@ public async Task> GetDialogs(
Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content,
Function = message.FunctionName,
Data = message.Data,
+ MetaData = message.MetaData,
Sender = new()
{
FirstName = agent?.Name ?? "Unkown",
@@ -397,18 +399,36 @@ public async Task SendMessage(
await conv.SetConversationId(conversationId, input.States);
SetStates(conv, input);
+ IConversationCancellationService? convCancellation = null;
+ if (input.IsStreamingMessage)
+ {
+ convCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ convCancellation.RegisterConversation(conversationId);
+ }
+
var response = new ChatResponseModel();
- await conv.SendMessage(agentId, inputMsg,
- replyMessage: input.Postback,
- async msg =>
- {
- response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
- response.Function = msg.FunctionName;
- response.MessageLabel = msg.MessageLabel;
- response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
- response.Instruction = msg.Instruction;
- response.Data = msg.Data;
- });
+ try
+ {
+ await conv.SendMessage(agentId, inputMsg,
+ replyMessage: input.Postback,
+ async msg =>
+ {
+ response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
+ response.Function = msg.FunctionName;
+ response.MessageLabel = msg.MessageLabel;
+ response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
+ response.Instruction = msg.Instruction;
+ response.Data = msg.Data;
+ });
+ }
+ catch (OperationCanceledException) when (input.IsStreamingMessage)
+ {
+ response.Text = string.Empty;
+ }
+ finally
+ {
+ convCancellation?.UnregisterConversation(conversationId);
+ }
var state = _services.GetRequiredService();
response.States = state.GetStates();
@@ -455,20 +475,20 @@ public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string
Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.Connection, "keep-alive");
await conv.SendMessage(agentId, inputMsg,
- replyMessage: input.Postback,
- // responsed generated
- async msg =>
- {
- response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
- response.MessageLabel = msg.MessageLabel;
- response.Function = msg.FunctionName;
- response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
- response.Instruction = msg.Instruction;
- response.Data = msg.Data;
- response.States = state.GetStates();
-
- await OnChunkReceived(Response, response);
- });
+ replyMessage: input.Postback,
+ // response generated
+ async msg =>
+ {
+ response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
+ response.MessageLabel = msg.MessageLabel;
+ response.Function = msg.FunctionName;
+ response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
+ response.Instruction = msg.Instruction;
+ response.Data = msg.Data;
+ response.States = state.GetStates();
+
+ await OnChunkReceived(Response, response);
+ });
response.States = state.GetStates();
response.MessageId = inputMsg.MessageId;
@@ -477,18 +497,13 @@ await conv.SendMessage(agentId, inputMsg,
// await OnEventCompleted(Response);
}
- private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg)
+ [HttpPost("/conversation/{conversationId}/stop-streaming")]
+ public ConverstionCancellationResponse StopStreaming([FromRoute] string conversationId)
{
- var indicator = new ChatResponseModel
- {
- ConversationId = conversationId,
- MessageId = msg.MessageId,
- Text = msg.Indication,
- Function = "indicating",
- Instruction = msg.Instruction,
- States = new Dictionary()
- };
- await OnChunkReceived(Response, indicator);
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancelled = streamingCancellation.CancelStreaming(conversationId);
+
+ return new ConverstionCancellationResponse { Success = cancelled };
}
#endregion
@@ -515,6 +530,8 @@ private void SetStates(IConversationService conv, NewMessageModel input)
{
conv.States.SetState("sampling_factor", input.SamplingFactor, source: StateSource.External);
}
+
+ conv.States.SetState("use_stream_message", input.IsStreamingMessage, source: StateSource.Application);
}
private FileContentResult BuildFileResult(string file)
@@ -567,5 +584,19 @@ private JsonSerializerOptions InitJsonOptions(BotSharpOptions options)
return jsonOption;
}
+
+ private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg)
+ {
+ var indicator = new ChatResponseModel
+ {
+ ConversationId = conversationId,
+ MessageId = msg.MessageId,
+ Text = msg.Indication,
+ Function = "indicating",
+ Instruction = msg.Instruction,
+ States = []
+ };
+ await OnChunkReceived(Response, indicator);
+ }
#endregion
}
\ No newline at end of file
diff --git a/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs
index 939fa601a..6169f1371 100644
--- a/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs
+++ b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs
@@ -1,6 +1,15 @@
+using System.Text.Json.Serialization;
+
namespace BotSharp.OpenAPI.ViewModels.Conversations;
public class NewMessageModel : IncomingMessageModel
{
public override string Channel { get; set; } = ConversationChannel.OpenAPI;
+
+ /// <summary>
+ /// Indicates whether this message uses streaming completion.
+ /// When true, the streaming can be stopped via the stop endpoint.
+ /// </summary>
+ [JsonPropertyName("is_streaming_msg")]
+ public bool IsStreamingMessage { get; set; }
}
diff --git a/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs
new file mode 100644
index 000000000..5e3f659ad
--- /dev/null
+++ b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs
@@ -0,0 +1,5 @@
+namespace BotSharp.OpenAPI.ViewModels.Conversations;
+
+public class ConverstionCancellationResponse : ResponseBase
+{
+}
diff --git a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs
index 774ac70e8..7507f263c 100644
--- a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs
@@ -216,73 +216,95 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
MessageId = messageId
};
- await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters))
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancellationToken = streamingCancellation.GetToken(conv.ConversationId);
+
+ try
{
- var startMsg = choice.StreamStartMessage;
- var contentBlock = choice.ContentBlock;
- var delta = choice.Delta;
+ await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters).WithCancellation(cancellationToken))
+ {
+ var startMsg = choice.StreamStartMessage;
+ var contentBlock = choice.ContentBlock;
+ var delta = choice.Delta;
- tokenUsage = delta?.Usage ?? startMsg?.Usage ?? choice.Usage;
+ tokenUsage = delta?.Usage ?? startMsg?.Usage ?? choice.Usage;
- if (delta != null)
- {
- if (delta.StopReason == StopReason.ToolUse)
+ if (delta != null)
{
- var toolCall = choice.ToolCalls.FirstOrDefault();
- responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ if (delta.StopReason == StopReason.ToolUse)
{
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- ToolCallId = toolCall?.Id,
- FunctionName = toolCall?.Name,
- FunctionArgs = toolCall?.Arguments?.ToString()?.IfNullOrEmptyAs("{}") ?? "{}"
- };
+ var toolCall = choice.ToolCalls.FirstOrDefault();
+ responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ ToolCallId = toolCall?.Id,
+ FunctionName = toolCall?.Name,
+ FunctionArgs = toolCall?.Arguments?.ToString()?.IfNullOrEmptyAs("{}") ?? "{}"
+ };
#if DEBUG
- _logger.LogDebug($"Tool Call (id: {toolCall?.Id}) => {toolCall?.Name}({toolCall?.Arguments})");
+ _logger.LogDebug($"Tool Call (id: {toolCall?.Id}) => {toolCall?.Name}({toolCall?.Arguments})");
#endif
- }
- else if (delta.StopReason == StopReason.EndTurn)
- {
- var allText = textStream.GetText();
- responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ }
+ else if (delta.StopReason == StopReason.EndTurn)
{
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- IsStreaming = true
- };
+ var allText = textStream.GetText();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
#if DEBUG
- _logger.LogDebug($"Stream text Content: {allText}");
+ _logger.LogDebug($"Stream text Content: {allText}");
#endif
- }
- else if (!string.IsNullOrEmpty(delta.StopReason))
- {
- responseMessage = new RoleDialogModel(AgentRole.Assistant, delta.StopReason)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- IsStreaming = true
- };
- }
- else
- {
- var deltaText = delta.Text ?? string.Empty;
- textStream.Collect(deltaText);
-
- hub.Push(new()
+ }
+ else if (!string.IsNullOrEmpty(delta.StopReason))
{
- EventName = ChatEvent.OnReceiveLlmStreamMessage,
- RefId = conv.ConversationId,
- Data = new RoleDialogModel(AgentRole.Assistant, deltaText)
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, delta.StopReason)
{
CurrentAgentId = agent.Id,
- MessageId = messageId
- }
- });
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
+ else
+ {
+ var deltaText = delta.Text ?? string.Empty;
+ textStream.Collect(deltaText);
+
+ hub.Push(new()
+ {
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = new RoleDialogModel(AgentRole.Assistant, deltaText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId
+ }
+ });
+ }
}
}
}
+ catch (OperationCanceledException)
+ {
+ _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId);
+ }
+
+ // Build responseMessage from collected text when cancelled before FinishReason
+ if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content))
+ {
+ var allText = textStream.GetText();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
hub.Push(new()
{
diff --git a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs
index 5c94a00c9..057be0812 100644
--- a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs
@@ -287,71 +287,93 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
MessageId = messageId
};
- await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options))
- {
- tokenUsage = choice.Usage;
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancellationToken = streamingCancellation.GetToken(conv.ConversationId);
- if (!choice.ToolCallUpdates.IsNullOrEmpty())
+ try
+ {
+ await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken))
{
- toolCalls.AddRange(choice.ToolCallUpdates);
- }
+ tokenUsage = choice.Usage;
- if (!choice.ContentUpdate.IsNullOrEmpty())
- {
- var text = choice.ContentUpdate[0]?.Text ?? string.Empty;
- textStream.Collect(text);
+ if (!choice.ToolCallUpdates.IsNullOrEmpty())
+ {
+ toolCalls.AddRange(choice.ToolCallUpdates);
+ }
+
+ if (!choice.ContentUpdate.IsNullOrEmpty())
+ {
+ var text = choice.ContentUpdate[0]?.Text ?? string.Empty;
+ textStream.Collect(text);
#if DEBUG
- _logger.LogCritical($"Content update: {text}");
+ _logger.LogCritical($"Content update: {text}");
#endif
- var content = new RoleDialogModel(AgentRole.Assistant, text)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId
- };
- hub.Push(new()
- {
- EventName = ChatEvent.OnReceiveLlmStreamMessage,
- RefId = conv.ConversationId,
- Data = content
- });
- }
+ var content = new RoleDialogModel(AgentRole.Assistant, text)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId
+ };
+ hub.Push(new()
+ {
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = content
+ });
+ }
- if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall)
- {
- var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName));
- var functionName = meta?.FunctionName;
- var toolCallId = meta?.ToolCallId;
- var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList();
- var functionArgument = string.Join(string.Empty, args);
+ if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall)
+ {
+ var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName));
+ var functionName = meta?.FunctionName;
+ var toolCallId = meta?.ToolCallId;
+ var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList();
+ var functionArgument = string.Join(string.Empty, args);
#if DEBUG
- _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})");
+ _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})");
#endif
- responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ ToolCallId = toolCallId,
+ FunctionName = functionName,
+ FunctionArgs = functionArgument
+ };
+ }
+ else if (choice.FinishReason.HasValue)
{
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- ToolCallId = toolCallId,
- FunctionName = functionName,
- FunctionArgs = functionArgument
- };
- }
- else if (choice.FinishReason.HasValue)
- {
- var allText = textStream.GetText();
- _logger.LogCritical($"Text Content: {allText}");
+ var allText = textStream.GetText();
+ _logger.LogCritical($"Text Content: {allText}");
- responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- IsStreaming = true
- };
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
}
}
+ catch (OperationCanceledException)
+ {
+ _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId);
+ }
+
+ // Build responseMessage from collected text when cancelled before FinishReason
+ if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content))
+ {
+ var allText = textStream.GetText();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
hub.Push(new()
{
diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs
index f2c1b3b67..bd57f5f9f 100644
--- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs
+++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs
@@ -112,6 +112,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message)
Function = message.FunctionName,
RichContent = message.SecondaryRichContent ?? message.RichContent,
Data = message.Data,
+ MetaData = message.MetaData,
States = state.GetStates(),
IsStreaming = message.IsStreaming,
Sender = sender
diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs
index 25862ff0d..6bd9209a3 100644
--- a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs
+++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs
@@ -80,6 +80,7 @@ public override void OnNext(HubObserveData value)
Function = message.FunctionName,
RichContent = message.SecondaryRichContent ?? message.RichContent,
Data = message.Data,
+ MetaData = message.MetaData,
Sender = new()
{
FirstName = "AI",
diff --git a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs
index 058aeec8a..7b69be7bf 100644
--- a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs
@@ -254,71 +254,93 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
MessageId = messageId
};
- await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options))
- {
- tokenUsage = choice.Usage;
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancellationToken = streamingCancellation.GetToken(conv.ConversationId);
- if (!choice.ToolCallUpdates.IsNullOrEmpty())
+ try
+ {
+ await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken))
{
- toolCalls.AddRange(choice.ToolCallUpdates);
- }
+ tokenUsage = choice.Usage;
- if (!choice.ContentUpdate.IsNullOrEmpty())
- {
- var text = choice.ContentUpdate[0]?.Text ?? string.Empty;
- textStream.Collect(text);
+ if (!choice.ToolCallUpdates.IsNullOrEmpty())
+ {
+ toolCalls.AddRange(choice.ToolCallUpdates);
+ }
+
+ if (!choice.ContentUpdate.IsNullOrEmpty())
+ {
+ var text = choice.ContentUpdate[0]?.Text ?? string.Empty;
+ textStream.Collect(text);
#if DEBUG
- _logger.LogCritical($"Content update: {text}");
+ _logger.LogCritical($"Content update: {text}");
#endif
- var content = new RoleDialogModel(AgentRole.Assistant, text)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId
- };
- hub.Push(new()
- {
- EventName = ChatEvent.OnReceiveLlmStreamMessage,
- RefId = conv.ConversationId,
- Data = content
- });
- }
+ var content = new RoleDialogModel(AgentRole.Assistant, text)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId
+ };
+ hub.Push(new()
+ {
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = content
+ });
+ }
- if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall)
- {
- var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName));
- var functionName = meta?.FunctionName;
- var toolCallId = meta?.ToolCallId;
- var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList();
- var functionArgument = string.Join(string.Empty, args);
+ if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall)
+ {
+ var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName));
+ var functionName = meta?.FunctionName;
+ var toolCallId = meta?.ToolCallId;
+ var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList();
+ var functionArgument = string.Join(string.Empty, args);
#if DEBUG
- _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})");
+ _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})");
#endif
- responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ ToolCallId = toolCallId,
+ FunctionName = functionName,
+ FunctionArgs = functionArgument
+ };
+ }
+ else if (choice.FinishReason.HasValue)
{
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- ToolCallId = toolCallId,
- FunctionName = functionName,
- FunctionArgs = functionArgument
- };
- }
- else if (choice.FinishReason.HasValue)
- {
- var allText = textStream.GetText();
- _logger.LogCritical($"Text Content: {allText}");
+ var allText = textStream.GetText();
+ _logger.LogCritical($"Text Content: {allText}");
- responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- IsStreaming = true
- };
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
}
}
+ catch (OperationCanceledException)
+ {
+ _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId);
+ }
+
+ // Build responseMessage from collected text when cancelled before FinishReason
+ if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content))
+ {
+ var allText = textStream.GetText();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
hub.Push(new()
{
diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs
index e0aa12b6c..c1300c0a5 100644
--- a/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs
+++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs
@@ -3,4 +3,5 @@ namespace BotSharp.Plugin.GoogleAI.Constants;
internal static class Constants
{
internal const string ThoughtSignature = "thought_signature";
+ internal const string ThinkingText = "thinking_text";
}
diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs
index 460a79a17..c7df1f6bb 100644
--- a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs
@@ -3,6 +3,7 @@
using BotSharp.Abstraction.Files.Utilities;
using BotSharp.Abstraction.Hooks;
using BotSharp.Abstraction.MessageHub.Models;
+using BotSharp.Abstraction.MLTasks.Settings;
using BotSharp.Core.Infrastructures.Streams;
using BotSharp.Core.MessageHub;
using GenerativeAI;
@@ -16,6 +17,7 @@ public class ChatCompletionProvider : IChatCompletion
{
private readonly IServiceProvider _services;
private readonly ILogger _logger;
+ private readonly IConversationStateService _state;
private List renderedInstructions = [];
private string _model;
@@ -28,11 +30,13 @@ public class ChatCompletionProvider : IChatCompletion
public ChatCompletionProvider(
IServiceProvider services,
GoogleAiSettings googleSettings,
- ILogger logger)
+ ILogger logger,
+ IConversationStateService state)
{
_settings = googleSettings;
_services = services;
_logger = logger;
+ _state = state;
}
public async Task GetChatCompletions(Agent agent, List conversations)
@@ -51,13 +55,19 @@ public async Task GetChatCompletions(Agent agent, List x.Thought != true && x.FunctionCall == null);
+ var functionPart = parts?.FirstOrDefault(x => x.FunctionCall != null);
+ var thoughtPart = parts?.FirstOrDefault(x => x.Thought == true);
+
+ var part = textPart ?? functionPart ?? thoughtPart ?? parts?.FirstOrDefault();
+ var text = textPart?.Text ?? part?.Text ?? string.Empty;
+ var thoughtSignature = thoughtPart?.ThoughtSignature ?? part?.ThoughtSignature;
RoleDialogModel responseMessage;
- if (response.GetFunction() != null)
+ if (functionPart?.FunctionCall != null)
{
- var toolCall = response.GetFunction();
+ var toolCall = functionPart.FunctionCall;
responseMessage = new RoleDialogModel(AgentRole.Function, text)
{
CurrentAgentId = agent.Id,
@@ -67,7 +77,7 @@ public async Task GetChatCompletions(Agent agent, List
{
- [Constants.ThoughtSignature] = part?.ThoughtSignature
+ [Constants.ThoughtSignature] = thoughtSignature
},
RenderedInstruction = string.Join("\r\n", renderedInstructions)
};
@@ -91,12 +101,18 @@ public async Task GetChatCompletions(Agent agent, List
{
- [Constants.ThoughtSignature] = part?.ThoughtSignature
+ [Constants.ThoughtSignature] = thoughtSignature
},
RenderedInstruction = string.Join("\r\n", renderedInstructions)
};
}
+ if (responseMessage != null && thoughtPart != null)
+ {
+ responseMessage.MetaData ??= [];
+ responseMessage.MetaData[Constants.ThinkingText] = thoughtPart.Text;
+ }
+
// After chat completion hook
foreach (var hook in contentHooks)
{
@@ -130,19 +146,30 @@ public async Task GetChatCompletionsAsync(Agent agent, List x.Thought != true && x.FunctionCall == null);
+ var functionPart = parts?.FirstOrDefault(x => x.FunctionCall != null);
+ var thoughtPart = parts?.FirstOrDefault(x => x.Thought == true);
+
+ var part = textPart ?? functionPart ?? thoughtPart ?? parts?.FirstOrDefault();
+ var text = textPart?.Text ?? part?.Text ?? string.Empty;
+ var thoughtSignature = thoughtPart?.ThoughtSignature ?? part?.ThoughtSignature;
var msg = new RoleDialogModel(AgentRole.Assistant, text)
{
CurrentAgentId = agent.Id,
MetaData = new Dictionary
{
- [Constants.ThoughtSignature] = part?.ThoughtSignature
+ [Constants.ThoughtSignature] = thoughtSignature
},
RenderedInstruction = string.Join("\r\n", renderedInstructions)
};
+ if (thoughtPart != null)
+ {
+ msg.MetaData[Constants.ThinkingText] = thoughtPart.Text;
+ }
+
// After chat completion hook
foreach (var hook in hooks)
{
@@ -156,9 +183,9 @@ public async Task GetChatCompletionsAsync(Agent agent, List GetChatCompletionsAsync(Agent agent, List
{
- [Constants.ThoughtSignature] = part?.ThoughtSignature
+ [Constants.ThoughtSignature] = thoughtSignature
},
RenderedInstruction = string.Join("\r\n", renderedInstructions)
};
@@ -192,7 +219,7 @@ public async Task GetChatCompletionsAsync(Agent agent, List
{
- [Constants.ThoughtSignature] = part?.ThoughtSignature
+ [Constants.ThoughtSignature] = thoughtSignature
},
RenderedInstruction = string.Join("\r\n", renderedInstructions)
};
@@ -236,6 +263,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
});
using var textStream = new RealtimeTextStream();
+ using var thinkingTextStream = new RealtimeTextStream();
ChatThoughtModel? thoughtModel = null;
UsageMetadata? tokenUsage = null;
@@ -245,70 +273,120 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
MessageId = messageId
};
- await foreach (var response in chatClient.StreamContentAsync(request))
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancellationToken = streamingCancellation.GetToken(conv.ConversationId);
+
+ try
{
- var candidate = response?.Candidates?.FirstOrDefault();
- if (candidate == null)
+ await foreach (var response in chatClient.StreamContentAsync(request, cancellationToken))
{
- continue;
- }
-
- var part = candidate?.Content?.Parts?.FirstOrDefault();
- thoughtModel = part?.FunctionCall != null
- ? new() { ToolCall = part.FunctionCall, ThoughtSignature = part.ThoughtSignature }
- : thoughtModel;
+ var candidate = response?.Candidates?.FirstOrDefault();
+ if (candidate == null)
+ {
+ continue;
+ }
- if (!string.IsNullOrEmpty(part?.Text))
- {
- var text = part.Text;
- textStream.Collect(text);
+ var parts = candidate?.Content?.Parts;
+ var textPart = parts?.FirstOrDefault(x => x.Thought != true && x.FunctionCall == null);
+ var functionPart = parts?.FirstOrDefault(x => x.FunctionCall != null);
+ var thoughtPart = parts?.FirstOrDefault(x => x.Thought == true);
- hub.Push(new()
- {
- EventName = ChatEvent.OnReceiveLlmStreamMessage,
- RefId = conv.ConversationId,
- Data = new RoleDialogModel(AgentRole.Assistant, text)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId
- }
- });
- }
+ var part = textPart ?? functionPart ?? thoughtPart ?? parts?.FirstOrDefault();
- if (candidate!.FinishReason == FinishReason.STOP)
- {
- var thought = part?.FunctionCall != null
- ? new() { ToolCall = part.FunctionCall, ThoughtSignature = part.ThoughtSignature }
+ thoughtModel = functionPart?.FunctionCall != null
+ ? new() { ToolCall = functionPart.FunctionCall, ThoughtSignature = functionPart.ThoughtSignature }
: thoughtModel;
- var functionCall = thought?.ToolCall;
- if (functionCall != null)
+ // Collect thinking text separately
+ if (!string.IsNullOrEmpty(thoughtPart?.Text))
{
- responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ var text = thoughtPart.Text;
+ thinkingTextStream.Collect(text);
+ hub.Push(new()
{
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- ToolCallId = functionCall.Id,
- FunctionName = functionCall.Name,
- FunctionArgs = functionCall.Args?.ToJsonString(),
- MetaData = new Dictionary
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = new RoleDialogModel(AgentRole.Assistant, string.Empty)
{
- [Constants.ThoughtSignature] = thought?.ThoughtSignature
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ MetaData = new()
+ {
+ [Constants.ThinkingText] = text
+ }
}
- };
+ });
+ }
-#if DEBUG
- _logger.LogDebug($"Tool Call (id: {functionCall.Id}) => {functionCall.Name}({functionCall.Args})");
-#endif
+ if (!string.IsNullOrEmpty(textPart?.Text))
+ {
+ var text = textPart.Text;
+ textStream.Collect(text);
+
+ hub.Push(new()
+ {
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = new RoleDialogModel(AgentRole.Assistant, text)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId
+ }
+ });
}
- else
+
+ if (candidate!.FinishReason == FinishReason.STOP)
{
- var allText = textStream.GetText();
-#if DEBUG
- _logger.LogDebug($"Stream text Content: {allText}");
-#endif
+ var thought = functionPart?.FunctionCall != null
+ ? new() { ToolCall = functionPart.FunctionCall, ThoughtSignature = functionPart.ThoughtSignature }
+ : thoughtModel;
+ var functionCall = thought?.ToolCall;
+ var thoughtSignature = thoughtPart?.ThoughtSignature ?? part?.ThoughtSignature;
- responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ if (functionCall != null)
+ {
+ responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ ToolCallId = functionCall.Id,
+ FunctionName = functionCall.Name,
+ FunctionArgs = functionCall.Args?.ToJsonString(),
+ MetaData = new Dictionary
+ {
+ [Constants.ThoughtSignature] = thought?.ThoughtSignature
+ }
+ };
+
+ #if DEBUG
+ _logger.LogDebug($"Tool Call (id: {functionCall.Id}) => {functionCall.Name}({functionCall.Args})");
+ #endif
+ }
+ else
+ {
+ var allText = textStream.GetText();
+ #if DEBUG
+ _logger.LogDebug($"Stream text Content: {allText}");
+ #endif
+
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true,
+ MetaData = new Dictionary
+ {
+ [Constants.ThoughtSignature] = thoughtSignature
+ }
+ };
+ }
+
+ tokenUsage = response?.UsageMetadata;
+ }
+ else if (candidate.FinishReason.HasValue)
+ {
+ var text = candidate.FinishMessage ?? candidate.FinishReason.Value.ToString();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, text)
{
CurrentAgentId = agent.Id,
MessageId = messageId,
@@ -318,26 +396,34 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
[Constants.ThoughtSignature] = part?.ThoughtSignature
}
};
- }
- tokenUsage = response?.UsageMetadata;
+ tokenUsage = response?.UsageMetadata;
+ }
}
- else if (candidate.FinishReason.HasValue)
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId);
+ }
+
+ // Build responseMessage from collected text when cancelled before FinishReason
+ if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content))
+ {
+ var allText = textStream.GetText();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
{
- var text = candidate.FinishMessage ?? candidate.FinishReason.Value.ToString();
- responseMessage = new RoleDialogModel(AgentRole.Assistant, text)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- IsStreaming = true,
- MetaData = new Dictionary
- {
- [Constants.ThoughtSignature] = part?.ThoughtSignature
- }
- };
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
- tokenUsage = response?.UsageMetadata;
- }
+ // Set thinking text in metadata
+ var thinkingText = thinkingTextStream.GetText();
+ if (!string.IsNullOrEmpty(thinkingText))
+ {
+ responseMessage.MetaData ??= [];
+ responseMessage.MetaData[Constants.ThinkingText] = thinkingText;
}
hub.Push(new()
@@ -516,6 +602,8 @@ public void SetModelName(string model)
var maxTokens = int.TryParse(state.GetState("max_tokens"), out var tokens)
? tokens
: agent.LlmConfig?.MaxOutputTokens ?? LlmConstant.DEFAULT_MAX_OUTPUT_TOKEN;
+
+ var thinkingLevel = ParseThinking(settings?.Reasoning, agent);
var request = new GenerateContentRequest
{
SystemInstruction = !systemPrompts.IsNullOrEmpty() ? new Content(systemPrompts[0], AgentRole.System) : null,
@@ -524,7 +612,12 @@ public void SetModelName(string model)
GenerationConfig = new()
{
Temperature = temperature,
- MaxOutputTokens = maxTokens
+ MaxOutputTokens = maxTokens,
+ ThinkingConfig = thinkingLevel.HasValue ? new()
+ {
+ IncludeThoughts = true,
+ ThinkingLevel = thinkingLevel
+ } : null
}
};
@@ -596,4 +689,52 @@ private string GetPrompt(IEnumerable systemPrompts, IEnumerable
return prompt;
}
+
+ #region Thinking level
+ private ThinkingLevel? ParseThinking(ReasoningSetting? settings, Agent agent)
+ {
+ var level = _state.GetState("thinking_level");
+
+ if (string.IsNullOrEmpty(level) && _model == agent?.LlmConfig?.Model)
+ {
+ level = agent?.LlmConfig?.ReasoningEffortLevel;
+ }
+
+ if (string.IsNullOrEmpty(level))
+ {
+ level = settings?.EffortLevel;
+ if (settings?.Parameters != null
+ && settings.Parameters.TryGetValue("EffortLevel", out var settingValue)
+ && !string.IsNullOrEmpty(settingValue?.Default))
+ {
+ level = settingValue.Default;
+ }
+ }
+
+ var thinkingLevel = ParseThinkingLevel(level);
+ return thinkingLevel;
+ }
+
+ private ThinkingLevel? ParseThinkingLevel(string? level)
+ {
+ if (string.IsNullOrWhiteSpace(level))
+ {
+ return null;
+ }
+
+ var parsedLevel = ThinkingLevel.LOW;
+ level = level.ToLower();
+ switch (level)
+ {
+ case "low":
+ parsedLevel = ThinkingLevel.LOW;
+ break;
+ case "high":
+ parsedLevel = ThinkingLevel.HIGH;
+ break;
+ }
+
+ return parsedLevel;
+ }
+ #endregion
}
diff --git a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs
index a24633939..8b4a02319 100644
--- a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs
@@ -206,22 +206,32 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
MessageId = messageId
};
- await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams))
- {
- Console.Write(response);
- textStream.Collect(response);
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancellationToken = streamingCancellation.GetToken(conv.ConversationId);
- var content = new RoleDialogModel(AgentRole.Assistant, response)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId
- };
- hub.Push(new()
+ try
+ {
+ await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams).WithCancellation(cancellationToken))
{
- EventName = ChatEvent.OnReceiveLlmStreamMessage,
- RefId = conv.ConversationId,
- Data = content
- });
+ Console.Write(response);
+ textStream.Collect(response);
+
+ var content = new RoleDialogModel(AgentRole.Assistant, response)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId
+ };
+ hub.Push(new()
+ {
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = content
+ });
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId);
}
responseMessage = new RoleDialogModel(AgentRole.Assistant, textStream.GetText())
diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs
index d6fc20b9c..93e9201e2 100644
--- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs
@@ -1,6 +1,5 @@
#pragma warning disable OPENAI001
using BotSharp.Abstraction.MessageHub.Models;
-using BotSharp.Abstraction.Utilities;
using BotSharp.Core.Infrastructures.Streams;
using BotSharp.Core.MessageHub;
using OpenAI.Chat;
@@ -261,82 +260,104 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
MessageId = messageId
};
- await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options))
- {
- tokenUsage = choice.Usage;
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancellationToken = streamingCancellation.GetToken(conv.ConversationId);
- if (!choice.ToolCallUpdates.IsNullOrEmpty())
+ try
+ {
+ await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options, cancellationToken))
{
- toolCalls.AddRange(choice.ToolCallUpdates);
- }
+ tokenUsage = choice.Usage;
- if (!choice.ContentUpdate.IsNullOrEmpty())
- {
- var text = choice.ContentUpdate[0]?.Text ?? string.Empty;
- textStream.Collect(text);
+ if (!choice.ToolCallUpdates.IsNullOrEmpty())
+ {
+ toolCalls.AddRange(choice.ToolCallUpdates);
+ }
+
+ if (!choice.ContentUpdate.IsNullOrEmpty())
+ {
+ var text = choice.ContentUpdate[0]?.Text ?? string.Empty;
+ textStream.Collect(text);
#if DEBUG
- _logger.LogDebug($"Stream Content update: {text}");
+ _logger.LogDebug($"Stream Content update: {text}");
#endif
- hub.Push(new()
- {
- EventName = ChatEvent.OnReceiveLlmStreamMessage,
- RefId = conv.ConversationId,
- Data = new RoleDialogModel(AgentRole.Assistant, text)
+ hub.Push(new()
{
- CurrentAgentId = agent.Id,
- MessageId = messageId
- }
- });
- }
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = new RoleDialogModel(AgentRole.Assistant, text)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId
+ }
+ });
+ }
- if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall)
- {
- var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName));
- var functionName = meta?.FunctionName;
- var toolCallId = meta?.ToolCallId;
- var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList();
- var functionArguments = string.Join(string.Empty, args);
+ if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall)
+ {
+ var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName));
+ var functionName = meta?.FunctionName;
+ var toolCallId = meta?.ToolCallId;
+ var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList();
+ var functionArguments = string.Join(string.Empty, args);
#if DEBUG
- _logger.LogDebug($"Tool Call (id: {toolCallId}) => {functionName}({functionArguments})");
+ _logger.LogDebug($"Tool Call (id: {toolCallId}) => {functionName}({functionArguments})");
#endif
- responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ ToolCallId = toolCallId,
+ FunctionName = functionName,
+ FunctionArgs = functionArguments
+ };
+ }
+ else if (choice.FinishReason == ChatFinishReason.Stop)
{
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- ToolCallId = toolCallId,
- FunctionName = functionName,
- FunctionArgs = functionArguments
- };
- }
- else if (choice.FinishReason == ChatFinishReason.Stop)
- {
- var allText = textStream.GetText();
+ var allText = textStream.GetText();
#if DEBUG
- _logger.LogDebug($"Stream text Content: {allText}");
+ _logger.LogDebug($"Stream text Content: {allText}");
#endif
- responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
+ else if (choice.FinishReason.HasValue)
{
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- IsStreaming = true
- };
+ var text = choice.FinishReason == ChatFinishReason.Length ? "Model reached the maximum number of tokens allowed."
+ : choice.FinishReason == ChatFinishReason.ContentFilter ? "Content is omitted due to content filter rule."
+ : choice.FinishReason.Value.ToString();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, text)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
+ }
}
- else if (choice.FinishReason.HasValue)
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId);
+ }
+
+ // Build responseMessage from collected text when cancelled before FinishReason
+ if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content))
+ {
+ var allText = textStream.GetText();
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, allText)
{
- var text = choice.FinishReason == ChatFinishReason.Length ? "Model reached the maximum number of tokens allowed."
- : choice.FinishReason == ChatFinishReason.ContentFilter ? "Content is omitted due to content filter rule."
- : choice.FinishReason.Value.ToString();
- responseMessage = new RoleDialogModel(AgentRole.Assistant, text)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- IsStreaming = true
- };
- }
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ IsStreaming = true
+ };
}
hub.Push(new()
diff --git a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs
index 5b5c03bda..eb3a3ad32 100644
--- a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs
@@ -173,35 +173,45 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
using var textStream = new RealtimeTextStream();
- await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall))
+ var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
+ var cancellationToken = streamingCancellation.GetToken(conv.ConversationId);
+
+ try
{
- if (response.FunctionCall != null)
- {
- responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
- {
- CurrentAgentId = agent.Id,
- MessageId = messageId,
- ToolCallId = response.FunctionCall.Name,
- FunctionName = response.FunctionCall.Name,
- FunctionArgs = response.FunctionCall.Arguments
- };
- }
- else
+ await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall).WithCancellation(cancellationToken))
{
- textStream.Collect(response.Text);
- responseMessage = new RoleDialogModel(AgentRole.Assistant, response.Text)
+ if (response.FunctionCall != null)
{
- CurrentAgentId = agent.Id,
- MessageId = messageId
- };
-
- hub.Push(new()
+ responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId,
+ ToolCallId = response.FunctionCall.Name,
+ FunctionName = response.FunctionCall.Name,
+ FunctionArgs = response.FunctionCall.Arguments
+ };
+ }
+ else
{
- EventName = ChatEvent.OnReceiveLlmStreamMessage,
- RefId = conv.ConversationId,
- Data = responseMessage
- });
- }
+ textStream.Collect(response.Text);
+ responseMessage = new RoleDialogModel(AgentRole.Assistant, response.Text)
+ {
+ CurrentAgentId = agent.Id,
+ MessageId = messageId
+ };
+
+ hub.Push(new()
+ {
+ EventName = ChatEvent.OnReceiveLlmStreamMessage,
+ RefId = conv.ConversationId,
+ Data = responseMessage
+ });
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId);
}
if (responseMessage.Role == AgentRole.Assistant)
From e52da508918cfb2fa09573df8e038837c3c65573 Mon Sep 17 00:00:00 2001
From: Jicheng Lu <103353@smsassist.com>
Date: Thu, 9 Apr 2026 12:55:00 -0500
Subject: [PATCH 2/2] refine cancellation token
---
.../Providers/ChatCompletionProvider.cs | 2 +-
.../Providers/Chat/ChatCompletionProvider.cs | 2 +-
.../Providers/Chat/ChatCompletionProvider.cs | 2 +-
.../Providers/ChatCompletionProvider.cs | 2 +-
.../Providers/ChatCompletionProvider.cs | 2 +-
5 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs
index 7507f263c..8306c71be 100644
--- a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs
@@ -221,7 +221,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
try
{
- await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters).WithCancellation(cancellationToken))
+ await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters, cancellationToken))
{
var startMsg = choice.StreamStartMessage;
var contentBlock = choice.ContentBlock;
diff --git a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs
index 057be0812..d9638e720 100644
--- a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs
@@ -292,7 +292,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
try
{
- await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken))
+ await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options, cancellationToken))
{
tokenUsage = choice.Usage;
diff --git a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs
index 7b69be7bf..be2b11e51 100644
--- a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs
@@ -259,7 +259,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
try
{
- await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken))
+ await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options, cancellationToken))
{
tokenUsage = choice.Usage;
diff --git a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs
index 8b4a02319..4c5cfc824 100644
--- a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs
@@ -211,7 +211,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
try
{
- await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams).WithCancellation(cancellationToken))
+ await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams, cancellationToken))
{
Console.Write(response);
textStream.Collect(response);
diff --git a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs
index eb3a3ad32..9f1e1d4d5 100644
--- a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs
+++ b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs
@@ -178,7 +178,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent,
try
{
- await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall).WithCancellation(cancellationToken))
+ await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall, cancellationToken: cancellationToken))
{
if (response.FunctionCall != null)
{