-
-
Notifications
You must be signed in to change notification settings - Fork 623
add stop conv streaming and thinking text #1319
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
iceljc
wants to merge
3
commits into
SciSharp:master
Choose a base branch
from
iceljc:features/refine-streaming-message
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationCancellationService.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| using System.Threading; | ||
|
|
||
| namespace BotSharp.Abstraction.Conversations; | ||
|
|
||
| /// <summary> | ||
| /// Service to manage cancellation tokens for streaming chat completions. | ||
| /// Allows stopping an active streaming response by conversation ID. | ||
| /// </summary> | ||
| public interface IConversationCancellationService | ||
| { | ||
| /// <summary> | ||
| /// Register a new cancellation token source for the given conversation. | ||
| /// Returns the CancellationToken to be used in streaming loops. | ||
| /// </summary> | ||
| CancellationToken RegisterConversation(string conversationId); | ||
|
|
||
| /// <summary> | ||
| /// Cancel an active streaming operation for the given conversation. | ||
| /// </summary> | ||
| /// <returns>True if the conversation was found and cancelled, false otherwise.</returns> | ||
| bool CancelStreaming(string conversationId); | ||
|
|
||
| /// <summary> | ||
| /// Remove the cancellation token source for the given conversation. | ||
| /// Should be called when streaming completes (either normally or via cancellation). | ||
| /// </summary> | ||
| void UnregisterConversation(string conversationId); | ||
|
|
||
| /// <summary> | ||
| /// Get the cancellation token for the given conversation if one is registered. | ||
| /// </summary> | ||
| CancellationToken GetToken(string conversationId); | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationCancellationService.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| using System.Collections.Concurrent; | ||
|
|
||
| namespace BotSharp.Core.Conversations.Services; | ||
|
|
||
| public class ConversationCancellationService : IConversationCancellationService | ||
| { | ||
| private readonly ConcurrentDictionary<string, CancellationTokenSource> _cancellationTokenSources = new(); | ||
| private readonly ILogger _logger; | ||
|
|
||
| public ConversationCancellationService( | ||
| ILogger<ConversationCancellationService> logger) | ||
| { | ||
| _logger = logger; | ||
| } | ||
|
|
||
| public CancellationToken RegisterConversation(string conversationId) | ||
| { | ||
| // Cancel any existing streaming for this conversation | ||
| if (_cancellationTokenSources.TryRemove(conversationId, out var existingCts)) | ||
| { | ||
| existingCts.Cancel(); | ||
| existingCts.Dispose(); | ||
| _logger.LogWarning("Cancelled existing streaming session for conversation {ConversationId}", conversationId); | ||
| } | ||
|
|
||
| var cts = new CancellationTokenSource(); | ||
| _cancellationTokenSources[conversationId] = cts; | ||
| _logger.LogInformation("Registered streaming cancellation for conversation {ConversationId}", conversationId); | ||
| return cts.Token; | ||
| } | ||
|
|
||
| public bool CancelStreaming(string conversationId) | ||
| { | ||
| if (_cancellationTokenSources.TryGetValue(conversationId, out var cts)) | ||
| { | ||
| cts.Cancel(); | ||
| _logger.LogInformation("Streaming cancelled for conversation {ConversationId}", conversationId); | ||
| return true; | ||
| } | ||
|
|
||
| _logger.LogWarning("No active streaming found for conversation {ConversationId}", conversationId); | ||
| return false; | ||
| } | ||
|
|
||
| public void UnregisterConversation(string conversationId) | ||
| { | ||
| if (_cancellationTokenSources.TryRemove(conversationId, out var cts)) | ||
| { | ||
| cts.Dispose(); | ||
| _logger.LogDebug("Unregistered streaming cancellation for conversation {ConversationId}", conversationId); | ||
| } | ||
| } | ||
|
|
||
| public CancellationToken GetToken(string conversationId) | ||
| { | ||
| if (_cancellationTokenSources.TryGetValue(conversationId, out var cts)) | ||
| { | ||
| return cts.Token; | ||
| } | ||
|
|
||
| return CancellationToken.None; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,6 +131,7 @@ public async Task<IEnumerable<ChatResponseModel>> GetDialogs( | |
| Data = message.Data, | ||
| Sender = UserDto.FromUser(user), | ||
| Payload = message.Payload, | ||
| MetaData = message.MetaData, | ||
| HasMessageFiles = files.Any(x => x.MessageId.IsEqualTo(message.MessageId) && x.FileSource == FileSource.User) | ||
| }); | ||
| } | ||
|
|
@@ -146,6 +147,7 @@ public async Task<IEnumerable<ChatResponseModel>> GetDialogs( | |
| Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content, | ||
| Function = message.FunctionName, | ||
| Data = message.Data, | ||
| MetaData = message.MetaData, | ||
| Sender = new() | ||
| { | ||
| FirstName = agent?.Name ?? "Unkown", | ||
|
|
@@ -397,18 +399,36 @@ public async Task<ChatResponseModel> SendMessage( | |
| await conv.SetConversationId(conversationId, input.States); | ||
| SetStates(conv, input); | ||
|
|
||
| IConversationCancellationService? convCancellation = null; | ||
| if (input.IsStreamingMessage) | ||
| { | ||
| convCancellation = _services.GetRequiredService<IConversationCancellationService>(); | ||
| convCancellation.RegisterConversation(conversationId); | ||
| } | ||
|
|
||
| var response = new ChatResponseModel(); | ||
| await conv.SendMessage(agentId, inputMsg, | ||
| replyMessage: input.Postback, | ||
| async msg => | ||
| { | ||
| response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content; | ||
| response.Function = msg.FunctionName; | ||
| response.MessageLabel = msg.MessageLabel; | ||
| response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; | ||
| response.Instruction = msg.Instruction; | ||
| response.Data = msg.Data; | ||
| }); | ||
| try | ||
| { | ||
| await conv.SendMessage(agentId, inputMsg, | ||
| replyMessage: input.Postback, | ||
| async msg => | ||
| { | ||
| response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content; | ||
| response.Function = msg.FunctionName; | ||
| response.MessageLabel = msg.MessageLabel; | ||
| response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; | ||
| response.Instruction = msg.Instruction; | ||
| response.Data = msg.Data; | ||
| }); | ||
| } | ||
| catch (OperationCanceledException) when (input.IsStreamingMessage) | ||
| { | ||
| response.Text = string.Empty; | ||
| } | ||
| finally | ||
| { | ||
| convCancellation?.UnregisterConversation(conversationId); | ||
| } | ||
|
|
||
| var state = _services.GetRequiredService<IConversationStateService>(); | ||
| response.States = state.GetStates(); | ||
|
|
@@ -455,20 +475,20 @@ public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string | |
| Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.Connection, "keep-alive"); | ||
|
|
||
| await conv.SendMessage(agentId, inputMsg, | ||
| replyMessage: input.Postback, | ||
| // responsed generated | ||
| async msg => | ||
| { | ||
| response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content; | ||
| response.MessageLabel = msg.MessageLabel; | ||
| response.Function = msg.FunctionName; | ||
| response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; | ||
| response.Instruction = msg.Instruction; | ||
| response.Data = msg.Data; | ||
| response.States = state.GetStates(); | ||
| await OnChunkReceived(Response, response); | ||
| }); | ||
| replyMessage: input.Postback, | ||
| // responsed generated | ||
| async msg => | ||
| { | ||
| response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content; | ||
| response.MessageLabel = msg.MessageLabel; | ||
| response.Function = msg.FunctionName; | ||
| response.RichContent = msg.SecondaryRichContent ?? msg.RichContent; | ||
| response.Instruction = msg.Instruction; | ||
| response.Data = msg.Data; | ||
| response.States = state.GetStates(); | ||
|
|
||
| await OnChunkReceived(Response, response); | ||
| }); | ||
|
Comment on lines
475
to
+491
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 3. Sse stop-streaming broken SendMessageSse never registers a cancellation token source for the conversation, so StopStreaming cannot cancel SSE-initiated streaming and providers will run with CancellationToken.None. Agent Prompt
|
||
|
|
||
| response.States = state.GetStates(); | ||
| response.MessageId = inputMsg.MessageId; | ||
|
|
@@ -477,18 +497,13 @@ await conv.SendMessage(agentId, inputMsg, | |
| // await OnEventCompleted(Response); | ||
| } | ||
|
|
||
| private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg) | ||
| [HttpPost("/conversation/{conversationId}/stop-streaming")] | ||
| public ConverstionCancellationResponse StopStreaming([FromRoute] string conversationId) | ||
| { | ||
| var indicator = new ChatResponseModel | ||
| { | ||
| ConversationId = conversationId, | ||
| MessageId = msg.MessageId, | ||
| Text = msg.Indication, | ||
| Function = "indicating", | ||
| Instruction = msg.Instruction, | ||
| States = new Dictionary<string, string>() | ||
| }; | ||
| await OnChunkReceived(Response, indicator); | ||
| var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>(); | ||
| var cancelled = streamingCancellation.CancelStreaming(conversationId); | ||
|
|
||
| return new ConverstionCancellationResponse { Success = cancelled }; | ||
| } | ||
| #endregion | ||
|
|
||
|
|
@@ -515,6 +530,8 @@ private void SetStates(IConversationService conv, NewMessageModel input) | |
| { | ||
| conv.States.SetState("sampling_factor", input.SamplingFactor, source: StateSource.External); | ||
| } | ||
|
|
||
| conv.States.SetState("use_stream_message", input.IsStreamingMessage, source: StateSource.Application); | ||
| } | ||
|
|
||
| private FileContentResult BuildFileResult(string file) | ||
|
|
@@ -567,5 +584,19 @@ private JsonSerializerOptions InitJsonOptions(BotSharpOptions options) | |
|
|
||
| return jsonOption; | ||
| } | ||
|
|
||
| private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg) | ||
| { | ||
| var indicator = new ChatResponseModel | ||
| { | ||
| ConversationId = conversationId, | ||
| MessageId = msg.MessageId, | ||
| Text = msg.Indication, | ||
| Function = "indicating", | ||
| Instruction = msg.Instruction, | ||
| States = [] | ||
| }; | ||
| await OnChunkReceived(Response, indicator); | ||
| } | ||
| #endregion | ||
| } | ||
9 changes: 9 additions & 0 deletions
9
src/Infrastructure/BotSharp.OpenAPI/ViewModels/Conversations/Request/NewMessageModel.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,15 @@ | ||
| using System.Text.Json.Serialization; | ||
|
|
||
| namespace BotSharp.OpenAPI.ViewModels.Conversations; | ||
|
|
||
| public class NewMessageModel : IncomingMessageModel | ||
| { | ||
| public override string Channel { get; set; } = ConversationChannel.OpenAPI; | ||
|
|
||
| /// <summary> | ||
| /// Indicates whether this message uses streaming completion. | ||
| /// When true, the streaming can be stopped via the stop endpoint. | ||
| /// </summary> | ||
| [JsonPropertyName("is_streaming_msg")] | ||
| public bool IsStreamingMessage { get; set; } | ||
| } |
5 changes: 5 additions & 0 deletions
5
...ure/BotSharp.OpenAPI/ViewModels/Conversations/Response/ConverstionCancellationResponse.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| namespace BotSharp.OpenAPI.ViewModels.Conversations; | ||
|
|
||
| public class ConverstionCancellationResponse : ResponseBase | ||
| { | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4. Metadata dropped in responses
🐞 Bug≡ CorrectnessAgent Prompt
ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools