From 2bd71bd2ac59f17498fceff799dc2575f00ea16b Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Tue, 7 Apr 2026 14:19:43 -0500 Subject: [PATCH 1/2] add stop conv streaming and thinking text --- Directory.Packages.props | 6 +- .../Conversations/Dtos/ChatResponseDto.cs | 4 + .../IConversationCancellationService.cs | 33 ++ .../Conversations/ConversationPlugin.cs | 3 + .../ConversationCancellationService.cs | 63 ++++ .../Conversation/ConversationController.cs | 103 +++--- .../Conversations/Request/NewMessageModel.cs | 9 + .../ConverstionCancellationResponse.cs | 5 + .../Providers/ChatCompletionProvider.cs | 120 ++++--- .../Providers/Chat/ChatCompletionProvider.cs | 120 ++++--- .../Hooks/ChatHubConversationHook.cs | 1 + .../Observers/ChatHubObserver.cs | 1 + .../Providers/Chat/ChatCompletionProvider.cs | 120 ++++--- .../Constants/Constants.cs | 1 + .../Providers/Chat/ChatCompletionProvider.cs | 301 +++++++++++++----- .../Providers/ChatCompletionProvider.cs | 38 ++- .../Providers/Chat/ChatCompletionProvider.cs | 137 ++++---- .../Providers/ChatCompletionProvider.cs | 60 ++-- 18 files changed, 762 insertions(+), 363 deletions(-) create mode 100644 src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs create mode 100644 src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs create mode 100644 src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 041659c97..9d7ae8a8e 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -6,8 +6,8 @@ - - + + @@ -33,7 +33,7 @@ - + diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs index 43077ba6a..277fa3fb9 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs 
+++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs @@ -46,6 +46,10 @@ public class ChatResponseDto : InstructResult [JsonPropertyName("is_streaming")] public bool IsStreaming { get; set; } + [JsonPropertyName("meta_data")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public Dictionary? MetaData { get; set; } + [JsonPropertyName("created_at")] public DateTime CreatedAt { get; set; } = DateTime.UtcNow; } diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs new file mode 100644 index 000000000..6456a065f --- /dev/null +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs @@ -0,0 +1,33 @@ +using System.Threading; + +namespace BotSharp.Abstraction.Conversations; + +/// +/// Service to manage cancellation tokens for streaming chat completions. +/// Allows stopping an active streaming response by conversation ID. +/// +public interface IConversationCancellationService +{ + /// + /// Register a new cancellation token source for the given conversation. + /// Returns the CancellationToken to be used in streaming loops. + /// + CancellationToken RegisterConversation(string conversationId); + + /// + /// Cancel an active streaming operation for the given conversation. + /// + /// True if the conversation was found and cancelled, false otherwise. + bool CancelStreaming(string conversationId); + + /// + /// Remove the cancellation token source for the given conversation. + /// Should be called when streaming completes (either normally or via cancellation). + /// + void UnregisterConversation(string conversationId); + + /// + /// Get the cancellation token for the given conversation if one is registered. 
+ /// + CancellationToken GetToken(string conversationId); +} diff --git a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs index 6de7da9d6..c891679c4 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs @@ -48,6 +48,9 @@ public void RegisterDI(IServiceCollection services, IConfiguration config) return settingService.Bind("GoogleApi"); }); + // Streaming cancellation + services.AddSingleton(); + // Observer and observable services.AddSingleton>>(); services.AddScoped>>(); diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs new file mode 100644 index 000000000..962d53006 --- /dev/null +++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs @@ -0,0 +1,63 @@ +using System.Collections.Concurrent; + +namespace BotSharp.Core.Conversations.Services; + +public class ConversationCancellationService : IConversationCancellationService +{ + private readonly ConcurrentDictionary _cancellationTokenSources = new(); + private readonly ILogger _logger; + + public ConversationCancellationService( + ILogger logger) + { + _logger = logger; + } + + public CancellationToken RegisterConversation(string conversationId) + { + // Cancel any existing streaming for this conversation + if (_cancellationTokenSources.TryRemove(conversationId, out var existingCts)) + { + existingCts.Cancel(); + existingCts.Dispose(); + _logger.LogWarning("Cancelled existing streaming session for conversation {ConversationId}", conversationId); + } + + var cts = new CancellationTokenSource(); + _cancellationTokenSources[conversationId] = cts; + _logger.LogInformation("Registered streaming cancellation for conversation {ConversationId}", 
conversationId); + return cts.Token; + } + + public bool CancelStreaming(string conversationId) + { + if (_cancellationTokenSources.TryGetValue(conversationId, out var cts)) + { + cts.Cancel(); + _logger.LogInformation("Streaming cancelled for conversation {ConversationId}", conversationId); + return true; + } + + _logger.LogWarning("No active streaming found for conversation {ConversationId}", conversationId); + return false; + } + + public void UnregisterConversation(string conversationId) + { + if (_cancellationTokenSources.TryRemove(conversationId, out var cts)) + { + cts.Dispose(); + _logger.LogDebug("Unregistered streaming cancellation for conversation {ConversationId}", conversationId); + } + } + + public CancellationToken GetToken(string conversationId) + { + if (_cancellationTokenSources.TryGetValue(conversationId, out var cts)) + { + return cts.Token; + } + + return CancellationToken.None; + } +} diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs index 5315e9d15..30961ef84 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs @@ -131,6 +131,7 @@ public async Task> GetDialogs( Data = message.Data, Sender = UserDto.FromUser(user), Payload = message.Payload, + MetaData = message.MetaData, HasMessageFiles = files.Any(x => x.MessageId.IsEqualTo(message.MessageId) && x.FileSource == FileSource.User) }); } @@ -146,6 +147,7 @@ public async Task> GetDialogs( Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, Function = message.FunctionName, Data = message.Data, + MetaData = message.MetaData, Sender = new() { FirstName = agent?.Name ?? 
"Unkown", @@ -397,18 +399,36 @@ public async Task SendMessage( await conv.SetConversationId(conversationId, input.States); SetStates(conv, input); + IConversationCancellationService? convCancellation = null; + if (input.IsStreamingMessage) + { + convCancellation = _services.GetRequiredService(); + convCancellation.RegisterConversation(conversationId); + } + var response = new ChatResponseModel(); - await conv.SendMessage(agentId, inputMsg, - replyMessage: input.Postback, - async msg => - { - response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content; - response.Function = msg.FunctionName; - response.MessageLabel = msg.MessageLabel; - response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; - response.Instruction = msg.Instruction; - response.Data = msg.Data; - }); + try + { + await conv.SendMessage(agentId, inputMsg, + replyMessage: input.Postback, + async msg => + { + response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content; + response.Function = msg.FunctionName; + response.MessageLabel = msg.MessageLabel; + response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; + response.Instruction = msg.Instruction; + response.Data = msg.Data; + }); + } + catch (OperationCanceledException) when (input.IsStreamingMessage) + { + response.Text = string.Empty; + } + finally + { + convCancellation?.UnregisterConversation(conversationId); + } var state = _services.GetRequiredService(); response.States = state.GetStates(); @@ -455,20 +475,20 @@ public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.Connection, "keep-alive"); await conv.SendMessage(agentId, inputMsg, - replyMessage: input.Postback, - // responsed generated - async msg => - { - response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? 
msg.SecondaryContent : msg.Content; - response.MessageLabel = msg.MessageLabel; - response.Function = msg.FunctionName; - response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; - response.Instruction = msg.Instruction; - response.Data = msg.Data; - response.States = state.GetStates(); - - await OnChunkReceived(Response, response); - }); + replyMessage: input.Postback, + // responsed generated + async msg => + { + response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content; + response.MessageLabel = msg.MessageLabel; + response.Function = msg.FunctionName; + response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; + response.Instruction = msg.Instruction; + response.Data = msg.Data; + response.States = state.GetStates(); + + await OnChunkReceived(Response, response); + }); response.States = state.GetStates(); response.MessageId = inputMsg.MessageId; @@ -477,18 +497,13 @@ await conv.SendMessage(agentId, inputMsg, // await OnEventCompleted(Response); } - private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg) + [HttpPost("/conversation/{conversationId}/stop-streaming")] + public ConverstionCancellationResponse StopStreaming([FromRoute] string conversationId) { - var indicator = new ChatResponseModel - { - ConversationId = conversationId, - MessageId = msg.MessageId, - Text = msg.Indication, - Function = "indicating", - Instruction = msg.Instruction, - States = new Dictionary() - }; - await OnChunkReceived(Response, indicator); + var streamingCancellation = _services.GetRequiredService(); + var cancelled = streamingCancellation.CancelStreaming(conversationId); + + return new ConverstionCancellationResponse { Success = cancelled }; } #endregion @@ -515,6 +530,8 @@ private void SetStates(IConversationService conv, NewMessageModel input) { conv.States.SetState("sampling_factor", input.SamplingFactor, source: StateSource.External); } + + 
conv.States.SetState("use_stream_message", input.IsStreamingMessage, source: StateSource.Application); } private FileContentResult BuildFileResult(string file) @@ -567,5 +584,19 @@ private JsonSerializerOptions InitJsonOptions(BotSharpOptions options) return jsonOption; } + + private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg) + { + var indicator = new ChatResponseModel + { + ConversationId = conversationId, + MessageId = msg.MessageId, + Text = msg.Indication, + Function = "indicating", + Instruction = msg.Instruction, + States = [] + }; + await OnChunkReceived(Response, indicator); + } #endregion } \ No newline at end of file diff --git a/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs index 939fa601a..6169f1371 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs @@ -1,6 +1,15 @@ +using System.Text.Json.Serialization; + namespace BotSharp.OpenAPI.ViewModels.Conversations; public class NewMessageModel : IncomingMessageModel { public override string Channel { get; set; } = ConversationChannel.OpenAPI; + + /// + /// Indicates whether this message uses streaming completion. + /// When true, the streaming can be stopped via the stop endpoint. 
+ /// + [JsonPropertyName("is_streaming_msg")] + public bool IsStreamingMessage { get; set; } } diff --git a/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs new file mode 100644 index 000000000..5e3f659ad --- /dev/null +++ b/src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs @@ -0,0 +1,5 @@ +namespace BotSharp.OpenAPI.ViewModels.Conversations; + +public class ConverstionCancellationResponse : ResponseBase +{ +} diff --git a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs index 774ac70e8..7507f263c 100644 --- a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs @@ -216,73 +216,95 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, MessageId = messageId }; - await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters)) + var streamingCancellation = _services.GetRequiredService(); + var cancellationToken = streamingCancellation.GetToken(conv.ConversationId); + + try { - var startMsg = choice.StreamStartMessage; - var contentBlock = choice.ContentBlock; - var delta = choice.Delta; + await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters).WithCancellation(cancellationToken)) + { + var startMsg = choice.StreamStartMessage; + var contentBlock = choice.ContentBlock; + var delta = choice.Delta; - tokenUsage = delta?.Usage ?? startMsg?.Usage ?? choice.Usage; + tokenUsage = delta?.Usage ?? startMsg?.Usage ?? 
choice.Usage; - if (delta != null) - { - if (delta.StopReason == StopReason.ToolUse) + if (delta != null) { - var toolCall = choice.ToolCalls.FirstOrDefault(); - responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + if (delta.StopReason == StopReason.ToolUse) { - CurrentAgentId = agent.Id, - MessageId = messageId, - ToolCallId = toolCall?.Id, - FunctionName = toolCall?.Name, - FunctionArgs = toolCall?.Arguments?.ToString()?.IfNullOrEmptyAs("{}") ?? "{}" - }; + var toolCall = choice.ToolCalls.FirstOrDefault(); + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = toolCall?.Id, + FunctionName = toolCall?.Name, + FunctionArgs = toolCall?.Arguments?.ToString()?.IfNullOrEmptyAs("{}") ?? "{}" + }; #if DEBUG - _logger.LogDebug($"Tool Call (id: {toolCall?.Id}) => {toolCall?.Name}({toolCall?.Arguments})"); + _logger.LogDebug($"Tool Call (id: {toolCall?.Id}) => {toolCall?.Name}({toolCall?.Arguments})"); #endif - } - else if (delta.StopReason == StopReason.EndTurn) - { - var allText = textStream.GetText(); - responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + } + else if (delta.StopReason == StopReason.EndTurn) { - CurrentAgentId = agent.Id, - MessageId = messageId, - IsStreaming = true - }; + var allText = textStream.GetText(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; #if DEBUG - _logger.LogDebug($"Stream text Content: {allText}"); + _logger.LogDebug($"Stream text Content: {allText}"); #endif - } - else if (!string.IsNullOrEmpty(delta.StopReason)) - { - responseMessage = new RoleDialogModel(AgentRole.Assistant, delta.StopReason) - { - CurrentAgentId = agent.Id, - MessageId = messageId, - IsStreaming = true - }; - } - else - { - var deltaText = delta.Text ?? 
string.Empty; - textStream.Collect(deltaText); - - hub.Push(new() + } + else if (!string.IsNullOrEmpty(delta.StopReason)) { - EventName = ChatEvent.OnReceiveLlmStreamMessage, - RefId = conv.ConversationId, - Data = new RoleDialogModel(AgentRole.Assistant, deltaText) + responseMessage = new RoleDialogModel(AgentRole.Assistant, delta.StopReason) { CurrentAgentId = agent.Id, - MessageId = messageId - } - }); + MessageId = messageId, + IsStreaming = true + }; + } + else + { + var deltaText = delta.Text ?? string.Empty; + textStream.Collect(deltaText); + + hub.Push(new() + { + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = new RoleDialogModel(AgentRole.Assistant, deltaText) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); + } } } } + catch (OperationCanceledException) + { + _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId); + } + + // Build responseMessage from collected text when cancelled before FinishReason + if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content)) + { + var allText = textStream.GetText(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } hub.Push(new() { diff --git a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs index 5c94a00c9..057be0812 100644 --- a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -287,71 +287,93 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, MessageId = messageId }; - await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) - { - tokenUsage = choice.Usage; + var streamingCancellation = 
_services.GetRequiredService(); + var cancellationToken = streamingCancellation.GetToken(conv.ConversationId); - if (!choice.ToolCallUpdates.IsNullOrEmpty()) + try + { + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken)) { - toolCalls.AddRange(choice.ToolCallUpdates); - } + tokenUsage = choice.Usage; - if (!choice.ContentUpdate.IsNullOrEmpty()) - { - var text = choice.ContentUpdate[0]?.Text ?? string.Empty; - textStream.Collect(text); + if (!choice.ToolCallUpdates.IsNullOrEmpty()) + { + toolCalls.AddRange(choice.ToolCallUpdates); + } + + if (!choice.ContentUpdate.IsNullOrEmpty()) + { + var text = choice.ContentUpdate[0]?.Text ?? string.Empty; + textStream.Collect(text); #if DEBUG - _logger.LogCritical($"Content update: {text}"); + _logger.LogCritical($"Content update: {text}"); #endif - var content = new RoleDialogModel(AgentRole.Assistant, text) - { - CurrentAgentId = agent.Id, - MessageId = messageId - }; - hub.Push(new() - { - EventName = ChatEvent.OnReceiveLlmStreamMessage, - RefId = conv.ConversationId, - Data = content - }); - } + var content = new RoleDialogModel(AgentRole.Assistant, text) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + hub.Push(new() + { + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = content + }); + } - if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) - { - var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); - var functionName = meta?.FunctionName; - var toolCallId = meta?.ToolCallId; - var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); - var functionArgument = string.Join(string.Empty, args); + if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) + { + var meta = toolCalls.FirstOrDefault(x 
=> !string.IsNullOrEmpty(x.FunctionName)); + var functionName = meta?.FunctionName; + var toolCallId = meta?.ToolCallId; + var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); + var functionArgument = string.Join(string.Empty, args); #if DEBUG - _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); + _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); #endif - responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = toolCallId, + FunctionName = functionName, + FunctionArgs = functionArgument + }; + } + else if (choice.FinishReason.HasValue) { - CurrentAgentId = agent.Id, - MessageId = messageId, - ToolCallId = toolCallId, - FunctionName = functionName, - FunctionArgs = functionArgument - }; - } - else if (choice.FinishReason.HasValue) - { - var allText = textStream.GetText(); - _logger.LogCritical($"Text Content: {allText}"); + var allText = textStream.GetText(); + _logger.LogCritical($"Text Content: {allText}"); - responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) - { - CurrentAgentId = agent.Id, - MessageId = messageId, - IsStreaming = true - }; + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } } } + catch (OperationCanceledException) + { + _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId); + } + + // Build responseMessage from collected text when cancelled before FinishReason + if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content)) + { + var allText = textStream.GetText(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) 
+ { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } hub.Push(new() { diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs index f2c1b3b67..bd57f5f9f 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Hooks/ChatHubConversationHook.cs @@ -112,6 +112,7 @@ public override async Task OnResponseGenerated(RoleDialogModel message) Function = message.FunctionName, RichContent = message.SecondaryRichContent ?? message.RichContent, Data = message.Data, + MetaData = message.MetaData, States = state.GetStates(), IsStreaming = message.IsStreaming, Sender = sender diff --git a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs index 25862ff0d..6bd9209a3 100644 --- a/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs +++ b/src/Plugins/BotSharp.Plugin.ChatHub/Observers/ChatHubObserver.cs @@ -80,6 +80,7 @@ public override void OnNext(HubObserveData value) Function = message.FunctionName, RichContent = message.SecondaryRichContent ?? 
message.RichContent, Data = message.Data, + MetaData = message.MetaData, Sender = new() { FirstName = "AI", diff --git a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs index 058aeec8a..7b69be7bf 100644 --- a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs @@ -254,71 +254,93 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, MessageId = messageId }; - await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) - { - tokenUsage = choice.Usage; + var streamingCancellation = _services.GetRequiredService(); + var cancellationToken = streamingCancellation.GetToken(conv.ConversationId); - if (!choice.ToolCallUpdates.IsNullOrEmpty()) + try + { + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken)) { - toolCalls.AddRange(choice.ToolCallUpdates); - } + tokenUsage = choice.Usage; - if (!choice.ContentUpdate.IsNullOrEmpty()) - { - var text = choice.ContentUpdate[0]?.Text ?? string.Empty; - textStream.Collect(text); + if (!choice.ToolCallUpdates.IsNullOrEmpty()) + { + toolCalls.AddRange(choice.ToolCallUpdates); + } + + if (!choice.ContentUpdate.IsNullOrEmpty()) + { + var text = choice.ContentUpdate[0]?.Text ?? 
string.Empty; + textStream.Collect(text); #if DEBUG - _logger.LogCritical($"Content update: {text}"); + _logger.LogCritical($"Content update: {text}"); #endif - var content = new RoleDialogModel(AgentRole.Assistant, text) - { - CurrentAgentId = agent.Id, - MessageId = messageId - }; - hub.Push(new() - { - EventName = ChatEvent.OnReceiveLlmStreamMessage, - RefId = conv.ConversationId, - Data = content - }); - } + var content = new RoleDialogModel(AgentRole.Assistant, text) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + hub.Push(new() + { + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = content + }); + } - if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) - { - var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); - var functionName = meta?.FunctionName; - var toolCallId = meta?.ToolCallId; - var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); - var functionArgument = string.Join(string.Empty, args); + if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) + { + var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); + var functionName = meta?.FunctionName; + var toolCallId = meta?.ToolCallId; + var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); + var functionArgument = string.Join(string.Empty, args); #if DEBUG - _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); + _logger.LogCritical($"Tool Call (id: {toolCallId}) => {functionName}({functionArgument})"); #endif - responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = 
messageId, + ToolCallId = toolCallId, + FunctionName = functionName, + FunctionArgs = functionArgument + }; + } + else if (choice.FinishReason.HasValue) { - CurrentAgentId = agent.Id, - MessageId = messageId, - ToolCallId = toolCallId, - FunctionName = functionName, - FunctionArgs = functionArgument - }; - } - else if (choice.FinishReason.HasValue) - { - var allText = textStream.GetText(); - _logger.LogCritical($"Text Content: {allText}"); + var allText = textStream.GetText(); + _logger.LogCritical($"Text Content: {allText}"); - responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) - { - CurrentAgentId = agent.Id, - MessageId = messageId, - IsStreaming = true - }; + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } } } + catch (OperationCanceledException) + { + _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId); + } + + // Build responseMessage from collected text when cancelled before FinishReason + if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content)) + { + var allText = textStream.GetText(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } hub.Push(new() { diff --git a/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs index e0aa12b6c..c1300c0a5 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Constants/Constants.cs @@ -3,4 +3,5 @@ namespace BotSharp.Plugin.GoogleAI.Constants; internal static class Constants { internal const string ThoughtSignature = "thought_signature"; + internal const string ThinkingText = "thinking_text"; } diff --git 
a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs index 460a79a17..c7df1f6bb 100644 --- a/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.GoogleAI/Providers/Chat/ChatCompletionProvider.cs @@ -3,6 +3,7 @@ using BotSharp.Abstraction.Files.Utilities; using BotSharp.Abstraction.Hooks; using BotSharp.Abstraction.MessageHub.Models; +using BotSharp.Abstraction.MLTasks.Settings; using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.MessageHub; using GenerativeAI; @@ -16,6 +17,7 @@ public class ChatCompletionProvider : IChatCompletion { private readonly IServiceProvider _services; private readonly ILogger _logger; + private readonly IConversationStateService _state; private List renderedInstructions = []; private string _model; @@ -28,11 +30,13 @@ public class ChatCompletionProvider : IChatCompletion public ChatCompletionProvider( IServiceProvider services, GoogleAiSettings googleSettings, - ILogger logger) + ILogger logger, + IConversationStateService state) { _settings = googleSettings; _services = services; _logger = logger; + _state = state; } public async Task GetChatCompletions(Agent agent, List conversations) @@ -51,13 +55,19 @@ public async Task GetChatCompletions(Agent agent, List x.Thought != true && x.FunctionCall == null); + var functionPart = parts?.FirstOrDefault(x => x.FunctionCall != null); + var thoughtPart = parts?.FirstOrDefault(x => x.Thought == true); + + var part = textPart ?? functionPart ?? thoughtPart ?? parts?.FirstOrDefault(); + var text = textPart?.Text ?? part?.Text ?? string.Empty; + var thoughtSignature = thoughtPart?.ThoughtSignature ?? 
part?.ThoughtSignature; RoleDialogModel responseMessage; - if (response.GetFunction() != null) + if (functionPart?.FunctionCall != null) { - var toolCall = response.GetFunction(); + var toolCall = functionPart.FunctionCall; responseMessage = new RoleDialogModel(AgentRole.Function, text) { CurrentAgentId = agent.Id, @@ -67,7 +77,7 @@ public async Task GetChatCompletions(Agent agent, List { - [Constants.ThoughtSignature] = part?.ThoughtSignature + [Constants.ThoughtSignature] = thoughtSignature }, RenderedInstruction = string.Join("\r\n", renderedInstructions) }; @@ -91,12 +101,18 @@ public async Task GetChatCompletions(Agent agent, List { - [Constants.ThoughtSignature] = part?.ThoughtSignature + [Constants.ThoughtSignature] = thoughtSignature }, RenderedInstruction = string.Join("\r\n", renderedInstructions) }; } + if (responseMessage != null && thoughtPart != null) + { + responseMessage.MetaData ??= []; + responseMessage.MetaData[Constants.ThinkingText] = thoughtPart.Text; + } + // After chat completion hook foreach (var hook in contentHooks) { @@ -130,19 +146,30 @@ public async Task GetChatCompletionsAsync(Agent agent, List x.Thought != true && x.FunctionCall == null); + var functionPart = parts?.FirstOrDefault(x => x.FunctionCall != null); + var thoughtPart = parts?.FirstOrDefault(x => x.Thought == true); + + var part = textPart ?? functionPart ?? thoughtPart ?? parts?.FirstOrDefault(); + var text = textPart?.Text ?? part?.Text ?? string.Empty; + var thoughtSignature = thoughtPart?.ThoughtSignature ?? 
part?.ThoughtSignature; var msg = new RoleDialogModel(AgentRole.Assistant, text) { CurrentAgentId = agent.Id, MetaData = new Dictionary { - [Constants.ThoughtSignature] = part?.ThoughtSignature + [Constants.ThoughtSignature] = thoughtSignature }, RenderedInstruction = string.Join("\r\n", renderedInstructions) }; + if (thoughtPart != null) + { + msg.MetaData[Constants.ThinkingText] = thoughtPart.Text; + } + // After chat completion hook foreach (var hook in hooks) { @@ -156,9 +183,9 @@ public async Task GetChatCompletionsAsync(Agent agent, List GetChatCompletionsAsync(Agent agent, List { - [Constants.ThoughtSignature] = part?.ThoughtSignature + [Constants.ThoughtSignature] = thoughtSignature }, RenderedInstruction = string.Join("\r\n", renderedInstructions) }; @@ -192,7 +219,7 @@ public async Task GetChatCompletionsAsync(Agent agent, List { - [Constants.ThoughtSignature] = part?.ThoughtSignature + [Constants.ThoughtSignature] = thoughtSignature }, RenderedInstruction = string.Join("\r\n", renderedInstructions) }; @@ -236,6 +263,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, }); using var textStream = new RealtimeTextStream(); + using var thinkingTextStream = new RealtimeTextStream(); ChatThoughtModel? thoughtModel = null; UsageMetadata? tokenUsage = null; @@ -245,70 +273,120 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, MessageId = messageId }; - await foreach (var response in chatClient.StreamContentAsync(request)) + var streamingCancellation = _services.GetRequiredService(); + var cancellationToken = streamingCancellation.GetToken(conv.ConversationId); + + try { - var candidate = response?.Candidates?.FirstOrDefault(); - if (candidate == null) + await foreach (var response in chatClient.StreamContentAsync(request, cancellationToken)) { - continue; - } - - var part = candidate?.Content?.Parts?.FirstOrDefault(); - thoughtModel = part?.FunctionCall != null - ? 
new() { ToolCall = part.FunctionCall, ThoughtSignature = part.ThoughtSignature } - : thoughtModel; + var candidate = response?.Candidates?.FirstOrDefault(); + if (candidate == null) + { + continue; + } - if (!string.IsNullOrEmpty(part?.Text)) - { - var text = part.Text; - textStream.Collect(text); + var parts = candidate?.Content?.Parts; + var textPart = parts?.FirstOrDefault(x => x.Thought != true && x.FunctionCall == null); + var functionPart = parts?.FirstOrDefault(x => x.FunctionCall != null); + var thoughtPart = parts?.FirstOrDefault(x => x.Thought == true); - hub.Push(new() - { - EventName = ChatEvent.OnReceiveLlmStreamMessage, - RefId = conv.ConversationId, - Data = new RoleDialogModel(AgentRole.Assistant, text) - { - CurrentAgentId = agent.Id, - MessageId = messageId - } - }); - } + var part = textPart ?? functionPart ?? thoughtPart ?? parts?.FirstOrDefault(); - if (candidate!.FinishReason == FinishReason.STOP) - { - var thought = part?.FunctionCall != null - ? new() { ToolCall = part.FunctionCall, ThoughtSignature = part.ThoughtSignature } + thoughtModel = functionPart?.FunctionCall != null + ? 
new() { ToolCall = functionPart.FunctionCall, ThoughtSignature = functionPart.ThoughtSignature } : thoughtModel; - var functionCall = thought?.ToolCall; - if (functionCall != null) + // Collect thinking text separately + if (!string.IsNullOrEmpty(thoughtPart?.Text)) { - responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + var text = thoughtPart.Text; + thinkingTextStream.Collect(text); + hub.Push(new() { - CurrentAgentId = agent.Id, - MessageId = messageId, - ToolCallId = functionCall.Id, - FunctionName = functionCall.Name, - FunctionArgs = functionCall.Args?.ToJsonString(), - MetaData = new Dictionary + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = new RoleDialogModel(AgentRole.Assistant, string.Empty) { - [Constants.ThoughtSignature] = thought?.ThoughtSignature + CurrentAgentId = agent.Id, + MessageId = messageId, + MetaData = new() + { + [Constants.ThinkingText] = text + } } - }; + }); + } -#if DEBUG - _logger.LogDebug($"Tool Call (id: {functionCall.Id}) => {functionCall.Name}({functionCall.Args})"); -#endif + if (!string.IsNullOrEmpty(textPart?.Text)) + { + var text = textPart.Text; + textStream.Collect(text); + + hub.Push(new() + { + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = new RoleDialogModel(AgentRole.Assistant, text) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); } - else + + if (candidate!.FinishReason == FinishReason.STOP) { - var allText = textStream.GetText(); -#if DEBUG - _logger.LogDebug($"Stream text Content: {allText}"); -#endif + var thought = functionPart?.FunctionCall != null + ? new() { ToolCall = functionPart.FunctionCall, ThoughtSignature = functionPart.ThoughtSignature } + : thoughtModel; + var functionCall = thought?.ToolCall; + var thoughtSignature = thoughtPart?.ThoughtSignature ?? 
part?.ThoughtSignature; - responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + if (functionCall != null) + { + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = functionCall.Id, + FunctionName = functionCall.Name, + FunctionArgs = functionCall.Args?.ToJsonString(), + MetaData = new Dictionary + { + [Constants.ThoughtSignature] = thought?.ThoughtSignature + } + }; + + #if DEBUG + _logger.LogDebug($"Tool Call (id: {functionCall.Id}) => {functionCall.Name}({functionCall.Args})"); + #endif + } + else + { + var allText = textStream.GetText(); + #if DEBUG + _logger.LogDebug($"Stream text Content: {allText}"); + #endif + + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true, + MetaData = new Dictionary + { + [Constants.ThoughtSignature] = thoughtSignature + } + }; + } + + tokenUsage = response?.UsageMetadata; + } + else if (candidate.FinishReason.HasValue) + { + var text = candidate.FinishMessage ?? 
candidate.FinishReason.Value.ToString(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, text) { CurrentAgentId = agent.Id, MessageId = messageId, @@ -318,26 +396,34 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, [Constants.ThoughtSignature] = part?.ThoughtSignature } }; - } - tokenUsage = response?.UsageMetadata; + tokenUsage = response?.UsageMetadata; + } } - else if (candidate.FinishReason.HasValue) + } + catch (OperationCanceledException) + { + _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId); + } + + // Build responseMessage from collected text when cancelled before FinishReason + if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content)) + { + var allText = textStream.GetText(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) { - var text = candidate.FinishMessage ?? candidate.FinishReason.Value.ToString(); - responseMessage = new RoleDialogModel(AgentRole.Assistant, text) - { - CurrentAgentId = agent.Id, - MessageId = messageId, - IsStreaming = true, - MetaData = new Dictionary - { - [Constants.ThoughtSignature] = part?.ThoughtSignature - } - }; + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } - tokenUsage = response?.UsageMetadata; - } + // Set thinking text in metadata + var thinkingText = thinkingTextStream.GetText(); + if (!string.IsNullOrEmpty(thinkingText)) + { + responseMessage.MetaData ??= []; + responseMessage.MetaData[Constants.ThinkingText] = thinkingText; } hub.Push(new() @@ -516,6 +602,8 @@ public void SetModelName(string model) var maxTokens = int.TryParse(state.GetState("max_tokens"), out var tokens) ? tokens : agent.LlmConfig?.MaxOutputTokens ?? LlmConstant.DEFAULT_MAX_OUTPUT_TOKEN; + + var thinkingLevel = ParseThinking(settings?.Reasoning, agent); var request = new GenerateContentRequest { SystemInstruction = !systemPrompts.IsNullOrEmpty() ? 
new Content(systemPrompts[0], AgentRole.System) : null, @@ -524,7 +612,12 @@ public void SetModelName(string model) GenerationConfig = new() { Temperature = temperature, - MaxOutputTokens = maxTokens + MaxOutputTokens = maxTokens, + ThinkingConfig = thinkingLevel.HasValue ? new() + { + IncludeThoughts = true, + ThinkingLevel = thinkingLevel + } : null } }; @@ -596,4 +689,52 @@ private string GetPrompt(IEnumerable systemPrompts, IEnumerable return prompt; } + + #region Thinking level + private ThinkingLevel? ParseThinking(ReasoningSetting? settings, Agent agent) + { + var level = _state.GetState("thinking_level"); + + if (string.IsNullOrEmpty(level) && _model == agent?.LlmConfig?.Model) + { + level = agent?.LlmConfig?.ReasoningEffortLevel; + } + + if (string.IsNullOrEmpty(level)) + { + level = settings?.EffortLevel; + if (settings?.Parameters != null + && settings.Parameters.TryGetValue("EffortLevel", out var settingValue) + && !string.IsNullOrEmpty(settingValue?.Default)) + { + level = settingValue.Default; + } + } + + var thinkingLevel = ParseThinkingLevel(level); + return thinkingLevel; + } + + private ThinkingLevel? ParseThinkingLevel(string?
level) + { + if (string.IsNullOrWhiteSpace(level)) + { + return null; + } + + var parsedLevel = ThinkingLevel.LOW; + level = level.ToLower(); + switch (level) + { + case "low": + parsedLevel = ThinkingLevel.LOW; + break; + case "high": + parsedLevel = ThinkingLevel.HIGH; + break; + } + + return parsedLevel; + } + #endregion } diff --git a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs index a24633939..8b4a02319 100644 --- a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs @@ -206,22 +206,32 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, MessageId = messageId }; - await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams)) - { - Console.Write(response); - textStream.Collect(response); + var streamingCancellation = _services.GetRequiredService(); + var cancellationToken = streamingCancellation.GetToken(conv.ConversationId); - var content = new RoleDialogModel(AgentRole.Assistant, response) - { - CurrentAgentId = agent.Id, - MessageId = messageId - }; - hub.Push(new() + try + { + await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams).WithCancellation(cancellationToken)) { - EventName = ChatEvent.OnReceiveLlmStreamMessage, - RefId = conv.ConversationId, - Data = content - }); + Console.Write(response); + textStream.Collect(response); + + var content = new RoleDialogModel(AgentRole.Assistant, response) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + hub.Push(new() + { + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = content + }); + } + } + catch (OperationCanceledException) + { + _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId); } responseMessage = new 
RoleDialogModel(AgentRole.Assistant, textStream.GetText()) diff --git a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs index d6fc20b9c..93e9201e2 100644 --- a/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.OpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -1,6 +1,5 @@ #pragma warning disable OPENAI001 using BotSharp.Abstraction.MessageHub.Models; -using BotSharp.Abstraction.Utilities; using BotSharp.Core.Infrastructures.Streams; using BotSharp.Core.MessageHub; using OpenAI.Chat; @@ -261,82 +260,104 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, MessageId = messageId }; - await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options)) - { - tokenUsage = choice.Usage; + var streamingCancellation = _services.GetRequiredService(); + var cancellationToken = streamingCancellation.GetToken(conv.ConversationId); - if (!choice.ToolCallUpdates.IsNullOrEmpty()) + try + { + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options, cancellationToken)) { - toolCalls.AddRange(choice.ToolCallUpdates); - } + tokenUsage = choice.Usage; - if (!choice.ContentUpdate.IsNullOrEmpty()) - { - var text = choice.ContentUpdate[0]?.Text ?? string.Empty; - textStream.Collect(text); + if (!choice.ToolCallUpdates.IsNullOrEmpty()) + { + toolCalls.AddRange(choice.ToolCallUpdates); + } + + if (!choice.ContentUpdate.IsNullOrEmpty()) + { + var text = choice.ContentUpdate[0]?.Text ?? 
string.Empty; + textStream.Collect(text); #if DEBUG - _logger.LogDebug($"Stream Content update: {text}"); + _logger.LogDebug($"Stream Content update: {text}"); #endif - hub.Push(new() - { - EventName = ChatEvent.OnReceiveLlmStreamMessage, - RefId = conv.ConversationId, - Data = new RoleDialogModel(AgentRole.Assistant, text) + hub.Push(new() { - CurrentAgentId = agent.Id, - MessageId = messageId - } - }); - } + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = new RoleDialogModel(AgentRole.Assistant, text) + { + CurrentAgentId = agent.Id, + MessageId = messageId + } + }); + } - if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) - { - var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); - var functionName = meta?.FunctionName; - var toolCallId = meta?.ToolCallId; - var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); - var functionArguments = string.Join(string.Empty, args); + if (choice.FinishReason == ChatFinishReason.ToolCalls || choice.FinishReason == ChatFinishReason.FunctionCall) + { + var meta = toolCalls.FirstOrDefault(x => !string.IsNullOrEmpty(x.FunctionName)); + var functionName = meta?.FunctionName; + var toolCallId = meta?.ToolCallId; + var args = toolCalls.Where(x => x.FunctionArgumentsUpdate != null).Select(x => x.FunctionArgumentsUpdate.ToString()).ToList(); + var functionArguments = string.Join(string.Empty, args); #if DEBUG - _logger.LogDebug($"Tool Call (id: {toolCallId}) => {functionName}({functionArguments})"); + _logger.LogDebug($"Tool Call (id: {toolCallId}) => {functionName}({functionArguments})"); #endif - responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = toolCallId, + 
FunctionName = functionName, + FunctionArgs = functionArguments + }; + } + else if (choice.FinishReason == ChatFinishReason.Stop) { - CurrentAgentId = agent.Id, - MessageId = messageId, - ToolCallId = toolCallId, - FunctionName = functionName, - FunctionArgs = functionArguments - }; - } - else if (choice.FinishReason == ChatFinishReason.Stop) - { - var allText = textStream.GetText(); + var allText = textStream.GetText(); #if DEBUG - _logger.LogDebug($"Stream text Content: {allText}"); + _logger.LogDebug($"Stream text Content: {allText}"); #endif - responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } + else if (choice.FinishReason.HasValue) { - CurrentAgentId = agent.Id, - MessageId = messageId, - IsStreaming = true - }; + var text = choice.FinishReason == ChatFinishReason.Length ? "Model reached the maximum number of tokens allowed." + : choice.FinishReason == ChatFinishReason.ContentFilter ? "Content is omitted due to content filter rule." + : choice.FinishReason.Value.ToString(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, text) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; + } } - else if (choice.FinishReason.HasValue) + } + catch (OperationCanceledException) + { + _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId); + } + + // Build responseMessage from collected text when cancelled before FinishReason + if (cancellationToken.IsCancellationRequested && string.IsNullOrEmpty(responseMessage.Content)) + { + var allText = textStream.GetText(); + responseMessage = new RoleDialogModel(AgentRole.Assistant, allText) { - var text = choice.FinishReason == ChatFinishReason.Length ? "Model reached the maximum number of tokens allowed." - : choice.FinishReason == ChatFinishReason.ContentFilter ? 
"Content is omitted due to content filter rule." - : choice.FinishReason.Value.ToString(); - responseMessage = new RoleDialogModel(AgentRole.Assistant, text) - { - CurrentAgentId = agent.Id, - MessageId = messageId, - IsStreaming = true - }; - } + CurrentAgentId = agent.Id, + MessageId = messageId, + IsStreaming = true + }; } hub.Push(new() diff --git a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs index 5b5c03bda..eb3a3ad32 100644 --- a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs @@ -173,35 +173,45 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, using var textStream = new RealtimeTextStream(); - await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall)) + var streamingCancellation = _services.GetRequiredService(); + var cancellationToken = streamingCancellation.GetToken(conv.ConversationId); + + try { - if (response.FunctionCall != null) - { - responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) - { - CurrentAgentId = agent.Id, - MessageId = messageId, - ToolCallId = response.FunctionCall.Name, - FunctionName = response.FunctionCall.Name, - FunctionArgs = response.FunctionCall.Arguments - }; - } - else + await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? 
null : funcall).WithCancellation(cancellationToken)) { - textStream.Collect(response.Text); - responseMessage = new RoleDialogModel(AgentRole.Assistant, response.Text) + if (response.FunctionCall != null) { - CurrentAgentId = agent.Id, - MessageId = messageId - }; - - hub.Push(new() + responseMessage = new RoleDialogModel(AgentRole.Function, string.Empty) + { + CurrentAgentId = agent.Id, + MessageId = messageId, + ToolCallId = response.FunctionCall.Name, + FunctionName = response.FunctionCall.Name, + FunctionArgs = response.FunctionCall.Arguments + }; + } + else { - EventName = ChatEvent.OnReceiveLlmStreamMessage, - RefId = conv.ConversationId, - Data = responseMessage - }); - } + textStream.Collect(response.Text); + responseMessage = new RoleDialogModel(AgentRole.Assistant, response.Text) + { + CurrentAgentId = agent.Id, + MessageId = messageId + }; + + hub.Push(new() + { + EventName = ChatEvent.OnReceiveLlmStreamMessage, + RefId = conv.ConversationId, + Data = responseMessage + }); + } + } + } + catch (OperationCanceledException) + { + _logger.LogWarning("Streaming was cancelled for conversation {ConversationId}", conv.ConversationId); } if (responseMessage.Role == AgentRole.Assistant) From e52da508918cfb2fa09573df8e038837c3c65573 Mon Sep 17 00:00:00 2001 From: Jicheng Lu <103353@smsassist.com> Date: Thu, 9 Apr 2026 12:55:00 -0500 Subject: [PATCH 2/2] refine cancellation token --- .../Providers/ChatCompletionProvider.cs | 2 +- .../Providers/Chat/ChatCompletionProvider.cs | 2 +- .../Providers/Chat/ChatCompletionProvider.cs | 2 +- .../Providers/ChatCompletionProvider.cs | 2 +- .../Providers/ChatCompletionProvider.cs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs index 7507f263c..8306c71be 100644 --- a/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs +++ 
b/src/Plugins/BotSharp.Plugin.AnthropicAI/Providers/ChatCompletionProvider.cs @@ -221,7 +221,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, try { - await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters).WithCancellation(cancellationToken)) + await foreach (var choice in client.Messages.StreamClaudeMessageAsync(parameters, cancellationToken)) { var startMsg = choice.StreamStartMessage; var contentBlock = choice.ContentBlock; diff --git a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs index 057be0812..d9638e720 100644 --- a/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.AzureOpenAI/Providers/Chat/ChatCompletionProvider.cs @@ -292,7 +292,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, try { - await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken)) + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options, cancellationToken)) { tokenUsage = choice.Usage; diff --git a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs index 7b69be7bf..be2b11e51 100644 --- a/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.DeepSeekAI/Providers/Chat/ChatCompletionProvider.cs @@ -259,7 +259,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, try { - await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options).WithCancellation(cancellationToken)) + await foreach (var choice in chatClient.CompleteChatStreamingAsync(messages, options, cancellationToken)) { tokenUsage = choice.Usage; diff --git 
a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs index 8b4a02319..4c5cfc824 100644 --- a/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.LLamaSharp/Providers/ChatCompletionProvider.cs @@ -211,7 +211,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, try { - await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams).WithCancellation(cancellationToken)) + await foreach (var response in executor.InferAsync(agent.Instruction, inferenceParams, cancellationToken)) { Console.Write(response); textStream.Collect(response); diff --git a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs index eb3a3ad32..9f1e1d4d5 100644 --- a/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs +++ b/src/Plugins/BotSharp.Plugin.SparkDesk/Providers/ChatCompletionProvider.cs @@ -178,7 +178,7 @@ public async Task GetChatCompletionsStreamingAsync(Agent agent, try { - await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall).WithCancellation(cancellationToken)) + await foreach (StreamedChatResponse response in client.ChatAsStreamAsync(modelVersion: _settings.ModelVersion, messages, functions: funcall.Length == 0 ? null : funcall, cancellationToken: cancellationToken)) { if (response.FunctionCall != null) {