Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions BotSharp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Test.BrowserUse",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.LLM.Tests", "tests\BotSharp.LLM.Tests\BotSharp.LLM.Tests.csproj", "{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BotSharp.Test.RealtimeVoice", "tests\BotSharp.Test.RealtimeVoice\BotSharp.Test.RealtimeVoice.csproj", "{B067B126-88CD-4282-BEEF-7369B64423EF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -599,6 +601,14 @@ Global
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|Any CPU.Build.0 = Release|Any CPU
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|x64.ActiveCfg = Release|Any CPU
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E}.Release|x64.Build.0 = Release|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|x64.ActiveCfg = Debug|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Debug|x64.Build.0 = Debug|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|Any CPU.Build.0 = Release|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|x64.ActiveCfg = Release|Any CPU
{B067B126-88CD-4282-BEEF-7369B64423EF}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -668,6 +678,7 @@ Global
{970BE341-9AC8-99A5-6572-E703C1E02FCB} = {E29DC6C4-5E57-48C5-BCB0-6B8F84782749}
{7D0DB012-9798-4BB9-B15B-A5B0B7B3B094} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC}
{7C0C7D13-D161-4AB0-9C29-83A0F1FF990E} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC}
{B067B126-88CD-4282-BEEF-7369B64423EF} = {32FAFFFE-A4CB-4FEE-BF7C-84518BBC6DCC}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A9969D89-C98B-40A5-A12B-FC87E55B3A19}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace BotSharp.Abstraction.Agents.Settings;

public class AgentSettings
{
public string DataDir { get; set; } = string.Empty;
public string DataDir { get; set; } = "agents";
public string TemplateFormat { get; set; } = "liquid";
public string HostAgentId { get; set; } = string.Empty;
public bool EnableTranslator { get; set; } = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace BotSharp.Abstraction.Conversations.Settings;

public class ConversationSetting
{
public string DataDir { get; set; }
public string DataDir { get; set; } = "conversations";
public string ChatCompletion { get; set; }
public bool EnableKnowledgeBase { get; set; }
public bool ShowVerboseLog { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Task Connect(RealtimeHubConnection conn,
Action<RoleDialogModel> onInputAudioTranscriptionCompleted,
Action onUserInterrupted);
Task AppenAudioBuffer(string message);
Task AppenAudioBuffer(ArraySegment<byte> data, int length);

Task SendEventToModel(object message);
Task Disconnect();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace BotSharp.Abstraction.Realtime.Enums;

public enum StreamChannelStatus
{
Open = 1,
Closed = 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ public interface IRealtimeHub
IRealTimeCompletion Completer { get; }
IRealTimeCompletion SetCompleter(string provider);

Task Listen(WebSocket userWebSocket, Action<string> onUserMessageReceived);
Task ConnectToModel(Func<string, Task> responseToUser);
}
13 changes: 13 additions & 0 deletions src/Infrastructure/BotSharp.Abstraction/Realtime/IStreamChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using BotSharp.Abstraction.Realtime.Enums;
using BotSharp.Abstraction.Realtime.Models;
using System.Threading;

namespace BotSharp.Abstraction.Realtime;

public interface IStreamChannel
{
Task ConnectAsync(string conversationId);
Task<StreamReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellation);
Task SendAsync(byte[] data, CancellationToken cancellation);
Task CloseAsync(StreamChannelStatus status, string description, CancellationToken cancellation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace BotSharp.Abstraction.Realtime.Models;

public class ModelResponseEvent
{
[JsonPropertyName("event")]
public string Event { get; set; } = string.Empty;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace BotSharp.Abstraction.Realtime.Models;

public class ModelResponseMediaEvent : ModelResponseEvent
{
[JsonPropertyName("media")]
public string Media { get; set; } = null!;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ namespace BotSharp.Abstraction.Realtime.Models;

public class RealtimeHubConnection
{
public string Event { get; set; } = null!;
public string StreamId { get; set; } = null!;
public string? LastAssistantItemId { get; set; } = null!;
public long LatestMediaTimestamp { get; set; }
Expand All @@ -13,10 +12,9 @@ public class RealtimeHubConnection
public ConcurrentQueue<string> MarkQueue { get; set; } = new();
public string CurrentAgentId { get; set; } = null!;
public string ConversationId { get; set; } = null!;
public string Data { get; set; } = string.Empty;
public Func<string, object> OnModelMessageReceived { get; set; } = null!;
public Func<object> OnModelAudioResponseDone { get; set; } = null!;
public Func<object> OnModelUserInterrupted { get; set; } = null!;
public Func<string, string> OnModelMessageReceived { get; set; } = null!;
public Func<string> OnModelAudioResponseDone { get; set; } = null!;
public Func<string> OnModelUserInterrupted { get; set; } = null!;

public void ResetResponseState()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ namespace BotSharp.Abstraction.Realtime.Models;

public class RealtimeModelSettings
{
public string InputAudioFormat { get; set; } = "g711_ulaw";
public string OutputAudioFormat { get; set; } = "g711_ulaw";
public string Voice { get; set; } = "alloy";
public float Temperature { get; set; } = 0.8f;
public int MaxResponseOutputTokens { get; set; } = 512;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using BotSharp.Abstraction.Realtime.Enums;

namespace BotSharp.Abstraction.Realtime.Models;

public class StreamReceiveResult
{
public StreamChannelStatus Status { get; set; }
public int Count { get; set; }
public bool EndOfMessage { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NAudio" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\BotSharp.Abstraction\BotSharp.Abstraction.csproj" />
<ProjectReference Include="..\BotSharp.Core\BotSharp.Core.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)

services.AddScoped<IRealtimeHub, RealtimeHub>();
services.AddScoped<IConversationHook, RealtimeConversationHook>();
services.AddScoped<IStreamChannel, WaveStremChannel>();
}
}
119 changes: 8 additions & 111 deletions src/Infrastructure/BotSharp.Core.Realtime/Services/RealtimeHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,53 +21,9 @@ public RealtimeHub(IServiceProvider services, ILogger<RealtimeHub> logger)
_logger = logger;
}

public async Task Listen(WebSocket userWebSocket,
Action<string> onUserMessageReceived)
public async Task ConnectToModel(Func<string, Task> responseToUser)
{
var buffer = new byte[1024 * 32];
WebSocketReceiveResult result;

do
{
Array.Clear(buffer, 0, buffer.Length);
result = await userWebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
string receivedText = Encoding.UTF8.GetString(buffer, 0, result.Count);

if (string.IsNullOrEmpty(receivedText))
{
continue;
}

onUserMessageReceived(receivedText);

if (_conn.Event == "user_connected")
{
await ConnectToModel(userWebSocket);
}
else if (_conn.Event == "user_data_received")
{
await _completer.AppenAudioBuffer(_conn.Data);
}
else if (_conn.Event == "user_dtmf_receiving")
{
}
else if (_conn.Event == "user_dtmf_received")
{
await HandleUserDtmfReceived();
}
else if (_conn.Event == "user_disconnected")
{
await _completer.Disconnect();
await HandleUserDisconnected();
}
} while (!result.CloseStatus.HasValue);

await userWebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
}

private async Task ConnectToModel(WebSocket userWebSocket)
{
var hookProvider = _services.GetRequiredService<ConversationHookProvider>();
var hookProvider = _services.GetService<ConversationHookProvider>();
var convService = _services.GetRequiredService<IConversationService>();
convService.SetConversationId(_conn.ConversationId, []);
var conversation = await convService.GetConversation(_conn.ConversationId);
Expand Down Expand Up @@ -103,7 +59,7 @@ await _completer.Connect(_conn,
onModelAudioDeltaReceived: async (audioDeltaData, itemId) =>
{
var data = _conn.OnModelMessageReceived(audioDeltaData);
await SendEventToUser(userWebSocket, data);
await responseToUser(data);

// If this is the first delta of a new response, set the start timestamp
if (!_conn.ResponseStartTimestamp.HasValue)
Expand All @@ -118,12 +74,12 @@ await _completer.Connect(_conn,
}

// Send mark messages to Media Streams so we know if and when AI response playback is finished
await SendMark(userWebSocket, _conn);
// await SendMark(userWebSocket, _conn);
},
onModelAudioResponseDone: async () =>
{
var data = _conn.OnModelAudioResponseDone();
await SendEventToUser(userWebSocket, data);
await responseToUser(data);
},
onAudioTranscriptDone: async transcript =>
{
Expand Down Expand Up @@ -151,7 +107,7 @@ await _completer.Connect(_conn,
dialogs.Add(message);
storage.Append(_conn.ConversationId, message);

foreach (var hook in hookProvider.HooksOrderByPriority)
foreach (var hook in hookProvider?.HooksOrderByPriority ?? [])
{
hook.SetAgent(agent)
.SetConversation(conversation);
Expand All @@ -172,7 +128,7 @@ await _completer.Connect(_conn,
storage.Append(_conn.ConversationId, message);
routing.Context.SetMessageId(_conn.ConversationId, message.MessageId);

foreach (var hook in hookProvider.HooksOrderByPriority)
foreach (var hook in hookProvider?.HooksOrderByPriority ?? [])
{
hook.SetAgent(agent)
.SetConversation(conversation);
Expand All @@ -186,69 +142,10 @@ await _completer.Connect(_conn,
_conn.ResetResponseState();

var data = _conn.OnModelUserInterrupted();
await SendEventToUser(userWebSocket, data);
await responseToUser(data);
});
}

private async Task SendMark(WebSocket userWebSocket, RealtimeHubConnection conn)
{
if (!string.IsNullOrEmpty(conn.StreamId))
{
var markEvent = new
{
@event = "mark",
streamSid = conn.StreamId,
mark = new { name = "responsePart" }
};
await SendEventToUser(userWebSocket, markEvent);
conn.MarkQueue.Enqueue("responsePart");
}
}

private async Task HandleUserDtmfReceived()
{
var routing = _services.GetRequiredService<IRoutingService>();
var hookProvider = _services.GetRequiredService<ConversationHookProvider>();
var agentService = _services.GetRequiredService<IAgentService>();
var agent = await agentService.LoadAgent(_conn.CurrentAgentId);
var dialogs = routing.Context.GetDialogs();
var convService = _services.GetRequiredService<IConversationService>();
var conversation = await convService.GetConversation(_conn.ConversationId);

var message = new RoleDialogModel(AgentRole.User, _conn.Data)
{
CurrentAgentId = routing.Context.GetCurrentAgentId()
};
dialogs.Add(message);

var storage = _services.GetRequiredService<IConversationStorage>();
storage.Append(_conn.ConversationId, message);

foreach (var hook in hookProvider.HooksOrderByPriority)
{
hook.SetAgent(agent)
.SetConversation(conversation);

await hook.OnMessageReceived(message);
}

await _completer.InsertConversationItem(message);
var instruction = await _completer.UpdateSession(_conn);
await _completer.TriggerModelInference($"{instruction}\r\n\r\nReply based on the user input: {message.Content}");
}

private async Task HandleUserDisconnected()
{

}

private async Task SendEventToUser(WebSocket webSocket, object message)
{
var data = JsonSerializer.Serialize(message);
var buffer = Encoding.UTF8.GetBytes(data);
await webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
}

public RealtimeHubConnection SetHubConnection(string conversationId)
{
_conn = new RealtimeHubConnection
Expand Down
Loading
Loading