Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
<PackageVersion Include="A2A" Version="0.3.3-preview" />
<PackageVersion Include="CsvHelper" Version="33.1.0" />
<PackageVersion Include="FuzzySharp" Version="2.0.2" />
<PackageVersion Include="Google_GenerativeAI" Version="3.6.3" />
<PackageVersion Include="Google_GenerativeAI.Live" Version="3.6.3" />
<PackageVersion Include="Google_GenerativeAI" Version="3.6.4" />
<PackageVersion Include="Google_GenerativeAI.Live" Version="3.6.4" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="Polly" Version="8.4.2" />
<PackageVersion Include="RabbitMQ.Client" Version="7.2.0" />
Expand All @@ -33,7 +33,7 @@
<PackageVersion Include="Whisper.net.Runtime" Version="1.8.1" />
<PackageVersion Include="NCrontab" Version="3.3.3" />
<PackageVersion Include="Azure.AI.OpenAI" Version="2.7.0-beta.1" />
<PackageVersion Include="OpenAI" Version="2.9.1" />
<PackageVersion Include="OpenAI" Version="2.10.0" />
<PackageVersion Include="MailKit" Version="4.14.1" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="10.0.0" />
<PackageVersion Include="MySql.Data" Version="9.5.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class ChatResponseDto : InstructResult
[JsonPropertyName("is_streaming")]
public bool IsStreaming { get; set; }

[JsonPropertyName("meta_data")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public Dictionary<string, string?>? MetaData { get; set; }

[JsonPropertyName("created_at")]
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.Threading;

namespace BotSharp.Abstraction.Conversations;

/// <summary>
/// Service to manage cancellation tokens for streaming chat completions.
/// Allows stopping an active streaming response by conversation ID.
/// </summary>
public interface IConversationCancellationService
{
    /// <summary>
    /// Register a new cancellation token source for the given conversation.
    /// Any previously registered source for the same conversation is replaced.
    /// </summary>
    /// <param name="conversationId">Identifier of the conversation to track.</param>
    /// <returns>The <see cref="CancellationToken"/> to be observed by streaming loops.</returns>
    CancellationToken RegisterConversation(string conversationId);

    /// <summary>
    /// Cancel an active streaming operation for the given conversation.
    /// </summary>
    /// <param name="conversationId">Identifier of the conversation to cancel.</param>
    /// <returns>True if the conversation was found and cancelled, false otherwise.</returns>
    bool CancelStreaming(string conversationId);

    /// <summary>
    /// Remove the cancellation token source for the given conversation.
    /// Should be called when streaming completes (either normally or via cancellation),
    /// otherwise the token source is never released.
    /// </summary>
    /// <param name="conversationId">Identifier of the conversation to release.</param>
    void UnregisterConversation(string conversationId);

    /// <summary>
    /// Get the cancellation token for the given conversation if one is registered;
    /// implementations are expected to return a non-cancellable token otherwise.
    /// </summary>
    /// <param name="conversationId">Identifier of the conversation to look up.</param>
    CancellationToken GetToken(string conversationId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)
return settingService.Bind<GoogleApiSettings>("GoogleApi");
});

// Streaming cancellation
services.AddSingleton<IConversationCancellationService, ConversationCancellationService>();

// Observer and observable
services.AddSingleton<MessageHub<HubObserveData<RoleDialogModel>>>();
services.AddScoped<ObserverSubscriptionContainer<HubObserveData<RoleDialogModel>>>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System.Collections.Concurrent;

namespace BotSharp.Core.Conversations.Services;

/// <summary>
/// Tracks one <see cref="CancellationTokenSource"/> per conversation so that an
/// active streaming completion can be stopped by conversation ID.
/// Intended to be registered as a singleton; all members are safe for concurrent use.
/// </summary>
public class ConversationCancellationService : IConversationCancellationService
{
    // One CTS per conversation id. Entries are added by RegisterConversation and
    // removed by UnregisterConversation (or when the conversation is re-registered).
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _cancellationTokenSources = new();
    private readonly ILogger _logger;

    public ConversationCancellationService(
        ILogger<ConversationCancellationService> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Register a new cancellation token source for the conversation,
    /// cancelling and disposing any existing one first.
    /// </summary>
    /// <returns>The token to be observed by the streaming loop.</returns>
    public CancellationToken RegisterConversation(string conversationId)
    {
        // Cancel any streaming session already running for this conversation.
        if (_cancellationTokenSources.TryRemove(conversationId, out var existingCts))
        {
            CancelAndDisposeSafely(existingCts);
            _logger.LogWarning("Cancelled existing streaming session for conversation {ConversationId}", conversationId);
        }

        var cts = new CancellationTokenSource();
        _cancellationTokenSources[conversationId] = cts;
        _logger.LogInformation("Registered streaming cancellation for conversation {ConversationId}", conversationId);
        return cts.Token;
    }

    /// <summary>
    /// Cancel the active streaming operation for the conversation, if any.
    /// </summary>
    /// <returns>True when a registered, still-live token source was cancelled.</returns>
    public bool CancelStreaming(string conversationId)
    {
        if (_cancellationTokenSources.TryGetValue(conversationId, out var cts))
        {
            try
            {
                cts.Cancel();
                _logger.LogInformation("Streaming cancelled for conversation {ConversationId}", conversationId);
                return true;
            }
            catch (ObjectDisposedException)
            {
                // Lost a race with UnregisterConversation disposing the CTS:
                // the stream has already finished, so there is nothing to cancel.
            }
        }

        _logger.LogWarning("No active streaming found for conversation {ConversationId}", conversationId);
        return false;
    }

    /// <summary>
    /// Remove and dispose the token source for the conversation.
    /// Must be called when streaming completes, whether normally or via cancellation.
    /// </summary>
    public void UnregisterConversation(string conversationId)
    {
        if (_cancellationTokenSources.TryRemove(conversationId, out var cts))
        {
            cts.Dispose();
            _logger.LogDebug("Unregistered streaming cancellation for conversation {ConversationId}", conversationId);
        }
    }

    /// <summary>
    /// Get the token for the conversation, or <see cref="CancellationToken.None"/>
    /// when no streaming session is registered.
    /// </summary>
    public CancellationToken GetToken(string conversationId)
    {
        return _cancellationTokenSources.TryGetValue(conversationId, out var cts)
            ? cts.Token
            : CancellationToken.None;
    }

    // Cancel then dispose, swallowing the ObjectDisposedException that Cancel
    // throws when another thread disposed the CTS first (e.g. via Unregister).
    private static void CancelAndDisposeSafely(CancellationTokenSource cts)
    {
        try
        {
            cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Already disposed elsewhere; nothing left to cancel.
            return;
        }

        cts.Dispose();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public async Task<IEnumerable<ChatResponseModel>> GetDialogs(
Data = message.Data,
Sender = UserDto.FromUser(user),
Payload = message.Payload,
MetaData = message.MetaData,
HasMessageFiles = files.Any(x => x.MessageId.IsEqualTo(message.MessageId) && x.FileSource == FileSource.User)
});
}
Expand All @@ -146,6 +147,7 @@ public async Task<IEnumerable<ChatResponseModel>> GetDialogs(
Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content,
Function = message.FunctionName,
Data = message.Data,
MetaData = message.MetaData,
Sender = new()
{
FirstName = agent?.Name ?? "Unkown",
Expand Down Expand Up @@ -397,18 +399,36 @@ public async Task<ChatResponseModel> SendMessage(
await conv.SetConversationId(conversationId, input.States);
SetStates(conv, input);

IConversationCancellationService? convCancellation = null;
if (input.IsStreamingMessage)
{
convCancellation = _services.GetRequiredService<IConversationCancellationService>();
convCancellation.RegisterConversation(conversationId);
}

var response = new ChatResponseModel();
await conv.SendMessage(agentId, inputMsg,
replyMessage: input.Postback,
async msg =>
{
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
response.Function = msg.FunctionName;
response.MessageLabel = msg.MessageLabel;
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
response.Instruction = msg.Instruction;
response.Data = msg.Data;
});
try
{
await conv.SendMessage(agentId, inputMsg,
replyMessage: input.Postback,
async msg =>
{
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
response.Function = msg.FunctionName;
response.MessageLabel = msg.MessageLabel;
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
response.Instruction = msg.Instruction;
response.Data = msg.Data;
});
Comment on lines +414 to +422
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

4. Metadata dropped in responses 🐞 Bug ≡ Correctness

ConversationController.SendMessage and SendMessageSse never copy RoleDialogModel.MetaData to
ChatResponseModel, so newly added meta_data/thinking_text does not reach OpenAPI clients.
Agent Prompt
### Issue description
`ChatResponseDto.MetaData` is now part of the API contract, but OpenAPI endpoints don’t populate it from `RoleDialogModel.MetaData`, dropping `thinking_text` and other metadata.

### Issue Context
Providers populate `RoleDialogModel.MetaData` (e.g., GoogleAI). `GetDialogs` already maps `MetaData`, but `SendMessage`/`SendMessageSse` do not.

### Fix Focus Areas
- src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs[379-438]
- src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs[442-491]

### Implementation notes
- In both callbacks (`SendMessage` and `SendMessageSse`), add `response.MetaData = msg.MetaData;` (and, if relevant, `response.IsStreaming = msg.IsStreaming;`).
- If you emit tool-call indication chunks via `OnReceiveToolCallIndication`, consider whether they should also include any relevant metadata.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

}
catch (OperationCanceledException) when (input.IsStreamingMessage)
{
response.Text = string.Empty;
}
finally
{
convCancellation?.UnregisterConversation(conversationId);
}

var state = _services.GetRequiredService<IConversationStateService>();
response.States = state.GetStates();
Expand Down Expand Up @@ -455,20 +475,20 @@ public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string
Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.Connection, "keep-alive");

await conv.SendMessage(agentId, inputMsg,
replyMessage: input.Postback,
// responsed generated
async msg =>
{
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
response.MessageLabel = msg.MessageLabel;
response.Function = msg.FunctionName;
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
response.Instruction = msg.Instruction;
response.Data = msg.Data;
response.States = state.GetStates();
await OnChunkReceived(Response, response);
});
replyMessage: input.Postback,
// responsed generated
async msg =>
{
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
response.MessageLabel = msg.MessageLabel;
response.Function = msg.FunctionName;
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
response.Instruction = msg.Instruction;
response.Data = msg.Data;
response.States = state.GetStates();

await OnChunkReceived(Response, response);
});
Comment on lines 475 to +491
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

3. Sse stop-streaming broken 🐞 Bug ≡ Correctness

SendMessageSse never registers a cancellation token source for the conversation, so StopStreaming
cannot cancel SSE-initiated streaming and providers will run with CancellationToken.None.
Agent Prompt
### Issue description
`SendMessageSse` does not call `IConversationCancellationService.RegisterConversation(...)`, so `/conversation/{conversationId}/stop-streaming` cannot cancel SSE-started streams.

### Issue Context
Providers use `IConversationCancellationService.GetToken(conv.ConversationId)` and pass it into streaming enumerators; without registration this returns `CancellationToken.None`, and `CancelStreaming` will not find a CTS to cancel.

### Fix Focus Areas
- src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs[442-498]
- src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs[500-507]

### Implementation notes
- In `SendMessageSse`, add the same `RegisterConversation(conversationId)` + `try/finally { UnregisterConversation(conversationId); }` pattern used in `SendMessage` (likely gated by `input.IsStreamingMessage`, or always for SSE if SSE always implies streaming).
- Consider catching `OperationCanceledException` to close the SSE stream cleanly (optionally emit a final event / [DONE]).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


response.States = state.GetStates();
response.MessageId = inputMsg.MessageId;
Expand All @@ -477,18 +497,13 @@ await conv.SendMessage(agentId, inputMsg,
// await OnEventCompleted(Response);
}

private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg)
[HttpPost("/conversation/{conversationId}/stop-streaming")]
public ConverstionCancellationResponse StopStreaming([FromRoute] string conversationId)
{
var indicator = new ChatResponseModel
{
ConversationId = conversationId,
MessageId = msg.MessageId,
Text = msg.Indication,
Function = "indicating",
Instruction = msg.Instruction,
States = new Dictionary<string, string>()
};
await OnChunkReceived(Response, indicator);
var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
var cancelled = streamingCancellation.CancelStreaming(conversationId);

return new ConverstionCancellationResponse { Success = cancelled };
}
#endregion

Expand All @@ -515,6 +530,8 @@ private void SetStates(IConversationService conv, NewMessageModel input)
{
conv.States.SetState("sampling_factor", input.SamplingFactor, source: StateSource.External);
}

conv.States.SetState("use_stream_message", input.IsStreamingMessage, source: StateSource.Application);
}

private FileContentResult BuildFileResult(string file)
Expand Down Expand Up @@ -567,5 +584,19 @@ private JsonSerializerOptions InitJsonOptions(BotSharpOptions options)

return jsonOption;
}

/// <summary>
/// Push a lightweight "indicating" chunk to the client while a tool call is in progress.
/// Carries only the indication text and identifiers; no conversation states are attached.
/// </summary>
private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg)
{
    var indicator = new ChatResponseModel();
    indicator.ConversationId = conversationId;
    indicator.MessageId = msg.MessageId;
    indicator.Text = msg.Indication;
    indicator.Function = "indicating";
    indicator.Instruction = msg.Instruction;
    indicator.States = [];

    await OnChunkReceived(Response, indicator);
}
#endregion
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
using System.Text.Json.Serialization;

namespace BotSharp.OpenAPI.ViewModels.Conversations;

/// <summary>
/// Request body for sending a new message through the OpenAPI channel.
/// </summary>
public class NewMessageModel : IncomingMessageModel
{
    // Messages created through this model always belong to the OpenAPI channel.
    public override string Channel { get; set; } = ConversationChannel.OpenAPI;

    /// <summary>
    /// Indicates whether this message uses streaming completion.
    /// When true, the streaming can be stopped via the stop-streaming endpoint.
    /// </summary>
    [JsonPropertyName("is_streaming_msg")]
    public bool IsStreamingMessage { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace BotSharp.OpenAPI.ViewModels.Conversations;

/// <summary>
/// Response body for the stop-streaming endpoint; the cancellation outcome is
/// reported via the Success flag (presumably inherited from ResponseBase — confirm).
/// NOTE(review): the type name misspells "Conversation" as "Converstion"; renaming
/// requires updating the controller that constructs it, so it is left as-is here.
/// </summary>
public class ConverstionCancellationResponse : ResponseBase
{
}
Loading
Loading